0%

python | 多线程、协程、主线程结合

关于 多线程、协程、主线程结合 的升级用法。

如果你这样使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio

import aiohttp

import time

async def ws_binance():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:1087") as ws:
await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg)
time.sleep(5)


async def ws_mexc():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:1087") as ws:
await ws.send_str('{ "method":"SUBSCRIPTION", "params":["spot@public.deals.v3.api@BTCUSDT"] }')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg)


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(ws_binance())
loop.create_task(ws_mexc())
loop.run_forever()

就会发现两个 ws 共享一个 event loop,会造成数据卡顿。两个人的数据受制于 time.sleep(5),当然如果把 time.sleep(5) 换成 await asyncio.sleep(5),两者不受影响。

真正的使用方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import asyncio
import threading
import time

import aiohttp


async def ws_binance():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:8001") as ws:
await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg)
await asyncio.sleep(0.0001)


async def ws_mexc():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:8001") as ws:
await ws.send_str('{ "method":"SUBSCRIPTION", "params":["spot@public.deals.v3.api@BTCUSDT"] }')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg)
await asyncio.sleep(0.0001)


async def ws_tt():
while 1:
print("=====================================")
await asyncio.sleep(5)


async def main():
task1 = asyncio.create_task(ws_binance())
task2 = asyncio.create_task(ws_mexc())
task3 = asyncio.create_task(ws_tt())
await asyncio.gather(task1, task2, task3)


def start_asyncio_loop():
asyncio.run(main())


if __name__ == '__main__':
t = threading.Thread(target=start_asyncio_loop)
t.start()

while 1:
print("++++++++")
time.sleep(1)

首先,协程中 ws 必须用一个独立的线程运行,否则,主线程执行不下去。

在独立线程中,每一个 ws 都要单独开一个 event loop,否则他们都会共享一个 event loop,会造成卡顿。

另外,每一个 ws 都要弄一个 sleep ,否则会出现 cpu 过高问题。

如果把 await asyncio.sleep(0.0001) 换成 time.sleep() 也会有问题,相当于该独立线程睡眠了。

请我喝杯咖啡吧~