Это первая часть из серии под названием Demystifying RxJS, в которой мы создаем нашу собственную миниатюрную версию RxJS, чтобы получить глубокое фундаментальное понимание того, как он работает. Вы можете следить за этим, используя этот начальный шаблон на CodeSandbox, или проверить этот CodeSandbox, содержащий полную миниатюрную библиотеку. Вы также можете просмотреть заполненный код для этого раздела только по адресу https://codesandbox.io/s/demystifying-rxjs-part-i-completed-0utge

Наблюдаемые - это абстракция, лежащая в основе RxJS. Они обеспечивают основу, на которой строится остальная часть библиотеки. Давайте начнем нашу собственную библиотеку RxJS с самостоятельного создания Observables.

Ниже приведен код TypeScript - полностью - для нашего чрезвычайно простого класса Observable . Если это кажется большим, не волнуйтесь! Вскоре мы подробно рассмотрим весь этот код. На данный момент, если вы используете начальный шаблон, введите его под последней строкой кода в src/index.ts:

Чтобы доказать себе, что это действительно работает, давайте адаптируем пример из официальной документации по Observables и посмотрим, получим ли мы такой же результат. Добавьте следующее прямо под вашим наблюдаемым кодом:

Если вы используете начальный шаблон, нажмите кнопку обновления в окне браузера в редакторе, а затем откройте консоль редактора. Теперь вы должны увидеть результат, соответствующий тому, который показан в официальном руководстве:

Ура! У нас есть несколько Observables. Уделите несколько минут, чтобы изучить только что написанный код. Надеюсь, написав это и изучив приведенный ниже пример, вы начали интуитивно понимать, как это может работать. Если нет, не волнуйтесь, потому что мы потратим оставшуюся часть статьи на разбивку 😁. Observables - самая важная часть этой библиотеки, которую вы должны понять. Четкое понимание того, как Observables достигают того, что они делают, устранит большую путаницу вокруг того, как RxJS работает. Давайте теперь вернемся и вместе пройдемся по коду.

Наблюдатели

Наблюдаемые объекты работают с потоками асинхронных данных. По сути, поток - это причудливый способ сказать «последовательность данных, которые могут поступить, но вы не знаете, когда». Например, потоком могут быть данные, поступающие из WebSocket. Поток также может быть щелчком по кнопке, где «поток» - это последовательность событий щелчка, которые происходят в течение сеанса. Поток может также быть таймером, который может сработать (если он не отменен), или HTTP-запросом, который может завершиться (если он не прерван). Вы можете думать об этих двух последних примерах как о последовательности только одного фрагмента данных (либо события таймера, либо ответа). В RxJS все эти примеры обрабатываются одинаково, и это один из источников его возможностей. Но я отвлекся.

Поскольку мы имеем дело с потоками данных, отсюда следует, что нам нужно что-то, чтобы напрямую взаимодействовать с этими потоками. Поэтому мы определяем интерфейс Observer, который отвечает за прямое взаимодействие с потоком, который оборачивается нашим Observable. Наблюдатели сигнализируют Observables, что в потоке наблюдаемых данных произошло что-то интересное. Это «что-то интересное» бывает трех видов:

  • Пришло новое значение. Это то, для чего нужен next().
  • Произошла ошибка. Это то, для чего нужен error().
  • Поток значений закончился, и больше никаких значений не будет. Вот для чего нужен complete().

Одно замечание для проницательного наблюдателя (😏): вы увидите, что то, что мы называем наблюдателями, в RxJS называется подписчики. Наблюдатель в RxJS - это фактически интерфейс, который реализует подписчик. Как я уже упоминал, мы создаем радикально упрощенную версию RxJS, поэтому этот уровень абстракции нам не нужен. Если вам интересно, я бы посоветовал вам изучить API RxJS, которые могут указать, почему необходимы более высокие уровни сложности (лично я думаю, что Subjects кажутся вероятной причиной).

Подписки

Затем мы определяем интерфейс Подписки, который отвечает за управление наблюдением за потоком асинхронных значений.

Вы можете думать о подписке как об идентификаторе, который вы получаете при вызове setTimeout() / setInterval(): это « дескриптор» самого наблюдения. Когда вы начинаете наблюдать, вы получаете ручку. Если вы хотите прекратить наблюдение, вы можете позвонить unsubscribe(). Звонок unsubscribe() похож на звонок clearInterval(); он сигнализирует, что ручка больше не нужна.

Наблюдаемые

Наконец, у нас есть класс Observable. Давайте подробно рассмотрим наблюдаемое.

Конструктор принимает observe() фабричную функцию для наблюдения за асинхронным потоком значений. Фабричной функции передается Observer, и ожидается, что этот наблюдатель будет взаимодействовать с указанным потоком значений. Эта фабричная функция имеет решающее значение, поскольку она связывает Observer с его базовым потоком. Он может настраивать обработчики событий, настраивать обратные вызовы таймера, подписываться на события сокетов или что-то еще, что имеет дело с асинхронным поведением. Именно эта фабричная функция, заданная в конструкторе Observable, позволяет абстрагировать базовый поток в интерфейс Observable. На мой взгляд, это чрезвычайно важная часть RxJS для понимания. Это также тот, который легче всего упустить, потому что обычно мы создаем Observables, используя фабричные методы, такие как of(), from() и т.п. (подробнее об этом позже).

В нашем примере кода мы написали фабричную функцию, в которой Observer немедленно испускает три значения, а затем ждет 1 секунду с помощью вызова setTimeout() перед отправкой дополнительного значения и завершением. Хотя это надуманный пример, представьте себе поток данных из WebSocket: вы не знаете точно, когда данные прибудут, но когда это произойдет, вам нужно каким-то образом на них отреагировать. В этом случае Observer взаимодействия будут похожи на код внутри этого setTimeout() обратного вызова.

Если есть какая-либо логика очистки, которую необходимо выполнить, когда наблюдение должно быть завершено, фабричная функция может вернуть функцию удаления, которая отвечает за отмену любого установочного кода, необходимого для прослушивания асинхронных значений. Продолжая пример с потоком данных WebSocket, если пользователь предпринимает какое-либо действие, чтобы мы могли перестать прослушивать изменения, нам больше не нужно прослушивать эти события в сокете. Таким образом, функция teardown позволяет нам отказаться от наших наблюдений.

Несмотря на то, что все, что наша Observable может сделать прямо сейчас, это подписаться на значения, передаваемые из потока данных, это все, что необходимо для создания основы для нашей наблюдаемой библиотеки. Продемонстрируем это в следующем разделе.

Укрепление нашего понимания: наблюдаемые фабрики

Ключ к пониманию того, как работает наблюдаемое, лежит в функции subscribe(). Как видно из кода, Observables принимают обратные вызовы, которые вы предоставляете subscribe(), превращают их в Observer и вызывают функцию конструктора observe() с этим Observer. Когда вы откручиваете все слои абстракция, это все, что на самом деле делают наблюдаемые.

Если вы не уверены, что это так, давайте реализуем некоторые из фабричных функций Observable , с которыми вы, скорее всего, столкнулись при работе с RxJS, чтобы увидеть, как все это связано с этой observe() функцией.

Для начала давайте реализуем of(), который просто перебирает все значения, предоставленные в качестве аргументов, перенаправляя каждое из них наблюдателю, а затем завершает.

Затем давайте реализуем from(), упрощенно - для краткости - так, чтобы он принимал только объекты и обещания, подобные массивам. Обратите внимание, как здесь мы либо перебираем объект, подобный массиву, либо ожидаем завершения обещания (соответственно), передавая результаты обратно наблюдателю.

Обратите внимание, что в from мы явно не завершаем, если в обещании есть ошибка. Это часть наблюдаемого контракта, который определяет, как работают наблюдаемые семейства Rx. Вы не должны ничего выдавать после выдачи ошибки, включая завершение (кстати, именно поэтому вы не можете повторно использовать наблюдаемые Angular HTTP, если у них есть ошибка '' d out, что было для меня источником недоумения).

Наконец, давайте реализуем fromEvent(). Здесь мы создадим прослушиватель событий, который пересылает все события наблюдателю. Мы также вернем функцию teardown, которая удаляет наш прослушиватель событий.

Как видно из вышесказанного, каждый из них можно реализовать, просто передав предварительно определенную observe() фабричную функцию в конструктор нашего Observable .

Ключевые выводы

Основываясь на том, что мы написали выше, следует отметить несколько важных моментов:

  • Функция observe() объекта Observable не вызывается, пока не будет вызван subscribe(). Это то, что мы имеем в виду, когда говорим, что Observable холодный. Как видите, это имеет смысл, потому что мы не хотим, чтобы значения активировались до, пока мы на них не подпишемся, чтобы не получить уведомление об их прибытии.
  • Обратите внимание, как в fromEvent , мы не можем предотвратить создание событий до того, как наблюдаемое будет подписано . Пользователь мог щелкнуть эту кнопку в любое время, независимо от того, подписаны мы на него или нет. Это то, что мы имеем в виду, когда говорим, что Observable горячий.
  • Обратите внимание, как каждый раз, когда мы subscribe(), мы повторно вызываем фабричную функцию. Вот откуда берутся все эти неожиданные множественные триггеры, и это то, что share() и друзья предотвращают.

Теперь мы вооружены базовой функциональностью, которая нам нужна для библиотеки Observable. Вы можете использовать то, что мы здесь создали, для обертывания любого потока асинхронных событий.

Однако реальная сила Observables заключается в возможности обрабатывать потоки асинхронных данных как списки, а списки полезны только тогда, когда с ними можно работать. В Части II мы напишем наши собственные операторы, а также функцию .pipe() , которую мы можем использовать для достижения этой функциональности.