Добавление событий в начало наблюдаемого Rx с помощью ReplaySubject - возможно?

Я реализую шину, используя Rx (реактивный фреймворк), и пока все выглядит хорошо. Проблема, с которой я столкнулся прямо сейчас, заключается в том, как добавлять события перед началом потока.

Чтобы дать больше контекста, это для источника события (а-ля CQRS / ES). Когда автобус запускается, ряд подписчиков получают IObservable. На этом этапе шина спрашивает абонентов, с какого номера / последовательности события им нужно начать. События загружаются с диска, начиная с наименьшего номера, и добавляются в поток, причем каждый подписчик использует оператор where, чтобы начать с нужного события. По мере запуска приложения новые события добавляются и отправляются подписчикам. Эта часть отлично работает.

Некоторые подписчики подключаются поздно, но все же им нужны все события. Похоже, ReplaySubject отвечает всем требованиям, если количество событий достаточно мало. Я думаю, что, вероятно, смогу выполнить кэширование на диске / в автономном режиме, чтобы их было больше (любые указатели приветствуются!).

Более серьезная проблема заключается в том, что когда некоторые подписчики получают IObservable, им нужно получать события, которые произошли до тех, которые были загружены изначально. Ситуация примерно такая.

Когда автобус тронулся, он загрузил событие № 50 (самое раннее, которое кто-либо хотел). Теперь новый подписчик запрашивает IObservable, за исключением того, что он нужен с №20 и далее. Итак, я хочу загрузить 20–49 и добавить их перед началом потока. Ни один из существующих подписчиков не должен видеть какое-либо событие (оно будет отфильтровано по параметру «Где»).

Кажется, это должно быть возможно, но я не могу понять это. Возможно ли это в Rx? Если да, то как?

Спасибо! Эрик


person Erick T    schedule 12.05.2012    source источник


Ответы (2)


Я бы посмотрел на это очень простым Rx-способом. Не используйте ReplaySubject, используйте один Subject для всех «живых» событий и объединяйте исторические события перед ним для каждого подписчика.

Я предполагаю, что у вас может быть какая-то структура данных, которая отслеживает исторические записи. Если вы уже загрузили / зафиксировали события из # 50, чтобы сказать # 80 в памяти, тогда, когда появится новый подписчик, который запрашивает события из # 20, я сделаю что-то вроде этого:

var fromIndex = 20;
var eventSubject = new Subject<Event>();
capturedEvents
    .GetFromIndex(fromIndex) // Gets a enumerable list of events
    .ToObservable()
    .Concat(eventSubject)
    .Subscribe( ... );

Затем вы можете использовать eventSubject для захвата новых событий, например:

eventSubject.Subscribe(capturedEvents.CaptureEvent);

Это соответствует вашим потребностям?

person Enigmativity    schedule 12.05.2012
comment
Думаю, есть - надо с утра проверить. Я должен сказать, что работа с Rx требует небольшой психологической адаптации! - person Erick T; 12.05.2012
comment
@ErickT - Я согласен, это требует некоторой психологической адаптации, но если вы похожи на меня, вы обнаружите, что не хотите возвращаться к прежнему образу жизни. Я просто ненавижу, например, сейчас использовать обычные обработчики событий. Так много кода делается коротким, лаконичным и безопасным с использованием Rx. - person Enigmativity; 12.05.2012
comment
У меня есть то же самое, что и у вас выше, но я не вижу приближения старых событий. Мне нужны подписки? - person Erick T; 19.05.2012
comment
@ErickT - все должно работать нормально - вы не получите значений, если у вас нет подписок. Возможно, отправьте мне код Джеймсу по электронной почте на адрес enigmativity dot com? - person Enigmativity; 19.05.2012

Как насчет только:

itemsFromTwentyToFourtyNine.Concat(currentItems);
person Ana Betts    schedule 12.05.2012