ReactiveX Backpressure не работает должным образом

Пытаюсь сделать текучую с противодавлением. Моя идея состоит в том, что новый элемент текучего не будет генерироваться, пока один из текущих элементов не завершит свою обработку. Для этого я использую методы ResourceSubscriber и subscribeWith ().

Каждый элемент потока обрабатывается асинхронно в отдельном пуле потоков. (Чего я достигаю с помощью flatMap / subscribeOn)

Я ожидаю, что каждый элемент после секунды будет испускаться ПОСЛЕ метода onNext вызываемого подписчика. Однако, когда я пытаюсь запустить этот код, Flowable неконтролируемо испускает элементы. Противодавление не работает.

Вот код для воспроизведения проблемы:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.ResourceSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

public class RxTest2 {

    private static final Logger log = LoggerFactory.getLogger(RxTest.class);

    static AtomicInteger integer = new AtomicInteger();

    public static void main(String[] args) {
        Flowable.generate(emitter -> {
            final int i1 = integer.incrementAndGet();
            if (i1 >= 20) {
                Thread.sleep(10000);
                System.exit(0);
            }
            emitter.onNext(i1);
        })
                .doOnNext(i -> log.info("Published: " + i))
                .flatMap(i -> Flowable.defer(() -> {
                    log.info("Starting consuming {}", i);
                    Thread.sleep(100);
                    log.info("Finished consuming {}", i);
                    return Flowable.just(i);
                }).subscribeOn(Schedulers.computation()))
                .doOnNext(i -> log.info("Consuming finished, result: " + i))
                .subscribeWith(new BackpressureSubscriber(2));
    }
}

class BackpressureSubscriber extends ResourceSubscriber<Object> {

    private static final Logger log = LoggerFactory.getLogger(BackpressureSubscriber.class);

    private final long initialRequest;

    public BackpressureSubscriber(final long initialRequest) {
        this.initialRequest = initialRequest;
    }

    @Override
    protected void onStart() {
        super.onStart();
        log.info("Starting execution with {} initial requests", initialRequest);
        request(initialRequest);
    }

    @Override
    public void onNext(final Object message) {
        log.info("On next for {}", message);
        request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        log.error("Unhandled error: ", throwable);
    }

    @Override
    public void onComplete() {
        log.info("On Complete");
    }
}

Ожидаемый результат примерно такой:

[main] INFO RxTest - Published: 1
[main] INFO RxTest - Published: 2
[RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
[RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
[RxComputationThreadPool-1] INFO RxTest -  On next for 1
[main] INFO RxTest - Published: 3
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 2

Фактический выход:

11:30:32.166 [main] INFO BackpressureSubscriber - Starting execution with 2 initial requests
11:30:32.170 [main] INFO RxTest - Published: 1
11:30:32.189 [main] INFO RxTest - Published: 2
11:30:32.189 [RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
11:30:32.189 [RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
11:30:32.189 [main] INFO RxTest - Published: 3
11:30:32.190 [main] INFO RxTest - Published: 4
11:30:32.190 [RxComputationThreadPool-3] INFO RxTest - Starting consuming 3
11:30:32.190 [main] INFO RxTest - Published: 5
11:30:32.190 [RxComputationThreadPool-4] INFO RxTest - Starting consuming 4
11:30:32.190 [main] INFO RxTest - Published: 6
11:30:32.190 [RxComputationThreadPool-5] INFO RxTest - Starting consuming 5
11:30:32.190 [main] INFO RxTest - Published: 7
11:30:32.191 [RxComputationThreadPool-6] INFO RxTest - Starting consuming 6
11:30:32.191 [main] INFO RxTest - Published: 8
11:30:32.191 [RxComputationThreadPool-7] INFO RxTest - Starting consuming 7
11:30:32.191 [main] INFO RxTest - Published: 9
11:30:32.191 [RxComputationThreadPool-8] INFO RxTest - Starting consuming 8
11:30:32.191 [main] INFO RxTest - Published: 10
11:30:32.191 [RxComputationThreadPool-9] INFO RxTest - Starting consuming 9
11:30:32.191 [main] INFO RxTest - Published: 11
11:30:32.191 [RxComputationThreadPool-10] INFO RxTest - Starting consuming 10
11:30:32.192 [main] INFO RxTest - Published: 12
11:30:32.192 [RxComputationThreadPool-11] INFO RxTest - Starting consuming 11
11:30:32.192 [main] INFO RxTest - Published: 13
11:30:32.192 [main] INFO RxTest - Published: 14
11:30:32.192 [RxComputationThreadPool-12] INFO RxTest - Starting consuming 12
11:30:32.192 [main] INFO RxTest - Published: 15
11:30:32.192 [main] INFO RxTest - Published: 16
11:30:32.192 [main] INFO RxTest - Published: 17
11:30:32.192 [main] INFO RxTest - Published: 18
11:30:32.192 [main] INFO RxTest - Published: 19
11:30:32.294 [RxComputationThreadPool-2] INFO RxTest - Finished consuming 2
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Consuming finished, result: 1
11:30:32.294 [RxComputationThreadPool-1] INFO BackpressureSubscriber - On next for 1

Проверено на версиях библиотек:

2.2.19 2.1.2

Насколько я понимаю документацию ReactiveX, я думаю, что это ошибка RX. Однако я могу ошибаться и буду благодарен, если вы укажете


person Konstantin Abakumov    schedule 25.06.2020    source источник


Ответы (1)


flatMap фактически запрашивает от восходящего потока пакетами и буферизует элементы до тех пор, пока нисходящий поток не запросит их. Этого факта достаточно, чтобы описать наблюдаемое вами поведение. Если вы установили bufferSize в 1, вы можете увидеть ожидаемое поведение. Есть перегрузка, позволяющая установить bufferSize.

Вдобавок flatMap имеет параметр maxConcurrent, который легче понять, если вы поймете, что flatMap фактически является map, тогда merge применяется к потоку потоков, заданному map. merge может реально подписаться только на ограниченное количество источников одновременно, и это maxConcurrent. По умолчанию для bufferSize и maxConcurrent 128.

Имейте в виду, что когда этап слияния получает запрос от нисходящего потока, он не знает, на сколько потоков (помните, что здесь мы имеем дело с потоком потоков) ему потребуется подписаться для выполнения запроса! Первые 10 потоков вообще не могли возвращать никаких значений. Если первый поток ничего не возвращает и не завершается в течение 1 часа, и у нас есть maxConcurrent = 1, тогда мы не получим никаких событий в течение этого первого часа, даже если поток 2 и поток 3 были готовы отправить нам материал. По причинам, подобным этим, мы должны выбрать универсальные значения по умолчанию для bufferSize и maxConcurrent, и обычно выбираются значения, которые оптимизируют производительность в определенных тестовых случаях и минимизируют проблемы для многих крайних случаев.

person Dave Moten    schedule 25.06.2020
comment
Имеет смысл! Я попробовал поиграть с этими параметрами, и это сработало. Однако теперь я не понимаю, как здесь работает противодавление. Кажется, что конфигурация flatMap полностью переопределяет мою логику противодавления, независимо от того, сколько начальных запросов я указываю в BackpressureSubscriber, она всегда запрашивает maxConcurrent из восходящего потока. - person Konstantin Abakumov; 26.06.2020
comment
Обратное давление по-прежнему работает, но есть некоторые операторы, такие как flatMap, которые удивляют нас, казалось бы, чрезмерно запрашивающими. Я добавил к ответу еще один абзац. - person Dave Moten; 28.06.2020