由于 web3.py 在速度上无法满足我的需求,所以决定自己封装一下。
使用 curl 通过 rpc 进行访问
curl https://bsc-dataseed.binance.org/ -X POST --data '{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":56}' -x 127.0.0.1:1087
注意上面命令中的引号:--data 后面的 JSON 整体用单引号包裹,JSON 内部的键和字符串则使用双引号。
使用 python 进行协程封装
由于 web3.py 的部分底层实现并不是协程,所以想要充分发挥它的性能,只能借助多线程。但是,多线程一来不够优雅,二来耗费的资源过多,因此这里用协程自己封装一套 SDK。
这里仅仅举两个例子。后面看情况,要不要开源相关的轮子。
# get_block_info: concurrent eth_getBlockByNumber calls over JSON-RPC.
import asyncio

import aiohttp

from Config.config import proxies, is_proxy


class EvmRequest:
    """Minimal coroutine-based JSON-RPC client for an EVM-compatible node."""

    # Endpoint that every request issued by this class is sent to.
    evm_url = "https://bsc-dataseed.binance.org/"

    def __init__(self):
        pass

    async def get_block_info(self, block_type, session=None):
        """Call ``eth_getBlockByNumber`` and return the raw response text.

        Args:
            block_type: Block tag or hex block number, e.g. ``"latest"``.
            session: Optional ``aiohttp.ClientSession`` to reuse. When it is
                omitted a temporary session is opened for this single call.

        Returns:
            The response body decoded as UTF-8 (an unparsed JSON string).
        """
        payload = {
            "jsonrpc": "2.0",
            "method": "eth_getBlockByNumber",
            # Second parameter False: do not request full transaction objects.
            "params": [block_type, False],
            "id": 1,
        }
        # None (rather than an empty string) is aiohttp's "no proxy" value.
        proxy = proxies if is_proxy else None

        if session is None:
            async with aiohttp.ClientSession() as own_session:
                return await self._post(own_session, payload, proxy)
        return await self._post(session, payload, proxy)

    async def _post(self, session, payload, proxy):
        """POST ``payload`` as JSON to ``evm_url`` and return the body text."""
        async with session.post(self.evm_url, json=payload, proxy=proxy) as resp:
            return await resp.text(encoding="utf-8")


evm_request = EvmRequest()

# Number of identical requests the demo fires concurrently.
REQUEST_COUNT = 17


async def run():
    """Send REQUEST_COUNT concurrent "latest" block queries and print them."""
    # One shared session lets all requests reuse the same connection pool.
    async with aiohttp.ClientSession() as session:
        pending = [
            evm_request.get_block_info("latest", session=session)
            for _ in range(REQUEST_COUNT)
        ]
        results = await asyncio.gather(*pending)
    print(results)


def init():
    """Synchronous entry point that drives the async demo."""
    asyncio.run(run())


if __name__ == '__main__':
    init()
注意:这里 post 传递数据的方式是 json(即通过 json 参数传递,而不是表单数据)。
RPC 和自己的合约做交互 不耗费 gas 的 eth_call 生成 data
的其他方式请参考
使用 ganache 部署一个简单合约。
pragma solidity ^0.8.0;

/// @title Minimal demo contract used to try out eth_call over JSON-RPC.
contract test {
    /// @notice Return the sum of two unsigned integers.
    /// @dev Declared `pure`: it neither reads nor writes contract state.
    /// @param a First addend.
    /// @param b Second addend.
    /// @return The sum a + b.
    function add(uint a, uint b) public pure returns (uint) {
        return a + b;
    }
}
这里假设合约地址是: 0x5CF5F2dAFDF7998D9085ffafF5beB1B2b6e4c13c
交互脚本为
import json

import requests
import sha3
import web3

w3 = web3.Web3()
URL = 'http://localhost:8545'
session = requests.Session()
myAddress = '0xF464A67CA59606f0fFE159092FF2F474d69FD675'  # address funded in genesis file
abi = """[{"inputs":[{"internalType":"uint256","name":"a","type":"uint256"},{"internalType":"uint256","name":"b","type":"uint256"}],"name":"add","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"pure","type":"function"}]"""

value1, value2 = 10, 32

# Function selector: first 8 hex characters of keccak256 over the signature.
result = sha3.keccak_256()
result.update(b"add(uint256,uint256)")
methodId = result.hexdigest()[0:8]

# Each uint256 argument is a 32-byte big-endian word, hex encoded.
param1 = (value1).to_bytes(32, byteorder='big').hex()
param2 = (value2).to_bytes(32, byteorder='big').hex()

data = "0x" + methodId + param1 + param2

json_data = {
    "jsonrpc": "2.0",
    "method": "eth_call",
    # params must be an array [value1, value2, ..., valueN]
    "params": [
        {
            'from': myAddress,
            'to': "0x5CF5F2dAFDF7998D9085ffafF5beB1B2b6e4c13c",
            'data': data,
        },
        "latest",
    ],
    "id": 1,
}

s = session.post(URL, json=json_data, headers={'Content-type': 'application/json'})
reply = json.loads(s.text)
# A failed call carries an "error" member and no "result"; report it instead
# of crashing on int(None, 16).
if reply.get("error") is not None:
    raise RuntimeError(f"eth_call failed: {reply['error']}")
print(int(reply["result"], 16))
ps: 2022-09-16
这里要提示一下,通过 json-rpc 调用返回的数据,需要自己进行解析,举一个简单的例子。
下面是我调用某一个合约返回的数据。
1 0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000cfcc03efdd344811f5a90000000000000000000000000000000000000000000000bff77b8447ab536e2400000000000000000000000000000000000000000000016ff8d994f69a7293c900000000000000000000000000000000000000000000000156abcf0f93ab3e4f
按每 32 字节(64 个十六进制字符)一组进行拆开:
0x0000000000000000000000000000000000000000000000000000000000000020
0000000000000000000000000000000000000000000000000000000000000002
00000000000000000000000000000000000000000000cfcc03efdd344811f5a9
0000000000000000000000000000000000000000000000bff77b8447ab536e24
00000000000000000000000000000000000000000000016ff8d994f69a7293c9
00000000000000000000000000000000000000000000000156abcf0f93ab3e4f
然后根据你的合约情况进行相应的解析。
# Gas-consuming contract interaction: sign a transaction locally and submit
# it with eth_sendRawTransaction.
import json

import requests
import sha3
import web3
from eth_abi import encode
from eth_account import Account
from web3 import Web3

w3 = web3.Web3()
URL = 'http://localhost:8545'
session = requests.Session()
myAddress = ''
abi = """[]"""


def get_submit_order_data():
    """Build and sign the ``swapCoin`` transaction.

    Returns:
        The signed transaction returned by ``Account.sign_transaction``.
    """
    # Placeholder: fill in values for (address[], address[], address, uint256).
    encode_data_info = [...]
    encode_data = encode(
        ['address[]', 'address[]', 'address', 'uint256'],
        encode_data_info,
    ).hex()

    # Function selector: first 8 hex characters of keccak256(signature).
    hasher = sha3.keccak_256()
    hasher.update(b"swapCoin(address[],address[],address,uint256)")
    selector = hasher.hexdigest()[0:8]

    # "from", "to", the nonce and the signing key below are placeholders.
    transaction = {
        "from": "...",
        'to': "...",
        "gas": "0x300000",
        "gasPrice": Web3.toWei(3, 'gwei'),
        'nonce': 2000,
        "data": "0x" + selector + encode_data,
    }
    return Account.sign_transaction(transaction, "rhino_order.sign_secret")


def submit_order():
    """Submit the signed transaction and print the returned value as an int.

    Raises:
        RuntimeError: If the node answers with a JSON-RPC error object.
    """
    contract_data = get_submit_order_data()
    data = {
        "jsonrpc": "2.0",
        "method": "eth_sendRawTransaction",
        "params": [
            contract_data.rawTransaction.hex(),
        ],
        "id": 1,
    }
    s = session.post(URL, json=data, headers={'Content-type': 'application/json'})
    reply = json.loads(s.text)
    # A rejected transaction has "error" and no "result"; surface it rather
    # than failing on int(None, 16).
    if reply.get("error") is not None:
        raise RuntimeError(f"eth_sendRawTransaction failed: {reply['error']}")
    print(int(reply["result"], 16))


if __name__ == '__main__':
    submit_order()
上述代码仅用于展示,它主要是与「tomoon | cex2dex 合约再次优化」一文中的合约进行交互。
struct 进行交互
solidity 的代码如下:
pragma solidity ^0.8.4;

// ...

contract MultiSwap {
    // ...

    /// @dev Swap parameters, passed to the contract as one struct argument.
    struct coinAmount {
        uint amountIn;
        uint amountOut;
        address receiveAddress;
        address[] routers;
        address[] tokenContracts;
    }

    // ...

    /// @dev Receives the whole struct as a single argument. The onlySign
    ///      modifier is presumably defined in the elided part of the contract.
    function swapExactSupportingFeeCoin(coinAmount memory coin) external onlySign(msg.sender) {
        // ...
    }
}
交互如下
import json

import requests
import sha3
import web3
from eth_abi import encode
from eth_account import Account
from web3 import Web3

w3 = web3.Web3()
URL = 'http://localhost:8545'
session = requests.Session()
myAddress = ''
abi = """[]"""


def get_submit_order_data():
    """Build and sign the ``swapExactSupportingFeeCoin`` transaction.

    Returns:
        The signed transaction returned by ``Account.sign_transaction``.
    """
    # Placeholder: fill in the struct fields in declaration order
    # (uint256, uint256, address, address[], address[]).
    encode_data_info = [...]
    # The function takes ONE struct argument, so it must be encoded as a
    # single tuple. The struct contains dynamic arrays, which makes the tuple
    # dynamic: the ABI then starts with an offset word pointing at the tuple.
    # Encoding the five fields as separate top-level arguments omits that
    # word and yields calldata that does not match the selector below.
    encode_data = encode(
        ['(uint256,uint256,address,address[],address[])'],
        [encode_data_info],
    ).hex()

    # Function selector: first 8 hex characters of keccak256(signature).
    hasher = sha3.keccak_256()
    hasher.update(b"swapExactSupportingFeeCoin((uint256,uint256,address,address[],address[]))")
    selector = hasher.hexdigest()[0:8]

    # "from", "to", the nonce and the signing key below are placeholders.
    transaction = {
        "from": "...",
        'to': "...",
        "gas": "0x300000",
        "gasPrice": Web3.toWei(3, 'gwei'),
        'nonce': 2000,
        "data": "0x" + selector + encode_data,
    }
    return Account.sign_transaction(transaction, "rhino_order.sign_secret")


def submit_order():
    """Submit the signed transaction and print the returned value as an int.

    Raises:
        RuntimeError: If the node answers with a JSON-RPC error object.
    """
    contract_data = get_submit_order_data()
    data = {
        "jsonrpc": "2.0",
        "method": "eth_sendRawTransaction",
        "params": [
            contract_data.rawTransaction.hex(),
        ],
        "id": 1,
    }
    s = session.post(URL, json=data, headers={'Content-type': 'application/json'})
    reply = json.loads(s.text)
    # A rejected transaction has "error" and no "result"; surface it rather
    # than failing on int(None, 16).
    if reply.get("error") is not None:
        raise RuntimeError(f"eth_sendRawTransaction failed: {reply['error']}")
    print(int(reply["result"], 16))


if __name__ == '__main__':
    submit_order()
不使用 sha3
有的时候本地运行不了 sha3,所以用了另外的方案(eth_hash)来替代。
这里调用的是一个名为 getUserStakeInfo 的方法,只需要传递一个 address 参数。
import string


def get_function_selector(signature):
    """Return the first 8 hex characters of keccak(signature)."""
    # Imported lazily so the pure string helpers in this module stay usable
    # when eth-hash is not installed.
    from eth_hash.auto import keccak

    return keccak(signature.encode('utf-8')).hex()[:8]


def encode_address(address):
    """Left-pad a hex address with zeros to 64 hex characters.

    Args:
        address: 40 hex characters, with or without a ``0x``/``0X`` prefix.

    Returns:
        The address digits (case preserved) right-justified in 64 characters.

    Raises:
        ValueError: If ``address`` is not exactly 40 hexadecimal characters
            after the optional prefix is removed.
    """
    hex_part = address[2:] if address[:2] in ("0x", "0X") else address
    if len(hex_part) != 40 or any(ch not in string.hexdigits for ch in hex_part):
        raise ValueError(f"invalid address: {address!r}")
    return hex_part.rjust(64, '0')


def generate_data(function_signature, address):
    """Build call data: ``0x`` + function selector + encoded address."""
    function_selector = get_function_selector(function_signature)
    encoded_address = encode_address(address)
    return '0x' + function_selector + encoded_address


function_signature = "getUserStakeInfo(address)"
user_address = "0x2c16F387d1dc3A4C88232899AbdAa48d849ba8d3"

if __name__ == "__main__":
    # Run the demo only when executed as a script, not on import.
    data = generate_data(function_signature, user_address)
    print(f"Encoded data: {data}")