From 4401f681d33f534a7d7ef8f4f940bd54b60710c3 Mon Sep 17 00:00:00 2001 From: Chris Xiong Date: Sun, 18 Sep 2022 01:52:26 -0400 Subject: Move stuff around to accommodate new family members. --- thread_pool.hpp | 149 -------------------------------------------------------- 1 file changed, 149 deletions(-) delete mode 100644 thread_pool.hpp (limited to 'thread_pool.hpp') diff --git a/thread_pool.hpp b/thread_pool.hpp deleted file mode 100644 index 6aea4ec..0000000 --- a/thread_pool.hpp +++ /dev/null @@ -1,149 +0,0 @@ -//Chris Xiong 2022 -//License: MPL-2.0 -#ifndef THREAD_POOL_H -#define THREAD_POOL_H - -#include -#include -#include -#include -#include -#include -#include -#include - -template -class _atomic_queue -{ -public: - void push(T&v) - { - std::unique_lock lck(mtx); - q.push(v); - } - bool pop(T&v) - { - std::unique_lock lck(mtx); - if(!q.empty()) - { - v = std::move(q.front()); - q.pop(); - return true; - } - return false; - } - size_t size() - { - std::unique_lock lck(mtx); - return q.size(); - } -private: - std::queue q; - std::mutex mtx; -}; - -class thread_pool -{ -public: - thread_pool(size_t njobs): waiting_threads(0), stop(false), wait_interrupt(false) - { - thr.resize(njobs); - thstop.resize(njobs); - for(size_t i = 0; i < njobs; ++i) - { - auto cstop = thstop[i] = std::make_shared>(false); - auto looper = [this, i, cstop] - { - std::atomic&stop = *cstop; - std::function *f; - bool popped = wq.pop(f); - while(1) - { - for(; popped; popped = wq.pop(f)) - { - std::unique_ptr> pf(f); - (*f)(i); - if(stop)return; - } - std::unique_lock lck(mtx); - ++waiting_threads; - cv.wait(lck, [this, &f, &popped, &stop] - { - popped = wq.pop(f); - return popped || wait_interrupt || stop; - }); - --waiting_threads; - if(!popped)return; - } - }; - thr[i].reset(new std::thread(looper)); - } - } - template - auto create_task(F&&f, A&&...args)->std::future - { - auto task = std::make_shared>( - std::bind(std::forward(f), std::placeholders::_1, std::forward(args)...) - ); - auto worktask = new std::function([task](int id) - { - (*task)(id); - }); - wq.push(worktask); - std::unique_lock lck(mtx); - cv.notify_one(); - return task->get_future(); - } - void wait() - { - if(!stop) - wait_interrupt = true; - { - std::unique_lock lck(mtx); - cv.notify_all(); - } - for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); - std::function *f; - while(wq.size()) - { - wq.pop(f); - delete f; - } - thr.clear(); - thstop.clear(); - } - void terminate() - { - stop = true; - std::function *f; - while(wq.size()) - { - wq.pop(f); - delete f; - } - for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true; - { - std::unique_lock lck(mtx); - cv.notify_all(); - } - for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); - while(wq.size()) - { - wq.pop(f); - delete f; - } - thr.clear(); - thstop.clear(); - } -private: - std::vector> thr; - std::vector>> thstop; - _atomic_queue*> wq; - std::atomic wait_interrupt; - std::atomic stop; - std::atomic waiting_threads; - std::mutex mtx; - std::condition_variable cv; -}; - -#endif -- cgit v1.2.3