管程可以视为一个线程安全的数据结构,其内部提供了互斥与同步操作,向外提供访问共享数据的专用接口(接口被称为管程的过程),通过管程提供的接口即可达成共享数据的保护与线程间同步。
使用管程,可以简化线程间互斥、同步的编码复杂度(否则需自己控制互斥、同步机制,并保证正确),可以集中分散的互斥、同步操作代码,更容易验证、查错,也更可读(反之,信号量的PV操作可能分散到各个地方,验证、阅读相对麻烦)。
管程中仅能有一个线程在其中执行,根据发起通知时,被唤醒线程(T1)执行,还是唤醒线程(T2)继续执行,可将管程分为三种:
根据前述管程特点,管程应该是一个对象,内部封装了资源,该对象实现了互斥及与阻塞、唤醒机制
使用锁,搭配条件变量来实现互斥及与阻塞、唤醒机制
下面简单介绍条件变量,后面的管程代码有一个小优化,涉及对条件变量实现的理解
条件变量是同步原语,其内部有一个队列,用于存放被wait阻塞的线程,当另一个线程发起通知时,如果队列不为空,队头线程将被唤醒,否则,什么也不做。条件变量也可以一次性唤醒队列中的全部阻塞线程。
条件变量里有一个存储阻塞线程的队列。由于阻塞线程和唤醒通知线程都需要访问这同一队列,所以还有一个用于保护队列的锁,锁粒度不大,并且不需要线程切换,应为自旋锁。
wait函数会将当前线程入队,并原子的进行锁释放与当前线程阻塞,阻塞直到另一线程通知才解除,解除后重新获取锁。锁释放与当前线程阻塞必须是原子的,否则,别的线程发出的唤醒通知,可能发生在当前线程阻塞之前,这会造成唤醒丢失。
signal函数出队一个阻塞线程并唤醒他,当队列为空时,不做任何事情。
broadcast函数将逐个唤醒队列中的所有阻塞线程。
下面实现了一个管程,同时符合mesa与hanson管程的定义,并且做了一些优化,取消掉了不必要的通知操作。
1 #ifndef __MONITOR_H_
2 #define __MONITOR_H_
3
4 #include <list>
5 #include <mutex>
6 #include <utility>
7 #include <condition_variable>
8
9 template<typename T>
10 class Monitor{
11 public:
12 Monitor(): m_iMaxCount(100), m_bStop(false){
13 }
14 Monitor(int iMaxCount) : m_iMaxCount(iMaxCount), m_bStop(false){
15 }
16 ~Monitor() = default;
17
18 void Enqueue(const T& data){
19 Append(data);
20 }
21
22 void Enqueue(T&& data){
23 Append(std::forward<T>(data)); //转发data的原属性,此处转发data的右值引用
24 }
25
26 void Dequeue(T& data){
27 std::unique_lock<std::mutex> lk(m_mMutex);
28 m_cvNotEmpty.wait(lk, [this](){return m_bStop || !IsEmpty(); });
29 if(m_bStop){
30 return;
31 }
32 bool bNeedNotify = IsFull();
33 data = m_listData.front();
34 m_listData.pop_front();
35 lk.unlock();
36 if(bNeedNotify){
37 m_cvNotFull.notify_one();
38 }
39 }
40
41 void Stop(){
42 {
43 std::lock_guard<std::mutex> lk(m_mMutex);
44 m_bStop = true;
45 }
46 m_cvNotEmpty.notify_all();
47 m_cvNotFull.notify_all();
48 }
49
50 private:
51 template<typename U>
52 void Append(U&& data){ //实现通用引用
53 std::unique_lock<std::mutex> lk(m_mMutex);
54 m_cvNotFull.wait(lk, [this](){return m_bStop || !IsFull(); });
55 if(m_bStop){
56 return;
57 }
58 bool bNeedNotify = IsEmpty();
59 m_listData.emplace_back(std::forward<U>(data)); //再次转发
60 lk.unlock();
61 if(bNeedNotify){
62 m_cvNotEmpty.notify_one();
63 }
64 }
65
66 bool IsFull(){
67 return static_cast<int>(m_listData.size()) == m_iMaxCount;
68 }
69
70 bool IsEmpty(){
71 return 0 == static_cast<int>(m_listData.size());
72 }
73
74 Monitor(const Monitor& rhs) = delete;
75 Monitor(Monitor&& rhs) = delete;
76 Monitor& operator=(const Monitor& rhs) = delete;
77 Monitor& operator=(Monitor&& rhs) = delete;
78
79 private:
80 int m_iMaxCount;
81 bool m_bStop;
82 std::mutex m_mMutex;
83 std::list<T> m_listData;
84 std::condition_variable m_cvNotEmpty;
85 std::condition_variable m_cvNotFull;
86 };
87
88 #endif //!__MONITOR_H_
1 template<typename _Predicate>
2 void
3 wait(unique_lock<mutex>& __lock, _Predicate __p)
4 {
5 while (!__p())
6 wait(__lock);
7 }
可调用对象返回true则什么都不做,否则,重复wait到满足条件,这可以避免虚假唤醒,同时也是Mesa管程的行为
1 template<typename _Tp>
2 constexpr _Tp&&
3 forward(typename std::remove_reference<_Tp>::type& __t) noexcept
4 { return static_cast<_Tp&&>(__t); }
5
6 template<typename _Tp>
7 constexpr _Tp&&
8 forward(typename std::remove_reference<_Tp>::type&& __t) noexcept
9 {
10 static_assert(!std::is_lvalue_reference<_Tp>::value, "template argument"
11 " substituting _Tp is an lvalue reference type");
12 return static_cast<_Tp&&>(__t);
13 }
则static_cast<_Tp&&>为static_cast<type& &&>,折叠后为static_cast<type&>, 所以转发返回值仍为为左值引用。
则static_cast<_Tp&&>为static_cast<type&& &&>,折叠后为static_cast<type&&>, 所以转发返回仍为右值引用 。
管程其实也是一种同步队列,现在
得到同步队列如下
1 #ifndef __SyncQueue_H_ 2 #define __SyncQueue_H_ 3 4 #include <list> 5 #include <mutex> 6 #include <utility> 7 #include <condition_variable> 8 9 template<typename T> 10 class SyncQueue{ 11 public: 12 SyncQueue() : m_iMaxCount(100), m_bStop(false){ 13 } 14 SyncQueue(int iMaxCount) : m_iMaxCount(iMaxCount), m_bStop(false){ 15 } 16 ~SyncQueue() = default; 17 18 void Enqueue(const T& data){ 19 Append(data); 20 } 21 22 void Enqueue(T&& data){ 23 Append(std::forward<T>(data)); //转发data的原属性,此处转发data的右值引用 24 } 25 26 bool TryEnqueue(const T& data){ 27 return TryAppend(data); 28 } 29 30 bool TryEnqueue(T&& data){ 31 return TryAppend(std::forward<T>(data)); 32 } 33 34 void Dequeue(T& data){ 35 std::unique_lock<std::mutex> lk(m_mMutex); 36 m_cvNotEmpty.wait(lk, [this](){return m_bStop || !IsEmpty(); }); 37 if(m_bStop){ 38 return; 39 } 40 bool bNeedNotify = IsFull(); 41 data = m_listData.front(); 42 m_listData.pop_front(); 43 lk.unlock(); 44 if(bNeedNotify){ 45 m_cvNotFull.notify_one(); 46 } 47 } 48 49 bool TryDequeue(T& data){ 50 std::unique_lock<std::mutex> lk(m_mMutex); 51 if(m_bStop || IsEmpty()){ 52 return false; 53 } 54 bool bNeedNotify = IsFull(); 55 data = m_listData.front(); 56 m_listData.pop_front(); 57 lk.unlock(); 58 if(bNeedNotify){ 59 m_cvNotFull.notify_one(); 60 } 61 return true; 62 } 63 64 void Stop(){ 65 { 66 std::lock_guard<std::mutex> lk(m_mMutex); 67 m_bStop = true; 68 } 69 m_cvNotEmpty.notify_all(); 70 m_cvNotFull.notify_all(); 71 } 72 73 int Size(){ 74 std::lock_guard<std::mutex> lk(m_mMutex); 75 return static_cast<int>(m_listData.size()); 76 } 77 78 private: 79 template<typename U> 80 bool TryAppend(U&& data){ //实现通用引用 81 std::unique_lock<std::mutex> lk(m_mMutex); 82 if(m_bStop || IsFull()){ 83 return false; 84 } 85 bool bNeedNotify = IsEmpty(); 86 m_listData.emplace_back(std::forward<U>(data)); //再次转发 87 lk.unlock(); 88 if(bNeedNotify){ 89 m_cvNotEmpty.notify_one(); 90 } 91 return true; 92 } 93 94 template<typename U> 95 void Append(U&& data){ //实现通用引用 96 std::unique_lock<std::mutex> lk(m_mMutex); 97 m_cvNotFull.wait(lk, [this](){return m_bStop || !IsFull(); }); 98 if(m_bStop){ 99 return; 100 } 101 bool bNeedNotify = IsEmpty(); 102 m_listData.emplace_back(std::forward<U>(data)); //再次转发 103 lk.unlock(); 104 if(bNeedNotify){ 105 m_cvNotEmpty.notify_one(); 106 } 107 } 108 109 bool IsFull(){ 110 return static_cast<int>(m_listData.size()) == m_iMaxCount; 111 } 112 113 bool IsEmpty(){ 114 return 0 == static_cast<int>(m_listData.size()); 115 } 116 117 SyncQueue(const SyncQueue& rhs) = delete; 118 SyncQueue(SyncQueue&& rhs) = delete; 119 SyncQueue& operator=(const SyncQueue& rhs) = delete; 120 SyncQueue& operator=(SyncQueue&& rhs) = delete; 121 122 private: 123 int m_iMaxCount; 124 bool m_bStop; 125 std::mutex m_mMutex; 126 std::list<T> m_listData; 127 std::condition_variable m_cvNotEmpty; 128 std::condition_variable m_cvNotFull; 129 }; 130 131 #endif //!__SyncQueue_H_
https://www.cnblogs.com/Keeping-Fit/p/15064039.html#wiz-toc-6-203609834
原文:https://www.cnblogs.com/MrLiuZF/p/15142816.html