当前位置:  开发笔记 > 编程语言 > 正文

多处理:如何在类中定义的函数上使用Pool.map?

如何解决《多处理:如何在类中定义的函数上使用Pool.map?》经验,为你挑选了9个好方法。

当我运行类似的东西:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

它工作正常.但是,将此作为类的函数:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

给我以下错误:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle : attribute lookup __builtin__.function failed

我看过Alex Martelli的一篇文章处理同样的问题,但它不够明确.



1> klaus se..:

我无法使用到目前为止发布的代码,因为使用"multiprocessing.Pool"的代码不能与lambda表达式一起使用,而不使用"multiprocessing.Pool"的代码会产生与工作项一样多的进程.

我调整了代码,它产生了预定义数量的工作者,并且只有在存在空闲工作者时才迭代输入列表.我还为worker st ctrl-c启用了"守护进程"模式.

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))


@deshtop:如果你有足够的声誉,你可以获得赏金:-)
你如何获得一个进度条来正确使用这个`parmap`函数?
一个问题 - 我使用了这个解决方案,但注意到我生成的python进程在内存中保持活动状态.有关如何在parmap退出时杀死它们的任何快速思考?
@greole传递`(None,None)`作为最后一项指示'fun`它已经到达每个进程的项目序列的末尾.

2> 小智..:

我也对pool.map可以接受的函数限制感到恼火.我写了以下内容以规避这一点.它似乎工作,即使是递归使用parmap.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe,x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p,c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))


这适用于Python 2.7.3 Aug 1,2012,05:14:39.这不适用于巨型迭代 - >它导致OSError:[Errno 24]由于打开的管道数量太多打开文件.
这在win32上的Python 2.7.2(默认,2011年6月12日,15:08:59)[MSC v.1500 32位(英特尔)]中不起作用

3> Mike McKerns..:

除非跳到标准库之外,否则多处理和酸洗会被破坏和限制.

如果使用multiprocessing被调用的fork pathos.multiprocesssing,则可以直接在多处理map函数中使用类和类方法.这是因为dill使用而不是picklecPickle,并且dill可以序列化python中的几乎任何东西.

pathos.multiprocessing还提供了一个异步映射函数......它可以map使用多个参数(例如map(math.pow, [1,2,3], [4,5,6]))

请参阅讨论: 多处理和莳萝可以一起做什么?

并且:http: //matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

它甚至可以处理您最初编写的代码,无需修改,也可以处理解释器. 为什么还要做一些比单个案例更脆弱和更具体的事情呢?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

获取代码:https: //github.com/uqfoundation/pathos

而且,只是为了展示它可以做的更多:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]


@xApple:就像后续一样,`pathos`有一个新的稳定版本,也兼容2.x和3.x.

4> Eric O Lebig..:

据我所知,目前还没有解决您的问题的方法:您map()必须通过导入模块来访问您提供的功能.这就是罗伯特代码的工作原理:f()可以通过导入以下代码获得该函数:

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

我实际上添加了一个"主要"部分,因为它遵循Windows平台的建议("确保主模块可以由新的Python解释器安全地导入而不会导致意外的副作用").

我还在前面添加了一个大写字母Calculate,以便遵循PEP 8.:)



5> 小智..:

mrule的解决方案是正确的,但有一个错误:如果孩子发回大量数据,它可以填充管道的缓冲区,阻塞孩子的pipe.send(),而父母正在等待孩子退出pipe.join().解决方案是在孩子面前阅读孩子的数据join().此外,孩子应该关闭父管的末端以防止死锁.下面的代码修复了这个问题.另请注意,这parmap会为每个元素创建一个进程X.更先进的解决方案是使用multiprocessing.cpu_count()划分X成若干块,然后返回结果之前合并.我把它作为练习留给读者,以免破坏了mrule的好回答的简洁性.;)

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))



6> Brandt..:

我也在努力解决这个问题.我将函数作为类的数据成员,作为简化示例:

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

我需要在同一个类中的Pool.map()调用中使用self.f函数,self.f没有将元组作为参数.由于此函数嵌入在类中,因此我不清楚如何编写其他答案建议的包装类型.

我通过使用一个带有元组/列表的不同包装器解决了这个问题,其中第一个元素是函数,其余元素是该函数的参数,称为eval_func_tuple(f_args).使用它,有问题的行可以被return pool.map替换(eval_func_tuple,itertools.izip(itertools.repeat(self.f),list1,list2)).这是完整的代码:

文件:util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

文件:main.py

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

运行main.py将给出[11,22,33].随意改进这一点,例如,也可以修改eval_func_tuple以获取关键字参数.

另一方面,在另一个答案中,对于更多进程的情况,可以使函数"parmap"更有效,而不是可用的CPU数量.我正在复制下面的编辑版本.这是我的第一篇文章,我不确定是否应该直接编辑原始答案.我还重命名了一些变量.

from multiprocessing import Process, Pipe  
from itertools import izip  

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))         



7> robert..:

在类中定义的函数(甚至在类中的函数内)并不真正发泡.但是,这有效:

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
    return p.map(f, [1,2,3])

cl = calculate()
print cl.run()


谢谢,但我发现在课外定义函数有点脏.该类应捆绑实现给定任务所需的全部内容.
@Memoz:"班级应该捆绑所有需要的"真的吗?我找不到很多这方面的例子.大多数类依赖于其他类或函数.为什么要将类依赖性称为"脏"?依赖有什么问题?

8> xApple..:

我接受了klaus se和aganders3的回答,并制作了一个更具可读性的文档化模块,并保存在一个文件中.您只需将其添加到项目中即可.它甚至还有一个可选的进度条!

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from http://stackoverflow.com/a/16071616/287297

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

编辑:添加了@ alexander-mcfarlane建议和测试功能



9> nightowl..:

我知道这是6年前的问题,但是我只想添加我的解决方案,因为上面的一些建议看起来非常复杂,但我的解决方案实际上非常简单.

我所要做的就是将pool.map()调用包装到辅助函数中.将类对象和方法的args作为元组传递,看起来有点像这样.

def run_in_parallel(args):
    return args[0].method(args[1])

myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)

推荐阅读
sx-March23
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有