Akka-streams: Как подключить поток к потоку?

Если поток А производит

Pair<Source<ByteString,?>, Object> 

Как мне соединить этот выход с входом потока B, который отображается на источнике. Например, выход потока B будет

Pair<InputStream<Long>,Object>.

person jack    schedule 04.08.2015    source источник
comment
Я не понимаю вопроса. Под InputStream вы подразумеваете java.io.InputStream? Какова цель Пара? Вы хотите подключить поток Akka к потоку Akka? Или источник Akka для java.io.InputStream?   -  person Quizzie    schedule 04.08.2015
comment
возьмем, к примеру, поток akka-http-pool, который создает reponse.get().entity().getDataBytes(), поэтому ваш поток теперь является источником-потоком-источником. теперь мой приемник хочет создать json с помощью ObjectMapper.read(InputStream). Конечно, я мог бы измерять между ними, но это противоречит потокам.   -  person jack    schedule 04.08.2015
comment
Что такое акка-http-пул? Может быть, я просто еще не понял, что вы имеете в виду, но... то, что вы описали, это не Источник -> Поток -> Источник, это источник источников, или Источник‹Источник‹ByteString, _›, _›. Что вам нужно сделать, так это присоединить Sink‹Source‹ByteString, _›, _›, который будет потреблять источники ByteString. Затем приемник может присоединить каждый из полученных источников к некоторому приемнику ByteString и запустить его.   -  person Quizzie    schedule 04.08.2015
comment
Или вы можете свести исходный Sink‹Source‹ByteString, _›, _› к Source‹ByteString, _›, используя метод flatten с concat FlattenStrategy. Но это похоже на неправильный подход.   -  person Quizzie    schedule 04.08.2015
comment
проблема с Sink‹Source‹ByteString, _›, _› заключается в том, что в приемнике мне нужно использовать источник. разве теперь я не потеряю проводку обратного давления, которую потоки покупают мне? (Я проверю, поможет ли мне Flatten)   -  person jack    schedule 04.08.2015
comment
Нет, вы не теряете противодавление. Нет причин ослаблять противодавление. Вы просто получаете один поток верхнего уровня и несколько потоков низкого уровня. Поток верхнего уровня обрабатывает новые соединения, а потоки нижнего уровня обрабатывают данные, поступающие через соединения. Если вы сгладите его, вы в основном смешаете все данные со всех подключений вместе. Что может быть тем, что вы хотите, или нет.   -  person Quizzie    schedule 04.08.2015
comment
@quizzle Я не могу сгладить, потому что каждое соединение представляет собой отдельную полезную нагрузку/сообщение. Но я думаю, ты прав, я не теряю обратное давление.   -  person jack    schedule 04.08.2015
comment
Но я думаю, ты прав, я не теряю противодавление. поэтому я просто запускаю новые потоки из заданных источников стока. Хотя выглядит необычно   -  person jack    schedule 04.08.2015
comment
Я почти уверен, что сглаживание сработает. Каждое сообщение представляет собой одно новое соединение и содержит источник ByteStrings. Сглаживание с помощью concat создаст поток, в котором первый источник ByteString будет слит до конца, затем следующий, затем следующий...   -  person Quizzie    schedule 04.08.2015
comment
нет, я проверил, что concat не работает, потому что это испортит параллелизм. Мне нужно объединить github.com/akka/akka/issues/15089   -  person jack    schedule 04.08.2015
comment
Конечно, это портит параллелизм. Вот почему говорят, что это был неправильный подход и, вероятно, не то, что вы хотите.   -  person Quizzie    schedule 04.08.2015


Ответы (1)


Если вам предоставлен источник того типа, который вы предоставили:

Source< Pair<Source<ByteString, BoxedUnit>, Object>, BoxedUnit> pairSource;

И вам также дан поток, который преобразует значения ByteString в InputStream<Long>:

Flow<ByteString, InputStream<Long>, ?> byteStrToInputStream;

Затем это можно преобразовать с помощью flatMapConcat:

Source< Pair<InputStream<Long>,Object>, BoxedUnit> inputStreamSource = 
  pairSource.flatMapConcat( pair -> {
    pair.getKey()
        .via(byteStrToInputStream)
        .map(inputStream -> ImmutablePair(inputStream, pair.getValue()))
  })

Каждый из входящих источников ByteString исчерпывается, и результаты объединяются. Исходная правая часть каждой пары, а именно Object, добавляется к каждому InputStream.

person Ramón J Romero y Vigil    schedule 29.11.2015