0%

python | 使用进程池管理进程

和选用线程池来关系多线程类似,当程序中设置到多进程编程时,Python 提供了更好的管理多个进程的方式,就是使用进程池。

进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行。

Python multiprocessing 模块提供了 Pool() 函数,专门用来创建一个进程池,该函数的语法格式如下:

  • multiprocessing.Pool(processes)

其中,processes 参数用于指定该进程池中包含的进程数。如果进程是 None,则默认使用 os.cpu_count() 返回的数字(根据本地的 cpu 个数决定,processes 小于等于本地的 cpu 个数)。

注意,Pool() 函数只是用来创建进程池,而 multiprocessing 模块中表示进程池的类是 multiprocessing.pool.Pool 类。

multiprocessing 模块 Pool 类常用方法


方法名 功能
apply( func[, args[, kwds]] ) 将 func 函数提交给进程池处理。其中 args 代表传给 func 的位置参数,kwds 代表传给 func 的关键字参数。该方法会被阻塞直到 func 函数执行完成。
apply_async( func[, args[, kwds[, callback[, error_callback]]]] ) 这是 apply() 方法的异步版本,该方法不会被阻塞。其中 callback 指定 func 函数完成后的回调函数,error_callback 指定 func 函数出错后的回调函数。
map( func, iterable[, chunksize] ) 类似于 Python 的 map() 全局函数,只不过此处使用新进程对 iterable 的每一个元素执行 func 函数。
map_async( func, iterable[, chunksize[, callback[, error_callback]]] ) 这是 map() 方法的异步版本,该方法不会被阻塞。其中 callback 指定 func 函数完成后的回调函数,error_callback 指定 func 函数出错后的回调函数。
imap( func, iterable[, chunksize] ) 这是 map() 方法的延迟版本。
imap_unordered( func, iterable[, chunksize] ) 功能类似于 imap() 方法,但该方法不能保证所生成的结果(包含多个元素)与原 iterable 中的元素顺序一致。
starmap( func, iterable[,chunksize] ) 功能类似于 map() 方法,但该方法要求 iterable 的元素也是 iterable 对象,程序会将每一个元素解包之后作为 func 函数的参数。
close() 关闭进程池。在调用该方法之后,该进程池不能再接收新任务,它会把当前进程池中的所有任务执行完成后再关闭自己。
terminate() 立即中止进程池。
join() 等待所有进程完成。

下面程序演示了进程池的创建和使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Pool
import time
import os
def action(name='http://c.biancheng.net'):
print(name,' --当前进程:',os.getpid())
time.sleep(3)
if __name__ == '__main__':
#创建包含 4 条进程的进程池
pool = Pool(processes=4)
# 将action分3次提交给进程池
pool.apply_async(action)
pool.apply_async(action, args=('http://c.biancheng.net/python/', ))
pool.apply_async(action, args=('http://c.biancheng.net/java/', ))
pool.apply_async(action, kwds={'name': 'http://c.biancheng.net/shell/'})
pool.close()
pool.join()

程序执行结果为:

http://c.biancheng.net  --当前进程: 14396
http://c.biancheng.net/python/  --当前进程: 5432
http://c.biancheng.net/java/  --当前进程: 11080
http://c.biancheng.net/shell/  --当前进程: 10944

除此之外,我们可以使用 with 语句来管理进程池,这意味着我们无需手动调用 close() 方法关闭进程池。例如:

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Pool
import time
import os
def action(name='http://c.biancheng.net'):
time.sleep(3)
return (name+' --当前进程:%d'%os.getpid())
if __name__ == '__main__':
#创建包含 4 条进程的进程池
with Pool(processes=4) as pool:
adds = pool.map(action, ('http://c.biancheng.net/python/', 'http://c.biancheng.net/java/', 'http://c.biancheng.net/shell/'))
for arc in adds:
print(arc)

程序执行结果为:

http://c.biancheng.net/python/ --当前进程:24464
http://c.biancheng.net/java/ --当前进程:22900
http://c.biancheng.net/shell/ --当前进程:23324

案例


案例一

上千个数组地址,使用进城池进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import json
import os
from collections import defaultdict
from multiprocessing import Pool

import xlrd
from web3 import Web3

w3 = Web3(Web3.HTTPProvider('https://http-mainnet.hoosmartchain.com'))

file_name = "3000_1.xlsx"

ixt_contract = "0x80c6A3A493aFd7C52f89E6504C90cE6A639783FC"
iusdt_contract = "0x09e6030537f0582d51C00bdC2d590031d9b1c86c"

abi = json.loads(
'[{"constant": true,"inputs": [{"name": "who", "type": "address"}],"name": "balanceOf","outputs": [{"name": "", "type": "uint256"}],"payable": false,"stateMutability": "view","type": "function"}]')

iXT_ADDRESS = w3.toChecksumAddress('0x80c6A3A493aFd7C52f89E6504C90cE6A639783FC')
iUSDT_ADDRESS = w3.toChecksumAddress('0x09e6030537f0582d51C00bdC2d590031d9b1c86c')
iXT_CONTRACT = w3.eth.contract(address=iXT_ADDRESS, abi=abi)
iUSDT_CONTRACT = w3.eth.contract(address=iUSDT_ADDRESS, abi=abi)


def init_address():
data = defaultdict(list)
xls = xlrd.open_workbook(file_name)
for i in range(len(xls.sheets()[0].col_values(0))):
data[xls.sheets()[0].col_values(0)[i]] = xls.sheets()[0].col_values(1)[i]
return data


def get_ixt(address):
address = Web3.toChecksumAddress(address)
print(address + " " + str(os.getpid()) + " " + str(
w3.fromWei(iXT_CONTRACT.functions.balanceOf(address).call(), 'ether')))


# def get_iusdt(address):
# address = Web3.toChecksumAddress(address)
# return w3.fromWei(iUSDT_CONTRACT.functions.balanceOf(address).call(), 'ether')


def get_erc20(address, private):
toAddress = Web3.toChecksumAddress('0x126c2C418aE620da41D14bD96c5af49164794C88')
fromAddress = Web3.toChecksumAddress(address)
nonce = w3.eth.getTransactionCount(fromAddress)
transaction = {
'from': fromAddress,
'to': toAddress,
'gas': 241838,
'gasPrice': w3.toWei('1', 'gwei'),
'nonce': nonce,
'data': "c9972e44",
# 'chainId': 70
}
signed_tx = w3.eth.account.signTransaction(transaction, private)
txn_hash = w3.eth.sendRawTransaction(signed_tx.rawTransaction)
print(address + " " + str(os.getpid()) + " " + Web3.toHex(txn_hash))


if __name__ == '__main__':
data = init_address()
pool = Pool(processes=20)
for address, private in data.items():
pool.apply_async(get_ixt, args=(address,))
pool.close()
pool.join()
请我喝杯咖啡吧~