0%

python | 进程池的同步信息紊乱解决

请先看

先看下面一段代码。

进程池的数据紊乱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import os
import time
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager


def job(v, k):
for i in range(10):
value = v.setdefault(k, 0)
time.sleep(0.1)
v[k] = value + i
print(f"{os.getpid()} {value} {v}")


if __name__ == '__main__':
v = Manager().dict()
jobs = ["aa", "bb"]
p = ProcessPoolExecutor(max_workers=2)
for i in range(10):
for k in jobs:
p.submit(job, v, k)

最后输出

1
34248 441 {'aa': 450, 'bb': 450}

如果,把进程池的数量改为 5

p = ProcessPoolExecutor(max_workers=5)

最后输出

1
22632 171 {'aa': 180, 'bb': 180}

很明显, max_workers 等于 5 的时候,在未改变变量的时候,已经读取了,造成了数据的紊乱。

进程池的加锁

这个时候就要加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import os
import time
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager

lock = Manager().Lock()


def job(v, k):
lock.acquire()
for i in range(10):
value = v.setdefault(k, 0)
time.sleep(0.1)
v[k] = value + i
print(f"{os.getpid()} {value} {v}")
lock.release()


if __name__ == '__main__':
v = Manager().dict()
jobs = ["aa", "bb"]
p = ProcessPoolExecutor(max_workers=5)
for i in range(10):
for k in jobs:
p.submit(job, v, k)

但是,根据

这种加锁方式,就变成了串行了,最终的输出也是如此。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
34668 0 {'aa': 0}
34668 0 {'aa': 1}
34668 1 {'aa': 3}
34668 3 {'aa': 6}
34668 6 {'aa': 10}
34668 10 {'aa': 15}
34668 15 {'aa': 21}
34668 21 {'aa': 28}
34668 28 {'aa': 36}
34668 36 {'aa': 45}
34669 0 {'aa': 45, 'bb': 0}
34669 0 {'aa': 45, 'bb': 1}
34669 1 {'aa': 45, 'bb': 3}
34669 3 {'aa': 45, 'bb': 6}
34669 6 {'aa': 45, 'bb': 10}
34669 10 {'aa': 45, 'bb': 15}
34669 15 {'aa': 45, 'bb': 21}
34669 21 {'aa': 45, 'bb': 28}
34669 28 {'aa': 45, 'bb': 36}
34669 36 {'aa': 45, 'bb': 45}
...

加锁的优化

这里想一个场景,假设,我们做一个股票信息处理系统。股票信息是实时过来的,所以系统有以下的特点

  • 系统是 CPU 密集型
  • 股票可能有几千个
  • 系统需要进程池管理,因为,如果每一个股票都创建一个进程的话,对系统的压力太大了
  • 进程池需要进行数据同步
    • 进程池每一次调用都相当于新进程,所以,进程池需要读取老数据,然后根据新数据来更新老数据
  • 会出现股票数据连续的情况
    • 类似上面第二个执行代码「进程池里面的数量为 5」

设计思路

  • 一个进程池
  • 一个同步数据区 Manger().dict()
    • 同步数据区里面为每一个股票代码创建一个数据区
  • 为每一个股票代码创建一个锁

简化的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import os
import time
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager


def job(v, k, lock):
with lock:
for i in range(10):
value = v.setdefault(k, 0)
time.sleep(0.1)
v[k] = value + i
print(f"{k} {os.getpid()} {value} {v}")


if __name__ == '__main__':
v = Manager().dict()
l = {}
jobs = ["aa", "bb"]
p = ProcessPoolExecutor(max_workers=5)
for i in range(10):
for k in jobs:
lock = l.setdefault(k, Manager().Lock())
p.submit(job, v, k, lock)

如果运行的话,就会发现运行的和设计的一样。

但是,我有一个疑问,就是把

l 换成 Manager().dict() 会报错,这个我暂时不知道原因。

ps: l.setdefault(k, Manager().Lock()) 这句话严重影响效率。具体原因还未知。

不加锁数据不紊乱的情况

这个例子来自于

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import time
from concurrent.futures.process import ProcessPoolExecutor
from dataclasses import dataclass
from multiprocessing import Manager


@dataclass
class Item:
H: int = 1


def ChangeItem(v, item, i):
try:
a = v[i].H
time.sleep(0.1)
t = Item()
t.H = a + i

v[i] = t

print(v)
except Exception as e:
print(e)


if __name__ == '__main__':
v = Manager().dict()
p = ProcessPoolExecutor(max_workers=5)
for i in range(10):
item = v.setdefault(i, Item())
p.submit(ChangeItem, v, item, i)

for i in range(10):
item = v.setdefault(i, Item())
p.submit(ChangeItem, v, item, i)

当,数据的种类大于进程池中的数量,并且,以进程池数量为长度的数据中,不会出现相同的种类,进程池就不会紊乱。

比如,进程池有 2 个进程,种类是 a,b,c,d

如果数据推送如下

  • a,b,c,d
    • 不会紊乱
  • a,a,c,d
    • 会紊乱

紊乱的例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time
from concurrent.futures.process import ProcessPoolExecutor
from dataclasses import dataclass
from multiprocessing import Manager


@dataclass
class Item:
H: int = 1


def ChangeItem(v, item, i):
try:
a = v[i].H
time.sleep(0.1)
t = Item()
t.H = a + i

v[i] = t

print(v)
except Exception as e:
print(e)


if __name__ == '__main__':
v = Manager().dict()
p = ProcessPoolExecutor(max_workers=5)
keys = [1, 2, 1, 1, 3, 1, 1, 1, 1, 3]
for i in keys:
item = v.setdefault(i, Item())
p.submit(ChangeItem, v, item, i)

输出

1
2
3
4
5
6
7
8
9
10
{1: Item(H=2), 2: Item(H=3), 3: Item(H=4)}
{1: Item(H=2), 2: Item(H=3), 3: Item(H=4)}
{1: Item(H=2), 2: Item(H=3), 3: Item(H=4)}
{1: Item(H=2), 2: Item(H=3), 3: Item(H=4)}
{1: Item(H=2), 2: Item(H=3), 3: Item(H=4)}
{1: Item(H=3), 2: Item(H=3), 3: Item(H=7)}
{1: Item(H=3), 2: Item(H=3), 3: Item(H=7)}
{1: Item(H=3), 2: Item(H=3), 3: Item(H=7)}
{1: Item(H=3), 2: Item(H=3), 3: Item(H=7)}
{1: Item(H=3), 2: Item(H=3), 3: Item(H=7)}
请我喝杯咖啡吧~