RxJava — почему исполнители используют только один поток

Я создал фиксированный пул потоков для обработки события, испускаемого за 300 миллисекунд, и предположил, что процессу требуется 1000 миллисекунд. Предположим, что многопоточность будет работать, но повторно используется только один поток.

Если я установлю sleepTime меньше 300 мс, поток обработки изменится, но это бесполезно.

Вопросы: Что я могу сделать, чтобы сделать это одновременно? Почему программа повторно использует поток?

заранее спасибо

public static void main(String[] args) throws InterruptedException {
    long sleepTime = 1000;
    ExecutorService e = Executors.newFixedThreadPool(3);

    Observable.interval(300, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.computation())
    .flatMap(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long pT) {
            return Observable.just(pT).subscribeOn(Schedulers.from(e));
        }
    })
    .doOnNext(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    })
    .subscribe(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    });


    Thread.sleep(50000);
    e.shutdownNow();

}

Журналы

i am 0in thread:pool-1-thread-1
i am 1in thread:pool-1-thread-1
i am 2in thread:pool-1-thread-1
i am 3in thread:pool-1-thread-1
i am 4in thread:pool-1-thread-1
i am 5in thread:pool-1-thread-1
i am 6in thread:pool-1-thread-1
i am 7in thread:pool-1-thread-1
i am 8in thread:pool-1-thread-1
i am 9in thread:pool-1-thread-1
i am 10in thread:pool-1-thread-1
i am 11in thread:pool-1-thread-1

person Rockman12352    schedule 30.12.2015    source источник
comment
Просто примечание: вы можете использовать jvisualvm, чтобы более надежно выяснить, что происходит с точки зрения планирования и какие потоки используются: docs.oracle.com/javase/6/docs/technotes/tools/share/   -  person Reut Sharabani    schedule 30.12.2015
comment
@ReutSharabani В режиме отладки eclipse я вижу, что создаются потоки, но программа повторно использует только один поток.   -  person Rockman12352    schedule 30.12.2015


Ответы (3)


Насколько я понимаю в вашем коде, производитель производит быстрее, чем подписчик. Однако Observable<Long> interval(long interval, TimeUnit unit) на самом деле не поддерживает Backpressure. В документации указано, что

Этот оператор не поддерживает обратное давление, так как использует время. Если нисходящему потоку нужно медленнее, он должен замедлить таймер или использовать что-то вроде {@link #onBackpressureDrop}.

Если ваша обработка действительно медленнее, чем у производителя, то, что вы можете сделать в своем коде подписчика, выглядит примерно так:

.subscribe(new Action1<Long>() {

    @Override
    public void call(Long pT) {
        e.submit(new Runnable() {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    }
});
person Wins    schedule 30.12.2015
comment
Конечно, я могу отправить задачу в другую ветку, как вы сказали. Но я хочу сделать это естественно с помощью планировщика. - person Rockman12352; 30.12.2015
comment
@ Rockman12352 Я согласен, однако, насколько я помню, Observable будет выполнять все выполнение (от производителя к подписчику) в одном потоке для каждой эмиссии. Это означает, что для каждого Long данных в вашем производителе он будет вызывать всех подписчиков в одном потоке. Я могу ошибаться здесь, но это то, что я получил до сих пор - person Wins; 31.12.2015

Вместо

 .subscribeOn(Schedulers.computation())

пытаться

 .observeOn(Schedulers.computation())

Этот пример, который я сделал некоторое время назад, чтобы поиграть с параллелизмом с Rx, работает довольно хорошо в качестве примера.

   public class ObservableZip {

private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;

@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
                                                                                        .concat(s3))
              .subscribe(result -> showResult("Async:", start, result));
}




public void showResult(String transactionType, long start, String result) {
    System.out.println(result + " " +
                               transactionType + String.valueOf(System.currentTimeMillis() - start));
}


public Observable<String> obAsyncString() {
    return Observable.just("")
                     .observeOn(scheduler)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

public Observable<String> obAsyncString1() {
    return Observable.just("")
                     .observeOn(scheduler1)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

public Observable<String> obAsyncString2() {
    return Observable.just("")
                     .observeOn(scheduler2)
                     .doOnNext(val -> {
                         System.out.println("Thread " +  Thread.currentThread()
                                                               .getName());
                     })
                     .map(val -> "!");
  }

 }
person paul    schedule 30.12.2015
comment
Это не работает. В вашем примере 3 наблюдаемых исходят из разных потоков, поэтому они, естественно, многопоточные. Но в моем случае я хочу разделить его в пуле. - person Rockman12352; 30.12.2015
comment
Но каждый раз, когда вы вызываете Schedulers.computation(), вы не получаете новый поток? - person paul; 30.12.2015
comment
Да, я попробую другой компьютер - person Rockman12352; 30.12.2015

Я нашел ответ на GitHub!

Внутренний наблюдаемый действительно испускает многопоточность, но следующее действие на следующем - нет. Если я хочу, чтобы это было параллельно, я должен сделать это во внутреннем наблюдаемом .

person Rockman12352    schedule 31.12.2015
comment
У вас есть ссылка или пример? - person Christoph; 23.10.2017