Пул потоков С++ с изменяемыми потоками: странный тупик при назначении задач потокам

надеюсь, у вас у всех были хорошие каникулы.

Этот вопрос связан с моим предыдущим вопросом: std::condition_variable - Подождите, пока несколько потоков уведомят наблюдателя

Я пытаюсь реализовать пул потоков на основе моей собственной реализации изменяемого потока ниже:

class MutableThread
{
private:
    std::thread m_Thread;
    std::function<void()> m_Function;
    bool m_bRun;
    std::mutex m_LockMutex;
    std::mutex m_WaitMutex;
    std::condition_variable m_CV;
    IAsyncTemplateObserver<MutableThread>* m_Observer = nullptr;

private:
    void Execute()
    {
        while (m_bRun)
        {
            {
                std::unique_lock<std::mutex> wait(m_WaitMutex);
                m_CV.wait(wait);
            }               

            std::lock_guard<std::mutex> lock(m_LockMutex);
            if (m_bRun && m_Function)
            {
                m_Function();
                m_Function = std::function<void()>();

                if (m_Observer != nullptr)
                {
                    m_Observer->Signal(this);
                }
            }
        }
    }

public:
    HDEBUGNAME(TEXT("MutableThread"));

    MutableThread(const MutableThread& thread) = delete;

    MutableThread(IAsyncTemplateObserver<MutableThread>* _Observer)
    {
        m_Observer = _Observer;
        m_bRun = true;
        m_Thread = std::thread(&MutableThread::Execute, this);
    }

    MutableThread()
    {
        m_Observer = nullptr;
        m_bRun = true;
        m_Thread = std::thread(&MutableThread::Execute, this);
    }       

    ~MutableThread()
    {
        m_bRun = false;

        m_CV.notify_one();

        try
        {
            if (m_Thread.joinable())
                m_Thread.join();
        }
        catch (std::system_error& ex)
        {
            HWARNINGD(TEXT("%s"), ex.what());
        }                           
    }

    inline bool Start(const std::function<void()>& f)
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        if (m_Function != nullptr)
            return false;

        m_Function = f;

        m_CV.notify_one();

        return true;
    }

IAsyncTemplateObserver просто наследуется от моего класса IAsyncObserver, опубликованного в предыдущем вопросе, и добавляет виртуальную функцию:

template <typename T>
class IAsyncTemplateObserver : public IAsyncObserver
{
public:
    virtual void Signal(T* _Obj) = 0;
};

Что я хочу сделать, так это сообщить ThreadPool, что функция завершила выполнение и новая задача назначена изменяемому потоку:

class MutableThread;

struct Task
{
    std::function<void()> m_Function;
    uint32_t m_uPriority;

    Task(const std::function<void()>& _Function, uint32_t _uPriority)
    {
        m_Function = _Function;
        m_uPriority = _uPriority;
    }
};

inline bool operator<(const Task& lhs, const Task& rhs)
{
    return lhs.m_uPriority < rhs.m_uPriority;
}

class ThreadPool : public IAsyncTemplateObserver<MutableThread>
{
private:
    std::list<MutableThread* > m_FreeThreads;
    std::list<MutableThread* > m_UsedThreads;

    std::set<Task> m_Tasks;

    std::mutex m_LockMutex;     
public:

    ThreadPool()
    {
        //Grow(std::thread::hardware_concurrency() - 1);
    }

    ThreadPool(size_t n)
    {
        Grow(n);
    }

    ~ThreadPool()
    {
        //std::lock_guard<std::mutex> lock(m_Mutex);
        for (MutableThread* pUsed : m_UsedThreads)
        {
            HSAFE_DELETE(pUsed);
        }

        for (MutableThread* pFree : m_FreeThreads)
        {
            HSAFE_DELETE(pFree);
        }
    }

    inline void Grow(size_t n)
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        for (size_t i = 0; i < n; i++)
        {
            m_FreeThreads.push_back(new MutableThread(this));
        }
    }

    inline void AddTask(const Task& _Task)
    {
        {
            std::lock_guard<std::mutex> lock(m_LockMutex);
            m_Tasks.insert(_Task);
        }

        AssignThreads();
    }

    virtual void Signal(MutableThread* _pThread)
    {
        {
            std::lock_guard<std::mutex> lock(m_LockMutex);
            m_UsedThreads.remove(_pThread);
            m_FreeThreads.push_back(_pThread);
        }

        AssignThreads();

        NotifyOne();
    }

    inline void WaitForAllThreads()
    {
        bool bWait = true;
        do
        {
            {
                //check if we have to wait
                std::lock_guard<std::mutex> lock(m_LockMutex);
                bWait = !m_UsedThreads.empty() || !m_Tasks.empty();
            }

            if (bWait)
            {                   
                std::unique_lock<std::mutex> wait(m_ObserverMutex);
                m_ObserverCV.wait(wait);
            }

        } while (bWait);
    }

private:

    inline void AssignThreads()
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        if (m_FreeThreads.empty() || m_Tasks.empty())
            return;

        //Get free thread
        MutableThread* pThread = m_FreeThreads.back();
        m_FreeThreads.pop_back();

        //park thread in used list
        m_UsedThreads.push_back(pThread);

        //get task with highest priority
        std::set<Task>::iterator it = m_Tasks.end();
        --it; //last entry has highest priority

        //start the task
        pThread->Start(it->m_Function);

        //remove the task from the list
        m_Tasks.erase(it);          
    }

Функция AddTask вызывается несколько раз одним и тем же потоком, но когда изменяемый поток сигнализирует пулу потоков (через m_Observer->Signal(this)) приложение зависает в lock_guard функции AssignThreads(). Теперь странная вещь, в отличие от обычного тупика, все представления стека вызовов в Visual Studio пусты, как только я пытаюсь переступить черту с помощью lock_guard.

Кто-нибудь может объяснить такое поведение? Есть ли какой-то серьезный недостаток в дизайне или просто путаница?

Спасибо за вашу помощь!

Привет, Фабиан

Изменить: я добавил минимальное визуальное студийное решение, которое воспроизводит проблему: ThreadPoolTest.zip


person Fabian    schedule 28.12.2014    source источник
comment
У меня нет ответа на ваш вопрос, но если вы работаете в системе Windows, вам следует попробовать профилировать вызовы std::async без пула потоков. По моему опыту, это намного быстрее, так как окна действительно хорошо оптимизируют потоки.   -  person Nicolas Holthaus    schedule 29.12.2014
comment
Это кажется хорошей идеей, я проверю это.   -  person Fabian    schedule 29.12.2014


Ответы (1)


Благодаря другу я смог решить проблему, переместив вызов m_Observer->Signal(this) за пределы области действия lock_guard в функции MutableThread::Execute(). Во-вторых, я удалил lock_guard в функции AssignThreads() и переместил его вызов в область действия lock_guard в функции Signal()/AddTask. На самом деле это не связано, но все же является недостатком: все вызовы condition_variables.wait() теперь находятся в цикле while(m_bNotified == false).

person Fabian    schedule 29.12.2014