0%

python | run_until_complete 和 run_forever

首先 run_forever3.7 之后就废弃了,但是,它还是非常非常有用的。

另外,暂时,没发现 3.7 可以代替 run_forever 的方法。

本文,除了想探讨上面两个方法怎么用之外,还想探讨一个 event loop 的生命周期。

run_forever

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import traceback

import aiohttp


class T:
def __init__(self):
self._session = None

async def get(self):
try:
if self._session is None:
self._session = aiohttp.ClientSession()
async with self._session.get("http://www.baidu.com") as r:
t = await r.text()
print(1)
except Exception as e:
print(traceback.format_exc())


def main():
test = T()
loop = asyncio.get_event_loop()
for i in range(10):
loop.create_task(test.get())
loop.run_forever()


if __name__ == '__main__':
main()

在程序中我们创建了一个 event loop ,然后向这个 loop 中不断的添加 task,由于 loop.run_forever() 可以让 loop 一直存在。

执行了 run_forever 之后,后面的代码就不能运行了。

run_until_complete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
import traceback
import time


class T:
def __init__(self):
self._session = None

async def get(self):
try:
if self._session is None:
self._session = aiohttp.ClientSession()
for i in range(100):
async with self._session.get("http://www.baidu.com") as r:
t = await r.text()
print(1)
except Exception as e:
print(traceback.format_exc())
finally:
await self._session.close()


def main():
test = T()
loop = asyncio.get_event_loop()
loop.run_until_complete(test.get())


if __name__ == '__main__':
main()

run_until_complete() 需要传如一个 future 对象,我们传入一个协程对象之所以可以,是因为,run_until_complete 会智能的将其转换,想当于

loop.run_until_complete(asyncio.ensure_future(test.get()))

另外,run_until_complete 是阻塞的,只有运行完里面的 future 对象后,才能继续向下运行,另外,run_until_complete 执行完毕后,event loop 依然不会关闭,还将继续运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
import traceback
import time


class T:
def __init__(self):
self._session = None

async def get(self):
try:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
for i in range(10):
async with self._session.get("http://www.baidu.com") as r:
t = await r.text()
print(1)
except Exception as e:
print(traceback.format_exc())
finally:
await self._session.close()


def main():
test = T()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(test.get()))
print(2)
loop.run_until_complete(asyncio.ensure_future(test.get()))


if __name__ == '__main__':
main()

3.7 之后,run_until_complete 可以用 asyncio.run 代替。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
import traceback
import time


class T:
def __init__(self):
self._session = None

async def get(self):
try:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
for i in range(10):
async with self._session.get("http://www.baidu.com") as r:
t = await r.text()
print(1)
except Exception as e:
print(traceback.format_exc())
finally:
await self._session.close()


def main():
test = T()
asyncio.run(test.get())


if __name__ == '__main__':
main()

while 和 event loop

我们印象中,while 在执行的过程中是不能跳出循环的,但是,借助于 event loop 是可以跳出的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio


async def first_worker():
while True:
await asyncio.sleep(1)
print("第一个任务完成")


async def second_worker():
while True:
await asyncio.sleep(1)
print("第二个任务完成")


loop = asyncio.get_event_loop()
try:
loop.create_task(first_worker())
loop.create_task(second_worker())
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("关闭 Loop")
loop.close()
请我喝杯咖啡吧~