C++ 九阴真经之线程池

    技术2022-07-10  84

    线程池的作用:

        1、 避免了在处理短时间任务时创建与销毁线程的代价;

        2、在大批量并发任务中,可以更合理的使用系统资源,进行削峰平谷,保证系统运行的稳定性;

    基于C++11标准构建线程池,具有以下优势;

        1、任务无需继承接口

        2、支持 lambada表达式

        3、支持全局函数、静态成员函数;

        4、使用bind支持成员函数;

     

    class ThreadObject : public QueueObject { public: using QueueObject::QueueObject; ~ThreadObject() { Stop(); for (std::thread& thread : m_pThreadPool) { if (thread.joinable()) thread.join(); // 等待任务结束, 前提:线程一定会执行完 } } int ThreadCount() { return m_pThreadPool.size(); } //空闲线程数量 int IdlCount() { return m_idlThrNum; } protected: //空闲线程数量 std::atomic<int> m_idlThrNum ; std::vector<std::thread> m_pThreadPool; }; //线程池 class CThreadPool : public ThreadObject { public: using ThreadObject::ThreadObject; //启动默认线程池 void Start(int nThreadNum = 1) { m_idlThrNum = nThreadNum < 1 ? 1 : nThreadNum; for (auto size = 0; size < nThreadNum; ++size) { //初始化线程数量 m_pThreadPool.emplace_back( [this] { // 工作线程函数 while (!this->m_bStop) { std::function<void(void)> task; { // 获取一个待执行的 task // unique_lock 相比 lock_guard 的好处是:可以随时 unlock() 和 lock() std::unique_lock<std::mutex> lock(m_mu); this->m_condPop.wait(lock, [this] { return this->m_bStop.load() || !this->m_taskQueue.empty(); } ); // wait 直到有 task if (this->m_bStop && this->m_taskQueue.empty()) return; task = std::move(this->m_taskQueue.front()); // 取一个 task this->m_taskQueue.pop(); } //通知写线程 m_condPush.notify_one(); m_idlThrNum--; task(); m_idlThrNum++; } } ); } } //提交任务到线程池 template<class F, class... Args> auto Commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> { using return_type = decltype(f(args...)); auto task = std::make_shared< std::packaged_task<return_type()> >( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(m_mu); //不允许向已停止的线程池提交作业 if (m_bStop) throw std::runtime_error("向已停止的线程池提交作业"); while (m_taskQueue.size() == m_nCapacity) //队列已满 { m_condPush.wait(m_mu); //等待,将暂时的解锁 } m_taskQueue.emplace([task]() { (*task)(); }); } m_condPop.notify_one(); return res; } virtual size_t GetTaskNum() { return m_taskQueue.size(); } private: std::queue<std::function<void()>> m_taskQueue; //队列 };

    测试代码:

    void Func1(int i, const std::string& msg) { std::cout << i << "-->" << msg<< std::endl; } int Func2(int i, const std::string& msg) { std::cout << i << "-->" << msg << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(1000)); return i * 100; } class PrintTest { public: void Func1(int i, const std::string& msg) { std::cout << i << "-->" << msg << std::endl; } void Func2(int i) { std::cout << i << "-->" << "测试成员函数" << std::endl; } }; int main() { CThreadPool pool; pool.Start(2); //测试lambda表达式 pool.Commit([] {std::cout << "测试lambda表达式" << std::endl; }); //测试带参数的lambda表达式 pool.Commit([](int val){std::cout << "测试带参数的lambda表达式" << "-->" << val << std::endl; }, 999); //测试全局函数 pool.Commit(Func1, 100, "测试全局函数"); //测试成员函数 PrintTest p; pool.Commit(std::mem_fn(&PrintTest::Func1),&p , 200, "测试成员函数"); //测试同步获取结果 auto res = pool.Commit(Func2, 300, "测试同步获取结果"); auto val = res.get(); std::cout << "获取到结果:" << val << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(5000)); return 0; }
    Processed: 0.012, SQL: 9