Отфильтровать наблюдаемое, используя значения из другого наблюдаемого

У меня есть две наблюдаемые:

  1. Наблюдаемый, представляющий список входов флажков.
  2. Наблюдаемый, представляющий поток событий, поступающих с сервера.

Я хотел бы отфильтровать вторую наблюдаемую, используя значения из первой.

Значения, полученные от сервера, включают свойство tag, которое соответствует значениям в списке флажков. Наблюдаемое, полученное в результате комбинации двух вышеупомянутых, будет давать значения только с сервера, свойство tag которого включено в набор отмеченных флажков.


person Ionuț G. Stan    schedule 19.07.2013    source источник


Ответы (4)


Вы можете использовать withLatestFrom. введите описание изображения здесь.

source.withLatestFrom(checkboxes, (data, checkbox) => ({data, checkbox}))
  .filter(({data, checkbox}) => ...)

Здесь checkboxes - это наблюдаемое, представляющее список входов флажков. source - это наблюдаемое, представляющее поток событий, поступающих с сервера. В функции фильтра вы можете проверить, действительны ли данные по сравнению с настройками флажка, и пропустить их.

Обратите внимание: важно, чтобы checkboxes испускал по крайней мере 1 значение, прежде чем поток сможет что-либо испустить.

Пс. Что касается других ответов, это решение работает, даже если источник холодный.

person Dorus    schedule 24.08.2016
comment
@ IonuțG.Stan В самом деле. И это удивительно, как много людей до сих пор обращаются к этим старым вопросам, так что, надеюсь, некоторым это поможет :) - person Dorus; 25.08.2016
comment
Обратите внимание, что это аннотировано с помощью @Experimental и может значительно измениться в любое время, поэтому его не следует использовать или полагаться в производственном коде. - person acrespo; 26.04.2017
comment
@acrespo Это используется в производственном коде во многих местах. Где именно вы нашли этот тег @ Experimental? - person Dorus; 26.04.2017
comment
Ой, извините, я подошел к вопросу в поисках некоторой информации о RxJava (тот же вопрос, но для java), и я не заметил тега rxjs. Поэтому я добавил этот комментарий, потому что оператор withLatestFrom аннотирован @Experimental в реализация java (при условии, что RxJava версии 1.X). Виноват! - person acrespo; 26.04.2017
comment
@acrespo Насколько я понимаю, _ 1_ не имеет экспериментального тега в RxJava 2. - person Dorus; 26.04.2017
comment
Да, я знаю, Я тоже не вижу этого в исходном коде. Сразу хотел отметить, что для RxJava 1, который до сих пор активно поддерживается и широко используется. - person acrespo; 26.04.2017
comment
Пытался проголосовать за это, но похоже, что 9 месяцев назад я проголосовал против. Возможно ошибка - извините! - person Simon_Weaver; 18.01.2018
comment
@Simon_Weaver Ага! Я уже 9 месяцев думал об этом отрицательном голосовании :-) - person Dorus; 18.01.2018

Чтобы фильтровать поток A с использованием значений потока B, вам необходимо наблюдать за потоком B и использовать самые последние значения для фильтрации потока A.

Используйте switch() для преобразования B наблюдаемый к наблюдаемому, производящий значения от наблюдаемого.

checkedInputValuesSource
    .map(function (options) {
        return dataSource
            .filter(function (value) {
                return options.indexOf(value) !== -1;
            });
    })
    .switch()
    .subscribe(function (x) {
        console.log('out: ' + x);
    });

Использование switch() предполагает, что dataSource является горячая наблюдаемая.

Пример использования interval() для создания фиктивных данных:

var input,
    checkedInputValuesSource,
    dataSource;

input = document.querySelectorAll('input');

// Generate source describing the current filter.
checkedInputValuesSource = Rx.Observable
    .fromEvent(input, 'change')
    .map(function () {
        var inputs = document.querySelectorAll('input'),
            checkedInputValues = [];
        
        [].forEach.call(inputs, function (e) {
            if (e.checked) {
                checkedInputValues.push(e.value);
            }
        });
        
        return checkedInputValues;
    })
    .startWith([]);

// Generate random data source (hot).
dataSource = Rx.Observable
    .interval(500)
    .map(function () {
        var options = ['a', 'b', 'c'];
    
        return options[Math.floor(Math.floor(Math.random() * options.length))];
    })
    .do(function (x) {
        console.log('in: ' + x);
    })
    .share();

checkedInputValuesSource
    .map(function (options) {
        return dataSource
            .filter(function (value) {
                return options.indexOf(value) !== -1;
            });
    })
    .switch()
    .subscribe(function (x) {
        console.log('out: ' + x);
    });
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>

<input type='checkbox' value='a'>
<input type='checkbox' value='b'>
<input type='checkbox' value='c'>

В этом примере будет получен результат, подобный следующему:

in: c
in: a
out: a
in: b
in: c
out: a
in: b
in: a

Где in отражает весь сгенерированный ввод, а b данные, прошедшие фильтр. Фильтр настраивается путем установки флажков напротив входов, отражающих значения «a», «b» и «c».

person Gajus    schedule 03.08.2015
comment
Разве это не то, что я написал в своем решении? - person Ionuț G. Stan; 03.08.2015
comment
Возможно. Я просматриваю каждый вопрос / ответ rxjs и делаю свой вклад, когда могу. Ваш ответ ссылается на внешний ресурс и требует для запуска дополнительных зависимостей. Я написал решение в изолированной среде. - person Gajus; 03.08.2015
comment
Обратите внимание, что вы можете сделать это, даже если источник холодный. С помощью publish(...). Как это: dataSource .publish(dsrc => checkedInputValuesSource .switchMap((options) => dsrc .filter((value) => options.indexOf(value) !== -1) ) ) - person Dorus; 25.08.2016

Видимо, мне нужна была комбинация select, filter и switchLatest. Я написал небольшой тестовый пример, демонстрирующий это: https://gist.github.com/igstan/d5b8db7b43f49dd87382#file-observable-filter-observable-js-L36-L45

person Ionuț G. Stan    schedule 19.07.2013
comment
Чтобы было ясно, ваша суть использует map и filter, которые, как я полагаю, являются всего лишь псевдонимами, которые вы создали для select и where? - person Brandon; 19.07.2013
comment
@ Брэндон, да, это правда. - person Ionuț G. Stan; 19.07.2013
comment
Привет, я бы хотел поиграть с вашим кодом на github, какие еще библиотеки js мне нужны? - person Guenni; 13.10.2013
comment
@ GüntherSchmidt Я собрал все, что вам нужно для запуска тестов, в отдельном репозитории: github.com / igstan / rx-testing-demo. Развлекайся! - person Ionuț G. Stan; 14.10.2013
comment
Большое спасибо! Тем временем я учился использовать некоторые комбинаторы, и метод combLatest спас положение в особенно серьезной задаче. - person Guenni; 15.10.2013

Расширяя ответ от @Dorus ... В Котлине это можно сделать так:

val observable: Observable<Data> = ...
val filter: Observable<Checkbox> = ...
val filtered: Observable<Data> =
        observable.filterWithLatestFrom(filter) { checkbox -> checkbox.isSelected }

Используя функцию расширения:

/**
 * Return an [Observable] with type [T1] that is filtered using the last event emitted by the [other] observable.
 */
fun <T1 : Any, T2 : Any> Observable<T1>.filterWithLatestFrom(other: Observable<T2>, filterFunction: (T2) -> Boolean)
: Observable<T1> {
    return this.withLatestFrom(other) { obs1, obs2 -> Pair(obs1, obs2) }
        .filter { (_, obs2) -> filterFunction.invoke(obs2) }
        .map { (obs1, _) -> obs1}
}
person Jeff McKnight    schedule 05.02.2021