Каналы Kotlin — это мощная конструкция параллелизма, которая обеспечивает связь между двумя или более сопрограммами. Они обеспечивают способ координации между различными частями приложения, позволяя им обмениваться данными и работать вместе, не мешая друг другу. В этой серии статей мы углубимся в Kotlin Channels и рассмотрим советы, приемы и рекомендации по освоению этого мощного инструмента.

Что такое каналы Kotlin?

Каналы Kotlin можно рассматривать как конвейер, по которому данные могут передаваться от одной сопрограммы к другой. Канал — это, по сути, буфер или очередь, где сопрограммы могут отправлять и получать сообщения. Одна сопрограмма может поместить данные (отправить) в канал, а другая сопрограмма может получить эти данные из канала.

Каналы — это реализация шаблона «производитель-потребитель», но созданные специально для сопрограмм. Сопрограмму, которая отправляет (производит) информацию, часто называют производителем, а сопрограмму, которая получает (потребляет) информацию, называют потребителем. Одна или несколько сопрограмм могут отправлять информацию в один и тот же канал, и одна или несколько сопрограмм могут получать данные из него:

Когда многие сопрограммы получают информацию из одного и того же канала, каждый элемент обрабатывается только один раз одним из потребителей. Как только элемент обрабатывается, он немедленно удаляется из канала.

Вот пример того, как работать с каналом Колтина:

fun channel(
    coroutineScope: CoroutineScope,
) {
    val channel = Channel<String>() // Channel exchanges a data of String type

    // Producer starts sending data inside a coroutine
    coroutineScope.launch {
        Log.d("Channel", "Sent data 1 to channel")
        channel.send("Data 1")
        Log.d("Channel","Sent data 2 to channel")
        channel.send("Data 2")
        channel.close() // we're done sending so channel should be closed
    }

    // Consumer starts receiving data inside another coroutine
    coroutineScope.launch {
        channel.consumeEach {
            Log.d("Channel","Received: $it")
        }
        Log.d("Channel","Done!") // This line called when channel is closed
    }
}

В этом примере запускаются две сопрограммы. Первая сопрограмма отправляет строки данных в канал с помощью функции channel.send(). Вторая сопрограмма использует данные из канала с помощью функции channel.consumeEach(), что является удобным способом перебора всех значений, отправленных в канал. После отправки данных сопрограмма закрывает канал с помощью функции channel.close().

Функция close() в Kotlin Channels используется для сигнализации об окончании передачи данных. Концептуально это похоже на отправку специального токена, означающего закрытие канала. Когда сопрограмма получает этот токен, она знает, что в канал больше не будут отправляться данные. В этот момент итерация останавливается, гарантируя, что все ранее отправленные элементы будут получены до закрытия канала. Это упрощает обработку завершения сеанса связи и гарантирует, что все данные будут обработаны до выхода сопрограммы.

Вывод для приведенного выше кода:

Sent data 1 to channel
Received: Data 1
Sent data 2 to channel
Received: Data 2
Done!

Канал приема

В Kotlin Coroutines ReceiveChannel — это тип канала, который позволяет получать данные от сопрограммы. Он используется, когда вы хотите использовать данные из канала, не имея возможности отправить какие-либо данные обратно отправителю.

Используя ReceiveChannel для потребительской сопрограммы, мы гарантируем, что она может только получать данные и не может случайно отправить данные обратно. Такое разделение задач помогает сделать код более удобным для сопровождения и менее подверженным ошибкам.

Вот пример использования ReceiveChannel

@OptIn(ExperimentalCoroutinesApi::class)
fun receiveChannel(
    coroutineScope: CoroutineScope
) {
    var channel: ReceiveChannel<String> = Channel()

    // Producer Coroutine
    coroutineScope.launch {
        channel = produce {
            send("A")
            send("B")
            send("C")
            send("D")
            // we don't have to close the channel explicitly
        }
    }

    // Consumer Coroutine
    coroutineScope.launch {
        channel.consumeEach {
            Log.d(TAG, "Received $it")
        }
        // sending back data to channel inside consumer coroutine is not possible 
        // because it is a ReceiveChannel
        // channel.send("E") 
        
        // channel is automatically closed
        Log.d(TAG, "Is producer closed: ${channel.isClosedForReceive}")
    }
}

В приведенном выше коде мы используем функцию produce, которая является удобным способом создания ProducerScope для сопрограмм. Функция produce позволяет отправлять данные из сопрограммы и возвращает ReceiveChannel, который может использоваться другой сопрограммой.

Строка channel.send("E") закомментирована, так как это приведет к ошибке компиляции. Поскольку channel является ReceiveChannel, у него нет метода send(). Это демонстрирует, что сопрограмма-потребитель не может отправить данные обратно производителю при использовании ReceiveChannel.

После завершения блока consumeEach канал автоматически закрывается функцией produce. В конце проверяется свойство channel.isClosedForReceive, и результат истинен, что означает, что канал закрыт.

Вывод кода выше:

Received A
Received B
Received C
Received D
Is producer closed: true

Разница между Channel и ReceiveChannel

Основное различие между ReceiveChannel и обычным Channel в сопрограммах Kotlin заключается в том, что ReceiveChannel можно использовать только для потребления данных из канала, а обычный Channel можно использовать как для отправки, так и для получения данных.

Когда вы создаете Channel, вы создаете ссылку на канал, который можно использовать для отправки и получения данных между сопрограммами. Класс Channel предоставляет такие функции, как send() и receive(), которые можно использовать для отправки и получения данных по каналу.

С другой стороны, когда вы создаете ReceiveChannel, вы создаете ссылку на канал, который можно использовать только для получения данных из канала. Класс ReceiveChannel предоставляет такие функции, как receive() и tryReceive(), которые можно использовать для получения данных из канала.

Другими словами, обычный Channel — это двунаправленный канал, который можно использовать для отправки и получения данных между сопрограммами, а ReceiveChannel — это однонаправленный канал, который можно использовать только для получения данных из канала.

Конвейер

Каналы получателя можно использовать для реализации конвейеров. Конвейер — это набор стадий, соединенных каналами и работающих вместе для преобразования входных данных в выходные данные.

Каждый этап в конвейере представляет собой сопрограмму, которая потребляет данные из входного канала, выполняет некоторые вычисления с данными, а затем отправляет преобразованные данные в выходной канал, который используется следующим этапом в конвейере.

Каналы ввода и вывода между этапами в конвейере действуют как буфер, который позволяет каждому этапу обрабатывать данные асинхронно и независимо. Это позволяет конвейеру эффективно обрабатывать большие объемы данных и распараллеливать вычисления между несколькими ядрами или потоками.

Конвейеры полезны в сценариях, где вам нужно обрабатывать данные поэтапно, где каждый этап выполняет определенные вычисления с данными. Например, вы можете использовать конвейер для обработки потока данных от датчика, где каждый этап выполняет определенное преобразование данных, например фильтрацию, сглаживание или усреднение.

Пример конвейера 1:

Вот пример конвейера, который обрабатывает поток целых чисел, отфильтровывая четные числа, возводя в квадрат оставшиеся нечетные числа, а затем суммируя их:

fun streamingNumbers(scope: CoroutineScope) {
    scope.launch {
        val numbers = produceNumbers(10)
        val result = pipeline(numbers)

        Log.d(TAG, result.receive().toString())
    }
}

// Producing numbers, each number being sent to the pipeline
fun CoroutineScope.produceNumbers(count: Int): ReceiveChannel<Int> = produce {
    for (i in 1..count) send(i)
}

// Pipeline which process the numbers
fun CoroutineScope.pipeline(
    numbers: ReceiveChannel<Int>
): ReceiveChannel<Int> = produce {
    // Filtering out even numbers
    val filtered = filter(numbers) { it % 2 != 0 }

    // Squaring the remaining odd numbers
    val squared = map(filtered) { it * it }

    // Summing them up
    val sum = reduce(squared) { acc, x -> acc + x }

    send(sum)
}

fun CoroutineScope.filter(
    numbers: ReceiveChannel<Int>,
    predicate: (Int) -> Boolean
): ReceiveChannel<Int> = produce {
    numbers.consumeEach { number ->
        if (predicate(number)) send(number)
    }
}

fun CoroutineScope.map(
    numbers: ReceiveChannel<Int>,
    mapper: (Int) -> Int
): ReceiveChannel<Int> = produce {
    numbers.consumeEach { number ->
        send(mapper(number))
    }
}

fun reduce(
    numbers: ReceiveChannel<Int>,
    accumulator: (Int, Int) -> Int
): Int = runBlocking {
    var result = 0
    for (number in numbers) {
        result = accumulator(result, number)
    }
    result
}

В этом примере функция pipeline создает новый конвейер, объединяя три этапа: filter, map и reduce. Этап filter отфильтровывает четные числа, этап map возводит в квадрат оставшиеся нечетные числа, а этап reduce суммирует возведенные в квадрат нечетные числа.

Каждый этап реализован как отдельная сопрограмма, которая потребляет данные из входного канала и выдает данные в выходной канал с помощью функций filter, map и reduce. Функция pipeline возвращает новый ReceiveChannel, представляющий выходной канал конвейера.

Пример конвейера 2 — обработка изображений:

Вот пример конвейера, который обрабатывает поток изображений, изменяя их размер, сжимая и сохраняя:

fun processImages(
    coroutineScope: CoroutineScope
) {
    coroutineScope.launch {
        val images = produceImages(listOf(
            "https://via.placeholder.com/300x300.png",
            "https://via.placeholder.com/500x500.png",
            "https://via.placeholder.com/800x800.png"
        ))
        val resized = resizeImages(images, 400)
        val compressed = compressImages(resized, 80)
        storeImages(compressed, Paths.get("output/"))
    }
}

fun CoroutineScope.produceImages(urls: List<String>): ReceiveChannel<ByteArray> = produce {
    for (url in urls) {
        val bytes = URL(url).readBytes()
        send(bytes)
    }
}

fun CoroutineScope.resizeImages(
    images: ReceiveChannel<ByteArray>, size: Int
): ReceiveChannel<ByteArray> = produce {
    images.consumeEach { image ->
        // ImageResizer can a util class to resize the image
        val resizedImage = ImageResizer.resize(image, size)
        send(resizedImage)
    }
}

fun CoroutineScope.compressImages(
    images: ReceiveChannel<ByteArray>, quality: Int
): ReceiveChannel<ByteArray> = produce {
    images.consumeEach { image ->
        // ImageCompressor can a util class to compress the image
        val compressedImage = ImageCompressor.compress(image, quality)
        send(compressedImage)
    }
}

suspend fun storeImages(images: ReceiveChannel<ByteArray>, directory: Path) {
    Files.createDirectories(directory)
    var index = 1
    for (image in images) {
        val file = directory.resolve("image${index++}.jpg")
        FileOutputStream(file.toFile()).use { output ->
            output.write(image)
        }
    }
}

В этом примере функция processImages создает ReceiveChannel, который создает поток данных изображения из списка URL-адресов с помощью функции produceImages. Затем он передает этот канал функции resizeImages, которая изменяет размеры изображений до заданного размера, а затем передает выходной канал функции compressImages, которая сжимает изображения до заданного уровня качества. Наконец, выходной канал функции compressImages передается функции storeImages, которая сохраняет сжатые изображения на диск.

Каждый этап в конвейере реализован как отдельная сопрограмма, которая потребляет данные из входного канала и создает данные для выходного канала с помощью функций resizeImages, compressImages и storeImages.

Классы ImageResizer и ImageCompressor, используемые в функциях resizeImages и compressImages, являются лишь примерами гипотетических классов, которые могут выполнять эти операции с данными изображения.

В целом, этот конвейер обеспечивает удобный и эффективный способ обработки потока изображений путем изменения их размера, сжатия и сохранения. Этот конвейер можно легко расширить, включив в него дополнительные этапы или обработав различные типы задач обработки изображений.

Что дальше?

В следующей части этой серии мы углубимся в мир Kotlin Channels и изучим различные типы каналов и их реальные приложения. К концу второй статьи у вас будет полное представление о том, как использовать Kotlin Channels для создания эффективных и масштабируемых параллельных приложений. Следите за обновлениями!

Вы можете найти следующую часть здесь