diff options
-rw-r--r-- | mingui/main.cpp | 1 | ||||
-rw-r--r-- | thread_pool.hpp | 222 |
2 files changed, 121 insertions, 102 deletions
diff --git a/mingui/main.cpp b/mingui/main.cpp index 57c2fb1..8199cb9 100644 --- a/mingui/main.cpp +++ b/mingui/main.cpp @@ -136,7 +136,6 @@ int main(int argc, char **argv) w->show_images(build_list(lists[curlist])); w->update_distances(build_dists(lists[curlist])); w->update_viewstatus(curlist, lists.size()); - }); QObject::connect(w, &MinGuiWidget::prev, []{ diff --git a/thread_pool.hpp b/thread_pool.hpp index ee661ce..c28b5e8 100644 --- a/thread_pool.hpp +++ b/thread_pool.hpp @@ -14,114 +14,134 @@ template<typename T> class _atomic_queue { public: - void push(T&v) - { - std::unique_lock<std::mutex> lck(mtx); - q.push(v); - } - bool pop(T&v) - { - std::unique_lock<std::mutex> lck(mtx); - if(!q.empty()) - { - v=std::move(q.front()); - q.pop(); - return true; - } - return false; - } - size_t size() - { - std::unique_lock<std::mutex> lck(mtx); - return q.size(); - } + void push(T&v) + { + std::unique_lock<std::mutex> lck(mtx); + q.push(v); + } + bool pop(T&v) + { + std::unique_lock<std::mutex> lck(mtx); + if(!q.empty()) + { + v = std::move(q.front()); + q.pop(); + return true; + } + return false; + } + size_t size() + { + std::unique_lock<std::mutex> lck(mtx); + return q.size(); + } private: - std::queue<T> q; - std::mutex mtx; + std::queue<T> q; + std::mutex mtx; }; class thread_pool { public: - thread_pool(size_t njobs):waiting_threads(0),stop(false),wait_interrupt(false) - { - thr.resize(njobs); - thstop.resize(njobs); - for(size_t i=0;i<njobs;++i) - { - auto cstop=thstop[i]=std::make_shared<std::atomic<bool>>(false); - auto looper=[this,i,cstop]{ - std::atomic<bool>&stop=*cstop; - std::function<void(int)> *f; - bool popped=wq.pop(f); - while(1) - { - for(;popped;popped=wq.pop(f)) - { - std::unique_ptr<std::function<void(int)>> pf(f); - (*f)(i); - if(stop)return; - } - std::unique_lock<std::mutex> lck(mtx); - ++waiting_threads; - cv.wait(lck,[this,&f,&popped,&stop]{ - popped=wq.pop(f); - return popped||wait_interrupt||stop; - }); - --waiting_threads; - if(!popped)return; - } - }; - thr[i].reset(new std::thread(looper)); - } - } - template<typename F,typename...A> - auto create_task(F&&f,A&&...args)->std::future<decltype(f(0,args...))> - { - auto task=std::make_shared<std::packaged_task<decltype(f(0,args...))(int)>>( - std::bind(std::forward<F>(f),std::placeholders::_1,std::forward<A>(args)...) - ); - auto worktask=new std::function<void(int)>([task](int id){(*task)(id);}); - wq.push(worktask); - std::unique_lock<std::mutex> lck(mtx); - cv.notify_one(); - return task->get_future(); - } - void wait() - { - if(!stop)wait_interrupt=true; - { - std::unique_lock<std::mutex> lck(mtx); - cv.notify_all(); - } - for(size_t i=0;i<thr.size();++i)if(thr[i]->joinable())thr[i]->join(); - std::function<void(int)> *f; - while(wq.size()){wq.pop(f);delete f;} - thr.clear();thstop.clear(); - } - void terminate() - { - stop=true; - std::function<void(int)> *f; - while(wq.size()){wq.pop(f);delete f;} - for(size_t i=0;i<thstop.size();++i)*thstop[i]=true; - { - std::unique_lock<std::mutex> lck(mtx); - cv.notify_all(); - } - for(size_t i=0;i<thr.size();++i)if(thr[i]->joinable())thr[i]->join(); - while(wq.size()){wq.pop(f);delete f;} - thr.clear();thstop.clear(); - } + thread_pool(size_t njobs): waiting_threads(0), stop(false), wait_interrupt(false) + { + thr.resize(njobs); + thstop.resize(njobs); + for(size_t i = 0; i < njobs; ++i) + { + auto cstop = thstop[i] = std::make_shared<std::atomic<bool>>(false); + auto looper = [this, i, cstop] + { + std::atomic<bool>&stop = *cstop; + std::function<void(int)> *f; + bool popped = wq.pop(f); + while(1) + { + for(; popped; popped = wq.pop(f)) + { + std::unique_ptr<std::function<void(int)>> pf(f); + (*f)(i); + if(stop)return; + } + std::unique_lock<std::mutex> lck(mtx); + ++waiting_threads; + cv.wait(lck, [this, &f, &popped, &stop] + { + popped = wq.pop(f); + return popped || wait_interrupt || stop; + }); + --waiting_threads; + if(!popped)return; + } + }; + thr[i].reset(new std::thread(looper)); + } + } + template<typename F, typename...A> + auto create_task(F&&f, A&&...args)->std::future<decltype(f(0, args...))> + { + auto task = std::make_shared<std::packaged_task<decltype(f(0, args...))(int)>>( + std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<A>(args)...) + ); + auto worktask = new std::function<void(int)>([task](int id) + { + (*task)(id); + }); + wq.push(worktask); + std::unique_lock<std::mutex> lck(mtx); + cv.notify_one(); + return task->get_future(); + } + void wait() + { + if(!stop) + wait_interrupt = true; + { + std::unique_lock<std::mutex> lck(mtx); + cv.notify_all(); + } + for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); + std::function<void(int)> *f; + while(wq.size()) + { + wq.pop(f); + delete f; + } + thr.clear(); + thstop.clear(); + } + void terminate() + { + stop = true; + std::function<void(int)> *f; + while(wq.size()) + { + wq.pop(f); + delete f; + } + for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true; + { + std::unique_lock<std::mutex> lck(mtx); + cv.notify_all(); + } + for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); + while(wq.size()) + { + wq.pop(f); + delete f; + } + thr.clear(); + thstop.clear(); + } private: - std::vector<std::unique_ptr<std::thread>> thr; - std::vector<std::shared_ptr<std::atomic<bool>>> thstop; - _atomic_queue<std::function<void(int)>*> wq; - std::atomic<bool> wait_interrupt; - std::atomic<bool> stop; - std::atomic<int> waiting_threads; - std::mutex mtx; - std::condition_variable cv; + std::vector<std::unique_ptr<std::thread>> thr; + std::vector<std::shared_ptr<std::atomic<bool>>> thstop; + _atomic_queue<std::function<void(int)>*> wq; + std::atomic<bool> wait_interrupt; + std::atomic<bool> stop; + std::atomic<int> waiting_threads; + std::mutex mtx; + std::condition_variable cv; }; #endif |