Kotlin - Как запустить n сопрограмм и дождаться первых m результатов или тайм-аута?

Я пытаюсь написать функцию, которая запустит n сопрограмм и дождется завершения первого m. Если m сопрограмм не удается завершить в течение некоторого времени ожидания, все сопрограммы / задания отменяются. Моя первоначальная реализация для этого показана ниже, но я считаю, что ее можно улучшить. Моя первоначальная мысль заключалась в том, чтобы использовать родительское задание для запуска всех других заданий, чтобы родительское задание можно было отменить и передать его остальным дочерним элементам. Однако это приводит к исключению TimeoutCancellationException, которое необходимо перехватить.

Как мне написать функцию для запуска n сопрограмм и ожидания, пока не завершатся первые m сопрограмм, или до истечения времени ожидания до завершения m сопрограмм?

private suspend fun queryAllHosts(
        queryFactories: List<(query: String, pageIndex: Int) -> String>
        , query: String
        , pageIndex: Int
        , maxSuccessfulHosts: Int
        , queryTimeout: Long
        , requestTimeout: Long
): ArrayList<QueryResult<ResultModel>> {
    val results = ArrayList<QueryResult<ResultModel>>()
    val rootJob = Job()

    try {
        withTimeout(queryTimeout, TimeUnit.MILLISECONDS) {
            queryFactories.map {
                async(parent = rootJob) {
                    val pagedResult = queryHost(
                            it
                            , query
                            , pageIndex
                            , requestTimeout
                    )

                    if (pagedResult.isSuccessful()) {
                        results.add(pagedResult)
                    }

                    if (results.size == maxSuccessfulHosts) {
                        rootJob.cancelAndJoin()
                        return@async
                    }
                }
            }.awaitAll()
        }
    } catch (ex: TimeoutCancellationException) {
        Log.w(Tag, "Query timed out, successful queries: ${results.size}")
    } catch (ignored: JobCancellationException) {
        // Ignored
    } catch (ex: Exception) {
        Log.w(Tag, "Unexpected exception", ex)
    }

    return results
}

ОБНОВЛЕНИЕ

Мне не повезло с ранее принятым ответом из-за одновременных исключений модификации. Ниже приведена небольшая настройка исходного ответа, чтобы избежать исключения одновременной модификации, однако он не учитывает тайм-аут тикера.

Как избежать исключения одновременной модификации и при этом соблюдать тайм-аут тикера?

suspend fun <T> List<Deferred<T>>.awaitCount(count: Int, timeoutMs: Long): List<T> {
    require(count <= size)

    val Tag = "DERP"

    val toAwait = HashSet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticker = ticker(timeoutMs)

    forEach { deferred ->
        deferred.invokeOnCompletion {
            if (!deferred.isCompletedExceptionally) {
                Log.d(Tag, "(Completed) Value: ${deferred.getCompleted()}")
            } else {
                Log.d(Tag, "(Completed) Exception: $it")
            }
        }
    }

    val processed = HashSet<Deferred<T>>()

    val elapsedTime = measureTimeMillis {
        whileSelect {
            toAwait.minus(processed).forEach { deferred ->
                processed.add(deferred)
                deferred.onAwait {
                    toAwait.remove(deferred)
                    result.add(it)
                    result.size != count
                }
            }

            ticker.onReceive {
                toAwait.forEach { it.cancel() }
                false
            }
        }
    }

    Log.d(Tag, "Elapsed time: $elapsedTime")

    return result
}

Отложенные экземпляры теперь создаются с помощью следующего кода:

private fun makeRequest(
        url: String
        , timeoutMs: Int
): Document? = try {
    Jsoup.connect(url).timeout(timeoutMs).get()
} catch (ex: Exception) {
    null
}

private fun createAsyncRequests(
        queryFactories: List<(query: String, pageIndex: Int) -> String>
        , query: String
        , pageIndex: Int
        , timeoutMs: Int
): List<Deferred<QueryResult<TorrentResult>>> = queryFactories.map { queryFactory ->
    async(start = CoroutineStart.LAZY) {
        try {
            val url = queryFactory(query, pageIndex)
            makeRequest(
                    url
                    , timeoutMs
            ).getQueryResult(pageIndex, url)
        } catch (ex: Exception) {
            QueryResult<TorrentResult>(state = QueryResult.State.ERROR)
        }
    }
}

ОБНОВЛЕНИЕ

В приведенном ниже журнале показано, что предоставленные таймауты в 2000 мс не соблюдаются, поскольку таймаут составляет 11861 мс:

2018-09-13 22:39:00.307 20475-20807/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.315 20475-20807/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.454 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|1/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://indiapirate.com/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.455 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.456 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.470 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|2/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://superbay.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.471 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.472 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.479 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|3/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://superbay.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.480 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.480 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.481 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|4/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebays.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.481 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.482 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.500 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|5/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebays.be/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.501 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.501 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.577 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|6/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebay.nz/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.577 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.578 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.602 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|7/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebay6.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.602 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.603 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.604 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|8/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.604 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.605 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.621 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|9/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://uktpb.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.621 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.622 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.694 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|10/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxyproxyproxy.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.695 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.695 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.712 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|11/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://fastpirate.link/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.713 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.713 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.868 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|12/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://freepirate.eu/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.869 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.869 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.480 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|13/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://pirate.tel/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.481 20475-20968/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.482 20475-20968/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.649 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|14/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratesbay.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.649 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.650 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.780 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|15/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepirate.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.781 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.782 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.131 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|16/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:03.132 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.132 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.250 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|17/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:03.251 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.253 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.296 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|18/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://freeproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:03.296 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.297 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.441 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|19/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:03.442 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.443 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:04.826 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|20/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepirate.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:04.868 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|21/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxyproxy.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:05.325 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|22/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:05.926 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|23/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.002 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|24/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.117 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|25/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:06.338 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|26/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.923 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|27/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebay.red/search/hobbit+1977/0/7
2018-09-13 22:39:07.214 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|28/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:07.625 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|29/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateway.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:08.398 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|30/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:08.431 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|31/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxybay.blue/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:08.684 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|32/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepiratepirate.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.033 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|33/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://unblocktpb.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.759 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|34/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:09.879 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|35/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbunblock.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.961 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|36/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:10.554 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|37/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:10.715 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|38/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://Piratebayproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:10.855 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|39/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:11.014 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|40/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpb.fun/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.298 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|41/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebayproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.313 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|42/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpb.review/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.932 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|43/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxybay.life/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:12.166 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|44/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebayproxy.one/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:12.168 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Elapsed time: 11861
2018-09-13 22:39:12.885 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|45/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@b129740
2018-09-13 22:39:13.437 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|46/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@c801679
2018-09-13 22:39:14.051 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|47/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@3f89fbe
2018-09-13 22:39:14.154 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|48/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@c7c181f
2018-09-13 22:39:14.658 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|49/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@729e76c
2018-09-13 22:39:17.334 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|50/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@ae2a135

Обновленный код с ведением журнала (я удалил исключение TimeoutException и вместо этого вернул false):

suspend fun <T> List<Deferred<T>>.awaitCount(
        count: Int
        , timeoutMs: Long
): List<T> {
    require(count <= size)

    val Tag = "DERP"

    val toAwait = CopyOnWriteArraySet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticket = ticker(timeoutMs)
    var completedCount = 0

    forEach { deferred ->
        deferred.invokeOnCompletion {
            completedCount++

            if (deferred.isCompletedExceptionally) {
                Log.d(Tag, "(Completed|$completedCount/$size) Exception: $it")
            } else {
                Log.d(Tag, "(Completed|$completedCount/$size) Value: ${deferred.getCompleted()}")
            }
        }
    }

    val elapsedTime = measureTimeMillis {
        whileSelect {
            ticket.onReceive {
                toAwait.forEach { it.cancel() }
                false
            }

            toAwait.forEach { deferred ->
                Log.d(Tag, "Starting deferred..")
                deferred.onAwait {
                    toAwait.remove(deferred)
                    result.add(it)
                    result.size != count
                }
            }
        }
    }

    Log.d(Tag, "Elapsed time: $elapsedTime")

    return result
}

person masterwok    schedule 05.09.2018    source источник
comment
Две незначительные вещи: 1. Используйте launch вместо async, потому что он ничего не возвращает. Есть joinAll, который вы можете использовать вместо awaitAll. 2. Вам не нужен оператор return@async, в любом случае это последняя строка.   -  person Marko Topolnik    schedule 06.09.2018
comment
Ваш код не гарантирует maxSuccessfulHosts как верхнюю границу количества результатов. Пока вы отменяете rootJob, другие задания, возможно, уже получили свой результат и добавят его в results.   -  person Marko Topolnik    schedule 06.09.2018
comment
Если вам нужна точная верхняя граница, вы должны использовать async { queryHost(...) }, поместить все эти Deferred в mutableSet, затем select поверх всех: select { deferreds.forEach { it.onAwait { results.add(it.completedValue); deferreds -= it } } } и повторно запустить это select, пока не истечет время ожидания или пока не будет получено достаточно результатов.   -  person Marko Topolnik    schedule 06.09.2018


Ответы (1)


Его можно улучшить, если вообще не использовать дополнительную запускаемую задачу и корневое задание.

kotlinx.coroutines имеет предложение select для таких сложных операторов, что идеально подходит для вашего варианта использования. Более того, легко обобщить:

suspend fun <T> List<Deferred<T>>.awaitCount(count: Int, timeoutMs: Long): List<T> {
    require(count <= size)

    val toAwait = CopyOnWriteArraySet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticket = ticker(timeoutMs)

    whileSelect {
        toAwait.forEach { deferred ->
            deferred.onAwait {
                toAwait.remove(deferred)
                result.add(it)
                result.size != count
            }
        }

        ticket.onReceive {
            val e = TimeoutException()
            toAwait.forEach { it.cancel(e) }
            throw e
        }
    }

    return result
}

Затем вы можете использовать его в queryAllHosts:

val queries = queryFactories.map { queryHost(...) }
return queries.awaitCount(maxSuccessfulHosts, queryTimeout)

Вы можете настроить awaitCount так, как хотите, например специализировать его для модернизации и добавить isSuccessful проверку

person qwwdfsad    schedule 06.09.2018
comment
Еще одна мелочь: queryHost - это suspend fun, а не Deferred возвращающийся. Так что нельзя избежать запуска этих сопрограмм: queryFactories.map { async { queryHost(...) } } - person Marko Topolnik; 06.09.2018
comment
Спасибо, это именно то, что я искал. - person masterwok; 10.09.2018
comment
Хотя это, похоже, приводит к исключению ConcurrentModificationException, вызванному удалением отложенного из toAwait HashSet. - person masterwok; 10.09.2018
comment
@masterwok Вы правы, если deferred уже завершен к тому времени, когда вы выполняете onAwait на нем, он немедленно перейдет к коду обработчика и, следовательно, попытается изменить toAwait во время итерации. Честно говоря, в этой идиоме есть за что ненавидеть, но это довольно важная идиома. Kotlin должен добавить некоторую первоклассную поддержку для многократного выбора набора сигналов, пока они не исчерпаются. - person Marko Topolnik; 10.09.2018
comment
@MarkoTopolnik С тех пор я перешел к использованию итератора и вызова remove() в onAwait, и это, похоже, устранило проблему одновременной модификации. Однако теперь у меня проблемы с тайм-аутом, onReceive никогда не вызывается. - person masterwok; 10.09.2018
comment
Вы не можете использовать iterator.remove(), потому что это вызывает противоположную проблему: если он еще не завершен, итератор уже переместится на другие элементы или будет полностью исчерпан к моменту запуска обработчика. Это может объяснить ваше наблюдаемое поведение: вы удалили запись таймера в наборе при обработке onAwait. - person Marko Topolnik; 10.09.2018
comment
@MarkoTopolnik благодарит вас за отзыв. Кажется, что приведенный выше код по-прежнему не работает при использовании ленивого Deferred? - person masterwok; 12.09.2018
comment
Не понимаю, что вы имеете в виду под ленивым Deferred. Да, код не работает, как описано выше, и переход на iterator.remove() не помогает. - person Marko Topolnik; 12.09.2018
comment
@MarkoTopolnik под ленивым отложенным я имею в виду: async(start = CoroutineStart.LAZY) { .. }. Асинхронный блок не будет выполняться до onAwait. Я обновил вопрос выше, указав, как мне избежать исключения. - person masterwok; 12.09.2018
comment
Это не исправляет, потому что вы выполняете select в цикле, и вы все равно можете наткнуться на уже завершенный Deferred. Один из способов исправить - перебрать неизменяемый снимок набора (копию). - person Marko Topolnik; 12.09.2018
comment
@MarkoTopolnik, это было моим намерением вызвать toAwait.minus(processed), а затем немедленно добавить отложенное в обработанное (до его запуска). Я надеялся, что вычисленное значение на следующем проходе не будет содержать ранее обработанных элементов. Не следует ли в любом случае вызывать ticker.onRecieve { .. }? - person masterwok; 12.09.2018
comment
Я исправил CME в своем ответе. Его можно улучшить, используя более точный рисунок CoW. - person qwwdfsad; 12.09.2018
comment
CoW - это параллельный дизайн, я считаю его излишним, хотя он и выполняет свою работу. Простой toAwait.toList().forEach ... столь же эффективен. - person Marko Topolnik; 12.09.2018
comment
@qwwdfsad благодарит за обновление ответа. Однако я вижу, что ticker.onReceive {...} не вызывается до ~ 16 000 мс, когда проходит тайм-аут в 2 000 мс. Я вызываю этот метод на List из примерно 50 Deferred экземпляров. Вы знаете, что могло вызвать это? - person masterwok; 14.09.2018
comment
@qwwdfsad Я добавил немного измененную версию вашего ответа на вопрос, который включает ведение журнала и возвращает false, а не генерирует исключение при истечении времени ожидания. Журналы, показывающие, что тайм-аут происходит позже, чем ожидалось (ожидаемое ~ 2000 мс), также были добавлены к вопросу. - person masterwok; 14.09.2018
comment
@qwwdfsad хм .. неважно. Проблема связана с Deferred экземплярами, которые я создаю. При запуске вашего решения с 50 простыми async { delay(10000) } оно работает, как ожидалось. Спасибо вам обоим за вашу помощь! - person masterwok; 14.09.2018
comment
Требуется ли здесь блокировка onAwait и доступ к списку result мьютекс / блокировка? Я бы предположил, что этот блок может быть вызван из продолжения каждой сопрограммы, которая может выполняться в разных потоках. Это неправильно, и эти блоки также используют средства ContinuationIntercepter, чтобы запланировать выполнение в одном потоке? - person Matthias247; 03.05.2019
comment
@qwwdfsad Разве вам не нужно отменять оставшиеся отложенные задания, когда result.size становится равным count? И я считаю, что ticker отмену также следует рассмотреть - person Andrey; 15.02.2021