I have implemented a SharedList in Python (version 3.7) with the help of multiprocessing Manager and Lock. I use it as a shared object among processes created with the multiprocessing Process function call. The shared list is used to store the values/objects generated by each of the processes sharing it.
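For reference, the pattern described above — a Manager-backed list appended to by several Process workers — can be sketched minimally as follows (`worker` and `run_demo` are illustrative names, not part of my code):

```python
from multiprocessing import Manager, Process

def worker(shared, value):
    # Each child appends its own value to the shared list proxy.
    shared.append(value)

def run_demo():
    manager = Manager()
    shared = manager.list()          # proxy object, usable from any child
    procs = [Process(target=worker, args=(shared, i)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return sorted(shared)            # copy out of the proxy into a plain list

if __name__ == "__main__":
    print(run_demo())
```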
Implementation of SharedList using Python multiprocessing, Manager and Lock
```python
from multiprocessing import Manager, Lock

class SharedList(object):
    def __init__(self, limit):
        # Manager-backed list proxy shared across processes,
        # guarded by a Lock and capped at `limit` entries.
        self.manager = Manager()
        self.results = self.manager.list([])
        self.lock = Lock()
        self.limit = limit

    def append(self, new_value):
        with self.lock:
            if len(self.results) == self.limit:
                return False
            self.results.append(new_value)
            return True

    def list(self):
        with self.lock:
            # list(...) already produces a local copy of the proxy's contents
            return list(self.results)
```
The SharedList is used to store the values generated by multiple processes created with multiprocessing:
```python
results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    # args must be a tuple, hence the trailing comma
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()
```
Implementation of child_function
```python
while True:
    result = func()
    if not results.append(result):
        break
```
The implementation works in some scenarios, but it hangs when I increase the limit. I used fewer processes than the number of CPUs, yet the same experiment still hangs at the same place.
Is there a better way to solve the above problem? I have looked into other approaches, such as using a Queue, but that did not work as expected and also hangs.
Added the earlier implementation that uses a Queue:
Implementation using Queue
```python
results_out = []
manager = multiprocessing.Manager()
results = manager.Queue()
tasks = manager.Queue()
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = multiprocessing.Process(target=child_function,
                                          args=(tasks, results))
    processes.append(new_process)
    new_process.start()

sleep(5)
for i in range(limit):
    tasks.put(0)
sleep(1)

# One sentinel per worker tells it to stop
for i in range(num_processes):
    tasks.put(-1)

num_finished_processes = 0
while True:
    new_result = results.get()
    if new_result == -1:
        num_finished_processes += 1
        if num_finished_processes == num_processes:
            break
    else:
        results_out.append(new_result)

for process in processes:
    process.join()

for process in processes:
    process.close()
```
In child_function
```python
while True:
    task_val = tasks.get()
    if task_val < 0:
        results.put(-1)
        break
    else:
        result = func()
        results.put(result)
```
Update
I had read the following references before posting this question, but I was unable to get the desired output. I agree that this code leads to a deadlock state, but I was unable to find a deadlock-free implementation using multiprocessing in Python.
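One deadlock pattern that the standard library's multiprocessing programming guidelines document explicitly is joining a child process before draining a plain multiprocessing.Queue it has put items on. A minimal sketch of the safe drain-then-join ordering (`producer` and `drain_then_join` are illustrative names):

```python
from multiprocessing import Process, Queue

def producer(q, n):
    for i in range(n):
        q.put(i)

def drain_then_join(n=3):
    q = Queue()
    p = Process(target=producer, args=(q, n))
    p.start()
    # Drain the queue BEFORE joining: with a plain multiprocessing.Queue,
    # joining a child that still has buffered items can deadlock.
    items = [q.get() for _ in range(n)]
    p.join()
    return items

if __name__ == "__main__":
    print(drain_then_join())
```

Manager queues behave differently (they go through a proxy process), but the drain-before-join ordering is still the safe one.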
References
Multiprocessing of a shared list
https://pymotw.com/2/multiprocessing/basics.html
Shared variable in python's multiprocessing
https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing
https://medium.com/@urban_institute/using-multiprocessing-to-make-python-code-faster-23ea5ef996ba
http://kmdouglass.github.io/posts/learning-pythons-multiprocessing-module/
python multiprocessing/threading cleanup
Based on that suggestion, I was able to modify SharedList to use a Queue:
```python
class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.tasks = self.manager.Queue()
        self.results = self.manager.Queue()
        self.limit = limit
        self.no_of_process = min(process_count, limit)

    def setup(self):
        sleep(1)
        for i in range(self.limit):
            self.tasks.put(0)
        sleep(1)
        # One sentinel per worker
        for i in range(self.no_of_process):
            self.tasks.put(-1)

    def append(self, new_value):
        task_val = self.tasks.get()
        if task_val < 0:
            self.results.put(-1)
            return False
        else:
            self.results.put(new_value)
            return True

    def list(self):
        results_out = []
        num_finished_processes = 0
        while True:
            new_result = self.results.get()
            if new_result == -1:
                num_finished_processes += 1
                if num_finished_processes == self.no_of_process:
                    break
            else:
                results_out.append(new_result)
        return results_out
```
This implementation works well, with the following change to the driver implementation:
```python
results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

results.setup()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()
```
Implementation of child_function
```python
while True:
    result = func()
    if not results.append(result):
        break
```
However, even so, after a few iterations it again runs into a deadlock and hangs there.
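For comparison, here is a sketch of an approach that avoids the explicit lock/sentinel handshake altogether: let a Pool run the work and have only the parent collect up to `limit` results. Note this is only an approximation of my setup — `func` below is a placeholder that takes an argument, whereas my real work function takes none:

```python
from multiprocessing import Pool

def func(i):
    return i * i  # placeholder for the real computation

def collect(limit, process_count=4):
    results = []
    with Pool(processes=min(process_count, limit)) as pool:
        # imap_unordered yields results as workers finish them,
        # so the parent alone decides when enough have arrived.
        for value in pool.imap_unordered(func, range(limit)):
            results.append(value)
            if len(results) == limit:
                break
    return results

if __name__ == "__main__":
    print(sorted(collect(5)))
```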