基础的生产者消费者模型,生产者向公共缓存区写入数据,消费者从公共缓存区读取数据进行处理,两个线程访问公共资源,加锁实现数据的一致性。
通过加锁来实现
1 class Produce_1 {
2 public:
3 Produce_1(std::queue<int> * que_, std::mutex * mt_) {
4 m_mt = mt_;
5 m_que = que_;
6 m_stop = false;
7 }
8 void runProduce() {
9 while (!m_stop) {
10 std::this_thread::sleep_for(std::chrono::seconds(1));
11 std::lock_guard<std::mutex> lgd(*m_mt);
12 m_que->push(1);
13 std::cout << "Produce_1 produce 1" << std::endl;
14 }
15 }
16 void join() {
17 m_trd->join();
18 m_trd.reset();
19 }
20 void start() {
21 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce_1::runProduce), this)));
22 }
23 void stop() {
24 m_stop = true;
25 }
26 private:
27 std::mutex * m_mt;
28 std::queue<int> * m_que;
29 volatile bool m_stop;
30 std::shared_ptr<std::thread> m_trd;
31 };
32
33
34 /*
35 *单缓冲一个同步队列 效率较低
36 */
37 class Consume_1 {
38 public:
39 Consume_1(std::queue<int> * que_, std::mutex * mt_) {
40 m_mt = mt_;
41 m_que = que_;
42 m_stop = false;
43 }
44
45 void runConsume() {
46 while (!m_stop) {
47 std::this_thread::sleep_for(std::chrono::seconds(1));
48 std::lock_guard<std::mutex> lgd(*m_mt);
49 if (!m_que->empty()) {
50 m_que->pop();
51 }
52 std::cout << "Consume_1 consume" << std::endl;
53 }
54 }
55 void join() {
56 m_trd->join();
57 m_trd.reset();
58 }
59 void start() {
60 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume_1::runConsume), this)));
61 }
62 void stop() {
63 m_stop = true;
64 }
65 private:
66 std::mutex * m_mt;
67 std::queue<int> * m_que;
68 volatile bool m_stop;
69 std::shared_ptr<std::thread> m_trd;
70 };
通过条件变量来实现
1 typedef struct Mutex_Condition{
2 std::mutex mt;
3 std::condition_variable cv;
4 }Mutex_Condition;
5
6 class Produce {
7 public:
8 Produce(std::queue<int> * que_, Mutex_Condition * mc_) {
9 m_que = que_;
10 m_mc = mc_;
11 m_stop = false;
12 }
13 void join() {
14 m_trd->join();
15 m_trd.reset();
16 }
17 void produce(int enter) {
18 std::lock_guard<std::mutex> lgd(m_mc->mt);
19 m_que->push(enter);
20 m_mc->cv.notify_one();
21 }
22
23 void runProduce() {
24 while (!m_stop) {
25 std::this_thread::sleep_for(std::chrono::seconds(1));
26 produce(1);
27 std::cout << "Produce Thread produce 1 " << std::endl;
28 }
29 }
30
31 void start() {
32 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce::runProduce), this)));
33 }
34 void stop() {
35 m_stop = true;
36 }
37
38 private:
39 std::queue<int> * m_que;
40 Mutex_Condition * m_mc;
41 std::shared_ptr<std::thread> m_trd;
42 volatile bool m_stop;
43 };
44
45
46 class Consume {
47 public:
48 Consume(std::queue<int> * que_, Mutex_Condition * mc_) {
49 m_que = que_;
50 m_mc = mc_;
51 m_stop = false;
52 }
53 void join() {
54 m_trd->join();
55 m_trd.reset();
56 }
57 void consume() {
58 std::unique_lock<std::mutex> lgd(m_mc->mt);
59 while (m_que->empty()) {
60 int i = 0;
61 m_mc->cv.wait(lgd);
62 }
63 m_que->pop();
64 std::cout << "Consume Thread consume " << std::endl;
65 }
66 void runConsume() {
67 while (!m_stop) {
68 std::this_thread::sleep_for(std::chrono::seconds(1));
69 consume();
70 }
71 }
72 void start() {
73 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume::runConsume), this)));
74 }
75 void stop() {
76 m_stop = true;
77 }
78
79 private:
80 std::queue<int> * m_que;
81 Mutex_Condition * m_mc;
82 std::shared_ptr<std::thread> m_trd;
83 volatile bool m_stop;
84
85 };
二、生产者消费者-双缓冲
一个公共缓存区,由于多线程访问的锁冲突较大,可以采取双缓冲手段来解决锁的冲突
双缓冲的关键:双缓冲队列的数据交换
1)生产者线程不断的向生产者队列A写入数据,当队列中有数据时,进行数据的交换,交换开始启动时通过条件变量通知交换线程来处理最先的数据交换。
2)数据交换完成后,通过条件变量通知消费者处理数据,此时交换线程阻塞到消费者数据处理完成时通知的条件变量上。
3)消费者收到数据交换后的通知后,进行数据的处理,数据处理完成后,通知交换线程进行下一轮的双缓冲区的数据交换。
要点:
生产者除了在数据交换时,其余时刻都在不停的生产数据。
数据交换队列需要等待消费者处理数据完成的通知,以进行下一轮交换。
消费者处理数据时,不进行数据交换,生产者同时会不断的生产数据,消费者需要等待数据交换完成的通知,并且发送消费完成的通知给交换线程
1 typedef struct Mutex_Condition{
2 std::mutex mt;
3 std::condition_variable cv;
4 }Mutex_Condition;
5
6 class Produce_1 {
7 public:
8 Produce_1(std::queue<int> * que_1, std::queue<int> * que_2, Mutex_Condition * mc_1 , Mutex_Condition * mc_2) {
9 m_read_que = que_1;
10 m_writer_que = que_2;
11 m_read_mc = mc_1;
12 m_writer_mc = mc_2;
13 m_stop = false;
14
15 }
16 void runProduce() {
17 while (!m_stop) {
18 std::this_thread::sleep_for(std::chrono::microseconds(20 * 1000));
19 std::lock_guard<std::mutex> lgd(m_writer_mc->mt);
20 m_writer_que->push(1);
21 m_writer_mc->cv.notify_one();
22 std::cout << "m_writer push" << std::endl;
23 }
24
25 }
26 void join() {
27 m_trd->join();
28 m_trd.reset();
29 }
30 void start() {
31 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce_1::runProduce), this)));
32 }
33 void stop() {
34 m_stop = true;
35 }
36 private:
37 Mutex_Condition * m_read_mc;
38 Mutex_Condition * m_writer_mc;
39 std::queue<int> * m_read_que;
40 std::queue<int> * m_writer_que;
41 volatile bool m_stop;
42 std::shared_ptr<std::thread> m_trd;
43 };
44
45
46 class Consume_1 {
47 public:
48 Consume_1(std::queue<int> * que_1, std::queue<int> * que_2, Mutex_Condition * mc_1,Mutex_Condition * mc_2,Mutex_Condition * switch_mc) {
49 m_read_que = que_1;
50 m_writer_que = que_2;
51 m_read_mc = mc_1;
52 m_writer_mc = mc_2;
53 m_stop = false;
54 m_switch_mc = switch_mc;
55 }
56
57 void runConsume() {
58 while (!m_stop) {
59 while (true) {
60 std::this_thread::sleep_for(std::chrono::microseconds(20 * 1000));
61 std::unique_lock<std::mutex> ulg(m_read_mc->mt);
62 while (m_read_que->empty()) {
63 m_read_mc->cv.wait(ulg);
64 }
65 //deal data
66 //std::lock_guard<std::mutex> ulg(m_read_mc->mt);
67 while (!m_read_que->empty()) {
68 m_read_que->pop();
69 std::cout << "m_read_queue pop" << std::endl;
70 }
71 m_switch_mc->cv.notify_one();
72 }
73 }
74 }
75 void join() {
76 m_trd->join();
77 m_trd.reset();
78 }
79 void start() {
80 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume_1::runConsume), this)));
81 }
82 void stop() {
83 m_stop = true;
84 }
85 private:
86 Mutex_Condition * m_read_mc;
87 Mutex_Condition * m_writer_mc;
88 Mutex_Condition * m_switch_mc;
89 std::queue<int> * m_read_que;
90 std::queue<int> * m_writer_que;
91 volatile bool m_stop;
92 std::shared_ptr<std::thread> m_trd;
93 };
94 void que_switch_trd(std::queue<int> * read_que, std::queue<int> * writer_que, Mutex_Condition * read_mc, Mutex_Condition * writer_mc,Mutex_Condition * switch_mc) {
95 while (true) {
96 std::this_thread::sleep_for(std::chrono::microseconds(20*1000));
97 {
98 std::unique_lock<std::mutex> ulg(writer_mc->mt);
99 while (writer_que->empty()) {
100 writer_mc->cv.wait(ulg);
101 }
102 std::lock_guard<std::mutex> ulg_2(read_mc->mt);
103 std::swap(*read_que, *writer_que);
104 std::cout << "switch queue" << std::endl;
105 if (!read_que->empty()) {
106 read_mc->cv.notify_one();
107 }
108 }
109 std::unique_lock<std::mutex> ulg_2(switch_mc->mt);
110 while (!read_que->empty()) {
111 switch_mc->cv.wait(ulg_2);
112 }
113 }
114 }
115 int main(){
116
117 Mutex_Condition mc_1;
118 Mutex_Condition mc_2;
119 Mutex_Condition mc_3;
120 std::queue<int> que_1;
121 std::queue<int> que_2;
122
123 Produce_1 produce_1(&que_1, &que_2, &mc_1, &mc_2);
124 Consume_1 consume_1(&que_1, &que_2, &mc_1, &mc_2,&mc_3);
125
126 std::thread trd(std::bind(&que_switch_trd, &que_1, &que_2, &mc_1, &mc_2,&mc_3));
127 produce_1.start();
128 consume_1.start();
129
130 produce_1.join();
131 consume_1.join();
132 trd.join();
133
134 return 0;
135 }
原文:http://www.cnblogs.com/Forever-Kenlen-Ja/p/7811943.html