Создать интервал синхронизации в RxJava

Мне нужно запустить 2 JOB с определенным интервалом 4,8,12,16... секунд, а еще одно - 5,9,13,17... секунд.

Я использовал оператор Interval в RxJava. Задание B должно запускаться после задания A. Задание B должно находиться в спящем режиме, когда выполняется задание A, и наоборот. До сих пор код выглядит ниже

var compositeDisposable = CompositeDisposable()
compositeDisposable.add(Observable.interval(0, recordIntervalPeriod, TimeUnit.MILLISECONDS)
                        .serialize()
                        .subscribe {
                            JobA()
                        })
compositeDisposable.add(Observable.interval(0, recorderStopIntervalStartTime, TimeUnit.MILLISECONDS)
                        .serialize()
                        .subscribe {
                            JobB()
                        })

Нужна помощь в следовании

<сильный>1. Лучший способ добиться вышеперечисленного с помощью RxJava

<сильный>2. Запустите задание A на 4 секунды, затем запустите задание B на 4 секунды и повторите процесс снова.


person Chiradeep    schedule 22.03.2020    source источник
comment
Я думаю, что некоторые из этих требований немного невыполнимы. 1. Мне нужно запустить 2 задания с определенным интервалом 4,8,12,16... секунд, а еще одно - 5,9,13,17... секунд. 2. Задание B должно находиться в спящем режиме, когда выполняется задание A, и наоборот. 3 Запустите задание A на 4 секунды, затем запустите задание B на 4 секунды и повторите процесс снова. Если оба задания занимают 4 секунды, и они должны спать, пока выполняется другое, то они не могут работать с интервалами, которые вы определили для начала.   -  person GSala    schedule 23.03.2020
comment
Позвольте мне немного пояснить. Допустим, у нас есть асинхронная задача, и мы вызываем другую асинхронную задачу из onPostexecute. После того, как вторая асинхронная задача завершает свои операции, она снова вызывает первую асинхронную задачу, и цикл продолжается.   -  person Chiradeep    schedule 23.03.2020


Ответы (2)


Я бы посоветовал вам использовать одно задание, которое запускается каждую секунду, и каждый раз решать, какое задание вызывать на основе значения счетчика:

val disposable = Observable.interval(1, TimeUnit.SECONDS)
        .serialize()
        .subscribe { counter ->
            if (counter % 4 == 0L) {
                jobA()
            } else if ((counter - 1) % 4 == 0L) {
                jobB()
            }
        }

Если вы все еще хотите использовать две наблюдаемые, я думаю, что это тоже сработает:

val disposable = CompositeDisposable()
disposable.addAll(
        Observable.interval(4, TimeUnit.SECONDS)
                .subscribe {
                    jobA()
                },
        Observable.interval(4, TimeUnit.SECONDS)
                .delay(1, TimeUnit.SECONDS)
                .subscribe {
                    jobB()
                })

Отказ от ответственности: я мало использовал RxJava.

person Nicolas    schedule 22.03.2020
comment
Спасибо Николай за ответ. Это сработает. Я все еще буду ждать лучшего подхода, кроме этого :) - person Chiradeep; 23.03.2020
comment
Проблема приведенного выше ответа заключается в том, что JobA будет работать на 4,8,12,16... а JobB будет работать на 5,9,13... Но JobB не зависит от JobA. Оба будут работать одновременно. - person Chiradeep; 23.03.2020
comment
Эффект должен быть таким же. Если задание A не может занять более 1 секунды, в этом случае вы хотите пропустить задание B? - person Nicolas; 23.03.2020
comment
Да. Что-то вроде. Мне нужно сделать это синхронизированным. это означает, что JobB должен начинаться после JobA, но жизненный цикл каждого потока должен составлять 4 секунды. - person Chiradeep; 23.03.2020
comment
Итак, JobA отменяется при запуске JobB и наоборот? Максимальная продолжительность JobA составляет 1 секунду, а JobB - 3 секунды? - person Nicolas; 23.03.2020
comment
Правильный. JobA всегда будет выполняться 4 секунды, а JobB будет выполняться 4 секунды, но один за другим. Обе работы займут 4 секунды строго. - person Chiradeep; 23.03.2020

Как насчет

Observable.interval(4,TimeUnit.SECONDS)
    .flatMap({
        jobA().zipWith(Observable.timer(1, TimeUnit.SECONDS) }
            .flatMap { jobB() }
    }, maxConcurrent = 1).subscribe()

Я предполагаю, что jobA() и jobB() - это какие-то наблюдаемые.

Задание A должно ожидать выполнения задания B, так как максимальный параллелизм установлен на 1.

Задание B должно ждать задания A или 1 секунду от начала задания A, в зависимости от того, что произойдет позже.

person GSala    schedule 22.03.2020
comment
Спасибо за Ваш ответ. И jobA, и jobB не будут выдавать никаких данных, но будут выполнять некоторую операцию в функциональном блоке. Но идея состоит в том, чтобы сделать его синхронизированным. Задание JobA займет 4 секунды после того, как начнется задание B, а также это займет 4 секунды. Снова должен запуститься jobA, и итерация продолжится.... - person Chiradeep; 23.03.2020