Вопросы по теме 'reactive-streams'

Обработка ошибок / исключений Iteratees против реактивных потоков / akka-stream
Неожиданный сюрприз, у меня есть несколько проблем с Iteratees и обработкой ошибок. Проблема; Прочтите несколько байтов из InputStream (из сети, должно быть InputStream), сделайте несколько фрагментов / группировок в этом InputStream (для...
565 просмотров

Как подписаться на реализацию реактивных потоков, работающую на другой JVM?
Предположим, у нас есть два потока Akka Stream, каждый из которых работает на собственной JVM. // A reactive streams publisher running on JVM 1: val stringPublisher: Publisher[String] = Source(() => "Lorem...
493 просмотров
schedule 16.12.2023

Akka-Streams: поведение по крайней мере при доставке с Kafka и Cassandra
Начинаю с Akka Streams и пока все идет хорошо. Однако я встречал вариант использования, к которому не знаю, как подойти. Сценарий представляет собой поток с ActorPublisher в качестве источника, который принимает сообщения от Kafka, и подписчиком в...
1625 просмотров

какая связь между реактивным IPC, реактивным потоком ввода-вывода, сетью реактивного потока и реактивным сокетом
Я нашел много репозиториев на Github о реактивном потоке, кажется, есть похожие. Однако многие репозитории не обновляются несколько месяцев. Я хочу знать отношения к ним, и я могу сосредоточиться на последних репозиториях. реактивный-ipc:...
358 просмотров

Назначение процессора Akka Reactive Streams
Я пытаюсь понять реактивные потоки в акке. Я прочитал этот блог http://bryangilbert.com/blog/2015/02/04/akka-reactive-streams/ и, кажется, я понял, как это работает. Однако я не понимаю цели процессора в рамках этой концепции. Для чего это?...
272 просмотров
schedule 31.07.2023

Интеграционное тестирование Spring SseEmitters
Я искал подсказки о том, как лучше всего тестировать методы Spring MVC Controller, которые возвращают SseEmitters. Я пришел довольно коротко, но у меня есть решение проб и ошибок, которое тестирует асинхронное многопоточное поведение. Ниже приведен...
1689 просмотров

реактивная кафка с диспетчером по умолчанию?
Я работаю над проектом с Kafka и Akka Streams, используя коннектор reactive-kafka . Мы обнаружили, что reactive-kafka использует свой собственный диспетчер (akka.kafka.default-dispatcher), но если, например, мы используем диспетчер akka по...
522 просмотров

Akka Reactive Streams всегда отстает на одно сообщение
По какой-то причине мои потоки Akka всегда ждут второго сообщения, прежде чем «испускать» (?) Первое. Вот пример кода, который демонстрирует мою проблему. val rx = Source((1 to 100).toStream.map { t => Thread.sleep(1000) println(s"doing...
120 просмотров
schedule 25.11.2022

Akka Streams разделяет поток по типу
У меня есть следующая простая иерархия классов case: sealed trait Message case class Foo(bar: Int) extends Message case class Baz(qux: String) extends Message И у меня есть Flow[Message, Message, NotUsed] (из протокола на основе Websocket с...
1578 просмотров
schedule 09.05.2023

RxJava 2.0 - Как преобразовать Observable в Publisher
Как преобразовать Observable в Publisher в RxJava версии 2? В первой версии у нас есть проект https://github.com/ReactiveX/RxJavaReactiveStreams , который делает именно то, что Я нуждаюсь. Но как мне это сделать в RxJava 2?
12866 просмотров

Получение элементов из реактивного потока SubmissionPublisher
Я пробовал некоторые новые функции в Java 9. Итак, я составил тест, чтобы издатель выдавал числа с заданной скоростью. Я также реализовал подписчика, чтобы слушать эти публикации и просто печатать их на консоли. Хотя я, возможно, не совсем...
509 просмотров

Почему поток Akka имеет ровно один источник и один приемник?
В документах Akka Streams четко указано, что для запуска потока он должен иметь ровно один источник и ровно один приемник. Интересно, что накладывает такое ограничение. Сценарии с несколькими источниками, объединенными в один, с несколькими...
171 просмотров
schedule 15.10.2022

поток akka использует веб-сокет
Приступая к работе с akka-streams, я хочу построить простой пример. В Chrome с помощью подключаемого модуля веб-сокета я просто могу подключиться к потоку, подобному этому https://blockchain.info/api/api_websocket через...
1737 просмотров
schedule 07.07.2023

Реактивные потоки - пакетирование с таймаутом
Я подумываю о замене собственной библиотеки обработки журналов, которая очень похожа на ReactiveStreams, на io.projectreactor . Цель состоит в том, чтобы сократить количество поддерживаемого нами кода и воспользоваться преимуществами любых новых...
1122 просмотров

Поток для фильтрации до тех пор, пока не будет испущен другой поток, но повторите передачу отфильтрованных излучений после того, как 2-й поток испустит
У меня есть поток Foo. Для испускания Foo требуется, чтобы Android View был выложен (ширина и высота > 0). Я использую RxBinding для этого, т.е. fooOservable() .subscribe(foo -> {}); RxView.preDraws(mPager, () -> true)...
142 просмотров
schedule 04.06.2022

Левое внешнее соединение двух отсортированных разреженных последовательностей с проектным реактором
Даны два отсортированных потока сущностей с разреженными идентификаторами. Смоделируем их как: Flux<Long> stream1 = Flux.fromArray(new Long[] {1L, 3L, 4L, 5L, 6L}); Flux<Long> stream2 = Flux.fromIterable(List.of(1L, 2L, 3L, 4L, 6L,...
171 просмотров
schedule 19.06.2022

Как получить журналы информации о потоках при вызове publishOn
Я работаю над планировщиками в реактивных потоках и использую Flux и Scheduler в этом потоке, используя метод publishOn следующим образом: System.out.println("*********Calling Concurrency************"); List<Integer> elements = new...
595 просмотров

Разница между Flux.compose и Flux.transform?
Я изучаю реактивные потоки и работаю над издателями (Flux) и работаю над преобразованием Flux. Для этого у меня есть методы создания и преобразования. Вот мой код: private static void composeStream() { System.out.println("*********Calling...
1435 просмотров

Один поток и несколько подписчиков
Я тестирую воду с реактивными потоками Java9 и RxJava2. У меня действительно нет предпочтений, но я ищу некоторые рекомендации, возможно ли это. Я создаю предварительно настроенное количество подписчиков, например: for(int i = 0;...
1352 просмотров

состояние отказа и перезапуска активной зоны реактора
Я новичок в активной зоне реактора, в прошлом работал с Erlang. Надеюсь, быстрый вопрос. Допустим, мы отправили сообщение «А» и ожидаем ответа через x минут. Если ответ не приходит в течение x минут, мы запускаем резервную процедуру. Это...
175 просмотров