0%

Rhino | 如何统一化执行交易所方法

有的交易所有获取 ticker 方法,有的交易所将 tickertrade 放在一起,那么,怎么兼容这些不同交易所之间的调用。

我的解决方法是,每个交易所都是 3 层结构,其中一个是根点,另外两个是叶点,根点管理叶点,叶点可以通过根点进行相互交流。

binanceuswap 来举例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class BinanceUSwapGateway(BaseGateway):
def __init__(self, logger: RhinoLogger, rhino_collect_config: RhinoConfig, loop):
super().__init__(logger, rhino_collect_config, loop)
self.exchange = Exchange.BINANCE.value
self.exchange_sub = ExchangeSub.BINANCEUSWAP.value

self.rest = BinanceUSwapRestGateway(self)
self.websocket = BinanceUSwapWebsocketGateway(self)
...

class BinanceUSwapRestGateway(RestClient):

def __init__(self, gateway: BinanceUSwapGateway):
super().__init__(gateway)
...

class BinanceUSwapWebsocketGateway(WebsocketClient):

def __init__(self, gateway: BinanceUSwapGateway):
super().__init__(gateway)
...

BinanceUSwapGateway 是根节点,BinanceUSwapRestGateway 是叶节点它专门用来获取 restful 数据,BinanceUSwapWebsocketGateway 是叶节点,它专门用来获取 websocket 数据。

而,BinanceUSwapRestGatewayBinanceUSwapWebsocketGateway 都不能直接调用,都是通过 BinanceUSwapGateway 来进行调用的,比如获取资金费率,应该调用 BinanceUSwapGatewayget_funding_rates 方法。

1
2
3
async def get_funding_rates(self, rhino_funding_rates: List[RhinoFundingRate],
callable_methods: CallableMethods = None):
await self.rest.get_funding_rates(rhino_funding_rates, callable_methods)

BinanceUSwapGateway 是继承 BaseGateway 的,BaseGateway 包含了所有交易所应该实现的方法,比如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from abc import ABC, abstractmethod
from typing import NoReturn, Union, Any, List

from RhinoLogger.RhinoLogger.RhinoLogger import RhinoLogger
from RhinoObject.Rhino.RhinoEnum import MethodEnum
from RhinoObject.Rhino.RhinoObject import SymbolInfo, RhinoDepth, RhinoTrade, RhinoConfig, SymbolInfos, RhinoOrder, \
RhinoLeverage, CallableMethods, RhinoAccount, RhinoFundingRate, RhinoKline


class BaseGateway(ABC):

def __init__(self, logger: RhinoLogger, rhino_config: RhinoConfig, loop):
self.collect = None
self.logger = logger
self.rhino_config = rhino_config
self.loop = loop
self.methods = {}

self.websocket = None

def init_methods(self) -> NoReturn:
"""
用于其他模块的回掉
:return:
"""
self.methods[MethodEnum.GETDEPTHS.name] = self.get_depths
self.methods[MethodEnum.GETTRADES.name] = self.get_trades

def get_method(self, method: Union[MethodEnum]) -> Any:
return self.methods.get(method, None)

async def set_data(self, rhino_data: Union[RhinoDepth, RhinoTrade]) -> NoReturn:
"""
使用 aredis 向 redis 中添加 depth 数据
:param rhino_depth:
:return:
"""
self.logger.debug(f"{rhino_data}")
# rhino_data.store_time = int(time.time() * 1000) # 更新网络存储时间
# await self.collect.set_data(rhino_data)

@abstractmethod
async def get_exchange_infos(self, symbol_info: SymbolInfo, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def get_depths(self, symbol_info: SymbolInfo, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def get_trades(self, rhino_trade: RhinoTrade, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def submit_order(self, rhino_order: RhinoOrder, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def batch_submit_orders(self, rhino_orders: List[RhinoOrder],
callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def cancel_order(self, rhino_order: RhinoOrder, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def cancel_orders(self, rhino_order: RhinoOrder, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def close_position(self, rhino_order: RhinoOrder, callable_methods: CallableMethods = None) -> NoReturn:
"""
平仓
"""
pass

@abstractmethod
async def close_positions(self, rhino_order: RhinoOrder, callable_methods: CallableMethods = None) -> NoReturn:
"""
平仓
"""
pass

@abstractmethod
async def get_account(self, rhino_account: RhinoAccount, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def withdraw(self, symbol_info: SymbolInfo, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def update_leverage(self, rhino_leverage: RhinoLeverage,
callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def get_position(self, rhino_order: RhinoOrder, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def get_positions(self, rhino_order: RhinoOrder, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def get_order(self, rhino_order: RhinoOrder, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def get_orders(self, rhino_order: RhinoOrder, callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def get_funding_rate(self, rhino_funding_rate: RhinoFundingRate,
callable_methods: CallableMethods = None) -> NoReturn:
pass

@abstractmethod
async def get_funding_rates(self, rhino_funding_rates: List[RhinoFundingRate],
callable_methods: CallableMethods = None):
pass

@abstractmethod
async def get_klines(self, rhino_kline: RhinoKline,
callable_methods: CallableMethods = None):
pass

@abstractmethod
async def subscribe(self, symbol_infos: SymbolInfos, callable_methods: CallableMethods = None) -> NoReturn:
self.websocket.on_heart = symbol_infos.on_heart

subscribe_state = symbol_infos.subscribes.get(self.exchange_sub, None)
if subscribe_state is None:
self.logger.error(f"{self.exchange_sub} subscribe 状态遗漏")
if subscribe_state:
self.logger.info(f"{self.exchange_sub} 准备订阅")
await self.websocket.subscribe(symbol_infos, callable_methods)

async def unsubscribe(self):
await self.websocket.unsubscribe()

async def resubscribe(self):
await self.websocket.reconnect()

通过 BaseGateway 来明确,交易所应该实现那些方法,通过每个交易所的具体实现来进行调用。

请我喝杯咖啡吧~