Я новичок в akka streams. Я использую kafka в качестве источника (используя библиотеку ReactiveKafka) и выполняю некоторую обработку данных через поток и использую подписчика (EsHandler) в качестве приемника.
Теперь мне нужно обработать ошибки и отправить их в другую очередь kafka через обработчик ошибок. Я пытаюсь использовать EsHandler как в качестве издателя, так и подписчика. Я не уверен, как включить EsHandler в качестве посредника вместо стока.
Это мой код:
val publisher = Kafka.kafka.consume(topic, "es", new StringDecoder())
val flow = Flow[String].map { elem => JsonConverter.convert(elem.toString()) }
val sink = Sink.actorSubscriber[GenModel](Props(classOf[EsHandler]))
Source(publisher).via(flow).to(sink).run()
class EsHandler extends ActorSubscriber with ActorPublisher[Model] {
val requestStrategy = WatermarkRequestStrategy(100)
def receive = {
case OnNext(msg: Model) =>
context.actorOf(Props(classOf[EsStorage], self)) ! msg
case OnError(err: Exception) =>
context.stop(self)
case OnComplete =>
context.stop(self)
case Response(msg) =>
if (msg.isError()) onNext(msg.getContent())
}
}
class ErrorHandler extends ActorSubscriber {
val requestStrategy = WatermarkRequestStrategy(100)
def receive = {
case OnNext(msg: Model) =>
println(msg)
}
}