Оператор Scan
в Rx.Net имеет подпись:
public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator);
Аккумулятор
Func<TAccumulate, TSource, TAccumulate> accumulator
Пытаясь реализовать модель конечного автомата с асинхронным переходом состояния, я обнаружил, что оператор ScanAsync
со следующей подписью будет полезен.
public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator);
Аккумулятор имеет подпись
Func<TAccumulate, TSource, Task<TAccumulate>> accumulator
Идеальный код приложения должен быть примерно таким (аналогичный обычному оператору Scan
, с разницей в использовании асинхронного аккумулятора).
IObservable<TEvent> events;
IObservable<State> states = events.ScanAsync(
initialState,
async (previousState, evt) => {
var newState = await transitionAsync(previousState, evt);
return newState;
});
Похоже, MS разрабатывает AsyncRx.NET, однако это еще не выпущено (нет расписания).
Связанные вещи:
При моделировании асинхронного конечного автомата с помощью BehaviourSubject
для состояний и подписки на наблюдаемые события, например следующий код
IObservable<TEvent> events;
BehaviourSubject<State> states = new BehaviourSubject<State>(initialState);
events.Subscribe(async e => {
var newState = await transition(states.Value, e);
states.OnNext(newState);
})
Я предполагаю, что в некоторых случаях могут быть условия гонки.
Я пытался реализовать его с помощью
IObservable<TS> ScanAsync<TS, TE>(
IObservable<TE> source,
Func<TS, TE, Task<TS>> reducer,
TS initialState)
{
var states = from m in source.Take(1)
from nextState in reducer(initialState, m).ToObservable()
from s in ScanAsync(source.Skip(1), reducer, nextState)
select s;
return Observable.Return(initialState).Concat(states);
}
Однако иногда это работает, иногда просто блокируется, и я понятия не имею, в чем причина.