使用其他并发模型的大多数程序都采用线性方式编写,而且依赖于语言运行时系统或操作系统的底层线程或进程管理来适当地改变上下文。基于asyncio的应用要求应用代码显式地处理上下文切换,要正确地使用相关技术,这取决于是否能正确理解一些相关联的概念。asyncio提供的框架以一个事件循环(event loop)为中心,这是一个首类对象,负责高效地处理I/O事件、系统事件和应用上下文切换。目前已经提供了多个循环实现来高效地利用操作系统的功能。尽管通常会自动地选择一个合理的默认实现,但也完全可以在应用中选择某个特定的事件循环实现。在很多情况下这会很有用,例如,在Windows下,一些循环类增加了对外部进程的支持,这可能会以牺牲一些网络I/O效率为代价。与事件循环交互的应用要显式地注册将运行的代码,让事件循环在资源可用时向应用代码发出必要的调用。例如,一个网络服务器打开套接字,然后注册为当这些套接字上出现输入事件时服务器要得到通知。事件循环在建立一个新的进入连接或者在数据可读取时都会提醒服务器代码。当前上下文中没有更多工作可做时,应用代码要再次短时间地交出控制。例如,如果一个套接字再没有更多的数据可以读取,那么服务器会把控制交回给事件循环。
asyncio事件循环可以采用多种不同的方法启动一个协程。最简单的方法是使用run_until_complete(),并把协程直接传入这个方法。
# Start a coroutine by handing it directly to run_until_complete().
import asyncio


async def coroutine():
    """Print a message to show that the coroutine body ran."""
    print('in coroutine')


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    print('starting coroutine')
    # Calling the coroutine function only builds the coroutine object;
    # nothing runs until the event loop drives it.
    coro = coroutine()
    print('entering event loop')
    event_loop.run_until_complete(coro)
finally:
    print('closing event loop')
    event_loop.close()
# Retrieve the return value of a coroutine from run_until_complete().
import asyncio


async def coroutine():
    """Print a message and return the string 'result'."""
    print('in coroutine')
    return 'result'


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    # run_until_complete() hands back whatever the coroutine returns.
    return_value = event_loop.run_until_complete(
        coroutine()
    )
    print('it returned: {!r}'.format(return_value))
finally:
    event_loop.close()
# Chain coroutines: one coroutine awaits the results of two others.
import asyncio


async def outer():
    """Run phase1 and phase2 in sequence and return both results."""
    print('in outer')
    print('waiting for result1')
    result1 = await phase1()
    print('waiting for result2')
    result2 = await phase2(result1)
    return (result1, result2)


async def phase1():
    """Return the fixed string 'result1'."""
    print('in phase1')
    return 'result1'


async def phase2(arg):
    """Return a string derived from *arg*."""
    print('in phase2')
    return 'result2 derived from {}'.format(arg)


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    return_value = event_loop.run_until_complete(outer())
    print('return value: {!r}'.format(return_value))
finally:
    event_loop.close()
Python3.5引入了一些新的语言特性,可以使用async def以原生方式定义这些协程,以及使用await交出控制,asyncio的例子利用了这些新特性。Python3的早期版本可以使用由 asyncio.coroutine()修饰符包装的生成器函数和yield from来达到同样的效果。
# Legacy (pre-3.5 style) generator-based coroutines.
#
# NOTE: asyncio.coroutine() was deprecated in Python 3.8 and removed in
# Python 3.11, so this example only runs on older interpreters. It is
# kept to illustrate the historical syntax; new code should use
# ``async def`` and ``await`` instead.
import asyncio


@asyncio.coroutine
def outer():
    """Run phase1 and phase2 in sequence and return both results."""
    print('in outer')
    print('waiting for result1')
    result1 = yield from phase1()
    print('waiting for result2')
    result2 = yield from phase2(result1)
    return (result1, result2)


@asyncio.coroutine
def phase1():
    """Return the fixed string 'result1'."""
    print('in phase1')
    return 'result1'


@asyncio.coroutine
def phase2(arg):
    """Return a string derived from *arg*."""
    print('in phase2')
    return 'result2 derived from {}'.format(arg)


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    return_value = event_loop.run_until_complete(outer())
    print('return value: {!r}'.format(return_value))
finally:
    event_loop.close()
# Schedule ordinary functions on the loop with call_soon().
import asyncio
import functools


def callback(arg, *, kwarg='default'):
    """Print the positional and keyword values the callback received."""
    print('callback invoked with {} and {}'.format(arg, kwarg))


async def main(loop):
    """Register two callbacks on *loop*, then pause so they can run."""
    print('registering callbacks')
    loop.call_soon(callback, 1)
    # call_soon() only forwards positional arguments, so the keyword
    # argument is bound ahead of time with functools.partial().
    wrapped = functools.partial(callback, kwarg='not default')
    loop.call_soon(wrapped, 2)

    await asyncio.sleep(0.1)


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    print('entering event loop')
    event_loop.run_until_complete(main(event_loop))
finally:
    print('closing event loop')
    event_loop.close()
# Schedule callbacks after a relative delay with call_later().
import asyncio


def callback(n):
    """Print which callback number was invoked."""
    print('callback {} invoked'.format(n))


async def main(loop):
    """Register delayed and immediate callbacks, then wait for them."""
    print('registering callbacks')
    # Registered out of order on purpose: the delay, not the
    # registration order, decides when each callback fires.
    loop.call_later(0.2, callback, 1)
    loop.call_later(0.1, callback, 2)
    loop.call_soon(callback, 3)

    await asyncio.sleep(0.4)


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    print('entering event loop')
    event_loop.run_until_complete(main(event_loop))
finally:
    print('closing event loop')
    event_loop.close()
# Schedule callbacks for an absolute loop time with call_at().
import asyncio
import time


def callback(n, loop):
    """Print the callback number together with the loop's current time."""
    print('callback {} invoked at {}'.format(n, loop.time()))


async def main(loop):
    """Register callbacks relative to the loop clock and wait for them."""
    # call_at() takes times on the loop's own clock, so the reference
    # point must come from loop.time() rather than time.time().
    now = loop.time()
    print('clock time: {}'.format(time.time()))
    print('loop time: {}'.format(now))

    print('registering callbacks')
    loop.call_at(now + 0.2, callback, 1, loop)
    loop.call_at(now + 0.1, callback, 2, loop)
    loop.call_soon(callback, 3, loop)

    await asyncio.sleep(1)


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    print('entering event loop')
    event_loop.run_until_complete(main(event_loop))
finally:
    print('closing event loop')
    event_loop.close()
# Wait for a Future by passing it to run_until_complete().
import asyncio


def mark_done(future, result):
    """Store *result* on *future*, marking it as done."""
    print('setting future result to {!r}'.format(result))
    future.set_result(result)


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    # create_future() binds the future to this specific loop, which is
    # required because no loop is running yet at this point.
    all_done = event_loop.create_future()

    print('scheduling mark_done')
    event_loop.call_soon(mark_done, all_done, 'the result')

    print('entering event loop')
    result = event_loop.run_until_complete(all_done)
    print('returned result: {!r}'.format(result))
finally:
    print('closing event loop')
    event_loop.close()

# The result stays available on the future after the loop is closed.
print('future result: {!r}'.format(all_done.result()))
# Wait for a Future from inside a coroutine using await.
import asyncio


def mark_done(future, result):
    """Store *result* on *future*, marking it as done."""
    print('setting future result to {!r}'.format(result))
    future.set_result(result)


async def main(loop):
    """Create a future, schedule its completion and await the result."""
    all_done = loop.create_future()

    print('scheduling mark_done')
    loop.call_soon(mark_done, all_done, 'the result')

    # Awaiting the future suspends main() until mark_done() has run.
    result = await all_done
    print('returned result: {!r}'.format(result))


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
# Invoke callbacks when a Future is completed.
import asyncio
import functools


def callback(future, n):
    """Print the callback number and the completed future's result."""
    print('{}: future done: {}'.format(n, future.result()))


async def register_callbacks(all_done):
    """Attach two done-callbacks to *all_done*."""
    print('registering callbacks on future')
    # add_done_callback() passes only the future itself, so the extra
    # argument is bound with functools.partial().
    all_done.add_done_callback(functools.partial(callback, n=1))
    all_done.add_done_callback(functools.partial(callback, n=2))


async def main(all_done):
    """Register the callbacks and then complete the future."""
    await register_callbacks(all_done)
    print('setting result of future')
    all_done.set_result('the result')


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    # create_future() binds the future to this specific loop, which is
    # required because no loop is running yet at this point.
    all_done = event_loop.create_future()
    event_loop.run_until_complete(main(all_done))
finally:
    event_loop.close()
# Wrap a coroutine in a Task with create_task() and await its result.
import asyncio


async def task_func():
    """Print a message and return the string 'the result'."""
    print('in task_func')
    return 'the result'


async def main(loop):
    """Create a task for task_func() and wait for it to finish."""
    print('creating task')
    task = loop.create_task(task_func())
    print('waiting for {!r}'.format(task))
    return_value = await task
    print('task completed {!r}'.format(task))
    print('return value: {!r}'.format(return_value))


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
# Cancel a Task before it has had a chance to start.
import asyncio


async def task_func():
    """Print a message and return the string 'the result'."""
    print('in task_func')
    return 'the result'


async def main(loop):
    """Create a task, cancel it immediately and observe the outcome."""
    print('creating task')
    task = loop.create_task(task_func())

    print('canceling task')
    task.cancel()

    print('canceled task {!r}'.format(task))
    try:
        # Awaiting a cancelled task raises CancelledError in the waiter.
        await task
    except asyncio.CancelledError:
        print('caught error from canceled task')
    else:
        print('task result: {!r}'.format(task.result()))


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
# Cancel a Task while it is suspended waiting on another operation.
import asyncio


async def task_func():
    """Sleep for a second, reporting and re-raising any cancellation."""
    print('in task_func, sleeping')
    try:
        await asyncio.sleep(1)
    except asyncio.CancelledError:
        print('task_func was canceled')
        # Re-raise so that whoever awaits the task sees the cancellation.
        raise
    return 'the result'


def task_canceller(t):
    """Cancel the task *t*."""
    print('in task_canceller')
    t.cancel()
    print('canceled the task')


async def main(loop):
    """Start task_func() and schedule a callback that cancels it."""
    print('creating task')
    task = loop.create_task(task_func())
    loop.call_soon(task_canceller, task)
    try:
        await task
    except asyncio.CancelledError:
        print('main() also sees task as canceled')


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
# Create a Task from a coroutine with asyncio.ensure_future().
import asyncio


async def wrapped():
    """Print a message and return the string 'result'."""
    print('wrapped')
    return 'result'


async def inner(task):
    """Await *task* and print the value it produced."""
    print('inner: starting')
    print('inner: waiting for {!r}'.format(task))
    result = await task
    print('inner: task returned {!r}'.format(result))


async def starter():
    """Wrap wrapped() in a task and pass that task on to inner()."""
    print('starter: creating task')
    # ensure_future() schedules the coroutine; it does not start running
    # until something gives control back to the event loop.
    task = asyncio.ensure_future(wrapped())
    print('starter: waiting for inner')
    await inner(task)
    print('starter: inner returned')


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    print('entering event loop')
    result = event_loop.run_until_complete(starter())
finally:
    event_loop.close()
# Wait for several background operations to finish with asyncio.wait().
import asyncio


async def phase(i):
    """Sleep for 0.1 * i seconds and return a result string for phase i."""
    print('in phase {}'.format(i))
    await asyncio.sleep(0.1 * i)
    print('done with phase {}'.format(i))
    return 'phase {} result'.format(i)


async def main(num_phases):
    """Run *num_phases* phases concurrently and print their results."""
    print('starting main')
    # asyncio.wait() no longer accepts bare coroutine objects (removed
    # in Python 3.11), so each coroutine is wrapped in a task first.
    phases = [
        asyncio.ensure_future(phase(i))
        for i in range(num_phases)
    ]
    print('waiting for phases to complete')
    completed, pending = await asyncio.wait(phases)
    # wait() returns sets, so the order of the results is not defined.
    results = [t.result() for t in completed]
    print('results: {!r}'.format(results))


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()
# Use asyncio.wait() with a timeout and cancel whatever is unfinished.
import asyncio


async def phase(i):
    """Sleep for 0.1 * i seconds, reporting completion or cancellation."""
    print('in phase {}'.format(i))
    try:
        await asyncio.sleep(0.1 * i)
    except asyncio.CancelledError:
        print('phase {} canceled'.format(i))
        raise
    else:
        print('done with phase {}'.format(i))
        return 'phase {} result'.format(i)


async def main(num_phases):
    """Run the phases for at most 0.1 seconds, then cancel the rest."""
    print('starting main')
    # asyncio.wait() no longer accepts bare coroutine objects (removed
    # in Python 3.11), so each coroutine is wrapped in a task first.
    phases = [
        asyncio.ensure_future(phase(i))
        for i in range(num_phases)
    ]
    print('waiting 0.1 for phases to complete')
    completed, pending = await asyncio.wait(phases, timeout=0.1)
    print('{} completed and {} pending'.format(
        len(completed), len(pending),
    ))
    # Cancel remaining tasks so they do not generate errors
    # as we exit without finishing them.
    if pending:
        print('canceling tasks')
        for t in pending:
            t.cancel()
    print('exiting main')


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()
# Collect the results of several coroutines with asyncio.gather().
import asyncio


async def phase1():
    """Sleep for two seconds and return 'phase1 result'."""
    print('in phase1')
    await asyncio.sleep(2)
    print('done with phase1')
    return 'phase1 result'


async def phase2():
    """Sleep for one second and return 'phase2 result'."""
    print('in phase2')
    await asyncio.sleep(1)
    print('done with phase2')
    return 'phase2 result'


async def main():
    """Run both phases concurrently and print the gathered results."""
    print('starting main')
    print('waiting for phases to complete')
    # gather() returns results in argument order, regardless of which
    # operation actually finished first.
    results = await asyncio.gather(
        phase1(),
        phase2(),
    )
    print('results: {!r}'.format(results))


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    event_loop.run_until_complete(main())
finally:
    event_loop.close()
# Handle background operations as they finish with as_completed().
import asyncio


async def phase(i):
    """Sleep for 0.5 - 0.1 * i seconds and return a result for phase i."""
    print('in phase {}'.format(i))
    # Higher-numbered phases sleep for less time, so they finish first.
    await asyncio.sleep(0.5 - (0.1 * i))
    print('done with phase {}'.format(i))
    return 'phase {} result'.format(i)


async def main(num_phases):
    """Run the phases and return their results in completion order."""
    print('starting main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting for phases to complete')
    results = []
    for next_to_complete in asyncio.as_completed(phases):
        answer = await next_to_complete
        print('received answer {!r}'.format(answer))
        results.append(answer)
    print('results: {!r}'.format(results))
    return results


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()
# Guard a shared resource with asyncio.Lock.
import asyncio
import functools


def unlock(lock):
    """Release *lock* from a plain (non-coroutine) callback."""
    print('callback releasing lock')
    lock.release()


async def coro1(lock):
    """Acquire and release *lock* using it as an async context manager."""
    print('coro1 waiting for the lock')
    async with lock:
        print('coro1 acquired lock')
    print('coro1 released lock')


async def coro2(lock):
    """Acquire and release *lock* with explicit acquire()/release()."""
    print('coro2 waiting for the lock')
    await lock.acquire()
    try:
        print('coro2 acquired lock')
    finally:
        print('coro2 released lock')
        lock.release()


async def main(loop):
    """Hold a lock, schedule its release and run two contenders."""
    # Create and acquire a shared lock.
    lock = asyncio.Lock()
    print('acquiring the lock before starting coroutines')
    await lock.acquire()
    print('lock acquired: {}'.format(lock.locked()))

    # Schedule a callback to unlock the lock.
    loop.call_later(0.1, functools.partial(unlock, lock))

    # Run the coroutines that want to use the lock.
    # asyncio.wait() no longer accepts bare coroutine objects (removed
    # in Python 3.11), so each coroutine is wrapped in a task first.
    print('waiting for coroutines')
    await asyncio.wait([
        loop.create_task(coro1(lock)),
        loop.create_task(coro2(lock)),
    ])


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
可以直接调用锁的acquire()方法并使用await来获得锁,使用结束时再调用release()方法释放锁,如这个例子中的coro2()所示。还可以结合async with关键字把锁用作异步上下文管理器,如coro1()中所示。
# Notify several waiters at once with asyncio.Event.
import asyncio
import functools


def set_event(event):
    """Set *event* from a plain (non-coroutine) callback."""
    print('setting event in callback')
    event.set()


async def coro1(event):
    """Wait until *event* is set, then report it."""
    print('coro1 waiting for event')
    await event.wait()
    print('coro1 triggered')


async def coro2(event):
    """Wait until *event* is set, then report it."""
    print('coro2 waiting for event')
    await event.wait()
    print('coro2 triggered')


async def main(loop):
    """Create an event, schedule it to be set and run two waiters."""
    # Create a shared event
    event = asyncio.Event()
    print('event start state: {}'.format(event.is_set()))

    loop.call_later(
        0.1, functools.partial(set_event, event)
    )

    # asyncio.wait() no longer accepts bare coroutine objects (removed
    # in Python 3.11), so each coroutine is wrapped in a task first.
    await asyncio.wait([
        loop.create_task(coro1(event)),
        loop.create_task(coro2(event)),
    ])
    print('event end state: {}'.format(event.is_set()))


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
# Wake up a chosen number of waiters with asyncio.Condition.
import asyncio


async def consumer(condition, n):
    """Wait on *condition* and report when consumer *n* is notified."""
    async with condition:
        print('consumer {} is waiting'.format(n))
        await condition.wait()
        print('consumer {} triggered'.format(n))
    print('ending consumer {}'.format(n))


async def manipulate_condition(condition):
    """Notify one, then two, then all remaining waiters on *condition*."""
    print('starting manipulate_condition')

    # pause to let consumers start
    await asyncio.sleep(0.1)

    for i in range(1, 3):
        # notify() must be called while the condition's lock is held.
        async with condition:
            print('notifying {} consumers'.format(i))
            condition.notify(n=i)
        await asyncio.sleep(0.1)

    async with condition:
        print('notifying remaining consumers')
        condition.notify_all()

    print('ending manipulate_condition')


async def main(loop):
    """Start five consumers plus the notifier and wait for the consumers."""
    # Create a condition
    condition = asyncio.Condition()

    # Set up tasks watching the condition.
    # asyncio.wait() no longer accepts bare coroutine objects (removed
    # in Python 3.11), so each coroutine is wrapped in a task first.
    consumers = [
        loop.create_task(consumer(condition, i))
        for i in range(5)
    ]

    # Schedule a task to manipulate the condition variable
    loop.create_task(manipulate_condition(condition))

    # Wait for the consumers to be done
    await asyncio.wait(consumers)


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    result = event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
# Coordinate a producer and several consumers with asyncio.Queue.
import asyncio


async def consumer(n, q):
    """Process items from *q* until a None stop signal is received."""
    print('consumer {}: starting'.format(n))
    while True:
        print('consumer {}: waiting for item'.format(n))
        item = await q.get()
        print('consumer {}: has item {}'.format(n, item))
        if item is None:
            # None is the signal to stop.
            q.task_done()
            break
        else:
            await asyncio.sleep(0.01 * item)
            q.task_done()
    print('consumer {}: ending'.format(n))


async def producer(q, num_workers):
    """Queue jobs and one stop signal per worker, then wait for the queue."""
    print('producer: starting')
    # Add some numbers to the queue to simulate jobs
    for i in range(num_workers * 3):
        await q.put(i)
        print('producer: added task {} to the queue'.format(i))
    # Add None entries in the queue
    # to signal the consumers to exit
    print('producer: adding stop signals to the queue')
    for i in range(num_workers):
        await q.put(None)
    print('producer: waiting for queue to empty')
    # join() returns once task_done() has been called for every item.
    await q.join()
    print('producer: ending')


async def main(loop, num_consumers):
    """Run one producer and *num_consumers* consumers to completion."""
    # Create the queue with a fixed size so the producer
    # will block until the consumers pull some items out.
    q = asyncio.Queue(maxsize=num_consumers)

    # Schedule the consumer tasks.
    consumers = [
        loop.create_task(consumer(i, q))
        for i in range(num_consumers)
    ]

    # Schedule the producer task.
    prod = loop.create_task(producer(q, num_consumers))

    # Wait for all of the coroutines to finish.
    await asyncio.wait(consumers + [prod])


# Create the loop explicitly; get_event_loop() is deprecated as a way
# of implicitly creating a loop when none is running.
event_loop = asyncio.new_event_loop()
try:
    event_loop.run_until_complete(main(event_loop, 2))
finally:
    event_loop.close()