Очереди и дескрипторы ожидания в C#

У меня был следующий код в моем приложении в течение нескольких лет, и я никогда не видел проблем с ним.

while ((PendingOrders.Count > 0) || (WaitHandle.WaitAny(CommandEventArr) != 1))
{
    lock (PendingOrders)
    {
       if (PendingOrders.Count > 0)
       {
           fbo = PendingOrders.Dequeue();
       }
       else
       {
           fbo = null;
       }
    }

    // Do Some Work if fbo is != null
}

Где CommandEventArr состоит из NewOrderEvent (событие с автоматическим сбросом) и ExitEvent (событие с ручным сбросом).

Но я не уверен, является ли это потокобезопасным (при условии, что N потоков производителей, которые все блокируют очередь перед постановкой в ​​очередь, и один поток-потребитель, который запускает приведенный выше код). Кроме того, мы можем предположить, что свойство Queue.Count просто возвращает одно значение экземпляра Int32 из класса Queue (без volatile, блокировки, блокировки и т. д.).

Каков обычный шаблон, используемый с Queue и AutoResetEvent, чтобы исправить это и сделать то, что я пытаюсь сделать с кодом выше?

(Отредактировано, чтобы немного изменить вопрос после того, как было правильно указано, что Queue.Count может делать что угодно и зависит от его реализации).


person Michael Covelli    schedule 29.04.2010    source источник
comment
Конечно, этот цикл встроен в какой-то внешний цикл типа while (running) - разве не поэтому он отлично работает?   -  person Ian Mercer    schedule 30.04.2010
comment
Нет, другого цикла нет. Есть только один поток, который блокирует очередь и ставит заказы в очередь, а затем устанавливает событие NewOrderEvent.   -  person Michael Covelli    schedule 30.04.2010


Ответы (4)


Для меня это выглядит довольно потокобезопасным, WaitAny() просто завершится немедленно, потому что событие уже установлено. Это не проблема.

Не нарушайте синхронизацию потоков, которая работает. Но если вам нужна лучшая мышеловка, вы можете рассмотреть BlockingQueue Джо Даффи в этом журнальная статья. Более общая версия доступна в .NET 4.0, System.Collections.Concurrent .BlockingCollection с ConcurrentQueue в качестве практической реализации.

person Hans Passant    schedule 30.04.2010
comment
Но поскольку это AutoResetEvent, если другой поток устанавливает событие до того, как у меня будет возможность его дождаться, тогда у меня не возникнет проблемы. Я согласен, что если бы это было ManualResetEvent, то это была бы другая история. - person Michael Covelli; 30.04.2010
comment
Вааа... это в 4.0? Почему я не получил записку? Я узнаю что-то новое каждый день. - person Brian Gideon; 30.04.2010
comment
@Michael: ARE работает не так, вызов Wait сбрасывает его при выходе. Он не сбрасывается при входе, а затем ждет. - person Hans Passant; 30.04.2010
comment
Ах, я думаю, что вы правы, и я ошибаюсь относительно природы ARE. Вы говорите, что ARE не будет сброшен до тех пор, пока хотя бы один поток где-то не будет ждать его? - person Michael Covelli; 30.04.2010
comment
Вы уверены, что это потокобезопасно? Существует вероятность того, что один поток вызовет Queue<T>.Dequeue(), а другой вызовет Queue<T>.Count в одной и той же очереди в одно и то же время... что может быть потокобезопасным в зависимости от реализации этих методов, но я знаю, что параллельный чтение-запись определенно вызывает проблемы с другими коллекциями, такими как Dictionary<TKey, TValue>. - person Aaronaught; 30.04.2010
comment
@Майкл: Это точно. Но, если быть очень падантичным, код по-прежнему небезопасен. Смотрите мой ответ для объяснения. - person Brian Gideon; 30.04.2010
comment
Согласен, Count может быть прочитан как 0, если он уже установлен в 1, перед оператором if. Однако вызов Wait исправляет это. Даже если бы это было не так, while() позаботится об этом. - person Hans Passant; 30.04.2010
comment
@Aaronaught Я должен был указать, что у меня может быть несколько потоков, ставящих в очередь заказы (производители), но только один поток, исключающий их из очереди (потребитель). Таким образом, только один поток удаляется из очереди и смотрит на Count. Другие потоки никогда не смотрят на количество или удаление из очереди, они только блокируют очередь и помещают элементы в очередь. - person Michael Covelli; 30.04.2010
comment
@Hans Согласен, даже если мой единственный потребительский поток увидит 0 для счетчика, когда на самом деле он равен 1 (потому что 0 кэшируется и он еще не получил 1 в этом потоке или что-то в этом роде), то событие это исправит. Count › 0 присутствует только тогда, когда кто-то быстро ставит в очередь M элементов и устанавливает событие M раз. Я думаю, что мне все еще нужен этот чек, верно? ARE не будет отслеживать, сколько раз он был установлен, только независимо от того, установлен он или нет, верно? - person Michael Covelli; 30.04.2010
comment
@Michael, да, вызов Set () для ARE более одного раза был бы проблемой, если бы вы их сосчитали. Вы не знаете. - person Hans Passant; 30.04.2010
comment
Я не беспокоюсь о том, что Count возвращает неправильное значение, если оно выполняется одновременно. Именно в этом смысл блокировки с двойной проверкой. Меня беспокоит то, что он может буквально рухнуть. Это происходит с некоторыми другими коллекциями (за исключением параллельных коллекций .NET 4.0). Если этот конкретный DCL стабилен, то это только из-за деталей реализации Queue<T> (другими словами, повезло). - person Aaronaught; 30.04.2010
comment
@Aaronaught Я понимаю, что если реализация Queue‹T› Count была изменена, чтобы перебирать элементы или делать что-то еще, тогда может возникнуть проблема. Но думаете ли вы, что он все еще существует, если вопрос изменен, так что вы можете предположить, что Queue‹T› имеет один элемент экземпляра Count, который не заблокирован, не изменчив, не заблокирован или что-то в этом роде? - person Michael Covelli; 30.04.2010
comment
@Michael: Как правило, я просто ничего не предполагаю о внутренней работе какого-либо компонента и не основываю какие-либо решения на характеристиках, статика которых не гарантируется. Если в документации сказано, что это не потокобезопасно, я синхронизирую весь доступ (а не только доступ для записи). Вы не обязательно ожидаете, что Dictionary вызовет исключение, если один поток добавляет значение, а другой поток ищет другое значение, и все же это происходит (иногда). Если вам удастся обойти здесь замок с двойной проверкой... тогда вам повезло. - person Aaronaught; 30.04.2010
comment
Ну, ничего не гарантируется, кроме смерти и налогов, когда вы используете свойство или метод небезопасного для потоков объекта небезопасным для потоков способом. Но Count предсказуемо ведет себя неправильно: свойство, которое не изменяет состояние объекта, не может его испортить. Он может только вернуть неправильное значение. Однако Даффи не любит рисковать, с ним невозможно поспорить. Я бы поспорил об изменении того, что хорошо работало несколько лет. - person Hans Passant; 30.04.2010
comment
@Hans Спасибо за вашу помощь в размышлениях об этом. Похоже, консенсус заключается в том, что полагаться на детали реализации, которые могут измениться, — плохая идея, даже если этот код работает. Думаю, мне следует полагаться на ссылку, которую вы мне прислали, чтобы переделать это. Спасибо! - person Michael Covelli; 01.05.2010

Ты прав. Код не является потокобезопасным. Но не по той причине, о которой вы думаете.

AutoResetEvent в порядке. Но только потому, что вы получаете блокировку и повторно тестируете PendingOrders.Count. Настоящая суть вопроса в том, что вы вызываете PendingOrders.Count вне блокировки. Поскольку класс Queue не является потокобезопасным, ваш код не является потокобезопасным... и точка.

Теперь на самом деле у вас, вероятно, никогда не будет проблем с этим по двум причинам. Во-первых, свойство Queue.Count почти наверняка предназначено для того, чтобы никогда не оставлять объект в полусыром состоянии. В конце концов, он, вероятно, просто вернет переменную экземпляра. Во-вторых, отсутствие барьера памяти при чтении не окажет существенного влияния на более широкий контекст вашего кода. Худшее, что может случиться, это то, что вы получите устаревшее чтение на одной итерации цикла, а затем полученная блокировка неявно создаст барьер памяти, и на следующей итерации произойдет новое чтение. Я предполагаю, что в очереди есть только один поток. Все значительно меняется, если их 2 или более.

Однако позвольте мне сделать это совершенно ясно. У вас нет гарантии, что PendingOrders.Count не изменит состояние объекта во время его выполнения. И поскольку он не заблокирован, другой поток может инициировать операцию над ним, пока он все еще находится в этом полузащищенном состоянии.

person Brian Gideon    schedule 30.04.2010
comment
Действительно, Queue.Count просто возвращает поле int в текущей реализации и не помечено как volatile. - person Ian Mercer; 30.04.2010
comment
Привет Брайан, что касается вашего первого пункта, я согласен. Произвольная реализация Queue‹T› может делать что угодно со свойством Count, включая перебор всех элементов. Я должен изменить вопрос, чтобы сказать, что мы можем предположить, что Queue‹T›.Count просто возвращает переменную экземпляра (энергонезависимое чтение без блокировки). - person Michael Covelli; 30.04.2010
comment
Я согласен, что я мог получить устаревшее чтение. То есть реальное значение Count равно 1, но мое чтение Count в условии цикла while возвращает 0. Но я думаю, что это все равно будет нормально. Проверка Count › 0 нужна только для того, чтобы отловить случай, когда производители ставят в очередь M заказов одновременно, чтобы я не обрабатывал только один из заказов. Я думаю, ты прав - барьер памяти замка спасет меня. Просто пытаюсь продумать, есть ли на практике случай, когда может возникнуть проблема. (Включая наличие N потоков производителей, при условии, что каждый из них блокирует очередь перед постановкой в ​​очередь). - person Michael Covelli; 30.04.2010
comment
@Michael: Да, ты вполне можешь быть прав. На самом деле у вас может и не быть проблемы. Об этом очень трудно думать. Даже эксперты по многопоточности признают, что это невероятно сложная проблема. Ваш лучший вариант — получить правильную реализацию BlockingQueue и использовать ее. .NET 4.0 поставляется с ним, если вы используете новую версию фреймворка. - person Brian Gideon; 30.04.2010

Использование ручных событий...

ManualResetEvent[] CommandEventArr = new ManualResetEvent[] { NewOrderEvent, ExitEvent };

while ((WaitHandle.WaitAny(CommandEventArr) != 1))
{
    lock (PendingOrders)
    {
        if (PendingOrders.Count > 0)
        {
            fbo = PendingOrders.Dequeue();
        }
        else
        {
            fbo = null;
            NewOrderEvent.Reset();
        }
    }
}

Затем вам также необходимо обеспечить блокировку на стороне очереди:

    lock (PendingOrders)
    {
        PendingOrders.Enqueue(obj);
        NewOrderEvent.Set();
    }
person csharptest.net    schedule 29.04.2010
comment
Похоже, это сработает. Позвольте мне подумать об этом еще немного, но мне кажется, что это правильно. Спасибо! - person Michael Covelli; 30.04.2010
comment
+1: я только бегло взглянул на это, но я считаю, что это решение действительно безопасно. Причина, по которой этот код легче понять, заключается в том, что он не использует ни одну из этих причудливых стратегий без блокировки (которые являются огромными красными флажками). - person Brian Gideon; 30.04.2010
comment
Я должен уточнить, что это безопасно, только если есть один поток, выполняющий постановку в очередь. Наличие двух или более потоков в очереди может привести к ситуации, когда в очереди есть элемент, но ARE не установлен. - person Brian Gideon; 30.04.2010
comment
Почему постановка в очередь N потоков вызывает какие-либо проблемы (при условии, что все они блокируют очередь перед постановкой в ​​очередь)? - person Michael Covelli; 30.04.2010
comment
@Michael: Потому что два потока постановки в очередь могут вызвать Set в ARE до того, как будет выпущен один поток исключения из очереди. Это оставляет очередь с двумя элементами, но освобождается только один поток удаления из очереди до сброса ARE. Это означает, что только 1 элемент удален из очереди. - person Brian Gideon; 30.04.2010
comment
Верно. Но тогда Count › 0 заставит цикл while продолжать извлекать второй элемент из очереди, верно? (Я должен был сказать раньше, что я предполагаю только один потребительский поток). - person Michael Covelli; 30.04.2010
comment
@Michael: В вашем коде ... да (вероятно). Но в коде этого ответа... нет. - person Brian Gideon; 30.04.2010
comment
@ Брайан А, я понимаю, что ты говоришь. Без проверки типа Count › 0 этот код действительно имеет проблему. Если два потока ставятся в очередь одновременно, мы получим только первый элемент. - person Michael Covelli; 01.05.2010
comment
Событие сбрасывается только тогда, когда счетчик достигает нуля. Вот почему я использую mre вместо are. Вы можете безопасно поставить в очередь и удалить из очереди любое количество объявлений, при условии, конечно, что среда выполнения фиксирует изменения в структуре памяти очереди перед разблокировкой. Так было, афаик, всегда. - person csharptest.net; 01.05.2010
comment
@csharptest.net Вы правы, я невнимательно читал ваш код. Я думаю, что это выглядит потокобезопасным для любого числа читателей. - person Michael Covelli; 01.05.2010

Вы должны использовать WaitAny только для этого и убедиться, что он получает сигнал о каждом новом ордере, добавленном в коллекцию PendingOrders:

while (WaitHandle.WaitAny(CommandEventArr) != 1))
{
   lock (PendingOrders)
   {
      if (PendingOrders.Count > 0)
      {
          fbo = PendingOrders.Dequeue();
      }
      else
      {
          fbo = null;

          //Only if you want to exit when there are no more PendingOrders
          return;
      }
   }

   // Do Some Work if fbo is != null
}
person Iñaki Elcoro    schedule 29.04.2010
comment
Если два элемента находятся в очереди во время выполнения одного, это приведет только к исключению из очереди одного из них, а затем к ожиданию постановки в очередь другого. - person Ian Mercer; 30.04.2010
comment
Это именно та проблема, которая привела меня к использованию проверки Count › 0 в первую очередь. Если какой-то другой поток поставит элемент в очередь и установит событие, пока я нахожусь в цикле while, я его пропущу. - person Michael Covelli; 30.04.2010