как ограничить максимальный параллелизм сопрограмм kotlin

У меня есть последовательность (из File.walkTopDown), и мне нужно запустить длительную операцию для каждой из них. Я хотел бы использовать лучшие практики / сопрограммы Kotlin, но я либо не получаю параллелизма, либо слишком много параллелизма и получаю ошибку ввода-вывода «слишком много открытых файлов».

File("/Users/me/Pictures/").walkTopDown()
    .onFail { file, ex -> println("ERROR: $file caused $ex") }
    .filter { ... only big images... }
    .map { file ->
        async { // I *think* I want async and not "launch"...
            ImageProcessor.fromFile(file)
        }
    }

Похоже, что это не работает параллельно, и мой многоядерный процессор никогда не превышает производительность 1 процессора. Есть ли способ с помощью сопрограмм запускать «параллельные операции NumberOfCores» для отложенных заданий?

Я посмотрел Многопоточность с использованием Kotlin Coroutines, которая сначала создает ВСЕ задания, а затем присоединяется к ним, но это означает завершение обхода дерева последовательности/файла полностью перед этапом соединения с тяжелой обработкой, и это кажется... сомнительным! Разделение его на сбор и этап обработки означает, что сбор может выполняться намного раньше обработки.

val jobs = ... the Sequence above...
    .toSet()
println("Found ${jobs.size}")
jobs.forEach { it.await() }

person Benjamin H    schedule 07.12.2017    source источник


Ответы (5)


Это не относится к вашей проблеме, но отвечает на вопрос, как ограничить максимальный параллелизм сопрограмм kotlin.

Сначала я думал использовать newFixedThreadPoolContext, но 1) это устарело и 2) это будет использовать потоки, и я не думаю, что это необходимо или желательно (то же самое с Executors.newFixedThreadPool().asCoroutineDispatcher()). Это решение может иметь недостатки, о которых я не знаю, используя Семафор, но это очень просто:

import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

/**
 * Maps the inputs using [transform] at most [maxConcurrency] at a time until all Jobs are done.
 */
suspend fun <TInput, TOutput> Iterable<TInput>.mapConcurrently(
    maxConcurrency: Int,
    transform: suspend (TInput) -> TOutput,
) = coroutineScope {
    val gate = Semaphore(maxConcurrency)
    [email protected] {
        async {
            gate.withPermit {
                transform(it)
            }
        }
    }.awaitAll()
}

Тесты (извините, он использует тест Spek, hamcrest и kotlin):

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestCoroutineDispatcher
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.greaterThanOrEqualTo
import org.hamcrest.Matchers.lessThanOrEqualTo
import org.spekframework.spek2.Spek
import org.spekframework.spek2.style.specification.describe
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
object AsyncHelpersKtTest : Spek({
    val actionDelay: Long = 1_000 // arbitrary; obvious if non-test dispatcher is used on accident
    val testDispatcher = TestCoroutineDispatcher()

    afterEachTest {
        // Clean up the TestCoroutineDispatcher to make sure no other work is running.
        testDispatcher.cleanupTestCoroutines()
    }

    describe("mapConcurrently") {
        it("should run all inputs concurrently if maxConcurrency >= size") {
            val concurrentJobCounter = AtomicInteger(0)
            val inputs = IntRange(1, 2).toList()
            val maxConcurrency = inputs.size

            // https://github.com/Kotlin/kotlinx.coroutines/issues/1266 has useful info & examples
            runBlocking(testDispatcher) {
                print("start runBlocking $coroutineContext\n")

                // We have to run this async so that the code afterwards can advance the virtual clock
                val job = launch {
                    testDispatcher.pauseDispatcher {
                        val result = inputs.mapConcurrently(maxConcurrency) {
                            print("action $it $coroutineContext\n")

                            // Sanity check that we never run more in parallel than max
                            assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))

                            // Allow for virtual clock adjustment
                            delay(actionDelay)

                            // Sanity check that we never run more in parallel than max
                            assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
                            print("action $it after delay $coroutineContext\n")

                            it
                        }

                        // Order is not guaranteed, thus a Set
                        assertEquals(inputs.toSet(), result.toSet())
                        print("end mapConcurrently $coroutineContext\n")
                    }
                }
                print("before advanceTime $coroutineContext\n")

                // Start the coroutines
                testDispatcher.advanceTimeBy(0)
                assertEquals(inputs.size, concurrentJobCounter.get(), "All jobs should have been started")

                testDispatcher.advanceTimeBy(actionDelay)
                print("after advanceTime $coroutineContext\n")
                assertEquals(0, concurrentJobCounter.get(), "All jobs should have finished")
                job.join()
            }
        }

        it("should run one at a time if maxConcurrency = 1") {
            val concurrentJobCounter = AtomicInteger(0)
            val inputs = IntRange(1, 2).toList()
            val maxConcurrency = 1

            runBlocking(testDispatcher) {
                val job = launch {
                    testDispatcher.pauseDispatcher {
                        inputs.mapConcurrently(maxConcurrency) {
                            assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))
                            delay(actionDelay)
                            assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
                            it
                        }
                    }
                }

                testDispatcher.advanceTimeBy(0)
                assertEquals(1, concurrentJobCounter.get(), "Only one job should have started")

                val elapsedTime = testDispatcher.advanceUntilIdle()
                print("elapsedTime=$elapsedTime")
                assertThat(
                    "Virtual time should be at least as long as if all jobs ran sequentially",
                    elapsedTime,
                    greaterThanOrEqualTo(actionDelay * inputs.size)
                )
                job.join()
            }
        }

        it("should handle cancellation") {
            val jobCounter = AtomicInteger(0)
            val inputs = IntRange(1, 2).toList()
            val maxConcurrency = 1

            runBlocking(testDispatcher) {
                val job = launch {
                    testDispatcher.pauseDispatcher {
                        inputs.mapConcurrently(maxConcurrency) {
                            jobCounter.addAndGet(1)
                            delay(actionDelay)
                            it
                        }
                    }
                }

                testDispatcher.advanceTimeBy(0)
                assertEquals(1, jobCounter.get(), "Only one job should have started")

                job.cancel()
                testDispatcher.advanceUntilIdle()
                assertEquals(1, jobCounter.get(), "Only one job should have run")
                job.join()
            }
        }
    }
})

Согласно https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/09_Testing вам также может потребоваться изменить аргументы компилятора для выполнения тестов:

compileTestKotlin {
    kotlinOptions {
        // Needed for runBlocking test coroutine dispatcher?
        freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
        freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn"
    }
}
testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.1'
person Pat    schedule 28.11.2020
comment
Спас мой день! Спасибо, что поделились, и особенно большое спасибо за тесты. Там нужно многому научиться - person Sam; 08.05.2021

Проблема с вашим первым фрагментом заключается в том, что он вообще не запускается — помните, Sequence ленив, и вам приходится использовать терминальную операцию, такую ​​как toSet() или forEach(). Кроме того, вам необходимо ограничить количество потоков, которые можно использовать для этой задачи, создав контекст newFixedThreadPoolContext и используя его в async:

val pictureContext = newFixedThreadPoolContext(nThreads = 10, name = "reading pictures in parallel")

File("/Users/me/Pictures/").walkTopDown()
    .onFail { file, ex -> println("ERROR: $file caused $ex") }
    .filter { ... only big images... }
    .map { file ->
        async(pictureContext) {
            ImageProcessor.fromFile(file)
        }
    }
    .toList()
    .forEach { it.await() }

Изменить: вы должны использовать оператор терминала (toList) до ожидания результатов

person voddan    schedule 07.12.2017
comment
Я думал, что это сработает, но по-прежнему кажется, что окончательный forEach обрабатывается последовательно. например. .map { файл -> async(CommonPool) { println(start) val img = ImageFile.fromFile(file) println(end) img } } .forEach { imageFiles.add(it.await()) if (Math.random( ) › 0,999) { imageFiles.save() } } - person Benjamin H; 08.12.2017
comment
О, снэп, ты прав. Теперь я думаю, что нет способа сделать это с последовательностями. Отредактировал ответ - person voddan; 08.12.2017
comment
Стоит отметить, что использование ограниченного пула потоков ограничивает параллелизм, но не параллелизм, а это означает, что если ImageProcessor.fromFile является функцией приостановки (которая не блокируется), вы все равно можете обрабатывать несколько файлов одновременно, что, возможно, не то, что вам нужно. - person Nicklas A.; 07.02.2019

У меня это работает с каналом. Но, может быть, я переборщила с твоим способом?

val pipe = ArrayChannel<Deferred<ImageFile>>(20)
launch {
    while (!(pipe.isEmpty && pipe.isClosedForSend)) {
        imageFiles.add(pipe.receive().await())
    }
    println("pipe closed")
}
File("/Users/me/").walkTopDown()
        .onFail { file, ex -> println("ERROR: $file caused $ex") }
        .forEach { pipe.send(async { ImageFile.fromFile(it) }) }
pipe.close()
person Benjamin H    schedule 09.12.2017

Это не сохраняет порядок проекции, но ограничивает пропускную способность не более чем до maxDegreeOfParallelism. Расширяйте и расширяйте, как считаете нужным.

suspend fun <TInput, TOutput> (Collection<TInput>).inParallel(
        maxDegreeOfParallelism: Int,
        action: suspend CoroutineScope.(input: TInput) -> TOutput
): Iterable<TOutput> = coroutineScope {

    val list = this@inParallel

    if (list.isEmpty())
        return@coroutineScope listOf<TOutput>()

    val brake = Channel<Unit>(maxDegreeOfParallelism)
    val output = Channel<TOutput>()
    val counter = AtomicInteger(0)

    this.launch {

        repeat(maxDegreeOfParallelism) {
            brake.send(Unit)
        }

        for (input in list) {

            val task = this.async {
                action(input)
            }

            this.launch {
                val result = task.await()
                output.send(result)
                val completed = counter.incrementAndGet()
                if (completed == list.size) {
                    output.close()
                } else brake.send(Unit)
            }

            brake.receive()
        }
    }

    val results = mutableListOf<TOutput>()
    for (item in output) {
        results.add(item)
    }

    return@coroutineScope results
}

Пример использования:

val output = listOf(1, 2, 3).inParallel(2) {
    it + 1
} // Note that output may not be in same order as list.
person Gleno    schedule 08.01.2020

Это закроет сопрограммы для рабочих. Я рекомендую посмотреть https://www.youtube.com/watch?v=3WGM-_MnPQA

package com.example.workers

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlin.system.measureTimeMillis

class ChannellibgradleApplication

fun main(args: Array<String>) {
    var myList = mutableListOf<Int>(3000,1200,1400,3000,1200,1400,3000)
    runBlocking {
        var myChannel = produce(CoroutineName("MyInts")) {
            myList.forEach { send(it) }
        }

        println("Starting coroutineScope  ")
        var time = measureTimeMillis {
            coroutineScope {
                var workers = 2
                repeat(workers)
                {
                    launch(CoroutineName("Sleep 1")) { theHardWork(myChannel) }
                }
            }
        }
        println("Ending coroutineScope  $time ms")
    }
}

suspend fun theHardWork(channel : ReceiveChannel<Int>) 
{
    for(m in channel) {
        println("Starting Sleep $m")
        delay(m.toLong())
        println("Ending Sleep $m")
    }
}
person user1749644    schedule 22.09.2019