.NET ReactiveExtensions: используйте Sample() с переменным промежутком времени

Учитывая высокочастотный наблюдаемый поток данных, я хочу испускать элемент только каждые XX секунд.

Обычно это делается в RX с помощью .Sample(TimeSpan.FromSeconds(XX))

Однако... Я хочу, чтобы интервал времени менялся в зависимости от некоторого свойства данных.

Допустим, мои данные:

class Position { ... public int Speed; }

Если скорость меньше 100, я хочу передавать данные каждые 5 секунд. Если скорость выше 100, это должно быть каждые 2 секунды.

Возможно ли это с готовым Sample() или мне нужно что-то построить самому?


person Carsten Gehling    schedule 14.06.2019    source источник
comment
Да, это возможно.   -  person Enigmativity    schedule 14.06.2019


Ответы (2)


Вот реализация низкого уровня, использующая метод расширения System.Reactive.Concurrency.Scheduler.SchedulePeriodic в качестве таймера.

public static IObservable<TSource> Sample<TSource>(this IObservable<TSource> source,
    Func<TSource, TimeSpan> intervalSelector, IScheduler scheduler = null)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (intervalSelector == null)
        throw new ArgumentNullException(nameof(intervalSelector));
    scheduler = scheduler ?? Scheduler.Default;

    return Observable.Create<TSource>(observer =>
    {
        TimeSpan currentInterval = Timeout.InfiniteTimeSpan;
        IDisposable timer = null;
        TSource latestItem = default;
        bool latestEmitted = true;
        object locker = new object();

        Action periodicAction = () =>
        {
            TSource itemToEmit;
            lock (locker)
            {
                if (latestEmitted) return;
                itemToEmit = latestItem;
                latestItem = default;
                latestEmitted = true;
            }
            observer.OnNext(itemToEmit);
        };

        return source.Subscribe(onNext: item =>
        {
            lock (locker)
            {
                latestItem = item;
                latestEmitted = false;
            }
            var newInterval = intervalSelector(item);
            if (newInterval != currentInterval)
            {
                timer?.Dispose();
                timer = scheduler.SchedulePeriodic(newInterval, periodicAction);
                currentInterval = newInterval;
            }
        }, onError: ex =>
        {
            timer?.Dispose();
            observer.OnError(ex);
        }, onCompleted: () =>
        {
            timer?.Dispose();
            observer.OnCompleted();
        });
    });
}

Пример использования:

observable.Sample(x => TimeSpan.FromSeconds(x.Speed < 100 ? 5.0 : 2.0));

Таймер перезапускается каждый раз, когда обратный вызов intervalSelector возвращает другой интервал. В крайнем случае, когда интервал изменяется с каждым новым элементом, этот пользовательский оператор будет вести себя больше как встроенный Throttle, чем встроенный Sample.

В отличие от Sample период Throttle представляет собой скользящее окно. Каждый раз, когда Throttle получает значение, окно сбрасывается. (цитирование)

person Theodor Zoulias    schedule 20.11.2019
comment
@CarstenGehling с удовольствием! - person Theodor Zoulias; 04.12.2019

Дайте мне знать, если это работает:

var query =
    source
        .Publish(ss =>
            ss
                .Select(s => s.Speed < 100 ? 5.0 : 2.0)
                .Distinct()
                .Select(x => ss.Sample(TimeSpan.FromSeconds(x))));
person Enigmativity    schedule 14.06.2019