Основы коммуникации для сопрограмм

В прошлой статье мы рассмотрели, что такое канал и четыре типа каналов. Давайте разберемся с практикой и посмотрим, когда, как и какими способами можно использовать каналы.

Не Rx

Прежде всего, нам нужно установить отличие от Rx, чтобы мы могли сосредоточиться на каналах, а не на замене Rx. Важно помнить, что канал по своей природе не транслирует свои ценности. Как только приемник получает некоторую ценность из канала, он удаляется из канала. Посмотрите на примеры втягивания и разветвления, где это становится яснее.

Веером

Начнем со сценария. У вас есть список адресов, и вам нужно выполнить поиск для каждого из них, чтобы получить соответствующие GPS-координаты от некоторого веб-сервиса. В этом случае не рекомендуется делать это последовательно. Поэтому мы хотим разложить список на несколько сопрограмм, чтобы иметь возможность работать с несколькими адресами одновременно. Если мы начнем с производящего конца, нам понадобится список и отправка всех значений в канал.

val channel = Channel<String>()
val addresses = listOf(
    "Fasanv 34, 11111",
    "Publikv 10, 22222",
    ...
)
launch(Dispatchers.Default) { 
    addresses.forEach {
        channel.send(it)
    }
    channel.close()
}

Теперь у нас есть канал, который получает все адреса один за другим. Как мы уже знаем, это рандеву типа, то есть он будет приостановлен до тех пор, пока кто-нибудь не получит каждое значение. Затем мы потребляем адреса из нашего канала.

launch(Dispatchers.Default) {
    for (address in channel) {
        val gps = fetchCoordinates(address)
        saveCoordinates(address, gps)
    }
}

А теперь у нас есть весь поток последовательно. Я также использовал цикл for для итерации по каналу. Он заботится обо всех приостановках, а когда канал закрывается, цикл for завершается. Это одна из тонких вещей, которые мне нравятся в Kotlin: каждая конструкция, которая в абстракции является итерируемой, также следует соглашениям и может использоваться как любая другая итерация в языке.

Еще одно замечание: именно здесь я начал действительно понимать последовательность приостановки функций и сопрограмм. Сложная задача после как минимум 15 лет асинхронности с какими-то обратными вызовами.

Я отвлекся, вернемся к сценарию и к тому, как сделать его параллельным. Что ж, это так же просто, как создать несколько сопрограмм, которые будут делать одно и то же, вот так.

repeat(5) { // iterates 5 times -> creating 5 coroutines
    launch(Dispatchers.Default) {
        for (address in channel) {
            val gps = fetchCoordinates(address)
            saveCoordinates(address, gps)
        }
    }
}

Мне нравится использовать функцию повторения вместо цикла for, когда я знаю количество итераций, которые я хочу, здесь мы повторяем пять раз, создавая таким образом пять сопрограмм. Теперь пять разных сопрограмм проходят по одному и тому же каналу, каждая из которых приостанавливается, пока не наступит их очередь получать значение. И это тоже одно из преимуществ Channel. Вам не нужно вечно беспокоиться о синхронизации, потоковой передаче или приостановке. Даже если вы просто отправите один элемент по каналу, когда вы его закроете, все пять сопрограмм и соответствующие им пять циклов for выйдут.

Мы также можем обновить код, чтобы он стал еще более идиоматичным для каналов, используя функции producte- и consumerEach. Вот обновленная версия.

val addresses = listOf(
    "Fasanv 34, 11111",
    "Publikv 10, 22222",
    ...
)
val channel = produce { 
    addresses.forEach{ send(it) }
}
repeat(5) { // iterates 5 times -> creating 5 coroutines
    launch(Dispatchers.Default) { 
        channel.consumeEach { address ->
            val gps = fetchCoordinates(address)
            saveCoordinates(address, gps)
        }
    }
}

Функция расширения consumerEach почти идентична циклу for, поэтому изучать ее не так интересно. Он может появиться в будущем, поскольку это все еще экспериментальный API.

Но функция продления продукции - это прекрасно. Что он делает, так это создает для нас новый канал и возвращает его как ReceiverChannel, который инкапсулирует отправляющую функциональность внутри производящей функции. Вне закрытия мы можем получать только с канала. Функция производства автоматически закрывает канал после завершения закрытия. Таким образом, вы случайно не получите сопрограмму и канал, который никогда не завершается.

Фан-ин

Неудивительно, что мы также можем объединить данные из множества разных сопрограмм в одну с помощью каналов, так называемого объединения. Давайте придерживаться предыдущего примера и продолжим его развивать. Вместо того, чтобы сохранять координаты, мы хотим иметь возможность представить их, когда они будут готовы.

В большинстве сред с графическим рендерингом вам необходимо представить данные из определенного потока, часто называемого UI-потоком или основным потоком. Итак, чтобы собрать все данные из всех сопрограмм, работающих в произвольном количестве потоков, которые мы не контролируем, по крайней мере, не в этом примере, нам нужно снова объединить их в один поток. Для этого нам нужен новый канал.

val receiveChannel = Channel<Pair<String, GPS>>()

Теперь у нас есть способ отправки из нескольких сопрограмм (и, возможно, потоков) и использования их всего в одной сопрограмме. Мы повторяем нашу предыдущую функцию, которая сохраняла координаты, чтобы вместо этого отправлять их по новому каналу.

...
channel.consumeEach { address ->
    val gps = fetchCoordinates(address)
    receiveChannel.send(address to gps)
}
receiveChannel.close() // don't forget to close
...

Все адреса с соответствующими координатами GPS теперь отправляются из нескольких сопрограмм в receiveChannel. В общем, вся тяжелая работа для фаната сделана, ну, да? Теперь нам нужно использовать этот канал из сопрограммы, которая будет отображать его пользователю, и вы уже должны знать, как это сделать.

// this runs on the main thread - inherited from runBlocking
launch { 
    receiveChannel.consumeEach { (address, gps) ->
        println("$address -> $gps")
    }
}

И теперь мы закончили с вентилятором. Теперь у нас есть законченное рабочее приложение, использующее типичный шаблон для параллелизма. Его также можно использовать для параллелизма, но это требует более конкретной реализации, более тесно связанной с оборудованием, и я не буду вдаваться в подробности сейчас.

Это довольно небольшой пример, но он показывает сильные стороны каналов в Kotlin. Мы разделили поток значений, над которыми нужно работать одновременно, и собрали их обратно в одну сопрограмму без каких-либо мыслей о потоковой передаче или синхронизации. В этом, на мой взгляд, настоящая сила каналов. Они упрощают синхронизацию ваших сопрограмм в систему. Вот небольшая диаграмма нашего кода и полный пример.

Остерегайтесь изменений

Некоторые из показанных функций являются экспериментальными и могут быть изменены в будущем. И если вы углубитесь в Channel API, вы можете увидеть множество устаревших и устаревших функций, таких как filter и взять, чтобы назвать несколько. Они устарели в пользу Flow, чтобы канал оставался кратким и понятным, и вместо этого оставил битву против Rx на Flow.

API-интерфейсы также развиваются, еще одна устаревшая функция - это программа-конструктор, которая работает аналогично функции создания-конструктора, но предназначена для представления актора, в котором канал становится почтовым ящиком актеров. Это прекрасное использование каналов, и оно устарело только потому, что они создают лучший, более сложный инструмент для создания действующих лиц. И только будущее покажет, станет ли он новым конструктором (как сейчас) или собственной реализацией, такой как Flow.

Последние мысли

Я надеюсь, что эти две статьи помогут вам разобраться в каналах, а также помогут освоить сопрограммы и функции приостановки. Это определенно помогло мне последовательно рассуждать об асинхронности.

Первая часть статьи: https://medium.com/@dahlberg.bob/channels-in-kotlin-part-one-594ba12dcb5a