Python协程的核心是事件循环(Event loop), 简单的介绍一下事件循环: 通过注册事件与事件处理函数。当发生了特定事件后,通过循环来知晓事件的发生,并调用注册的处理函数。总结起来就是三个步骤:注册、监听、处理。
协程的任务状态有阻塞、就绪、运行,当一协程在运行状态,遇到了I/O操作,便会到事件循环中监听I/O操作完成的事件,并注册自身的上下文以及自身的唤醒函数,之后该协程就变为阻塞状态。当监听的事件发生后,阻塞状态的协程就会被唤醒,进入就绪状态,将在下一次事件循环继续运行。
接下来就通过一个简单的例子以及源码来分析其工作原理。
示例代码:
import asyncio async def start_io(ident): print(f"{ident} blocking") # 模拟I/O操作 await asyncio.sleep(2) print(f"{ident} weak up") async def say_hello(ident): print(f"{ident}:hello,") await start_io(ident) print(f"i‘m {ident}") loop = asyncio.get_event_loop() loop.create_task(say_hello(‘XiaoMing‘)) loop.create_task(say_hello(‘XiaoHong‘)) loop.run_forever()
在上述代码里,先是获取了一个事件循环对象,然后往里面注册了两个协程对象,一个是小明的say_hello, 另一个是小红的say_hello,开启事件循环。其运行结果如下:
XiaoMing:hello, XiaoMing blocking XiaoHong:hello, XiaoHong blocking XiaoMing weak up i‘m XiaoMing XiaoHong weak up i‘m XiaoHong
可以看到,先是执行小明的函数,遇到了I/O操作时就会把控制权让出,执行小红的函数,知道I/O操作完成,唤醒小明的协程,才会继续执行。
协程是根据await关键字来分段,每次执行到await时,都会停止,并等下一次event loop来调度。
这里面涉及了两个类,BaseEventLoop类以及Task类。
先来看一下Task的初始化函数(仅关键部分代码逻辑,其它部分可自行阅读源码):
def __init__(self, coro, *, loop=None, name=None): # 协程名称 if name is None: self._name = f‘Task-{_task_name_counter()}‘ else: self._name = str(name) # 协程上下文(同协程共用,相当于多线程中的threadlocal) self._context = contextvars.copy_context() # 向事件循环对象注册 self._loop.call_soon(self.__step, context=self._context) _register_task(self)
执行协程里的每一段是通过Task的__step方法来进行的:
def __step(self, exc=None): try: # 执行至下一await处 result = coro.send(None) except StopIteration as exc: # 执行完成 super().set_result(exc.value) else: blocking = getattr(result, ‘_asyncio_future_blocking‘, None) # 是否被阻塞 if blocking is not None: # 注册该协程的唤醒函数 result.add_done_callback(self.__wakeup, context=self._context) # 如果仅为yield却没有值,那么该协程就会直接进入就绪队列 elif result is None: self._loop.call_soon(self.__step, context=self._context) finally: _leave_task(self._loop, self)
这个__step方法会执行到await的地方,并把下一段给添加到事件循环里面去,__wakeup也会调用这个方法。__step是task的核心,唤醒和执行都需要通过调用它来实现。
下面来看事件循环类, BaseEventLoop,我们主要看它一次事件循环里所处理的事情。
def _run_once(self): # 事件监听和处理 详细看Selector event loop event_list = self._selector.select(timeout) self._process_events(event_list) # 执行就绪队列里的handle ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() # 如果任务被取消了则跳过 if handle._cancelled: continue else: handle._run()
最后来一下总结,python里协程是通过async/await来将一个函数分成多个段,而这些段都是协程里面调度的单位,粒度比进程、线程都要小。
原文:https://www.cnblogs.com/CygnusKingdom/p/14698570.html