Вопросы по теме 'rx-scala'
Reactive Extensions (RxScala), эквивалент fold ifEmpty
Мне было интересно, есть ли какой-либо эквивалент функции Scala fold ifEmpty, которая существует для коллекций и параметров:
fold[B](ifEmpty: => B)(f: (A) => B)
Эта функция мощна тем, что может преобразовывать монады разных типов —...
191 просмотров
schedule
13.06.2023
Рекурсивный поток RxScala с тайм-аутом
Я пытаюсь рекурсивно определить наблюдаемую, которая либо испускает элементы от субъекта, либо, если проходит определенное количество времени, значение по умолчанию, в этом случае я использую значение таймера по умолчанию, равное нулю. Я использую...
264 просмотров
schedule
24.09.2022
Опрос базы данных с использованием RxScala
TableEntries Я начинаю с RxScala и пытаюсь придумать механизм опроса, который проверяет базу данных для каждого интервала (скажем, 20 секунд), чтобы проверить, были ли какие-либо изменения в некоторых строках в таблице.
object MyDatabaseService {...
59 просмотров
schedule
11.01.2023
RxScala Observables с повтором
Я пытаюсь понять replay в RxScala. Я создаю наблюдаемое следующим образом:
lazy val toyObservable : Observable[Int] = {
val coldObservable : Observable[Int] = intPerSecond
val hotObservable : ConnectableObservable[Int] =...
61 просмотров
schedule
10.06.2022
В Rx (или RxJava/RxScala), как сделать автоматически сбрасываемую карту/фильтр блокировки с отслеживанием состояния для измерения времени, прошедшего в потоке до касания барьера?
Извините, если вопрос коряво сформулирован, я постараюсь.
Если у меня есть последовательность значений со временем в виде Observable[(U,T)] , где U — это значение, а T — тип, подобный времени (или что-то другое, что я полагаю), как я могу написать...
228 просмотров
schedule
02.12.2022
Обработка наблюдаемых исключений
Я изучаю RxScala и пришел к этому очень синтетическому фрагменту. Я пытаюсь обработать исключение в блоке onError:
def doLongOperation():String = {
(1 to 10).foreach {
_ =>
Thread.sleep(100)
print(".")
}
println()
if...
272 просмотров
schedule
09.07.2023
Как обновить наблюдаемую вручную?
Я новичок в reactivex и rxscala и могу создать Observable вот так:
val observable = Observable[String] { subscriber =>
subscriber.onNext("something")
}
Я могу поместить новые строки в subscriber внутри Observable.apply ....
142 просмотров
schedule
24.07.2023
Преобразование наблюдаемых в RxJava и RxScala
У меня есть Observable, который выдает список записей, как показано ниже:
val obsList: Observable[List[MyEntry] = Observable.from(getListEntry)
// getListEntry would give me a List[MyEntry]
Как теперь я могу передать содержимое в другой...
43 просмотров
schedule
08.04.2024
RxScala подписывается с несколькими наблюдателями, просто выдает событие первому
Я пытаюсь использовать несколько Observer подписаться на Observable , который onNext произошел в цикле. Кажется, это не работает для каждого наблюдателя.
import rx.lang.scala.Observable
object SubscribeMultiEvent extends App{
val obv =...
113 просмотров
schedule
13.02.2023
Как отменить отображение Observable с указанием условия в RxScala/Java?
исходная наблюдаемая ------a-------b-------c----------d-------->.... отображаемый наблюдаемый -----A-------B(finish) Простой код:
val original = Observable.just('a', 'b', 'c', 'd')
val mapped = original.map(x => x.toUpper)
//how to let...
260 просмотров
schedule
17.11.2022
Вызов (перегруженных) функций RxJava из Scala
Я хотел создать Observable из массива Observable следующим образом:
package rxtest
import concurrent._
import concurrent.ExecutionContext.Implicits.global
import rx.lang.scala._
import rx.lang.scala.JavaConversions._
import...
200 просмотров
schedule
16.11.2023
Управление наблюдаемой буферизацией с помощью самого наблюдаемого
Я пытаюсь нарезать наблюдаемый поток сам по себе, например:
val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)
result.subscribe((buf) => println(buf.toString))...
70 просмотров
schedule
05.02.2023
Как реализовать одноразовое использование заказанного ReplaySubject?
Как я могу иметь одного подписчика ReplaySubject , который:
буферизует все события, полученные с помощью onNext() , пока кто-нибудь не подпишется на него,
как только кто-то подписывается на него, все буферизованные события перенаправляются...
307 просмотров
schedule
20.09.2022
Тема поведения с паузой?
Возможно ли иметь что-то вроде BehaviorSubject с переключателями паузы и возобновления? Что-то вроде PausableBehaviorSubject.pause() и PausableBehaviorSubject.resume() ? Как это можно было сделать?
Идея состоит в том, что во время паузы...
332 просмотров
schedule
19.06.2023
Как создать один Observable из другого
Допустим, у нас есть исходный Observable of Ints:
val source:Observable[Int]
Я хотел бы создать еще один Observable, производящий значения, разница которых с первым появившимся значением в source больше 10:
def detect() =...
90 просмотров
schedule
26.08.2022
Превратите TCP-сокет в Observable of Array[Byte]
В моем приложении для Android мне нужно использовать Socket для отправки и получения массивов байтов. Для удобства я хочу работать с Observable , подключенным к Socket .
Глядя в Интернете, я нашел этот код:
import rx.lang.scala.Observable...
163 просмотров
schedule
29.12.2022
Перезапустите Observable, подключенный к ресурсу.
В следующем коде я превращаю сокет TCP в Observable[Array[Byte]] :
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler
val sock = new Socket
type Bytes = Array[Byte]
lazy val s: Observable[Bytes] = Obs.using[Bytes,...
70 просмотров
schedule
24.08.2022