Rhino_Collect
是 Rhino
的核心模块,是采集数据的模块。
整个模块做成一个库,通过传递配置来进行采集数据,整个模块一共有 3
个核心类。
Restful 类
Websocket 类
HeartBeat 类
Restful 类 有的交易所或者链上数据只能通过 restful
的方式获得,所以需要一个负责发起 restful
请求的类。下面举一个简单的例子。
"""Minimal collector demo: one REST poller plus two websocket subscribers."""
import asyncio

import aiohttp

# Local HTTP proxy used for every request and websocket connection.
PROXY = "http://127.0.0.1:1087"


async def get_binance():
    """Poll the Binance futures depth endpoint every 0.5 s, forever.

    A failed request is reported and the loop keeps polling, so one
    transient network error does not stop the collector.
    """
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.get(
                    "https://fapi.binance.com/fapi/v1/depth?symbol=btcusdt",
                    proxy=PROXY,
                ) as res:
                    # The body is read to completion; this demo discards it.
                    await res.text()
                    print("restful binance")
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                print(f"restful binance error: {exc!r}")
            await asyncio.sleep(0.5)


async def ws_binance():
    """Subscribe to Binance futures streams and report each text frame."""
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(
            "wss://fstream.binance.com/ws", proxy=PROXY
        ) as ws:
            await ws.send_str('{"method" : "SUBSCRIBE" ,"params" :["btcusdt@aggTrade" ,"btcusdt@depth" ],"id" : 1}')
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("websocket binance")


async def ws_mexc():
    """Subscribe to the MEXC spot deals stream and report each text frame."""
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(
            "wss://wbs.mexc.com/ws", proxy=PROXY
        ) as ws:
            await ws.send_str('{ "method" :"SUBSCRIPTION" , "params" :["spot@public.deals.v3.api@BTCUSDT" ] }')
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("websocket mexc")


if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Hold references so the long-lived tasks cannot be garbage-collected.
    tasks = [
        loop.create_task(get_binance()),
        loop.create_task(ws_binance()),
        loop.create_task(ws_mexc()),
    ]
    loop.run_forever()
上面的 get_binance 就是 restful 的请求方式。但是,经过学习,可以把
get_binance 改成下面这种形式:不再使用 while 循环,而是在每次请求结束后通过 loop.create_task 重新调度自己。
"""Collector demo where the REST poller re-schedules itself as a new task."""
import asyncio

import aiohttp

# Local HTTP proxy used for every request and websocket connection.
PROXY = "http://127.0.0.1:1087"


async def restful(loop):
    """Create the shared HTTP session and start the REST polling chain.

    The session is intentionally left open: every get_binance task in the
    chain keeps using it after this coroutine has returned.
    """
    session = aiohttp.ClientSession()
    loop.create_task(get_binance(session, loop))


async def get_binance(session, loop):
    """Fetch the Binance futures depth once, wait 0.5 s, then re-schedule.

    Instead of looping, each call schedules the next call as a fresh task.
    A failed request is reported but still followed by the next call, so
    one transient network error does not break the chain.
    """
    try:
        async with session.get(
            "https://fapi.binance.com/fapi/v1/depth?symbol=btcusdt",
            proxy=PROXY,
        ) as res:
            # The body is read to completion; this demo discards it.
            await res.text()
            print("restful binance")
    except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
        print(f"restful binance error: {exc!r}")
    await asyncio.sleep(0.5)
    loop.create_task(get_binance(session, loop))


async def ws_binance():
    """Subscribe to Binance futures streams and report each text frame."""
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(
            "wss://fstream.binance.com/ws", proxy=PROXY
        ) as ws:
            await ws.send_str('{"method" : "SUBSCRIBE" ,"params" :["btcusdt@aggTrade" ,"btcusdt@depth" ],"id" : 1}')
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("websocket binance")


async def ws_mexc():
    """Subscribe to the MEXC spot deals stream and report each text frame."""
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(
            "wss://wbs.mexc.com/ws", proxy=PROXY
        ) as ws:
            await ws.send_str('{ "method" :"SUBSCRIPTION" , "params" :["spot@public.deals.v3.api@BTCUSDT" ] }')
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("websocket mexc")


if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Hold references so the long-lived tasks cannot be garbage-collected.
    tasks = [
        loop.create_task(restful(loop)),
        loop.create_task(ws_binance()),
        loop.create_task(ws_mexc()),
    ]
    loop.run_forever()
Websocket 类
websocket 的采集方式就是上面代码中的 ws_binance 和 ws_mexc。
HeartBeat 类 这个是心跳模块,主要是监控各个交易所的服务是否还活着。
目前,Rhino_Collect
本身并没有在连接长时间断开后进行通知的服务(钉钉、微信等),相关的通知由一个专门的监控模块负责。
HeartBeat
主要负责 websocket
连接的保活。
"""Collector demo with a heartbeat that watches websocket liveness."""
import asyncio
import time

import aiohttp

# Local HTTP proxy used for every request and websocket connection.
PROXY = "http://127.0.0.1:1087"

# A feed with no message for longer than this many milliseconds is stale.
STALE_AFTER_MS = 10_000

# Exchange name -> timestamp (ms since epoch) of its last sign of life.
websock_data = {}


def _now_ms():
    """Return the current wall-clock time in whole milliseconds."""
    return int(time.time() * 1000)


async def restful(loop):
    """Create the shared HTTP session and start the REST polling chain.

    The session is intentionally left open: every get_binance task in the
    chain keeps using it after this coroutine has returned.
    """
    session = aiohttp.ClientSession()
    loop.create_task(get_binance(session, loop))


async def get_binance(session, loop):
    """Fetch the Binance futures depth once, wait 0.5 s, then re-schedule.

    A failed request is reported but still followed by the next call, so
    one transient network error does not break the chain.
    """
    try:
        async with session.get(
            "https://fapi.binance.com/fapi/v1/depth?symbol=btcusdt",
            proxy=PROXY,
        ) as res:
            # The body is read to completion; this demo discards it.
            await res.text()
            print("restful binance")
    except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
        print(f"restful binance error: {exc!r}")
    await asyncio.sleep(0.5)
    loop.create_task(get_binance(session, loop))


async def ws_binance():
    """Subscribe to Binance futures streams and stamp every text frame."""
    # Seed the heartbeat before connecting, so a feed that never delivers
    # its first message is still noticed by listen().
    websock_data["binance"] = _now_ms()
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(
            "wss://fstream.binance.com/ws", proxy=PROXY
        ) as ws:
            await ws.send_str('{"method" : "SUBSCRIBE" ,"params" :["btcusdt@aggTrade" ,"btcusdt@depth" ],"id" : 1}')
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("websocket binance")
                    websock_data["binance"] = _now_ms()


async def ws_mexc():
    """Subscribe to the MEXC spot deals stream and stamp every text frame."""
    # Seed the heartbeat before connecting, so a feed that never delivers
    # its first message is still noticed by listen().
    websock_data["mexc"] = _now_ms()
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(
            "wss://wbs.mexc.com/ws", proxy=PROXY
        ) as ws:
            await ws.send_str('{ "method" :"SUBSCRIPTION" , "params" :["spot@public.deals.v3.api@BTCUSDT" ] }')
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("websocket mexc")
                    websock_data["mexc"] = _now_ms()


async def listen():
    """Once per second, report every feed whose last heartbeat is stale.

    NOTE(review): the message announces a reconnect, but this demo only
    reports the stale feed; no reconnect logic is implemented here.
    """
    while True:
        now = _now_ms()
        for key, store_time in websock_data.items():
            if now - store_time > STALE_AFTER_MS:
                print(f"{key} 断开,将进行重连")
        await asyncio.sleep(1)


if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Hold references so the long-lived tasks cannot be garbage-collected.
    tasks = [
        loop.create_task(restful(loop)),
        loop.create_task(ws_binance()),
        loop.create_task(ws_mexc()),
        loop.create_task(listen()),
    ]
    loop.run_forever()
数据流走向 数据拿到后,先存入 redis,
再采用生产者和消费者模式,将数据推送给下一个模块。
配置化开启服务 Rhino_Collect
通过配置化的方式,决定到底采集什么数据,具体细节在这里就不展开了。