Как вызвать TriggerBatch автоматически после тайм-аута, если количество элементов в очереди меньше, чем BatchSize?

Использование Dataflow CTP (в TPL)

Есть ли способ вызвать BatchBlock.TriggerBatch автоматически, если количество текущих или отложенных элементов меньше, чем BatchSize, после тайм-аута?

И лучше: этот таймаут должен сбрасываться на 0 каждый раз, когда блок получает новый элемент.


person Softlion    schedule 23.02.2012    source источник


Ответы (4)


Да, вы можете сделать это довольно элегантно, соединив блоки в цепочку. В этом случае вы хотите настроить TransformBlock, который вы связываете «до» BatchBlock. Это будет выглядеть примерно так:

Timer triggerBatchTimer = new Timer(() => yourBatchBlock.TriggerBatch());

TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

timeoutTransformBlock.LinkTo(yourBatchBlock);

yourBufferBlock.LinkTo(timeoutTransformBlock);
person Drew Marsh    schedule 24.02.2012

Вот защищенная версия превосходного решение. Здесь используется DataflowBlock.Encapsulate метод для создания блока потока данных, который инкапсулирует функции таймера и пакетной обработки. Помимо нового аргумента timeout, метод CreateBatchBlock также поддерживает все параметры, доступные для обычного конструктора BatchBlock.

public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
    int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
    dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
    var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
    var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
    var transformBlock = new TransformBlock<T, T>((T value) =>
    {
        timer.Change(timeout, Timeout.Infinite);
        return value;
    }, new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        TaskScheduler = dataflowBlockOptions.TaskScheduler
    });
    transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
    {
        PropagateCompletion = true
    });
    return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}
person Theodor Zoulias    schedule 04.12.2019

Спасибо Дрю Маршу за идею использования TransformBlock, который очень помог мне с недавним решением. Тем не менее, я считаю, что таймер необходимо сбросить ПОСЛЕ пакетного блока (т.е. после того, как он либо был запущен из-за достижения размера пакета ИЛИ явного вызова метода TriggerBatch в обратном вызове таймера). Если вы сбрасываете таймер каждый раз, когда получаете один элемент, он потенциально может продолжать сбрасывать несколько раз, фактически не запуская пакет (постоянно отодвигая «dueTime» на таймере дальше).

В этом случае фрагмент кода будет выглядеть следующим образом:

Timer triggerBatchTimer = new Timer(() => yourBatchBlock.TriggerBatch(), null, 5000, Timeout.Infinite);

TransformBlock<T[], T[]> timeoutTransformBlock = new TransformBlock<T[], T[]>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

yourBufferBlock.LinkTo(yourBatchBlock);
yourBatchBlock.LinkTo(timeoutTransformBlock)
timeoutTransformBlock.LinkTo(yourActionBlock);

// Start the producer which is populating the BufferBlock etc.
person Andrew Horth    schedule 06.12.2019
comment
Оба подхода к выбору времени имеют смысл для разных сценариев. - person Theodor Zoulias; 06.12.2019

Вы можете использовать параметры ссылки

_transformManyBlock.LinkTo(_batchBlock, new DataflowLinkOptions {PropagateCompletion = true});
person Andrey Akt    schedule 04.12.2019