Как использовать TaskCompletionSource

У меня ситуация с несколькими производителями и одним потребителем. Я выбрал общий thread-safe ресурс, в котором все производители Enqueue элементов. Однако я не знаю, как эффективно сделать производителя await для новых элементов при Dequeue-инге из этого ресурса.

ПОКО

struct Sample
    {
        public int Id { get; set; }
        public  double Value { get; set; }
    }

Производители

class ProducerGroup
    {
        StorageQueue sink;
        int producerGroupSize;

        public ProducerGroup(StorageQueue _sink,int producers)
        {
            this.sink = _sink;
            this.producerGroupSize = producers;
        }
        public void StartProducers()
        {
            Task[] producers = new Task[producerGroupSize];
            int currentProducer;
            for (int i = 0; i < producerGroupSize; i++)
            {
                currentProducer = i;
                producers[i] = Task.Run(async () =>
                  {
                      int cycle = 0;
                      while (true)
                      {
                          if (cycle > 5)
                          {
                              cycle = 0;
                          }
                          Sample localSample = new Sample { Id = currentProducer, Value = cycle++ };
                          await Task.Delay(1000);
                          this.sink.Enqueue(localSample);
                      }
                  });
            }

        }
    }

Хранилище

class StorageQueue
    {
        private TaskCompletionSource<Sample> tcs;
        private object @lock = new object();

        private Queue<Sample> queue;

        public static StorageQueue CreateQueue(int?size=null)
        {
            return new StorageQueue(size);
        }

        public StorageQueue(int ?size)
        {
            if (size.HasValue)
            {
                this.queue = new Queue<Sample>(size.Value);
            }
            else
                this.queue = new Queue<Sample>();
        }

        public void Enqueue(Sample value)
        {
            lock (this.@lock)
            {
                this.queue.Enqueue(value);
                tcs = new TaskCompletionSource<Sample>();
                tcs.SetResult(this.queue.Peek());
            }
        }
        public async Task<Sample> DequeueAsync()
        {
            var result=await this.tcs.Task;
            this.queue.Dequeue();
            return result;
        }
    }

Потребитель

class Consumer
    {
        private StorageQueue source;
        public Consumer(StorageQueue _source)
        {
            this.source = _source;
        }
        public async Task ConsumeAsync()
        {
            while (true)
            {
                Sample arrivedSample = await this.source.DequeueAsync();  //how should i implement this ?
                Console.WriteLine("Just Arrived");
            }
        }
    }

Как вы можете видеть в классе Consumer, я хотел бы обернуть свой Storage's methodDequeuein aTaskso that i canawaitit in myConsumer. The only reason i usedTaskCompletionSourceis to be able to communicate between theDequeueandEnqueuemethods in theStorage`.

Я не знаю, нужно ли мне повторно инициализировать tcs, но полагаю, что нужно, так как я хочу новый Task после каждой операции Enqueue.

Я также повторно инициализировал tcs внутри lock, так как я хочу, чтобы этот конкретный экземпляр устанавливал результат.

Как мне поступить с этим? Реализация в порядке? Может ли System.Reactive предложить лучший вариант?


person Bercovici Adrian    schedule 29.12.2018    source источник
comment
Существует множество вариантов шаблона производитель/потребитель, попробуйте Блокировка сбора, TPL-поток данных, RX или Каналы   -  person JSteward    schedule 31.12.2018


Ответы (1)


Я думаю, что вижу несколько проблем с вашей реализацией:

  1. Если вы вызовете Enqueue() несколько раз, не вызывая DequeueAsync(), вы потеряете TaskCompletionSource и получите только последний. Тогда вызов DequeueAsync() не даст правильных результатов после первого вызова.

Чтобы исправить это, вам понадобится очередь из TaskCompletionSource для этого. Посмотрите здесь и далее здесь.

Лучше использовать SemaphoreSlim .WaitAsync() в DequeueAsync для правильного ожидания, если очередь пуста.

  1. Вы также не защищаете свою очередь в DequeueAsync().
person Nick    schedule 30.12.2018