我有一个相当复杂的Python对象,我需要在多个进程之间共享.我使用启动这些过程multiprocessing.Process
.当我分享一个对象multiprocessing.Queue
,并multiprocessing.Pipe
在其中,他们共享就好了.但是当我尝试与其他非多处理模块对象共享一个对象时,似乎Python会分叉这些对象.真的吗?
我尝试使用multiprocessing.Value.但我不确定应该是什么类型的?我的对象类叫做MyClass.但是当我尝试时multiprocess.Value(MyClass, instance)
,它失败了:
TypeError: this type has no size
知道发生了什么事吗?
经过大量的研究和测试,我发现"经理"在非复杂的对象层面上完成这项工作.
下面的代码示出了对象inst
在进程之间共享,这意味着属性var
的inst
时子进程改变其外部改变.
from multiprocessing import Process, Manager from multiprocessing.managers import BaseManager class SimpleClass(object): def __init__(self): self.var = 0 def set(self, value): self.var = value def get(self): return self.var def change_obj_value(obj): obj.set(100) if __name__ == '__main__': BaseManager.register('SimpleClass', SimpleClass) manager = BaseManager() manager.start() inst = manager.SimpleClass() p = Process(target=change_obj_value, args=[inst]) p.start() p.join() print inst # <__main__.SimpleClass object at 0x10cf82350> print inst.get() # 100
好的,如果你只需要共享简单的对象,上面的代码就足够了.
为什么没有复杂?因为如果对象是嵌套的(对象内部对象),它可能会失败:
from multiprocessing import Process, Manager from multiprocessing.managers import BaseManager class GetSetter(object): def __init__(self): self.var = None def set(self, value): self.var = value def get(self): return self.var class ChildClass(GetSetter): pass class ParentClass(GetSetter): def __init__(self): self.child = ChildClass() GetSetter.__init__(self) def getChild(self): return self.child def change_obj_value(obj): obj.set(100) obj.getChild().set(100) if __name__ == '__main__': BaseManager.register('ParentClass', ParentClass) manager = BaseManager() manager.start() inst2 = manager.ParentClass() p2 = Process(target=change_obj_value, args=[inst2]) p2.start() p2.join() print inst2 # <__main__.ParentClass object at 0x10cf82350> print inst2.getChild() # <__main__.ChildClass object at 0x10cf6dc50> print inst2.get() # 100 #good! print inst2.getChild().get() # None #bad! you need to register child class too but there's almost no way to do it #even if you did register child class, you may get PicklingError :)
我认为这种行为的主要原因是因为Manager
只是在管道/队列等低级通信工具之上构建了一个直板.
因此,这种方法不适用于多处理案例.如果您可以使用锁定/信号量/管道/队列等低级工具或Redis 队列或Redis发布/订阅等高级工具来处理复杂的用例(仅限我的推荐lol),那总是更好.
您可以使用Python的Multiprocessing"Manager"类和您定义的代理类来完成此操作.来自Python文档:http: //docs.python.org/library/multiprocessing.html#proxy-objects
您要做的是为自定义对象定义代理类,然后使用"远程管理器"共享对象 - 查看"远程管理器"的相同链接文档页面中的示例,其中文档显示如何共享一个远程队列.你将会做同样的事情,但是你对your_manager_instance.register()的调用将在你的参数列表中包含你的自定义代理类.
通过这种方式,您将设置一个服务器以与自定义代理共享自定义对象.您的客户端需要访问服务器(再次,请参阅如何设置客户端/服务器访问远程队列的优秀文档示例,但您不是共享队列,而是共享对特定类的访问权限).
这是我为此而制作的python包(在进程之间共享复杂的对象)。
git的:https : //github.com/dRoje/pipe-proxy
这个想法是您为对象创建代理,并将其传递给流程。然后,您可以像使用对原始对象的引用一样使用代理。尽管您只能使用方法调用,但是访问对象变量的操作是通过setter和getter进行的。
假设我们有一个名为“示例”的对象,创建代理和代理侦听器很容易:
from pipeproxy import proxy example = Example() exampleProxy, exampleProxyListener = proxy.createProxy(example)
现在,您将代理发送到另一个进程。
p = Process(target=someMethod, args=(exampleProxy,)) p.start()
与使用原始对象(示例)一样,在其他过程中使用它:
def someMethod(exampleProxy): ... exampleProxy.originalExampleMethod() ...
但是您必须在主要过程中听一下:
exampleProxyListener.listen()
在此处阅读更多内容并找到示例:
http://matkodjipalo.com/index.php/2017/11/12/proxy-solution-python-multiprocessing/