Как происходит внутреннее противодавление в RxJava

Я читал несколько документов по противодавлению в RxJava, но я не могу найти подробного объяснения, например, как это происходит внутри библиотеки, все просто резюмируют это, как «производитель» слишком быстрый, а «потребитель» слишком медленный.

Например, как в приведенном ниже коде:

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

Я просматривал исходный код RxJava , поэтому я понимаю, что в основном потоке мы будем генерировать события каждые миллисекунды, и как только мы его испустим, мы передаем значение методу System.out.println (i) и бросаем его в пул потоков планировщика newThead и запустить метод внутри исполняемого файла.

Итак, мой вопрос: как происходит внутреннее исключение? Потому что, когда мы вызываем Thread.sleep (), мы просто спим поток, который обрабатывает вызов метода -> System.out.println (), не влияя на другие потоки в пуле потоков, почему это вызовет исключение. Это потому, что в пуле потоков больше нет доступных потоков?

Спасибо


person Qing    schedule 12.08.2016    source источник


Ответы (1)


Вы можете думать о противодавлении как о системе разрешений, которые один оператор передает своему исходному источнику: вы можете дать мне 128 элементов. Чуть позже этот оператор может сказать: «Хорошо, дайте мне еще 96», так что всего может остаться 224 нереализованных разрешения. Некоторые источники, такие как interval, не заботятся о разрешениях и просто периодически выдают значения. Поскольку количество разрешений обычно сильно привязано к доступной емкости в очереди или буфере, выдача большего количества разрешений, чем эти хранилища могут вместить MissingBackpressureException.

Обнаружение нарушения противодавления происходит в основном, когда offer в ограниченную очередь возвращает ложь, например значение в observeOn, указывающее, что очередь заполнена.

Второй способ обнаружения нарушений - отслеживание количества невыполненных разрешений в операторе, таком как onBackpressureDrop, и всякий раз, когда восходящий поток отправляет больше, чем это, оператор просто не пересылает его:

// in onBackpressureDrop
public void onNext(T value) {
    if (emitted != availablePermits) {
        emitted++;
        child.onNext(value);
    } else {
        // ignoring this value
    }
}

Дочерний подписчик сигнализирует о своих разрешениях через request (), что обычно приводит к примерно следующему в onBackpressureDrop:

public void childRequested(long n) {
    availablePermits += n;
}

На практике из-за возможного асинхронного выполнения availablePermits является AtomicLong (и называется requested).

person akarnokd    schedule 12.08.2016
comment
Вы говорите, что некоторые операторы в RxJava будут помещать события onNext () в очередь или структуру данных буфера, верно? так что исключение не из-за пула потоков;)? - person Qing; 13.08.2016