Основы коммуникации для сопрограмм
В прошлой статье мы рассмотрели, что такое канал и четыре типа каналов. Давайте разберемся с практикой и посмотрим, когда, как и какими способами можно использовать каналы.
Не Rx
Прежде всего, нам нужно установить отличие от Rx, чтобы мы могли сосредоточиться на каналах, а не на замене Rx. Важно помнить, что канал по своей природе не транслирует свои ценности. Как только приемник получает некоторую ценность из канала, он удаляется из канала. Посмотрите на примеры втягивания и разветвления, где это становится яснее.
Веером
Начнем со сценария. У вас есть список адресов, и вам нужно выполнить поиск для каждого из них, чтобы получить соответствующие GPS-координаты от некоторого веб-сервиса. В этом случае не рекомендуется делать это последовательно. Поэтому мы хотим разложить список на несколько сопрограмм, чтобы иметь возможность работать с несколькими адресами одновременно. Если мы начнем с производящего конца, нам понадобится список и отправка всех значений в канал.
val channel = Channel<String>() val addresses = listOf( "Fasanv 34, 11111", "Publikv 10, 22222", ... ) launch(Dispatchers.Default) { addresses.forEach { channel.send(it) } channel.close() }
Теперь у нас есть канал, который получает все адреса один за другим. Как мы уже знаем, это рандеву типа, то есть он будет приостановлен до тех пор, пока кто-нибудь не получит каждое значение. Затем мы потребляем адреса из нашего канала.
launch(Dispatchers.Default) { for (address in channel) { val gps = fetchCoordinates(address) saveCoordinates(address, gps) } }
А теперь у нас есть весь поток последовательно. Я также использовал цикл for для итерации по каналу. Он заботится обо всех приостановках, а когда канал закрывается, цикл for завершается. Это одна из тонких вещей, которые мне нравятся в Kotlin: каждая конструкция, которая в абстракции является итерируемой, также следует соглашениям и может использоваться как любая другая итерация в языке.
Еще одно замечание: именно здесь я начал действительно понимать последовательность приостановки функций и сопрограмм. Сложная задача после как минимум 15 лет асинхронности с какими-то обратными вызовами.
Я отвлекся, вернемся к сценарию и к тому, как сделать его параллельным. Что ж, это так же просто, как создать несколько сопрограмм, которые будут делать одно и то же, вот так.
repeat(5) { // iterates 5 times -> creating 5 coroutines launch(Dispatchers.Default) { for (address in channel) { val gps = fetchCoordinates(address) saveCoordinates(address, gps) } } }
Мне нравится использовать функцию повторения вместо цикла for, когда я знаю количество итераций, которые я хочу, здесь мы повторяем пять раз, создавая таким образом пять сопрограмм. Теперь пять разных сопрограмм проходят по одному и тому же каналу, каждая из которых приостанавливается, пока не наступит их очередь получать значение. И это тоже одно из преимуществ Channel. Вам не нужно вечно беспокоиться о синхронизации, потоковой передаче или приостановке. Даже если вы просто отправите один элемент по каналу, когда вы его закроете, все пять сопрограмм и соответствующие им пять циклов for выйдут.
Мы также можем обновить код, чтобы он стал еще более идиоматичным для каналов, используя функции producte- и consumerEach. Вот обновленная версия.
val addresses = listOf( "Fasanv 34, 11111", "Publikv 10, 22222", ... ) val channel = produce { addresses.forEach{ send(it) } } repeat(5) { // iterates 5 times -> creating 5 coroutines launch(Dispatchers.Default) { channel.consumeEach { address -> val gps = fetchCoordinates(address) saveCoordinates(address, gps) } } }
Функция расширения consumerEach почти идентична циклу for, поэтому изучать ее не так интересно. Он может появиться в будущем, поскольку это все еще экспериментальный API.
Но функция продления продукции - это прекрасно. Что он делает, так это создает для нас новый канал и возвращает его как ReceiverChannel, который инкапсулирует отправляющую функциональность внутри производящей функции. Вне закрытия мы можем получать только с канала. Функция производства автоматически закрывает канал после завершения закрытия. Таким образом, вы случайно не получите сопрограмму и канал, который никогда не завершается.
Фан-ин
Неудивительно, что мы также можем объединить данные из множества разных сопрограмм в одну с помощью каналов, так называемого объединения. Давайте придерживаться предыдущего примера и продолжим его развивать. Вместо того, чтобы сохранять координаты, мы хотим иметь возможность представить их, когда они будут готовы.
В большинстве сред с графическим рендерингом вам необходимо представить данные из определенного потока, часто называемого UI-потоком или основным потоком. Итак, чтобы собрать все данные из всех сопрограмм, работающих в произвольном количестве потоков, которые мы не контролируем, по крайней мере, не в этом примере, нам нужно снова объединить их в один поток. Для этого нам нужен новый канал.
val receiveChannel = Channel<Pair<String, GPS>>()
Теперь у нас есть способ отправки из нескольких сопрограмм (и, возможно, потоков) и использования их всего в одной сопрограмме. Мы повторяем нашу предыдущую функцию, которая сохраняла координаты, чтобы вместо этого отправлять их по новому каналу.
... channel.consumeEach { address -> val gps = fetchCoordinates(address) receiveChannel.send(address to gps) } receiveChannel.close() // don't forget to close ...
Все адреса с соответствующими координатами GPS теперь отправляются из нескольких сопрограмм в receiveChannel. В общем, вся тяжелая работа для фаната сделана, ну, да? Теперь нам нужно использовать этот канал из сопрограммы, которая будет отображать его пользователю, и вы уже должны знать, как это сделать.
// this runs on the main thread - inherited from runBlocking launch { receiveChannel.consumeEach { (address, gps) -> println("$address -> $gps") } }
И теперь мы закончили с вентилятором. Теперь у нас есть законченное рабочее приложение, использующее типичный шаблон для параллелизма. Его также можно использовать для параллелизма, но это требует более конкретной реализации, более тесно связанной с оборудованием, и я не буду вдаваться в подробности сейчас.
Это довольно небольшой пример, но он показывает сильные стороны каналов в Kotlin. Мы разделили поток значений, над которыми нужно работать одновременно, и собрали их обратно в одну сопрограмму без каких-либо мыслей о потоковой передаче или синхронизации. В этом, на мой взгляд, настоящая сила каналов. Они упрощают синхронизацию ваших сопрограмм в систему. Вот небольшая диаграмма нашего кода и полный пример.
Остерегайтесь изменений
Некоторые из показанных функций являются экспериментальными и могут быть изменены в будущем. И если вы углубитесь в Channel API, вы можете увидеть множество устаревших и устаревших функций, таких как filter и взять, чтобы назвать несколько. Они устарели в пользу Flow, чтобы канал оставался кратким и понятным, и вместо этого оставил битву против Rx на Flow.
API-интерфейсы также развиваются, еще одна устаревшая функция - это программа-конструктор, которая работает аналогично функции создания-конструктора, но предназначена для представления актора, в котором канал становится почтовым ящиком актеров. Это прекрасное использование каналов, и оно устарело только потому, что они создают лучший, более сложный инструмент для создания действующих лиц. И только будущее покажет, станет ли он новым конструктором (как сейчас) или собственной реализацией, такой как Flow.
Последние мысли
Я надеюсь, что эти две статьи помогут вам разобраться в каналах, а также помогут освоить сопрограммы и функции приостановки. Это определенно помогло мне последовательно рассуждать об асинхронности.
Первая часть статьи: https://medium.com/@dahlberg.bob/channels-in-kotlin-part-one-594ba12dcb5a