Как запустить сигнал RX со скользящим окном в зависимости от определенных условий

У меня есть горячий наблюдаемый поток данных датчика. Мне нужен наблюдаемый сигнал, который срабатывает только тогда, когда значение датчика ниже 15 в течение заданного периода времени. Если в любое время значение превысит 15, оно должно сбросить скользящее окно. Я заставил его частично работать с приведенным ниже кодом, однако он не срабатывает, если значение все время остается ниже 15.

var notification = _sensor.Where(v => v >= 15)
                          .Throttle(new TimeSpan(0, 1, 0))
                          .SelectMany(_ => Observable.Return(Unit.Default));

Какие-либо предложения?


person batkuip    schedule 09.09.2015    source источник
comment
Могу ли я уточнить, что вы хотите, чтобы это производило значения только тогда, когда значение ниже 15, но только по истечении установленного периода времени с момента последнего значения, равного 15 или больше?   -  person Enigmativity    schedule 09.09.2015
comment
Правильный. И в тот момент, когда он снова превысит 15, он должен перестать производить значения.   -  person batkuip    schedule 09.09.2015
comment
Почему вы делаете .SelectMany(_ => Observable.Return(Unit.Default)), а не только .Select(_ => Unit.Default)?   -  person Enigmativity    schedule 09.09.2015
comment
Не могли бы вы объяснить, что вы пытаетесь сделать снова? Основываясь на моем вопросе выше, мне кажется, что выбранный вами ответ вообще этого не делает. Я запутался.   -  person Enigmativity    schedule 09.09.2015
comment
Я считаю, что ему требуется одно уведомление каждый раз, когда датчик предоставляет значения только ниже 15 для указанного TimeSpan (после самого последнего значения, равного или превышающего 15, следовательно, дроссельной заслонки)   -  person supertopi    schedule 09.09.2015
comment
@supertopi - я думал, что он хотел, чтобы все значения вызывали уведомление, если они не были равны или больше 15 или находились в пределах установленного периода после последнего значения, равного или превышающего 15.   -  person Enigmativity    schedule 10.09.2015


Ответы (2)


Если _sensor никогда не выдает значение, равное или превышающее 15, Throttle никогда не вызывается.

Простое решение — добавить уведомление о пробуждении в _sensor или notification.

var wakeup = Observable.Return(15);

var notification = _sensor.Merge(wakeup)
                          .Where(v => v >= 15)
                          .Throttle(new TimeSpan(0, 1, 0))
                          .SelectMany(_ => Observable.Return(Unit.Default));
person supertopi    schedule 09.09.2015
comment
Обманчиво простой и работает как шарм. Единственное изменение, которое я сделал, это заменить слияние на wakeup.Merge(_sensor), чтобы пробуждение всегда было первым. - person batkuip; 09.09.2015

Если я правильно понимаю вашу потребность, то это работает:

var notification = _sensor.Publish(ps => ps
    .Select(x => x >= 15.0)
    .DistinctUntilChanged()
    .Select(p => p
        ? Observable.Empty<double>()
        : Observable
            .Timer(TimeSpan.FromMinutes(1.0))
            .Select(x => -1.0)
            .IgnoreElements()
            .Concat(ps))
    .Switch());

Я предположил, что _sensor является IObservable<double>.

Таким образом, этот наблюдаемый объект будет публиковать все значения из потока _sensor, если значения остаются ниже 15,0 в течение как минимум одной минуты. Я протестировал эту функцию.

Мой тестовый код:

var random = new Random();
var _sensor = Observable.Generate(
    0,
    x => true,
    x => x,
    x => random.NextDouble() * 16.0,
    x => TimeSpan.FromSeconds(random.NextDouble()));

var published_sensor = _sensor.Publish();

var notification = published_sensor.Publish(ps => ps
    .Select(x => x >= 15.0)
    .DistinctUntilChanged()
    .Select(p => p
        ? Observable.Empty<double>()
        : Observable
            .Timer(TimeSpan.FromSeconds(5.0))
            .Select(x => -1.0)
            .IgnoreElements()
            .Concat(ps))
    .Switch());

published_sensor.Merge(notification).Timestamp().Dump();

published_sensor.Connect();

Результаты, которые я получил:

результаты

Обратите внимание, что перед публикацией повторяющихся значений проходит 5 секунд, и дублирование прекращается, когда источник создает значение больше 15.

person Enigmativity    schedule 09.09.2015
comment
Я не совсем уверен, что здесь происходит. Создает ли он таймер для каждого значения датчика? И какова цель дубликатов? - person batkuip; 09.09.2015
comment
Таймер предназначен просто для создания серии случайных значений через случайные промежутки времени - он предназначен для имитации значений датчика. Дубликаты возникают из-за того, что я объединил исходную серию с уведомлениями - published_sensor.Merge(notification) - это показывает, когда поток notification создает значения с исходным потоком _sensor. Когда есть дубликат, вы знаете, что исходное значение теперь выводится в уведомлениях. - person Enigmativity; 09.09.2015