Rx .NET - принудительно излучать буфер

Я хочу использовать функциональность буфера Rx:

var source = new Subject<Price>();
var buffer = source
    .Buffer(TimeSpan.FromSeconds(30), 5)
    .Where(p => p.Any());

это означает, что выброс (публикация подписчикам) происходит, когда буфер достигает размера 5 или 30 секунд прошло с момента последнего выброса.

Но мне нужно иметь возможность испускать по запросу - например, когда я получаю элемент последовательности с высоким приоритетом. Затем я хочу добавить его в наблюдаемый (source.OnNext()) и каким-то образом заставить его излучать (это означает возврат всех элементов в буфер и его очистку).

Я знаю, что могу добавить следующий код:

var flusher = new Subject<Price>();
var closing = flusher.Select(x => new List<Price> {x});
var query = buffer.Merge(closing).Subscribe(something);

и вызовите flusher.OnNext(highPriorityItem), и я его выпущу.

Но в этом случае у меня есть две независимые последовательности с двумя разными эмитами. Мне нужен один выброс, когда буфер заполнен или определенный элемент появляется последовательно.

Принудительно очистить Observable.Buffer типа count-type c# и Принудительно сбрасывать в Observable.Buffer c# не мне кажется подходит


person brolly87    schedule 11.10.2017    source источник
comment
[...] don't seem to be suitable for me Какая-то конкретная причина? Последний, похоже, делает именно то, о чем вы просите...   -  person decPL    schedule 11.10.2017
comment
Есть некоторые определения, связанные с модулем, и таймер вместо буфера. Я не знаю/понимаю, как они решают мою проблему   -  person brolly87    schedule 11.10.2017
comment
Вы пробовали их реализовать? Просто замените этот Unit своим классом Price. Если у вас есть конкретная проблема с ними, опишите ее, и, надеюсь, кто-то может помочь.   -  person decPL    schedule 11.10.2017
comment
При использовании Amb, как и в stackoverflow.com/questions/12944716/, я очищаю буфер цен, когда использую flusher.OnNext(somePrice). Но я получаю коллекцию без объекта somePrice и, кроме того, не делая этого, мой буфер никогда не очищается (и он должен, когда он достигает размера 5)   -  person brolly87    schedule 11.10.2017


Ответы (2)


Я думаю, что у decPL есть основная идея, но его решение нестабильно. В зависимости от планировщика наблюдаемого input вы можете получить непредсказуемые результаты, даже если он подписан в правильном порядке. Это потому, что существует несколько независимых подписок на input. Вам нужно протолкнуть все это через вызов .Publish(...), чтобы обеспечить только одну подписку.

Также нужен способ очистки при удалении подписки. Поэтому ему также необходимо выполнить вызов .Create(...).

Вот как:

var input = new Subject<Price>();

IObservable<IList<Price>> query =
    input
        .Publish(i =>
            Observable
                .Create<IList<Price>>(o =>
                {
                    var timeBuffer =
                        Observable
                            .Timer(TimeSpan.FromSeconds(10.0))
                            .Select(n => Unit.Default);
                    var flush =
                        i
                            .Where(p => p.IS_IMPORTANT)
                            .Select(n => Unit.Default);
                    var sizeBuffer =
                        i
                            .Buffer(5)
                            .Select(l => Unit.Default);
                    return
                        i
                            .Window(() => Observable.Merge(timeBuffer, sizeBuffer, flush))
                            .SelectMany(w => w.ToList())
                            .Subscribe(o);
                }));

query.Subscribe(w => DO_SOMETHING_WITH_PRICES(w));
person Enigmativity    schedule 11.10.2017
comment
Если IS_IMPORTANT является единственным случаем для специальной очистки, не могли бы вы также заменить flush Subject как Observable: i.Where(p => p.IS_IMPORTANT).Select(_ => Unit.Default ) ) ? - person supertopi; 12.10.2017
comment
@supertopi - Хороший звонок - я это исправил. - person Enigmativity; 12.10.2017

РЕДАКТИРОВАТЬ: @Enigmativity абсолютно прав, обратитесь к его ответу. Оставим это нетронутым, так как, надеюсь, здесь немного легче определить ход мысли.

Попробуйте следующее:

var input = new Subject<Price>(); //your input observable

var flush = new Subject<long>(); //used to manually flush the 'buffer' for important prices
var timeBuffer
   = Observable.Timer(TimeSpan.FromSeconds(10)); //controls the time-based part of 'buffer'
var sizeBuffer = input.Buffer(5).Select(l => 0L); //controls the size-based part of 'buffer'

var bufferedInput = input.Window(()=>Observable.Merge(timeBuffer, sizeBuffer, flush))
                         .SelectMany(w => w.ToList())
                         .Subscribe(w => DO_SOMETHING_WITH_PRICES(w));

//Flush on important price (NOTE - order of the two subscriptions matter)
input.Where(p => p.IS_IMPORTANT).Subscribe(p => flush.OnNext(0L));
person decPL    schedule 11.10.2017
comment
Это почти нормально, но когда я нажимаю важную цену через OnNext, все элементы в буфере выбрасываются, а эта важная цена — нет. Он будет выпущен в следующем раунде (когда буфер получит размер 5 или через 30 секунд). Я хотел бы, чтобы он был частью первого выброса. - person brolly87; 11.10.2017
comment
Поэтому я и написал, что порядок подписок имеет значение. Убедитесь, что вы подписаны на «буфер», прежде чем подписка перенаправит важные цены на flush. - person decPL; 11.10.2017
comment
Спасибо, сначала проигнорил. Работает сейчас! - person brolly87; 11.10.2017
comment
Это нестабильное решение - в зависимости от планировщика наблюдаемого input вы можете получить непредсказуемые результаты, даже если это будет сделано в правильном порядке. Это потому, что существует несколько независимых подписок на input. Вам нужно протолкнуть все это через вызов .Publish(), чтобы обеспечить только одну подписку. - person Enigmativity; 12.10.2017