第一次接触异步编程这个概念是在 Python
里面,去年的时候就因为不清楚 Python
中异步编程的实现原理找了很多资料研究,但最后也没有搞得很清楚。
后来又因为重心逐渐转移到了 Java
上,就暂时放弃了对 Python
中异步原理的探究,直到前段时间,被 JavaScript
的异步坑了一次后, 让我想起了 Python
里面的异步……
然后我就在网上看到了一篇文章,看了半天后在文章的评论中发现了自己两年前的评论……
当时我那个心情啊,太曲折了,于是乎决定找个时间在研究一下 Python
中的异步,这便是这篇博客的由来。
Python
异步编程实现原理的过程,但是,并不会包含太多 Python
中异步编程的实现原理,因为我还没有搞明白 QAQ我先后接触过了 Python、JavaScript1 和 Java 中的异步编程,其中,Python 的是最为特殊的一个,因为它是 单线程 和 异步阻塞 的。
通常来说,异步编程模型下最小的执行单位是一个个任务(往往就是一个函数),而为了能够在一个任务执行完成后执行另外的操作,又会引入回调任务(函数)。 不同的任务和回调任务的执行,往往又是通过事件循环和任务队列来完成。
假如将这些东西拆分开来,像 JavaScript 那样,放在不同的线程下面处理,是很容易理解的,因为整体结构足够清晰,但是 Python 不行,因为 GIL
的原因,如果 Python 还是使用多线程的方式来实现异步编程的话, 并不能带来多少性能上的提升,因此,Python 异步中的事件循环、任务队列和任务的执行都在一个线程里面。
而且,Python 中异步的实现是基于协程的,这不仅使得 Python 中的异步是阻塞的,而且,最小执行单位不再是单个任务,而是单个任务的一部分。
这就让 Python 中异步的实现变得复杂起来,本来 Python 的源码就不好读,好家伙,现在更不好读了,同时又因为异步编程往往都和异步 I/O 挂钩,刷的一下,源码中一堆和异步 I/O 相关的代码……
这让我明白了,想快速搞明白 Python 异步是咋回事是不可能的,毕竟,我要做的是通过阅读源码倒推作者的思路,这很难!!!
因此,我换了一个思路,我先自己用协程实现一个简单的事件循环,在慢慢去读 Python 的源码,总可以了吧!
这就是为啥这篇博客说是在探究 Python
异步编程的实现原理,但是标题连 Python 这个单词都没有的原因。
Python 中的协程是通过生成器来实现的,但是,基本上所有博客将协程的时候都会说的一句话,协程不等于生成器,它们只是长得像:
def grep(pattern):
"""
>>> g = grep("python")
>>> g.send(None)
Looking for python
>>> g.send("Yeah, but no, but yeah, but no")
>>> g.send("A series of tubes")
>>> g.send("python generators rock!")
python generators rock!
"""
print("Looking for %s" % pattern)
while True:
line = yield
if pattern in line:
print(line)
上面这个协程不断接收来自 send
方法的输入,在经过判断后进行输出,假如把它当做生成器使用的话,那么你只能得到无数的 None
值。
本质上,在低版本的 Python
中生成器和协程没有区别,就看你怎么用,关键就在于协程 消费 值,而生成器 生成 值,当然了,高版本的 Python
对协程提供了更多的支持, 使得它们不再一样,但是,这篇博客里面,所有的协程都通过生成器实现。
因此,我们需要关注后面会用到的几个特性:
yield
的左值会接收来自 send
方法的输入,但是协程在第一次运行时还没到达 yield
语句处,因此无法传递参数,只能通过 None
值来调用协程:
def coroutine():
while True:
val = yield
print(val)
执行输出:
In [9]: coro = coroutine()
In [10]: coro.send(None)
In [11]: coro.send(1)
1
In [12]: coro = coroutine()
In [13]: coro.send(1)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-13-e272bd1527da> in <module>()
----> 1 coro.send(1)
TypeError: can't send non-None value to a just-started generator
可以通过 yield from
语句递归调用协程,效果如下:
def coroutine():
for i in range(3):
val = yield
print('coroutine %s' % val)
def invoker():
yield from coroutine()
执行输出(就是会报异常):
In [29]: coro = invoker()
In [30]: coro.send(None)
In [31]: coro.send(1)
coroutine 1
In [32]: coro.send(2)
coroutine 2
In [33]: coro.send(3)
coroutine 3
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-33-8e657389bc11> in <module>()
----> 1 coro.send(3)
StopIteration:
协程可以有返回值,保存在 StopIteration
异常中,作为 yield from
的左值时可以直接接收:
def coroutine():
val = yield
return 'coroutine %s' % val
执行输出:
In [42]: coro = coroutine()
In [43]: coro.send(None)
In [44]: coro.send(1)
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-44-e272bd1527da> in <module>()
----> 1 coro.send(1)
StopIteration: coroutine 1
本来想将 Future & Task
和 EventLoop
分成两节的,结果 Task
和 EventLoop
耦合在了一起,只好合在一起了,下面是代码:
class EventLoop:
def __init__(self):
self._ready = []
def call_soon(self, task):
self._ready.append(task)
def run_forever(self, coro):
root = Task(coro, self)
while self._ready:
task = self._ready.pop(0)
task.step(Future())
return root.result
class Future:
def __init__(self):
# 通过 result 来保存协程的返回值
self.result = None
# 通过 _callbacks 来保存回调函数
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
# try suppression bug
self.result = self.result or result
# 执行完成后将自身作为参数传递给回调函数
for callback in self._callbacks:
callback(self)
class Task(Future):
# 协程类型
coroutine = type((i for i in range(0)))
def __init__(self, coro, loop):
super().__init__()
self.coro = coro
self.loop = loop
# 将自身加入任务队列
self.loop.call_soon(self)
def step(self, future):
try:
result = self.coro.send(future.result)
except StopIteration as exc:
# 触发 StopIteration 异常时说明协程已经执行结束
self.set_result(exc.value)
else:
# 协程返回协程,将其转换为 Task 后将 self.step 注册为期回调函数等待唤醒
if type(result) == self.coroutine:
result = Task(result, self.loop)
result.add_done_callback(self.step)
# 协程返回任务,将 self.step 注册为回调函数等待唤醒
elif isinstance(result, Task):
# there is a bug
result.add_done_callback(self.step)
self.loop.call_soon(result)
# 协程返回其他东西,不受理,直接将 self 再次放入任务队列
else:
self.loop.call_soon(self)
一开始实现的时候是想用一个外部的事件循环来操作,不需要 Task
持有事件循环,但是实现过程中发现那样存在一点问题,便学着 Python
中的方式将事件循环传递给 Task
操作, 但这里的实现是依然存在问题。
在只存在协程和同序返回 Task
的情况下测试没有问题,但是当存在异序返回 Task
的情况下问题就出现了,下面的测试代码便是异序返回,我通过在 set_result
中判断 result
的方式暂时抑制了该异常, 但是,这是治标不治本的方式。如果有大佬知道方案,请务必告诉我 QAQ
测试代码:
_loop = EventLoop()
def main():
ta = Task(say_hello(), _loop)
tb = Task(say_world(), _loop)
b = yield tb
a = yield ta
return a + b
def say_world():
print('world')
yield
return 'world'
def say_hello():
print('hello')
yield from say_other()
return 'hello '
def say_other():
print('other')
yield
print(_loop.run_forever(main()))
输出:
hello
other
world
hello world
折腾了一圈后结果还是只能得到一份存在问题的代码,和去年的时候差不多,但比去年好的是,多少多了一点思路。
但是,还是差得好远啊……
1 ES6 中 async/await
的原理还没有怎么了解过,因此这里的异步只包括 ajax
这类异步操作
原文:https://www.cnblogs.com/rgbit/p/12210530.html