0%

python | 进程池、多进程共享自定义变量 BaseManager

很多场景下, Manager 自带的类并不能满足我们的需求,这时候就需要用到 Manager 对自定义类的支持。

用 BaseManager 进行共享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import os
import time
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager
from multiprocessing.managers import BaseManager

# Optional cross-process lock for the commented-out ``with l:`` in ChangeItem.
# Manager() starts a server process, so only create it when this file runs as
# the main script. With the "spawn" start method (default on Windows/macOS) the
# pool workers re-import this module, and starting a process while a worker is
# still bootstrapping raises RuntimeError. Forked workers still inherit the
# parent's lock proxy; importing this module elsewhere no longer spawns a server.
l = Manager().Lock() if __name__ == '__main__' else None


class Item:
    """Plain object meant to be hosted by the BaseManager server process.

    Callers in other processes only hold a proxy to it. The pid printed by
    each method shows which process actually executed the call.
    """

    def __init__(self):
        # Counter bumped by add().
        self.H = 1

    def add(self):
        """Print the executing pid, then increment ``H`` by one."""
        pid = os.getpid()
        print(f"Item {pid}")
        self.H += 1

    def see(self):
        """Print the executing pid (``H`` itself is not shown)."""
        print("Item {}".format(os.getpid()))


def ChangeItem(v):
    """Pool task: call ``add`` and then ``see`` on the shared item *v*.

    Any exception is printed instead of raised. That way it shows up on the
    worker's stdout rather than sitting unread in the discarded Future.
    """
    # with l:   # optional cross-process lock, disabled in this demo
    try:
        worker_pid = os.getpid()
        print(f"ChangeItem {worker_pid}")
        # Look each method up only right before calling it, add first.
        for method_name in ("add", "see"):
            getattr(v, method_name)()
    except Exception as exc:
        print(exc)


if __name__ == '__main__':
    print(f"{os.getpid()}")
    # register() is a classmethod: it records the "Item" typeid (and adds a
    # BaseManager.Item() factory) on the BaseManager class itself. It must run
    # before the manager starts so the server process knows the type.
    BaseManager.register("Item", Item)
    # Nested context managers give a deterministic teardown order. The pool is
    # drained first (inner ``with``), then the manager's server process is shut
    # down, so no task can outlive the shared object.
    with BaseManager() as BM:
        I = BM.Item()
        with ProcessPoolExecutor(max_workers=5) as p:
            futures = [p.submit(ChangeItem, I) for _ in range(10)]
            for future in futures:
                # Re-raise anything that escaped the task (e.g. a failure to
                # send the proxy to a worker) instead of silently dropping it.
                future.result()

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
42926
ChangeItem 42936
ChangeItem 42938
ChangeItem 42937
ChangeItem 42939
Item 42934
Item 42934
Item 42934
Item 42934
Item 42934
Item 42934
ChangeItem 42940
Item 42934
Item 42934
ChangeItem 42936
Item 42934
Item 42934
ChangeItem 42938
Item 42934
Item 42934
ChangeItem 42937
ChangeItem 42939
Item 42934
ChangeItem 42940
Item 42934
Item 42934
Item 42934
Item 42934
Item 42934
Item 42934
Item 42934

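可以看出,ChangeItem 分别运行在进程池的多个 worker 进程中(pid 各不相同),而 Item 的 add/see 方法始终在同一个进程(42934)里执行。这说明 BaseManager 启动了一个独立的服务进程来持有这个实例,各个 worker 拿到的只是代理对象,方法调用会被转发到该服务进程中执行。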

但是,通过 BaseManager 拿到的其实是代理对象(AutoProxy),它默认只暴露实例的公开方法,不能直接读写实例属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import os
import time
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager
from multiprocessing.managers import BaseManager

# Optional cross-process lock for the commented-out ``with l:`` in ChangeItem.
# Manager() starts a server process, so only create it when this file runs as
# the main script. With the "spawn" start method (default on Windows/macOS) the
# pool workers re-import this module, and starting a process while a worker is
# still bootstrapping raises RuntimeError. Forked workers still inherit the
# parent's lock proxy; importing this module elsewhere no longer spawns a server.
l = Manager().Lock() if __name__ == '__main__' else None


class Item:
    """Plain object meant to be hosted by the BaseManager server process.

    Only its methods are reachable through the manager's default proxy. The
    attribute ``H`` is not, which is what ChangeItem in this snippet shows.
    """

    def __init__(self):
        # Counter bumped by add().
        self.H = 1

    def add(self):
        """Print the executing pid, then increment ``H`` by one."""
        pid = os.getpid()
        print(f"Item {pid}")
        self.H += 1

    def see(self):
        """Print the executing pid (``H`` itself is not shown)."""
        print("Item {}".format(os.getpid()))


def ChangeItem(v):
    """Pool task: try to increment the attribute ``H`` directly on *v*.

    On a BaseManager AutoProxy this fails, because the proxy does not forward
    attribute access. The resulting error message is printed (the point of this
    demo) rather than raised.
    """
    # with l:   # optional cross-process lock, disabled in this demo
    try:
        print("ChangeItem " + str(os.getpid()))
        v.H += 1
    except Exception as err:
        print(err)


if __name__ == '__main__':
    print(f"{os.getpid()}")
    # register() is a classmethod: it records the "Item" typeid (and adds a
    # BaseManager.Item() factory) on the BaseManager class itself. It must run
    # before the manager starts so the server process knows the type.
    BaseManager.register("Item", Item)
    # Nested context managers give a deterministic teardown order. The pool is
    # drained first (inner ``with``), then the manager's server process is shut
    # down, so no task can outlive the shared object.
    with BaseManager() as BM:
        I = BM.Item()
        with ProcessPoolExecutor(max_workers=5) as p:
            futures = [p.submit(ChangeItem, I) for _ in range(10)]
            for future in futures:
                # Re-raise anything that escaped the task instead of silently
                # dropping it (ChangeItem itself only prints its own errors).
                future.result()

输出

'AutoProxy[Item]' object has no attribute 'H'

具体原因可以查看

The Proxy objects used by multiprocessing.BaseManager and its sub-classes normally only expose methods from the objects they’re referring to, not attributes. Now, there is multiprocessing.Manager().Namespace, which provides a Proxy sub-class that does provide access to attributes, rather than methods. We can create our own Proxy type which inherits from that, which enables access to all our attributes, as well as access to our b function

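其实这个方法在 Python 3 中同样可用,步骤如下:
1. 继承 multiprocessing.managers.NamespaceProxy 写一个自定义代理类,把 `_exposed_` 设为 `('__getattribute__', '__setattr__', '__delattr__', 'add', 'see')`。
2. 在代理类里为每个方法写一个同名方法,用 `self._callmethod('add')` 之类的方式转发调用。
3. 用 `BM.register("Item", Item, ItemProxy)` 注册。

这样 `v.H += 1` 会变成一次远程读取加一次远程写入。注意这两步合起来并不是原子操作,多进程并发修改时仍然需要加锁。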

如果不想写自定义代理类,也可以给类加上 get/set 方法,通过方法调用来间接读写属性。

变量初始化传值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing.managers import BaseManager


class A:
    """Empty placeholder type; T.proc() stores an instance of it in ``test``."""


class T:
    """Shared object whose attributes are filled in by its constructor.

    Arguments given to the manager factory (``b.T("123")``) are passed to
    ``__init__``. Everything below is therefore set up wherever the manager
    constructs the object.
    """

    def __init__(self, c):
        self.test = None
        self.c = c
        # Populate ``test`` eagerly so it is already assigned before any call.
        self.proc()

    def proc(self):
        """Replace ``test`` with a fresh ``A`` instance."""
        fresh = A()
        self.test = fresh

    def get_a(self):
        """Print the current ``test`` value."""
        current = self.test
        print(current)

    def get_c(self):
        """Print the constructor argument ``c``."""
        stored = self.c
        print(stored)


if __name__ == '__main__':
    # register() is a classmethod that records "T" on the BaseManager class
    # itself; it must run before the manager's server process is started.
    BaseManager.register("T", T)
    # Context managers: drain the pool first, then shut the manager down.
    with BaseManager() as b:
        # "123" is forwarded to T.__init__ when the manager creates the object.
        t = b.T("123")
        with ProcessPoolExecutor(max_workers=4) as process_pools:
            futures = [process_pools.submit(t.get_c) for _ in range(10)]
            for future in futures:
                # Surface any failure from the workers instead of dropping it.
                future.result()

多个自定义变量共享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager
from multiprocessing.managers import BaseManager


class B:
    """Shared integer counter that reports itself with a ``"B"`` prefix."""

    def __init__(self):
        # Starts at 1; incremented by proc().
        self.t = 1

    def proc(self):
        """Increment the counter ``t`` by one."""
        self.t += 1

    def get_t(self):
        """Print ``B<t>`` and return the current counter value."""
        current = self.t
        print("B" + str(current))
        return current


class T:
    """Shared integer counter that reports itself with a ``"T"`` prefix."""

    def __init__(self):
        # Starts at 1; incremented by proc().
        self.t = 1

    def proc(self):
        """Increment the counter ``t`` by one."""
        self.t += 1

    def get_t(self):
        """Print ``T<t>`` and return the current counter value."""
        current = self.t
        print("T" + str(current))
        return current


def proc(t, l):
    """Run ``t.proc()`` then ``t.get_t()`` while holding the shared lock *l*.

    Args:
        t: shared object (e.g. a manager proxy) exposing ``proc`` and ``get_t``.
        l: lock-like object with ``acquire()``/``release()``, e.g. a
            ``Manager().Lock()`` proxy shared by all workers. Holding it keeps
            one task's increment-and-print pair from interleaving with another's.

    Raises:
        Whatever ``t.proc()`` or ``t.get_t()`` raises; the lock is released
        before the exception propagates.
    """
    if not l.acquire():
        # A blocking acquire() normally returns True; honour a False result by
        # skipping the work, and do not release a lock we never obtained.
        return
    try:
        t.proc()
        t.get_t()
    finally:
        # Release even if a call raises. Otherwise the lock stays held and every
        # other worker blocked in acquire() hangs forever.
        l.release()


if __name__ == '__main__':
    # register() is a classmethod that records the typeids on the BaseManager
    # class itself; it must run before the manager's server process starts.
    BaseManager.register("T", T)
    BaseManager.register("B", B)
    # The manager and its proxies keep distinct names (the manager is not
    # rebound). Exiting the ``with`` shuts both server processes down after the
    # pool has been drained.
    with BaseManager() as manager, Manager() as sync_manager:
        t = manager.T()
        b = manager.B()
        # One lock shared by every task; proc() holds it around each
        # proc()+get_t() pair.
        l = sync_manager.Lock()
        with ProcessPoolExecutor(max_workers=4) as process_pools:
            futures = []
            for i in range(200):
                target = t if i % 2 == 0 else b
                futures.append(process_pools.submit(proc, target, l))
            for future in futures:
                # Surface any exception raised inside a worker.
                future.result()

嵌套自定义对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager
from multiprocessing.managers import BaseManager


class A:
    """Innermost counter (starting at 500) that both B and T wrap."""

    def __init__(self):
        self.a = 500

    def proc(self):
        """Increase ``a`` by one."""
        self.a += 1

    def get_a(self):
        """Return the current value of ``a``."""
        value = self.a
        return value


class B:
    """Shared wrapper that owns an ``A`` and reports with a ``"B"`` prefix.

    The nested ``A`` is an ordinary attribute of this object, so only B (not A)
    is registered with the manager.
    """

    def __init__(self):
        self.t = A()

    def proc(self):
        """Delegate the increment to the nested ``A``."""
        inner = self.t
        inner.proc()

    def get_t(self):
        """Print ``B<value>`` and return the nested ``A`` object itself.

        NOTE(review): when called through a manager proxy, the returned ``A``
        presumably reaches the caller as a pickled copy, so mutating it there
        would not change the shared object. Confirm before relying on it.
        """
        inner = self.t
        print("B" + str(inner.get_a()))
        return inner


class T:
    """Shared wrapper that owns an ``A`` and reports with a ``"T"`` prefix.

    The nested ``A`` is an ordinary attribute of this object, so only T (not A)
    is registered with the manager.
    """

    def __init__(self):
        self.t = A()

    def proc(self):
        """Delegate the increment to the nested ``A``."""
        inner = self.t
        inner.proc()

    def get_t(self):
        """Print ``T<value>`` and return the nested ``A`` object itself.

        NOTE(review): when called through a manager proxy, the returned ``A``
        presumably reaches the caller as a pickled copy, so mutating it there
        would not change the shared object. Confirm before relying on it.
        """
        inner = self.t
        print("T" + str(inner.get_a()))
        return inner


def proc(t, l):
    """Run ``t.proc()`` then ``t.get_t()`` while holding the shared lock *l*.

    Args:
        t: shared object (e.g. a manager proxy) exposing ``proc`` and ``get_t``.
        l: lock-like object with ``acquire()``/``release()``, e.g. a
            ``Manager().Lock()`` proxy shared by all workers. Holding it keeps
            one task's increment-and-print pair from interleaving with another's.

    Raises:
        Whatever ``t.proc()`` or ``t.get_t()`` raises; the lock is released
        before the exception propagates.
    """
    if not l.acquire():
        # A blocking acquire() normally returns True; honour a False result by
        # skipping the work, and do not release a lock we never obtained.
        return
    try:
        t.proc()
        t.get_t()
    finally:
        # Release even if a call raises. Otherwise the lock stays held and every
        # other worker blocked in acquire() hangs forever.
        l.release()


if __name__ == '__main__':
    # register() is a classmethod that records the typeids on the BaseManager
    # class itself; it must run before the manager's server process starts.
    # The nested A needs no registration: it lives inside each T/B instance.
    BaseManager.register("T", T)
    BaseManager.register("B", B)
    # The manager and its proxies keep distinct names (the manager is not
    # rebound). Exiting the ``with`` shuts both server processes down after the
    # pool has been drained.
    with BaseManager() as manager, Manager() as sync_manager:
        t = manager.T()
        b = manager.B()
        # One lock shared by every task; proc() holds it around each
        # proc()+get_t() pair.
        l = sync_manager.Lock()
        with ProcessPoolExecutor(max_workers=4) as process_pools:
            futures = []
            for i in range(200):
                target = t if i % 2 == 0 else b
                futures.append(process_pools.submit(proc, target, l))
            for future in futures:
                # Surface any exception raised inside a worker.
                future.result()

自定义对象的赋值问题

这个应该是我目前的技术水平的问题,先记录一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager
from multiprocessing.managers import BaseManager


class A:
    """Empty placeholder type; T.proc() stores an instance of it in ``test``."""


class T:
    """Shared object whose ``test`` attribute stays None until proc() runs."""

    def __init__(self):
        # Only proc() on this same instance fills this in.
        self.test = None

    def proc(self):
        """Store a fresh ``A`` instance in ``test``."""
        fresh = A()
        self.test = fresh

    def get_a(self):
        """Print the current ``test`` value."""
        current = self.test
        print(current)


if __name__ == '__main__':
    # register() is a classmethod that records "T" on the BaseManager class
    # itself; it must run before the manager's server process starts.
    BaseManager.register("T", T)
    with BaseManager() as b:
        # Each b.T() call creates a *new* T inside the manager and returns a
        # proxy to it. Calling b.T().proc() and then b.T().get_a() would touch
        # two different objects and print None. Create the object once and
        # reuse that proxy so get_a() sees what proc() assigned.
        t = b.T()
        t.proc()
        t.get_a()

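输出为 None。但原因并不是 proc 没有赋值:每调用一次 b.T() 都会在管理进程中新建一个 T 实例,并返回指向它的新代理。proc() 修改的是第一个实例,get_a() 打印的却是新建的第二个实例,它的 test 仍然是 None。只要创建一次代理并复用它(t = b.T(); t.proc(); t.get_a()),就能看到赋值生效。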

可以改进为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager
from multiprocessing.managers import BaseManager


class A:
    """Empty placeholder type; T stores an instance of it in ``test``."""


class T:
    """Shared object whose ``test`` attribute is assigned during construction."""

    def __init__(self):
        self.test = None
        # Assign eagerly, so every freshly created instance already holds an A.
        self.proc()

    def proc(self):
        """Store a fresh ``A`` instance in ``test``."""
        fresh = A()
        self.test = fresh

    def get_a(self):
        """Print the current ``test`` value."""
        current = self.test
        print(current)


if __name__ == '__main__':
    # register() is a classmethod that records "T" on the BaseManager class
    # itself; it must run before the manager's server process starts.
    BaseManager.register("T", T)
    # The context manager shuts the manager's server process down on exit
    # instead of leaving it to interpreter-exit finalizers.
    with BaseManager() as b:
        # T.__init__ already calls proc(), so even a brand-new instance has
        # ``test`` assigned when get_a() prints it.
        b.T().get_a()

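这样就能打印出 A 的实例了:因为赋值发生在 __init__ 里,每个新建的 T 实例一开始就带有 test。不过前一个例子的根本原因在于两次 b.T() 创建的是两个不同的实例,所以更通用的做法仍然是只创建一次代理对象并反复使用它。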

继承 BaseManager

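继承的用法在功能上和直接使用基本没有区别,具体可参考《Python跨进程共享数据/对象》。需要注意的是,register 是类方法:在 BaseManager 实例上调用 register(...),实际修改的是 BaseManager 类本身,会影响同一程序中所有使用 BaseManager 的代码。所以官方文档的示例都是先继承出一个子类,再在子类上注册。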

请我喝杯咖啡吧~