当前位置:  开发笔记 > 编程语言 > 正文

使用multiprocessing.pool.map传递kwargs

如何解决《使用multiprocessing.pool.map传递kwargs》经验,为你挑选了2个好方法。

我想通过Pool.map()将关键字参数传递给我的worker-function.在搜索论坛时,我找不到一个明确的例子.

示例代码:

import multiprocessing as mp

def worker((x,y), **kwargs):
    kwarg_test = kwargs.get('kwarg_test', False)
    print("kwarg_test = {}".format(kwarg_test))     
    if kwarg_test:
        print("Success")
    return x*y

def wrapper_process(**kwargs):
    jobs = []
    pool=mp.Pool(4)
    for i, n in enumerate(range(4)):
        jobs.append((n,i))
    pool.map(worker, jobs) #works
    pool.map(worker, jobs, kwargs) #how to do this?   

def main(**kwargs):
    worker((1,2),kwarg_test=True) #accepts kwargs
    wrapper_process(kwarg_test=True)

if __name__ == "__main__":    
    main()

输出:

kwarg_test = True
Success
kwarg_test = False
kwarg_test = False
kwarg_test = False
kwarg_test = False
TypeError: unsupported operand type(s) for //: 'int' and 'dict'

类型错误与解析multiprocessing.Pool或Queue中的参数有关,我尝试了其他几种语法,比如制作一个kwargs列表; [kwargs,kwargs,kwargs,kwargs],以及几次尝试将kwarg列入工作列表但没有运气.我在multiprocessing.pool中从map到map_async跟踪代码,并task_batches = Pool._get_tasks(func, iterable, chunksize) 在遇到生成器结构时得到了pool.py中的代码 .我很高兴将来能够更多地了解这一点,但现在我只是想知道:

是否有一个简单的语法允许使用pool.map传递kwargs?



1> Mark..:

如果要迭代其他参数,请使用@ArcturusB的答案.

如果您只想传递它们,每次迭代具有相同的值,那么您可以这样做:

from functools import partial
pool.map(partial(worker, **kwargs), jobs)

部分 '绑定'函数的参数.旧版本的Python 无法序列化部分对象.



2> Arcturus B..:

multiprocessing.pool.Pool.map文件指出:

map()内置函数的并行等价物(它只支持一个可迭代的参数).它会阻塞,直到结果准备就绪.

我们只能传递一个可迭代的参数.故事的结局.但我们可以想到一个解决方法:定义worker_wrapper带有单个参数的函数,将其解包为args和kwargs,并将它们传递给worker:

def worker_wrapper(arg):
    args, kwargs = arg
    return worker(*args, **kwargs)

在你的wrapper_process,你需要构建这个单个参数 jobs(甚至直接在构造作业时)并调用worker_wrapper:

arg = [(j, kwargs) for j in jobs]
pool.map(worker_wrapper, arg)

这是一个有效的实现,尽可能接近原始代码:

import multiprocessing as mp

def worker_wrapper(arg):
    args, kwargs = arg
    return worker(*args, **kwargs)

def worker(x, y, **kwargs):
    kwarg_test = kwargs.get('kwarg_test', False)
    # print("kwarg_test = {}".format(kwarg_test))     
    if kwarg_test:
        print("Success")
    else:
        print("Fail")
    return x*y

def wrapper_process(**kwargs):
    jobs = []
    pool=mp.Pool(4)
    for i, n in enumerate(range(4)):
        jobs.append((n,i))
    arg = [(j, kwargs) for j in jobs]
    pool.map(worker_wrapper, arg)

def main(**kwargs):
    print("=> calling `worker`")
    worker(1, 2,kwarg_test=True) #accepts kwargs
    print("=> no kwargs")
    wrapper_process() # no kwargs
    print("=> with `kwar_test=True`")
    wrapper_process(kwarg_test=True)

if __name__ == "__main__":    
    main()

通过测试:

=> calling `worker`
Success
=> no kwargs
Fail
Fail
Fail
Fail
=> with `kwar_test=True`
Success
Success
Success
Success

推荐阅读
地之南_816
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有