Каналы Kotlin — это мощная конструкция параллелизма, которая обеспечивает связь между двумя или более сопрограммами. Они обеспечивают способ координации между различными частями приложения, позволяя им обмениваться данными и работать вместе, не мешая друг другу. В этой серии статей мы углубимся в Kotlin Channels и рассмотрим советы, приемы и рекомендации по освоению этого мощного инструмента.
Что такое каналы Kotlin?
Каналы Kotlin можно рассматривать как конвейер, по которому данные могут передаваться от одной сопрограммы к другой. Канал — это, по сути, буфер или очередь, где сопрограммы могут отправлять и получать сообщения. Одна сопрограмма может поместить данные (отправить) в канал, а другая сопрограмма может получить эти данные из канала.
Каналы — это реализация шаблона «производитель-потребитель», но созданные специально для сопрограмм. Сопрограмму, которая отправляет (производит) информацию, часто называют производителем, а сопрограмму, которая получает (потребляет) информацию, называют потребителем. Одна или несколько сопрограмм могут отправлять информацию в один и тот же канал, и одна или несколько сопрограмм могут получать данные из него:
Когда многие сопрограммы получают информацию из одного и того же канала, каждый элемент обрабатывается только один раз одним из потребителей. Как только элемент обрабатывается, он немедленно удаляется из канала.
Вот пример того, как работать с каналом Колтина:
fun channel( coroutineScope: CoroutineScope, ) { val channel = Channel<String>() // Channel exchanges a data of String type // Producer starts sending data inside a coroutine coroutineScope.launch { Log.d("Channel", "Sent data 1 to channel") channel.send("Data 1") Log.d("Channel","Sent data 2 to channel") channel.send("Data 2") channel.close() // we're done sending so channel should be closed } // Consumer starts receiving data inside another coroutine coroutineScope.launch { channel.consumeEach { Log.d("Channel","Received: $it") } Log.d("Channel","Done!") // This line called when channel is closed } }
В этом примере запускаются две сопрограммы. Первая сопрограмма отправляет строки данных в канал с помощью функции channel.send()
. Вторая сопрограмма использует данные из канала с помощью функции channel.consumeEach()
, что является удобным способом перебора всех значений, отправленных в канал. После отправки данных сопрограмма закрывает канал с помощью функции channel.close()
.
Функция close()
в Kotlin Channels используется для сигнализации об окончании передачи данных. Концептуально это похоже на отправку специального токена, означающего закрытие канала. Когда сопрограмма получает этот токен, она знает, что в канал больше не будут отправляться данные. В этот момент итерация останавливается, гарантируя, что все ранее отправленные элементы будут получены до закрытия канала. Это упрощает обработку завершения сеанса связи и гарантирует, что все данные будут обработаны до выхода сопрограммы.
Вывод для приведенного выше кода:
Sent data 1 to channel Received: Data 1 Sent data 2 to channel Received: Data 2 Done!
Канал приема
В Kotlin Coroutines ReceiveChannel
— это тип канала, который позволяет получать данные от сопрограммы. Он используется, когда вы хотите использовать данные из канала, не имея возможности отправить какие-либо данные обратно отправителю.
Используя ReceiveChannel
для потребительской сопрограммы, мы гарантируем, что она может только получать данные и не может случайно отправить данные обратно. Такое разделение задач помогает сделать код более удобным для сопровождения и менее подверженным ошибкам.
Вот пример использования ReceiveChannel
@OptIn(ExperimentalCoroutinesApi::class) fun receiveChannel( coroutineScope: CoroutineScope ) { var channel: ReceiveChannel<String> = Channel() // Producer Coroutine coroutineScope.launch { channel = produce { send("A") send("B") send("C") send("D") // we don't have to close the channel explicitly } } // Consumer Coroutine coroutineScope.launch { channel.consumeEach { Log.d(TAG, "Received $it") } // sending back data to channel inside consumer coroutine is not possible // because it is a ReceiveChannel // channel.send("E") // channel is automatically closed Log.d(TAG, "Is producer closed: ${channel.isClosedForReceive}") } }
В приведенном выше коде мы используем функцию produce
, которая является удобным способом создания ProducerScope
для сопрограмм. Функция produce
позволяет отправлять данные из сопрограммы и возвращает ReceiveChannel
, который может использоваться другой сопрограммой.
Строка channel.send("E")
закомментирована, так как это приведет к ошибке компиляции. Поскольку channel
является ReceiveChannel
, у него нет метода send()
. Это демонстрирует, что сопрограмма-потребитель не может отправить данные обратно производителю при использовании ReceiveChannel
.
После завершения блока consumeEach
канал автоматически закрывается функцией produce
. В конце проверяется свойство channel.isClosedForReceive
, и результат истинен, что означает, что канал закрыт.
Вывод кода выше:
Received A Received B Received C Received D Is producer closed: true
Разница между Channel
и ReceiveChannel
Основное различие между ReceiveChannel
и обычным Channel
в сопрограммах Kotlin заключается в том, что ReceiveChannel
можно использовать только для потребления данных из канала, а обычный Channel
можно использовать как для отправки, так и для получения данных.
Когда вы создаете Channel
, вы создаете ссылку на канал, который можно использовать для отправки и получения данных между сопрограммами. Класс Channel
предоставляет такие функции, как send()
и receive()
, которые можно использовать для отправки и получения данных по каналу.
С другой стороны, когда вы создаете ReceiveChannel
, вы создаете ссылку на канал, который можно использовать только для получения данных из канала. Класс ReceiveChannel
предоставляет такие функции, как receive()
и tryReceive()
, которые можно использовать для получения данных из канала.
Другими словами, обычный Channel
— это двунаправленный канал, который можно использовать для отправки и получения данных между сопрограммами, а ReceiveChannel
— это однонаправленный канал, который можно использовать только для получения данных из канала.
Конвейер
Каналы получателя можно использовать для реализации конвейеров. Конвейер — это набор стадий, соединенных каналами и работающих вместе для преобразования входных данных в выходные данные.
Каждый этап в конвейере представляет собой сопрограмму, которая потребляет данные из входного канала, выполняет некоторые вычисления с данными, а затем отправляет преобразованные данные в выходной канал, который используется следующим этапом в конвейере.
Каналы ввода и вывода между этапами в конвейере действуют как буфер, который позволяет каждому этапу обрабатывать данные асинхронно и независимо. Это позволяет конвейеру эффективно обрабатывать большие объемы данных и распараллеливать вычисления между несколькими ядрами или потоками.
Конвейеры полезны в сценариях, где вам нужно обрабатывать данные поэтапно, где каждый этап выполняет определенные вычисления с данными. Например, вы можете использовать конвейер для обработки потока данных от датчика, где каждый этап выполняет определенное преобразование данных, например фильтрацию, сглаживание или усреднение.
Пример конвейера 1:
Вот пример конвейера, который обрабатывает поток целых чисел, отфильтровывая четные числа, возводя в квадрат оставшиеся нечетные числа, а затем суммируя их:
fun streamingNumbers(scope: CoroutineScope) { scope.launch { val numbers = produceNumbers(10) val result = pipeline(numbers) Log.d(TAG, result.receive().toString()) } } // Producing numbers, each number being sent to the pipeline fun CoroutineScope.produceNumbers(count: Int): ReceiveChannel<Int> = produce { for (i in 1..count) send(i) } // Pipeline which process the numbers fun CoroutineScope.pipeline( numbers: ReceiveChannel<Int> ): ReceiveChannel<Int> = produce { // Filtering out even numbers val filtered = filter(numbers) { it % 2 != 0 } // Squaring the remaining odd numbers val squared = map(filtered) { it * it } // Summing them up val sum = reduce(squared) { acc, x -> acc + x } send(sum) } fun CoroutineScope.filter( numbers: ReceiveChannel<Int>, predicate: (Int) -> Boolean ): ReceiveChannel<Int> = produce { numbers.consumeEach { number -> if (predicate(number)) send(number) } } fun CoroutineScope.map( numbers: ReceiveChannel<Int>, mapper: (Int) -> Int ): ReceiveChannel<Int> = produce { numbers.consumeEach { number -> send(mapper(number)) } } fun reduce( numbers: ReceiveChannel<Int>, accumulator: (Int, Int) -> Int ): Int = runBlocking { var result = 0 for (number in numbers) { result = accumulator(result, number) } result }
В этом примере функция pipeline
создает новый конвейер, объединяя три этапа: filter
, map
и reduce
. Этап filter
отфильтровывает четные числа, этап map
возводит в квадрат оставшиеся нечетные числа, а этап reduce
суммирует возведенные в квадрат нечетные числа.
Каждый этап реализован как отдельная сопрограмма, которая потребляет данные из входного канала и выдает данные в выходной канал с помощью функций filter
, map
и reduce
. Функция pipeline
возвращает новый ReceiveChannel
, представляющий выходной канал конвейера.
Пример конвейера 2 — обработка изображений:
Вот пример конвейера, который обрабатывает поток изображений, изменяя их размер, сжимая и сохраняя:
fun processImages( coroutineScope: CoroutineScope ) { coroutineScope.launch { val images = produceImages(listOf( "https://via.placeholder.com/300x300.png", "https://via.placeholder.com/500x500.png", "https://via.placeholder.com/800x800.png" )) val resized = resizeImages(images, 400) val compressed = compressImages(resized, 80) storeImages(compressed, Paths.get("output/")) } } fun CoroutineScope.produceImages(urls: List<String>): ReceiveChannel<ByteArray> = produce { for (url in urls) { val bytes = URL(url).readBytes() send(bytes) } } fun CoroutineScope.resizeImages( images: ReceiveChannel<ByteArray>, size: Int ): ReceiveChannel<ByteArray> = produce { images.consumeEach { image -> // ImageResizer can a util class to resize the image val resizedImage = ImageResizer.resize(image, size) send(resizedImage) } } fun CoroutineScope.compressImages( images: ReceiveChannel<ByteArray>, quality: Int ): ReceiveChannel<ByteArray> = produce { images.consumeEach { image -> // ImageCompressor can a util class to compress the image val compressedImage = ImageCompressor.compress(image, quality) send(compressedImage) } } suspend fun storeImages(images: ReceiveChannel<ByteArray>, directory: Path) { Files.createDirectories(directory) var index = 1 for (image in images) { val file = directory.resolve("image${index++}.jpg") FileOutputStream(file.toFile()).use { output -> output.write(image) } } }
В этом примере функция processImages
создает ReceiveChannel
, который создает поток данных изображения из списка URL-адресов с помощью функции produceImages
. Затем он передает этот канал функции resizeImages
, которая изменяет размеры изображений до заданного размера, а затем передает выходной канал функции compressImages
, которая сжимает изображения до заданного уровня качества. Наконец, выходной канал функции compressImages
передается функции storeImages
, которая сохраняет сжатые изображения на диск.
Каждый этап в конвейере реализован как отдельная сопрограмма, которая потребляет данные из входного канала и создает данные для выходного канала с помощью функций resizeImages
, compressImages
и storeImages
.
Классы ImageResizer
и ImageCompressor
, используемые в функциях resizeImages
и compressImages
, являются лишь примерами гипотетических классов, которые могут выполнять эти операции с данными изображения.
В целом, этот конвейер обеспечивает удобный и эффективный способ обработки потока изображений путем изменения их размера, сжатия и сохранения. Этот конвейер можно легко расширить, включив в него дополнительные этапы или обработав различные типы задач обработки изображений.
Что дальше?
В следующей части этой серии мы углубимся в мир Kotlin Channels и изучим различные типы каналов и их реальные приложения. К концу второй статьи у вас будет полное представление о том, как использовать Kotlin Channels для создания эффективных и масштабируемых параллельных приложений. Следите за обновлениями!
Вы можете найти следующую часть здесь