Преобразуйте каждое событие потока Dart в новый поток

Первоначально опубликовано на http://ivoberger.com.

Если вам нужны живые обновления для чего-либо в вашем приложении Dart, Streams - это то, что вам нужно. Они используются для BloC-архитектуры, Cloud Firestore, чтения больших файлов и так далее.

Встроенные методы уже позволяют выполнять множество манипуляций, таких как устранение дублирования, карта, объединение и многое другое. Для случаев использования, выходящих за рамки этих предопределенных методов, в игру вступает класс StreamTransformer.

В этом посте будет описано, как взять исходный поток и создать новый поток для каждого события, испускаемого источником. Одно из приложений для этого - объединение или, скорее, заполнение данных из базы данных реального времени, такой как Cloud Firestore.

Представьте себе интернет-магазин с обзором статей, в котором можно применить какие-то фильтры. Изменения в настройке фильтра распространяются как поток и применяются к запросу к базе данных, который возвращает поток (для получения обновлений, если данные, охваченные этим запросом, изменяются). Для каждого изменения фильтра вам нужно делать две вещи:

  1. Отменить подписку на старый запрос
  2. Создать подписку на новый запрос

Но что произойдет, если пользователь перейдет на другую страницу? Теперь вам также нужно убедиться, что отмена подписки на поток результатов отменяет все подписки в цепочке.

Все вышеперечисленное может быть достигнуто путем реализации пользовательского StreamTransformer, который обновляет поток данных и следит за тем, чтобы не осталось никаких зависших подписок, и их можно повторно использовать везде, где это необходимо.

Теперь, когда все готово, давайте взглянем на код!

Сначала нам нужен класс для хранения функциональности. Этот класс должен расширять StreamTransformerBase, чтобы его можно было использовать с Stream.transform. Ему также нужна функция для обработки преобразования.

Теперь нам нужно реализовать bind из StreamTransformerBase. Сначала нам понадобится StreamController для создания нашего целевого потока, а затем две переменные для отслеживания исходной и целевой подписки:

Затем мы реализуем StreamController. В функции onListen мы начинаем прослушивать исходный поток. На каждом исходном событии старую целевую подписку необходимо отменить, а затем воссоздать с помощью функции handleTransform, которую мы получили через конструктор ранее. События и ошибки из целевого потока просто передаются в соответствующие функции контроллера. Я решил передавать все ошибки из исходного потока в контроллер, но вместо этого вы также можете удалить или зарегистрировать их.

Наконец, мы закрываем контроллер, когда исходный поток завершен, и передаем все действия в потоке контроллера как исходному, так и целевому потоку. Это гарантирует, что все внутренние подписки будут отменены, когда конечный потребитель отменяет подписку.

Окончательный StreamController выглядит так:

В качестве последнего шага нам просто нужно вернуть поток из bind, в результате чего получится такой класс:

Теперь этот трансформатор можно просто использовать. Следующий пример основан на сценарии, описанном в начале сообщения: каждый параметр потока параметров фильтра из пользовательского интерфейса необходимо превратить в запрос с применением этих параметров.

И это все. Теперь у нас есть преобразование потока, которое превратит каждое событие потока в новый поток с простой функцией преобразования и без зависших подписок.

Спасибо за чтение!