Введение с примерами кода

Обзор

В этом уроке я познакомлю вас с наблюдаемыми объектами RxJS. Во-первых, вы получите краткое объяснение того, что это такое и как вы можете их использовать. Затем мы рассмотрим пример кода и сами создадим некоторые наблюдаемые объекты.

Наблюдаемые представляют собой поток данных. Они доставляют (проталкивают) ценности своим наблюдателям (потребителям). Они часто используются для асинхронных данных, но важно знать, что наблюдаемые могут быть как синхронными, так и асинхронными. Когда мы работаем с наблюдаемыми, нам необходимо знать о двух других связанных понятиях: наблюдатели и подписки. .

Давайте углубимся. Что это такое, для чего они используются и как они работают вместе?

Что такое наблюдаемые?

Наблюдаемые объекты могут предоставлять значения либо синхронно, либо асинхронно. Они могут просто предоставить одно значение, а также несколько значений или бесконечное количество значений. Мы можем, например, использовать их для доставки простого одиночного значения, но мы также можем прослушивать нажатия кнопок, прокручивать события, обрабатывать HTTP-запросы, в общем, все, что вы хотите! И какие бы данные вам ни понадобились, интерфейс всегда один и тот же.

Наблюдаемые ленивы. Это означает, что они ничего не делают, если мы подписываемся на них. Когда мы подписываемся, мы будем получать значения до тех пор, пока наблюдаемое не завершится или пока мы не откажемся от подписки. Просто подписка заставит наблюдаемые работать, но, конечно, мы также хотим что-то сделать со значениями, которые предоставляет наблюдаемые. Мы хотим выполнить какой-то собственный код, когда получим новое значение. Это та часть, где наблюдатель вступает в игру.

Наблюдаемые часто асинхронны, но могут быть и синхронными!

Наблюдатель

Наблюдатель слушает значения, предоставляемые наблюдаемым. Когда мы подписываемся, мы предоставляем наблюдателя, который объяснит наблюдаемому, что мы хотим делать, когда поступают новые значения. Наблюдатель — это просто объект, который реализует до трех методов, по одному для каждого типа уведомлений, которые может доставить наблюдаемый объект:

  • следующий()
  • ошибка()
  • полный()

Вы не можете реализовать ни один из них, и наблюдаемое все равно будет выполнять свою работу. В большинстве случаев вы захотите реализовать некоторые из них. Если вы не предоставите метод для типа уведомления, уведомление будет просто проигнорировано. Наблюдаемый доставляет уведомления наблюдателю, вызывая next(), error() или complete() у наблюдателя.

Наблюдаемый контракт

Помните, что наблюдаемые могут просто предоставить одно значение, но также и большее их количество. Для каждого значения, которое наблюдаемый объект хочет предоставить, он вызывает next() для нашего наблюдателя с данными, которые необходимо доставить. Если происходит ошибка, наблюдаемая вызывает error(), а если наблюдаемая завершается, она вызывает complete(). Важно отметить, что next() можно вызывать несколько раз, а error() и complete() можно вызывать только один раз и только любой из них. Если какой-либо из них вызывается, то после этого ничего больше не может быть доставлено.

Как создать наблюдаемую

Давайте посмотрим на все это поближе, создав некоторый реальный код:

const observable = new Observable((observer: Observer<number>) => {
  console.log('delivering next notification');
  observer.next(1);
});

Когда мы выполняем это и смотрим на вывод нашей консоли, мы видим… правильно: ничего! Мы только что создали наблюдаемую, но не подписывались на нее:

observable.subscribe();

Когда мы снова смотрим на нашу консоль, мы видим следующий вывод:

delivering next notification

Но мы видим только команду журнала, которая выполняется самим наблюдаемым. Мы ничего не делали с значением, которое было выдано наблюдаемым объектом. Давайте также предоставим наблюдателя, чтобы сообщить наблюдаемому, что мы хотим сделать, когда вызывается next():

observable.subscribe({
  next: (value) => {
    console.log(`received next notification with value: ${value}`);
  }
});

Теперь наш наблюдатель запишет сообщение и полученное значение в консоль. Это приведет к следующему результату:

delivering next notification
received next notification with value: 1

Поздравляем! Вы только что создали свой первый наблюдаемый объект и подписались на него! Это пример наблюдаемого объекта, который доставляет значение синхронно. Теперь давайте создадим еще один наблюдаемый объект, который будет предоставлять значения асинхронно.

const observable = new Observable((observer: Observer<number>) => {
  observer.next(1);
  observer.next(2);
  
  setTimeout(()=> {
    // deliver after two seconds
    observer.next(3);
  }, 2000);
  observer.next(4);
});
observable.subscribe({
  next: (value) => {
    console.log(`received next notification with value: ${value}`);
  }
});

Теперь вы увидите этот вывод:

received next notification with value: 1
received next notification with value: 2
received next notification with value: 4
received next notification with value: 3

Обратите внимание, что мы не вносили никаких изменений в наш наблюдатель. Это означает, что когда мы подписываемся на наблюдаемое, мы также можем получать значения асинхронно!

Использование полного и ошибочного

Теперь давайте расширим наш наблюдатель, реализуя функции обратного вызова для завершения и ошибки:

const observable = new Observable((observer: Observer<number>) => {
  observer.next(1);
  observer.next(2);
  
  setTimeout(() => {
    observer.next(3);
  }, 2000);
  observer.next(4);
  observer.complete();
});
observable.subscribe({
  next: value => console.log(`received value ${value}`),
  complete: () => console.log('completed')
});

Это приведет к следующему результату:

received value 1
received value 2
received value 4
completed

Теперь мы видим наш журнал в консоли, когда наблюдаемое завершается. Обратите внимание, что мы не видели вывод дляObserver.next(3) . Это уведомление не доставляется, поскольку это происходит после завершения наблюдаемого объекта. То же самое относится и к ошибкам. При возникновении ошибки вызов next в наблюдаемом не будет иметь никакого эффекта:

const observable = new Observable((observer: Observer<number>) => {
  observer.next(1);
  observer.next(2);
throw new Error('nothing else happens after me');
observer.next(3);
  observer.complete();
});
observable.subscribe({
  next: value => console.log(`received value ${value}`),
  complete: () => console.log('completed'),
  error: error => console.log(error.message)
});

Когда мы смотрим на результат, мы не видим никакого вывода для значения 3 и завершаем:

received value 1
received value 2
nothing else happens after me

Подписки и отписки

Рассмотрим следующие наблюдаемые:

const observable = new Observable((observer: Observer<number>) => {
  let number = 0;
  setInterval(() => {
    observer.next(number++);
  }, 1000);
});
// when we only want to use 'next', instead of passing an observer object, we just pass the next callback
observable.subscribe(value => console.log(value));

Когда мы посмотрим на вывод нашей консоли, мы увидим следующее:

0
1
2
3
4
...

Это никогда не закончится! Наш наблюдаемый объект никогда не завершится и будет вечно передавать эти значения своим подписчикам. Но что, если мы хотим перестать их получать? Что, если мы захотим перестать получать значения, если мы больше не хотим их получать, вместо того, чтобы позволить наблюдаемому решать это за нас? Здесь мы отписываемся.

Когда мы подписываемся на наблюдаемый объект, метод subscribe возвращает объект subscription. Мы можем сохранить объект подписки в переменной и просто вызвать для него метод отмены подписки:

const subscription = observable.subscribe(value => console.log(value));
subscription.unsubscribe();

Это не приведет ни к какому выводу, потому что мы сразу отписываемся после подписки, чтобы немного изменить это, чтобы увидеть некоторые результаты:

const observable = new Observable((observer: Observer<number>) => {
  let number = 0;
  
  setInterval(() => {
    observer.next(number++);
  }, 1000);
});
const subscription = observable.subscribe(value => console.log(value));
// unsubscribe after two seconds
setTimeout(() => {
  subscription.unsubscribe();
}, 2000);

Теперь мы увидим следующее:

0
1

Через две секунды мы отписались и не получили никаких новых значений. Когда у вас есть бесконечный наблюдаемый объект и вам больше не нужна подписка, вы должны всегда отписываться, чтобы предотвратить утечку памяти.

Заключение

В этой статье мы рассмотрели основы наблюдаемых, наблюдателей и подписок RxJS. Исходный код всех примеров вы можете найти на GitHub.