muduo网络库源码复现笔记(七):base库的ThreadPool.h

    技术2025-02-03  44

    Muduo网络库简介

    muduo 是一个基于 Reactor 模式的现代 C++ 网络库,作者陈硕。它采用非阻塞 IO 模型,基于事件驱动和回调,原生支持多核多线程,适合编写 Linux 服务端多线程网络应用程序。 muduo网络库的核心代码只有数千行,在网络编程技术学习的进阶阶段,muduo是一个非常值得学习的开源库。目前我也是刚刚开始学习这个网络库的源码,希望将这个学习过程记录下来。这个网络库的源码已经发布在GitHub上,可以点击这里阅读。目前Github上这份源码已经被作者用c++11重写,我学习的版本是没有使用c++11版本的。不过二者大同小异,核心思想是没有变化的。点这里可以看我的源代码,如果你对我之前的博客有兴趣,可以点击下面的连接: muduo网络库源码复现笔记(一):base库的Timestamp.h muduo网络库源码复现笔记(二):base库的Atomic.h muduo网络库源码复现笔记(三):base库的Exception.h muduo网络库源码复现笔记(四):base库的Thread.h和CurrentThread.h muduo网络库源码复现笔记(五):base库的Mutex.h和Condition.h和CoutntDownLatch.h muduo网络库源码复现笔记(六):base库的BlockingQueue.h和BoundedBlockingQueue.h

    ThreaedPool.h

    ThreadPool.h故名思义,这个文件封装了一个线程池类。线程池包含两个容器,第一个是线程容器,在线程容器中预备一定数量的线程备用;第二个是任务容器,当新的任务进入时,唤醒一个线程执行任务。看一下具体的代码:

    class ThreadPool : boost::noncopyable { public: typedef boost::function<void()> Task; explicit ThreadPool(const string& name=string()); ~ThreadPool(); void start(int numThreads); void stop(); void run(const Task& f); private: void runInThread(); Task take(); MutexLock mutex_;//锁 Condition cond_;//条件量 string name_;//名称 boost::ptr_vector<muduo::Thread> threads_;//线程容器 std::deque<Task> queue_;//任务容器 bool running_;//判定线程池是否在运行状态 }; #endif

    私有成员

    私有成员中,threads_和queue_就是前面提到的线程容器和任务容器,任务容器的类型是typedef boost::function<void()> Task;

    run函数

    run函数的作用是将要执行的任务加入任务队列。看一下代码

    void ThreadPool::run(const Task& task) { if(queue_.empty()) { task(); } else { MutexLockGuard lock(mutex_); queue_.push_back(task); cond_.notify(); } }

    代码不复杂,如果任务队列是空的,直接执行任务,否则就加入任务队列,唤醒线程来执行。

    take函数

    take函数是从任务容器中取出任务的函数,首先加锁,然后判断任务队列是否为空,若为空则等待,否则就拿出来,任务队列中删除这个任务。

    ThreadPool::Task ThreadPool::take() { MutexLockGuard lock(mutex_); while(queue_.empty() && running_) cond_.wait(); Task task; if(!queue_.empty()) { task = queue_.front(); queue_.pop_front(); } return task; }

    runInThread函数

    runInThread函数是线程中要执行的逻辑。如果线程池在工作状态,则使用take函数取出任务执行。

    void ThreadPool::runInThread() { try { while(running_) { Task task(take()); if(task) { task(); } } } catch(const Exception& ex) { fprintf(stderr,"excetion caught in ThreadPool %s\n",name_.c_str()); fprintf(stderr,"readson:%s\n",ex.what()); fprintf(stderr,"stack trace:%s\n",ex.stackTrace()); abort(); } catch(const std::exception& ex) { fprintf(stderr,"excetion caught in ThreadPool %s\n",name_.c_str()); fprintf(stderr,"readson:%s\n",ex.what()); } catch(...) { fprintf(stderr,"unknow exception in ThreadPool\n",name_.c_str()); } }

    start函数

    start函数是暴露给用户的接口,使用这个接口用户可以开启numThreads个接口,

    void ThreadPool::start(int numThreads) { assert(threads_.empty()); running_ = true; threads_.reserve(numThreads); for(int i = 0; i < numThreads; ++i) { char id[32]; snprintf(id,sizeof id,"%d",i); threads_.push_back(new Thread(boost::bind(&ThreadPool::runInThread,this),name_ + id)); threads_[i].start(); } }

    在线程容器中加入线程时,绑定了runInThread函数。

    stop函数

    stop函数用于停止线程池的工作状态,将running_置为false,唤醒所有线程,执行join函数

    void ThreadPool::stop() { { MutexLockGuard lock(mutex_); running_ = false; cond_.notifyAll(); } for_each(threads_.begin(),threads_.end(),boost::bind(&Thread::join,_1)); }
    Processed: 0.011, SQL: 9