Проще говоря
Учитывая существующий Observable (который еще не завершен), есть ли способ получить связанных подписчиков (функции, переданные для подписки), чтобы вместо этого заставить их подписаться на другой Observable?
Контекст
Служба в моем приложении помогает создавать подключения SeverEvent, возвращать ConnectableObservable к прокси-соединению и разрешать многоадресную рассылку с помощью оператора publish. Сервис отслеживает существующие подключения через внутреннее хранилище:
store: {[key: string]: ConnectionTracker};
// …
interface ConnectionTracker {
url: string;
eventSource: EventSource;
observable: rx.ConnectableObservable<any>;
subscription: rx.Subscription;
observer: rx.Observer<any>;
data?: any; // Arbitrary data
}
При создании соединения, если связанный трекер уже существует (идентификация выполняется с использованием конечной точки соединения), служба должна:
- ok
Закрыть существующее соединение ServerEvent с трекером - ok
Откройте новое соединение SerevrEvent (следовательно, новый ConnectableObservable) - Замените Observable существующего средства отслеживания новым наблюдаемым, но сделайте так, чтобы существующие подписчики подписались на новый Observable.
Вот часть кода, которая создает ConnectionTracker.
/**
* Create/Update a ServerEvent connection tracker
*/
createTracker<T>(endpoint: string, queryString: string = null): ConnectionTracker
{
let fullUri = endpoint + (queryString ? `?${queryString}` : '')
, tracker = this.findTrackerByEndpoint(endpoint) || {
observable: null,
fullUri: fullUri,
eventSource: null,
observer: null,
subscription: null
}
;
// Tracker exists
if (tracker.observable !== null) {
// If fullUri hasn't changed, use the tracker as is
if (tracker.fullUri === fullUri) {
return tracker;
}
// At this point, we know "fullUri" has changed, the tracker's
// connection should be replaced with a fresh one
// ⇒ TODO
// ⇒ Gather old tracker.observable's subscribers/subscriptions to make
// them subscribe to the new Observable instead (created down below)
// Terminate previous connection and clean related resouces
tracker.observer.complete();
tracker.eventSource.close();
}
tracker.eventSource = new EventSource(<any>fullUri, {withCredentials: true});
tracker.observable = rx.Observable.create((observer: rx.Observer<T>) => {
// Executed once
tracker.eventSource.onmessage = e => observer.next(JSON.parse(e.data));
tracker.eventSource.onerror = e => observer.error(e);
// Keep track of the observer
tracker.observer = observer;
})
// Transform Observable into a ConnectableObservable for multicast
.publish()
;
// Start emitting right away and also keep a reference to
// proxy subscription for later disposal
tracker.subscription = tracker.observable.connect();
return tracker;
}
Спасибо.
switchMap
, который возвращает новый Observable. Подписчики останутся такими, какие они есть, но будут получать значения от этого нового Observable. - person martin   schedule 25.07.2019