Давайте попробуем выяснить, что на самом деле представляет собой Observable, и попробуем написать его простую реализацию. Я постараюсь объяснить так, чтобы вы почувствовали себя уверенно в использовании наблюдаемых rxjs. Мы слышим много модных словечек, таких как «ленивый», «горячо наблюдаемый», «холодный наблюдаемый» и т.п. В этой первой статье я постараюсь объяснить вам, что на самом деле такое Observable.

Давайте попробуем построить Observable, упоминая его свойства и создавая свойство одно за другим. Это лучший способ понять внутреннее устройство.

  1. Observable - это функция (период): - это просто функция, которая принимает один параметр или три обратных вызова (наблюдатель) и возвращает функцию удаления. Да, это так, и это правда. Давайте создадим наблюдаемое, которое выталкивает первые три нечетных числа, начиная с 0.
function firstThreeOddNumbers$ (observer) {
    observer.next(1);
    observer.next(3);
    observer.next(5);
    observer.complete();
    return () => console.log('clean up function');
}

Итак, вы только что написали свой первый наблюдаемый объект, на который при подписке вы получаете первые три нечетных числа.

Позвольте мне повторить и запомнить это: Observable - это всего лишь функция.

2. Observable ленив: - Мы часто слышим это, но никогда особо не пытаемся понять, что это на самом деле означает. На самом деле это действительно просто, функции в javascript или большинстве других языков ленивы. Это просто означает, что если мы определим функцию, она ничего не сделает, пока мы не вызовем функцию. Я думаю, что это все знают, и теперь, как мы знаем, наблюдаемое также является функцией. Следовательно, его ленивый характер связан с тем, что это всего лишь функция, и он ничего не будет делать, пока не будет вызван. Удивлен? Да, это просто модное слово, и теперь вы знаете, почему наблюдаемый ленив. Давайте перейдем к нашему примеру: мы определили наблюдаемый объект как функцию firstThreeOddNumbers (Observer) {}, которая ничего не делает, пока мы ее не вызовем. Назовем нашу наблюдаемую.

// call the function
console.log('start');
const unSub = firstThreeOddNumbers$({
   next:  (x) => console.log(x),
   error: (error) => console.log(error),
   complete: ()=> console.log('complete')
});

unSub();
console.log('end');

//output
start
1
3
5
complete
clean up function
end

поэтому как только мы вызываем функцию, мы начинаем получать данные из наблюдаемого (1, 3, 5) или просто говорим функция выполняется, а также возвращаем функцию очистки, которую мы вызываем так что утечки памяти не происходит.

Я знаю, вы можете подумать, неужели все так просто? да поверьте мне, это так. Это просто функция (по умолчанию ленивая по своей природе), и внутри функции мы просто подключаемся к некоторому производителю данных к наблюдателю, переданному в качестве параметра.

Итак, чтобы завершить Observable - это функция, мы вызываем функцию, выполняется функция, которая начинает передавать данные от производителя данных к наблюдателю, вызывая Observver.next ().

3. Observable является синхронизируемым или асинхронным в зависимости от того, что мы делаем внутри функции.

Вам может быть интересно, что наблюдаемое должно быть асинхронным, о чем вы говорите? Позвольте мне повторить еще раз, Observable - это просто функция, а функция по умолчанию является синхронной и ленивой по своей природе. Если вы видите вывод нашего вышеупомянутого фрагмента кода, он также просто синхронен с выводом ниже.

start
1
3
5
complete
clean up function
end

Сначала печатается start, и производитель данных начинает передавать данные, затем вызывается complete, вызывается unSub, а затем печатается end. Это абсолютно синхронно. И, очевидно, как функция, он должен быть синхронным.

Если мы хотим, чтобы это было асинхронно, нам нужно выполнить асинхронную операцию или добавить какой-нибудь планировщик внутри функции, который сделает ее асинхронной.

Давайте сделаем наш наблюдаемый асинхронный

function firstThreeOddNumbers$ (observer) {
    const id = setTimeout(() => {
    observer.next(1);
    observer.next(3);
    observer.next(5);
    observer.complete();
    return () => {
         console.log('clean up function');
         clearTimeout(id);
      }
    }, 5000);
}
console.log('start');
const unSub = firstThreeOddNumbers$({
   next:  (x) => console.log(x),
   error: (error) => console.log(error),
   complete: ()=> console.log('complete')
});
console.log('end');
setTimeout(() => unsub() , 10000 );
//output
start
end
1
3
5
complete
clean up function

теперь мы видим, что сначала печатаются начало и конец, а обработчик setTimout вызывается асинхронно (в следующем тике), который выталкивает данные и завершает их.

Опять удивлен? да это оно. Если внутри функции мы проталкиваем данные асинхронно, тогда наблюдаемый будет асинхронным, а в противном случае - синхронным.

4. Observable функция обернута в класс: - В основном мы используем библиотеку Rxjs в наших проектах и ​​видим типы subscribe и Observable.

Это что-то другое? Под капотом все еще остается функция

Rxjs на самом деле просто оборачивает эту наблюдаемую функцию в класс.

class Observable {
   constructor(fn) {
       this.subscribe = fn;
   }
}
source$ = new Observable(firstThreeOddNumbers);
source$
   .subscribe(observer);

Опять удивлен?

subscribe - это просто свойство класса Observable, которое указывает на нашу основную наблюдаемую функцию. Итак, если мы вызываем подписку, мы просто вызываем эту функцию. Подумайте еще раз, вызов подписки означает, что мы просто вызываем нашу основную наблюдаемую функцию, которая передает данные наблюдателю синхронно или асинхронно.

5. Observable является одноадресным по своей природе: - Мы могли слышать или испытывать это, если обернем http-вызовы в Observable и если мы подписались, он сделает http-вызов. Если мы снова подпишемся на тот же Observable, он снова сделает тот же вызов. Это связано с тем, что Observable является одноадресным по своей природе. Означает, что он подключает один экземпляр производителя данных только к одному наблюдателю.

Чтобы еще раз упростить, чтобы не возвращаться к основам. Когда мы вызываем subscribe, мы просто вызываем функцию. Так что подумайте еще раз, если мы снова вызовем подписку на тот же наблюдаемый объект, что может произойти. ? он снова вызывает ту же функцию и выполняет тот же код внутри функции. Вот и все. Это действительно так просто.

source$ = new Observable(firstThreeOddNumbers);
source$.subscribe(observer1);
source$.subscribe(observer2);
output
1
3
5
complete
tear down function
1
3
5
complete
tear down function

Даже в нашем примере мы видим, что двойная подписка на один и тот же наблюдаемый объект вызывает функцию дважды.

По сути, наш Observable не сильно отличается от Observable для HTTP-вызовов. В случае http основная наблюдаемая функция может выполнять вызов ajax, и как только результат с сервера вернется, будет вызван наблюдатель.next (результат).

Таким образом, мы подписываемся на него дважды, тогда на самом деле мы дважды вызываем сервер.

Я уверен или, по крайней мере, надеюсь, что теперь, когда мы понимаем основы, может быть действительно просто понять вышеупомянутый случай.

В заключение Позвольте мне вкратце дать вам определение наблюдаемого.

  1. Наблюдаемое - это просто функция. Функция по своей природе ленива и синхронна и, следовательно, является наблюдаемой.
  2. Внутри функции нам нужно просто подключить производителя данных к потребителю данных (наблюдателю). Производитель данных отправляет данные потребителю данных через Observer.next ().
  3. Производитель данных может передавать данные синхронно или асинхронно.

.