RxJava - Flowables - Что, когда и как использовать?

Чтобы понять Flowables, нам сначала нужно понять Observables. Наблюдаемые - это те объекты, которые мы наблюдаем за любым событием. Наблюдаемые используются, когда у нас относительно мало элементов за время и нет риска переполнения потребителей. Если есть вероятность того, что потребитель может быть переполнен, мы используем Flowable. Одним из примеров может быть получение огромного количества данных с датчика. Обычно они отправляют данные с высокой скоростью. В предыдущей версии RxJava это переполнение можно было предотвратить, применив противодавление. Но в RxJava 2 команда разработчиков разделила эти два типа производителей на две сущности. то есть наблюдаемые и текучие. Согласно документации:

Небольшое сожаление по поводу введения противодавления в RxJava 0.x состоит в том, что вместо отдельного ›базового реактивного класса был модифицирован сам Observable. Основная проблема с противодавлением заключается в ›том, что многие горячие источники, такие как события пользовательского интерфейса, не могут иметь достаточное противодавление и вызывают неожиданное› исключение MissingBackpressureException (т.е. новички этого не ожидают).

Мы пытаемся исправить эту ситуацию в версии 2.x, используя io.reactivex.Observable без обратного давления и ›новый io.reactivex.Flowable как базовый реактивный класс с поддержкой обратного давления.

Давайте разберемся с использованием Flowable на другом примере. Предположим, у вас есть источник, который выдает элементы данных со скоростью 1 миллион элементов в секунду. Следующим шагом является выполнение сетевого запроса для каждого элемента. Предположим, устройство может обрабатывать 100 сетевых запросов в секунду. Ты видишь проблему? Второй шаг - это узкое место, потому что устройство может обрабатывать не более 100 запросов в секунду, и поэтому огромный объем данных с шага 1 вызовет исключение OOM (Out Of Memory). Приведенный ниже код является прекрасным примером этого:

val observable = PublishSubject.create<Int>()
  observable.observeOn(Schedulers.computation())
            .subscribeBy (
                onNext ={
                  println("number: ${it}")
                },onError = {t->
                  print(t.message)
                }
            )
    for (i in 0..1000000){
        observable.onNext(i)
    }

В этих сценариях нам требуется обратное давление, которое, говоря простыми словами, является просто способом обработки элементов, которые не могут быть обработаны. В приведенном ниже коде мы будем обрабатывать случай, используя Flowable:

val observable = PublishSubject.create<Int>()
    observable
            .toFlowable(BackpressureStrategy.MISSING)
            .observeOn(Schedulers.computation())
            .subscribeBy (
                onNext ={
                    println("number: ${it}")
                },onError = {t->
                print(t.message)
            }
            )
    for (i in 0..1000000){
        observable.onNext(i)
    }

Если вы запустите приведенный выше код, вы увидите результат:

Queue is full?!

Это связано с тем, что мы не указали BackpressureStrategy, поэтому он возвращается к значению по умолчанию, которое в основном буферизует до 128 элементов в очереди. Следовательно, вывод Очередь заполнена

Есть много других стратегий противодавления, которые мы сейчас рассмотрим:

  • Отбрасывание. Что вы делаете, когда не можете справиться со слишком многими вещами? Вы бросаете это. Эта стратегия обратного давления делает то же самое. Он отбрасывает элементы, если он не может обработать больше, чем его емкость, т.е. 128 элементов (размер буфера). Есть два способа применить эту стратегию обратного давления:

observable.toFlowable(BackpressureStrategy.DROP)

Or

observable.toFlowable(BackpressureStrategy.MISSING).onBackpressureDrop()

  • Сохранить последний элемент: если производитель видит, что нисходящий поток не может справиться с потоком элементов, он прекращает его отправку и ждет, пока он не станет доступным. В то же время он продолжает отбрасывать элементы, кроме последнего, который прибыл, и отправляет последний, когда нисходящий поток снова становится доступен. Есть два способа применить эту стратегию противодавления:

observable.toFlowable(BackpressureStrategy.LATEST)

Or

observable.toFlowable(BackpressureStrategy.MISSING).onBackpressureLatest()

Другой вариант, который чаще всего используется в мире Android, - это debounce. Обычно это используется при нажатии кнопки, когда мы не хотим, чтобы пользователи постоянно нажимали кнопку, пока обрабатывается действие нажатия кнопки. Подумайте о кнопке «Войти»: когда пользователь нажимает на нее, мы делаем сетевой запрос к серверу. Мы не хотим, чтобы пользователи продолжали нажимать кнопку. Используя дебаунс, он принимает последнее значение по истечении заданного времени. В приведенном ниже примере он принимает последнее значение, выданное через 1 секунду:

observable.toFlowable(BackpressureStrategy.MISSING).debounce(1000,TimeUnit.MILLISECONDS)

  • Буферизация: возможно, это не лучший способ справиться с большим количеством выбросов, но, безусловно, это доступный способ. При этом вы можете сохранять элементы в буфере. В этом случае элементы хранятся в буфере до тех пор, пока они не будут обработаны. Есть два способа применить эту стратегию противодавления:

observable.toFlowable(BackpressureStrategy.BUFFER)

Or

observable.toFlowable(BackpressureStrategy.MISSING).onBackpressureBuffer()

Вы также можете указать размер буфера как:

observable.toFlowable(BackpressureStrategy.MISSING).buffer(10)

PS: Я сделал это простое бесплатное приложение для Android, которое помогает вам поддерживать последовательность в достижении ваших целей, на основе технологий / инструментов, упомянутых выше. Не стесняйтесь проверить это:

Если вам это нравится, вы должны хлопнуть ( 👏 ) по нему.

Это все на сегодня! Подпишитесь на меня, чтобы узнать больше о разработке Android и Kotlin.

Далее в строке идет Планировщики: что, когда и как использовать?

Спасибо за прочтение. Этот пост изначально был опубликован в моем блоге. Не стесняйтесь проверить это.