aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Chris Xiong <chirs241097@gmail.com> 2022-09-19 18:26:36 -0400
committerGravatar Chris Xiong <chirs241097@gmail.com> 2022-09-19 18:27:15 -0400
commit683c66d81898e1d7d4cb814a5740169529c3313e (patch)
tree2bd09916fce34141b3ce00822f816485fe1cc8dd
parent817075ec9bd6bd656a4a6dc23363e950474b21d5 (diff)
downloaddeduper-683c66d81898e1d7d4cb814a5740169529c3313e.tar.xz
Scanning can now be cancelled.
Fix terminate() of thread pool blocking if wait() is already called.
-rw-r--r--qdeduper/filescanner.cpp26
-rw-r--r--qdeduper/filescanner.hpp4
-rw-r--r--qdeduper/mingui.cpp23
-rw-r--r--qdeduper/mingui.hpp3
-rw-r--r--qdeduper/sigdb_qt.cpp5
-rw-r--r--qdeduper/sigdb_qt.hpp1
-rw-r--r--xsig/include/signature_db.hpp1
-rw-r--r--xsig/src/signature_db.cpp16
-rw-r--r--xsig/src/thread_pool.hpp56
9 files changed, 102 insertions, 33 deletions
diff --git a/qdeduper/filescanner.cpp b/qdeduper/filescanner.cpp
index e7e45fc..141cfbf 100644
--- a/qdeduper/filescanner.cpp
+++ b/qdeduper/filescanner.cpp
@@ -6,9 +6,8 @@
using std::size_t;
-FileScanner::FileScanner() : QObject(nullptr), maxmnlen(0)
+FileScanner::FileScanner() : QObject(nullptr), maxmnlen(0), interrpt(false)
{
-
}
void FileScanner::add_magic_number(const std::string &m)
@@ -24,9 +23,13 @@ void FileScanner::add_path(const fs::path &p, bool recurse)
}
template <class T>
-void dirit_foreach(T iter, std::function<void(const fs::directory_entry& p)> f)
+void dirit_foreach(T iter, std::function<void(const fs::directory_entry& p)> f, std::atomic<bool> *inter)
{
- std::for_each(fs::begin(iter), fs::end(iter), f);
+ for(auto &e : iter)
+ {
+ if (inter->load()) break;
+ f(e);
+ }
}
void FileScanner::scan()
@@ -57,18 +60,29 @@ void FileScanner::scan()
bool recurse;
std::tie(p, recurse) = pe;
if (recurse)
- dirit_foreach(fs::recursive_directory_iterator(p, opt), f);
+ dirit_foreach(fs::recursive_directory_iterator(p, opt), f, &interrpt);
else
- dirit_foreach(fs::directory_iterator(p, opt), f);
+ dirit_foreach(fs::directory_iterator(p, opt), f, &interrpt);
}
};
for_all_paths(count_files);
+ if (interrpt.load()) return;
Q_EMIT scan_done_prep(fcnt);
fcnt = 0;
for_all_paths(scan_file);
}
+void FileScanner::interrupt()
+{
+ interrpt.store(true);
+}
+
+bool FileScanner::interrupted()
+{
+ return interrpt.load();
+}
+
std::vector<fs::path> FileScanner::file_list()
{
return ret;
diff --git a/qdeduper/filescanner.hpp b/qdeduper/filescanner.hpp
index 5d927a4..3df620e 100644
--- a/qdeduper/filescanner.hpp
+++ b/qdeduper/filescanner.hpp
@@ -1,6 +1,7 @@
#ifndef FILESCANNER_HPP
#define FILESCANNER_HPP
+#include <atomic>
#include <filesystem>
#include <string>
#include <utility>
@@ -16,11 +17,14 @@ class FileScanner : public QObject
std::vector<std::pair<fs::path, bool>> paths;
std::vector<fs::path> ret;
std::size_t maxmnlen;
+ std::atomic<bool> interrpt;
public:
FileScanner();
void add_magic_number(const std::string &m);
void add_path(const fs::path &p, bool recurse = false);
void scan();
+ void interrupt();
+ bool interrupted();
std::vector<fs::path> file_list();
Q_SIGNALS:
void scan_done_prep(std::size_t nfiles);
diff --git a/qdeduper/mingui.cpp b/qdeduper/mingui.cpp
index 845bc70..4e4cc87 100644
--- a/qdeduper/mingui.cpp
+++ b/qdeduper/mingui.cpp
@@ -6,6 +6,7 @@
#include <cstdio>
#include <chrono>
+#include <thread>
#include <cwchar>
#include <QDebug>
@@ -175,10 +176,10 @@ DeduperMainWindow::DeduperMainWindow()
void DeduperMainWindow::setup_menu()
{
- QMenu *file = this->menuBar()->addMenu("File");
- QMenu *view = this->menuBar()->addMenu("View");
- QMenu *mark = this->menuBar()->addMenu("Marks");
- QMenu *help = this->menuBar()->addMenu("Help");
+ QMenu *file = this->menuBar()->addMenu("&File");
+ QMenu *view = this->menuBar()->addMenu("&View");
+ QMenu *mark = this->menuBar()->addMenu("&Marks");
+ QMenu *help = this->menuBar()->addMenu("&Help");
QAction *create_db = file->addAction("Create Database...");
QObject::connect(create_db, &QAction::triggered, this, &DeduperMainWindow::create_new);
@@ -396,6 +397,7 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths)
this->pd->setMaximum(0);
auto f = QtConcurrent::run([this, paths] {
FileScanner *fs = new FileScanner();
+ this->fsc = fs;
std::for_each(paths.begin(), paths.end(), [fs](auto p){fs->add_path(p.first, p.second);});
fs->add_magic_number("\x89PNG\r\n");
fs->add_magic_number("\xff\xd8\xff");
@@ -413,6 +415,12 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths)
}
}, Qt::ConnectionType::QueuedConnection);
fs->scan();
+ if (fs->interrupted())
+ {
+ delete fs;
+ this->fsc = nullptr;
+ return;
+ }
this->pd->setMaximum(fs->file_list().size() - 1);
this->pd->setLabelText("Scanning...");
this->sdb = new SignatureDB();
@@ -431,8 +439,9 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths)
this->pd->setLabelText("Finalizing...");
}
}, Qt::ConnectionType::QueuedConnection);
- this->sdb->scan_files(fs->file_list(), 8);
+ this->sdb->scan_files(fs->file_list(), std::thread::hardware_concurrency());
delete fs;
+ this->fsc = nullptr;
});
QFutureWatcher<void> *fw = new QFutureWatcher<void>(this);
fw->setFuture(f);
@@ -442,6 +451,10 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths)
this->curgroup = 0;
this->show_group(this->curgroup);
}, Qt::ConnectionType::QueuedConnection);
+ QObject::connect(pd, &QProgressDialog::canceled, [this] {
+ if (this->fsc) this->fsc->interrupt();
+ if (this->sdb) this->sdb->interrupt_scan();
+ });
}
void DeduperMainWindow::show_group(size_t gid)
diff --git a/qdeduper/mingui.hpp b/qdeduper/mingui.hpp
index 94f5bbd..e4b1c3f 100644
--- a/qdeduper/mingui.hpp
+++ b/qdeduper/mingui.hpp
@@ -10,6 +10,7 @@
#include <QMainWindow>
#include <QList>
+#include "filescanner.hpp"
#include "sigdb_qt.hpp"
class QHBoxLayout;
@@ -22,6 +23,7 @@ class QProgressDialog;
class QSplitter;
class QStandardItemModel;
class QToolBar;
+class FileScanner;
class ImageItemDelegate;
namespace fs = std::filesystem;
@@ -42,6 +44,7 @@ private:
ImageItemDelegate *id = nullptr;
QProgressDialog *pd = nullptr;
SignatureDB *sdb = nullptr;
+ FileScanner *fsc = nullptr;
std::size_t curgroup;
bool nohotkeywarn;
diff --git a/qdeduper/sigdb_qt.cpp b/qdeduper/sigdb_qt.cpp
index 692f9c7..67cc3e6 100644
--- a/qdeduper/sigdb_qt.cpp
+++ b/qdeduper/sigdb_qt.cpp
@@ -91,6 +91,11 @@ void SignatureDB::scan_files(const std::vector<fs::path> &files, int njobs)
create_priv_struct();
}
+void SignatureDB::interrupt_scan()
+{
+ if (sdb)
+ sdb->populate_interrupt();
+}
size_t SignatureDB::num_groups()
{
diff --git a/qdeduper/sigdb_qt.hpp b/qdeduper/sigdb_qt.hpp
index 070662d..112ffa9 100644
--- a/qdeduper/sigdb_qt.hpp
+++ b/qdeduper/sigdb_qt.hpp
@@ -32,6 +32,7 @@ public:
bool valid();
void scan_files(const std::vector<fs::path> &files, int njobs);
+ void interrupt_scan();
size_t num_groups();
std::vector<size_t> get_group(size_t gid);
std::map<std::pair<size_t, size_t>, double> group_distances(size_t gid);
diff --git a/xsig/include/signature_db.hpp b/xsig/include/signature_db.hpp
index 107aa90..a74e90b 100644
--- a/xsig/include/signature_db.hpp
+++ b/xsig/include/signature_db.hpp
@@ -85,6 +85,7 @@ public:
bool from_db_file(const fs::path &path);
void populate(const std::vector<fs::path> &paths, const populate_cfg_t &cfg);
+ void populate_interrupt();
//disjoint set for keeping similar images in the same group
//some of these probably shouldn't be public. TBD...
diff --git a/xsig/src/signature_db.cpp b/xsig/src/signature_db.cpp
index ba1a372..6b328d6 100644
--- a/xsig/src/signature_db.cpp
+++ b/xsig/src/signature_db.cpp
@@ -28,6 +28,7 @@ struct signature_db_priv
sqlite3 *db;
sqlite3_mutex *mtx;
sqlite3_stmt *bst[batch_status::BATCH_STATUS_MAX];
+ thread_pool *tp;
void init_db();
bool verify_db();
@@ -124,6 +125,8 @@ signature_db::signature_db(const fs::path &dbpath)
p->mtx = sqlite3_db_mutex(p->db);
for (int i = 0; i < batch_status::BATCH_STATUS_MAX; ++i)
p->bst[i] = nullptr;
+ p->tp = nullptr;
+
if (!p->verify_db())
{
sqlite3_close(p->db);
@@ -429,12 +432,19 @@ void signature_db::populate(const std::vector<fs::path> &paths, const populate_c
cfg.callback(count.load(), thid);
};
- thread_pool tp(cfg.njobs);
+ p->tp = new thread_pool(cfg.njobs);
for(size_t i = 0; i < paths.size(); ++i)
{
- tp.create_task(job_func, paths[i]);
+ p->tp->create_task(job_func, paths[i]);
}
- tp.wait();
+ p->tp->wait();
+ delete p->tp;
+ p->tp = nullptr;
+}
+void signature_db::populate_interrupt()
+{
+ if (p->tp)
+ p->tp->terminate();
}
void signature_db::ds_init()
diff --git a/xsig/src/thread_pool.hpp b/xsig/src/thread_pool.hpp
index 42b245a..de78721 100644
--- a/xsig/src/thread_pool.hpp
+++ b/xsig/src/thread_pool.hpp
@@ -24,7 +24,7 @@ public:
bool pop(T&v)
{
std::unique_lock<std::mutex> lck(mtx);
- if(!q.empty())
+ if (!q.empty())
{
v = std::move(q.front());
q.pop();
@@ -49,23 +49,25 @@ public:
{
thr.resize(njobs);
thstop.resize(njobs);
- for(size_t i = 0; i < njobs; ++i)
+ for (size_t i = 0; i < njobs; ++i)
{
auto cstop = thstop[i] = std::make_shared<std::atomic<bool>>(false);
auto looper = [this, i, cstop]
{
- std::atomic<bool>&stop = *cstop;
+ std::atomic<bool> &stop = *cstop;
std::function<void(int)> *f;
bool popped = wq.pop(f);
- while(1)
+ while (1)
{
- for(; popped; popped = wq.pop(f))
+ for (; popped; popped = wq.pop(f))
{
std::unique_ptr<std::function<void(int)>> pf(f);
(*f)(i);
- if(stop)return;
+ if (stop.load()) return;
}
+
std::unique_lock<std::mutex> lck(mtx);
+
++waiting_threads;
cv.wait(lck, [this, &f, &popped, &stop]
{
@@ -73,10 +75,11 @@ public:
return popped || wait_interrupt || stop;
});
--waiting_threads;
- if(!popped)return;
+ if (!popped) return;
}
};
thr[i].reset(new std::thread(looper));
+ pthread_setname_np(thr[i]->native_handle(), (std::string("thrpool #") + std::to_string(i)).c_str());
}
}
template<typename F, typename...A>
@@ -96,15 +99,20 @@ public:
}
void wait()
{
- if(!stop)
- wait_interrupt = true;
+ if (!stop) wait_interrupt = true;
+
{
std::unique_lock<std::mutex> lck(mtx);
cv.notify_all();
}
- for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join();
+ for (size_t i = 0; i < thr.size(); ++i)
+ {
+ if (thr[i]->joinable())
+ thr[i]->join();
+ }
+
std::function<void(int)> *f = nullptr;
- while(wq.size())
+ while (wq.size())
{
wq.pop(f);
delete f;
@@ -116,24 +124,34 @@ public:
{
stop = true;
std::function<void(int)> *f = nullptr;
- while(wq.size())
+ while (wq.size())
{
wq.pop(f);
delete f;
}
- for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true;
+ for (size_t i = 0; i < thstop.size(); ++i)
+ *thstop[i] = true;
+
{
std::unique_lock<std::mutex> lck(mtx);
cv.notify_all();
}
- for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join();
- while(wq.size())
+ if (!wait_interrupt)
{
- wq.pop(f);
- delete f;
+ for (size_t i = 0; i < thr.size(); ++i)
+ {
+ if (thr[i]->joinable())
+ thr[i]->join();
+ }
+
+ while (wq.size())
+ {
+ wq.pop(f);
+ delete f;
+ }
+ thr.clear();
+ thstop.clear();
}
- thr.clear();
- thstop.clear();
}
private:
std::vector<std::unique_ptr<std::thread>> thr;