PublishSubject вызывает дорогостоящую функцию для всех наблюдателей

У меня есть конкретный сценарий, в котором я реализовал PublishSubject для создания элементов на основе настраиваемого события. Для каждого элемента, который будет отправлен, мне также нужно сохранить это значение (дорогостоящая операция). Я пытаюсь создать функцию (например, map), которая будет вызываться один раз для всех наблюдателей, а затем элемент, который будет получен каждым наблюдателем с помощью метода onNext().

Тема:

static final PublishSubject<SomeResult> commonSubject = PublishSubject.create()

Триггер (испускающие предметы):

commonSubject.onNext(new SomeResult())

Выставляем тему (будет использоваться контроллером):

public static Observable<SomeResult> observeResults() {
    return commonSubject.share();
}

Контроллер:

public Observable<SomeResult> observeResults() {
    return CustomConsumer.observeResults()
            .observeOn(Schedulers.single());
}   

Подписчики:

CustomControllerResult.observeResults().subscribe(result -> doSomething());
CustomControllerResult.observeResults().subscribe(result -> doSomethingElse());

Каждый Observer получает элементы, как ожидалось, но если я добавлю дорогостоящую операцию к контроллеру, она будет вызываться для каждого наблюдателя (чего я не хочу):

public Observable<SomeResult> observeResults() {
    return CustomConsumer.observeResults()
            .observeOn(Schedulers.single())
            .compose(persistResult())
            .compose(logResult())
            .share();
}

Есть идеи, как добиться желаемого результата?


person Ionut Negru    schedule 11.09.2019    source источник
comment
Вы пробовали переместить share() ПОСЛЕ дорогостоящей операции в цепочке?   -  person LordRaydenMK    schedule 11.09.2019
comment
Да, похоже, на это не влияет. (На самом деле я забыл добавить это в код выше - я отредактировал его сейчас)   -  person Ionut Negru    schedule 17.09.2019


Ответы (1)


Проблема в том, что каждый раз, когда вызывается observeResults(), он создает новый Observable с оператором share. Но созданный Observable не передается подписчикам.

Вы можете изменить свой код на:

Observable<SomeResult> observable = CustomControllerResult.observeResults()
observable.subscribe(result -> doSomething());
observable.subscribe(result -> doSomethingElse());

Или вы можете изменить метод observeResults, чтобы он возвращал общий Observable:

static final PublishSubject<SomeResult> commonSubject = PublishSubject.create()

static final Observable<SomeResult> observable = commonSubject
    .observeOn(Schedulers.single())
    .compose(persistResult())
    .compose(logResult())
    .share();

public static Observable<SomeResult> observeResults() {
    return observable;
}
person Gustavo    schedule 19.09.2019
comment
Я тоже подозревал, что это происходит. Отличный совет для того, чтобы на самом деле делиться наблюдаемым. Спасибо. - person Ionut Negru; 25.09.2019