#include <iostream>
#include <thread>
#include <mutex>
class wait_test {
bool flag;
std::mutex m;
public:
wait_test(bool _flag):flag(_flag){}
void setFlag(bool _flag)
{
std::unique_lock<std::mutex> lk(m);
flag = _flag;
}
bool getFlag()
{
std::unique_lock<std::mutex> lk(m);
return flag;
}
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while (!flag)
{
lk.unlock(); // 1 解锁互斥量
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
lk.lock(); // 3 再锁互斥量
}
}
};
void funA(wait_test &wt, int &i)
{
while (!wt.getFlag())
{
++i;
}
}
void funB(wait_test &wt, int &i)
{
std::cout << "begin\t" << i << std::endl;
wt.wait_for_flag();//等待主线程set
std::cout << "end\t" << i << std::endl;
}
int main()
{
wait_test wt{ false };
int i{ 0 };
std::thread t1{ funA,std::ref(wt),std::ref(i) }, t2{ funB,std::ref(wt),std::ref(i) };
t1.detach();
t2.detach();
wt.setFlag(true);
system("pause");
return 0;
}
第三个选择(也是优先的选择)是,使用C++标准库提供的工具去等待事件的发生。通过另一线程触发等待事件的机制是最基本的唤醒方式(例如:流水线上存在额外的任务时),这种机制就称为“条件变量”(condition variable)。从概念上来说,一个条件变量会与多个事件或其他条件相关,并且一个或多个线程会等待条件的达成。当某些线程被终止时,为了唤醒等待线程(允许等待线程继续执行)终止的线程将会向等待着的线程广播“条件达成”的信息。#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
struct data_chunk {
int m;
};
struct A {
std::mutex mut;
std::queue<data_chunk> data_queue; // 1
std::condition_variable data_cond;
bool more_data_to_prepare()
{
return data_queue.size() < 10;
}
bool is_last_chunk()
{
return data_queue.size() == 3;
}
};
int i = 0;
data_chunk prepare_data()
{
data_chunk r;
r.m = ++i;
return r;
}
void data_preparation_thread(A &a)
{
std::cout << "preparation begin"<< std::endl;
while (a.more_data_to_prepare())
{
const data_chunk data = prepare_data();
std::lock_guard<std::mutex> lk(a.mut);
a.data_queue.push(data); // 2
std::cout << "preparation notify" << std::endl;
a.data_cond.notify_one(); // 3
}
std::cout << "preparation end" << std::endl;
}
void process(const data_chunk &d)
{
std::cout << d.m << std::endl;
}
void data_processing_thread(A &a)
{
while (true)
{
std::unique_lock<std::mutex> lk(a.mut); // 4
a.data_cond.wait(
lk, [&a] {return !a.data_queue.empty();}); // 5
std::cout << "process wait end" << std::endl;
data_chunk data = a.data_queue.front();
a.data_queue.pop();
lk.unlock(); // 6
process(data);
if (a.is_last_chunk())
break;
}
}
int main()
{
A a;
std::thread t1{ data_preparation_thread,std::ref(a) },
t2{ data_processing_thread,std::ref(a) };
t1.join();
t2.join();
system("pause");
return 0;
}
当等待线程重新获取互斥量并检查条件时,如果它并非直接响应另一个线程的通知,这就是所谓的“伪唤醒”(spurious wakeup)。因为任何伪唤醒的数量和频率都是不确定的,这里不建议使用一个有副作用的函数做条件检查。当你这样做了,就必须做好多次产生副作用的心理准备。#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>//头文件
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut; // 1 互斥量必须是可变的
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
threadsafe_queue(threadsafe_queue const& other)
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue = other.data_queue;
}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty();});
value = data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
threadsafe_queue<data_chunk> data_queue; // 1
void data_preparation_thread()
{
while (more_data_to_prepare())
{
data_chunk const data = prepare_data();
data_queue.push(data); // 2
}
}
void data_processing_thread()
{
while (true)
{
data_chunk data;
data_queue.wait_and_pop(data); // 3
process(data);
if (is_last_chunk(data))
break;
}
}#include <future>
#include <iostream>
int find_the_answer_to_ltuae()
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return 10;
}
void do_other_stuff()
{
std::this_thread::sleep_for(std::chrono::milliseconds(120));
}
int main()
{
std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout << "The answer is " << the_answer.get() << std::endl;
system("pause");
return 0;
}std::async 允许你通过添加额外的调用参数,向函数传递额外的参数。当第一个参数是一个指向成员函数的指针,第二个参数提供有这个函数成员类的具体对象(不是直接的,就是通过指针,还可以包装在 std::ref 中),剩余的参数可作为成员函数的参数传入。否则,第二个和随后的参数将作为函数的参数,或作为指定可调用对象的第一个参数。#include <string>
#include <future>
#include <iostream>
struct X
{
int m;
void foo(int i, std::string const& s)
{
std::cout << s << "\t" << i << std::endl;
}
std::string bar(std::string const &s)
{
return "bar("+s+")";
}
};
struct Y
{
double operator()(double d)
{
return d + 1.1;
}
};
X baz(X& _x)
{
++_x.m;
return _x;
}
class move_only
{
public:
move_only() = default;
move_only(move_only&&) = default;
move_only(move_only const&) = delete;
move_only& operator=(move_only&&) = default;
move_only& operator=(move_only const&) = delete;
void operator()()
{
std::cout << "move_only()" << std::endl;
}
};
void fun()
{
X x;
auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用p->foo(42, "hello"),p是指向x的指针,指针
auto f2 = std::async(&X::bar, x, "goodbye"); // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本,具体对象
std::cout << f2.get() << std::endl;
Y y;
auto f3 = std::async(Y(), 3.141); // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到
std::cout << f3.get() << std::endl;
auto f4 = std::async(std::ref(y), 2.718); // 调用y(2.718)
std::cout << f4.get() << std::endl;
x.m = 1;
auto f5 = std::async(baz, std::ref(x)); // 调用baz(x)
std::cout << f5.get().m << std::endl;
auto f6 = std::async(move_only()); // 调用tmp(),tmp是通过std::move(move_only())构造得到
}
int main()
{
fun();
system("pause");
return 0;
}在默认情况下,这取决于 std::async 是否启动一个线程,或是否在期望等待时同步任务。在大多数情况下(估计这就是你想要的结果),但是你也可以在函数调用之前,向 std::async 传递一个额外参数。这个参数的类型是 std::launch ,还可以是std::launch::defered ,用来表明函数调用被延迟到wait()或get()函数调用时才执行, std::launch::async 表明函数必须在其所在的独立线程上执行, std::launch::deferred
| std::launch::async 表明实现可以选择这两种方式的一种。最后一个选项是默认的。当函数调用被延迟,它可能不会在运行了。X baz(X& _x,int i)
{
_x.m=i;
std::cout << _x.m<<"调用" << std::endl;
return _x;
}调用:auto f7 = std::async(std::launch::async, Y(), 1.2); // 在新线程上执行 std::cout <<"f7\t"<< f7.get() << std::endl; auto f8 = std::async(std::launch::deferred, baz, std::ref(x),2); // 在wait()或get()调用时执行 auto f9 = std::async( std::launch::deferred | std::launch::async, baz, std::ref(x),3); // 实现选择执行方式 std::cout << "f9\t"<<f9.get().m << std::endl; auto f10 = std::async(baz, std::ref(x),4); f8.wait(); // 调用延迟函数,后台运行,此时如果有结果就前行,否则阻塞 std::cout << "f8\t" << f8.get().m << std::endl; std::cout <<"f10\t"<< f10.get().m << std::endl;
Cpp Concurrency In Action(读书笔记3)——同步并发操作
原文:http://blog.csdn.net/bestzem/article/details/52980004