Я реализую шину, используя Rx (реактивный фреймворк), и пока все выглядит хорошо. Проблема, с которой я столкнулся прямо сейчас, заключается в том, как добавлять события перед началом потока.
Чтобы дать больше контекста, это для источника события (а-ля CQRS / ES). Когда автобус запускается, ряд подписчиков получают IObservable. На этом этапе шина спрашивает абонентов, с какого номера / последовательности события им нужно начать. События загружаются с диска, начиная с наименьшего номера, и добавляются в поток, причем каждый подписчик использует оператор where, чтобы начать с нужного события. По мере запуска приложения новые события добавляются и отправляются подписчикам. Эта часть отлично работает.
Некоторые подписчики подключаются поздно, но все же им нужны все события. Похоже, ReplaySubject отвечает всем требованиям, если количество событий достаточно мало. Я думаю, что, вероятно, смогу выполнить кэширование на диске / в автономном режиме, чтобы их было больше (любые указатели приветствуются!).
Более серьезная проблема заключается в том, что когда некоторые подписчики получают IObservable, им нужно получать события, которые произошли до тех, которые были загружены изначально. Ситуация примерно такая.
Когда автобус тронулся, он загрузил событие № 50 (самое раннее, которое кто-либо хотел). Теперь новый подписчик запрашивает IObservable, за исключением того, что он нужен с №20 и далее. Итак, я хочу загрузить 20–49 и добавить их перед началом потока. Ни один из существующих подписчиков не должен видеть какое-либо событие (оно будет отфильтровано по параметру «Где»).
Кажется, это должно быть возможно, но я не могу понять это. Возможно ли это в Rx? Если да, то как?
Спасибо! Эрик