Получить подписчиков Observable и заставить их подписаться на другой Observable

Проще говоря

Учитывая существующий Observable (который еще не завершен), есть ли способ получить связанных подписчиков (функции, переданные для подписки), чтобы вместо этого заставить их подписаться на другой Observable?

Контекст

Служба в моем приложении помогает создавать подключения SeverEvent, возвращать ConnectableObservable к прокси-соединению и разрешать многоадресную рассылку с помощью оператора publish. Сервис отслеживает существующие подключения через внутреннее хранилище:

store: {[key: string]: ConnectionTracker};

// …

interface ConnectionTracker {
    url: string;
    eventSource: EventSource;
    observable: rx.ConnectableObservable<any>;
    subscription: rx.Subscription;
    observer: rx.Observer<any>;
    data?: any; // Arbitrary data
}

При создании соединения, если связанный трекер уже существует (идентификация выполняется с использованием конечной точки соединения), служба должна:

  • ok Закрыть существующее соединение ServerEvent с трекером
  • ok Откройте новое соединение SerevrEvent (следовательно, новый ConnectableObservable)
  • Замените Observable существующего средства отслеживания новым наблюдаемым, но сделайте так, чтобы существующие подписчики подписались на новый Observable.

Вот часть кода, которая создает ConnectionTracker.

/**
* Create/Update a ServerEvent connection tracker
*/
createTracker<T>(endpoint: string, queryString: string = null): ConnectionTracker
{
    let fullUri = endpoint + (queryString ? `?${queryString}` : '')
        , tracker = this.findTrackerByEndpoint(endpoint) || {
            observable: null,
            fullUri: fullUri,
            eventSource: null,
            observer: null,
            subscription: null
        }
    ;

    // Tracker exists
    if (tracker.observable !== null) {
        // If fullUri hasn't changed, use the tracker as is
        if (tracker.fullUri === fullUri) {
            return tracker;
        }

        // At this point, we know "fullUri" has changed, the tracker's
        // connection should be replaced with a fresh one

// ⇒ TODO
// ⇒ Gather old tracker.observable's subscribers/subscriptions to make
//   them subscribe to the new Observable instead (created down below)

        // Terminate previous connection and clean related resouces
        tracker.observer.complete();
        tracker.eventSource.close();
    }

    tracker.eventSource = new EventSource(<any>fullUri, {withCredentials: true});
    tracker.observable = rx.Observable.create((observer: rx.Observer<T>) => {
            // Executed once
            tracker.eventSource.onmessage = e => observer.next(JSON.parse(e.data));
            tracker.eventSource.onerror = e => observer.error(e);
            // Keep track of the observer
            tracker.observer = observer;
        })
        // Transform Observable into a ConnectableObservable for multicast
        .publish()
    ;

    // Start emitting right away and also keep a reference to 
    // proxy subscription for later disposal
    tracker.subscription = tracker.observable.connect();

    return tracker;
}

Спасибо.


person Stphane    schedule 25.07.2019    source источник
comment
Похоже, вы могли бы использовать только switchMap, который возвращает новый Observable. Подписчики останутся такими, какие они есть, но будут получать значения от этого нового Observable.   -  person martin    schedule 25.07.2019
comment
Боюсь, switchMap — это не то, что мне нужно. Я намерен создать замену Observable, но восстановить зарегистрированных наблюдателей из предыдущего Observable. Как только соединение ServerEvent закрывается, связанный Observable становится устаревшим (нет источника → больше нет вызова next()), ссылки на Observable могут быть удалены. switchMap строит только цепочку Observable: исходный Observable теоретически все еще находится в рабочем состоянии (насколько я понимаю), но поскольку связанное соединение закрыто, никакое значение больше не будет выдаваться, а новый Observable не будет иметь шанс взять верх.   -  person Stphane    schedule 25.07.2019


Ответы (2)


Если вы попытаетесь сделать что-то вроде перемещения подписчика на другой наблюдаемый объект, то вы просто не будете делать то, что задумано в RxJS. Любая такая манипуляция по сути является хакерством.

Если вы время от времени создаете новую наблюдаемую (например, делая запрос) и хотите, чтобы какой-то подписчик всегда был подписан на самую последнюю из них, то вот решение:

  private observables: Subject<Observable<Data>> = new Subject();

  getData(): Observable<Data> {
    return this.observables.pipe(switchAll());
  }

  onMakingNewRequest(newObservable: Observable<Data>) {
    this.observables.push(newObservable);
  }

Таким образом, вы можете предоставить один наблюдаемый объект (через getData()), на который подписывается клиент, но, нажав на this.observables, вы измените фактический источник данных, который видит пользователь.

Что касается закрытия соединения и подобных вещей, ваш наблюдаемый (тот, который создается с каждым запросом или чем-то еще) должен в основном заботиться о выпуске и закрытии материала, когда он отписывается, тогда вам не нужно выполнять какую-либо дополнительную обработку, предыдущий наблюдаемый автоматически отпишется, как только вы нажмете новый. Детали зависят от фактического бэкэнда, с которым вы связываетесь.

person TPReal    schedule 26.07.2019
comment
Ваше второе предложение звучит как то, что я ищу, и я чувствую, что комбинация Subject / switchAll — это именно то, что мне нужно. Я покопаюсь в этом, спасибо. - person Stphane; 31.07.2019

Вместо того, чтобы пытаться вручную переносить подписчиков с одного Observable на другой, вы должны предоставить слушателям Observable, который при необходимости будет автоматически переключаться на другой Observable.

Вы делаете это, работая с наблюдаемым объектом высокого порядка (наблюдаемый объект, который испускает наблюдаемые объекты), который всегда переключается на самый последний внутренний наблюдаемый объект.

Основная концепция

// a BehaviorSubject is used so that late subscribers also immediately get the most recent inner Observable
const higherOrderObservable = new BehaviorSubject<Observable<any>>(EMPTY);

// pass new Observable to listeners
higherOrderObservable.next(new Observable(..));

// get most recent inner Observable
const currentObservable = higherOrderObservable.pipe(switchMap(obs => obs));
currentObservable.subscribe(valueFromInnerObservable => { .. })

В твоем случае

Для каждой конечной точки создайте BehaviorSubject (поставщик средства отслеживания), который генерирует Observable (средство отслеживания), которое в данный момент должно использоваться для этой конечной точки< /эм>. Если необходимо использовать другой трекер для данной конечной точки, передайте этот новый Observable в метод BehaviorSubject. Позвольте вашим слушателям подписаться на BehaviorSubject (поставщик трекера), который автоматически предоставит им нужный трекер, т. е. переключится на Observable, который следует использовать в данный момент.

Упрощенная версия вашего кода может выглядеть так, как показано ниже. Особенности зависят от того, как вы используете функцию createTracker в своем приложении.

interface ConnectionTracker {
  fullUri: string;
  tracker$: ConnectableObservable<any>;
}

// Map an endpoint to a tracker supplier.
// This is your higher order Observable as it emits objects that wrap an Observable
store: { [key: string]: BehaviorSubject<ConnectionTracker> };
closeAllTrackers$ = new Subject();

// Creates a new tracker if necessary and returns a ConnectedObservable for that tracker. 
// The ConnectedObservable will always resemble the current tracker.
createTracker<T>(endpoint: string, queryString: string = null): Observable<any> {
  const fullUri = endpoint + (queryString ? `?${queryString}` : '');
  // if no tracker supplier for the endpoint exists, create one
  if (!store[endpoint]) {
    store[endpoint] = new BehaviorSubject<ConnectionTracker>(null);
  }
  const currentTracker = store[endpoint].getValue();

  // if no tracker exists or the current one is obsolete, create a new one
  if (!currentTracker || currentTracker.fullUri !== fullUri) {
    const tracker$ = new Observable<T>(subscriber => {
      const source = new EventSource(fullUri, { withCredentials: true });
      source.onmessage = e => subscriber.next(JSON.parse(e.data));
      source.onerror = e => subscriber.error(e);
      return () => source.close(); // on unsubscribe close the source
    }).pipe(publish()) as ConnectableObservable<any>;
    tracker$.connect();
    // pass the new tracker to the tracker supplier
    store[endpoint].next({ fullUri, tracker$ });
  }
  // return the tracker supplier for the given endpoint that always switches to the current tracker
  return store[endpoint].pipe(
    switchMap(tracker => tracker ? tracker.tracker$ : EMPTY), // switchMap will unsubscribe from the previous tracker and thus close the connection if a new tracker comes in
    takeUntil(this.closeAllTrackers$) // complete the tracker supplier on emit
  );
}

// close all trackers and remove the tracker suppliers
closeAllTrackers() {
  this.closeAllTrackers$.next();
  this.store = {};
}

Если вы хотите разом закрыть все подключения к трекеру, а существующие подписчики должны получить уведомление complete, позвоните по номеру closeAllTrackers. Если вы хотите закрыть только некоторые подключения к трекерам, но не хотите, чтобы существующие подписчики получали уведомление complete, чтобы они продолжали прослушивать новые трекеры, поставляемые в будущем, вызовите store[trackerEndpoint].next(null) для каждого трекера.

person frido    schedule 25.07.2019
comment
Что делать, если мне нужно разорвать все существующие подключения? Будет ли достаточно циклически перебирать собственные свойства store и вызывать .complete() для каждого BehaviorSubject? - person Stphane; 31.07.2019
comment
Вы, вероятно, не хотите завершать сами BehaviorSubjects, потому что впоследствии вы не сможете создавать новые трекеры, если захотите. Вместо этого просто передайте null каждому поставщику трекеров: store[endpoint].next(null). Я отредактировал последний оператор возврата так, что если null передается объекту BehaviorSubject, он переключается на наблюдаемый объект EMPTY, который только что завершился. Таким образом, вы отмените подписку на предыдущий трекер, и слушатели не получат никаких данных, пока новый Observable не будет передан BehaviorSubject. - person frido; 31.07.2019