Это вторая часть из серии под названием Demystifying RxJS, в которой мы создаем нашу собственную миниатюрную версию RxJS, чтобы получить глубокое фундаментальное понимание того, как он работает. Если вы прочитали Часть I, вы можете продолжить с того места, на котором остановились. Или вы можете форк CodeSandbox, содержащий завершенный код для Части I. Вы также можете найти готовый код для этого раздела по адресу https://codesandbox.io/s/demystifying-rxjs-part-ii-completed-5fqxy. Наконец, вы можете просмотреть полный код миниатюрной библиотеки по адресу https://codesandbox.io/s/demystifying-rxjs-complete-implementation-5ib63.

В Части I этой серии мы реализовали Observables, ключевой компонент RxJS. Однако эти Observables довольно просты: все, что они предлагают, - это subscribe() функция, которая позволяет нам получать уведомления о событиях из наблюдаемых потоков. Как я упоминал в конце части I, реальная сила наблюдаемых заключается в способности обрабатывать потоки асинхронных данных как списки. Мы можем добиться этого с помощью операторов, которые позволяют нам это делать.

В Части I этой серии мы реализовали Observables, ключевой компонент RxJS. Однако эти Observables довольно просты: все, что они предлагают, - это subscribe() функция, которая позволяет нам получать уведомления о событиях из наблюдаемых потоков. Как я упоминал в конце части I, реальная сила наблюдаемых заключается в способности обрабатывать потоки асинхронных данных как списки. Мы можем добиться этого с помощью операторов, которые позволяют нам это делать.

В этой части серии мы расширим нашу миниатюрную библиотеку, включив в нее некоторые базовые операторы, и добавим почтенный метод pipe() в наши Observables. Мы увидим, как это позволяет нам легко использовать и комбинировать операторы для создания мощных средств управления асинхронными потоками данных, которые предоставляет RxJS. Затем мы рассмотрим операторы сопоставления более высокого порядка, создав наши собственные версии mergeMap и concatMap. Это позволит нам лучше понять, как работают эти более продвинутые операторы.

К концу этого раздела вы должны иметь очень четкое представление об основных механизмах, которые позволяют RxJS функционировать в качестве такой мощной библиотеки, и иметь большую часть знаний, которые вам понадобятся для эффективного использования библиотеки. Итак, приступим!

Я много сосредоточился на представлении асинхронного потока данных в виде списка . Списки такими мощными делают операции, которые вы можете с ними выполнять. Списки - это примитивный механизм в разработке программного обеспечения, который позволяет произвольно преобразовывать и агрегировать (теоретически) неограниченный набор элементов в чрезвычайно сжатой форме. Рассмотрим следующий фрагмент кода:

const oneThroughTen = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const squareSum = oneThroughTen
  .map(n => n * n)
  .reduce((s, n) => s + n, 0);

Подумайте, как бы выглядел этот squareSum метод, если бы у нас не было списков, не говоря уже об операциях, которые можно было бы использовать с ними; это был бы гораздо более громоздкий фрагмент кода. К счастью, мы можем использовать списки и их методы для семантического и лаконичного описания того, что мы пытаемся достичь.

Вы можете представить асинхронный поток данных точно так же, как «список», с двумя основными отличиями:

  1. У вас не все элементы в списке сразу
  2. Вы не знаете точно, когда эти товары будут доставлены

Наблюдаемые объекты решают две вышеупомянутые проблемы, создавая унифицированную абстракцию над асинхронными потоками данных. Операторы, таким образом, позволяют нам достичь того же уровня семантики и лаконичности, что и у списков при работе с потоками данных в Observables.

Например, мы можем легко вычислить сумму в квадрате над потоком чисел с помощью RxJS:

import {of} from 'rxjs';
import {map, reduce} from 'rxjs/operators';
const squareSum = of(1, 2, 3, 4, 5, 6, 7, 8, 19, 10).pipe(
  map(n => n * n),
  reduce((s, n) => s + n, 0),
);

Функция pipe() вместе с операторами, которые она использует, является вторым большим фрагментом головоломки, касающимся того, что делает RxJS таким мощным. Теперь давайте сами создадим эти map() и reduce() операторы, реализуем pipe() и протестируем нашу реализацию, воссоздав squareSum Observable.

Операторы + труба ()

Начнем с определения операторов map() и reduce(), которые нам понадобятся для создания нашей squareSum наблюдаемой. Введите следующее справа внизу, где вы остановились в Части I (или где заканчивается код, если вы только начинаете на этом этапе):

Возможно, начинает проясняться, как все перетекает обратно в эту примитивную observe() функцию, переданную конструктору Observable. Но если нет, не беспокойтесь! Скоро мы углубимся в это.

Однако прежде чем мы это сделаем, давайте напишем нашу pipe() функцию для нашего класса Observable. Добавьте следующее в реализацию Observable в свой код.

Наконец, прежде чем мы перейдем к тому, что мы только что сделали, давайте убедимся, что это работает, как задумано, написав код для вывода результата нашей суммы квадратов с использованием pipe() и операторов, которые мы только что написали. Вернитесь к концу написанного кода и введите следующее:

Вы должны увидеть Squared sum = 385 в консоли.

Также обратите внимание, что, как и в случае с операторами RxJS, легко определять операторы, которые просто используют pipe() для создания новых операторов.

Разбивка операторов

Теперь, когда мы знаем, что это работает, давайте вернемся к тому, что мы только что написали, и разберемся, что происходит.

Первое, что мы делаем, это определяем тип OperatorFunction. Это определение оператора в нашей библиотеке. RxJS имеет более продвинутые механизмы для определения того, как работают операторы и как их можно создавать. Для наших целей мы сводим оператор к тому, что я считаю его наиболее простой и понятной формой: функция, которая может преобразовывать одну наблюдаемую в другую наблюдаемую. Это называется OperatorFunction в RxJS, и если вы посмотрите руководство по операторам, вы увидите, что это предпочтительный метод того, как думать о них.

Так как же нам преобразовать одну наблюдаемую в другую? Точно так же, как мы делаем все остальное в RxJS: используя функцию конструктора observe() в Observable. Мы создаем новый наблюдаемый объект, observe() функция которого подписывается на исходный наблюдаемый , и генерирует новое значение на основе значение источника испускаемого наблюдаемого. Логика внутри функции observe() объекта Observable, созданного внутри оператора, такая же, как логика внутри observe(), созданного где-либо еще. Все, что мы делаем, - это сшиваем логику наблюдения вместе, чтобы накладывать преобразования друг на друга.

Обратите внимание, как мы вызываем функцию map() в качестве аргумента для pipe(). Это связано с тем, что map возвращает оператор, который, учитывая исходный Observable, испускает значения, эквивалентные вызову функции преобразования для значений, испускаемых источником. reduce() аналогичен: он возвращает оператор, который потребляет значения, переданные из источника, агрегирует их, а затем, когда источник завершает работу, выдает результат. map(), reduce() и подобные функции не являются операторами. Скорее они производят операторы. Когда я только начинал работать с RxJS, это было для меня большим источником путаницы, поэтому я хотел заявить об этом прямо.

Разрушение трубы ()

Мы установили, что операторы - это просто функции, которые принимают Observables и возвращают новые. Так как же связать несколько операторов вместе? Точно так же мы объединяем любую другую группу функций вместе: объединяем их в конвейер. pipe() похож на reduce, но для Observables. Он начинается с исходного Observable, а затем последовательно вызывает каждый из определенных операторов, «накапливая» результат. Поскольку каждый оператор возвращает новый Observable, у вас остается скомпонованный результат последовательного применения всех этих операторов к исходному Observable.

Подведение итогов по основам

На данный момент у вас должно быть достаточно четкое представление об основах работы с операторами. Если хотите, можете здесь сделать паузу и попробовать несколько упражнений, чтобы улучшить свое понимание:

  • Напишите еще несколько собственных операторов, например filter () , scan (), catchError () и debounceTime ().
  • Представьте, что у вас есть наблюдаемое, которое генерирует бесконечный поток входящих чисел через веб-сокет, возможно, что-то вроде API биржевых тикеров в реальном времени. Используйте свои собственные операторы для преобразования этого наблюдаемого в наблюдаемое, которое испускает значения открытия, максимума, минимума и закрытия для каждого 30-секундного окна (подсказка: взгляните на bufferTime (), чтобы помочь вам в этом).

Операторы отображения высшего порядка

ПРИМЕЧАНИЕ. Я собираюсь продемонстрировать здесь, в частности, радикальное упрощение по сравнению с тем, как это реализовано в RxJS. Однако основная интуиция, используемая здесь, такая же, как и в RxJS, только в RxJS она реализована более элегантным (и продвинутым) способом.

Возможно, вы заметили, что до этого момента я для удобства не использовал такие операторы, как mergeMap и concatMap, которые вы, вероятно, использовали, если работали с RxJS в настройках Angular. Они называются операторами сопоставления более высокого порядка. То есть они используют функции, которые при заданном порожденном значении возвращают Observable, а не новое необработанное значение. Это немного сложнее, чтобы понять, особенно если вы до сих пор не совсем поняли интуицию вокруг операторов, поэтому я сохранил их для отдельного раздела. Мы собираемся создать как mergeMap, так и concatMap максимально простым и понятным способом, чтобы лучше понять, как работают эти операторы.

Начнем с mergeMap, который принимает в качестве аргумента функцию, производящую наблюдаемое. Возвращенный наблюдаемый объект работает таким образом, что каждый раз, когда наблюдаемый источник генерирует значение, данная функция вызывается со значением для создания этого нового наблюдаемого объекта, и этот наблюдаемый объект подписывается на него и немедленно начинает излучать. Все выбросы от всех созданных наблюдаемых объектов объединяются в возвращаемые наблюдаемые объекты по мере их поступления.

Введите этот код в свой редактор (несите меня, если он выглядит слишком сложным, мы скоро вернемся к нему!):

Давайте напишем более подробный тестовый пример, чтобы доказать себе, что это работает. Напишите следующее под определением mergeMap:

Здесь у нас есть наблюдаемый outer, который излучает 1, ожидает 800 мс, излучает 3, ждет три секунды и затем завершается. Мы pipe() эту наблюдаемую в mergeMap, которая создает новую наблюдаемую, которая испускает три значения, равные 10 * x 500 мс, а затем завершается.

Если mergeMap работает по назначению, то, что мы должны увидеть в окончательной подписке, - это два выброса 10, за которыми следует одно излучение 30, чередующееся прямо перед третьим излучением 10, поскольку 800 мс пройдут после того, как два 10 были отправлены, но до третьего 10 испускается, и поэтому outer должен испускать 3, инициируя новый вызов createInner(). Наконец, должны сработать третий 10 и оставшиеся два 30.

Фактически, это то, что мы увидим, если проверим вывод консоли из нашей подписки mergeMap:

Давайте вернемся к этому коду, чтобы понять, что он делает.

Первая часть этого кода похожа на нашу функцию map(): мы ведем текущий счет текущего индекса и передаем его в project вместе с выходными данными подписанного Observable. Однако логика управления подписками немного сложнее.

Первое, что вы заметите, - это набор subscriptions. Нам это понадобится, чтобы мы могли отслеживать подписку как на Observable верхнего уровня, так и на внутренние Observables, возвращаемые функцией project() . Мы гарантируем регистрацию всех подписок в нашем наборе subscriptions, чтобы мы могли отказаться от подписки на все наблюдаемые объекты, когда возвращенный наблюдаемый объект отменяется.

Каждый раз, когда мы получаем новое значение в subscribe(), мы создаем новый Observable, вызывая project() с переданным значением и текущим индексом. Затем мы подписываемся на этот возвращенный Observable, что позволяет нам пересылать созданные значения Observable в нашу исходную цепочку наблюдателей, а также учитывать ошибки и завершения. Фактически, мы объединяем выходные данные созданных Observables с нашими выходными Observables.

Обратите внимание, что мы удаляем подписку из нашего набора, когда завершается внутренний Observable. Согласно Наблюдаемому контракту нам больше не нужно отказываться от подписки по завершении подписки. Однако мы действительно хотим продолжать генерировать значения из новых проецируемых Observable даже после завершения внутреннего Observable. Поэтому мы специально выполняем not complete для наблюдаемой, возвращаемой оператором. Однако, если source Observable завершается, мы убеждаемся, что отказались от подписки, чтобы больше не получать эмиссии от выходного Observable (обратите внимание, что отмена подписки на исходный источник Observable здесь не имеет семантического эффекта).

Ключевые выводы здесь двоякие:

  • Операторы отображения высшего порядка работают, подписываясь на возвращаемые Observables и пересылая их значения через исходную цепочку Observer. По сути, Observer из возвращенного Observable действует как прокси для всех проецируемых Observable, созданных внутри функции.
  • mergeMap и все merge* операторы генерируют наблюдаемые значения, как только они поступают. Обратите внимание, как только создается Observable, значения начинают испускаться. Даже если предыдущие внутренние Observables еще не завершены, код здесь не имеет отношения к этому. Это является ключевым поведением в семействе операторов слияния.

Надеюсь, это приоткрывает завесу над тем, как работают некоторые из этих более продвинутых функций отображения более высокого порядка. Чтобы действительно укрепить наше понимание, давайте закончим реализацией более сложной функции сопоставления более высокого порядка: concatMap.

Как обычно, мы сначала посмотрим на код, а затем вернемся к нему. Введите это в свой редактор:

В отличие от mergeMap, concatMap не испускает значения из проецируемых Observable до тех пор, пока не завершится любой из текущих генерирующих Observable. Если сравнить мраморную диаграмму с mergeMap выше, мы получим:

Обратите внимание, здесь нет чередования между 10s и 30s; 30s идут строго после 10s.

Давайте проверим, что это так, используя те же тесты для mergeMap, которые мы использовали выше. Введите следующее под определением для concatMap:

Если мы запустим это, мы действительно должны увидеть, что спроецированные наблюдаемые испускаются строго в том порядке, в котором были испущены исходные наблюдаемые:

Давайте вернемся к коду и проанализируем, как мы этого добились.

Первое, на что следует обратить внимание, - это то, как мы ввели массив buffer. В concatMap порядок имеет значение, поэтому нам нужен способ точно контролировать, когда мы подписываемся на Observables. Используя список и выходя из этого списка, мы можем создать детерминированный порядок выдачи значений из каждого Observable по принципу «первым пришел - первым обслужен». Для этого используются функции buffer и subscribeTo; они помогают обеспечить правильную подписку на Observables. Когда мы получаем новое значение из исходного Observable, мы создаем новый внутренний Observable из этого значения. Затем мы проверяем, подписаны ли мы в настоящее время на наблюдаемое, проверяя, есть ли у нас какие-либо подписки в нашем subscriptions наборе. Если это так, мы помещаем этот наблюдаемый объект в наш buffer, где он может ожидать подписки. Вы можете думать об этом буфере как о рабочей очереди. Мы можем подписаться только на один наблюдаемый объект за раз, поэтому последующие наблюдаемые объекты должны будут дождаться завершения текущего подписанного объекта.

Если мы не subscribed, мы вызываем subscribeTo(projected, obs, subscriptions), который подписывается на внутренний Observable. Логика, на которой нужно сосредоточиться в subscribeTo, - это внутренняя функция завершения Observable . Когда внутренний Observable завершает свою работу, мы проверяем: 1) У нас больше нет подписок в нашем subscriptions наборе (подробнее об этом чуть позже) и 2) У нас есть какие-либо ожидающие наблюдаемые объекты, на которые требуется подписка. В этом случае мы немедленно подписываемся на следующий Observable, ожидающий подписки.

Ключевой вывод здесь заключается в том, что используя механизм, похожий на очередь, для ограничения одновременных подписок, и подписываясь на Observables только после завершения текущего подписанного Observable, concatMap может гарантировать порядок эмиссии для своих прогнозируемых наблюдаемых.

Абстрагирование операторов отображения высшего порядка

На этом этапе, даже если для вас все имеет смысл, у вас, скорее всего, возникнут некоторые вопросы:

  • Зачем нам вообще нужен набор subscriptions для concatMap?
  • Разве мы не можем повторно использовать логику между mergeMap и concatMap?

Вы на 100% правы, задавая эти вопросы. Я решил написать эти операторы сопоставления более высокого порядка таким образом, чтобы помочь вам понять, как их можно дальше абстрагировать, и в конечном итоге прийти к тому, как RxJS обрабатывает их. В RxJS этот поток буферизации и подписки обрабатывается гораздо более сложным, мощным и элегантным способом с помощью слияния, конкатенации и других. Если вам интересно, это отличное упражнение - прочитать эти ссылки на API и исходный код, чтобы лучше понять, как они работают. Например, merge и concat в RxJS имеют понятие параллелизм. Глядя на concatMap, вы можете представить, как вместо проверки в нашем массиве подписок, что он имеет размер 0, мы могли бы проверить, был ли он размером N, где N - количество разрешенных одновременных подписок. Тогда вы, возможно, лучше поймете, что они означают в документации RxJS, когда говорят, что merge равно concat с бесконечным параллелизмом. И наоборот, если бы merge имел понятие встроенного буфера, он мог бы представить concat как merge операцию с 1 параллелизмом.

Последним необязательным упражнением будет попытка рефакторинга mergeMap и concatMap, чтобы больше соответствовать тому, как он фактически построен в RxJS. В противном случае, если вам интересно, но вы не хотите писать код самостоятельно, в Angular University есть исчерпывающий пост о картировании более высокого порядка.

Заключение

Если вы зашли так далеко, похлопайте себя по плечу и, возможно, даже станьте победным танцем. Вы сами создали библиотеку, которая может имитировать многие функции RxJS. Вы создали не только полнофункциональный Observable класс, но и набор операторов, которые можно использовать для мощного управления асинхронными потоками данных. Поступая так, вы, надеюсь, получили гораздо более глубокое понимание и интуицию о том, как одна из основных библиотек, на которую опирается Angular, - и библиотека для реактивного программирования в Интернете - делает то, что делает.

Я надеюсь, что эти два руководства помогли вам демистифицировать RxJS :) Я хотел бы услышать ваши отзывы в комментариях, поэтому, пожалуйста, дайте мне знать, как это можно улучшить!

Есть еще одна часть - Часть III - о планировщиках. Планировщики - это более тонкая тема в RxJS, которую вы можете не так часто видеть, но если вам интересно их реализовать, читайте дальше! Если нет, то большое спасибо за чтение и удачного взлома! 🔁