Обзор RxJS

При разработке промышленных систем крайне важно, чтобы мы строили с учетом производительности. Асинхронная неблокирующая обработка стала неотъемлемой частью экосистемы JavaScript как на стороне сервера, так и на стороне клиента. Таким образом, существует множество инструментов для помощи в разработке приложений, настроенных для обработки асинхронных задач или событий, таких как обратные вызовы, обещания, async / await и RxJS.

В этом посте мы рассмотрим RxJS на высоком уровне и рассмотрим основные компоненты, необходимые для обработки с ним асинхронных событий.

Что такое RxJS?

RxJS - это аббревиатура от «Reactive Extensions for Javascript». Проще говоря, это сборка библиотеки для реактивного программирования с компонентами, которые позволяют создавать асинхронный код. Он обеспечивает реализацию типа Observable.

Наблюдаемые

Наблюдаемое - это экземпляр типа Observable. Он обеспечивает поддержку публикации сообщений (событий) и подписки на эти сообщения в приложении. Другими словами, наблюдаемый моделирует поток событий. Объект, имеющий подписку на наблюдаемое, называется наблюдателем.

Наблюдатели

Это объекты, которые подписываются на наблюдаемые. Подписчики прослушивают сообщения, отправленные наблюдаемыми объектами, и «реагируют» на эти сообщения, выполняя объявленные преобразования в полученном сообщении.

Этот шаблон обеспечивает эффективную обработку параллельных операций, поскольку наблюдатели не блокируют поток во время ожидания сообщений от наблюдаемых объектов.

Операторы

Это функции, которые позволяют нам выполнять определенные действия с событиями, генерируемыми наблюдаемыми объектами. Существует множество наблюдаемых операторов. Обычно они классифицируются по их влиянию на наблюдаемые - например, операторы, которые создают наблюдаемые (from(), create(), start()), операторы, которые преобразуют наблюдаемые _6 _, _ 7_, flatMap()), и операторы, которые фильтруют наблюдаемые (debounce(), filter(), sample(), take()).

Простой наблюдаемый пример использования

Допустим, у вас есть users коллекция в базе данных NoSQL, из которой вы хотите загрузить данные и проверить, соответствуют ли данные заданным критериям. Если критерии соблюдены, вы хотите выполнить преобразования данных и сохранить эти данные в целевой коллекции. Эта проблема может быть решена как с помощью синхронных, так и асинхронных стратегий.

Синхронный подход

Ниже приведен псевдокод, показывающий, как указанная выше задача может быть выполнена с помощью синхронной стратегии:

read users from DB collection
for each user in users
	check criteria
	if user meets critetia
		perform transformation
		store transformed data in target collection.
	end
end

Последовательность обрабатываемых событий изображена на этой диаграмме:

Невзаимозависимые задачи выполняются одна за другой до момента завершения. У этого подхода есть положительные и отрицательные стороны. Некоторые заметные преимущества:

  • Реализация алгоритмов в синхронных процессах часто менее сложна, чем соответствующее асинхронное решение.
  • Синхронные процессы часто требуют меньше времени для реализации, чем асинхронные аналоги.

С другой стороны есть минусы:

  • Синхронный код по определению blocks выполнение программы до завершения выполняющейся задачи. На диаграмме выше задачи, которые должны быть выполнены для второго пользователя, отличаются от задач первого пользователя и не зависят от них. Тем не менее, задачи пользователя 2 не могут выполняться, пока не будут выполнены задачи пользователя 1. Это явно не самый эффективный подход. Что еще хуже, если во время выполнения задач для пользователя 1 возникает ошибка, программа будет завершена без выполнения операций пользователей 2 и 3.
  • Во многих случаях программы этой формы плохо используют ресурсы, и производительность программы может сильно пострадать.

Асинхронный подход

Асинхронный неблокирующий подход предполагает одновременное выполнение необходимых задач, как показано на этой диаграмме:

На диаграмме выше изображена обработка событий в наблюдаемых потоках:

  • Верхняя горизонтальная линия - это временная шкала наблюдаемого, где время течет слева направо.
  • Фигуры, пересекаемые наблюдаемой шкалой времени, являются элементами, испускаемыми наблюдаемым.
  • Вертикальная линия, касательная к наблюдаемой временной шкале, указывает, что наблюдаемое завершилось успешно.
  • Прямоугольное поле отображает преобразования, выполненные с испускаемыми объектами. Эти объекты принимаются и передаются по конвейеру для преобразования подписчиком.
  • Нижняя горизонтальная линия - наблюдаемый результат преобразования.

В этом асинхронном подходе действия и преобразования пользовательских объектов происходят одновременно в последовательностях.

Теперь мы познакомились с контекстом того, как работают Observables, давайте посмотрим, как использовать их с RxJS. Рассмотрим эту программу:

const data = [];

for (let i = 0; i < 10; i++) {
    data.push(i)
}

const values: Array<number> = data
    .map(value => value * 3)
    .filter(value => value % 4 === 0);

Приведенный выше код создает массив и вставляет в него целые числа от 0 до 9, после чего умножает все элементы массива на коэффициент 3. Наконец, он фильтрует только значения, равномерно делящиеся на 4, в новый массив с именем «values». Когда мы записываем массив значений в стандартный вывод системы, мы видим, что наш результат - [0, 12, 24].

Мы можем добиться того же набора действий с RxJS и наблюдаемыми объектами. Наше первое действие - создать производителя, который генерирует ценности, которые наша программа будет соблюдать и на которые в конечном итоге «реагировать». Это легко сделать:

import { Observable } from "rxjs";
import { filter, map } from 'rxjs/operators';

const observable: Observable<number> = new Observable((subscriber) => {
    for (let i = 0; i < 10; i++) {
        subscriber.next(i);
    }
    subscriber.complete();
});

В четвертой строке мы создаем новый наблюдаемый экземпляр. Мы передаем подписчику (или наблюдателю) единственный аргумент конструктору. У наблюдателя есть три основные функции: next(), error() и complete(). next() используется для отправки сообщений наблюдающим объектам. error() используется для предупреждения наблюдателей об ошибках, возникших во время процесса. Наконец, complete() уведомляет наблюдателей об успешном завершении.

Имея в виду эти методы, легко понять, чего достигает эта программа. Мы создали наблюдаемое, которое передает числа от 1 до 9 в потоке подписавшимся на него наблюдателям. Конечно, создав наблюдаемый объект, мы должны создать соответствующего подписчика:

observable
    .pipe(
        map(value => value * 3),
        filter(value => value % 4 === 0)
    ).subscribe(result => console.log(result));

Перед подпиской на наблюдаемое используйте операторы map и filter. map преобразует элементы, испускаемые наблюдаемым объектом, применяя к ним функцию. filter удаляет нежелательные элементы в наблюдаемом потоке и испускает только те элементы, которые прошли проверку предиката.

Вдобавок мы использовали pipe метод, доступный нашему наблюдаемому. pipe используется для составления реактивных операторов. Это очень удобно при выполнении сложных преобразований потоков данных!

Функция pipe() возвращает наблюдаемый объект, на который мы в конечном итоге подписываемся, чтобы получить наши результаты. Записывая полученные данные в консоль, мы получаем 0, 10, 24. Точно такой же результат мы получили при использовании первого подхода!

Заключение

В этом посте мы изучили ключевые основы реактивного программирования в Node.js с помощью RxJS. Тем не менее, в мире ReactiveX всегда есть что открыть. Если вы хотите глубже погрузиться в ReactiveX и RxJS, вот несколько отличных статей, которые помогут вам в вашем путешествии: