aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Chris Xiong <chirs241097@gmail.com> 2022-09-04 19:48:08 -0400
committerGravatar Chris Xiong <chirs241097@gmail.com> 2022-09-04 19:48:08 -0400
commitad93efafe98029ff72dec710949ea168d648dbd7 (patch)
treefa4b7f4fc995085abb475fba3f04e009b213b73f
parent6e389e62d381f65602a890f03b32bcd41ad983d9 (diff)
downloaddeduper-ad93efafe98029ff72dec710949ea168d648dbd7.tar.xz
format thread_pool.hpp
-rw-r--r--mingui/main.cpp1
-rw-r--r--thread_pool.hpp222
2 files changed, 121 insertions, 102 deletions
diff --git a/mingui/main.cpp b/mingui/main.cpp
index 57c2fb1..8199cb9 100644
--- a/mingui/main.cpp
+++ b/mingui/main.cpp
@@ -136,7 +136,6 @@ int main(int argc, char **argv)
w->show_images(build_list(lists[curlist]));
w->update_distances(build_dists(lists[curlist]));
w->update_viewstatus(curlist, lists.size());
-
});
QObject::connect(w, &MinGuiWidget::prev,
[]{
diff --git a/thread_pool.hpp b/thread_pool.hpp
index ee661ce..c28b5e8 100644
--- a/thread_pool.hpp
+++ b/thread_pool.hpp
@@ -14,114 +14,134 @@ template<typename T>
class _atomic_queue
{
public:
- void push(T&v)
- {
- std::unique_lock<std::mutex> lck(mtx);
- q.push(v);
- }
- bool pop(T&v)
- {
- std::unique_lock<std::mutex> lck(mtx);
- if(!q.empty())
- {
- v=std::move(q.front());
- q.pop();
- return true;
- }
- return false;
- }
- size_t size()
- {
- std::unique_lock<std::mutex> lck(mtx);
- return q.size();
- }
+ void push(T&v)
+ {
+ std::unique_lock<std::mutex> lck(mtx);
+ q.push(v);
+ }
+ bool pop(T&v)
+ {
+ std::unique_lock<std::mutex> lck(mtx);
+ if(!q.empty())
+ {
+ v = std::move(q.front());
+ q.pop();
+ return true;
+ }
+ return false;
+ }
+ size_t size()
+ {
+ std::unique_lock<std::mutex> lck(mtx);
+ return q.size();
+ }
private:
- std::queue<T> q;
- std::mutex mtx;
+ std::queue<T> q;
+ std::mutex mtx;
};
class thread_pool
{
public:
- thread_pool(size_t njobs):waiting_threads(0),stop(false),wait_interrupt(false)
- {
- thr.resize(njobs);
- thstop.resize(njobs);
- for(size_t i=0;i<njobs;++i)
- {
- auto cstop=thstop[i]=std::make_shared<std::atomic<bool>>(false);
- auto looper=[this,i,cstop]{
- std::atomic<bool>&stop=*cstop;
- std::function<void(int)> *f;
- bool popped=wq.pop(f);
- while(1)
- {
- for(;popped;popped=wq.pop(f))
- {
- std::unique_ptr<std::function<void(int)>> pf(f);
- (*f)(i);
- if(stop)return;
- }
- std::unique_lock<std::mutex> lck(mtx);
- ++waiting_threads;
- cv.wait(lck,[this,&f,&popped,&stop]{
- popped=wq.pop(f);
- return popped||wait_interrupt||stop;
- });
- --waiting_threads;
- if(!popped)return;
- }
- };
- thr[i].reset(new std::thread(looper));
- }
- }
- template<typename F,typename...A>
- auto create_task(F&&f,A&&...args)->std::future<decltype(f(0,args...))>
- {
- auto task=std::make_shared<std::packaged_task<decltype(f(0,args...))(int)>>(
- std::bind(std::forward<F>(f),std::placeholders::_1,std::forward<A>(args)...)
- );
- auto worktask=new std::function<void(int)>([task](int id){(*task)(id);});
- wq.push(worktask);
- std::unique_lock<std::mutex> lck(mtx);
- cv.notify_one();
- return task->get_future();
- }
- void wait()
- {
- if(!stop)wait_interrupt=true;
- {
- std::unique_lock<std::mutex> lck(mtx);
- cv.notify_all();
- }
- for(size_t i=0;i<thr.size();++i)if(thr[i]->joinable())thr[i]->join();
- std::function<void(int)> *f;
- while(wq.size()){wq.pop(f);delete f;}
- thr.clear();thstop.clear();
- }
- void terminate()
- {
- stop=true;
- std::function<void(int)> *f;
- while(wq.size()){wq.pop(f);delete f;}
- for(size_t i=0;i<thstop.size();++i)*thstop[i]=true;
- {
- std::unique_lock<std::mutex> lck(mtx);
- cv.notify_all();
- }
- for(size_t i=0;i<thr.size();++i)if(thr[i]->joinable())thr[i]->join();
- while(wq.size()){wq.pop(f);delete f;}
- thr.clear();thstop.clear();
- }
+ thread_pool(size_t njobs): waiting_threads(0), stop(false), wait_interrupt(false)
+ {
+ thr.resize(njobs);
+ thstop.resize(njobs);
+ for(size_t i = 0; i < njobs; ++i)
+ {
+ auto cstop = thstop[i] = std::make_shared<std::atomic<bool>>(false);
+ auto looper = [this, i, cstop]
+ {
+ std::atomic<bool>&stop = *cstop;
+ std::function<void(int)> *f;
+ bool popped = wq.pop(f);
+ while(1)
+ {
+ for(; popped; popped = wq.pop(f))
+ {
+ std::unique_ptr<std::function<void(int)>> pf(f);
+ (*f)(i);
+ if(stop)return;
+ }
+ std::unique_lock<std::mutex> lck(mtx);
+ ++waiting_threads;
+ cv.wait(lck, [this, &f, &popped, &stop]
+ {
+ popped = wq.pop(f);
+ return popped || wait_interrupt || stop;
+ });
+ --waiting_threads;
+ if(!popped)return;
+ }
+ };
+ thr[i].reset(new std::thread(looper));
+ }
+ }
+ template<typename F, typename...A>
+ auto create_task(F&&f, A&&...args)->std::future<decltype(f(0, args...))>
+ {
+ auto task = std::make_shared<std::packaged_task<decltype(f(0, args...))(int)>>(
+ std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<A>(args)...)
+ );
+ auto worktask = new std::function<void(int)>([task](int id)
+ {
+ (*task)(id);
+ });
+ wq.push(worktask);
+ std::unique_lock<std::mutex> lck(mtx);
+ cv.notify_one();
+ return task->get_future();
+ }
+ void wait()
+ {
+ if(!stop)
+ wait_interrupt = true;
+ {
+ std::unique_lock<std::mutex> lck(mtx);
+ cv.notify_all();
+ }
+ for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join();
+ std::function<void(int)> *f;
+ while(wq.size())
+ {
+ wq.pop(f);
+ delete f;
+ }
+ thr.clear();
+ thstop.clear();
+ }
+ void terminate()
+ {
+ stop = true;
+ std::function<void(int)> *f;
+ while(wq.size())
+ {
+ wq.pop(f);
+ delete f;
+ }
+ for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true;
+ {
+ std::unique_lock<std::mutex> lck(mtx);
+ cv.notify_all();
+ }
+ for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join();
+ while(wq.size())
+ {
+ wq.pop(f);
+ delete f;
+ }
+ thr.clear();
+ thstop.clear();
+ }
private:
- std::vector<std::unique_ptr<std::thread>> thr;
- std::vector<std::shared_ptr<std::atomic<bool>>> thstop;
- _atomic_queue<std::function<void(int)>*> wq;
- std::atomic<bool> wait_interrupt;
- std::atomic<bool> stop;
- std::atomic<int> waiting_threads;
- std::mutex mtx;
- std::condition_variable cv;
+ std::vector<std::unique_ptr<std::thread>> thr;
+ std::vector<std::shared_ptr<std::atomic<bool>>> thstop;
+ _atomic_queue<std::function<void(int)>*> wq;
+ std::atomic<bool> wait_interrupt;
+ std::atomic<bool> stop;
+ std::atomic<int> waiting_threads;
+ std::mutex mtx;
+ std::condition_variable cv;
};
#endif