diff options
| -rw-r--r-- | qdeduper/filescanner.cpp | 26 | ||||
| -rw-r--r-- | qdeduper/filescanner.hpp | 4 | ||||
| -rw-r--r-- | qdeduper/mingui.cpp | 23 | ||||
| -rw-r--r-- | qdeduper/mingui.hpp | 3 | ||||
| -rw-r--r-- | qdeduper/sigdb_qt.cpp | 5 | ||||
| -rw-r--r-- | qdeduper/sigdb_qt.hpp | 1 | ||||
| -rw-r--r-- | xsig/include/signature_db.hpp | 1 | ||||
| -rw-r--r-- | xsig/src/signature_db.cpp | 16 | ||||
| -rw-r--r-- | xsig/src/thread_pool.hpp | 56 | 
9 files changed, 102 insertions, 33 deletions
| diff --git a/qdeduper/filescanner.cpp b/qdeduper/filescanner.cpp index e7e45fc..141cfbf 100644 --- a/qdeduper/filescanner.cpp +++ b/qdeduper/filescanner.cpp @@ -6,9 +6,8 @@  using std::size_t; -FileScanner::FileScanner() : QObject(nullptr), maxmnlen(0) +FileScanner::FileScanner() : QObject(nullptr), maxmnlen(0), interrpt(false)  { -  }  void FileScanner::add_magic_number(const std::string &m) @@ -24,9 +23,13 @@ void FileScanner::add_path(const fs::path &p, bool recurse)  }  template <class T> -void dirit_foreach(T iter, std::function<void(const fs::directory_entry& p)> f) +void dirit_foreach(T iter, std::function<void(const fs::directory_entry& p)> f, std::atomic<bool> *inter)  { -    std::for_each(fs::begin(iter), fs::end(iter), f); +    for(auto &e : iter) +    { +        if (inter->load()) break; +        f(e); +    }  }  void FileScanner::scan() @@ -57,18 +60,29 @@ void FileScanner::scan()              bool recurse;              std::tie(p, recurse) = pe;              if (recurse) -                dirit_foreach(fs::recursive_directory_iterator(p, opt), f); +                dirit_foreach(fs::recursive_directory_iterator(p, opt), f, &interrpt);              else -                dirit_foreach(fs::directory_iterator(p, opt), f); +                dirit_foreach(fs::directory_iterator(p, opt), f, &interrpt);          }      };      for_all_paths(count_files); +    if (interrpt.load()) return;      Q_EMIT scan_done_prep(fcnt);      fcnt = 0;      for_all_paths(scan_file);  } +void FileScanner::interrupt() +{ +    interrpt.store(true); +} + +bool FileScanner::interrupted() +{ +    return interrpt.load(); +} +  std::vector<fs::path> FileScanner::file_list()  {      return ret; diff --git a/qdeduper/filescanner.hpp b/qdeduper/filescanner.hpp index 5d927a4..3df620e 100644 --- a/qdeduper/filescanner.hpp +++ b/qdeduper/filescanner.hpp @@ -1,6 +1,7 @@  #ifndef FILESCANNER_HPP  #define FILESCANNER_HPP +#include <atomic>  #include <filesystem>  #include <string>  #include <utility> @@ -16,11 +17,14 @@ class FileScanner : public QObject      std::vector<std::pair<fs::path, bool>> paths;      std::vector<fs::path> ret;      std::size_t maxmnlen; +    std::atomic<bool> interrpt;  public:      FileScanner();      void add_magic_number(const std::string &m);      void add_path(const fs::path &p, bool recurse = false);      void scan(); +    void interrupt(); +    bool interrupted();      std::vector<fs::path> file_list();  Q_SIGNALS:      void scan_done_prep(std::size_t nfiles); diff --git a/qdeduper/mingui.cpp b/qdeduper/mingui.cpp index 845bc70..4e4cc87 100644 --- a/qdeduper/mingui.cpp +++ b/qdeduper/mingui.cpp @@ -6,6 +6,7 @@  #include <cstdio>  #include <chrono> +#include <thread>  #include <cwchar>  #include <QDebug> @@ -175,10 +176,10 @@ DeduperMainWindow::DeduperMainWindow()  void DeduperMainWindow::setup_menu()  { -    QMenu *file = this->menuBar()->addMenu("File"); -    QMenu *view = this->menuBar()->addMenu("View"); -    QMenu *mark = this->menuBar()->addMenu("Marks"); -    QMenu *help = this->menuBar()->addMenu("Help"); +    QMenu *file = this->menuBar()->addMenu("&File"); +    QMenu *view = this->menuBar()->addMenu("&View"); +    QMenu *mark = this->menuBar()->addMenu("&Marks"); +    QMenu *help = this->menuBar()->addMenu("&Help");      QAction *create_db = file->addAction("Create Database...");      QObject::connect(create_db, &QAction::triggered, this, &DeduperMainWindow::create_new); @@ -396,6 +397,7 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths)      this->pd->setMaximum(0);      auto f = QtConcurrent::run([this, paths] {          FileScanner *fs = new FileScanner(); +        this->fsc = fs;          std::for_each(paths.begin(), paths.end(), [fs](auto p){fs->add_path(p.first, p.second);});          fs->add_magic_number("\x89PNG\r\n");          fs->add_magic_number("\xff\xd8\xff"); @@ -413,6 +415,12 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths)              }          }, Qt::ConnectionType::QueuedConnection);          fs->scan(); +        if (fs->interrupted()) +        { +            delete fs; +            this->fsc = nullptr; +            return; +        }          this->pd->setMaximum(fs->file_list().size() - 1);          this->pd->setLabelText("Scanning...");          this->sdb = new SignatureDB(); @@ -431,8 +439,9 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths)                  this->pd->setLabelText("Finalizing...");              }          }, Qt::ConnectionType::QueuedConnection); -        this->sdb->scan_files(fs->file_list(), 8); +        this->sdb->scan_files(fs->file_list(), std::thread::hardware_concurrency());          delete fs; +        this->fsc = nullptr;      });      QFutureWatcher<void> *fw = new QFutureWatcher<void>(this);      fw->setFuture(f); @@ -442,6 +451,10 @@ void DeduperMainWindow::scan_dirs(std::vector<std::pair<fs::path, bool>> paths)          this->curgroup = 0;          this->show_group(this->curgroup);      }, Qt::ConnectionType::QueuedConnection); +    QObject::connect(pd, &QProgressDialog::canceled, [this] { +        if (this->fsc) this->fsc->interrupt(); +        if (this->sdb) this->sdb->interrupt_scan(); +    });  }  void DeduperMainWindow::show_group(size_t gid) diff --git a/qdeduper/mingui.hpp b/qdeduper/mingui.hpp index 94f5bbd..e4b1c3f 100644 --- a/qdeduper/mingui.hpp +++ b/qdeduper/mingui.hpp @@ -10,6 +10,7 @@  #include <QMainWindow>  #include <QList> +#include "filescanner.hpp"  #include "sigdb_qt.hpp"  class QHBoxLayout; @@ -22,6 +23,7 @@ class QProgressDialog;  class QSplitter;  class QStandardItemModel;  class QToolBar; +class FileScanner;  class ImageItemDelegate;  namespace fs = std::filesystem; @@ -42,6 +44,7 @@ private:      ImageItemDelegate *id = nullptr;      QProgressDialog *pd = nullptr;      SignatureDB *sdb = nullptr; +    FileScanner *fsc = nullptr;      std::size_t curgroup;      bool nohotkeywarn; diff --git a/qdeduper/sigdb_qt.cpp b/qdeduper/sigdb_qt.cpp index 692f9c7..67cc3e6 100644 --- a/qdeduper/sigdb_qt.cpp +++ b/qdeduper/sigdb_qt.cpp @@ -91,6 +91,11 @@ void SignatureDB::scan_files(const std::vector<fs::path> &files, int njobs)      create_priv_struct();  } +void SignatureDB::interrupt_scan() +{ +    if (sdb) +        sdb->populate_interrupt(); +}  size_t SignatureDB::num_groups()  { diff --git a/qdeduper/sigdb_qt.hpp b/qdeduper/sigdb_qt.hpp index 070662d..112ffa9 100644 --- a/qdeduper/sigdb_qt.hpp +++ b/qdeduper/sigdb_qt.hpp @@ -32,6 +32,7 @@ public:      bool valid();      void scan_files(const std::vector<fs::path> &files, int njobs); +    void interrupt_scan();      size_t num_groups();      std::vector<size_t> get_group(size_t gid);      std::map<std::pair<size_t, size_t>, double> group_distances(size_t gid); diff --git a/xsig/include/signature_db.hpp b/xsig/include/signature_db.hpp index 107aa90..a74e90b 100644 --- a/xsig/include/signature_db.hpp +++ b/xsig/include/signature_db.hpp @@ -85,6 +85,7 @@ public:      bool from_db_file(const fs::path &path);      void populate(const std::vector<fs::path> &paths, const populate_cfg_t &cfg); +    void populate_interrupt();      //disjoint set for keeping similar images in the same group      //some of these probably shouldn't be public. TBD... diff --git a/xsig/src/signature_db.cpp b/xsig/src/signature_db.cpp index ba1a372..6b328d6 100644 --- a/xsig/src/signature_db.cpp +++ b/xsig/src/signature_db.cpp @@ -28,6 +28,7 @@ struct signature_db_priv      sqlite3 *db;      sqlite3_mutex *mtx;      sqlite3_stmt *bst[batch_status::BATCH_STATUS_MAX]; +    thread_pool *tp;      void init_db();      bool verify_db(); @@ -124,6 +125,8 @@ signature_db::signature_db(const fs::path &dbpath)      p->mtx = sqlite3_db_mutex(p->db);      for (int i = 0; i < batch_status::BATCH_STATUS_MAX; ++i)          p->bst[i] = nullptr; +    p->tp = nullptr; +      if (!p->verify_db())      {          sqlite3_close(p->db); @@ -429,12 +432,19 @@ void signature_db::populate(const std::vector<fs::path> &paths, const populate_c          cfg.callback(count.load(), thid);      }; -    thread_pool tp(cfg.njobs); +    p->tp = new thread_pool(cfg.njobs);      for(size_t i = 0; i < paths.size(); ++i)      { -        tp.create_task(job_func, paths[i]); +        p->tp->create_task(job_func, paths[i]);      } -    tp.wait(); +    p->tp->wait(); +    delete p->tp; +    p->tp = nullptr; +} +void signature_db::populate_interrupt() +{ +    if (p->tp) +        p->tp->terminate();  }  void signature_db::ds_init() diff --git a/xsig/src/thread_pool.hpp b/xsig/src/thread_pool.hpp index 42b245a..de78721 100644 --- a/xsig/src/thread_pool.hpp +++ b/xsig/src/thread_pool.hpp @@ -24,7 +24,7 @@ public:      bool pop(T&v)      {          std::unique_lock<std::mutex> lck(mtx); -        if(!q.empty()) +        if (!q.empty())          {              v = std::move(q.front());              q.pop(); @@ -49,23 +49,25 @@ public:      {          thr.resize(njobs);          thstop.resize(njobs); -        for(size_t i = 0; i < njobs; ++i) +        for (size_t i = 0; i < njobs; ++i)          {              auto cstop = thstop[i] = std::make_shared<std::atomic<bool>>(false);              auto looper = [this, i, cstop]              { -                std::atomic<bool>&stop = *cstop; +                std::atomic<bool> &stop = *cstop;                  std::function<void(int)> *f;                  bool popped = wq.pop(f); -                while(1) +                while (1)                  { -                    for(; popped; popped = wq.pop(f)) +                    for (; popped; popped = wq.pop(f))                      {                          std::unique_ptr<std::function<void(int)>> pf(f);                          (*f)(i); -                        if(stop)return; +                        if (stop.load()) return;                      } +                      std::unique_lock<std::mutex> lck(mtx); +                      ++waiting_threads;                      cv.wait(lck, [this, &f, &popped, &stop]                      { @@ -73,10 +75,11 @@ public:                          return popped || wait_interrupt || stop;                      });                      --waiting_threads; -                    if(!popped)return; +                    if (!popped) return;                  }              };              thr[i].reset(new std::thread(looper)); +            pthread_setname_np(thr[i]->native_handle(), (std::string("thrpool #") + std::to_string(i)).c_str());          }      }      template<typename F, typename...A> @@ -96,15 +99,20 @@ public:      }      void wait()      { -        if(!stop) -            wait_interrupt = true; +        if (!stop) wait_interrupt = true; +          {              std::unique_lock<std::mutex> lck(mtx);              cv.notify_all();          } -        for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); +        for (size_t i = 0; i < thr.size(); ++i) +        { +            if (thr[i]->joinable()) +                thr[i]->join(); +        } +          std::function<void(int)> *f = nullptr; -        while(wq.size()) +        while (wq.size())          {              wq.pop(f);              delete f; @@ -116,24 +124,34 @@ public:      {          stop = true;          std::function<void(int)> *f = nullptr; -        while(wq.size()) +        while (wq.size())          {              wq.pop(f);              delete f;          } -        for(size_t i = 0; i < thstop.size(); ++i)*thstop[i] = true; +        for (size_t i = 0; i < thstop.size(); ++i) +            *thstop[i] = true; +          {              std::unique_lock<std::mutex> lck(mtx);              cv.notify_all();          } -        for(size_t i = 0; i < thr.size(); ++i)if(thr[i]->joinable())thr[i]->join(); -        while(wq.size()) +        if (!wait_interrupt)          { -            wq.pop(f); -            delete f; +            for (size_t i = 0; i < thr.size(); ++i) +            { +                if (thr[i]->joinable()) +                    thr[i]->join(); +            } + +            while (wq.size()) +            { +                wq.pop(f); +                delete f; +            } +            thr.clear(); +            thstop.clear();          } -        thr.clear(); -        thstop.clear();      }  private:      std::vector<std::unique_ptr<std::thread>> thr; |