From 683c66d81898e1d7d4cb814a5740169529c3313e Mon Sep 17 00:00:00 2001 From: Chris Xiong Date: Mon, 19 Sep 2022 18:26:36 -0400 Subject: Scanning can now be cancelled. Fix terminate() of thread pool blocking if wait() is already called. --- xsig/src/thread_pool.hpp | 56 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 37 insertions(+), 19 deletions(-) (limited to 'xsig/src/thread_pool.hpp') diff --git a/xsig/src/thread_pool.hpp b/xsig/src/thread_pool.hpp index 42b245a..de78721 100644 --- a/xsig/src/thread_pool.hpp +++ b/xsig/src/thread_pool.hpp @@ -24,7 +24,7 @@ public: bool pop(T&v) { std::unique_lock lck(mtx); - if(!q.empty()) + if (!q.empty()) { v = std::move(q.front()); q.pop(); @@ -49,23 +49,25 @@ public: { thr.resize(njobs); thstop.resize(njobs); - for(size_t i = 0; i < njobs; ++i) + for (size_t i = 0; i < njobs; ++i) { auto cstop = thstop[i] = std::make_shared>(false); auto looper = [this, i, cstop] { - std::atomic&stop = *cstop; + std::atomic &stop = *cstop; std::function *f; bool popped = wq.pop(f); - while(1) + while (1) { - for(; popped; popped = wq.pop(f)) + for (; popped; popped = wq.pop(f)) { std::unique_ptr> pf(f); (*f)(i); - if(stop)return; + if (stop.load()) return; } + std::unique_lock lck(mtx); + ++waiting_threads; cv.wait(lck, [this, &f, &popped, &stop] { @@ -73,10 +75,11 @@ public: return popped || wait_interrupt || stop; }); --waiting_threads; - if(!popped)return; + if (!popped) return; } }; thr[i].reset(new std::thread(looper)); + pthread_setname_np(thr[i]->native_handle(), (std::string("thrpool #") + std::to_string(i)).c_str()); } } template @@ -96,15 +99,20 @@ public: } void wait() { - if(!stop) - wait_interrupt = true; + if (!stop) wait_interrupt = true; + { std::unique_lock lck(mtx); cv.notify_all(); } - for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); + for (size_t i = 0; i < thr.size(); ++i) + { + if (thr[i]->joinable()) + thr[i]->join(); + } + std::function *f = nullptr; - while(wq.size()) + while (wq.size()) { wq.pop(f); delete f; @@ -116,24 +124,34 @@ public: { stop = true; std::function *f = nullptr; - while(wq.size()) + while (wq.size()) { wq.pop(f); delete f; } - for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true; + for (size_t i = 0; i < thstop.size(); ++i) + *thstop[i] = true; + { std::unique_lock lck(mtx); cv.notify_all(); } - for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); - while(wq.size()) + if (!wait_interrupt) { - wq.pop(f); - delete f; + for (size_t i = 0; i < thr.size(); ++i) + { + if (thr[i]->joinable()) + thr[i]->join(); + } + + while (wq.size()) + { + wq.pop(f); + delete f; + } + thr.clear(); + thstop.clear(); } - thr.clear(); - thstop.clear(); } private: std::vector> thr; -- cgit v1.2.3