У меня есть Актер, который был разработан для работы с подтверждением akka-io, так что он будет ждать подтверждения при отправке сообщений вверх по течению (в сеть). Этот актор является интерфейсом к асинхронному приложению в бэкенде.
Я хотел бы иметь слой-оболочку, который позволяет мне преобразовать этого Актера в akka-streams Flow[Incoming, Outgoing, ???]
, чтобы его можно было интегрировать с более новыми библиотеками, которые ожидают такую подпись.
(Входящие сообщения из восходящего потока редки, поэтому мы не слишком заботимся о обратном давлении там, но было бы неплохо иметь его.)
sealed trait Incoming //... with implementations
sealed trait Outgoing //... with implementations
object Ack
// `upstream` is an akka-io connection actor that will send Ack
// when it writes an Outgoing message to the socket
class SimpleActor(upstream: Actor) extends Actor {
def receive = {
case in: Incoming if sender() == upstream =>
// does some work in response to upstream
case other =>
// does some work in response to downstream
// including sending messages to upstream and
// `becoming` a stashing state waiting for Ack
// to `unbecome`, then sending Ack downstream
// (which will respect the backpressure).
}
}
У меня есть достоверные данные из списка рассылки akka-user, что в akka-streams нет кода, который интегрирует акторов с потоками, и для того, чтобы подключить Actor к Stream и сохранить обратное давление на основе Ack, нужно было бы реализовать PushPullStage .
Похоже, нам действительно понадобятся два PushPullStage
здесь... один для upstream => SimpleActor
и один для SimpleActor => upstream
.
Мои вопросы:
- Существуют ли какие-либо библиотеки, которые предлагают подобную интеграцию между актерами и потоками?
- Есть ли более простой способ сделать это, чем реализация двунаправленного
PushPullStage
с нуля? - Существует ли какая-либо существующая тестовая среда, которая позволила бы провести стресс-тестирование такой реализации?
mapAsync
? Если нет, то написатьPushPullStage
все же гораздо проще, чем написатьActorProcessor
. - person jrudolph   schedule 28.05.2015Flow
: вы обычно ожидаете, что ввод и вывод обязательно связаны, когда вы также можете создать их по отдельности и просто поместить их в поток, чтобы получить правильную форму (возможно, это то, что вы имели в виду в нашем обсуждении билетов также). - person jrudolph   schedule 28.05.2015Source[Message]
иSink[Message]
по отдельности. Будет ли это лучше подходить для таких приложений, как ваше? (За кулисами мы по-прежнему просто вызываем Flow.wrap и вызываем уже существующий.) Теперь, когда путаница с потоком исчезла из поля зрения, мы можем сосредоточиться на том, как реализовать источники и приемники с обратным давлением. - person jrudolph   schedule 28.05.2015ActorPublisher
с нуля. Однако это не рекомендуется, так как таким образом легко ввести неограниченные буферы. Как правило, невозможно просто переключиться с модели, основанной на вытягивании (reactive-streams/akka-stream) на push-стиль (сообщение актора) внутри конвейера обработки. Итак, вопрос в вашем случае заключается в том, можете ли вы замедлить конечного производителя исходящих данных или нет. - person jrudolph   schedule 28.05.2015Ack
на стороне приложения для замедления производителя? Затем вам нужно реализоватьActorPublisher
, как предлагает ответ ниже. Кажется, схема может быть достаточно общей, чтобы гарантировать поддержку непосредственно в akka-stream в будущем. Я создал github.com/akka/akka/issues/17610, чтобы отслеживать эту проблему. - person jrudolph   schedule 29.05.2015