aboutsummaryrefslogtreecommitdiff
path: root/thread_pool.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'thread_pool.hpp')
-rw-r--r--thread_pool.hpp149
1 files changed, 0 insertions, 149 deletions
diff --git a/thread_pool.hpp b/thread_pool.hpp
deleted file mode 100644
index 6aea4ec..0000000
--- a/thread_pool.hpp
+++ /dev/null
@@ -1,149 +0,0 @@
-//Chris Xiong 2022
-//License: MPL-2.0
-#ifndef THREAD_POOL_H
-#define THREAD_POOL_H
-
-#include <atomic>
-#include <condition_variable>
-#include <functional>
-#include <future>
-#include <memory>
-#include <mutex>
-#include <queue>
-#include <thread>
-
-template<typename T>
-class _atomic_queue
-{
-public:
- void push(T&v)
- {
- std::unique_lock<std::mutex> lck(mtx);
- q.push(v);
- }
- bool pop(T&v)
- {
- std::unique_lock<std::mutex> lck(mtx);
- if(!q.empty())
- {
- v = std::move(q.front());
- q.pop();
- return true;
- }
- return false;
- }
- size_t size()
- {
- std::unique_lock<std::mutex> lck(mtx);
- return q.size();
- }
-private:
- std::queue<T> q;
- std::mutex mtx;
-};
-
-class thread_pool
-{
-public:
- thread_pool(size_t njobs): waiting_threads(0), stop(false), wait_interrupt(false)
- {
- thr.resize(njobs);
- thstop.resize(njobs);
- for(size_t i = 0; i < njobs; ++i)
- {
- auto cstop = thstop[i] = std::make_shared<std::atomic<bool>>(false);
- auto looper = [this, i, cstop]
- {
- std::atomic<bool>&stop = *cstop;
- std::function<void(int)> *f;
- bool popped = wq.pop(f);
- while(1)
- {
- for(; popped; popped = wq.pop(f))
- {
- std::unique_ptr<std::function<void(int)>> pf(f);
- (*f)(i);
- if(stop)return;
- }
- std::unique_lock<std::mutex> lck(mtx);
- ++waiting_threads;
- cv.wait(lck, [this, &f, &popped, &stop]
- {
- popped = wq.pop(f);
- return popped || wait_interrupt || stop;
- });
- --waiting_threads;
- if(!popped)return;
- }
- };
- thr[i].reset(new std::thread(looper));
- }
- }
- template<typename F, typename...A>
- auto create_task(F&&f, A&&...args)->std::future<decltype(f(0, args...))>
- {
- auto task = std::make_shared<std::packaged_task<decltype(f(0, args...))(int)>>(
- std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<A>(args)...)
- );
- auto worktask = new std::function<void(int)>([task](int id)
- {
- (*task)(id);
- });
- wq.push(worktask);
- std::unique_lock<std::mutex> lck(mtx);
- cv.notify_one();
- return task->get_future();
- }
- void wait()
- {
- if(!stop)
- wait_interrupt = true;
- {
- std::unique_lock<std::mutex> lck(mtx);
- cv.notify_all();
- }
- for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join();
- std::function<void(int)> *f;
- while(wq.size())
- {
- wq.pop(f);
- delete f;
- }
- thr.clear();
- thstop.clear();
- }
- void terminate()
- {
- stop = true;
- std::function<void(int)> *f;
- while(wq.size())
- {
- wq.pop(f);
- delete f;
- }
- for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true;
- {
- std::unique_lock<std::mutex> lck(mtx);
- cv.notify_all();
- }
- for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join();
- while(wq.size())
- {
- wq.pop(f);
- delete f;
- }
- thr.clear();
- thstop.clear();
- }
-private:
- std::vector<std::unique_ptr<std::thread>> thr;
- std::vector<std::shared_ptr<std::atomic<bool>>> thstop;
- _atomic_queue<std::function<void(int)>*> wq;
- std::atomic<bool> wait_interrupt;
- std::atomic<bool> stop;
- std::atomic<int> waiting_threads;
- std::mutex mtx;
- std::condition_variable cv;
-};
-
-#endif