From ed47c1557915bb2472f6959e723cd76155312a98 Mon Sep 17 00:00:00 2001 From: Chris Xiong Date: Mon, 6 Apr 2020 00:50:58 +0800 Subject: Add deduper (unfinished tool for finding image duplicates). --- deduper/thread_pool.h | 127 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 deduper/thread_pool.h (limited to 'deduper/thread_pool.h') diff --git a/deduper/thread_pool.h b/deduper/thread_pool.h new file mode 100644 index 0000000..ee661ce --- /dev/null +++ b/deduper/thread_pool.h @@ -0,0 +1,127 @@ +#ifndef THREAD_POOL_H +#define THREAD_POOL_H + +#include +#include +#include +#include +#include +#include +#include +#include + +template +class _atomic_queue +{ +public: + void push(T&v) + { + std::unique_lock lck(mtx); + q.push(v); + } + bool pop(T&v) + { + std::unique_lock lck(mtx); + if(!q.empty()) + { + v=std::move(q.front()); + q.pop(); + return true; + } + return false; + } + size_t size() + { + std::unique_lock lck(mtx); + return q.size(); + } +private: + std::queue q; + std::mutex mtx; +}; + +class thread_pool +{ +public: + thread_pool(size_t njobs):waiting_threads(0),stop(false),wait_interrupt(false) + { + thr.resize(njobs); + thstop.resize(njobs); + for(size_t i=0;i>(false); + auto looper=[this,i,cstop]{ + std::atomic&stop=*cstop; + std::function *f; + bool popped=wq.pop(f); + while(1) + { + for(;popped;popped=wq.pop(f)) + { + std::unique_ptr> pf(f); + (*f)(i); + if(stop)return; + } + std::unique_lock lck(mtx); + ++waiting_threads; + cv.wait(lck,[this,&f,&popped,&stop]{ + popped=wq.pop(f); + return popped||wait_interrupt||stop; + }); + --waiting_threads; + if(!popped)return; + } + }; + thr[i].reset(new std::thread(looper)); + } + } + template + auto create_task(F&&f,A&&...args)->std::future + { + auto task=std::make_shared>( + std::bind(std::forward(f),std::placeholders::_1,std::forward(args)...) + ); + auto worktask=new std::function([task](int id){(*task)(id);}); + wq.push(worktask); + std::unique_lock lck(mtx); + cv.notify_one(); + return task->get_future(); + } + void wait() + { + if(!stop)wait_interrupt=true; + { + std::unique_lock lck(mtx); + cv.notify_all(); + } + for(size_t i=0;ijoinable())thr[i]->join(); + std::function *f; + while(wq.size()){wq.pop(f);delete f;} + thr.clear();thstop.clear(); + } + void terminate() + { + stop=true; + std::function *f; + while(wq.size()){wq.pop(f);delete f;} + for(size_t i=0;i lck(mtx); + cv.notify_all(); + } + for(size_t i=0;ijoinable())thr[i]->join(); + while(wq.size()){wq.pop(f);delete f;} + thr.clear();thstop.clear(); + } +private: + std::vector> thr; + std::vector>> thstop; + _atomic_queue*> wq; + std::atomic wait_interrupt; + std::atomic stop; + std::atomic waiting_threads; + std::mutex mtx; + std::condition_variable cv; +}; + +#endif -- cgit v1.2.3