Как реализовать оператор ScanAsync с асинхронным аккумулятором в Rx.Net?

Оператор Scan в Rx.Net имеет подпись:

public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator);

Аккумулятор

Func<TAccumulate, TSource, TAccumulate> accumulator

Пытаясь реализовать модель конечного автомата с асинхронным переходом состояния, я обнаружил, что оператор ScanAsync со следующей подписью будет полезен.

public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator);

Аккумулятор имеет подпись

Func<TAccumulate, TSource, Task<TAccumulate>> accumulator

Идеальный код приложения должен быть примерно таким (аналогичный обычному оператору Scan, с разницей в использовании асинхронного аккумулятора).

IObservable<TEvent> events;
IObservable<State> states = events.ScanAsync(
    initialState, 
    async (previousState, evt) => {
        var newState = await transitionAsync(previousState, evt);
        return newState;
    });

Похоже, MS разрабатывает AsyncRx.NET, однако это еще не выпущено (нет расписания).


Связанные вещи:

При моделировании асинхронного конечного автомата с помощью BehaviourSubject для состояний и подписки на наблюдаемые события, например следующий код

IObservable<TEvent> events;
BehaviourSubject<State> states = new BehaviourSubject<State>(initialState);
events.Subscribe(async e => {
    var newState = await transition(states.Value, e);
    states.OnNext(newState);
})

Я предполагаю, что в некоторых случаях могут быть условия гонки.

Я пытался реализовать его с помощью

IObservable<TS> ScanAsync<TS, TE>(
IObservable<TE> source,
Func<TS, TE, Task<TS>> reducer,
TS initialState)
{
    var states = from m in source.Take(1)
                    from nextState in reducer(initialState, m).ToObservable()
                    from s in ScanAsync(source.Skip(1), reducer, nextState)
                    select s;
    return Observable.Return(initialState).Concat(states);
}

Однако иногда это работает, иногда просто блокируется, и я понятия не имею, в чем причина.


person xiang0x48    schedule 16.09.2020    source источник


Ответы (1)


Вы можете использовать оператор Scan для создания промежуточного IObservable<Task<TAccumulate>>, который затем можно сгладить с помощью оператора Concat:

public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(
    this IObservable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, Task<TAccumulate>> accumulator)
{
    return source.Scan(Task.FromResult(seed), async (previousTask, item) =>
    {
        return await accumulator(await previousTask, item);
    }).Concat();
}

В приведенной выше реализации используется перегрузка Concat, которая принимает наблюдаемое из задач вместо вложенного наблюдаемого:

// Concatenates all task results, as long as the previous task terminated successfully.
public static IObservable<TSource> Concat<TSource>(
    this IObservable<Task<TSource>> sources);
person Theodor Zoulias    schedule 17.09.2020