Преобразуйте каждое событие потока Dart в новый поток
Первоначально опубликовано на http://ivoberger.com.
Если вам нужны живые обновления для чего-либо в вашем приложении Dart, Streams - это то, что вам нужно. Они используются для BloC-архитектуры, Cloud Firestore, чтения больших файлов и так далее.
Встроенные методы уже позволяют выполнять множество манипуляций, таких как устранение дублирования, карта, объединение и многое другое. Для случаев использования, выходящих за рамки этих предопределенных методов, в игру вступает класс StreamTransformer.
В этом посте будет описано, как взять исходный поток и создать новый поток для каждого события, испускаемого источником. Одно из приложений для этого - объединение или, скорее, заполнение данных из базы данных реального времени, такой как Cloud Firestore.
Представьте себе интернет-магазин с обзором статей, в котором можно применить какие-то фильтры. Изменения в настройке фильтра распространяются как поток и применяются к запросу к базе данных, который возвращает поток (для получения обновлений, если данные, охваченные этим запросом, изменяются). Для каждого изменения фильтра вам нужно делать две вещи:
- Отменить подписку на старый запрос
- Создать подписку на новый запрос
Но что произойдет, если пользователь перейдет на другую страницу? Теперь вам также нужно убедиться, что отмена подписки на поток результатов отменяет все подписки в цепочке.
Все вышеперечисленное может быть достигнуто путем реализации пользовательского StreamTransformer
, который обновляет поток данных и следит за тем, чтобы не осталось никаких зависших подписок, и их можно повторно использовать везде, где это необходимо.
Теперь, когда все готово, давайте взглянем на код!
Сначала нам нужен класс для хранения функциональности. Этот класс должен расширять StreamTransformerBase
, чтобы его можно было использовать с Stream.transform
. Ему также нужна функция для обработки преобразования.
Теперь нам нужно реализовать bind
из StreamTransformerBase
. Сначала нам понадобится StreamController
для создания нашего целевого потока, а затем две переменные для отслеживания исходной и целевой подписки:
Затем мы реализуем StreamController
. В функции onListen
мы начинаем прослушивать исходный поток. На каждом исходном событии старую целевую подписку необходимо отменить, а затем воссоздать с помощью функции handleTransform
, которую мы получили через конструктор ранее. События и ошибки из целевого потока просто передаются в соответствующие функции контроллера. Я решил передавать все ошибки из исходного потока в контроллер, но вместо этого вы также можете удалить или зарегистрировать их.
Наконец, мы закрываем контроллер, когда исходный поток завершен, и передаем все действия в потоке контроллера как исходному, так и целевому потоку. Это гарантирует, что все внутренние подписки будут отменены, когда конечный потребитель отменяет подписку.
Окончательный StreamController
выглядит так:
В качестве последнего шага нам просто нужно вернуть поток из bind
, в результате чего получится такой класс:
Теперь этот трансформатор можно просто использовать. Следующий пример основан на сценарии, описанном в начале сообщения: каждый параметр потока параметров фильтра из пользовательского интерфейса необходимо превратить в запрос с применением этих параметров.
И это все. Теперь у нас есть преобразование потока, которое превратит каждое событие потока в новый поток с простой функцией преобразования и без зависших подписок.
Спасибо за чтение!