头文件,threadpoll类定义;
#ifndef THREADDPOOL #define THREADDPOOL #include <vector> #include <utility> #include <queue> #include <thread> #include <functional> #include <mutex> #include<condition_variable> using namespace std; class threadpool { public: static const int kInitThreadsSize ; enum taskPriorityE { level0, level1, level2, }; typedef function<void()>Task; typedef pair<taskPriorityE, Task>TaskPair; threadpool(); ~threadpool(); void start(); void stop(); void addTask(const Task&); void addTaskPair(const TaskPair&); private: threadpool(const thread&); const threadpool& operator== (const thread&); struct cmp { bool operator()(const threadpool::TaskPair p1, const threadpool::TaskPair p2) { return p1.first > p2.first; } }; void threadfunc(); Task take(); typedef vector<thread*>threads_; typedef priority_queue<TaskPair, vector<TaskPair>, cmp>Tasks_; threads_ threads; Tasks_ tasks; mutex m; condition_variable cv; bool isStarted; }; #endif源文件,类成员函数的实现;
#include"threadpool.h" #include<assert.h> #include <cassert> #include<iostream> threadpool::threadpool():m(),cv(),isStarted(false) { } threadpool::~threadpool() { if (isStarted) stop(); } void threadpool::start() { assert(threads.empty()); isStarted = true; threads.reserve(kInitThreadsSize); for (int i = 1; i <= kInitThreadsSize; ++i) { threads.push_back(new thread(bind(&threadpool::threadfunc, this))); } } void threadpool::stop() { { unique_lock<mutex>lock(m); isStarted = false; cv.notify_all(); } { for (auto c : threads) { c->join(); delete(c); } threads.clear(); } } void threadpool::addTask(const Task& task) { TaskPair t(level2,task); addTaskPair(t); } void threadpool::addTaskPair(const TaskPair& taskpair) { unique_lock<mutex>lock(m); tasks.push(taskpair); cv.notify_one(); } void threadpool::threadfunc() { while (isStarted) { Task task = take(); if (task) { task(); } } } threadpool:: Task threadpool::take() { unique_lock<mutex>lock(m); while (isStarted&&tasks.empty()) cv.wait(lock); Task task; auto n = tasks.size(); if (isStarted && !tasks.empty()) { task = tasks.top().second; tasks.pop(); assert(n - 1 == tasks.size()); } return task; } const int threadpool::kInitThreadsSize = 5;简单的测试:
#include"threadpool.h" #include"iostream" #include<string> std::mutex g_mutex; void priorityFunc() { std::this_thread::sleep_for(1s); std::lock_guard<std::mutex> lock(g_mutex); std::thread::id this_id = std::this_thread::get_id(); cout << this_id << " high priority" << endl; } void testFunc() { // loop to print character after a random period of time std::lock_guard<std::mutex> lock(g_mutex); std::thread::id this_id = std::this_thread::get_id(); std::this_thread::sleep_for(1s); cout << this_id<<" low priority"<<endl; } int main() { threadpool threadPool; threadPool.start(); for (int i = 1; i < 8; ++i) { threadPool.addTask(testFunc); } threadPool.addTaskPair(threadpool::TaskPair(threadpool::level0, priorityFunc)); getchar(); threadPool.stop(); return 0; }参考 线程池实现