Как мы можем буферизовать элементы в каждую миллисекунду и передавать каждый элемент с постоянным интервалом времени

Метод onNext для publishSubject вызывается непрерывно (неравномерно, примерно через 1 миллисекунду), и требование состоит в том, чтобы генерировать эти элементы каждую 1 секунду, и данные не должны теряться, значит, должен выдавать каждый элемент.

    publishSubject.onNext("Data1");
    publishSubject.onNext("Data2");
    publishSubject.onNext("Data3");
    publishSubject.onNext("Data4");
    publishSubject.onNext("Data5");
    publishSubject.onNext("Data6");
    publishSubject.onNext("Data7");

и так далее ... См. раздел "Структура кода" для справки:

var publishSubject = PublishSubject.create<String>()
publishSubject.onNext(stateObject) // Executing at every milliseconds...


publishSubject
        /* Business Logic Required Here ?? */
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            // Should execute at every 1 second
        }

Пожалуйста, помогите, заранее спасибо,


person Rajat Sharma    schedule 11.01.2019    source источник
comment
Что-то вроде этой stackoverflow .com / questions / 33291245 /?   -  person user    schedule 11.01.2019
comment
Как насчет сохранения элементов в двухсторонней очереди и использования сопрограммы, которая запускает функцию приостановки, чтобы получить первый элемент двухсторонней очереди, равный единице в секунду?   -  person Ariles    schedule 11.01.2019
comment
Ваше требование сбивает с толку. Если данные поступают раз в миллисекунду и могут передаваться только раз в секунду, то через одну секунду вы отстаете на 999 выбросов и никогда не догоните их. Что ты хочешь с ними делать?   -  person Bob Dalgleish    schedule 11.01.2019
comment
@BobDalgleish, да, теперь 999 выбросов будут испускаться каждую секунду одно за другим   -  person Rajat Sharma    schedule 11.01.2019
comment
Через 1 день вы будете отставать примерно на 86 313 600 пунктов; через 12 дней у вас останется более 1 миллиона товаров. Вы не видите проблемы?   -  person Bob Dalgleish    schedule 11.01.2019
comment
@BobDalgleish, да но это не долгий процесс вроде 12 дней. это полный процесс от 10 до 15 минут.   -  person Rajat Sharma    schedule 14.01.2019
comment
@BobDalgleish, это что-то вроде буферизации и потоковой передачи элементов.   -  person Rajat Sharma    schedule 14.01.2019


Ответы (2)


Это расширение функции для класса Observable - именно то, что вам нужно:

fun <T> Observable<T>.delayBetweenItems(timeout: Long, unit: TimeUnit): Observable<T> =
    Observable.zip(this, Observable.interval(timeout, unit), BiFunction<T, Long, T> { item, _ -> item })

Вы можете объявить его в каком-либо служебном классе в своем проекте, а затем применить его так же, как и другие операторы RxJava:

publishSubject
    .delayBetweenItems(1000, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        // Should execute at every 1 second
    }
person jackqack    schedule 11.01.2019
comment
Он работает ... но иногда (по прошествии длительного времени) автоматически удаляется. и ни один элемент не излучает. - person Rajat Sharma; 14.01.2019
comment
@RajatSharma Я думаю, причина в том, как вы используете его в Android, а не в самой функции. Попробуйте проверить свой код на логику подписки и удаления. - person jackqack; 14.01.2019
comment
это может быть из-за того, что мы не обработали в нем обратную прессу. - person Rajat Sharma; 18.01.2019

Как насчет хранения предметов в Deque. Затем использовать сопрограмму, которая запускает функцию приостановки, чтобы получать первый элемент двухсторонней очереди раз в секунду?

Вот какой быстрый и грязный код, чтобы убедиться, что он работает. Вы можете запустить этот код в Интернете на веб-сайте kotlin. Имейте в виду, что я новичок в Котлине.

val deque: Deque<String> = ArrayDeque()
var refMillisAdd: Long = 0
var refMillisTake: Long = 0

fun main() {

    println(" Delay(ms) -> Action")
    println("---------------------")

    kotlinx.coroutines.runBlocking {

        launch {

            refMillisAdd = currentTimeMillis()
            refMillisTake = currentTimeMillis()

            for(i in 0..20){
               oncePer10ms(i.toString())
               refMillisAdd = currentTimeMillis()
            }

            for(i in 0..6){
                oncePerSecond()
                refMillisTake = currentTimeMillis()
            }
        }
    }
}

suspend fun oncePerSecond(){
    kotlinx.coroutines.delay(1_000L)
    println("  ${currentTimeMillis() - refMillisTake} -> TAKE ${deque.pop()}")
}

suspend fun oncePer10ms(item: String){
    kotlinx.coroutines.delay(10L)
    deque.add(item)
    println("  ${currentTimeMillis() - refMillisAdd} -> ADD $item")
}

Приведенный выше код печатает:

 Delay(ms) -> Action
---------------------
  17 -> ADD 0
  11 -> ADD 1
  10 -> ADD 2
  10 -> ADD 3
  10 -> ADD 4
  10 -> ADD 5
  10 -> ADD 6
  10 -> ADD 7
  10 -> ADD 8
  11 -> ADD 9
  10 -> ADD 10
  10 -> ADD 11
  10 -> ADD 12
  11 -> ADD 13
  10 -> ADD 14
  10 -> ADD 15
  11 -> ADD 16
  10 -> ADD 17
  10 -> ADD 18
  10 -> ADD 19
  11 -> ADD 20
  1223 -> TAKE 0
  1000 -> TAKE 1
  1000 -> TAKE 2
  1001 -> TAKE 3
  1000 -> TAKE 4
  1000 -> TAKE 5
  1000 -> TAKE 6
person Ariles    schedule 11.01.2019