diff options
author | Chris Xiong <chirs241097@gmail.com> | 2022-09-18 01:52:26 -0400 |
---|---|---|
committer | Chris Xiong <chirs241097@gmail.com> | 2022-09-18 01:52:26 -0400 |
commit | 4401f681d33f534a7d7ef8f4f940bd54b60710c3 (patch) | |
tree | d393f5fa9b5c7e96eae94e3986c40f9d80777818 /xsig/src/thread_pool.hpp | |
parent | f02cb7bf4978ec0fa1eea4ed0b21460b7637d741 (diff) | |
download | deduper-4401f681d33f534a7d7ef8f4f940bd54b60710c3.tar.xz |
Move stuff around to accommodate new family members.
Diffstat (limited to 'xsig/src/thread_pool.hpp')
-rw-r--r-- | xsig/src/thread_pool.hpp | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/xsig/src/thread_pool.hpp b/xsig/src/thread_pool.hpp new file mode 100644 index 0000000..6aea4ec --- /dev/null +++ b/xsig/src/thread_pool.hpp @@ -0,0 +1,149 @@ +//Chris Xiong 2022 +//License: MPL-2.0 +#ifndef THREAD_POOL_H +#define THREAD_POOL_H + +#include <atomic> +#include <condition_variable> +#include <functional> +#include <future> +#include <memory> +#include <mutex> +#include <queue> +#include <thread> + +template<typename T> +class _atomic_queue +{ +public: + void push(T&v) + { + std::unique_lock<std::mutex> lck(mtx); + q.push(v); + } + bool pop(T&v) + { + std::unique_lock<std::mutex> lck(mtx); + if(!q.empty()) + { + v = std::move(q.front()); + q.pop(); + return true; + } + return false; + } + size_t size() + { + std::unique_lock<std::mutex> lck(mtx); + return q.size(); + } +private: + std::queue<T> q; + std::mutex mtx; +}; + +class thread_pool +{ +public: + thread_pool(size_t njobs): waiting_threads(0), stop(false), wait_interrupt(false) + { + thr.resize(njobs); + thstop.resize(njobs); + for(size_t i = 0; i < njobs; ++i) + { + auto cstop = thstop[i] = std::make_shared<std::atomic<bool>>(false); + auto looper = [this, i, cstop] + { + std::atomic<bool>&stop = *cstop; + std::function<void(int)> *f; + bool popped = wq.pop(f); + while(1) + { + for(; popped; popped = wq.pop(f)) + { + std::unique_ptr<std::function<void(int)>> pf(f); + (*f)(i); + if(stop)return; + } + std::unique_lock<std::mutex> lck(mtx); + ++waiting_threads; + cv.wait(lck, [this, &f, &popped, &stop] + { + popped = wq.pop(f); + return popped || wait_interrupt || stop; + }); + --waiting_threads; + if(!popped)return; + } + }; + thr[i].reset(new std::thread(looper)); + } + } + template<typename F, typename...A> + auto create_task(F&&f, A&&...args)->std::future<decltype(f(0, args...))> + { + auto task = std::make_shared<std::packaged_task<decltype(f(0, args...))(int)>>( + std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<A>(args)...) + ); + auto worktask = new std::function<void(int)>([task](int id) + { + (*task)(id); + }); + wq.push(worktask); + std::unique_lock<std::mutex> lck(mtx); + cv.notify_one(); + return task->get_future(); + } + void wait() + { + if(!stop) + wait_interrupt = true; + { + std::unique_lock<std::mutex> lck(mtx); + cv.notify_all(); + } + for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); + std::function<void(int)> *f; + while(wq.size()) + { + wq.pop(f); + delete f; + } + thr.clear(); + thstop.clear(); + } + void terminate() + { + stop = true; + std::function<void(int)> *f; + while(wq.size()) + { + wq.pop(f); + delete f; + } + for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true; + { + std::unique_lock<std::mutex> lck(mtx); + cv.notify_all(); + } + for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); + while(wq.size()) + { + wq.pop(f); + delete f; + } + thr.clear(); + thstop.clear(); + } +private: + std::vector<std::unique_ptr<std::thread>> thr; + std::vector<std::shared_ptr<std::atomic<bool>>> thstop; + _atomic_queue<std::function<void(int)>*> wq; + std::atomic<bool> wait_interrupt; + std::atomic<bool> stop; + std::atomic<int> waiting_threads; + std::mutex mtx; + std::condition_variable cv; +}; + +#endif |