aboutsummaryrefslogtreecommitdiff
path: root/xsig
diff options
context:
space:
mode:
authorGravatar Chris Xiong <chirs241097@gmail.com> 2022-09-19 18:26:36 -0400
committerGravatar Chris Xiong <chirs241097@gmail.com> 2022-09-19 18:27:15 -0400
commit683c66d81898e1d7d4cb814a5740169529c3313e (patch)
tree2bd09916fce34141b3ce00822f816485fe1cc8dd /xsig
parent817075ec9bd6bd656a4a6dc23363e950474b21d5 (diff)
downloaddeduper-683c66d81898e1d7d4cb814a5740169529c3313e.tar.xz
Scanning can now be cancelled.
Fix terminate() of thread pool blocking if wait() is already called.
Diffstat (limited to 'xsig')
-rw-r--r--xsig/include/signature_db.hpp1
-rw-r--r--xsig/src/signature_db.cpp16
-rw-r--r--xsig/src/thread_pool.hpp56
3 files changed, 51 insertions, 22 deletions
diff --git a/xsig/include/signature_db.hpp b/xsig/include/signature_db.hpp
index 107aa90..a74e90b 100644
--- a/xsig/include/signature_db.hpp
+++ b/xsig/include/signature_db.hpp
@@ -85,6 +85,7 @@ public:
bool from_db_file(const fs::path &path);
void populate(const std::vector<fs::path> &paths, const populate_cfg_t &cfg);
+ void populate_interrupt();
//disjoint set for keeping similar images in the same group
//some of these probably shouldn't be public. TBD...
diff --git a/xsig/src/signature_db.cpp b/xsig/src/signature_db.cpp
index ba1a372..6b328d6 100644
--- a/xsig/src/signature_db.cpp
+++ b/xsig/src/signature_db.cpp
@@ -28,6 +28,7 @@ struct signature_db_priv
sqlite3 *db;
sqlite3_mutex *mtx;
sqlite3_stmt *bst[batch_status::BATCH_STATUS_MAX];
+ thread_pool *tp;
void init_db();
bool verify_db();
@@ -124,6 +125,8 @@ signature_db::signature_db(const fs::path &dbpath)
p->mtx = sqlite3_db_mutex(p->db);
for (int i = 0; i < batch_status::BATCH_STATUS_MAX; ++i)
p->bst[i] = nullptr;
+ p->tp = nullptr;
+
if (!p->verify_db())
{
sqlite3_close(p->db);
@@ -429,12 +432,19 @@ void signature_db::populate(const std::vector<fs::path> &paths, const populate_c
cfg.callback(count.load(), thid);
};
- thread_pool tp(cfg.njobs);
+ p->tp = new thread_pool(cfg.njobs);
for(size_t i = 0; i < paths.size(); ++i)
{
- tp.create_task(job_func, paths[i]);
+ p->tp->create_task(job_func, paths[i]);
}
- tp.wait();
+ p->tp->wait();
+ delete p->tp;
+ p->tp = nullptr;
+}
+void signature_db::populate_interrupt()
+{
+ if (p->tp)
+ p->tp->terminate();
}
void signature_db::ds_init()
diff --git a/xsig/src/thread_pool.hpp b/xsig/src/thread_pool.hpp
index 42b245a..de78721 100644
--- a/xsig/src/thread_pool.hpp
+++ b/xsig/src/thread_pool.hpp
@@ -24,7 +24,7 @@ public:
bool pop(T&v)
{
std::unique_lock<std::mutex> lck(mtx);
- if(!q.empty())
+ if (!q.empty())
{
v = std::move(q.front());
q.pop();
@@ -49,23 +49,25 @@ public:
{
thr.resize(njobs);
thstop.resize(njobs);
- for(size_t i = 0; i < njobs; ++i)
+ for (size_t i = 0; i < njobs; ++i)
{
auto cstop = thstop[i] = std::make_shared<std::atomic<bool>>(false);
auto looper = [this, i, cstop]
{
- std::atomic<bool>&stop = *cstop;
+ std::atomic<bool> &stop = *cstop;
std::function<void(int)> *f;
bool popped = wq.pop(f);
- while(1)
+ while (1)
{
- for(; popped; popped = wq.pop(f))
+ for (; popped; popped = wq.pop(f))
{
std::unique_ptr<std::function<void(int)>> pf(f);
(*f)(i);
- if(stop)return;
+ if (stop.load()) return;
}
+
std::unique_lock<std::mutex> lck(mtx);
+
++waiting_threads;
cv.wait(lck, [this, &f, &popped, &stop]
{
@@ -73,10 +75,11 @@ public:
return popped || wait_interrupt || stop;
});
--waiting_threads;
- if(!popped)return;
+ if (!popped) return;
}
};
thr[i].reset(new std::thread(looper));
+ pthread_setname_np(thr[i]->native_handle(), (std::string("thrpool #") + std::to_string(i)).c_str());
}
}
template<typename F, typename...A>
@@ -96,15 +99,20 @@ public:
}
void wait()
{
- if(!stop)
- wait_interrupt = true;
+ if (!stop) wait_interrupt = true;
+
{
std::unique_lock<std::mutex> lck(mtx);
cv.notify_all();
}
- for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join();
+ for (size_t i = 0; i < thr.size(); ++i)
+ {
+ if (thr[i]->joinable())
+ thr[i]->join();
+ }
+
std::function<void(int)> *f = nullptr;
- while(wq.size())
+ while (wq.size())
{
wq.pop(f);
delete f;
@@ -116,24 +124,34 @@ public:
{
stop = true;
std::function<void(int)> *f = nullptr;
- while(wq.size())
+ while (wq.size())
{
wq.pop(f);
delete f;
}
- for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true;
+ for (size_t i = 0; i < thstop.size(); ++i)
+ *thstop[i] = true;
+
{
std::unique_lock<std::mutex> lck(mtx);
cv.notify_all();
}
- for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join();
- while(wq.size())
+ if (!wait_interrupt)
{
- wq.pop(f);
- delete f;
+ for (size_t i = 0; i < thr.size(); ++i)
+ {
+ if (thr[i]->joinable())
+ thr[i]->join();
+ }
+
+ while (wq.size())
+ {
+ wq.pop(f);
+ delete f;
+ }
+ thr.clear();
+ thstop.clear();
}
- thr.clear();
- thstop.clear();
}
private:
std::vector<std::unique_ptr<std::thread>> thr;