Как предоставить абоненту возможность маршрутизации

У меня есть следующий путь для потока -

 kafkaStream[message] -> 
 kafkaStream[message] -> mergedKafkaStream[message] -> stream[EnrichedMessage] -> I/O
 kafkaStream[message] -> 

Я не уверен, как написать это в потоке akka. Я пробовал следовать (псевдо).

KafkaStream extends ActorPublisher[message] {

}

IOHandler extends ActorSubscriber {

}

k1, k2, k3 — издатели потоков kafka

f = Flow[message].map(_.enrichMessage)

FlowGraph { b =>
  k1 ~> merge
  k2 ~> merge
  k3 ~> merge
  merge ~> f ~> ioHandlerSink
}

Таким образом, я подключаю издателя к приемнику. Но здесь проблема, которую я хочу решить, это медленный ввод-вывод. Актер IOHandler очень медленно обрабатывает сообщения, поэтому, как я могу иметь несколько IOHandler, и я должен иметь возможность распределять задачи. И я также хочу поддерживать обратное давление, чтобы не использовать огонь и забыть об использовании маршрутизатора.

Я очень новичок в akka stream, поэтому предложите мне выход.

Спасибо


person pankajmi    schedule 19.11.2014    source источник
comment
akka-streams сама по себе новая, поэтому функциональность может отсутствовать :) В частности, есть билет №15959 которые предлагают что-то похожее на то, что вы ищете. Это еще не реализовано, но если вы добавите комментарий о том, что вы ожидаете от него, это может быть полезно.   -  person jrudolph    schedule 20.11.2014


Ответы (1)


Вы можете маршрутизировать к нескольким IOHandler приемникам, используя Balance на перекрестке FlowGraph.

person Eric Zoerner    schedule 08.02.2015