Вопросы по теме 'rx-scala'

Reactive Extensions (RxScala), эквивалент fold ifEmpty
Мне было интересно, есть ли какой-либо эквивалент функции Scala fold ifEmpty, которая существует для коллекций и параметров: fold[B](ifEmpty: => B)(f: (A) => B) Эта функция мощна тем, что может преобразовывать монады разных типов —...
191 просмотров
schedule 13.06.2023

Рекурсивный поток RxScala с тайм-аутом
Я пытаюсь рекурсивно определить наблюдаемую, которая либо испускает элементы от субъекта, либо, если проходит определенное количество времени, значение по умолчанию, в этом случае я использую значение таймера по умолчанию, равное нулю. Я использую...
264 просмотров

Опрос базы данных с использованием RxScala
TableEntries Я начинаю с RxScala и пытаюсь придумать механизм опроса, который проверяет базу данных для каждого интервала (скажем, 20 секунд), чтобы проверить, были ли какие-либо изменения в некоторых строках в таблице. object MyDatabaseService {...
59 просмотров
schedule 11.01.2023

RxScala Observables с повтором
Я пытаюсь понять replay в RxScala. Я создаю наблюдаемое следующим образом: lazy val toyObservable : Observable[Int] = { val coldObservable : Observable[Int] = intPerSecond val hotObservable : ConnectableObservable[Int] =...
61 просмотров
schedule 10.06.2022

В Rx (или RxJava/RxScala), как сделать автоматически сбрасываемую карту/фильтр блокировки с отслеживанием состояния для измерения времени, прошедшего в потоке до касания барьера?
Извините, если вопрос коряво сформулирован, я постараюсь. Если у меня есть последовательность значений со временем в виде Observable[(U,T)] , где U — это значение, а T — тип, подобный времени (или что-то другое, что я полагаю), как я могу написать...
228 просмотров

Обработка наблюдаемых исключений
Я изучаю RxScala и пришел к этому очень синтетическому фрагменту. Я пытаюсь обработать исключение в блоке onError: def doLongOperation():String = { (1 to 10).foreach { _ => Thread.sleep(100) print(".") } println() if...
272 просмотров
schedule 09.07.2023

Как обновить наблюдаемую вручную?
Я новичок в reactivex и rxscala и могу создать Observable вот так: val observable = Observable[String] { subscriber => subscriber.onNext("something") } Я могу поместить новые строки в subscriber внутри Observable.apply ....
142 просмотров

Преобразование наблюдаемых в RxJava и RxScala
У меня есть Observable, который выдает список записей, как показано ниже: val obsList: Observable[List[MyEntry] = Observable.from(getListEntry) // getListEntry would give me a List[MyEntry] Как теперь я могу передать содержимое в другой...
43 просмотров
schedule 08.04.2024

RxScala подписывается с несколькими наблюдателями, просто выдает событие первому
Я пытаюсь использовать несколько Observer подписаться на Observable , который onNext произошел в цикле. Кажется, это не работает для каждого наблюдателя. import rx.lang.scala.Observable object SubscribeMultiEvent extends App{ val obv =...
113 просмотров
schedule 13.02.2023

Как отменить отображение Observable с указанием условия в RxScala/Java?
исходная наблюдаемая ------a-------b-------c----------d-------->.... отображаемый наблюдаемый -----A-------B(finish) Простой код: val original = Observable.just('a', 'b', 'c', 'd') val mapped = original.map(x => x.toUpper) //how to let...
260 просмотров
schedule 17.11.2022

Вызов (перегруженных) функций RxJava из Scala
Я хотел создать Observable из массива Observable следующим образом: package rxtest import concurrent._ import concurrent.ExecutionContext.Implicits.global import rx.lang.scala._ import rx.lang.scala.JavaConversions._ import...
200 просмотров
schedule 16.11.2023

Управление наблюдаемой буферизацией с помощью самого наблюдаемого
Я пытаюсь нарезать наблюдаемый поток сам по себе, например: val source = Observable.from(1 to 10).share val boundaries = source.filter(_ % 3 == 0) val result = source.tumblingBuffer(boundaries) result.subscribe((buf) => println(buf.toString))...
70 просмотров
schedule 05.02.2023

Как реализовать одноразовое использование заказанного ReplaySubject?
Как я могу иметь одного подписчика ReplaySubject , который: буферизует все события, полученные с помощью onNext() , пока кто-нибудь не подпишется на него, как только кто-то подписывается на него, все буферизованные события перенаправляются...
307 просмотров
schedule 20.09.2022

Тема поведения с паузой?
Возможно ли иметь что-то вроде BehaviorSubject с переключателями паузы и возобновления? Что-то вроде PausableBehaviorSubject.pause() и PausableBehaviorSubject.resume() ? Как это можно было сделать? Идея состоит в том, что во время паузы...
332 просмотров
schedule 19.06.2023

Как создать один Observable из другого
Допустим, у нас есть исходный Observable of Ints: val source:Observable[Int] Я хотел бы создать еще один Observable, производящий значения, разница которых с первым появившимся значением в source больше 10: def detect() =...
90 просмотров
schedule 26.08.2022

Превратите TCP-сокет в Observable of Array[Byte]
В моем приложении для Android мне нужно использовать Socket для отправки и получения массивов байтов. Для удобства я хочу работать с Observable , подключенным к Socket . Глядя в Интернете, я нашел этот код: import rx.lang.scala.Observable...
163 просмотров
schedule 29.12.2022

Перезапустите Observable, подключенный к ресурсу.
В следующем коде я превращаю сокет TCP в Observable[Array[Byte]] : import rx.lang.scala.Observable import rx.lang.scala.schedulers.IOScheduler val sock = new Socket type Bytes = Array[Byte] lazy val s: Observable[Bytes] = Obs.using[Bytes,...
70 просмотров
schedule 24.08.2022