Наблюдается несколько потоков в одном планировщике, событие не используется в том же порядке, в котором оно было отправлено.

Я использую событие, используя перенос из разных внешних источников/подписки в разные Flowable. Источник не имеет значения, так как я могу воспроизвести проблему с помощью простого цикла. У меня есть :

  • Различные FlowableEmitter (достаточно 3 для воспроизведения)
  • Одиночный поток для эмиттера (ниже основной поток)
  • Одиночный поток для подписчика (newSingleThreadExecutor)

Это простой код, который воспроизводит

    final ExecutorService executor = Executors.newSingleThreadExecutor();
    final Scheduler scheduler = Schedulers.from(executor);

    List<FlowableEmitter<Long>> emitterList = new ArrayList<>();
    for (int i=0; i<3; ++i) {
        final int finalI = i;
        Flowable.create( new FlowableOnSubscribe<Long>(){
            @Override
            public void subscribe(FlowableEmitter<Long> emitter) {
                emitterList.add(emitter);
            }
        },  BackpressureStrategy.MISSING)
                .observeOn(scheduler)
                .subscribe(
                        val -> System.out.println(
                            "[" +Thread.currentThread().getName()
                            + "] Flow:" + finalI 
                            + " > " + Long.toString(val)));
    }

    long state = 0;
    for (int i=0; i<5; ++i) {
        for (FlowableEmitter<Long> emitter: emitterList){
            emitter.onNext(++state);
        }
    }

    executor.shutdown();

Моя проблема в том, что события не потребляются в том же порядке, в котором они испускаются. если я удаляю наблюдатель (планировщик), он работает нормально, но мне нужно, чтобы эмиттер и подписчик были в разных потоках. Я также тестировал разные стратегии BackpressureStrategy, и это не помогло.
Любая подсказка, чтобы все номера были подписаны/распечатаны по порядку (1,2,3,4,5...14,15) вместо того, что я указал ниже

[pool-1-thread-1] Flow:0 > 1
[pool-1-thread-1] Flow:0 > 4
[pool-1-thread-1] Flow:0 > 7
[pool-1-thread-1] Flow:0 > 10
[pool-1-thread-1] Flow:0 > 13
[pool-1-thread-1] Flow:1 > 2
[pool-1-thread-1] Flow:1 > 5
[pool-1-thread-1] Flow:1 > 8
[pool-1-thread-1] Flow:1 > 11
[pool-1-thread-1] Flow:1 > 14
[pool-1-thread-1] Flow:2 > 3
[pool-1-thread-1] Flow:2 > 6
[pool-1-thread-1] Flow:2 > 9
[pool-1-thread-1] Flow:2 > 12
[pool-1-thread-1] Flow:2 > 15

Я использую rx-java 2.2.5 и Java 8, если это имеет значение.


person Rod1677    schedule 04.03.2019    source источник


Ответы (1)


observeOn не является справедливым и может охватывать нить эмиссии, если есть еще работа. При циклическом излучении один источник может быть готов излучать больше, и, таким образом, вы не получите идеального чередования. Также обратите внимание, что при использовании стратегии MISSING вы подвержены исключениям обратного давления. Попробуйте requestObserveOn из проекта расширений:

final ExecutorService executor = Executors.newSingleThreadExecutor();
final Scheduler scheduler = Schedulers.from(executor);

List<FlowableEmitter<Long>> emitterList = new ArrayList<>();
for (int i=0; i<3; ++i) {
    final int finalI = i;
    Flowable.create( new FlowableOnSubscribe<Long>(){
        @Override
        public void subscribe(FlowableEmitter<Long> emitter) {
            emitterList.add(emitter);
        }
    },  BackpressureStrategy.BUFFER)
            // ---------------------------------------------------------
            .compose(FlowableTransformers.requestObserveOn(scheduler))
            // ---------------------------------------------------------
            .subscribe(
                    val -> System.out.println(
                        "[" +Thread.currentThread().getName()
                        + "] Flow:" + finalI 
                        + " > " + Long.toString(val)));
}

long state = 0;
for (int i=0; i<5; ++i) {
    for (FlowableEmitter<Long> emitter: emitterList){
        emitter.onNext(++state);
    }
}

executor.shutdown();
person akarnokd    schedule 04.03.2019
comment
Будут ли у него проблемы, если он никогда не вызовет .onComplete() ни на одном из своих потоковых эмиттеров? - person EpicPandaForce; 04.03.2019
comment
Не в этом конкретном сценарии. - person akarnokd; 04.03.2019
comment
@akarnokd Спасибо за разъяснение проблемы. Только что попробовал FlowableTransformers.requestObserveOn, но все еще путаю вывод (№ 1 фактически потребляется после № 15 в моей первой попытке), так что на самом деле не лучше. - person Rod1677; 05.03.2019