могу ли я отменить все сообщения, а затем добавить один, используя TPL Dataflow?

С библиотекой TPL Dataflow я хотел бы сделать что-то вроде этого:

myActionBlock.Post(newValue, cancelAllPreviousPosts: true);

Похоже, что токен отмены в ActionBlock отменяет все; Если бы я установил его, мне пришлось бы создать новый ActionBlock. Можно ли сделать частичную отмену с помощью ActionBlock?

Публикации, которые еще не были обработаны, не следует предпринимать. Было бы неплохо, если бы в выполняющейся в данный момент публикации был доступный токен отмены.


person Brannon    schedule 19.02.2014    source источник
comment
Я опубликовал это некоторое время назад, но с тех пор создал свою собственную библиотеку; В моей библиотеке Kts.ActorsLite есть последняя очередь действий: github.com/BrannonKing/Kts.ActorsLite < / а>   -  person Brannon    schedule 02.01.2018


Ответы (3)


Взгляните на BroadcastBlock<T>, который содержит только самый последний опубликованный элемент. Вы можете поместить широковещательный блок перед ActionBlock<T>.

Хотя отправка нового элемента в широковещательный блок не отменит элемент, который в данный момент обрабатывается блоком действий, он перезапишет любой существующий элемент, уже удерживаемый широковещательным блоком; фактически отбрасывает все старые сообщения, еще не обработанные блоком действий. Когда блок действий завершает свой текущий элемент, он берет самый последний элемент, опубликованный в блоке широковещания.

person Monroe Thomas    schedule 24.02.2014
comment
Это тот ответ, который я искал. Спасибо. - person Brannon; 28.02.2014

В дополнение к ответу Монро Томас важно понимать, что ActionBlock, следующий за BroadcastBlock, нуждается в BoundedCapacity, ограниченном 1, иначе он будет хранить и обрабатывать каждое сообщение широковещательного блока, даже когда он все еще выполняется.
Вот пример кода:

ActionBlock<int> ExecuteBlock = new ActionBlock<int>(async ThisNumber =>
{
  await Task.Delay(100);
  Console.WriteLine($">{ThisNumber}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

BroadcastBlock<int> ThrottleBlock = new BroadcastBlock<int>(null);
ThrottleBlock.LinkTo(ExecuteBlock, new DataflowLinkOptions { PropagateCompletion = true });

for(int IX = 0; IX < 128; IX++)
{
  await ThrottleBlock.SendAsync(IX);
  await Task.Delay(10);
}

Это приводит к следующему:

>0
>6
>12
>20
>27
>34
>41
>48
>55
>62
>68
>75
>82
>88
>95
>101
>108
>115
>122
>127

Наслаждайтесь!
-Simon

person Simon Mattes    schedule 02.01.2018
comment
Хорошее разъяснение! :) - person Monroe Thomas; 01.05.2019

Непосредственно в TPL Dataflow ничего подобного нет, но я вижу несколько способов, как вы могли бы реализовать это самостоятельно:

  1. Если вам не нужно иметь возможность обрабатывать измененный блок как обычный блок потока данных (например, без поддержки LinkTo()), тогда простым способом было бы написать тип, который обертывает ActionBlock, но элементы которого также содержат флаг, который говорит, они должны быть обработаны. Когда вы указываете cancelAllPreviousPosts: true, все эти флаги сбрасываются, поэтому эти элементы будут пропущены.

    Код может выглядеть примерно так:

    class CancellableActionBlock<T>
    {
        private class Item
        {
            public T Data { get; private set; }
            public bool ShouldProcess { get; set; }
    
            public Item(T data)
            {
                Data = data;
                ShouldProcess = true;
            }
        }
    
        private readonly ActionBlock<Item> actionBlock;
        private readonly ConcurrentDictionary<Item, bool> itemSet;
    
        public CancellableActionBlock(Action<T> action)
        {
            itemSet = new ConcurrentDictionary<Item, bool>();
            actionBlock = new ActionBlock<Item>(item =>
            {
                bool ignored;
                itemSet.TryRemove(item, out ignored);
    
                if (item.ShouldProcess)
                {
                    action(item.Data);
                }
            });
        }
    
        public bool Post(T data, bool cancelAllPreviousPosts = false)
        {
            if (cancelAllPreviousPosts)
            {
                foreach (var item in itemSet.Keys)
                {
                    item.ShouldProcess = false;
                }
                itemSet.Clear();
            }
    
            var newItem = new Item(data);
            itemSet.TryAdd(newItem, true);
            return actionBlock.Post(newItem);
        }
    
        // probably other members that wrap actionBlock members,
        // like Complete() and Completion
    }
    
  2. Если вы хотите создать что-то более компонуемое и многократно используемое, вы можете создать специальный блок только для этой отмены. Вы можете реализовать это, используя три соединенных вместе BufferBlocks, где третий будет иметь емкость 1, а второй - неограниченную емкость. Таким образом, почти все элементы в очереди будут во втором блоке, поэтому вы можете выполнить отмену, просто заменив этот блок новым. Вся структура будет представлена ​​Encapsulate() привязкой к первому и третьему блоку.

    Проблема с этим подходом заключается в том, что отмена имеет задержку в 1 элемент (тот, который находится в третьем блоке). Также я не придумал для этого хороший интерфейс.

person svick    schedule 26.02.2014
comment
Я награждаю здесь +50 за усилия на примере. Спасибо. - person Brannon; 28.02.2014