0%

Rhino | Rhino_Collect 编写原理

Rhino_CollectRhino 的核心模块,是采集数据的模块。

整个模块做成一个库,通过传递配置来进行采集数据,整个模块一共有 3 个核心类。

  • Restful 类
  • Websocket 类
  • HeartBeat 类

Restful 类

有的交易所,或者链上数据只能通过 restful 的方式获得,所以,需要一个 restful 请求,下面举一个简单的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio

import aiohttp


async def get_binance():
session = aiohttp.ClientSession()
while 1:
res = await session.get("https://fapi.binance.com/fapi/v1/depth?symbol=btcusdt", proxy="http://127.0.0.1:1087")
response = await res.text()
print("restful binance")
await asyncio.sleep(0.5)


async def ws_binance():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:1087") as ws:
await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print("websocket binance")


async def ws_mexc():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:1087") as ws:
await ws.send_str('{ "method":"SUBSCRIPTION", "params":["spot@public.deals.v3.api@BTCUSDT"] }')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print("websocket mexc")


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(get_binance())
loop.create_task(ws_binance())
loop.create_task(ws_mexc())
loop.run_forever()

上面的 get_binance 就是 restful 的请求方式,但是,通过学习

get_binance 改成下面这种形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import asyncio

import aiohttp


async def restful(loop):
session = aiohttp.ClientSession()
loop.create_task(get_binance(session, loop))


async def get_binance(session, loop):
res = await session.get("https://fapi.binance.com/fapi/v1/depth?symbol=btcusdt", proxy="http://127.0.0.1:1087")
response = await res.text()
print("restful binance")
await asyncio.sleep(0.5)
loop.create_task(get_binance(session, loop))


async def ws_binance():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:1087") as ws:
await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print("websocket binance")


async def ws_mexc():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:1087") as ws:
await ws.send_str('{ "method":"SUBSCRIPTION", "params":["spot@public.deals.v3.api@BTCUSDT"] }')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print("websocket mexc")


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(restful(loop))
loop.create_task(ws_binance())
loop.create_task(ws_mexc())
loop.run_forever()

websocket

websocket 就是上面的代码

HeartBeat 类

这个是心跳模块,主要是监控各个交易所的服务是否还活着。

目前,Rhino_Collect 并没有断开长久后,进行通知的服务「钉钉、微信等」,相关的通知,通过一个专门的监控模块,进行监控。

HeartBeat 主要是,进行 websocket 连接保活。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import asyncio

import aiohttp
import time

websock_data = {}


async def restful(loop):
session = aiohttp.ClientSession()
loop.create_task(get_binance(session, loop))


async def get_binance(session, loop):
res = await session.get("https://fapi.binance.com/fapi/v1/depth?symbol=btcusdt", proxy="http://127.0.0.1:1087")
response = await res.text()
print("restful binance")
await asyncio.sleep(0.5)
loop.create_task(get_binance(session, loop))


async def ws_binance():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:1087") as ws:
await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print("websocket binance")
websock_data["binance"] = int(time.time() * 1000)


async def ws_mexc():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:1087") as ws:
await ws.send_str('{ "method":"SUBSCRIPTION", "params":["spot@public.deals.v3.api@BTCUSDT"] }')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print("websocket mexc")
websock_data["mexc"] = int(time.time() * 1000)


async def listen():
while 1:
now = int(time.time() * 1000)
for key, store_time in websock_data.items():
if now - store_time > 10_000:
print(f"{key} 断开,将进行重连")
await asyncio.sleep(1)


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(restful(loop))
loop.create_task(ws_binance())
loop.create_task(ws_mexc())
loop.create_task(listen())
loop.run_forever()

数据流走向

数据拿到后,通过向 redis 中进行数据存储,采用生产者和消费者模式,将数据推送给下一个模块。

配置化开启服务

Rhino_Collect 通过配置化的方式,进行到底采集什么数据,具体细节在这里就不展开了。

请我喝杯咖啡吧~