Как вы можете создать таймер, который работает со списком в Rx?

Я хочу найти весь список элементов, которые нужно найти, прежде чем завершить, и если весь этот список не будет найден, то должно быть выбрано исключение (время ожидания или пользовательское). Подобно встроенному Observable.timer(), но вместо того, чтобы тест проходил после выдачи первого элемента, я хочу, чтобы он требовал, чтобы все элементы в списке были найдены.

Вот пример. Допустим, у меня есть тестовая функция, которая выдает Observable‹FoundNumber>. Это выглядит так:

var emittedList: List<String?> = listOf(null, "202", "302", "400")

data class FoundNumber(val numberId: String?)

fun scanNumbers(): Observable<FoundNumber> = Observable
    .intervalRange(0, 
                   emittedList.size.toLong(), 
                   0, 
                   1, 
                   TimeUnit.SECONDS).map { index -> 
                     FoundNumber(emittedList[index.toInt()]) }

Затем эта функция будет вызываться для получения чисел, которые будут сравниваться со списком ожидаемых чисел. Неважно, есть ли дополнительные номера, поступающие от scanForNumbers, которых нет в «целевом» списке. Их просто будут игнорировать. Что-то вроде этого:

val expectedNumbers = listOf("202", "302","999")

        scanForNumbers(expectedNumbers)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe { value -> Log.d(TAG, "Was returned a $value") }

Таким образом, ожидаемые числа (202, 302 и 999) не совсем совпадают с числами, которые будут выданы (202, 302 и 400). Таким образом, тайм-аут ДОЛЖЕН произойти, но со встроенной версией Observable.timer() он не истечет, поскольку был обнаружен хотя бы один элемент.

Вот примерно то, что я хотел бы иметь. Кто-нибудь знает, как это закодировать в RxJava/RxKotlin?

fun scanForNumbers(targets: List<String>): Observable<FoundNumber> {
  val accumulator: Pair<Set<Any>, FoundNumber?> = targets.toSet() to null
    return scanNumbers()
        .SPECIAL_TIMEOUT_FOR_LIST(5, TimeUnit.SECONDS, List)
        .scan(accumulator) { acc, next ->
            val (set, previous) = acc
            val stringSet:MutableSet<String> = hashSetOf()

            set.forEach { stringSet.add(it.toString()) }

            val item = if (next.numberId in stringSet) {
                next
            } else null
            (set - next) to item       // return set and nullable item
        }
        .filter { Log.d(TAG, "Filtering on ${it.second}")
                  it.second != null }  // item not null
        .take(targets.size.toLong())         // limit to the number of items
        .map { it.second }                   // unwrap the item from the pair
        .map { FoundController(it.numberId) }  // wrap in your class
}

Как вы кодируете, надеюсь, используя RxJava/Kotlin, средство тайм-аута в списке, как уже упоминалось?


person user443654    schedule 06.06.2018    source источник


Ответы (1)


Я думаю, теперь я понимаю, вы хотите, чтобы тайм-аут начинал отсчитываться с момента подписки, а не после того, как вы просмотрели элементы.

Если это то, что вам нужно, то вам может помочь оператор takeUntil:

return scanNumbers()
    .takeUntil(Observable.timer(5, TimeUnit.SECONDS))
    .scan(accumulator) { acc, next -> ...

В этом случае таймер начнет отсчет, как только вы подпишетесь. Если основной наблюдаемый объект завершится раньше, то отлично, если нет, то таймер все равно завершит основной наблюдаемый объект.

Но takeUntil сам по себе ошибки не выдаст, просто завершится. Если вам нужно, чтобы он заканчивался ошибкой, вы можете использовать следующую комбинацию:

return scanNumbers()
    .takeUntil(
            Observable
                    .error<Void>(new TimeoutError("timeout!"))
                    .delay(5, TimeUnit.SECONDS, true))
    .scan(accumulator) { acc, next -> ...
person ESala    schedule 07.06.2018
comment
Да, это правильно. Я думаю, что это будет сделано. Пометка как принятая. Спасибо! - person user443654; 08.06.2018