I. The std::async function template
(I) Differences between std::async and std::thread
1. The most obvious difference is that std::async, under its default launch policy, does not necessarily create a new thread. If system resources are tight, std::thread may fail to create a thread, the system reports an exception, and the whole program may crash. std::async generally does not: when it cannot create a new thread, it hands the asynchronous task to the thread that later calls future.get(), where it runs synchronously (i.e. without creating a new thread).
2. std::async is a higher-level abstraction: it frees the user from the details of thread management and hands those responsibilities to the implementers of the C++ standard library. With std::thread you must deal with thread exhaustion, oversubscription, load balancing, and porting to new platforms yourself.
3. std::thread offers no direct way to obtain the return value of the thread function, whereas std::async does, through the future object it returns (a minimal sketch follows this list).
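As an illustration of point 3, here is a minimal sketch (the function name compute and the values are illustrative, not taken from the original article): with std::thread the result has to be routed out manually, for example through a std::promise, while std::async delivers it through the future it returns.

#include <future>
#include <iostream>
#include <thread>

int compute() { return 42; }

int main()
{
    // std::thread: no direct way to read the return value; route it through a promise/future pair.
    std::promise<int> p;
    std::future<int> f1 = p.get_future();
    std::thread t([&p] { p.set_value(compute()); });
    std::cout << f1.get() << std::endl;   // 42
    t.join();

    // std::async: the return value comes back through the future it returns.
    std::future<int> f2 = std::async(compute);
    std::cout << f2.get() << std::endl;   // 42

    return 0;
}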
(II) The std::async function template and its analysis
1. The "shared state" object stores the thread function (generally a callable object) together with its arguments, its return value, the state of the new thread, and related information. The object lives on the heap; it is provided by std::async, std::promise, or std::packaged_task, and its lifetime is managed by a future or shared_future. The callee (usually the side that calls promise.set_value()) writes the computed result into the shared state, and the caller reads that result through std::future's get().
【"Shared state"】Excerpt of the source code of the related classes (from the MSVC implementation of <future>)
// CLASS TEMPLATE _Associated_state
template <class _Ty>
class _Associated_state { // class for managing associated synchronous state
public:
    using _State_type = _Ty;
    using _Mydel      = _Deleter_base<_Ty>;

    _Associated_state(_Mydel* _Dp = nullptr)
        : _Refs(1), // non-atomic initialization
          _Exception(), _Retrieved(false), _Ready(false), _Ready_at_thread_exit(false),
          _Has_stored_result(false), _Running(false), _Deleter(_Dp) { // construct
        // TODO: _Associated_state ctor assumes _Ty is default constructible
    }

    virtual ~_Associated_state() noexcept { // destroy
        if (_Has_stored_result && !_Ready) { // registered for release at thread exit
            _Cond._Unregister(_Mtx);
        }
    }

    void _Retain() { // increment reference count
        _MT_INCR(_Refs);
    }

    void _Release() { // decrement reference count and destroy when zero
        if (_MT_DECR(_Refs) == 0) {
            _Delete_this();
        }
    }

private:
    _Atomic_counter_t _Refs;

public:
    virtual void _Wait() { // wait for signal
        unique_lock<mutex> _Lock(_Mtx);
        _Maybe_run_deferred_function(_Lock);
        while (!_Ready) {
            _Cond.wait(_Lock);
        }
    }

    struct _Test_ready { // wraps _Associated_state
        _Test_ready(const _Associated_state* _St) : _State(_St) { // construct from associated state
        }

        bool operator()() const { // test state
            return _State->_Ready != 0;
        }

        const _Associated_state* _State;
    };

    template <class _Rep, class _Per>
    future_status _Wait_for(const chrono::duration<_Rep, _Per>& _Rel_time) { // wait for duration
        unique_lock<mutex> _Lock(_Mtx);
        if (_Has_deferred_function()) {
            return future_status::deferred;
        }

        if (_Cond.wait_for(_Lock, _Rel_time, _Test_ready(this))) {
            return future_status::ready;
        }

        return future_status::timeout;
    }

    template <class _Clock, class _Dur>
    future_status _Wait_until(const chrono::time_point<_Clock, _Dur>& _Abs_time) { // wait until time point
        unique_lock<mutex> _Lock(_Mtx);
        if (_Has_deferred_function()) {
            return future_status::deferred;
        }

        if (_Cond.wait_until(_Lock, _Abs_time, _Test_ready(this))) {
            return future_status::ready;
        }

        return future_status::timeout;
    }

    virtual _Ty& _Get_value(bool _Get_only_once) { // return the stored result or throw stored exception
        unique_lock<mutex> _Lock(_Mtx);
        if (_Get_only_once && _Retrieved) {
            _Throw_future_error(make_error_code(future_errc::future_already_retrieved));
        }

        if (_Exception) {
            _Rethrow_future_exception(_Exception);
        }

        _Retrieved = true;
        _Maybe_run_deferred_function(_Lock);
        while (!_Ready) {
            _Cond.wait(_Lock);
        }

        if (_Exception) {
            _Rethrow_future_exception(_Exception);
        }

        return _Result;
    }

    void _Set_value(const _Ty& _Val, bool _At_thread_exit) { // store a result
        unique_lock<mutex> _Lock(_Mtx);
        _Set_value_raw(_Val, &_Lock, _At_thread_exit);
    }

    void _Set_value_raw(const _Ty& _Val, unique_lock<mutex>* _Lock, bool _At_thread_exit) {
        // store a result while inside a locked block
        if (_Has_stored_result) {
            _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
        }

        _Result = _Val;
        _Do_notify(_Lock, _At_thread_exit);
    }

    void _Set_value(_Ty&& _Val, bool _At_thread_exit) { // store a result
        unique_lock<mutex> _Lock(_Mtx);
        _Set_value_raw(_STD forward<_Ty>(_Val), &_Lock, _At_thread_exit);
    }

    void _Set_value_raw(_Ty&& _Val, unique_lock<mutex>* _Lock, bool _At_thread_exit) {
        // store a result while inside a locked block
        if (_Has_stored_result) {
            _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
        }

        _Result = _STD forward<_Ty>(_Val);
        _Do_notify(_Lock, _At_thread_exit);
    }

    void _Set_value(bool _At_thread_exit) { // store a (void) result
        unique_lock<mutex> _Lock(_Mtx);
        _Set_value_raw(&_Lock, _At_thread_exit);
    }

    void _Set_value_raw(unique_lock<mutex>* _Lock, bool _At_thread_exit) {
        // store a (void) result while inside a locked block
        if (_Has_stored_result) {
            _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
        }

        _Do_notify(_Lock, _At_thread_exit);
    }

    void _Set_exception(exception_ptr _Exc, bool _At_thread_exit) { // store a result
        unique_lock<mutex> _Lock(_Mtx);
        _Set_exception_raw(_Exc, &_Lock, _At_thread_exit);
    }

    void _Set_exception_raw(exception_ptr _Exc, unique_lock<mutex>* _Lock, bool _At_thread_exit) {
        // store a result while inside a locked block
        if (_Has_stored_result) {
            _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
        }

        _Exception = _Exc;
        _Do_notify(_Lock, _At_thread_exit);
    }

    bool _Is_ready() const { // return ready status
        return _Ready != 0;
    }

    bool _Is_ready_at_thread_exit() const { // return ready at thread exit status
        return _Ready_at_thread_exit;
    }

    bool _Already_has_stored_result() const { // return presence of stored result
        return _Has_stored_result;
    }

    bool _Already_retrieved() const { // return retrieved status
        return _Retrieved;
    }

    void _Abandon() { // abandon shared state
        unique_lock<mutex> _Lock(_Mtx);
        if (!_Has_stored_result) { // queue exception
            future_error _Fut(make_error_code(future_errc::broken_promise));
            _Set_exception_raw(_STD make_exception_ptr(_Fut), &_Lock, false);
        }
    }

protected:
    void _Make_ready_at_thread_exit() { // set ready status at thread exit
        if (_Ready_at_thread_exit) {
            _Ready = true;
        }
    }

    void _Maybe_run_deferred_function(unique_lock<mutex>& _Lock) { // run a deferred function if not already done
        if (!_Running) { // run the function
            _Running = true;
            _Run_deferred_function(_Lock);
        }
    }

public:
    _Ty _Result;
    exception_ptr _Exception;
    mutex _Mtx;
    condition_variable _Cond;
    bool _Retrieved;
    int _Ready;
    bool _Ready_at_thread_exit;
    bool _Has_stored_result;
    bool _Running;

private:
    virtual bool _Has_deferred_function() const noexcept { // overridden by _Deferred_async_state
        return false;
    }

    virtual void _Run_deferred_function(unique_lock<mutex>&) { // do nothing
    }

    virtual void _Do_notify(unique_lock<mutex>* _Lock, bool _At_thread_exit) { // notify waiting threads
        _Has_stored_result = true;
        if (_At_thread_exit) { // notify at thread exit
            _Cond._Register(*_Lock, &_Ready);
        } else { // notify immediately
            _Ready = true;
            _Cond.notify_all();
        }
    }

    void _Delete_this() { // delete this object
        if (_Deleter) {
            _Deleter->_Delete(this);
        } else {
            delete this;
        }
    }

    _Mydel* _Deleter;

public:
    _Associated_state(const _Associated_state&) = delete;
    _Associated_state& operator=(const _Associated_state&) = delete;
};

// CLASS TEMPLATE _Packaged_state
template <class>
class _Packaged_state;

template <class _Ret, class... _ArgTypes>
class _Packaged_state<_Ret(_ArgTypes...)> : public _Associated_state<_Ret> {
    // class for managing associated asynchronous state for packaged_task
public:
    using _Mybase = _Associated_state<_Ret>;
    using _Mydel  = typename _Mybase::_Mydel;

    template <class _Fty2>
    _Packaged_state(const _Fty2& _Fnarg) : _Fn(_Fnarg) { // construct from function object
    }

#if _HAS_FUNCTION_ALLOCATOR_SUPPORT
    template <class _Fty2, class _Alloc>
    _Packaged_state(const _Fty2& _Fnarg, const _Alloc& _Al, _Mydel* _Dp)
        : _Mybase(_Dp), _Fn(allocator_arg, _Al, _Fnarg) { // construct from function object and allocator
    }
#endif // _HAS_FUNCTION_ALLOCATOR_SUPPORT

    template <class _Fty2>
    _Packaged_state(_Fty2&& _Fnarg) : _Fn(_STD forward<_Fty2>(_Fnarg)) { // construct from rvalue function object
    }

#if _HAS_FUNCTION_ALLOCATOR_SUPPORT
    template <class _Fty2, class _Alloc>
    _Packaged_state(_Fty2&& _Fnarg, const _Alloc& _Al, _Mydel* _Dp)
        : _Mybase(_Dp), _Fn(allocator_arg, _Al, _STD forward<_Fty2>(_Fnarg)) {
        // construct from rvalue function object and allocator
    }
#endif // _HAS_FUNCTION_ALLOCATOR_SUPPORT

    void _Call_deferred(_ArgTypes... _Args) { // set deferred call
        _TRY_BEGIN
        // call function object and catch exceptions
        this->_Set_value(_Fn(_STD forward<_ArgTypes>(_Args)...), true);
        _CATCH_ALL
        // function object threw exception; record result
        this->_Set_exception(_STD current_exception(), true);
        _CATCH_END
    }

    void _Call_immediate(_ArgTypes... _Args) { // call function object
        _TRY_BEGIN
        // call function object and catch exceptions
        this->_Set_value(_Fn(_STD forward<_ArgTypes>(_Args)...), false);
        _CATCH_ALL
        // function object threw exception; record result
        this->_Set_exception(_STD current_exception(), false);
        _CATCH_END
    }

    const function<_Ret(_ArgTypes...)>& _Get_fn() { // return stored function object
        return _Fn;
    }

private:
    function<_Ret(_ArgTypes...)> _Fn;
};

// CLASS TEMPLATE _Deferred_async_state
template <class _Rx>
class _Deferred_async_state : public _Packaged_state<_Rx()> {
    // class for managing associated synchronous state for deferred execution from async
public:
    template <class _Fty2>
    _Deferred_async_state(const _Fty2& _Fnarg) : _Packaged_state<_Rx()>(_Fnarg) { // construct from function object
    }

    template <class _Fty2>
    _Deferred_async_state(_Fty2&& _Fnarg) : _Packaged_state<_Rx()>(_STD forward<_Fty2>(_Fnarg)) {
        // construct from rvalue function object
    }

private:
    virtual bool _Has_deferred_function() const noexcept {
        // this function is considered to be deferred until it's invoked
        return !this->_Running;
    }

    virtual void _Run_deferred_function(unique_lock<mutex>& _Lock) { // run the deferred function
        _Lock.unlock();
        _Packaged_state<_Rx()>::_Call_immediate();
        _Lock.lock();
    }
};

// CLASS TEMPLATE _Task_async_state
template <class _Rx>
class _Task_async_state : public _Packaged_state<_Rx()> {
    // class for managing associated synchronous state for asynchronous execution from async
public:
    using _Mybase     = _Packaged_state<_Rx()>;
    using _State_type = typename _Mybase::_State_type;

    template <class _Fty2>
    _Task_async_state(_Fty2&& _Fnarg) : _Mybase(_STD forward<_Fty2>(_Fnarg)) { // construct from rvalue function object
        _Task = ::Concurrency::create_task([this]() { // do it now
            this->_Call_immediate();
        });

        this->_Running = true;
    }

    virtual ~_Task_async_state() noexcept { // destroy
        _Wait();
    }

    virtual void _Wait() { // wait for completion
        _Task.wait();
    }

    virtual _State_type& _Get_value(bool _Get_only_once) { // return the stored result or throw stored exception
        _Task.wait();
        return _Mybase::_Get_value(_Get_only_once);
    }

private:
    ::Concurrency::task<void> _Task;
};
2. A call to std::async creates a "_Deferred_async_state" or "_Task_async_state" shared-state object; both are subclasses of _Packaged_state. Note that constructing a std::promise directly yields an "_Associated_state" shared-state object, while std::packaged_task creates a "_Packaged_state" object. A short sketch contrasting the three ways a shared state comes into being follows this list.
3. _Get_associated_state is a factory function: based on the launch policy it creates the appropriate kind of shared-state object and hands it to a future, which manages its lifetime. A future resembles std::unique_ptr in that it holds exclusive ownership of the shared-state object.
4. As with std::thread, the callable object passed to std::async and its arguments are copied by value into a tuple, and the stored copies are then passed as rvalues to the corresponding parameters of the thread function.
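Before the full experiment, here is a minimal sketch of the three ways a shared state is created, as discussed in points 1 and 2 above (the comments name the MSVC-internal classes from the excerpt; the function square and the values are illustrative):

#include <future>
#include <iostream>

int square(int x) { return x * x; }

int main()
{
    // 1) std::promise: the shared state is an _Associated_state<int>;
    //    the callee stores the result with set_value().
    std::promise<int> pr;
    std::future<int> f1 = pr.get_future();
    pr.set_value(square(3));

    // 2) std::packaged_task: the shared state is a _Packaged_state<int(int)>;
    //    invoking the task runs the stored callable and stores its result.
    std::packaged_task<int(int)> task(square);
    std::future<int> f2 = task.get_future();
    task(4);

    // 3) std::async: the shared state is a _Task_async_state (std::launch::async)
    //    or a _Deferred_async_state (std::launch::deferred), both derived from _Packaged_state.
    std::future<int> f3 = std::async(std::launch::async, square, 5);

    std::cout << f1.get() << " " << f2.get() << " " << f3.get() << std::endl;  // 9 16 25
    return 0;
}

Each future above is move-only and owns its shared state exclusively, which matches the std::unique_ptr analogy in point 3.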
【Programming experiment】Creating asynchronous tasks
#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <vector>
#include <numeric>   // for std::accumulate

using namespace std;

std::mutex mtx;

class Widget
{
public:
    void foo(int x, const std::string& s)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()
             << " void Foo::foo(int, const std::string&): x = " << x << ", s = " << s << endl;
    }

    void bar(const std::string& s)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()
             << " void Widget::bar(const std::string&): s = " << s << endl;
    }

    void operator()(double val)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()
             << " void Widget::operator(): val = " << val << endl;
    }
};

class NonCopyable   // move-only type
{
public:
    NonCopyable() {};
    NonCopyable(const NonCopyable&) = delete;
    NonCopyable& operator=(const NonCopyable&) = delete;
    NonCopyable(NonCopyable&&) = default;
    NonCopyable& operator=(NonCopyable&&) = default;

    double operator()(double d)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()
             << " void NonCopyable::operator(): d = " << d << endl;
        return d;
    }
};

// parallel computation
template<typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
    auto len = end - beg;
    if (len < 1000)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id() << " invoke parallel_sum()" << endl;
        return std::accumulate(beg, end, 0);   // accumulate every element in [beg, end); the initial value is 0
    }

    RandomIt mid = beg + len / 2;
    auto handle = std::async(std::launch::async,               // a child thread accumulates the elements in [mid, end)
                             parallel_sum<RandomIt>, mid, end);
    int sum = parallel_sum(beg, mid);                          // this thread accumulates the elements in [beg, mid)

    return sum + handle.get();                                 // return the sum of the two partial results
}

int main()
{
    Widget w;
    cout << "main thread id = " << std::this_thread::get_id() << endl;

    // 1. argument passing
    auto fut1 = std::async(&Widget::foo, &w, 42, "hello");  // pass the this pointer: &w
    auto fut2 = std::async(&Widget::bar, w, "goodbye");     // pass a copy of w (call it tmp); tmp.bar(...) is invoked
    auto fut3 = std::async(Widget(), 3.14159);              // pass a temporary Widget; its operator() is invoked
    auto fut4 = std::async(std::ref(w), 2.718);             // pass a reference to w; its operator() is invoked

    NonCopyable mo;                                         // move-only object
    auto fut5 = std::async(std::move(mo), 3.14159);         // mo is move-only, so it must be cast to an rvalue

    // 2. synchronous vs. asynchronous
    auto fut6 = std::async(std::launch::async, Widget(), 1.2);                    // operator() runs on a new thread
    auto fut7 = std::async(std::launch::deferred, &Widget::bar, &w, "deferred");  // deferred until get or wait is called
    auto fut8 = std::async(std::launch::async | std::launch::deferred,            // equivalent to the default launch policy
                           &Widget::bar, &w, "async | deferred");

    fut7.get();   // the main thread blocks here; the deferred task for fut7 runs only now (on the calling thread)

    // 3. parallel computation
    std::vector<int> vec(10000, 1);   // 10000 ones
    int res = parallel_sum(vec.begin(), vec.end());

    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "The sum is: " << res << endl;
        cout << "main thread end." << endl;
    }

    return 0;
}

/* Output:
main thread id = 16756
thread id = 1928 void Foo::foo(int, const std::string&): x = 42, s = hello
thread id = 16756 void Widget::bar(const std::string&): s = deferred        // note: executed by the main thread
thread id = 13216 void Widget::bar(const std::string&): s = goodbye
thread id = 7940 void Widget::operator(): val = 3.14159
thread id = 16080 void Widget::operator(): val = 2.718
thread id = 11492 void NonCopyable::operator(): d = 3.14159
thread id = 1928 void Widget::operator(): val = 1.2
thread id = 13216 void Widget::bar(const std::string&): s = async | deferred
thread id = 16756 invoke parallel_sum()
thread id = 7940 invoke parallel_sum()
thread id = 16080 invoke parallel_sum()
thread id = 11492 invoke parallel_sum()
thread id = 1928 invoke parallel_sum()
thread id = 13216 invoke parallel_sum()
thread id = 1928 invoke parallel_sum()
thread id = 7636 invoke parallel_sum()
thread id = 5816 invoke parallel_sum()
thread id = 15856 invoke parallel_sum()
thread id = 15832 invoke parallel_sum()
thread id = 7636 invoke parallel_sum()
thread id = 15400 invoke parallel_sum()
thread id = 16968 invoke parallel_sum()
thread id = 15856 invoke parallel_sum()
thread id = 15476 invoke parallel_sum()
The sum is: 10000
main thread end.
*/
II. Launch policies of std::async
(I) The launch policies of std::async
1. The three launch policies (the policy specified to std::async determines which kind of "shared state" object is created; a short sketch contrasting the three follows this list)
(1) Asynchronous (std::launch::async): creates a "_Task_async_state" shared-state object. This policy means the thread function must run asynchronously, i.e. on another thread.
(2) Synchronous/deferred (std::launch::deferred): creates a "_Deferred_async_state" shared-state object. This policy means the thread function is deferred until get() or wait() is called on the future, and it then runs synchronously on the same thread as that caller; that is, the caller blocks until the thread function finishes. If get/wait is never called, the thread function never runs.
(3) Default policy (std::launch::async | std::launch::deferred): the OR of the two, which means the task may run either asynchronously or synchronously. In other words, whether a new thread is created for the task depends on how tight system resources are, and the standard library's thread-management components take on the responsibility for creating and destroying threads, avoiding oversubscription, and load balancing.
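A minimal sketch contrasting the three policies (the function name task is illustrative; thread ids and output interleaving vary from run to run):

#include <future>
#include <iostream>
#include <thread>

int task()
{
    std::cout << "task on thread " << std::this_thread::get_id() << std::endl;
    return 0;
}

int main()
{
    std::cout << "main thread " << std::this_thread::get_id() << std::endl;

    auto f1 = std::async(std::launch::async, task);      // (1) starts eagerly on another thread
    auto f2 = std::async(std::launch::deferred, task);   // (2) nothing runs yet
    auto f3 = std::async(task);                           // (3) default policy: either of the above

    f1.get();
    f2.get();   // the deferred task() runs here, synchronously on the main thread
    f3.get();   // may already have run on another thread, or may run here if it was deferred
    return 0;
}

Only f2 is guaranteed to execute on the calling thread; for f3 the choice is left to the implementation.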
(II) The default launch policy
1. Problems it causes
(1) You cannot predict whether the task runs asynchronously or synchronously, because the thread function may be scheduled for deferred execution.
(2) You cannot predict whether the thread function runs on the same thread as the one that calls get/wait on the future. If the thread function reads thread-local storage (TLS), there is no way to know which thread's local storage it will see (see the sketch after this list).
(3) Sometimes you cannot even predict whether the thread function will run at all, because there is no guarantee that get or wait will be called on the future along every path through the program.
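A minimal sketch of problem (2) (the variable tls_counter and the values are illustrative): which copy of a thread_local variable the task sees depends on where the default policy ends up running it.

#include <future>
#include <iostream>

thread_local int tls_counter = 0;   // every thread gets its own copy

int bump() { return ++tls_counter; }

int main()
{
    tls_counter = 100;            // the main thread's copy
    auto fut = std::async(bump);  // default launch policy

    // If the task was deferred, bump() runs here on the main thread, sees
    // tls_counter == 100 and returns 101. If it ran asynchronously, it sees
    // a fresh copy (0) on some other thread and returns 1.
    std::cout << fut.get() << std::endl;
    return 0;
}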
2. Caveats:
(1) The default launch policy works correctly only when all of the following conditions hold.
① The task does not need to run concurrently with the thread that calls get/wait.
② It does not matter which thread's thread_local variables are read or written.
③ Either it can be guaranteed that get or wait will be called on the future returned by std::async, or it is acceptable that the task may never execute.
④ Code that uses wait_for or wait_until already takes into account the possibility that the task has been deferred.
(2) If even one of these conditions is not met, the launch policy must be specified manually to guarantee that the task runs asynchronously or synchronously.
【Programming experiment】Working around the problems of the default launch policy
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

using namespace std;
using namespace literals;   // for duration suffixes such as 1s

void func() { std::this_thread::sleep_for(1s); }

// reallyAsync function template: guarantees that the task is launched asynchronously
template<typename Func, typename... Args>
inline auto reallyAsync(Func&& f, Args&&... args)
{
    return std::async(std::launch::async,
                      std::forward<Func>(f),
                      std::forward<Args>(args)...);
}

int main()
{
    // Code that uses wait_for must consider whether the task runs synchronously or asynchronously.
    auto fut1 = std::async(func);   // default launch policy: no way to predict synchronous vs. asynchronous execution

    // Solution 1: probe with wait_for(0s)
    if (fut1.wait_for(0s) == std::future_status::deferred)
    {   // running synchronously (deferred)
        fut1.get();   // wait for the result
    }
    else
    {   // running asynchronously
        while (fut1.wait_for(100ms) != std::future_status::ready)   // poll until the child thread finishes
        {
            // ...   // do other work concurrently
        }
        // ...   // fut1 is ready
    }

    // Solution 2: ensure the task really runs asynchronously
    auto fut2 = reallyAsync(func);
    while (fut2.wait_for(100ms) != std::future_status::ready)   // async launch guarantees that wait_for can return ready,
    {                                                           // eliminating the future_status::deferred possibility
    }

    return 0;
}
Original article: https://www.cnblogs.com/5iedu/p/11727695.html