#ifndef THREAD_POOL_H #define THREAD_POOL_H #include #include #include #include #include #include #include #include template class _atomic_queue { public: void push(T&v) { std::unique_lock lck(mtx); q.push(v); } bool pop(T&v) { std::unique_lock lck(mtx); if(!q.empty()) { v=std::move(q.front()); q.pop(); return true; } return false; } size_t size() { std::unique_lock lck(mtx); return q.size(); } private: std::queue q; std::mutex mtx; }; class thread_pool { public: thread_pool(size_t njobs):waiting_threads(0),stop(false),wait_interrupt(false) { thr.resize(njobs); thstop.resize(njobs); for(size_t i=0;i>(false); auto looper=[this,i,cstop]{ std::atomic&stop=*cstop; std::function *f; bool popped=wq.pop(f); while(1) { for(;popped;popped=wq.pop(f)) { std::unique_ptr> pf(f); (*f)(i); if(stop)return; } std::unique_lock lck(mtx); ++waiting_threads; cv.wait(lck,[this,&f,&popped,&stop]{ popped=wq.pop(f); return popped||wait_interrupt||stop; }); --waiting_threads; if(!popped)return; } }; thr[i].reset(new std::thread(looper)); } } template auto create_task(F&&f,A&&...args)->std::future { auto task=std::make_shared>( std::bind(std::forward(f),std::placeholders::_1,std::forward(args)...) ); auto worktask=new std::function([task](int id){(*task)(id);}); wq.push(worktask); std::unique_lock lck(mtx); cv.notify_one(); return task->get_future(); } void wait() { if(!stop)wait_interrupt=true; { std::unique_lock lck(mtx); cv.notify_all(); } for(size_t i=0;ijoinable())thr[i]->join(); std::function *f; while(wq.size()){wq.pop(f);delete f;} thr.clear();thstop.clear(); } void terminate() { stop=true; std::function *f; while(wq.size()){wq.pop(f);delete f;} for(size_t i=0;i lck(mtx); cv.notify_all(); } for(size_t i=0;ijoinable())thr[i]->join(); while(wq.size()){wq.pop(f);delete f;} thr.clear();thstop.clear(); } private: std::vector> thr; std::vector>> thstop; _atomic_queue*> wq; std::atomic wait_interrupt; std::atomic stop; std::atomic waiting_threads; std::mutex mtx; std::condition_variable cv; }; #endif