关于 多线程、协程、主线程结合 的升级用法。
如果你这样使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import asyncio
import aiohttp
import time
async def ws_binance(): session = aiohttp.ClientSession() async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:1087") as ws: await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}') async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: print(msg) time.sleep(5)
async def ws_mexc(): session = aiohttp.ClientSession() async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:1087") as ws: await ws.send_str('{ "method":"SUBSCRIPTION", "params":["spot@public.deals.v3.api@BTCUSDT"] }') async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: print(msg)
if __name__ == '__main__': loop = asyncio.get_event_loop() loop.create_task(ws_binance()) loop.create_task(ws_mexc()) loop.run_forever()
|
就会发现两个 ws
共享一个 event loop
,会造成数据卡顿。两个人的数据受制于 time.sleep(5)
,当然如果把 time.sleep(5)
换成 await asyncio.sleep(5)
,两者不受影响。
真正的使用方法如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| import asyncio import threading import time
import aiohttp
async def ws_binance(): session = aiohttp.ClientSession() async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:8001") as ws: await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}') async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: print(msg) await asyncio.sleep(0.0001)
async def ws_mexc(): session = aiohttp.ClientSession() async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:8001") as ws: await ws.send_str('{ "method":"SUBSCRIPTION", "params":["spot@public.deals.v3.api@BTCUSDT"] }') async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: print(msg) await asyncio.sleep(0.0001)
async def ws_tt(): while 1: print("=====================================") await asyncio.sleep(5)
async def main(): task1 = asyncio.create_task(ws_binance()) task2 = asyncio.create_task(ws_mexc()) task3 = asyncio.create_task(ws_tt()) await asyncio.gather(task1, task2, task3)
def start_asyncio_loop(): asyncio.run(main())
if __name__ == '__main__': t = threading.Thread(target=start_asyncio_loop) t.start()
while 1: print("++++++++") time.sleep(1)
|
首先,协程中 ws
必须用一个独立的线程运行,否则,主线程执行不下去。
在独立线程中,每一个 ws 都要单独开一个 event loop
,否则他们都会共享一个 event loop
,会造成卡顿。
另外,每一个 ws
都要弄一个 sleep
,否则会出现 cpu
过高问题。
如果把 await asyncio.sleep(0.0001)
换成 time.sleep()
也会有问题,相当于该独立线程睡眠了。