Я хочу использовать функциональность буфера Rx:
var source = new Subject<Price>();
var buffer = source
.Buffer(TimeSpan.FromSeconds(30), 5)
.Where(p => p.Any());
это означает, что выброс (публикация подписчикам) происходит, когда буфер достигает размера 5 или 30 секунд прошло с момента последнего выброса.
Но мне нужно иметь возможность испускать по запросу - например, когда я получаю элемент последовательности с высоким приоритетом. Затем я хочу добавить его в наблюдаемый (source.OnNext()
) и каким-то образом заставить его излучать (это означает возврат всех элементов в буфер и его очистку).
Я знаю, что могу добавить следующий код:
var flusher = new Subject<Price>();
var closing = flusher.Select(x => new List<Price> {x});
var query = buffer.Merge(closing).Subscribe(something);
и вызовите flusher.OnNext(highPriorityItem), и я его выпущу.
Но в этом случае у меня есть две независимые последовательности с двумя разными эмитами. Мне нужен один выброс, когда буфер заполнен или определенный элемент появляется последовательно.
Принудительно очистить Observable.Buffer типа count-type c# и Принудительно сбрасывать в Observable.Buffer c# не мне кажется подходит
[...] don't seem to be suitable for me
Какая-то конкретная причина? Последний, похоже, делает именно то, о чем вы просите... - person decPL   schedule 11.10.2017Unit
своим классомPrice
. Если у вас есть конкретная проблема с ними, опишите ее, и, надеюсь, кто-то может помочь. - person decPL   schedule 11.10.2017