我想通过Pool.map()将关键字参数传递给我的worker-function.在搜索论坛时,我找不到一个明确的例子.
示例代码:
import multiprocessing as mp def worker((x,y), **kwargs): kwarg_test = kwargs.get('kwarg_test', False) print("kwarg_test = {}".format(kwarg_test)) if kwarg_test: print("Success") return x*y def wrapper_process(**kwargs): jobs = [] pool=mp.Pool(4) for i, n in enumerate(range(4)): jobs.append((n,i)) pool.map(worker, jobs) #works pool.map(worker, jobs, kwargs) #how to do this? def main(**kwargs): worker((1,2),kwarg_test=True) #accepts kwargs wrapper_process(kwarg_test=True) if __name__ == "__main__": main()
输出:
kwarg_test = True Success kwarg_test = False kwarg_test = False kwarg_test = False kwarg_test = False TypeError: unsupported operand type(s) for //: 'int' and 'dict'
类型错误与解析multiprocessing.Pool或Queue中的参数有关,我尝试了其他几种语法,比如制作一个kwargs列表; [kwargs,kwargs,kwargs,kwargs],以及几次尝试将kwarg列入工作列表但没有运气.我在multiprocessing.pool中从map到map_async跟踪代码,并task_batches = Pool._get_tasks(func, iterable, chunksize)
在遇到生成器结构时得到了pool.py中的代码
.我很高兴将来能够更多地了解这一点,但现在我只是想知道:
是否有一个简单的语法允许使用pool.map传递kwargs?
如果要迭代其他参数,请使用@ArcturusB的答案.
如果您只想传递它们,每次迭代具有相同的值,那么您可以这样做:
from functools import partial pool.map(partial(worker, **kwargs), jobs)
部分 '绑定'函数的参数.旧版本的Python 无法序列化部分对象.
该multiprocessing.pool.Pool.map
文件指出:
map()内置函数的并行等价物(它只支持一个可迭代的参数).它会阻塞,直到结果准备就绪.
我们只能传递一个可迭代的参数.故事的结局.但我们可以想到一个解决方法:定义worker_wrapper
带有单个参数的函数,将其解包为args和kwargs,并将它们传递给worker
:
def worker_wrapper(arg): args, kwargs = arg return worker(*args, **kwargs)
在你的wrapper_process
,你需要构建这个单个参数
jobs
(甚至直接在构造作业时)并调用worker_wrapper
:
arg = [(j, kwargs) for j in jobs] pool.map(worker_wrapper, arg)
这是一个有效的实现,尽可能接近原始代码:
import multiprocessing as mp def worker_wrapper(arg): args, kwargs = arg return worker(*args, **kwargs) def worker(x, y, **kwargs): kwarg_test = kwargs.get('kwarg_test', False) # print("kwarg_test = {}".format(kwarg_test)) if kwarg_test: print("Success") else: print("Fail") return x*y def wrapper_process(**kwargs): jobs = [] pool=mp.Pool(4) for i, n in enumerate(range(4)): jobs.append((n,i)) arg = [(j, kwargs) for j in jobs] pool.map(worker_wrapper, arg) def main(**kwargs): print("=> calling `worker`") worker(1, 2,kwarg_test=True) #accepts kwargs print("=> no kwargs") wrapper_process() # no kwargs print("=> with `kwar_test=True`") wrapper_process(kwarg_test=True) if __name__ == "__main__": main()
通过测试:
=> calling `worker` Success => no kwargs Fail Fail Fail Fail => with `kwar_test=True` Success Success Success Success