aboutsummaryrefslogblamecommitdiff
path: root/xsig/src/thread_pool.hpp
blob: de78721e3991c83aa16864e2d6a8003e67f960ef (plain) (tree)
1
2

                  















                             







                                              
                       











                                              
        

                    




                 



                                                                                     
                                          



                                                                                
                                                 

                                            
                         
                 
                                                      


                                                                        
                                                
                     
 
                                                          
 






                                                                
                                        


                                                  
                                                                                                                


















                                                                                                      

                                         



                                                  





                                               
                                              
                         









                       
                                              
                         



                      


                                                  



                                                  
                            
         












                                                   
         
     
        


                                                           
                                     

                                     

                               


      
//Chris Xiong 2022
//License: MPL-2.0
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

template<typename T>
class _atomic_queue
{
public:
    void push(T&v)
    {
        std::unique_lock<std::mutex> lck(mtx);
        q.push(v);
    }
    bool pop(T&v)
    {
        std::unique_lock<std::mutex> lck(mtx);
        if (!q.empty())
        {
            v = std::move(q.front());
            q.pop();
            return true;
        }
        return false;
    }
    size_t size()
    {
        std::unique_lock<std::mutex> lck(mtx);
        return q.size();
    }
private:
    std::queue<T> q;
    std::mutex mtx;
};

class thread_pool
{
public:
    thread_pool(size_t njobs): waiting_threads(0), stop(false), wait_interrupt(false)
    {
        thr.resize(njobs);
        thstop.resize(njobs);
        for (size_t i = 0; i < njobs; ++i)
        {
            auto cstop = thstop[i] = std::make_shared<std::atomic<bool>>(false);
            auto looper = [this, i, cstop]
            {
                std::atomic<bool> &stop = *cstop;
                std::function<void(int)> *f;
                bool popped = wq.pop(f);
                while (1)
                {
                    for (; popped; popped = wq.pop(f))
                    {
                        std::unique_ptr<std::function<void(int)>> pf(f);
                        (*f)(i);
                        if (stop.load()) return;
                    }

                    std::unique_lock<std::mutex> lck(mtx);

                    ++waiting_threads;
                    cv.wait(lck, [this, &f, &popped, &stop]
                    {
                        popped = wq.pop(f);
                        return popped || wait_interrupt || stop;
                    });
                    --waiting_threads;
                    if (!popped) return;
                }
            };
            thr[i].reset(new std::thread(looper));
            pthread_setname_np(thr[i]->native_handle(), (std::string("thrpool #") + std::to_string(i)).c_str());
        }
    }
    template<typename F, typename...A>
    auto create_task(F&&f, A&&...args)->std::future<decltype(f(0, args...))>
    {
        auto task = std::make_shared<std::packaged_task<decltype(f(0, args...))(int)>>(
                        std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<A>(args)...)
                    );
        auto worktask = new std::function<void(int)>([task](int id)
        {
            (*task)(id);
        });
        wq.push(worktask);
        std::unique_lock<std::mutex> lck(mtx);
        cv.notify_one();
        return task->get_future();
    }
    void wait()
    {
        if (!stop) wait_interrupt = true;

        {
            std::unique_lock<std::mutex> lck(mtx);
            cv.notify_all();
        }
        for (size_t i = 0; i < thr.size(); ++i)
        {
            if (thr[i]->joinable())
                thr[i]->join();
        }

        std::function<void(int)> *f = nullptr;
        while (wq.size())
        {
            wq.pop(f);
            delete f;
        }
        thr.clear();
        thstop.clear();
    }
    void terminate()
    {
        stop = true;
        std::function<void(int)> *f = nullptr;
        while (wq.size())
        {
            wq.pop(f);
            delete f;
        }
        for (size_t i = 0; i < thstop.size(); ++i)
            *thstop[i] = true;

        {
            std::unique_lock<std::mutex> lck(mtx);
            cv.notify_all();
        }
        if (!wait_interrupt)
        {
            for (size_t i = 0; i < thr.size(); ++i)
            {
                if (thr[i]->joinable())
                    thr[i]->join();
            }

            while (wq.size())
            {
                wq.pop(f);
                delete f;
            }
            thr.clear();
            thstop.clear();
        }
    }
private:
    std::vector<std::unique_ptr<std::thread>> thr;
    std::vector<std::shared_ptr<std::atomic<bool>>> thstop;
    _atomic_queue<std::function<void(int)>*> wq;
    std::atomic<int> waiting_threads;
    std::atomic<bool> stop;
    std::atomic<bool> wait_interrupt;
    std::mutex mtx;
    std::condition_variable cv;
};

#endif