421 lines
8.1 KiB
C++
421 lines
8.1 KiB
C++
/*
 *	A Thread Pool Implementation
 *	Copyright(C) 2003-2018 Jinhao(cnjinhao@hotmail.com)
 *
 *	Distributed under the Boost Software License, Version 1.0.
 *	(See accompanying file LICENSE_1_0.txt or copy at
 *	http://www.boost.org/LICENSE_1_0.txt)
 *
 *
 *	@file: nana/threads/pool.cpp
 */
|
|
|
|
#include <nana/threads/pool.hpp>
|
|
#include <nana/system/platform.hpp>
|
|
#include <time.h>
|
|
#include <deque>
|
|
#include <vector>
|
|
#include <atomic>
|
|
|
|
#if defined(STD_THREAD_NOT_SUPPORTED)
|
|
#include <nana/std_mutex.hpp>
|
|
#include <nana/std_condition_variable.hpp>
|
|
|
|
#else
|
|
#include <condition_variable>
|
|
#include <mutex>
|
|
#endif
|
|
|
|
#if defined(NANA_WINDOWS)
|
|
#include <windows.h>
|
|
#include <process.h>
|
|
#elif defined(NANA_POSIX)
|
|
#include <pthread.h>
|
|
#endif
|
|
|
|
namespace nana
|
|
{
|
|
namespace threads
|
|
{
|
|
//class pool
|
|
//struct task
|
|
//Records which kind of task this is; the worker loop dispatches on it.
pool::task::task(t k)
	: kind(k)
{
}
|
|
//Out-of-line destructor definition; nothing to release at this level.
pool::task::~task()
{
}
|
|
//end struct task
|
|
|
|
//struct task_signal
|
|
struct pool::task_signal
|
|
: task
|
|
{
|
|
task_signal()
|
|
: task(task::signal)
|
|
{}
|
|
|
|
void run()
|
|
{}
|
|
};//end struct task_signal
|
|
|
|
class pool::impl
|
|
{
|
|
//Life-cycle of a worker thread as tracked in pool_throbj::thr_state.
enum class state
{
	init,		//assigned by the constructor before the thread starts
	idle,		//assigned by _m_suspend() just before the worker parks
	run,		//assigned by _m_pick_up_an_idle() when the worker is claimed
	finished	//assigned by _m_thr_runner() when its loop ends
};
|
|
|
|
struct pool_throbj
|
|
{
|
|
#if defined(NANA_WINDOWS)
|
|
typedef HANDLE thread_t;
|
|
#elif defined(NANA_POSIX)
|
|
typedef pthread_t thread_t;
|
|
#endif
|
|
impl * pool_ptr;
|
|
task * task_ptr;
|
|
thread_t handle;
|
|
std::atomic<state> thr_state;
|
|
time_t timestamp;
|
|
#if defined(NANA_POSIX)
|
|
std::mutex wait_mutex;
|
|
std::condition_variable wait_cond;
|
|
std::atomic<bool> suspended;
|
|
#endif
|
|
};
|
|
public:
|
|
impl(std::size_t thr_number)
|
|
{
|
|
if(0 == thr_number) thr_number = 4;
|
|
|
|
for(; thr_number; --thr_number)
|
|
{
|
|
pool_throbj * pto = new pool_throbj;
|
|
pto->pool_ptr = this;
|
|
pto->thr_state = state::init;
|
|
pto->task_ptr = nullptr;
|
|
#if defined(NANA_WINDOWS)
|
|
pto->handle = (HANDLE)::_beginthreadex(0, 0, reinterpret_cast<unsigned(__stdcall*)(void*)>(&impl::_m_thr_starter), pto, 0, 0);
|
|
#elif defined(NANA_POSIX)
|
|
pto->suspended = false;
|
|
::pthread_create(&(pto->handle), 0, reinterpret_cast<void*(*)(void*)>(&impl::_m_thr_starter), pto);
|
|
#endif
|
|
container_.threads.emplace_back(pto);
|
|
}
|
|
}
|
|
|
|
//Shuts the pool down.
// 1. Clears runflag_, so push() rejects new tasks and _m_read() makes
//    every worker leave its loop.
// 2. Polls until every worker reports state::finished, waking any parked
//    (idle) worker on each pass so it can observe the cleared flag.
// 3. Joins the native threads and frees the worker records.
// 4. Deletes any tasks that were still queued.
~impl()
{
	runflag_ = false;

	while(true)
	{
		bool all_finished = true;
		for(auto thr : container_.threads)
		{
			if(state::finished != thr->thr_state)
			{
				all_finished = false;
				break;
			}
		}

		if(all_finished)
			break;

		//Wake every worker that is currently parked.
		while(auto idle_thr = _m_pick_up_an_idle())
			_m_resume(idle_thr);

		nana::system::sleep(100);
	}

	std::vector<pool_throbj*> dup(std::move(container_.threads));

	for(auto thr : dup)
	{
#if defined(NANA_WINDOWS)
		::WaitForSingleObject(thr->handle, INFINITE);
		::CloseHandle(thr->handle);
#elif defined(NANA_POSIX)
		//pthread_join() reaps the thread; the handle must not be passed
		//to pthread_detach() afterwards, because detaching a thread that
		//is no longer joinable is undefined behaviour.
		::pthread_join(thr->handle, 0);
#endif
		delete thr;
	}

	std::lock_guard<decltype(mutex_)> lock(mutex_);
	for(auto task_ptr : container_.tasks)
		delete task_ptr;
}
|
|
|
|
//Submits a task to the pool, which takes ownership of taskptr.
//If a worker is idle, the task is handed to it directly and the worker
//is resumed; otherwise the task is appended to the queue.
//Throws std::runtime_error (after deleting the task) once the pool has
//begun shutting down.
//NOTE(review): a worker that found the queue empty but has not yet marked
//itself idle is not seen here, so the task is queued and that worker may
//then park without taking it - confirm whether this window matters.
void push(task * taskptr)
{
	if(!runflag_)
	{
		delete taskptr;
		throw std::runtime_error("Nana.Pool: Do not accept task now");
	}

	if(pool_throbj * idle_worker = _m_pick_up_an_idle())
	{
		idle_worker->task_ptr = taskptr;
		_m_resume(idle_worker);
		return;
	}

	std::lock_guard<decltype(mutex_)> guard(mutex_);
	container_.tasks.emplace_back(taskptr);
}
|
|
|
|
void wait_for_signal()
|
|
{
|
|
std::unique_lock<std::mutex> lock(signal_.mutex);
|
|
signal_.cond.wait(lock);
|
|
}
|
|
|
|
void wait_for_finished()
|
|
{
|
|
while(true)
|
|
{
|
|
{
|
|
std::lock_guard<decltype(mutex_)> lock(mutex_);
|
|
if(container_.tasks.empty())
|
|
{
|
|
bool finished = true;
|
|
for(auto thr : container_.threads)
|
|
{
|
|
if(state::run == thr->thr_state)
|
|
{
|
|
finished = false;
|
|
break;
|
|
}
|
|
}
|
|
if(finished)
|
|
return;
|
|
}
|
|
}
|
|
nana::system::sleep(100);
|
|
}
|
|
}
|
|
private:
|
|
//Claims the first idle worker and returns it, or nullptr if none is idle.
//The state is checked once without the lock as a cheap filter and again
//under mutex_ before being switched to state::run, so two callers cannot
//claim the same worker.
pool_throbj* _m_pick_up_an_idle()
{
	for(auto candidate : container_.threads)
	{
		if(state::idle != candidate->thr_state)
			continue;

		std::lock_guard<decltype(mutex_)> guard(mutex_);
		if(state::idle == candidate->thr_state)
		{
			candidate->thr_state = state::run;
			return candidate;
		}
	}
	return nullptr;
}
|
|
|
|
//Parks the calling worker until _m_resume() wakes it.
//The worker is marked idle first, so _m_pick_up_an_idle() can claim it.
//Windows: the thread suspends itself through its own handle.
//POSIX: the `suspended` flag is raised under wait_mutex and the thread
//waits on wait_cond; the flag is lowered again once the wait returns.
void _m_suspend(pool_throbj* pto)
{
	pto->thr_state.store(state::idle);
#if defined(NANA_WINDOWS)
	::SuspendThread(pto->handle);
#elif defined(NANA_POSIX)
	std::unique_lock<std::mutex> guard(pto->wait_mutex);
	pto->suspended.store(true);
	pto->wait_cond.wait(guard);
	pto->suspended.store(false);
#endif
}
|
|
|
|
//Wakes a worker that was parked by _m_suspend().
//Windows: ResumeThread() is retried until it reports a previous suspend
//count of 1 (the thread was suspended and is now running) or fails with
//(DWORD)-1.
//POSIX: spins until the worker has raised its `suspended` flag, then
//takes wait_mutex and notifies wait_cond.
void _m_resume(pool_throbj* pto)
{
#if defined(NANA_WINDOWS)
	DWORD previous;
	do
	{
		previous = ::ResumeThread(pto->handle);
	}
	while((1 != previous) && (static_cast<DWORD>(-1) != previous));
#elif defined(NANA_POSIX)
	//Busy-wait: the worker may have been marked idle but not yet be
	//waiting on the condition variable.
	while(!pto->suspended.load())
	{
	}

	std::lock_guard<std::mutex> guard(pto->wait_mutex);
	pto->wait_cond.notify_one();
#endif
}
|
|
|
|
//Obtains the next task for a worker and stores it in pto->task_ptr.
//Returns false at once when the pool is shutting down. Otherwise the
//front of the queue is taken under mutex_; if the queue was empty the
//worker parks in _m_suspend() and relies on the waker to have filled
//task_ptr. The result tells whether a task is now assigned.
bool _m_read(pool_throbj* pto)
{
	pto->task_ptr = nullptr;

	if(!runflag_)
		return false;

	{
		std::lock_guard<decltype(mutex_)> guard(mutex_);
		if(!container_.tasks.empty())
		{
			pto->task_ptr = container_.tasks.front();
			container_.tasks.pop_front();
		}
	}

	//Nothing queued: sleep until a task is handed over or the pool closes.
	if(nullptr == pto->task_ptr)
		_m_suspend(pto);

	return (nullptr != pto->task_ptr);
}
|
|
|
|
//Main loop of a worker thread. It keeps fetching tasks with _m_read()
//until that returns false, then marks the worker as finished.
// - general task: run() is invoked and any exception it throws is
//   swallowed, so it cannot escape the thread.
// - signal task: polls (sleep(100) between passes) until no other worker
//   is running a task stamped at or before this one, then notifies
//   signal_.cond.
//Every task is deleted after it has been processed.
void _m_thr_runner(pool_throbj* pto)
{
	while(_m_read(pto))
	{
		pto->timestamp = time(nullptr);

		const auto kind = pto->task_ptr->kind;
		if(task::general == kind)
		{
			try
			{
				pto->task_ptr->run();
			}
			catch(...)
			{
			}
		}
		else if(task::signal == kind)
		{
			for(;;)
			{
				bool blocked = false;
				{
					std::lock_guard<decltype(mutex_)> guard(mutex_);
					for(auto other : container_.threads)
					{
						if((other != pto) && (state::run == other->thr_state) && (other->timestamp <= pto->timestamp))
						{
							blocked = true;
							break;
						}
					}
				}

				if(!blocked)
					break;

				nana::system::sleep(100);
			}

			//NOTE(review): nothing here checks that a thread is already
			//blocked in wait_for_signal() before notifying.
			signal_.cond.notify_one();
		}

		delete pto->task_ptr;
		pto->task_ptr = nullptr;
	}

	pto->thr_state = state::finished;
}
|
|
|
|
//Entry function handed to the native thread-creation call.
//Its signature is platform-specific; both variants simply run the
//owning pool's worker loop for the given record.
#if defined(NANA_WINDOWS)
static unsigned __stdcall _m_thr_starter(pool_throbj * pto)
{
	impl * const owner = pto->pool_ptr;
	owner->_m_thr_runner(pto);
	::_endthreadex(0);
	return 0;
}
#elif defined(NANA_POSIX)
static void * _m_thr_starter(pool_throbj * pto)
{
	impl * const owner = pto->pool_ptr;
	owner->_m_thr_runner(pto);
	return nullptr;
}
#endif
|
|
private:
|
|
std::atomic<bool> runflag_{ true };
|
|
std::recursive_mutex mutex_;
|
|
|
|
struct signal
|
|
{
|
|
std::mutex mutex;
|
|
std::condition_variable cond;
|
|
}signal_;
|
|
|
|
struct container
|
|
{
|
|
std::deque<task*> tasks;
|
|
std::vector<pool_throbj*> threads;
|
|
}container_;
|
|
};//end class impl
|
|
|
|
#ifndef STD_THREAD_NOT_SUPPORTED
|
|
pool::pool(unsigned thread_number)
|
|
: impl_(new impl(thread_number ? thread_number : std::thread::hardware_concurrency()))
|
|
{
|
|
}
|
|
#else
|
|
pool::pool(unsigned thread_number)
|
|
: impl_(new impl(0))
|
|
{
|
|
}
|
|
#endif
|
|
|
|
//Move constructor: builds a fresh default pool through the delegating
//constructor and exchanges implementations with `other`, leaving `other`
//with the fresh, usable pool.
pool::pool(pool&& other)
	: pool()
{
	impl * const fresh = impl_;
	impl_ = other.impl_;
	other.impl_ = fresh;
}
|
|
|
|
//Move assignment: destroys this pool's implementation, adopts the one
//owned by `other`, and gives `other` a fresh four-thread implementation
//so that it remains usable.
//The replacement is allocated before anything is modified. If that
//allocation throws, both pools are left untouched, instead of both
//pointing at the same implementation and deleting it twice.
pool& pool::operator=(pool&& other)
{
	if(this != &other)
	{
		impl * const fresh = new impl(4);

		delete impl_;
		impl_ = other.impl_;
		other.impl_ = fresh;
	}
	return *this;
}
|
|
|
|
//Destroys the implementation object, which shuts the workers down and
//joins them.
pool::~pool()
{
	delete impl_;
}
|
|
|
|
void pool::signal()
|
|
{
|
|
task * task_ptr = nullptr;
|
|
try
|
|
{
|
|
task_ptr = new task_signal;
|
|
_m_push(task_ptr);
|
|
}
|
|
catch(std::bad_alloc&)
|
|
{
|
|
delete task_ptr;
|
|
}
|
|
}
|
|
|
|
void pool::wait_for_signal()
|
|
{
|
|
impl_->wait_for_signal();
|
|
}
|
|
|
|
void pool::wait_for_finished()
|
|
{
|
|
impl_->wait_for_finished();
|
|
}
|
|
|
|
//Hands a task to the implementation object, which takes ownership of it.
void pool::_m_push(task* task_ptr)
{
	impl_->push(task_ptr);
}
|
|
//end class pool
|
|
|
|
}//end namespace threads
|
|
}//end namespace nana
|