что не так с моей схемой очереди производитель-потребитель?

Я начну с примера кода C# здесь. Я пытаюсь адаптировать его по нескольким причинам: 1) в моем сценарии все задачи будут помещены в очередь заранее, прежде чем запустятся потребители, и 2) я хотел абстрагировать работника в отдельный класс вместо того, чтобы иметь необработанные Thread элементы в классе WorkerQueue.

Моя очередь, похоже, не избавляется от себя, она просто зависает, и когда я прерываю работу в Visual Studio, она застревает в строке _th.Join() для WorkerThread #1. Кроме того, есть ли лучший способ организовать это? Что-то в раскрытии методов WaitOne() и Join() кажется неправильным, но я не мог придумать подходящего способа, позволяющего WorkerThread взаимодействовать с очередью.

Кроме того, в стороне - если я вызову q.Start(#) в верхней части блока using, только некоторые из потоков будут запускаться каждый раз (например, потоки 1, 2 и 8 обрабатывают каждую задачу). Почему это? Это какая-то гонка или я что-то не так делаю?

using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.Threading;
using System.Linq;

namespace QueueTest
{
    class Program
    {
        static void Main(string[] args)
        {
            using (WorkQueue q = new WorkQueue())
            {
                q.Finished += new Action(delegate { Console.WriteLine("All jobs finished"); });

                Random r = new Random();
                foreach (int i in Enumerable.Range(1, 10))
                    q.Enqueue(r.Next(100, 500));

                Console.WriteLine("All jobs queued");
                q.Start(8);
            }
        }
    }

    class WorkQueue : IDisposable
    {
        private Queue<int> _jobs = new Queue<int>();
        private int _job_count;
        private EventWaitHandle _wh = new AutoResetEvent(false);
        private object _lock = new object();
        private List<WorkerThread> _th;
        public event Action Finished;

        public WorkQueue()
        {
        }

        public void Start(int num_threads)
        {
            _job_count = _jobs.Count;
            _th = new List<WorkerThread>(num_threads);
            foreach (int i in Enumerable.Range(1, num_threads))
            {
                _th.Add(new WorkerThread(i, this));
                _th[_th.Count - 1].JobFinished += new Action<int>(WorkQueue_JobFinished);
            }
        }

        void WorkQueue_JobFinished(int obj)
        {
            lock (_lock)
            {
                _job_count--;
                if (_job_count == 0 && Finished != null)
                    Finished();
            }
        }

        public void Enqueue(int job)
        {
            lock (_lock)
                _jobs.Enqueue(job);

            _wh.Set();
        }

        public void Dispose()
        {
            Enqueue(Int32.MinValue);
            _th.ForEach(th => th.Join());
            _wh.Close();
        }

        public int GetNextJob()
        {
            lock (_lock)
            {
                if (_jobs.Count > 0)
                    return _jobs.Dequeue();
                else
                    return Int32.MinValue;
            }
        }

        public void WaitOne()
        {
            _wh.WaitOne();
        }
    }

    class WorkerThread
    {
        private Thread _th;
        private WorkQueue _q;
        private int _i;

        public event Action<int> JobFinished;

        public WorkerThread(int i, WorkQueue q)
        {
            _i = i;
            _q = q;
            _th = new Thread(DoWork);
            _th.Start();
        }

        public void Join()
        {
            _th.Join();
        }

        private void DoWork()
        {
            while (true)
            {
                int job = _q.GetNextJob();
                if (job != Int32.MinValue)
                {
                    Console.WriteLine("Thread {0} Got job {1}", _i, job);
                    Thread.Sleep(job * 10); // in reality would to actual work here
                    if (JobFinished != null)
                        JobFinished(job);
                }
                else
                {
                    Console.WriteLine("Thread {0} no job available", _i);
                    _q.WaitOne();
                }
            }
        }
    }
}

person toasteroven    schedule 02.06.2010    source источник


Ответы (4)


Все рабочие потоки блокируются при вызове _q.WaitOne() в DoWork(). Вызов метода Join() потока приведет к тупику, потоки никогда не завершатся. Вам нужно будет добавить механизм, сигнализирующий рабочему потоку о выходе. ManualResetEvent, протестированный с помощью WaitAny в рабочем потоке, выполнит свою работу.

Один совет по отладке: ознакомьтесь с окном Debug + Windows + Threads. Он позволяет переключаться между потоками и просматривать их стеки вызовов. Вы бы сами быстро нашли эту проблему.

person Hans Passant    schedule 02.06.2010

Вы делаете WaitOne() в конце DoWork, но никогда не устанавливаете его после запуска потоков.
Обратите внимание, что AutoResetEvent вернется в состояние не установлено после "успешного" WaitOne

person Itay Karo    schedule 02.06.2010

Ваш цикл в вашем методе DoWork никогда не заканчивается. Это приведет к тому, что поток всегда будет занят, и этот thread.Join() будет навсегда заблокирован, ожидая его завершения.

У вас есть WaitOne, но я не думаю, что это необходимо, если нет причины, по которой вы хотите, чтобы ваш пул потоков оставался после завершения вашей работы:

    private void DoWork()
    {
        bool done = false;
        while (!done)
        {
            int job = _q.GetNextJob();
            if (job != Int32.MinValue)
            {
                Console.WriteLine("Thread {0} Got job {1}", _i, job);
                Thread.Sleep(job * 10); // in reality would to actual work here
                if (JobFinished != null)
                    JobFinished(job);
            }
            else
            {
                Console.WriteLine("Thread {0} no job available", _i);
                done = true;
            }
        }
    }

Если вы хотите, чтобы потоки сохранялись, чтобы вам не приходилось перераспределять больше потоков при вызове WorkQueue.Start, вам придется сделать что-то более сложное с AutoResetEvent.

person Anderson Imes    schedule 02.06.2010

Ваша главная проблема - детерминированный тупик, описанный в других ответах.

Однако правильный способ справиться с этим состоит не в устранении взаимоблокировки, а в полном устранении События.

Вся идея модели «производитель-потребитель» заключается в том, что клиенты одновременно помещают в очередь и удаляют из очереди элементы, и поэтому необходимы механизмы синхронизации. Если вы заранее ставите в очередь все элементы, а затем удаляете их из очереди одновременно, вам нужна только блокировка исключения из очереди, поскольку «Событие» используется, чтобы позволить «Потребителям» ждать, пока новые элементы будут поставлены в очередь; этого не произойдет в вашем случае (исходя из вашего описания).

Кроме того, принцип проектирования «единой ответственности» предполагает, что код многопоточности должен быть отделен от кода «Очередь блокировки». Сделайте «Очередь блокировки» отдельным классом, а затем используйте его в своем классе управления потоками.

person Hershi    schedule 02.06.2010
comment
так что - по крайней мере, для моего сценария - мне даже не нужен EventWaitHandle или вызовы потока Join(), верно? - person toasteroven; 03.06.2010