Создание актера-издателя и актера-подписчика с одним и тем же актером

Я новичок в akka streams. Я использую kafka в качестве источника (используя библиотеку ReactiveKafka) и выполняю некоторую обработку данных через поток и использую подписчика (EsHandler) в качестве приемника.

Теперь мне нужно обработать ошибки и отправить их в другую очередь kafka через обработчик ошибок. Я пытаюсь использовать EsHandler как в качестве издателя, так и подписчика. Я не уверен, как включить EsHandler в качестве посредника вместо стока.

Это мой код:

val publisher = Kafka.kafka.consume(topic, "es", new StringDecoder())

val flow = Flow[String].map { elem => JsonConverter.convert(elem.toString()) }

val sink = Sink.actorSubscriber[GenModel](Props(classOf[EsHandler]))

Source(publisher).via(flow).to(sink).run()


class EsHandler extends ActorSubscriber with ActorPublisher[Model] {

  val requestStrategy = WatermarkRequestStrategy(100)

  def receive = {
    case OnNext(msg: Model) =>
      context.actorOf(Props(classOf[EsStorage], self)) ! msg

    case OnError(err: Exception) =>
      context.stop(self)

    case OnComplete =>
      context.stop(self)

    case Response(msg) =>
      if (msg.isError()) onNext(msg.getContent())
  }
}

class ErrorHandler extends ActorSubscriber {

  val requestStrategy = WatermarkRequestStrategy(100)

  def receive = {
    case OnNext(msg: Model) =>
      println(msg)
  }
}

person Yoda    schedule 07.07.2015    source источник


Ответы (1)


Мы настоятельно рекомендуем против реализации собственного процессора (так называется спецификация реактивных потоков). на "Subscriber && Publisher". Это довольно сложно сделать правильно, поэтому издатель не отображается непосредственно как вспомогательная черта.

Вместо этого большую часть времени вы захотите использовать предоставленные вам Sources/Sinks (или Publishers/Subscribers) и выполнять свои операции между ними, например, этапы карты/фильтра и т. д.

На самом деле существует существующая реализация для Kafka Sources and Sinks, которую вы можете использовать, она называется reactive-kafka и проверено TCK Reactive Streams, поэтому вы можете доверять ему, чтобы быть действительными реализациями.

person Konrad 'ktoso' Malawski    schedule 07.07.2015
comment
Я использую реактивного издателя Кафки в качестве источника. В середине моего потока есть актер. Мне нужно обработать сообщения об ошибках и отправить их обратно в kafka. какой предпочтительный способ сделать? - person Yoda; 07.07.2015
comment
Кроме того, я буду использовать подписчика ReactiveKafka (издателя kafka) в качестве приемника. - person Yoda; 07.07.2015
comment
И если я хочу ограничить входящие данные (в случае временного сбоя сервера) из моего источника, я не смогу сделать это с помощью шаблона запроса. Вот почему я перешел на свой собственный процессор. Есть ли способ добиться того же? - person Yoda; 07.07.2015
comment
@ Konrad'ktoso'Malawski Не могли бы вы взглянуть на связанный вопрос? stackoverflow.com/q/32290285/226895 - person expert; 30.08.2015