надеюсь, у вас у всех были хорошие каникулы.
Этот вопрос связан с моим предыдущим вопросом: std::condition_variable - Подождите, пока несколько потоков уведомят наблюдателя
Я пытаюсь реализовать пул потоков на основе моей собственной реализации изменяемого потока ниже:
class MutableThread
{
private:
std::thread m_Thread;
std::function<void()> m_Function;
bool m_bRun;
std::mutex m_LockMutex;
std::mutex m_WaitMutex;
std::condition_variable m_CV;
IAsyncTemplateObserver<MutableThread>* m_Observer = nullptr;
private:
void Execute()
{
while (m_bRun)
{
{
std::unique_lock<std::mutex> wait(m_WaitMutex);
m_CV.wait(wait);
}
std::lock_guard<std::mutex> lock(m_LockMutex);
if (m_bRun && m_Function)
{
m_Function();
m_Function = std::function<void()>();
if (m_Observer != nullptr)
{
m_Observer->Signal(this);
}
}
}
}
public:
HDEBUGNAME(TEXT("MutableThread"));
MutableThread(const MutableThread& thread) = delete;
MutableThread(IAsyncTemplateObserver<MutableThread>* _Observer)
{
m_Observer = _Observer;
m_bRun = true;
m_Thread = std::thread(&MutableThread::Execute, this);
}
MutableThread()
{
m_Observer = nullptr;
m_bRun = true;
m_Thread = std::thread(&MutableThread::Execute, this);
}
~MutableThread()
{
m_bRun = false;
m_CV.notify_one();
try
{
if (m_Thread.joinable())
m_Thread.join();
}
catch (std::system_error& ex)
{
HWARNINGD(TEXT("%s"), ex.what());
}
}
inline bool Start(const std::function<void()>& f)
{
std::lock_guard<std::mutex> lock(m_LockMutex);
if (m_Function != nullptr)
return false;
m_Function = f;
m_CV.notify_one();
return true;
}
IAsyncTemplateObserver просто наследуется от моего класса IAsyncObserver, опубликованного в предыдущем вопросе, и добавляет виртуальную функцию:
template <typename T>
class IAsyncTemplateObserver : public IAsyncObserver
{
public:
virtual void Signal(T* _Obj) = 0;
};
Что я хочу сделать, так это сообщить ThreadPool, что функция завершила выполнение и новая задача назначена изменяемому потоку:
class MutableThread;
struct Task
{
std::function<void()> m_Function;
uint32_t m_uPriority;
Task(const std::function<void()>& _Function, uint32_t _uPriority)
{
m_Function = _Function;
m_uPriority = _uPriority;
}
};
inline bool operator<(const Task& lhs, const Task& rhs)
{
return lhs.m_uPriority < rhs.m_uPriority;
}
class ThreadPool : public IAsyncTemplateObserver<MutableThread>
{
private:
std::list<MutableThread* > m_FreeThreads;
std::list<MutableThread* > m_UsedThreads;
std::set<Task> m_Tasks;
std::mutex m_LockMutex;
public:
ThreadPool()
{
//Grow(std::thread::hardware_concurrency() - 1);
}
ThreadPool(size_t n)
{
Grow(n);
}
~ThreadPool()
{
//std::lock_guard<std::mutex> lock(m_Mutex);
for (MutableThread* pUsed : m_UsedThreads)
{
HSAFE_DELETE(pUsed);
}
for (MutableThread* pFree : m_FreeThreads)
{
HSAFE_DELETE(pFree);
}
}
inline void Grow(size_t n)
{
std::lock_guard<std::mutex> lock(m_LockMutex);
for (size_t i = 0; i < n; i++)
{
m_FreeThreads.push_back(new MutableThread(this));
}
}
inline void AddTask(const Task& _Task)
{
{
std::lock_guard<std::mutex> lock(m_LockMutex);
m_Tasks.insert(_Task);
}
AssignThreads();
}
virtual void Signal(MutableThread* _pThread)
{
{
std::lock_guard<std::mutex> lock(m_LockMutex);
m_UsedThreads.remove(_pThread);
m_FreeThreads.push_back(_pThread);
}
AssignThreads();
NotifyOne();
}
inline void WaitForAllThreads()
{
bool bWait = true;
do
{
{
//check if we have to wait
std::lock_guard<std::mutex> lock(m_LockMutex);
bWait = !m_UsedThreads.empty() || !m_Tasks.empty();
}
if (bWait)
{
std::unique_lock<std::mutex> wait(m_ObserverMutex);
m_ObserverCV.wait(wait);
}
} while (bWait);
}
private:
inline void AssignThreads()
{
std::lock_guard<std::mutex> lock(m_LockMutex);
if (m_FreeThreads.empty() || m_Tasks.empty())
return;
//Get free thread
MutableThread* pThread = m_FreeThreads.back();
m_FreeThreads.pop_back();
//park thread in used list
m_UsedThreads.push_back(pThread);
//get task with highest priority
std::set<Task>::iterator it = m_Tasks.end();
--it; //last entry has highest priority
//start the task
pThread->Start(it->m_Function);
//remove the task from the list
m_Tasks.erase(it);
}
Функция AddTask вызывается несколько раз одним и тем же потоком, но когда изменяемый поток сигнализирует пулу потоков (через m_Observer->Signal(this)) приложение зависает в lock_guard функции AssignThreads(). Теперь странная вещь, в отличие от обычного тупика, все представления стека вызовов в Visual Studio пусты, как только я пытаюсь переступить черту с помощью lock_guard.
Кто-нибудь может объяснить такое поведение? Есть ли какой-то серьезный недостаток в дизайне или просто путаница?
Спасибо за вашу помощь!
Привет, Фабиан
Изменить: я добавил минимальное визуальное студийное решение, которое воспроизводит проблему: ThreadPoolTest.zip