多进程代码封装报错

策略分享
标签: #<Tag:0x00007fcf79adae80>

(polll) #1
克隆策略

一段多进程代码如下:

In [12]:
from multiprocessing import Pool
def worker(arg):
    return  arg
 
ps=Pool(5)
results = [] 
for i in range(10):
    results.append(ps.apply_async(worker,args=(i,))) # 异步执行

    
ps.close()
# 阻塞进程
ps.join()

summary = []
for res in results:
    summary.append(res.get())
print(summary)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
In [ ]:
## 问题描述

希望将上述的多进程代码封装为可视化模块但报错如下

AttributeError: Can't pickle local object 'm1_run_bigquant_run.<locals>.worker'

    {"Description":"实验创建于2020/10/6","Summary":"","Graph":{"EdgesInternal":[],"ModuleNodes":[{"Id":"-417","ModuleId":"BigQuantSpace.cached.cached-v3","ModuleParameters":[{"Name":"run","Value":"# Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端\ndef bigquant_run(input_1, input_2, input_3):\n \n from multiprocessing import Pool\n def worker(arg):\n return arg\n\n ps=Pool(5)\n results = [] \n for i in range(10):\n results.append(ps.apply_async(worker,args=(i,))) # 异步执行\n\n\n ps.close()\n # 阻塞进程\n ps.join()\n\n summary = []\n for res in results:\n summary.append(res.get())\n print(summary)\n\n return Outputs()","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"post_run","Value":"# 后处理函数,可选。输入是主函数的输出,可以在这里对数据做处理,或者返回更友好的outputs数据格式。此函数输出不会被缓存。\ndef bigquant_run(outputs):\n return outputs\n","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"input_ports","Value":"","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"params","Value":"{}","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"output_ports","Value":"","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_1","NodeId":"-417"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_2","NodeId":"-417"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_3","NodeId":"-417"}],"OutputPortsInternal":[{"Name":"data_1","NodeId":"-417","OutputType":null},{"Name":"data_2","NodeId":"-417","OutputType":null},{"Name":"data_3","NodeId":"-417","OutputType":null}],"UsePreviousResults":true,"moduleIdForCode":1,"Comment":"","CommentCollapsed":true}],"SerializedClientData":"<?xml version='1.0' encoding='utf-16'?><DataV1 xmlns:xsd='http://www.w3.org/2001/XMLSchema' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'><Meta /><NodePositions><NodePosition Node='-417' Position='233.125,470.8951416015625,200,200'/></NodePositions><NodeGroups /></DataV1>"},"IsDraft":true,"ParentExperimentId":null,"WebService":{"IsWebServiceExperiment":false,"Inputs":[],"Outputs":[],"Parameters":[{"Name":"交易日期","Value":"","ParameterDefinition":{"Name":"交易日期","FriendlyName":"交易日期","DefaultValue":"","ParameterType":"String","HasDefaultValue":true,"IsOptional":true,"ParameterRules":[],"HasRules":false,"MarkupType":0,"CredentialDescriptor":null}}],"WebServiceGroupId":null,"SerializedClientData":"<?xml version='1.0' encoding='utf-16'?><DataV1 xmlns:xsd='http://www.w3.org/2001/XMLSchema' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'><Meta /><NodePositions></NodePositions><NodeGroups /></DataV1>"},"DisableNodesUpdate":false,"Category":"user","Tags":[],"IsPartialRun":true}
    In [11]:
    # 本代码由可视化策略环境自动生成 2020年10月6日 19:15
    # 本代码单元只能在可视化模式下编辑。您也可以拷贝代码,粘贴到新建的代码单元或者策略,然后修改。
    
    
    # Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端
    def m1_run_bigquant_run(input_1, input_2, input_3):
       
        from multiprocessing import Pool
        def worker(arg):
            return  arg
    
        ps=Pool(5)
        results = [] 
        for i in range(10):
            results.append(ps.apply_async(worker,args=(i,))) # 异步执行
    
    
        ps.close()
        # 阻塞进程
        ps.join()
    
        summary = []
        for res in results:
            summary.append(res.get())
        print(summary)
    
        return Outputs()
    # 后处理函数,可选。输入是主函数的输出,可以在这里对数据做处理,或者返回更友好的outputs数据格式。此函数输出不会被缓存。
    def m1_post_run_bigquant_run(outputs):
        return outputs
    
    
    m1 = M.cached.v3(
        run=m1_run_bigquant_run,
        post_run=m1_post_run_bigquant_run,
        input_ports='',
        params='{}',
        output_ports=''
    )
    
    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    <ipython-input-11-8138903e4793> in <module>()
         36     input_ports='',
         37     params='{}',
    ---> 38     output_ports=''
         39 )
    
    /var/app/enabled/biglearning/module2/common/modulemanagerv2.cpython-36m-x86_64-linux-gnu.so in biglearning.module2.common.modulemanagerv2.BigQuantModuleVersion.__call__()
    
    /var/app/enabled/biglearning/module2/common/moduleinvoker.cpython-36m-x86_64-linux-gnu.so in biglearning.module2.common.moduleinvoker.module_invoke()
    
    /var/app/enabled/biglearning/module2/common/moduleinvoker.cpython-36m-x86_64-linux-gnu.so in biglearning.module2.common.moduleinvoker._invoke_with_cache()
    
    /var/app/enabled/biglearning/module2/common/moduleinvoker.cpython-36m-x86_64-linux-gnu.so in biglearning.module2.common.moduleinvoker._invoke_with_cache()
    
    /var/app/enabled/biglearning/module2/common/moduleinvoker.cpython-36m-x86_64-linux-gnu.so in biglearning.module2.common.moduleinvoker._module_run()
    
    /var/app/enabled/biglearning/module2/modules/cached/v3/__init__.cpython-36m-x86_64-linux-gnu.so in biglearning.module2.modules.cached.v3.__init__.BigQuantModule.run()
    
    <ipython-input-11-8138903e4793> in m1_run_bigquant_run(input_1, input_2, input_3)
         22     summary = []
         23     for res in results:
    ---> 24         summary.append(res.get())
         25     print(summary)
         26 
    
    /usr/lib64/python3.6/multiprocessing/pool.py in get(self, timeout)
        642             return self._value
        643         else:
    --> 644             raise self._value
        645 
        646     def _set(self, i, obj):
    
    /usr/lib64/python3.6/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
        422                         break
        423                     try:
    --> 424                         put(task)
        425                     except Exception as e:
        426                         job, idx = task[:2]
    
    /usr/lib64/python3.6/multiprocessing/connection.py in send(self, obj)
        204         self._check_closed()
        205         self._check_writable()
    --> 206         self._send_bytes(_ForkingPickler.dumps(obj))
        207 
        208     def recv_bytes(self, maxlength=None):
    
    /usr/lib64/python3.6/multiprocessing/reduction.py in dumps(cls, obj, protocol)
         49     def dumps(cls, obj, protocol=None):
         50         buf = io.BytesIO()
    ---> 51         cls(buf, protocol).dump(obj)
         52         return buf.getbuffer()
         53 
    
    AttributeError: Can't pickle local object 'm1_run_bigquant_run.<locals>.worker'

    (polll) #2

    主要目的是想并发来处理一个问题,并且要能封装成可视化模块。
    其实 多进程 多线程都是可以的,运行完以后,要能汇总并发处理的结果,便于往下游模块传输数据


    (iQuant) #3

    错误原因

    Pool needs to pickle (serialize) everything it sends to its worker-processes ([IPC](https://en.wikipedia.org/wiki/Inter-process_communication)). Pickling actually only saves the name of a function and unpickling requires re-importing the function by name. For that to work, the function needs to be defined at the top-level, nested functions won't be importable by the child and already trying to pickle them raises an exception ([more](https://stackoverflow.com/a/56534386/9059420)).
    

    临时解决方案

    将worker放到top level定义,但这会带来top level函数可能冲突的问题

    更好的解决方案

    参考如下的代码,使用bigpickle序列化需要的函数

    def _picklable_func(func_dumps, *args, **kwargs):
        import biglearning.module2.common.bigpickle as bigpickle
        func = bigpickle.loads(func_dumps)
        return func(*args, **kwargs)
    
    
    # Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端
    def bigquant_run(input_1, input_2, input_3):
        def picklable(func):
            from functools import partial
            import biglearning.module2.common.bigpickle as bigpickle
            return partial(_picklable_func, bigpickle.dumps(func))
       
        # from multiprocessing import Pool
        def worker(arg):
            return arg
    
        ps=multiprocessing.Pool(5)
        results = [] 
        for i in range(10):
            results.append(ps.apply_async(picklable(worker), args=(i,))) # 异步执行
    
        ps.close()
        # 阻塞进程
        ps.join()
    
        summary = []
        for res in results:
            summary.append(res.get())
        print(summary)
    
        return Outputs()
    

    解决方案内置支持

    我们已经将如上方案内置到系统支持中,参考如下代码,调用 T.picklable(worker) 就可以了

    # Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端
    def bigquant_run(input_1, input_2, input_3):
        # from multiprocessing import Pool
        def worker(arg):
            return arg
    
        ps=multiprocessing.Pool(5)
        results = [] 
        for i in range(10):
            results.append(ps.apply_async(T.picklable(worker), args=(i,))) # 异步执行
    
        ps.close()
        # 阻塞进程
        ps.join()
    
        summary = []
        for res in results:
            summary.append(res.get())
        print(summary)
    
        return Outputs()