Вот отполированная версия идеи Шломо о сохранении последних значений для каждого ключа с использованием оператора Scan
и ImmutableDictionary
в качестве состояния. Приведенный ниже пользовательский оператор (SampleByKey
) выбирает последовательность ключевых элементов с определенным интервалом. При каждом такте выборки выдается IDictionary<TKey, TSource>
, содержащее последние значения, которые были выданы на данный момент каждым ключом.
public static IObservable<IDictionary<TKey, TSource>> SampleByKey<TSource, TKey>(
this IObservable<TSource> source,
Func<TSource, TKey> keySelector,
TimeSpan interval,
IEqualityComparer<TKey> keyComparer = default)
{
return source
.Scan(ImmutableDictionary.Create<TKey, TSource>(keyComparer),
(dict, x) => dict.SetItem(keySelector(x), x))
.Publish(published => Observable
.Interval(interval)
.WithLatestFrom(published, (_, dict) => dict)
.TakeUntil(published.LastOrDefaultAsync()));
}
Пример использования:
IObservable<IDictionary<string, Price>> sampled = priceFeed
.SampleByKey(p => p.Key, TimeSpan.FromSeconds(1.0));
В случае, если source
выдало ноль элементов во время двух выборок, один и тот же словарь будет выдан последовательно.
Эта реализация очень похожа на ту, которую я опубликовал ранее в вопрос о том, как сэмплировать последовательность с динамически изменяемым временным интервалом.
Примечание. Я удалил свою предыдущую реализацию (Редакция 1) как слишком сложную и потенциально негерметичную. Подход Шломо легче понять и модифицировать по мере необходимости.
person
Theodor Zoulias
schedule
22.12.2020