Можно ли попробовать последний элемент в горячем наблюдаемом, сгруппированном по ключу?

Можно ли в Rx.NET отобрать последний элемент в горячем наблюдаемом объекте, сгруппированном по ключу?

Например, если у меня есть IObservable<Price>, где Price:

Price 
- Key
- Bid
- Offer

Предположим, что IObservable связан с внешним потоком цен.

Могу ли я получить все последние Prices, сгруппированные по Key, сэмплированные каждую секунду, используя Rx?


person ahallan    schedule 22.08.2017    source источник
comment
Отчасти связано: регулирование группы Rx   -  person Theodor Zoulias    schedule 21.12.2020


Ответы (4)


Предполагая некоторое наблюдаемое source, это возвращает все цены, сгруппированные и выбранные по ключу, которые пришли за последнюю секунду.

var sampled = source
    .GroupBy(p => p.Key)
    .SelectMany(o => o.Sample(TimeSpan.FromSeconds(1)));

Если есть какая-то цена, которая не получила сообщения в последнюю секунду, она не будет включена.

Если вы хотите включить старые цены, это будет работать:

var sampled2 = source
    .Scan(ImmutableDictionary<int, Price>.Empty, (state, p) => state.SetItem(p.Key, p))
    .Replay(1)
    .RefCount();
var dummySubscription = sampled2.Subscribe();
var result = Observable.Interval(TimeSpan.FromSeconds(1))
    .SelectMany(_ => sampled2.Take(1).SelectMany(state => state.Values));

Просто не забудьте избавиться от DummySubscription, когда закончите с наблюдаемым result.

person Shlomo    schedule 22.08.2017
comment
Спасибо. Но как я могу также включить цены, которые были получены более секунды назад? - person ahallan; 23.08.2017
comment
Обновленный ответ. - person Shlomo; 23.08.2017
comment
Очень хорошо! Два возможных улучшения: AutoConnect(0) вместо RefCount+dummySubscription и добавление .TakeUntil(sampled2.LastOrDefaultAsync()) в конце, чтобы результирующая последовательность завершалась, когда завершается source. - person Theodor Zoulias; 22.12.2020
comment
AutoConnect не существовало, когда это было написано. :) - person Shlomo; 23.12.2020
comment
Возможно, этот ответ послужил мотивацией для введения этого оператора. :-) - person Theodor Zoulias; 26.12.2020

Это делает то, что вы хотите?

IObservable<ImmutableDictionary<string, Price>> sampled =
    Observable
        .Create<ImmutableDictionary<string, Price>>(o =>
        {
            var output = ImmutableDictionary<string, Price>.Empty;
            return
                source
                    .Do(x => output = output.SetItem(x.Key, x))
                    .Select(x => Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(y => output).StartWith(output))
                    .Switch()
                    .Subscribe(o);
        });
person Enigmativity    schedule 23.08.2017

Горячий наблюдаемый не будет хранить в памяти старые значения, но вы можете сами поймать последнюю цену каждого известного ключа, например, в словаре.

Пожалуйста, обратитесь к следующему примеру кода.

Dictionary<string, double> _prices = new Dictionary<string, double>();

GetPrices()
    .Buffer(TimeSpan.FromSeconds(1))
    .Subscribe(prices =>
    {
        if (prices != null && prices.Count > 0)
        {
            var grouped = prices.GroupBy(x => x.Key);
            foreach (var group in grouped)
                _prices[group.Key] = group.Last().Bid;
        }

        //print out the last quote of each known price key
        foreach (var price in _prices)
        {
            Console.WriteLine("Key: " + price.Key + ", last price: " + price.Value);
        }
    });

Каждую секунду он должен печатать последнюю цитату каждого известного ключа.

person mm8    schedule 23.08.2017
comment
Я думаю, что бизнес-логика должна быть внутри Observable, а не в подписке, как показано - person supertopi; 23.08.2017
comment
Бизнес-логика...? Прочитайте вопрос и ответ еще раз. - person mm8; 23.08.2017
comment
Вопрос заключается в том, чтобы начать с IObservable<Price>, и, похоже, он хочет создавать значение каждую секунду. Это не так. - person Enigmativity; 23.08.2017
comment
Вы пропустили? Давайте предположим, что IObservable связан с внешним ценовым потоком? Вы, конечно, не контролируете, когда горячая лента цен генерирует ценовые тики... - person mm8; 23.08.2017
comment
@ mm8 весь смысл решения (или Rx в целом) заключается в управлении горячим Observable :) - person supertopi; 23.08.2017
comment
@supertopi: В этом случае я предполагаю, что пользователь хочет что-то сделать, что бы это ни было, с последними известными ценами каждую секунду. Сам IObservable просто выталкивает цены из фида. - person mm8; 23.08.2017

Вот отполированная версия идеи Шломо о сохранении последних значений для каждого ключа с использованием оператора Scan и ImmutableDictionary в качестве состояния. Приведенный ниже пользовательский оператор (SampleByKey) выбирает последовательность ключевых элементов с определенным интервалом. При каждом такте выборки выдается IDictionary<TKey, TSource>, содержащее последние значения, которые были выданы на данный момент каждым ключом.

public static IObservable<IDictionary<TKey, TSource>> SampleByKey<TSource, TKey>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector,
    TimeSpan interval,
    IEqualityComparer<TKey> keyComparer = default)
{
    return source
        .Scan(ImmutableDictionary.Create<TKey, TSource>(keyComparer),
            (dict, x) => dict.SetItem(keySelector(x), x))
        .Publish(published => Observable
            .Interval(interval)
            .WithLatestFrom(published, (_, dict) => dict)
            .TakeUntil(published.LastOrDefaultAsync()));
}

Пример использования:

IObservable<IDictionary<string, Price>> sampled = priceFeed
    .SampleByKey(p => p.Key, TimeSpan.FromSeconds(1.0));

В случае, если source выдало ноль элементов во время двух выборок, один и тот же словарь будет выдан последовательно.

Эта реализация очень похожа на ту, которую я опубликовал ранее в вопрос о том, как сэмплировать последовательность с динамически изменяемым временным интервалом.

Примечание. Я удалил свою предыдущую реализацию (Редакция 1) как слишком сложную и потенциально негерметичную. Подход Шломо легче понять и модифицировать по мере необходимости.

person Theodor Zoulias    schedule 22.12.2020