From 683c66d81898e1d7d4cb814a5740169529c3313e Mon Sep 17 00:00:00 2001 From: Chris Xiong Date: Mon, 19 Sep 2022 18:26:36 -0400 Subject: Scanning can now be cancelled. Fix terminate() of thread pool blocking if wait() is already called. --- xsig/src/signature_db.cpp | 16 +++++++++++--- xsig/src/thread_pool.hpp | 56 +++++++++++++++++++++++++++++++---------------- 2 files changed, 50 insertions(+), 22 deletions(-) (limited to 'xsig/src') diff --git a/xsig/src/signature_db.cpp b/xsig/src/signature_db.cpp index ba1a372..6b328d6 100644 --- a/xsig/src/signature_db.cpp +++ b/xsig/src/signature_db.cpp @@ -28,6 +28,7 @@ struct signature_db_priv sqlite3 *db; sqlite3_mutex *mtx; sqlite3_stmt *bst[batch_status::BATCH_STATUS_MAX]; + thread_pool *tp; void init_db(); bool verify_db(); @@ -124,6 +125,8 @@ signature_db::signature_db(const fs::path &dbpath) p->mtx = sqlite3_db_mutex(p->db); for (int i = 0; i < batch_status::BATCH_STATUS_MAX; ++i) p->bst[i] = nullptr; + p->tp = nullptr; + if (!p->verify_db()) { sqlite3_close(p->db); @@ -429,12 +432,19 @@ void signature_db::populate(const std::vector &paths, const populate_c cfg.callback(count.load(), thid); }; - thread_pool tp(cfg.njobs); + p->tp = new thread_pool(cfg.njobs); for(size_t i = 0; i < paths.size(); ++i) { - tp.create_task(job_func, paths[i]); + p->tp->create_task(job_func, paths[i]); } - tp.wait(); + p->tp->wait(); + delete p->tp; + p->tp = nullptr; +} +void signature_db::populate_interrupt() +{ + if (p->tp) + p->tp->terminate(); } void signature_db::ds_init() diff --git a/xsig/src/thread_pool.hpp b/xsig/src/thread_pool.hpp index 42b245a..de78721 100644 --- a/xsig/src/thread_pool.hpp +++ b/xsig/src/thread_pool.hpp @@ -24,7 +24,7 @@ public: bool pop(T&v) { std::unique_lock lck(mtx); - if(!q.empty()) + if (!q.empty()) { v = std::move(q.front()); q.pop(); @@ -49,23 +49,25 @@ public: { thr.resize(njobs); thstop.resize(njobs); - for(size_t i = 0; i < njobs; ++i) + for (size_t i = 0; i < njobs; ++i) { auto cstop = thstop[i] = std::make_shared>(false); auto looper = [this, i, cstop] { - std::atomic&stop = *cstop; + std::atomic &stop = *cstop; std::function *f; bool popped = wq.pop(f); - while(1) + while (1) { - for(; popped; popped = wq.pop(f)) + for (; popped; popped = wq.pop(f)) { std::unique_ptr> pf(f); (*f)(i); - if(stop)return; + if (stop.load()) return; } + std::unique_lock lck(mtx); + ++waiting_threads; cv.wait(lck, [this, &f, &popped, &stop] { @@ -73,10 +75,11 @@ public: return popped || wait_interrupt || stop; }); --waiting_threads; - if(!popped)return; + if (!popped) return; } }; thr[i].reset(new std::thread(looper)); + pthread_setname_np(thr[i]->native_handle(), (std::string("thrpool #") + std::to_string(i)).c_str()); } } template @@ -96,15 +99,20 @@ public: } void wait() { - if(!stop) - wait_interrupt = true; + if (!stop) wait_interrupt = true; + { std::unique_lock lck(mtx); cv.notify_all(); } - for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); + for (size_t i = 0; i < thr.size(); ++i) + { + if (thr[i]->joinable()) + thr[i]->join(); + } + std::function *f = nullptr; - while(wq.size()) + while (wq.size()) { wq.pop(f); delete f; @@ -116,24 +124,34 @@ public: { stop = true; std::function *f = nullptr; - while(wq.size()) + while (wq.size()) { wq.pop(f); delete f; } - for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true; + for (size_t i = 0; i < thstop.size(); ++i) + *thstop[i] = true; + { std::unique_lock lck(mtx); cv.notify_all(); } - for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); - while(wq.size()) + if (!wait_interrupt) { - wq.pop(f); - delete f; + for (size_t i = 0; i < thr.size(); ++i) + { + if (thr[i]->joinable()) + thr[i]->join(); + } + + while (wq.size()) + { + wq.pop(f); + delete f; + } + thr.clear(); + thstop.clear(); } - thr.clear(); - thstop.clear(); } private: std::vector> thr; -- cgit v1.2.3