C++ 并发编程:设计线程安全的队列、链表、查询表

    技术2022-07-17  67

    一、一般想法

    一般的想法是用 std::mutex 锁住整个数据结构,如下面代码所示。当然,这是一种实现方式,但不是最好的方式,因为它不能实现最大程度的并发。 github示例代码

    #include <iostream>
    #include <mutex>
    #include <condition_variable>
    #include <queue>
    #include <memory>
    #include <thread>   // std::thread, std::this_thread
    #include <chrono>   // std::chrono::seconds
    #include <utility>  // std::move

    /* use c++ 11 */

    /// FIFO queue in which every operation takes the same single mutex.
    ///
    /// Simple, but producers and consumers serialise on that one lock, so it
    /// does not offer the maximum possible concurrency.
    template<typename T>
    class ThreadsafeQueue
    {
    private:
        // mutable: the copy constructor (const source) and empty() const must
        // still be able to lock the mutex.
        mutable std::mutex mut;
        std::queue<T> data_queue;
        std::condition_variable data_cond;

    public:
        ThreadsafeQueue() {}

        /// Copies the elements of `other` while holding other's mutex.
        ThreadsafeQueue(ThreadsafeQueue const& other)
        {
            std::lock_guard<std::mutex> lk(other.mut);
            data_queue = other.data_queue;
        }

        /// Appends `new_value` and wakes one thread blocked in wait_and_pop().
        void push(T new_value)
        {
            std::lock_guard<std::mutex> lk(mut);
            data_queue.push(std::move(new_value));  // by-value parameter: move, don't copy again
            data_cond.notify_one();
        }

        /// Blocks until the queue is non-empty, then moves the front element
        /// into `value` and removes it.
        void wait_and_pop(T& value)
        {
            std::unique_lock<std::mutex> lk(mut);
            data_cond.wait(lk, [this] { return !data_queue.empty(); });
            value = std::move(data_queue.front());
            data_queue.pop();
        }

        /// Blocks until the queue is non-empty, then returns the front element
        /// wrapped in a shared_ptr and removes it from the queue.
        std::shared_ptr<T> wait_and_pop()
        {
            std::unique_lock<std::mutex> lk(mut);
            data_cond.wait(lk, [this] { return !data_queue.empty(); });
            std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
            data_queue.pop();
            return res;
        }

        /// Non-blocking pop.
        /// @return false (and `value` untouched) if the queue is empty,
        ///         true if the front element was moved into `value`.
        bool try_pop(T& value)
        {
            std::lock_guard<std::mutex> lk(mut);
            if (data_queue.empty())
                return false;
            value = std::move(data_queue.front());
            data_queue.pop();
            return true;
        }

        /// Non-blocking pop.
        /// @return an empty shared_ptr if the queue is empty, otherwise the
        ///         removed front element.
        std::shared_ptr<T> try_pop()
        {
            std::lock_guard<std::mutex> lk(mut);
            if (data_queue.empty())
                return std::shared_ptr<T>();
            std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
            data_queue.pop();
            return res;
        }

        /// @return true if the queue held no elements at the moment of the call.
        bool empty() const
        {
            std::lock_guard<std::mutex> lk(mut);
            return data_queue.empty();
        }
    };

    // test

    /// Producer demo: pushes an increasing integer once per second, forever.
    void PrepareData(ThreadsafeQueue<int>& queue)
    {
        while (true)
        {
            static int i = 1;
            std::cout << "threadid=" << std::this_thread::get_id() << " push:i=" << i << std::endl;
            queue.push(i++);
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }

    /// Consumer demo: blocks for the next value, prints it, sleeps one second, forever.
    void ProcessData(ThreadsafeQueue<int>& queue)
    {
        while (true)
        {
            int i = 0;
            queue.wait_and_pop(i);
            // get_id must be *called*; streaming the bare function name prints a pointer/bool.
            std::cout << "threadid=" << std::this_thread::get_id() << " wait_and_pop:i=" << i << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }

    int main()
    {
        ThreadsafeQueue<int> queue;
        std::thread t1(PrepareData, std::ref(queue));
        std::thread t2(ProcessData, std::ref(queue));
        t1.join();
        t2.join();
    }

    二、最大并发设计

    在一个数据结构中,各个部分都有一个 std::mutex 实例。当某个线程需要操作其中的某个部分时,利用该部分的 std::mutex 将其锁住;这样,如果其他线程访问的是另外一个部分,就不需要等待,从而最大程度地实现并发。但这种方式实现起来相对麻烦一些,也容易发生死锁等问题。

    1. 线程安全的队列

    github示例代码

    #include <iostream>
    #include <mutex>
    #include <condition_variable>
    #include <queue>
    #include <memory>
    #include <thread>   // std::thread, std::this_thread
    #include <chrono>   // std::chrono::seconds
    #include <utility>  // std::move

    /* use c++ 11 */

    /// FIFO queue built on a singly linked list with separate head and tail
    /// mutexes, so push() (tail side) and the pop functions (head side) mostly
    /// do not contend with each other.
    ///
    /// The list always ends in a data-less dummy node; the queue is empty
    /// exactly when head and tail point at that same node.
    template<typename T>
    class ThreadsafeQueue
    {
    private:
        struct node
        {
            std::shared_ptr<T> data;
            std::unique_ptr<node> next;
        };

        // mutable so that empty() can be a const member.
        mutable std::mutex head_mutex;
        std::unique_ptr<node> head;
        mutable std::mutex tail_mutex;
        node* tail;
        std::condition_variable data_cond;

    private:
        /// Reads `tail` under tail_mutex.
        node* get_tail() const
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            return tail;
        }

        /// Unlinks and returns the head node. Caller must hold head_mutex and
        /// must have checked that the queue is not empty.
        std::unique_ptr<node> pop_head()
        {
            std::unique_ptr<node> old_head = std::move(head);
            head = std::move(old_head->next);
            return old_head;
        }

        /// Blocks until the queue is non-empty and returns the still-held head lock.
        std::unique_lock<std::mutex> wait_for_data()
        {
            std::unique_lock<std::mutex> head_lock(head_mutex);
            data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
            return head_lock;
        }

        std::unique_ptr<node> wait_pop_head()
        {
            std::unique_lock<std::mutex> head_lock(wait_for_data());
            return pop_head();
        }

        std::unique_ptr<node> wait_pop_head(T& value)
        {
            std::unique_lock<std::mutex> head_lock(wait_for_data());
            value = std::move(*head->data);
            return pop_head();
        }

        std::unique_ptr<node> try_pop_head()
        {
            std::lock_guard<std::mutex> head_lock(head_mutex);
            if (head.get() == get_tail())
            {
                return std::unique_ptr<node>();
            }
            return pop_head();
        }

        std::unique_ptr<node> try_pop_head(T& value)
        {
            std::lock_guard<std::mutex> head_lock(head_mutex);
            if (head.get() == get_tail())
            {
                return std::unique_ptr<node>();
            }
            value = std::move(*head->data);
            return pop_head();
        }

    public:
        /// Starts with just the dummy node, i.e. an empty queue.
        ThreadsafeQueue() : head(new node), tail(head.get()) {}
        ThreadsafeQueue(const ThreadsafeQueue& other) = delete;
        ThreadsafeQueue& operator=(const ThreadsafeQueue& other) = delete;

        /// Non-blocking pop.
        /// @return the removed front element, or an empty shared_ptr if the queue is empty.
        std::shared_ptr<T> try_pop()
        {
            const std::unique_ptr<node> old_head = try_pop_head();
            return old_head ? old_head->data : std::shared_ptr<T>();
        }

        /// Non-blocking pop.
        /// @return true if an element was moved into `value`, false if the queue was empty.
        bool try_pop(T& value)
        {
            const std::unique_ptr<node> old_head = try_pop_head(value);
            // unique_ptr's bool conversion is explicit, so it cannot be returned directly.
            return old_head != nullptr;
        }

        /// Blocks until an element is available and returns it.
        std::shared_ptr<T> wait_and_pop()
        {
            const std::unique_ptr<node> old_head = wait_pop_head();
            return old_head->data;
        }

        /// Blocks until an element is available and moves it into `value`.
        void wait_and_pop(T& value)
        {
            const std::unique_ptr<node> old_head = wait_pop_head(value);
        }

        /// Appends `new_value`: the current dummy node receives the data and a
        /// freshly allocated node becomes the new dummy tail.
        void push(T new_value)
        {
            // Allocate outside the lock to keep the critical section short.
            std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
            std::unique_ptr<node> p(new node);
            {
                std::lock_guard<std::mutex> tail_lock(tail_mutex);
                tail->data = new_data;
                node* const new_tail = p.get();
                tail->next = std::move(p);
                tail = new_tail;
            }
            // NOTE(review): the notification is sent without holding head_mutex,
            // which is the mutex consumers wait on. A consumer that has just
            // evaluated the "empty" predicate but has not yet blocked could miss
            // this wake-up and sleep until the next push — confirm whether that
            // window matters for your use case.
            data_cond.notify_one();
        }

        /// @return true if the queue held no elements at the moment of the call.
        bool empty() const
        {
            std::lock_guard<std::mutex> head_lock(head_mutex);
            return head.get() == get_tail();
        }
    };

    // test

    /// Producer demo: pushes an increasing integer once per second, forever.
    void PrepareData(ThreadsafeQueue<int>& queue)
    {
        while (true)
        {
            static int i = 1;
            std::cout << "threadid=" << std::this_thread::get_id() << " push:i=" << i << std::endl;
            queue.push(i++);
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }

    /// Consumer demo: blocks for the next value, prints it, sleeps one second, forever.
    void ProcessData(ThreadsafeQueue<int>& queue)
    {
        while (true)
        {
            int i = 0;
            queue.wait_and_pop(i);
            // get_id must be *called*; streaming the bare function name prints a pointer/bool.
            std::cout << "threadid=" << std::this_thread::get_id() << " wait_and_pop:i=" << i << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }

    int main()
    {
        ThreadsafeQueue<int> queue;
        std::thread t1(PrepareData, std::ref(queue));
        std::thread t2(ProcessData, std::ref(queue));
        t1.join();
        t2.join();
    }

    2. 线程安全的链表

    github示例代码

    #include <iostream>
    #include <memory>
    #include <mutex>

    /// Singly linked list in which every node carries its own mutex.
    ///
    /// Traversals lock hand-over-hand: the successor's mutex is acquired
    /// before the predecessor's is released, so threads working on different
    /// parts of the list do not block each other.
    template<typename T>
    class ThreadsafeList
    {
        struct node
        {
            std::mutex m;
            std::shared_ptr<T> data;
            std::unique_ptr<node> next;

            // Used only for the sentinel head: no payload, no successor.
            node() = default;

            node(T const& value)
                : data(std::make_shared<T>(value))
            {
            }
        };

        node head;  // sentinel; the first real element is head.next

    public:
        ThreadsafeList() {}

        /// Drops every element by running remove_if with an always-true predicate.
        ~ThreadsafeList()
        {
            remove_if([](T const&) { return true; });
        }

        ThreadsafeList(ThreadsafeList const& other) = delete;
        ThreadsafeList& operator=(ThreadsafeList const& other) = delete;

        /// Inserts a copy of `value` as the new first element.
        void push_front(T const& value)
        {
            // Build the node before taking the lock.
            std::unique_ptr<node> fresh(new node(value));
            std::lock_guard<std::mutex> head_guard(head.m);
            fresh->next = std::move(head.next);
            head.next = std::move(fresh);
        }

        /// Calls `f` on every element in list order.
        template<typename Function>
        void for_each(Function f)
        {
            node* prev = &head;
            std::unique_lock<std::mutex> prev_guard(head.m);
            for (;;)
            {
                node* const succ = prev->next.get();
                if (!succ)
                {
                    break;
                }
                // Take the successor's lock before letting go of the predecessor's.
                std::unique_lock<std::mutex> succ_guard(succ->m);
                prev_guard.unlock();
                f(*succ->data);
                prev = succ;
                prev_guard = std::move(succ_guard);
            }
        }

        /// Returns the first element for which `p` is true, or an empty
        /// shared_ptr when no element matches.
        template<typename Predicate>
        std::shared_ptr<T> find_first_if(Predicate p)
        {
            node* prev = &head;
            std::unique_lock<std::mutex> prev_guard(head.m);
            for (;;)
            {
                node* const succ = prev->next.get();
                if (!succ)
                {
                    break;
                }
                std::unique_lock<std::mutex> succ_guard(succ->m);
                prev_guard.unlock();
                if (p(*succ->data))
                {
                    return succ->data;
                }
                prev = succ;
                prev_guard = std::move(succ_guard);
            }
            return {};
        }

        /// Unlinks and destroys every element for which `p` is true.
        template<typename Predicate>
        void remove_if(Predicate p)
        {
            node* prev = &head;
            std::unique_lock<std::mutex> prev_guard(head.m);
            for (;;)
            {
                node* const succ = prev->next.get();
                if (!succ)
                {
                    break;
                }
                std::unique_lock<std::mutex> succ_guard(succ->m);
                if (!p(*succ->data))
                {
                    // Keep the element: step forward, hand-over-hand.
                    prev_guard.unlock();
                    prev = succ;
                    prev_guard = std::move(succ_guard);
                    continue;
                }
                // Remove the element. The predecessor stays locked and `prev`
                // is not advanced, so the node that follows is examined next.
                std::unique_ptr<node> doomed = std::move(prev->next);
                prev->next = std::move(succ->next);
                // Release the node's mutex before `doomed` destroys the node.
                succ_guard.unlock();
            }
        }
    };

    int main()
    {
        // Simple single-threaded smoke test of the list operations.
        ThreadsafeList<int> list;
        list.push_front(1);

        list.for_each([](const int& item) {
            std::cout << item << " ";
        });
        std::cout << std::endl;

        std::shared_ptr<int> hit = list.find_first_if([](const int& item) {
            return item == 1;
        });
        if (hit)
        {
            std::cout << *hit << std::endl;
        }
    }

    3. 线程安全的查询表

    github示例代码

    // ThreadSafeLookupTable.cpp : This file contains the "main" function.
    // Program execution begins and ends there.
    //
    #include <iostream>
    #include <vector>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <functional>
    #include <list>
    #include <utility>
    #include <algorithm>     // std::find_if
    #include <shared_mutex>  // C++17; boost::shared_mutex from
                             // <boost/thread/shared_mutex.hpp> is the pre-C++17 alternative

    /// Hash table with a fixed number of buckets, each guarded by its own
    /// reader/writer mutex, so operations on keys that hash to different
    /// buckets do not block each other.
    template<typename Key, typename Value, typename Hash = std::hash<Key> >
    class ThreadsafeLookupTable
    {
    private:
        class bucket_type
        {
            // get_map() in the enclosing table must lock every bucket and read
            // its entries directly to build a consistent snapshot.
            friend class ThreadsafeLookupTable;

        private:
            typedef std::pair<Key, Value> bucket_value;
            typedef std::list<bucket_value> bucket_data;
            typedef typename bucket_data::iterator bucket_iterator;
            typedef typename bucket_data::const_iterator bucket_const_iterator;

            bucket_data data;
            mutable std::shared_mutex mutex;

            /// Read-only lookup; caller must hold `mutex` (shared or exclusive).
            bucket_const_iterator find_entry_for(Key const& key) const
            {
                return std::find_if(data.begin(), data.end(),
                    [&](const bucket_value& item) { return item.first == key; });
            }

            /// Mutable lookup; caller must hold `mutex` exclusively.
            bucket_iterator find_entry_for(Key const& key)
            {
                return std::find_if(data.begin(), data.end(),
                    [&](const bucket_value& item) { return item.first == key; });
            }

        public:
            /// @return the value stored for `key`, or `default_value` if absent.
            Value value_for(Key const& key, Value const& default_value) const
            {
                std::shared_lock<std::shared_mutex> lock(mutex);
                bucket_const_iterator found_entry = find_entry_for(key);
                return (found_entry == data.end()) ? default_value : found_entry->second;
            }

            /// Inserts `key` -> `value`, or overwrites the value if `key` exists.
            void add_or_update_mapping(Key const& key, Value const& value)
            {
                std::unique_lock<std::shared_mutex> lock(mutex);
                bucket_iterator found_entry = find_entry_for(key);
                if (found_entry == data.end())
                {
                    data.push_back(bucket_value(key, value));
                }
                else
                {
                    // Write through the iterator: assigning to a by-value copy
                    // of the entry would silently discard the update.
                    found_entry->second = value;
                }
            }

            /// Removes the entry for `key`, if there is one.
            void remove_mapping(Key const& key)
            {
                std::unique_lock<std::shared_mutex> lock(mutex);
                bucket_iterator found_entry = find_entry_for(key);
                if (found_entry != data.end())
                {
                    data.erase(found_entry);
                }
            }
        };

        std::vector<std::unique_ptr<bucket_type> > buckets;
        Hash hasher;

        /// Maps `key` to its bucket. The bucket count is fixed after
        /// construction, so no lock is needed here.
        bucket_type& get_bucket(Key const& key) const
        {
            std::size_t const bucket_index = hasher(key) % buckets.size();
            return *buckets[bucket_index];
        }

    public:
        typedef Key key_type;
        typedef Value mapped_type;
        typedef Hash hash_type;

        /// @param num_buckets number of buckets; 0 is treated as 1 so that
        ///        get_bucket() never performs a modulo by zero.
        /// @param hasher_     hash function object used to pick the bucket.
        ThreadsafeLookupTable(unsigned num_buckets = 19, Hash const& hasher_ = Hash())
            : buckets(num_buckets == 0 ? 1u : num_buckets), hasher(hasher_)
        {
            for (auto& bucket : buckets)
            {
                bucket = std::make_unique<bucket_type>();
            }
        }

        ThreadsafeLookupTable(ThreadsafeLookupTable const& other) = delete;
        ThreadsafeLookupTable& operator=(ThreadsafeLookupTable const& other) = delete;

        /// @return the value stored for `key`, or `default_value` if absent.
        Value value_for(Key const& key, Value const& default_value = Value()) const
        {
            return get_bucket(key).value_for(key, default_value);
        }

        /// Inserts `key` -> `value`, or overwrites the value if `key` exists.
        void add_or_update_mapping(Key const& key, Value const& value)
        {
            get_bucket(key).add_or_update_mapping(key, value);
        }

        /// Removes the entry for `key`, if there is one.
        void remove_mapping(Key const& key)
        {
            get_bucket(key).remove_mapping(key);
        }

        /// Returns a snapshot of the whole table as a std::map.
        ///
        /// Every bucket is locked exclusively, always in index order, before
        /// anything is copied, so the snapshot is consistent across buckets.
        std::map<Key, Value> get_map() const
        {
            std::vector<std::unique_lock<std::shared_mutex> > locks;
            locks.reserve(buckets.size());
            for (auto const& bucket : buckets)
            {
                locks.emplace_back(bucket->mutex);
            }

            std::map<Key, Value> res;
            for (auto const& bucket : buckets)
            {
                for (auto const& entry : bucket->data)
                {
                    res.insert(entry);
                }
            }
            return res;
        }
    };

    int main()
    {
        ThreadsafeLookupTable<int, int> hashTable;
        hashTable.add_or_update_mapping(1, 2);
        auto value = hashTable.value_for(1);
        std::cout << value << std::endl;

        hashTable.remove_mapping(1);
        value = hashTable.value_for(1);
        // After removal the lookup falls back to the default Value(), i.e. 0.
        if (value == 0)
        {
            std::cout << "remove" << std::endl;
        }
    }
    Processed: 0.013, SQL: 9