Рекомбинация элементов из одного и того же реактивного потока

То, чего я хочу достичь, можно описать следующим образом:

  • У меня есть поток выборок, которые представляют собой значения измерений с отметками времени. Это сырой поток.
  • Я применяю фильтр к необработанному потоку, в результате чего получаю производный поток (это будет фильтр гистерезиса из этот вопрос, но для простоты я использую здесь оператор Where)
  • Чтобы устранить большие пробелы, возникающие из-за медленного изменения значений, я применяю оператор Sample к необработанному потоку.
  • Я объединяю оба потока в результирующий поток

Концепция выглядит так:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

var s = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().AutoConnect();

var s1 = s.Where(x => x % 5 == 0);
var s2 = s.Sample(TimeSpan.FromMilliseconds(1000));

new[] {s1, s2}.Merge()./*Distinct().*/Subscribe(Console.WriteLine, cts.Token);

await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);

Первоисточник горячий. Без Distinct я, очевидно, получаю повторяющиеся значения, с ним, похоже, получается то, что я ожидаю увидеть.

Есть ли лучший подход, учитывая тот факт, что первый производный поток не является периодическим?


person ZorgoZ    schedule 18.11.2019    source источник


Ответы (1)


Вы можете добавить индекс в исходный наблюдаемый, а затем применить DistinctUntilChanged в окончательном объединенном наблюдаемом.

var withIndex = s.Select((x, i) => (Item : x, Index : i));
var s1 = withIndex.Where(p => p.Item % 5 == 0);
var s2 = withIndex.Sample(TimeSpan.FromMilliseconds(1000));

new[] { s1, s2 }
    .Merge()
    .DistinctUntilChanged(p => p.Index) // discard duplicates
    .Select(p => p.Item) // discard the index
    .Subscribe(Console.WriteLine, cts.Token);

Я предполагаю, что оператор DistinctUntilChanged легче, чем Distinct, потому что он кэширует только последний элемент.

person Theodor Zoulias    schedule 18.11.2019
comment
Хм. Хороший улов! Поскольку принудительно применяемый интервал выборки может быть длительным (минуты), кэширование может привести к ужасным накладным расходам. Можно ли использовать для этой цели временную метку исходного потока? Я проверю, но мне видится простым. - person ZorgoZ; 19.11.2019
comment
Ах, да. Я забыл, что значения имеют временную метку! Временные метки выглядят достаточно для меня с точки зрения уникальности. - person Theodor Zoulias; 19.11.2019