0%

python | 进程池共享变量

请先看

另外,我对这一章节理解的不够深刻。。。

参考资料

使用前提

server process模型中,有一个 manager 进程(就是那个 server 进程),负责管理实际的对象,真正的对象也是在 manager 进程的内存空间中。所有需要访问该对象的进程都需要先连接到该管理进程,然后获取到对象的一个代理对象(Proxy object)。这个模型是一个典型的 RPC (远程过程调用)的模型。因为每个客户进程实际上都是在访问 manager 进程当中的对象,因此完全可以通过这个实现对象共享。

共享变量

源码得知

1
2
3
4
5
6
7
8
9
10
11
12
def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ...
def Condition(self, lock: Any = ...) -> threading.Condition: ...
def Event(self) -> threading.Event: ...
def Lock(self) -> threading.Lock: ...
def Namespace(self) -> _Namespace: ...
def Queue(self, maxsize: int = ...) -> queue.Queue[Any]: ...
def RLock(self) -> threading.RLock: ...
def Semaphore(self, value: Any = ...) -> threading.Semaphore: ...
def Array(self, typecode: Any, sequence: Sequence[_T]) -> Sequence[_T]: ...
def Value(self, typecode: Any, value: _T) -> ValueProxy[_T]: ...
def dict(self, sequence: Mapping[_KT, _VT] = ...) -> Dict[_KT, _VT]: ...
def list(self, sequence: Sequence[_T] = ...) -> List[_T]: ...

用 Manager().dict()

简单的层数使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager


def proc(d, l):
if l.acquire():
d["t"] = d["t"] + 1
print(d["t"])
l.release()


if __name__ == '__main__':
d = Manager().dict()
l = Manager().Lock()

d["t"] = 1
process_pools = ProcessPoolExecutor(max_workers=4)
for i in range(200):
process_pools.submit(proc, d, l)

上述可以正常运行。

嵌套字典使用

嵌套的字典不能改变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager


def proc(d, l):
if l.acquire():
d["t"]["s"] = d["t"]["s"] + 1
print(d["t"]["s"])
l.release()


if __name__ == '__main__':
d = Manager().dict()
l = Manager().Lock()

d["t"] = {
"s": 1
}
process_pools = ProcessPoolExecutor(max_workers=4)
for i in range(200):
process_pools.submit(proc, d, l)

输出都是 1

嵌套字典的使用 优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager


def proc(d, l):
if l.acquire():
i = d["t"]["s"] + 1
d["t"] = {"s": i}
print(d["t"]["s"])
l.release()


if __name__ == '__main__':
d = Manager().dict()
l = Manager().Lock()

d["t"] = {
"s": 1
}
process_pools = ProcessPoolExecutor(max_workers=4)
for i in range(200):
process_pools.submit(proc, d, l)

这样的话,就能输出正确的值了。

自定义对象的字典

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager


class T:
def __init__(self):
self.t = 1

def proc(self):
self.t = self.t + 1


def proc(t, l):
if l.acquire():
t["t"].proc()
print(t["t"].t)
l.release()


if __name__ == '__main__':
d = Manager().dict()
l = Manager().Lock()

d["t"] = T()
process_pools = ProcessPoolExecutor(max_workers=4)
for i in range(200):
process_pools.submit(proc, d, l)

不能使用,改变不了 T 类中 t 的值。

请我喝杯咖啡吧~