diff options
author | Chris Xiong <chirs241097@gmail.com> | 2022-09-19 18:26:36 -0400 |
---|---|---|
committer | Chris Xiong <chirs241097@gmail.com> | 2022-09-19 18:27:15 -0400 |
commit | 683c66d81898e1d7d4cb814a5740169529c3313e (patch) | |
tree | 2bd09916fce34141b3ce00822f816485fe1cc8dd | |
parent | 817075ec9bd6bd656a4a6dc23363e950474b21d5 (diff) | |
download | deduper-683c66d81898e1d7d4cb814a5740169529c3313e.tar.xz |
Scanning can now be cancelled.
Fix terminate() of thread pool blocking if wait() is already called.
-rw-r--r-- | qdeduper/filescanner.cpp | 26 | ||||
-rw-r--r-- | qdeduper/filescanner.hpp | 4 | ||||
-rw-r--r-- | qdeduper/mingui.cpp | 23 | ||||
-rw-r--r-- | qdeduper/mingui.hpp | 3 | ||||
-rw-r--r-- | qdeduper/sigdb_qt.cpp | 5 | ||||
-rw-r--r-- | qdeduper/sigdb_qt.hpp | 1 | ||||
-rw-r--r-- | xsig/include/signature_db.hpp | 1 | ||||
-rw-r--r-- | xsig/src/signature_db.cpp | 16 | ||||
-rw-r--r-- | xsig/src/thread_pool.hpp | 56 |
9 files changed, 102 insertions, 33 deletions
diff --git a/qdeduper/filescanner.cpp b/qdeduper/filescanner.cpp index e7e45fc..141cfbf 100644 --- a/qdeduper/filescanner.cpp +++ b/qdeduper/filescanner.cpp @@ -6,9 +6,8 @@ using std::size_t; -FileScanner::FileScanner() : QObject(nullptr), maxmnlen(0) +FileScanner::FileScanner() : QObject(nullptr), maxmnlen(0), interrpt(false) { - } void FileScanner::add_magic_number(const std::string &m) @@ -24,9 +23,13 @@ void FileScanner::add_path(const fs::path &p, bool recurse) } template <class T> -void dirit_foreach(T iter, std::function<void(const fs::directory_entry& p)> f) +void dirit_foreach(T iter, std::function<void(const fs::directory_entry& p)> f, std::atomic<bool> *inter) { - std::for_each(fs::begin(iter), fs::end(iter), f); + for(auto &e : iter) + { + if (inter->load()) break; + f(e); + } } void FileScanner::scan() @@ -57,18 +60,29 @@ void FileScanner::scan() bool recurse; std::tie(p, recurse) = pe; if (recurse) - dirit_foreach(fs::recursive_directory_iterator(p, opt), f); + dirit_foreach(fs::recursive_directory_iterator(p, opt), f, &interrpt); else - dirit_foreach(fs::directory_iterator(p, opt), f); + dirit_foreach(fs::directory_iterator(p, opt), f, &interrpt); } }; for_all_paths(count_files); + if (interrpt.load()) return; Q_EMIT scan_done_prep(fcnt); fcnt = 0; for_all_paths(scan_file); } +void FileScanner::interrupt() +{ + interrpt.store(true); +} + +bool FileScanner::interrupted() +{ + return interrpt.load(); +} + std::vector<fs::path> FileScanner::file_list() { return ret; diff --git a/qdeduper/filescanner.hpp b/qdeduper/filescanner.hpp index 5d927a4..3df620e 100644 --- a/qdeduper/filescanner.hpp +++ b/qdeduper/filescanner.hpp @@ -1,6 +1,7 @@ #ifndef FILESCANNER_HPP #define FILESCANNER_HPP +#include <atomic> #include <filesystem> #include <string> #include <utility> @@ -16,11 +17,14 @@ class FileScanner : public QObject std::vector<std::pair<fs::path, bool>> paths; std::vector<fs::path> ret; std::size_t maxmnlen; + std::atomic<bool> interrpt; public: FileScanner(); void add_magic_number(const std::string &m); void add_path(const fs::path &p, bool recurse = false); void scan(); + void interrupt(); + bool interrupted(); std::vector<fs::path> file_list(); Q_SIGNALS: void scan_done_prep(std::size_t nfiles); diff --git a/qdeduper/mingui.cpp b/qdeduper/mingui.cpp index 845bc70..4e4cc87 100644 --- a/qdeduper/mingui.cpp +++ b/qdeduper/mingui.cpp @@ -6,6 +6,7 @@ #include <cstdio> #include <chrono> +#include <thread> #include <cwchar> #include <QDebug> @@ -175,10 +176,10 @@ DeduperMainWindow::DeduperMainWindow() void DeduperMainWindow::setup_menu() { - QMenu *file = this->menuBar()->addMenu("File"); - QMenu *view = this->menuBar()->addMenu("View"); - QMenu *mark = this->menuBar()->addMenu("Marks"); - QMenu *help = this->menuBar()->addMenu("Help"); + QMenu *file = this->menuBar()->addMenu("&File"); + QMenu *view = this->menuBar()->addMenu("&View"); + QMenu *mark = this->menuBar()->addMenu("&Marks"); + QMenu *help = this->menuBar()->addMenu("&Help"); QAction *create_db = file->addAction("Create Database..."); QObject::connect(create_db, &QAction::triggered, this, &DeduperMainWindow::create_new); @@ -396,6 +397,7 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths) this->pd->setMaximum(0); auto f = QtConcurrent::run([this, paths] { FileScanner *fs = new FileScanner(); + this->fsc = fs; std::for_each(paths.begin(), paths.end(), [fs](auto p){fs->add_path(p.first, p.second);}); fs->add_magic_number("\x89PNG\r\n"); fs->add_magic_number("\xff\xd8\xff"); @@ -413,6 +415,12 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths) } }, Qt::ConnectionType::QueuedConnection); fs->scan(); + if (fs->interrupted()) + { + delete fs; + this->fsc = nullptr; + return; + } this->pd->setMaximum(fs->file_list().size() - 1); this->pd->setLabelText("Scanning..."); this->sdb = new SignatureDB(); @@ -431,8 +439,9 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths) this->pd->setLabelText("Finalizing..."); } }, Qt::ConnectionType::QueuedConnection); - this->sdb->scan_files(fs->file_list(), 8); + this->sdb->scan_files(fs->file_list(), std::thread::hardware_concurrency()); delete fs; + this->fsc = nullptr; }); QFutureWatcher<void> *fw = new QFutureWatcher<void>(this); fw->setFuture(f); @@ -442,6 +451,10 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths) this->curgroup = 0; this->show_group(this->curgroup); }, Qt::ConnectionType::QueuedConnection); + QObject::connect(pd, &QProgressDialog::canceled, [this] { + if (this->fsc) this->fsc->interrupt(); + if (this->sdb) this->sdb->interrupt_scan(); + }); } void DeduperMainWindow::show_group(size_t gid) diff --git a/qdeduper/mingui.hpp b/qdeduper/mingui.hpp index 94f5bbd..e4b1c3f 100644 --- a/qdeduper/mingui.hpp +++ b/qdeduper/mingui.hpp @@ -10,6 +10,7 @@ #include <QMainWindow> #include <QList> +#include "filescanner.hpp" #include "sigdb_qt.hpp" class QHBoxLayout; @@ -22,6 +23,7 @@ class QProgressDialog; class QSplitter; class QStandardItemModel; class QToolBar; +class FileScanner; class ImageItemDelegate; namespace fs = std::filesystem; @@ -42,6 +44,7 @@ private: ImageItemDelegate *id = nullptr; QProgressDialog *pd = nullptr; SignatureDB *sdb = nullptr; + FileScanner *fsc = nullptr; std::size_t curgroup; bool nohotkeywarn; diff --git a/qdeduper/sigdb_qt.cpp b/qdeduper/sigdb_qt.cpp index 692f9c7..67cc3e6 100644 --- a/qdeduper/sigdb_qt.cpp +++ b/qdeduper/sigdb_qt.cpp @@ -91,6 +91,11 @@ void SignatureDB::scan_files(const std::vector<fs::path> &files, int njobs) create_priv_struct(); } +void SignatureDB::interrupt_scan() +{ + if (sdb) + sdb->populate_interrupt(); +} size_t SignatureDB::num_groups() { diff --git a/qdeduper/sigdb_qt.hpp b/qdeduper/sigdb_qt.hpp index 070662d..112ffa9 100644 --- a/qdeduper/sigdb_qt.hpp +++ b/qdeduper/sigdb_qt.hpp @@ -32,6 +32,7 @@ public: bool valid(); void scan_files(const std::vector<fs::path> &files, int njobs); + void interrupt_scan(); size_t num_groups(); std::vector<size_t> get_group(size_t gid); std::map<std::pair<size_t, size_t>, double> group_distances(size_t gid); diff --git a/xsig/include/signature_db.hpp b/xsig/include/signature_db.hpp index 107aa90..a74e90b 100644 --- a/xsig/include/signature_db.hpp +++ b/xsig/include/signature_db.hpp @@ -85,6 +85,7 @@ public: bool from_db_file(const fs::path &path); void populate(const std::vector<fs::path> &paths, const populate_cfg_t &cfg); + void populate_interrupt(); //disjoint set for keeping similar images in the same group //some of these probably shouldn't be public. TBD... diff --git a/xsig/src/signature_db.cpp b/xsig/src/signature_db.cpp index ba1a372..6b328d6 100644 --- a/xsig/src/signature_db.cpp +++ b/xsig/src/signature_db.cpp @@ -28,6 +28,7 @@ struct signature_db_priv sqlite3 *db; sqlite3_mutex *mtx; sqlite3_stmt *bst[batch_status::BATCH_STATUS_MAX]; + thread_pool *tp; void init_db(); bool verify_db(); @@ -124,6 +125,8 @@ signature_db::signature_db(const fs::path &dbpath) p->mtx = sqlite3_db_mutex(p->db); for (int i = 0; i < batch_status::BATCH_STATUS_MAX; ++i) p->bst[i] = nullptr; + p->tp = nullptr; + if (!p->verify_db()) { sqlite3_close(p->db); @@ -429,12 +432,19 @@ void signature_db::populate(const std::vector<fs::path> &paths, const populate_c cfg.callback(count.load(), thid); }; - thread_pool tp(cfg.njobs); + p->tp = new thread_pool(cfg.njobs); for(size_t i = 0; i < paths.size(); ++i) { - tp.create_task(job_func, paths[i]); + p->tp->create_task(job_func, paths[i]); } - tp.wait(); + p->tp->wait(); + delete p->tp; + p->tp = nullptr; +} +void signature_db::populate_interrupt() +{ + if (p->tp) + p->tp->terminate(); } void signature_db::ds_init() diff --git a/xsig/src/thread_pool.hpp b/xsig/src/thread_pool.hpp index 42b245a..de78721 100644 --- a/xsig/src/thread_pool.hpp +++ b/xsig/src/thread_pool.hpp @@ -24,7 +24,7 @@ public: bool pop(T&v) { std::unique_lock<std::mutex> lck(mtx); - if(!q.empty()) + if (!q.empty()) { v = std::move(q.front()); q.pop(); @@ -49,23 +49,25 @@ public: { thr.resize(njobs); thstop.resize(njobs); - for(size_t i = 0; i < njobs; ++i) + for (size_t i = 0; i < njobs; ++i) { auto cstop = thstop[i] = std::make_shared<std::atomic<bool>>(false); auto looper = [this, i, cstop] { - std::atomic<bool>&stop = *cstop; + std::atomic<bool> &stop = *cstop; std::function<void(int)> *f; bool popped = wq.pop(f); - while(1) + while (1) { - for(; popped; popped = wq.pop(f)) + for (; popped; popped = wq.pop(f)) { std::unique_ptr<std::function<void(int)>> pf(f); (*f)(i); - if(stop)return; + if (stop.load()) return; } + std::unique_lock<std::mutex> lck(mtx); + ++waiting_threads; cv.wait(lck, [this, &f, &popped, &stop] { @@ -73,10 +75,11 @@ public: return popped || wait_interrupt || stop; }); --waiting_threads; - if(!popped)return; + if (!popped) return; } }; thr[i].reset(new std::thread(looper)); + pthread_setname_np(thr[i]->native_handle(), (std::string("thrpool #") + std::to_string(i)).c_str()); } } template<typename F, typename...A> @@ -96,15 +99,20 @@ public: } void wait() { - if(!stop) - wait_interrupt = true; + if (!stop) wait_interrupt = true; + { std::unique_lock<std::mutex> lck(mtx); cv.notify_all(); } - for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); + for (size_t i = 0; i < thr.size(); ++i) + { + if (thr[i]->joinable()) + thr[i]->join(); + } + std::function<void(int)> *f = nullptr; - while(wq.size()) + while (wq.size()) { wq.pop(f); delete f; @@ -116,24 +124,34 @@ public: { stop = true; std::function<void(int)> *f = nullptr; - while(wq.size()) + while (wq.size()) { wq.pop(f); delete f; } - for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true; + for (size_t i = 0; i < thstop.size(); ++i) + *thstop[i] = true; + { std::unique_lock<std::mutex> lck(mtx); cv.notify_all(); } - for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); - while(wq.size()) + if (!wait_interrupt) { - wq.pop(f); - delete f; + for (size_t i = 0; i < thr.size(); ++i) + { + if (thr[i]->joinable()) + thr[i]->join(); + } + + while (wq.size()) + { + wq.pop(f); + delete f; + } + thr.clear(); + thstop.clear(); } - thr.clear(); - thstop.clear(); } private: std::vector<std::unique_ptr<std::thread>> thr; |