When I run something like:
from multiprocessing import Pool
p = Pool(5)
def f(x):
    return x*x

p.map(f, [1,2,3])
it works fine. However, putting this in a method of a class:
class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()
gives me the following error:
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
I've seen a post by Alex Martelli dealing with the same kind of problem, but it wasn't explicit enough.
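Why the error happens, as a minimal sketch (Python 3, standard library only; the names `square`, `make_nested` and `can_pickle` are illustrative): multiprocessing pickles each task before handing it to a worker, and pickle stores a function only as a reference to its module and name. A function defined inside another function, or a lambda, has no importable name, so pickling it fails. The exact exception type differs between Python versions, so the sketch catches several.

```python
import pickle


def square(x):
    # Module-level function: pickle records it as a reference (module + name).
    return x * x


def make_nested():
    # Function defined inside another function: it has no importable name.
    def nested(x):
        return x * x
    return nested


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exception raised for unpicklable functions varies by version.
        return False


print(can_pickle(square))           # True
print(can_pickle(make_nested()))    # False
print(can_pickle(lambda x: x * x))  # False
# The payload names the function; it does not contain the function's code.
print(b"square" in pickle.dumps(square))  # True
```

This is also why a module-level `f` works with `Pool.map`: the worker only needs to be able to look up `f` by name.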
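I couldn't use the code posted so far, because the code using "multiprocessing.Pool" doesn't work with lambda expressions, and the code not using "multiprocessing.Pool" spawns as many processes as there are work items.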
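I adapted the code so that it spawns a predefined number of workers and only iterates through the input list if there exists an idle worker. I also enabled "daemon" mode for the workers so that Ctrl-C works as expected.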
import multiprocessing


def fun(f, q_in, q_out):
    # Worker loop: process (index, item) pairs until the (None, None) sentinel
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)  # maxsize 1: an item is handed out only when a worker takes one
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True  # daemon workers are terminated when the parent exits (e.g. on Ctrl-C)
        p.start()

    sent = 0
    for i, x in enumerate(X):
        q_in.put((i, x))
        sent += 1
    for _ in range(nprocs):
        q_in.put((None, None))  # one sentinel per worker
    res = [q_out.get() for _ in range(sent)]

    for p in proc:
        p.join()

    # Results arrive in completion order; sort by index to restore input order
    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
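I was also annoyed by the restrictions on what sort of functions pool.map could accept. I wrote the following to get around them. It appears to work, even for recursive use of parmap.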
from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
Multiprocessing and pickling are broken and limited unless you jump outside the standard library.
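If you use a fork of multiprocessing called pathos.multiprocessing, you can directly use classes and class methods in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in Python.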
pathos.multiprocessing also provides an asynchronous map function... and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6])).
See the discussion: What can multiprocessing and dill do together?
and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
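It even handles the code you wrote initially, without modification, and from the interpreter. Why do anything else that's more fragile and specific to a single case?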
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...   def run(self):
...     def f(x):
...       return x*x
...     p = Pool()
...     return p.map(f, [1,2,3])
...
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]
Get the code here: https://github.com/uqfoundation/pathos
And, just to show off a little more of what it can do:
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> def add(x,y):
...   return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> class Test(object):
...   def plus(self, x, y):
...     return x+y
...
>>> t = Test()
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
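For comparison, a sketch using only the standard library (Python 3.3+): `Pool.starmap` maps a function over several argument sequences and `Pool.starmap_async` is its asynchronous form, roughly like the multi-argument `map` and `amap` of pathos. Unlike pathos, they still need a picklable function, such as a module-level function or a built-in like `math.pow`. The helper name `pow_pairs` is hypothetical, and the sketch assumes a POSIX system where the `fork` start method is available (it is not on Windows).

```python
import math
import multiprocessing


def pow_pairs(bases, exponents):
    """Hypothetical helper: apply math.pow pairwise with the stdlib Pool."""
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX fork start method
    with ctx.Pool(2) as pool:
        # starmap unpacks each tuple: math.pow(1, 4), math.pow(2, 5), ...
        sync_result = pool.starmap(math.pow, zip(bases, exponents))
        # starmap_async returns an AsyncResult at once; get() waits for it.
        async_result = pool.starmap_async(math.pow, zip(bases, exponents))
        return sync_result, async_result.get(timeout=60)


if __name__ == "__main__":
    print(pow_pairs([1, 2, 3], [4, 5, 6]))
    # ([1.0, 32.0, 729.0], [1.0, 32.0, 729.0])
```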
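As far as I know, there is currently no solution to your problem as posed: the function you give to map() must be accessible through an import of your module. This is why robert's code works: the function f() can be obtained by importing the following code: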
from multiprocessing import Pool

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()
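I actually added a "main" section, because this follows the recommendations for the Windows platform ("Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects").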
I also capitalized the first letter of Calculate, so as to follow PEP 8. :)
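mrule's solution is correct but has a bug: if the child sends back a large amount of data, it can fill the pipe's buffer, blocking on the child's pipe.send(), while the parent is waiting for the child to exit in p.join(). The solution is to read the child's data before join()ing the child. Furthermore, the child should close the parent's end of the pipe to prevent a deadlock. The code below fixes that. Also be aware that this parmap creates one process per element of X. A more advanced solution is to use multiprocessing.cpu_count() to divide X into a number of chunks, and then merge the results before returning. I leave that as an exercise for the reader, so as not to spoil the conciseness of mrule's nice answer. ;)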
from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe, x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(p, c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p, c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
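One way to do the chunking exercise, as a Python 3 sketch rather than the answer author's code (`chunked_parmap` and `_worker` are hypothetical names): split `X` into about `cpu_count()` contiguous chunks, start one process per chunk, and read each chunk's result before calling `join()`. It assumes the POSIX `fork` start method, which, like the answers above, lets a lambda be passed to `Process` without pickling it.

```python
import multiprocessing


def _worker(f, chunk, conn):
    # Child: evaluate f on every element of its chunk, send the list back.
    conn.send([f(x) for x in chunk])
    conn.close()


def chunked_parmap(f, items, nprocs=None):
    """Split items into about nprocs contiguous chunks, one process per chunk."""
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX fork start method
    items = list(items)
    nprocs = nprocs or multiprocessing.cpu_count()
    size = max(1, -(-len(items) // nprocs))  # ceiling division
    chunks = [items[i:i + size] for i in range(0, len(items), size)]

    jobs = []
    for chunk in chunks:
        # duplex=False: parent_conn can only receive, child_conn can only send
        parent_conn, child_conn = ctx.Pipe(duplex=False)
        proc = ctx.Process(target=_worker, args=(f, chunk, child_conn))
        proc.start()
        child_conn.close()  # the parent keeps only its receiving end
        jobs.append((proc, parent_conn))

    results = []
    for proc, parent_conn in jobs:
        # Read before join(): a large result could otherwise fill the pipe
        # buffer and leave the child blocked in send() forever.
        results.extend(parent_conn.recv())
        proc.join()
    return results


if __name__ == "__main__":
    print(chunked_parmap(lambda x: x ** x, range(1, 5)))  # [1, 4, 27, 256]
```

Because chunks are contiguous and read back in order, the merged list keeps the input order without any sorting.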
I also struggled with this. I had functions as data members of a class; as a simplified example:
from multiprocessing import Pool
import itertools

pool = Pool()

class Example(object):
    def __init__(self, my_add):
        self.f = my_add

    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f, list1, list2)
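I needed to use the function self.f in a Pool.map() call from within the same class, and self.f did not take a tuple as an argument. Since this function was embedded in a class, it was not clear to me how to write the kind of wrapper the other answers suggested.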
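I solved this problem by using a different wrapper, called eval_func_tuple(f_args), that takes a tuple/list whose first element is the function and whose remaining elements are the arguments to that function. Using it, the problematic line can be replaced by return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)). Here is the full code: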
File: util.py
def add(a, b):
    return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])
File: main.py
from multiprocessing import Pool
import itertools
import util

pool = Pool()

class Example(object):
    def __init__(self, my_add):
        self.f = my_add

    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple,
                        itertools.izip(itertools.repeat(self.f), list1, list2))

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)
Running main.py will give [11, 22, 33]. Feel free to improve this; for example, eval_func_tuple could also be modified to take keyword arguments.
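A sketch of the keyword-argument variant suggested above (Python 3; `eval_func_kwargs`, `scaled_add` and `add_lists_scaled` are hypothetical names): each task is a `(function, args, kwargs)` tuple, and the function must still be defined at module level so that the workers can unpickle it. The sketch assumes the POSIX `fork` start method.

```python
import multiprocessing


def eval_func_kwargs(f_args_kwargs):
    """Hypothetical variant of eval_func_tuple: takes (func, args, kwargs)."""
    func, args, kwargs = f_args_kwargs
    return func(*args, **kwargs)


def scaled_add(a, b, scale=1):
    # Module-level function, so workers can find it by name when unpickling.
    return (a + b) * scale


def add_lists_scaled(list1, list2, scale):
    # One (function, positional args, keyword args) tuple per pair of elements.
    tasks = [(scaled_add, (a, b), {"scale": scale}) for a, b in zip(list1, list2)]
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX fork start method
    with ctx.Pool(2) as pool:
        return pool.map(eval_func_kwargs, tasks)


if __name__ == "__main__":
    print(add_lists_scaled([1, 2, 3], [10, 20, 30], scale=2))  # [22, 44, 66]
```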
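On another note, the "parmap" function from another answer can be made more efficient when there are more processes than available CPUs. I'm copying an edited version below. This is my first post and I wasn't sure whether I should directly edit the original answer. I also renamed some variables.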
import multiprocessing
from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    processes = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    numProcesses = len(processes)
    processNum = 0
    outputList = []
    # Run at most cpu_count() processes at a time
    while processNum < numProcesses:
        endProcessNum = min(processNum + multiprocessing.cpu_count(), numProcesses)
        for proc in processes[processNum:endProcessNum]:
            proc.start()
        # Receive before join(): a large result could otherwise fill the
        # pipe buffer and block the child's send() forever
        for parentConn, childConn in pipe[processNum:endProcessNum]:
            outputList.append(parentConn.recv())
        for proc in processes[processNum:endProcessNum]:
            proc.join()
        processNum = endProcessNum
    return outputList

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
Functions defined in classes (even within functions inside classes) don't really pickle. However, this works:
from multiprocessing import Pool

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()
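I took klaus se's and aganders3's answers and made a documented module that is more readable and fits in one file. You can just add it to your project. It even has an optional progress bar!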
""" The ``processes`` module provides some convenience functions for using parallel processes in python. Adapted from http://stackoverflow.com/a/16071616/287297 Example usage: print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True) Comments: "It spawns a predefined amount of workers and only iterates through the input list if there exists an idle worker. I also enabled the "daemon" mode for the workers so that KeyboardInterupt works as expected." Pitfalls: all the stdouts are sent back to the parent stdout, intertwined. Alternatively, use this fork of multiprocessing: https://github.com/uqfoundation/multiprocess """ # Modules # import multiprocessing from tqdm import tqdm ################################################################################ def apply_function(func_to_apply, queue_in, queue_out): while not queue_in.empty(): num, obj = queue_in.get() queue_out.put((num, func_to_apply(obj))) ################################################################################ def prll_map(func_to_apply, items, cpus=None, verbose=False): # Number of processes to use # if cpus is None: cpus = min(multiprocessing.cpu_count(), 32) # Create queues # q_in = multiprocessing.Queue() q_out = multiprocessing.Queue() # Process list # new_proc = lambda t,a: multiprocessing.Process(target=t, args=a) processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)] # Put all the items (objects) in the queue # sent = [q_in.put((i, x)) for i, x in enumerate(items)] # Start them all # for proc in processes: proc.daemon = True proc.start() # Display progress bar or not # if verbose: results = [q_out.get() for x in tqdm(range(len(sent)))] else: results = [q_out.get() for x in range(len(sent))] # Wait for them to finish # for proc in processes: proc.join() # Return results # return [x for i, x in sorted(results)] ################################################################################ def test(): def slow_square(x): import time 
time.sleep(2) return x**2 objs = range(20) squares = prll_map(slow_square, objs, 4, verbose=True) print "Result: %s" % squares
Edit: added @alexander-mcfarlane's suggestion and a test function.
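I know this question was asked 6 years ago, but I just wanted to add my solution, as some of the suggestions above seem horribly complicated, while my solution was actually very simple.
All I had to do was wrap the method call in a module-level helper function that pool.map() calls, passing the class object and the method's argument together as a tuple. It looked a bit like this: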
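from multiprocessing import Pool

class MyClass(object):
    # Hypothetical placeholder class; it must be defined at module level
    def method(self, arg):
        return arg * 2

def run_in_parallel(args):
    # Module-level helper: unpack (instance, argument) and call the method
    return args[0].method(args[1])

if __name__ == '__main__':
    myclass = MyClass()
    method_args = [1,2,3,4,5,6]
    args_map = [ (myclass, arg) for arg in method_args ]
    pool = Pool()
    results = pool.map(run_in_parallel, args_map)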
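On Python 3 the helper is often unnecessary: bound methods can be pickled as long as the instance can be pickled and its class is defined at module level, so `pool.map(obj.method, args)` works directly. This does not apply to the Python 2 interpreter in the question, and a nested function such as `f` inside `run()` still cannot be pickled. A minimal sketch; `MyClass`, `offset` and `map_method` are hypothetical, and it assumes the POSIX `fork` start method.

```python
import multiprocessing
import pickle


class MyClass(object):
    # Hypothetical example class; it must live at module level to be picklable.
    def __init__(self, offset):
        self.offset = offset

    def method(self, x):
        return x + self.offset


def map_method(obj, args):
    # Python 3 pickles obj.method as getattr(<pickled instance>, "method"),
    # so no module-level helper function is needed.
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX fork start method
    with ctx.Pool(2) as pool:
        return pool.map(obj.method, args)


if __name__ == "__main__":
    obj = MyClass(10)
    restored = pickle.loads(pickle.dumps(obj.method))
    print(restored(1))                 # 11
    print(map_method(obj, [1, 2, 3]))  # [11, 12, 13]
```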