Когда в RxJS версии 5.5 были введены конвейерные операторы, написание операторов пользовательской области стало намного проще.

Конвейерный оператор - это функция высшего порядка: функция, возвращающая другую функцию. И возвращаемая функция принимает наблюдаемое и возвращает наблюдаемое. Итак, чтобы создать оператор, вам не нужно создавать подклассы Operator и Subscriber. Вы просто пишете функцию.

Простой.

Однако бывают ситуации, когда нужно проявлять особую осторожность. В частности, вам нужно быть осторожным, когда ваш оператор хранит внутреннее состояние.

Пример

Давайте посмотрим на пример: оператор debug, который записывает полученные значения и их индексы в консоль.

Нашему оператору необходимо будет поддерживать некоторое внутреннее состояние: индекс, который будет увеличиваться каждый раз при получении next уведомления. Наивный подход - сохранить это состояние в операторной функции. Нравится:

Однако у этого подхода есть пара проблем, которые могут привести к неожиданному поведению и появлению труднообнаруживаемых ошибок.

Проблемы

Первая проблема заключается в том, что наш оператор не является ссылочно прозрачным. Функция является ссылочно прозрачной, если можно заменить вызов функции значением, которое будет возвращено без изменения поведения программы.

Давайте посмотрим, что происходит, когда значение, возвращаемое нашим оператором, используется для составления нескольких наблюдаемых:

Результат программы:

first use:
[0] 1
[1] 2
second use:
[2] 1
[3] 2

Что ж, это удивительно. Индекс не начинался с нуля для второй наблюдаемой.

Вторая проблема заключается в том, что наш оператор будет вести себя разумно только в том случае, если наблюдаемое, которое он возвращает, подписано один раз.

Давайте посмотрим, что происходит, когда делается несколько подписок на наблюдаемое, составленное из нашего оператора debug:

Результат программы:

first use:
[0] 1
[1] 2
second use:
[2] 1
[3] 2

Опять же, это то же самое удивительное поведение: индекс не начинался с нуля для второй подписки.

Итак, как можно исправить эти проблемы?

Решение (я)

Обе проблемы можно решить, сохраняя состояние для каждой подписки. И есть несколько способов добиться этого.

Первый - использовать конструктор Observable для создания наблюдаемого, которое вернет наш оператор. Если переменная index перемещается в функцию, переданную конструктору, индекс будет сохраняться для каждой подписки. Нравится:

Второй способ - который я предпочитаю - реализовать состояние каждой подписки - это использовать defer наблюдаемый создатель. Если переменная index перемещена в фабричную функцию, переданную в defer, она будет сохраняться для каждой подписки. Нравится:

Другой, более сложный способ реализации состояния подписки - использование оператора scan. scan поддерживает собственное состояние для каждой подписки, которое инициализируется seed и становится доступным через accumulator. Индекс можно сохранить в scan следующим образом:

Если программы, используемые для демонстрации проблем с наивной реализацией, запускаются с использованием любой из реализаций, в которых поддерживается состояние каждой подписки, будет получен следующий результат:

first use:
[0] 1
[1] 2
second use:
[0] 1
[1] 2

Чего и следовало ожидать: никаких сюрпризов.

Этот пост также опубликован в моем личном блоге: ncjamieson.com.