1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| import multiprocessing import multiprocessing as mp import time from multiprocessing import Manager from multiprocessing.managers import BaseManager from multiprocessing.queues import Queue from typing import Dict
class Logger:
    """Minimal logger that writes every message to standard output.

    ``Engine.start`` registers this class with a ``BaseManager`` and hands
    the resulting object to its worker processes, which call ``info`` on it.
    """

    def __init__(self):
        """Announce on standard output that a logger has been created."""
        banner = "logger 初始化"
        print(banner)

    def info(self, process, data):
        """Print one log line for ``process`` carrying ``data``.

        Args:
            process: Label of the caller; the callers in this file pass the
                current process name.
            data: Payload to log; it is formatted with ``str.format`` rules
                via an f-string.
        """
        line = f"{process} 日志输出: {data}"
        print(line)
class Engine:
    """Demo engine running three worker processes over shared state.

    The workers communicate through a ``multiprocessing`` queue, a
    manager-backed dictionary and a ``Logger`` object created through a
    ``BaseManager``.
    """

    def start(self) -> None:
        """Create the shared objects, launch the three workers and wait.

        Blocks until every worker has exited. Because the workers loop
        forever, that normally means until the program is interrupted.

        On the way out (normal return or exception) any worker that is still
        alive is terminated and both manager server processes are shut down,
        so nothing is left running behind the caller.
        """
        queue = mp.Queue()

        # ``register`` is a classmethod: it makes ``Logger`` creatable through
        # BaseManager instances, which is what ``logger_manager.Logger()``
        # below relies on.
        BaseManager.register("Logger", Logger)

        # Both managers are context managers: entering starts the server
        # process if it is not already running, leaving calls ``shutdown()``.
        with Manager() as shared_manager, BaseManager() as logger_manager:
            dict_memory = shared_manager.dict()
            logger = logger_manager.Logger()

            # Bundle of shared helper objects handed to every worker.
            asserts = {
                "logger": logger,
            }

            logger.info(multiprocessing.current_process().name, "test")

            workers = [
                mp.Process(
                    target=self.input,
                    args=(queue, dict_memory, asserts),
                    name="监控",
                ),
                mp.Process(
                    target=self.output,
                    args=(queue, dict_memory, asserts),
                    name="web",
                ),
                mp.Process(
                    target=self.active_start,
                    args=(dict_memory, asserts),
                    name="保活",
                ),
            ]

            try:
                for worker in workers:
                    worker.start()
                for worker in workers:
                    worker.join()
            finally:
                # Reached on interruption or when a later ``start()`` fails:
                # stop whatever is still running before the managers close.
                # ``is_alive()`` is False for a worker that never started, so
                # such a worker is skipped and never joined.
                for worker in workers:
                    if worker.is_alive():
                        worker.terminate()
                        worker.join()

    def input(self, queue: Queue, dict_memory: Dict, asserts: Dict) -> None:
        """Produce one numbered item every three seconds, forever.

        Each round puts ``"数据 <i>"`` on ``queue``, stores ``i -> i`` in
        ``dict_memory`` and logs both actions.

        Args:
            queue: Queue that receives the produced strings.
            dict_memory: Shared mapping that receives one entry per round.
            asserts: Mapping of shared helpers; ``"logger"`` must be present.
        """
        logger = asserts.get("logger")
        name = multiprocessing.current_process().name
        i = 0
        while True:
            queue.put(f"数据 {i}")
            logger.info(name, f"队列入 {i}")

            dict_memory[i] = i
            logger.info(name, f"共用字典入 {i}")

            time.sleep(3)

            i += 1

    def output(self, queue: Queue, dict_memory: Dict, asserts: Dict) -> None:
        """Consume items from ``queue`` forever, logging each one.

        Blocks on ``queue.get()`` and then pauses one second per item.

        Args:
            queue: Queue to read from.
            dict_memory: Unused here; kept so the signature parallels
                ``input``.
            asserts: Mapping of shared helpers; ``"logger"`` must be present.
        """
        logger = asserts.get("logger")
        name = multiprocessing.current_process().name
        while True:
            data = queue.get()
            logger.info(name, f"队列出 {data}")
            time.sleep(1)

    def active_start(self, dict_memory: Dict, asserts: Dict) -> None:
        """Log the size of ``dict_memory`` every four seconds, forever.

        Args:
            dict_memory: Shared mapping whose length is reported.
            asserts: Mapping of shared helpers; ``"logger"`` must be present.
        """
        logger = asserts.get("logger")
        name = multiprocessing.current_process().name
        while True:
            logger.info(name, f"字典长度为 {len(dict_memory)}")
            time.sleep(4)
# Script entry point. The guard keeps the engine from starting when this
# module is only imported (multiprocessing may re-import the main module in
# child processes).
if __name__ == "__main__":
    engine = Engine()
    engine.start()
|