//Chris Xiong 2022 //License: MPL-2.0 #ifndef THREAD_POOL_H #define THREAD_POOL_H #include #include #include #include #include #include #include #include template class _atomic_queue { public: void push(T&v) { std::unique_lock lck(mtx); q.push(v); } bool pop(T&v) { std::unique_lock lck(mtx); if(!q.empty()) { v = std::move(q.front()); q.pop(); return true; } return false; } size_t size() { std::unique_lock lck(mtx); return q.size(); } private: std::queue q; std::mutex mtx; }; class thread_pool { public: thread_pool(size_t njobs): waiting_threads(0), stop(false), wait_interrupt(false) { thr.resize(njobs); thstop.resize(njobs); for(size_t i = 0; i < njobs; ++i) { auto cstop = thstop[i] = std::make_shared>(false); auto looper = [this, i, cstop] { std::atomic&stop = *cstop; std::function *f; bool popped = wq.pop(f); while(1) { for(; popped; popped = wq.pop(f)) { std::unique_ptr> pf(f); (*f)(i); if(stop)return; } std::unique_lock lck(mtx); ++waiting_threads; cv.wait(lck, [this, &f, &popped, &stop] { popped = wq.pop(f); return popped || wait_interrupt || stop; }); --waiting_threads; if(!popped)return; } }; thr[i].reset(new std::thread(looper)); } } template auto create_task(F&&f, A&&...args)->std::future { auto task = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(args)...) ); auto worktask = new std::function([task](int id) { (*task)(id); }); wq.push(worktask); std::unique_lock lck(mtx); cv.notify_one(); return task->get_future(); } void wait() { if(!stop) wait_interrupt = true; { std::unique_lock lck(mtx); cv.notify_all(); } for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); std::function *f; while(wq.size()) { wq.pop(f); delete f; } thr.clear(); thstop.clear(); } void terminate() { stop = true; std::function *f; while(wq.size()) { wq.pop(f); delete f; } for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true; { std::unique_lock lck(mtx); cv.notify_all(); } for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); while(wq.size()) { wq.pop(f); delete f; } thr.clear(); thstop.clear(); } private: std::vector> thr; std::vector>> thstop; _atomic_queue*> wq; std::atomic wait_interrupt; std::atomic stop; std::atomic waiting_threads; std::mutex mtx; std::condition_variable cv; }; #endif