自定义模块教程-拼接数据

自定义模块
标签: #<Tag:0x00007f73fccbb2f0>

(iQuant) #1

参考《自定义模块教程-开发一个修改数据列名的模块为例-超级详细版》开发自定义模块。

克隆策略

    {"Description":"实验创建于2017/8/26","Summary":"","Graph":{"EdgesInternal":[{"DestinationInputPortId":"-27:input_ds","SourceOutputPortId":"-185:data"},{"DestinationInputPortId":"-84:input_1","SourceOutputPortId":"-185:data"},{"DestinationInputPortId":"-36:input_1","SourceOutputPortId":"-185:data"},{"DestinationInputPortId":"-84:input_2","SourceOutputPortId":"-27:data"},{"DestinationInputPortId":"-84:input_3","SourceOutputPortId":"-27:data"},{"DestinationInputPortId":"-36:input_2","SourceOutputPortId":"-27:data"},{"DestinationInputPortId":"-36:input_3","SourceOutputPortId":"-27:data"}],"ModuleNodes":[{"Id":"-185","ModuleId":"BigQuantSpace.use_datasource.use_datasource-v1","ModuleParameters":[{"Name":"datasource_id","Value":"bar1d_CN_STOCK_A","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"start_date","Value":"2018-01-01","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"end_date","Value":"2018-02-01","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"instruments","NodeId":"-185"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"features","NodeId":"-185"}],"OutputPortsInternal":[{"Name":"data","NodeId":"-185","OutputType":null}],"UsePreviousResults":true,"moduleIdForCode":4,"Comment":"","CommentCollapsed":true},{"Id":"-27","ModuleId":"BigQuantSpace.date_add_days.date_add_days-v2","ModuleParameters":[{"Name":"add_days","Value":"1000","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"date_column_name","Value":"date","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_ds","NodeId":"-27"}],"OutputPortsInternal":[{"Name":"data","NodeId":"-27","OutputType":null}],"UsePreviousResults":true,"moduleIdForCode":3,"Comment":"","CommentCollapsed":true},{"Id":"-84","ModuleId":"BigQuantSpace.cached.cached-v3","ModuleParam
eters":[{"Name":"run","Value":"def bigquant_run(input_1, input_2, input_3=None, input_4=None, sort_by=None):\n # 重新构造 DataSource,不然输入input有重复的时候会出错\n inputs =[DataSource(i.id) for i in [input_1, input_2, input_3, input_4] if i is not None]\n input_stores = [i.open_df_store() for i in inputs]\n input_keys = [set(i.keys()) for i in input_stores]\n input_key_set = set()\n for keys in input_keys:\n input_key_set |= keys\n if sort_by:\n sort_by = sort_by.split(',')\n\n dataset_ds = DataSource()\n output_store = dataset_ds.open_df_store()\n\n for key in sorted(input_key_set):\n df_list = []\n for i, input_store in enumerate(input_stores):\n if key not in input_keys[i]:\n continue\n df_list.append(input_store[key])\n if len(df_list) == 1:\n df = df_list[0]\n else:\n df = pd.concat(df_list)\n if sort_by:\n df.sort_values(sort_by, inplace=True)\n df.to_hdf(output_store, key)\n row_count = len(df)\n print('%s: %s' % (key, len(df)))\n\n for i in inputs:\n i.close_df_store()\n dataset_ds.close_df_store()\n return Outputs(data=dataset_ds)","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"post_run","Value":"# 后处理函数,可选。输入是主函数的输出,可以在这里对数据做处理,或者返回更友好的outputs数据格式。此函数输出不会被缓存。\ndef bigquant_run(outputs):\n return outputs\n","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"input_ports","Value":"input_1,input_2,input_3,input_4","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"params","Value":"{\n 'sort_by': 
'date'\n}","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"output_ports","Value":"data","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_1","NodeId":"-84"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_2","NodeId":"-84"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_3","NodeId":"-84"}],"OutputPortsInternal":[{"Name":"data_1","NodeId":"-84","OutputType":null},{"Name":"data_2","NodeId":"-84","OutputType":null},{"Name":"data_3","NodeId":"-84","OutputType":null}],"UsePreviousResults":true,"moduleIdForCode":5,"Comment":"","CommentCollapsed":true},{"Id":"-36","ModuleId":"BigQuantSpace.concat_inputs.concat_inputs-v4","ModuleParameters":[{"Name":"sort_by","Value":"date","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_1","NodeId":"-36"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_2","NodeId":"-36"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_3","NodeId":"-36"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_4","NodeId":"-36"}],"OutputPortsInternal":[{"Name":"data","NodeId":"-36","OutputType":null}],"UsePreviousResults":true,"moduleIdForCode":1,"Comment":"","CommentCollapsed":true}],"SerializedClientData":"<?xml version='1.0' encoding='utf-16'?><DataV1 xmlns:xsd='http://www.w3.org/2001/XMLSchema' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'><Meta /><NodePositions><NodePosition Node='-185' Position='1114,4,200,200'/><NodePosition Node='-27' Position='1209,215,200,200'/><NodePosition Node='-84' Position='957,373,200,200'/><NodePosition Node='-36' Position='1318,424,200,200'/></NodePositions><NodeGroups 
/></DataV1>"},"IsDraft":true,"ParentExperimentId":null,"WebService":{"IsWebServiceExperiment":false,"Inputs":[],"Outputs":[],"Parameters":[{"Name":"交易日期","Value":"","ParameterDefinition":{"Name":"交易日期","FriendlyName":"交易日期","DefaultValue":"","ParameterType":"String","HasDefaultValue":true,"IsOptional":true,"ParameterRules":[],"HasRules":false,"MarkupType":0,"CredentialDescriptor":null}}],"WebServiceGroupId":null,"SerializedClientData":"<?xml version='1.0' encoding='utf-16'?><DataV1 xmlns:xsd='http://www.w3.org/2001/XMLSchema' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'><Meta /><NodePositions></NodePositions><NodeGroups /></DataV1>"},"DisableNodesUpdate":false,"Category":"user","Tags":[],"IsPartialRun":true}
    In [43]:
    # 本代码由可视化策略环境自动生成 2018年8月8日 20:11
    # 本代码单元只能在可视化模式下编辑。您也可以拷贝代码,粘贴到新建的代码单元或者策略,然后修改。
    
    
    # Source node: load datasource 'bar1d_CN_STOCK_A' for the window
    # 2018-01-01 .. 2018-02-01. Its `data` output is consumed by m3, m5 and m1.
    m4 = M.use_datasource.v1(
        datasource_id='bar1d_CN_STOCK_A',
        start_date='2018-01-01',
        end_date='2018-02-01'
    )
    
    # Derive a second dataset from m4's output by running date_add_days on the
    # 'date' column with add_days=1000.
    # NOTE(review): presumably this shifts every date forward by 1000 days --
    # confirm against the date_add_days module documentation.
    m3 = M.date_add_days.v2(
        input_ds=m4.data,
        add_days=1000,
        date_column_name='date'
    )
    
    def m5_run_bigquant_run(input_1, input_2, input_3=None, input_4=None, sort_by=None):
        """Concatenate up to four DataSources, key by key, into a new DataSource.

        Every input that is not None is opened as a DataFrame store. For each
        key present in at least one input, the DataFrames stored under that key
        are concatenated with ``pd.concat`` (a single frame is used as is),
        optionally sorted, and written under the same key of the output store.
        The key and the resulting row count are printed for each key.

        Args:
            input_1: DataSource to concatenate; skipped when None.
            input_2: DataSource to concatenate; skipped when None.
            input_3: Optional DataSource; skipped when None.
            input_4: Optional DataSource; skipped when None.
            sort_by: Optional comma-separated column names. When non-empty, each
                concatenated frame is sorted by these columns before writing.

        Returns:
            ``Outputs(data=<new DataSource>)`` holding the concatenated frames.
        """
        sort_columns = sort_by.split(',') if sort_by else None

        # Re-wrap every input in a fresh DataSource built from its id; otherwise
        # the run fails when the same upstream output is wired to several ports.
        inputs = [
            DataSource(ds.id)
            for ds in (input_1, input_2, input_3, input_4)
            if ds is not None
        ]

        # DataSources whose store has been opened, in opening order, so that
        # exactly those get closed again even if a later step raises.
        opened = []
        try:
            input_stores = []
            for ds in inputs:
                input_stores.append(ds.open_df_store())
                opened.append(ds)
            input_keys = [set(store.keys()) for store in input_stores]
            all_keys = set().union(*input_keys)

            dataset_ds = DataSource()
            output_store = dataset_ds.open_df_store()
            opened.append(dataset_ds)

            for key in sorted(all_keys):
                # Only the inputs that actually contain this key contribute.
                df_list = [
                    store[key]
                    for store, keys in zip(input_stores, input_keys)
                    if key in keys
                ]
                df = df_list[0] if len(df_list) == 1 else pd.concat(df_list)
                if sort_columns:
                    df.sort_values(sort_columns, inplace=True)
                df.to_hdf(output_store, key)
                print('%s: %s' % (key, len(df)))
        finally:
            # Close on every path: previously an exception while reading,
            # concatenating or writing left all stores open.
            for ds in opened:
                ds.close_df_store()
        return Outputs(data=dataset_ds)
    # Optional post-processing hook. It receives the main function's outputs and
    # may transform the data or return a friendlier outputs format. The result of
    # this function is not cached.
    def m5_post_run_bigquant_run(outputs):
        """Return ``outputs`` unchanged; no post-processing is applied."""
        return outputs
    
    # Run the custom concatenation function through the generic `cached` module.
    # m3.data is wired to both input_2 and input_3, i.e. the duplicated-input
    # case that the DataSource re-wrapping inside m5_run_bigquant_run handles.
    # input_4 is declared in input_ports but left unconnected here.
    # NOTE(review): `params` is a string holding a dict literal; presumably the
    # platform evaluates it and passes its entries (sort_by='date') to `run` as
    # keyword arguments -- confirm against the `cached` module documentation.
    m5 = M.cached.v3(
        input_1=m4.data,
        input_2=m3.data,
        input_3=m3.data,
        run=m5_run_bigquant_run,
        post_run=m5_post_run_bigquant_run,
        input_ports='input_1,input_2,input_3,input_4',
        params="""{
        'sort_by': 'date'
    }""",
        output_ports='data'
    )
    
    # Built-in concat_inputs module fed with the same three inputs and the same
    # sort_by='date' as the custom module m5.
    # NOTE(review): presumably kept as a reference to compare against m5's
    # output -- confirm intent with the author.
    m1 = M.concat_inputs.v4(
        input_1=m4.data,
        input_2=m3.data,
        input_3=m3.data,
        sort_by='date'
    )
    
    [2018-08-08 20:10:12.042544] INFO: bigquant: use_datasource.v1 开始运行..
    [2018-08-08 20:10:12.052905] INFO: bigquant: 命中缓存
    [2018-08-08 20:10:12.054221] INFO: bigquant: use_datasource.v1 运行完成[0.011701s].
    [2018-08-08 20:10:12.058173] INFO: bigquant: date_add_days.v2 开始运行..
    [2018-08-08 20:10:12.062087] INFO: bigquant: 命中缓存
    [2018-08-08 20:10:12.063242] INFO: bigquant: date_add_days.v2 运行完成[0.005086s].
    [2018-08-08 20:10:12.067022] INFO: bigquant: concat_inputs.v4 开始运行..
    [2018-08-08 20:10:12.070005] INFO: bigquant: 命中缓存
    [2018-08-08 20:10:12.071710] INFO: bigquant: concat_inputs.v4 运行完成[0.004694s].
    
    In [44]:
    # Preview the first rows of the data read from the custom module's `data` output.
    m5.data.read_df().head()
    
    Out[44]:
    adjust_factor amount close instrument deal_number date high low open turn volume
    0 15.185156 96262413.0 42.973991 002505.SZA 12795 2018-01-02 43.277695 42.518436 43.125843 0.621448 34085236.0
    2331 8.262311 26748124.0 31.975143 300048.SZA 2015 2018-01-02 31.975143 31.479404 31.562029 0.716238 6963410.0
    2332 2.619729 154280480.0 30.284067 002307.SZA 10488 2018-01-02 30.519842 29.629135 29.655333 2.406188 13410151.0
    2333 8.399022 31958028.0 55.265564 600105.SHA 4013 2018-01-02 55.769505 54.845615 55.769505 0.637788 4859368.0
    2334 3.765774 7777285.0 22.707617 600778.SHA 648 2018-01-02 22.782932 22.331039 22.669960 0.417038 1297350.0