当前位置:  开发笔记 > 编程语言 > 正文

在多处理中使用共享列表的正确方法是什么

如何解决《在多处理中使用共享列表的正确方法是什么》经验,为你挑选了0个好方法。

我已经在多处理的帮助下在Python(版本3.7)中实现了SharedListManager, Lock。我已将其用作使用多处理Process函数调用创建的进程之间的共享对象。共享列表用于存储共享每个进程所生成的值/对象。

使用Python 和的SharedList的实现ManagerLockmultiprocessing

class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.results = self.manager.list([])
        self.lock = Lock()
        self.limit = limit

    def append(self, new_value):
        with self.lock:
            if len(self.results) == self.limit:
                return False
            self.results.append(new_value)
            return True

    def list(self):
        with self.lock:
            return list(self.results).copy()

使用创建的SharedList存储使用创建的多个进程的值multiprocessing

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
   new_process = Process(target=child_function, args=(results))
   processes.append(new_process)
   new_process.start()

for _process in processes:
   _process.join()

for _process in processes:
   _process.close()

实施 child_function

while True:
  result = func()
  if not (results.append(result)):
     break

在某些情况下,该实现可以工作,但是当我增加限制时会遇到麻烦。我使用的处理器数量少于CPU数量,并且相同的实验仍然挂在相同的位置。

有没有更好的方法来解决上述问题,我已经研究了其他方法,例如使用Queue,但这种方法无法按预期工作,请挂断电话?

添加了使用Queue的先前实现

使用队列实现

results_out = []
manager = multiprocessing.Manager()
results = manager.Queue()
tasks = manager.Queue()
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = multiprocessing.Process(target=child_function,
                                            args=(tasks, results)
    processes.append(new_process)
    new_process.start()

sleep(5)
for i in range(limit):
    tasks.put(0)
sleep(1)

for i in range(num_processes):
    tasks.put(-1)

num_finished_processes = 0
while True:
    new_result = results.get()
    if new_result == -1:
        num_finished_processes += 1
        if num_finished_processes == num_processes:
            break
    else:
        results_out.append(new_result)

for process in processes:
    process.join()

for process in processes:
    process.close()

child_function

while True:
    task_val = tasks.get()
    if task_val < 0:
        results.put(-1)
        break
    else:
        result = func()
        results.put(result)

更新

在发布此问题之前,我已经阅读了以下参考资料,但是我无法获得所需的输出。我同意,这段代码导致了死锁状态,但是我无法在python中使用多处理来找到没有死锁的实现

参考文献

    共享列表的多处理

    https://pymotw.com/2/multiprocessing/basics.html

    python的多处理中的共享变量

    https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing

    https://medium.com/@urban_institute/using-multiprocessing-to-make-python-code-faster-23ea5ef996ba

    http://kmdouglass.github.io/posts/learning-pythons-multiprocessing-module/

    python多处理/线程清理

根据该建议,我能够修改SharedList使用Queue

class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.tasks = self.manager.Queue()
        self.results = self.manager.Queue()
        self.limit = limit
        self.no_of_process = min(process_count, limit)

    def setup(self):
        sleep(1)
        for i in range(self.limit):
            self.tasks.put(0)
        sleep(1)
        for i in range(self.no_of_process):
            self.tasks.put(-1)

    def append(self, new_value):
        task_val = self.tasks.get()
        if task_val < 0:
            self.results.put(-1)
            return False
        else:
            self.results.put(new_value)
            return True

    def list(self):
        results_out = []
        num_finished_processes = 0
        while True:
            new_result = self.results.get()
            if new_result == -1:
                num_finished_processes += 1
                if num_finished_processes == self.no_of_process:
                    break
            else:
                results_out.append(new_result)
        return results_out

此实现工作正常,但需要更改以下实现

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
   new_process = Process(target=child_function, args=(results))
   processes.append(new_process)
   new_process.start()

results.setup()

for _process in processes:
   _process.join()

for _process in processes:
   _process.close()

实施 child_function

while True:
  result = func()
  if not (results.append(result)):
     break

但是,仍然如此,在一些迭代之后,它再次陷入死锁,在此死机

推荐阅读
ar_wen2402851455
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有