Я использую событие, используя перенос из разных внешних источников/подписки в разные Flowable. Источник не имеет значения, так как я могу воспроизвести проблему с помощью простого цикла. У меня есть :
- Различные FlowableEmitter (достаточно 3 для воспроизведения)
- Одиночный поток для эмиттера (ниже основной поток)
- Одиночный поток для подписчика (newSingleThreadExecutor)
Это простой код, который воспроизводит
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Scheduler scheduler = Schedulers.from(executor);
List<FlowableEmitter<Long>> emitterList = new ArrayList<>();
for (int i=0; i<3; ++i) {
final int finalI = i;
Flowable.create( new FlowableOnSubscribe<Long>(){
@Override
public void subscribe(FlowableEmitter<Long> emitter) {
emitterList.add(emitter);
}
}, BackpressureStrategy.MISSING)
.observeOn(scheduler)
.subscribe(
val -> System.out.println(
"[" +Thread.currentThread().getName()
+ "] Flow:" + finalI
+ " > " + Long.toString(val)));
}
long state = 0;
for (int i=0; i<5; ++i) {
for (FlowableEmitter<Long> emitter: emitterList){
emitter.onNext(++state);
}
}
executor.shutdown();
Моя проблема в том, что события не потребляются в том же порядке, в котором они испускаются. если я удаляю наблюдатель (планировщик), он работает нормально, но мне нужно, чтобы эмиттер и подписчик были в разных потоках. Я также тестировал разные стратегии BackpressureStrategy, и это не помогло.
Любая подсказка, чтобы все номера были подписаны/распечатаны по порядку (1,2,3,4,5...14,15) вместо того, что я указал ниже
[pool-1-thread-1] Flow:0 > 1
[pool-1-thread-1] Flow:0 > 4
[pool-1-thread-1] Flow:0 > 7
[pool-1-thread-1] Flow:0 > 10
[pool-1-thread-1] Flow:0 > 13
[pool-1-thread-1] Flow:1 > 2
[pool-1-thread-1] Flow:1 > 5
[pool-1-thread-1] Flow:1 > 8
[pool-1-thread-1] Flow:1 > 11
[pool-1-thread-1] Flow:1 > 14
[pool-1-thread-1] Flow:2 > 3
[pool-1-thread-1] Flow:2 > 6
[pool-1-thread-1] Flow:2 > 9
[pool-1-thread-1] Flow:2 > 12
[pool-1-thread-1] Flow:2 > 15
Я использую rx-java 2.2.5 и Java 8, если это имеет значение.