Интеграция Актера на основе ack с akka-stream

У меня есть Актер, который был разработан для работы с подтверждением akka-io, так что он будет ждать подтверждения при отправке сообщений вверх по течению (в сеть). Этот актор является интерфейсом к асинхронному приложению в бэкенде.

Я хотел бы иметь слой-оболочку, который позволяет мне преобразовать этого Актера в akka-streams Flow[Incoming, Outgoing, ???], чтобы его можно было интегрировать с более новыми библиотеками, которые ожидают такую ​​подпись.

(Входящие сообщения из восходящего потока редки, поэтому мы не слишком заботимся о обратном давлении там, но было бы неплохо иметь его.)

sealed trait Incoming //... with implementations
sealed trait Outgoing //... with implementations
object Ack

// `upstream` is an akka-io connection actor that will send Ack
// when it writes an Outgoing message to the socket
class SimpleActor(upstream: Actor) extends Actor {
  def receive = {
    case in: Incoming if sender() == upstream =>
       // does some work in response to upstream
    case other =>
       // does some work in response to downstream
       // including sending messages to upstream and
       // `becoming` a stashing state waiting for Ack
       // to `unbecome`, then sending Ack downstream
       // (which will respect the backpressure).
  }
}

У меня есть достоверные данные из списка рассылки akka-user, что в akka-streams нет кода, который интегрирует акторов с потоками, и для того, чтобы подключить Actor к Stream и сохранить обратное давление на основе Ack, нужно было бы реализовать PushPullStage .

Похоже, нам действительно понадобятся два PushPullStage здесь... один для upstream => SimpleActor и один для SimpleActor => upstream.

Мои вопросы:

  1. Существуют ли какие-либо библиотеки, которые предлагают подобную интеграцию между актерами и потоками?
  2. Есть ли более простой способ сделать это, чем реализация двунаправленного PushPullStage с нуля?
  3. Существует ли какая-либо существующая тестовая среда, которая позволила бы провести стресс-тестирование такой реализации?

person fommil    schedule 27.05.2015    source источник
comment
Вы пробовали использовать вопрос об актере и mapAsync? Если нет, то написать PushPullStage все же гораздо проще, чем написать ActorProcessor.   -  person jrudolph    schedule 28.05.2015
comment
@jrudolph, но у меня нет ни одного ответа на запрос (если бы это было так, я бы использовал REST вместо WebSockets). Издатель может отправлять сообщения в любое время.   -  person fommil    schedule 28.05.2015
comment
Я предполагаю, что в любое время вы имеете в виду всякий раз, когда это позволяет противодавление? Как это работает? Если вы создаете поток выходных данных для каждого входного элемента, вы можете просто смоделировать его таким образом, а затем сгладить поток.   -  person jrudolph    schedule 28.05.2015
comment
да, когда противодавление позволяет. Это не протокол запроса/ответа, клиент и сервер могут отправлять сообщения друг другу в любое время (по сети).   -  person fommil    schedule 28.05.2015
comment
Ах, так входные и выходные каналы полностью разделены, и противодавление тоже должно обрабатываться отдельно?   -  person jrudolph    schedule 28.05.2015
comment
да. Я думаю, что это вполне нормально для приложений веб-сокетов.   -  person fommil    schedule 28.05.2015
comment
Да, согласен, так же, как и для любого другого двунаправленного соединения. Это немного прискорбно насчет Flow: вы обычно ожидаете, что ввод и вывод обязательно связаны, когда вы также можете создать их по отдельности и просто поместить их в поток, чтобы получить правильную форму (возможно, это то, что вы имели в виду в нашем обсуждении билетов также).   -  person jrudolph    schedule 28.05.2015
comment
да, мне интересно, возможно, API WebSockets не очень подходит для типичного варианта использования. Это ты написал? (между рассказами о том, как реактивный банан работал над API в течение 1,5 лет, Роланд сказал что-то о том, что вы написали этот кусок)   -  person fommil    schedule 28.05.2015
comment
Мы могли бы предоставить альтернативную точку входа, которая принимала бы Source[Message] и Sink[Message] по отдельности. Будет ли это лучше подходить для таких приложений, как ваше? (За кулисами мы по-прежнему просто вызываем Flow.wrap и вызываем уже существующий.) Теперь, когда путаница с потоком исчезла из поля зрения, мы можем сосредоточиться на том, как реализовать источники и приемники с обратным давлением.   -  person jrudolph    schedule 28.05.2015
comment
Сначала сосредоточившись на стороне вывода, я еще не понял, как именно создаются элементы вывода и как вы хотите, чтобы обратное давление работало с этим производителем. Что должно произойти, если клиент веб-сокета не может прочитать больше данных?   -  person jrudolph    schedule 28.05.2015
comment
Например. см. zuchos.com/ блог/2015/05/23/   -  person jrudolph    schedule 28.05.2015
comment
Он реализует ActorPublisher с нуля. Однако это не рекомендуется, так как таким образом легко ввести неограниченные буферы. Как правило, невозможно просто переключиться с модели, основанной на вытягивании (reactive-streams/akka-stream) на push-стиль (сообщение актора) внутри конвейера обработки. Итак, вопрос в вашем случае заключается в том, можете ли вы замедлить конечного производителя исходящих данных или нет.   -  person jrudolph    schedule 28.05.2015
comment
брать отдельные раковину и источник было бы здорово! Обратное давление на приемник и источник довольно тривиально, не будет ли он просто использовать подход обычных потоков? Как вы знаете, в TCP уже встроено обратное давление, поэтому, пока вы используете akka-io Acking/ResumeRead, с миром все в порядке. Если мне нужно обратное давление на уровне приложения (т. е. явное подтверждение от клиента), то это должно быть встроено в протокол / API приложения и не зависит от akka.   -  person fommil    schedule 29.05.2015
comment
может быть, мы должны переместить это в билет или что-то еще? Или создайте тему в списке рассылки akka.   -  person fommil    schedule 29.05.2015
comment
чтобы ответить на ваш вопрос, да, я могу замедлить производитель (выше в вопросе). В текущей реализации он отправляет исходящие сообщения только в ответ на Ack, но я готов изменить это на любое сообщение обратного давления.   -  person fommil    schedule 29.05.2015
comment
Я создал github.com/akka/akka/issues/17609, чтобы ввести другую перегрузку.   -  person jrudolph    schedule 29.05.2015
comment
Если я вас правильно понимаю, вы используете схему на основе Ack на стороне приложения для замедления производителя? Затем вам нужно реализовать ActorPublisher, как предлагает ответ ниже. Кажется, схема может быть достаточно общей, чтобы гарантировать поддержку непосредственно в akka-stream в будущем. Я создал github.com/akka/akka/issues/17610, чтобы отслеживать эту проблему.   -  person jrudolph    schedule 29.05.2015


Ответы (2)


Я думаю, что философия akka-stream состоит в том, чтобы предоставлять кирпичи низкого уровня и создавать инструменты более высокого уровня поверх них. Если вы посмотрите на нашу недавно выпущенную библиотеку с открытым исходным кодом https://github.com/MfgLabs/akka-stream-extensions, вы увидите, что мы сделали именно это. Мы предоставляем некоторые полезные структуры, облегчающие управление ограничителями скорости, процессорами с отслеживанием состояния, ленивостью и генераторами и т. д. Для интеграции акторов, я думаю, должна быть возможность создать какие-то помощники, чтобы упростить интеграцию акторов с akka- поток пытается распространять обратное давление. Akka-Stream еще молод, а экосистема продолжает расти ;)

person mandubian    schedule 31.05.2015

Да, вы можете интегрировать акторы с потоками.
Для этого есть специальные акторы: издатель и подписчик.

Все это здесь: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

Конечно, вы должны написать актора таким образом, чтобы он работал с противодавлением потоков. Но вам не нужна двухтактная стадия.

person Quizzie    schedule 27.05.2015
comment
Звучит чертовски проще, чем введение новой фазы и реализация ее методов. Я надеялся избежать написания промежуточного актора, но похоже, что это единственный путь вперед — реализация как издателя, так и подписчика. Это будет много шаблонного текста :-/ - person fommil; 27.05.2015