这一章教授怎么解析 pending
数据。
首先,在解析之前,我们需要明确几个目的。
pending
数据是放在内存区的数据
- 解析
pending
的难点在速度和节点分布上
eth
的 pengding
还好说,单线程就能解析,但是,必须要协程解析速度才能跟得上
bsc
的 pending
数据巨大,而且,由于 pending
的返回数据不合理「下面会讲到」,导致,速度越来越慢,甚至,程序跑个几十秒之后,你所解析的数据已经不是 pending
数据了,而是已经出快的数据
- ps: 2022-5-29 之前之所以解析更不上速度,是因为
web3.py
的底层是 http
,并不是协程,所以,如果是协程那一套访问的话,会变成顺序访问
- 连接一个节点获取不了所有的
pending
数据
- 交易会提交给节点,然后,区块进行同步,所以,你只能获得连接节点的
pending
数据,而获取不到所有的 pending
数据,这一点尤为记住
- 所以,由于上一点的原因,一定要明确自己的目的场景,比如,你的目的是发出去交易后,判断是否有人会截取你的
pending
数据,一旦检测到有人截取,你就取消交易。但是,通常很难做到,原因就是,你可能获取不到攻击你的 pending
另外,web3.js
的方式请参考
bsc
协程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| from web3 import Web3 import asyncio
w3 = Web3(Web3.HTTPProvider("https://bsc.getblock.io/mainnet/?api_key=b325***")) event_filter = w3.eth.filter('pending')
def init(): asyncio.run(run())
async def run(): while 1: pending_txs = w3.eth.get_filter_changes(event_filter.filter_id) print(len(pending_txs)) functions = [] for pending_tx in pending_txs: functions.append(asyncio.ensure_future(parse(pending_tx.hex())))
results = await asyncio.gather(*functions) print(len(results)) id = 1 for result in results: print(id) print(result) id += 1
async def parse(tx): try: print(tx) data = w3.eth.get_transaction(tx) return data except Exception as e: return 0
if __name__ == '__main__': init()
|
上面的代码其实运行的并不行,主要来源于
1 2 3 4 5 6 7
| async def parse(tx): try: print(tx) data = w3.eth.get_transaction(tx) return data except Exception as e: return 0
|
其中
data = w3.eth.get_transaction(tx)
的底层并不是协程写的,而是,一般的 http
请求。
所以,想要真的发挥出作用,需要使用多线程。
另外,有的时候明明有很多 pending tx
,但是,连接节点却什么也返回不了,主要有以下原因。
- 所连接的节点并不支持那种方法的调用
- 访问的节点连接用户不多,所以没有很多
pending
多线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import time from concurrent.futures import ThreadPoolExecutor
from web3 import Web3
w3 = Web3(Web3.HTTPProvider("https://bsc.getblock.io/mainnet/?api_key=b***")) event_filter = w3.eth.filter('pending')
def init(): run()
def run(): while 1: pending_txs = w3.eth.get_filter_changes(event_filter.filter_id) print(len(pending_txs)) pools = ThreadPoolExecutor(max_workers=len(pending_txs)) for pending_tx in pending_txs: pools.submit(parse, pending_tx.hex()) time.sleep(2)
def parse(tx): try: print(tx) data = w3.eth.get_transaction(tx) print(data) except Exception as e: return 0
if __name__ == '__main__': init()
|
上面的多线程的速度是可以的,但是有一个尴尬的点。
- time.sleep(2)
- 没有这句话,可能因为访问太多,节点会拒绝访问
- 有这话,就有可能导致数据更新受限,后面全是出块的数据
最好的办法是使用 wss
的 subscribe
,但是,目前,web3.py
并不支持。