Назначение процессора Akka Reactive Streams

Я пытаюсь понять реактивные потоки в акке. Я прочитал этот блог http://bryangilbert.com/blog/2015/02/04/akka-reactive-streams/ и, кажется, я понял, как это работает. Однако я не понимаю цели процессора в рамках этой концепции. Для чего это? Разве подписчику недостаточно запросить N-объекты, а издателю отправить их с помощью onNext()?


person Rise    schedule 01.02.2016    source источник


Ответы (1)


Скажем, у вас есть очень простой процесс, только источник (издатель) и приемник (подписчик). Вы подключаете эти два, и приемник подписывается на издателя и начинает запрашивать данные, и данные поступают в приемник. В этом примере все, что вам действительно нужно, это издатель и подписчик. Но в этом примере с данными ничего не происходит на пути от источника к приемнику. Он никак не трансформируется и поэтому не очень интересен и не очень полезен.

Процессор сочетает в себе интерфейсы издателя и подписчика и поэтому может действовать как обе эти роли. Процессор предназначен для подключения к потоку обработки между источником и приемником и преобразования данных. Если я впишусь в пример источника/приемника из предыдущего примера, поток данных и кто на что подписывается, изменится. Теперь приемник подписывается на этот процессор, а процессор, в свою очередь, подписывается на источник. Приемник запрашивает элементы у процессора, а процессор передает этот запрос вверх по течению к источнику. Он также отвечает за перемещение элементов вниз по течению к стоку, когда есть элементы для удовлетворения спроса. Вот почему он должен реализовать оба интерфейса, поскольку он должен выполнять обе роли.

С каждым добавляемым шагом обработки, например map или filter, вы добавляете еще одно место, где можно обрабатывать противодавление. Эти шаги не являются точкой начала (источником) или местом назначения (приемником) данных. Все, что они должны делать, это получать данные и что-то с ними делать или изменять их поток и отправлять элементы вниз по течению для удовлетворения спроса. Поскольку они должны иметь возможность подключаться к любой цепочке, им нужны функции как публикации, так и подписки, и именно поэтому существует Processor.

person cmbaxter    schedule 02.02.2016
comment
Таким образом, в основном все, что он делает, находится между ними, изменяет данные, если это необходимо, и еще немного надувает их? - person Rise; 02.02.2016
comment
@Rise, процессор действительно включает все, что касается реактивных потоков. Потоки — это все, что касается манипулирования данными и изменения потоков, и без процессора, который будет стоять там на этих этапах, все, что у вас будет, — это поток источник->приемник и ничего между ними. Будучи этой небольшой автономной парой Pub/Sub, он позволяет вам привязываться к существующим потокам и вносить изменения, а также обрабатывать противодавление и, возможно, даже изменять эффект противодавления (через что-то вроде buffer) - person cmbaxter; 02.02.2016
comment
Но то же самое без процессора все равно было бы реактивным? Приемник по-прежнему будет запрашивать объем данных, который он может обработать, не так ли? - person Rise; 02.02.2016
comment
@Rise, да, это все равно было бы реактивным (хотя и не очень интересным потоком) - person cmbaxter; 02.02.2016