且行善举,莫问前程。

0%

记一次使用asyncio.run遇到的坑

如果写同步的程序,我都是使用 requests 发送 http 请求,异步程序,我更倾向于使用 aiohttp,这两个都是非常优秀的工具包,但是写异步代码,如果掌握不精很容易坑了自己。接下来我会讲述一下我在使用中遇到的一个问题。

aiohttp client 抛了异常

下面是简化过的代码。

http.py
1
2
3
4
5
6
7
8
9
10
11
12
import aiohttp

class Http:
def __init__(self):
connector = aiohttp.TCPConnector()
self.session = aiohttp.ClientSession(connector=connector)

async def get(self, url):
async with self.session.get(url) as r:
return await r.text()

http = Http()
main.py
1
2
3
4
5
6
7
8
9
from base import b

async def main():
r = await b.get("https://blog.ibeats.top/robots.txt")
print(r)

if __name__ == "__main__":
import asyncio
asyncio.run(main())

这时候执行 main.py 就抛出了异常

RuntimeError: Timeout context manager should be used inside a task

打断点查看,确实是在 with timer 抛出的错误,这情况很有可能是没在事件循环内实例化 session。代码少还是很容易看出来,运行事件循环前导入了 http,并且实例化了 session。

所以从根本上上解决问题就是导入http时不要初始化 session,然后代码可以改成这样。

http.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import aiohttp

class Http:
def __init__(self):
self.session = None

async def get_session(self):
if self.session is None:
connector = aiohttp.TCPConnector()
self.session = aiohttp.ClientSession(connector=connector)

return self.session

async def get(self, url):
session = await self.get_session()
async with session.get(url) as r:
return await r.text()

http = Http()

这样就可以就可以放心的在任何地方初始化了。

为什么不能运行前实例化session。

运行事件循环前也可以实例化 session。但是不要使用 asyncio.run 方法,可以自己创建一个loop来运行事件循环。

1
2
3
4
5
6
async def main():
...

if __name__ == "__main__":
loop = async.get_event_loop()
loop.run_until_complete(main())

为什么会这样,我们要进入 asyncio 内部看一下了,CPython 有用 Python 实现的 asyncio 代码,就不用直接看C了。在看过 aiohttp 代码后,aiohttp 初始化 session 时,使用的是 asyncio.get_event_loop() ,asyncio.run() 是自己创建的事件循环。那么我将代码简化后写出来再分析一下。

runners.py
1
2
3
4
5
6
7
8
9
10
11
from events import *
def run()
if events._get_running_loop() is not None:
raise RuntimeError("asyncio.run() cannot be called from a running event loop")

loop = events.new_event_loop()
try:
events.set_event_loop(loop)
return loop.run_until_complete(main)
finally:
...
events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class _RunningLoop(threading.local):
loop_pid = (None, None)

_event_loop_policy = None
_running_loop = _RunningLoop()

def get_running_loop():
loop = _get_running_loop()
if loop is None:
raise RuntimeError('no running event loop')
return loop

def _get_running_loop():
running_loop, pid = _running_loop.loop_pid
if running_loop is not None and pid == os.getpid():
return running_loop

def _set_running_loop(loop):
_running_loop.loop_pid = (loop, os.getpid())

def _init_event_loop_policy():
global _event_loop_policy
with _lock:
if _event_loop_policy is None: # pragma: no branch
from . import DefaultEventLoopPolicy
_event_loop_policy = DefaultEventLoopPolicy()

def get_event_loop_policy():
if _event_loop_policy is None:
_init_event_loop_policy()
return _event_loop_policy

def get_event_loop():
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
return get_event_loop_policy().get_event_loop()

aiohttp使用get_event_loop,就是说如果不调用 set_event_loop,当执行 asyncio.run 时,会重新创建一个事件循环,导致事件循环不是同一个,运行事件循环时,aiohttp 里抛出,所以启动事件循环时,也使用 get_event_loop 就能保证最后使用的是同一个事件循环,当然还是不建议这么做,稍微控制不好就会耽误很长的时间找问题所在,最后得不偿失了。

延伸阅读

------ 本文结束 ------