复制链接
克隆策略

    {"description":"实验创建于2022/9/13","graph":{"edges":[],"nodes":[{"node_id":"-8138","module_id":"BigQuantSpace.hyper_rolling_train.hyper_rolling_train-v1","parameters":[{"name":"run","value":"def bigquant_run(\n bq_graph,\n inputs,\n trading_days_market='CN', # 使用那个市场的交易日历, TODO\n train_instruments_mid='m1', # 训练数据 证券代码列表 模块id\n test_instruments_mid='m9', # 测试数据 证券代码列表 模块id\n predict_mid='m8', # 预测 模块id\n trade_mid='m19', # 回测 模块id\n start_date='2014-01-01', # 数据开始日期\n end_date=T.live_run_param('trading_date', '2017-01-01'), # 数据结束日期\n train_update_days=250, # 更新周期,按交易日计算,每多少天更新一次\n train_update_days_for_live=None, #模拟实盘模式下的更新周期,按交易日计算,每多少天更新一次。如果需要在模拟实盘阶段使用不同的模型更新周期,可以设置这个参数\n train_data_min_days=250, # 最小数据天数,按交易日计算,所以第一个滚动的结束日期是 从开始日期到开始日期+最小数据天数\n train_data_max_days=250, # 最大数据天数,按交易日计算,0,表示没有限制,否则每一个滚动的开始日期=max(此滚动的结束日期-最大数据天数, 开始日期\n rolling_count_for_live=1, #实盘模式下滚动次数,模拟实盘模式下,取最后多少次滚动。一般在模拟实盘模式下,只用到最后一次滚动训练的模型,这里可以设置为1;如果你的滚动训练数据时间段很短,以至于期间可能没有训练数据,这里可以设置大一点。0表示没有限制\n):\n def merge_datasources(input_1):\n df_list = [ds[0].read_df().set_index('date').loc[ds[1]:].reset_index() for ds in input_1]\n df = pd.concat(df_list)\n instrument_data = {\n 'start_date': df['date'].min().strftime('%Y-%m-%d'),\n 'end_date': df['date'].max().strftime('%Y-%m-%d'),\n 'instruments': list(set(df['instrument'])),\n }\n return Outputs(data=DataSource.write_df(df), instrument_data=DataSource.write_pickle(instrument_data))\n\n def gen_rolling_dates(trading_days_market, start_date, end_date, train_update_days, train_update_days_for_live, train_data_min_days, train_data_max_days, rolling_count_for_live):\n # 是否实盘模式\n tdays = list(D.trading_days(market=trading_days_market, start_date=start_date, end_date=end_date)['date'])\n is_live_run = T.live_run_param('trading_date', None) is not None\n\n if is_live_run and train_update_days_for_live:\n train_update_days = train_update_days_for_live\n\n rollings = []\n train_end_date = train_data_min_days\n while train_end_date < len(tdays):\n if train_data_max_days is not None and train_data_max_days > 0:\n train_start_date = max(train_end_date - train_data_max_days, 0)\n else:\n train_start_date = 0\n rollings.append({\n 'train_start_date': tdays[train_start_date].strftime('%Y-%m-%d'),\n 'train_end_date': tdays[train_end_date - 1].strftime('%Y-%m-%d'),\n 'test_start_date': tdays[train_end_date].strftime('%Y-%m-%d'),\n 'test_end_date': tdays[min(train_end_date + train_update_days, len(tdays)) - 1].strftime('%Y-%m-%d'),\n })\n train_end_date += train_update_days\n\n if not rollings:\n raise Exception('没有滚动需要执行,请检查配置')\n\n if is_live_run and rolling_count_for_live:\n rollings = rollings[-rolling_count_for_live:]\n\n return rollings\n\n g = bq_graph\n\n rolling_dates = gen_rolling_dates(\n trading_days_market, start_date, end_date, train_update_days, train_update_days_for_live, train_data_min_days, train_data_max_days, rolling_count_for_live)\n\n # 训练和预测\n results = []\n for rolling in rolling_dates:\n parameters = {}\n # 先禁用回测\n parameters[trade_mid + '.__enabled__'] = False\n parameters[train_instruments_mid + '.start_date'] = rolling['train_start_date']\n parameters[train_instruments_mid + '.end_date'] = rolling['train_end_date']\n parameters[test_instruments_mid + '.start_date'] = rolling['test_start_date']\n parameters[test_instruments_mid + '.end_date'] = rolling['test_end_date']\n # print('------ rolling_train:', parameters)\n results.append(g.run(parameters))\n\n # 合并预测结果并回测\n mx = M.cached.v3(run=merge_datasources, input_1=[[result[predict_mid].predictions, result[test_instruments_mid].data.read_pickle()['start_date']] for result in results])\n parameters = {}\n parameters['*.__enabled__'] = False\n parameters[trade_mid + '.__enabled__'] = True\n parameters[trade_mid + '.instruments'] = mx.instrument_data\n parameters[trade_mid + '.options_data'] = mx.data\n\n trade = g.run(parameters)\n\n return {'rollings': results, 'trade': trade}\n","type":"Literal","bound_global_parameter":null},{"name":"run_now","value":"True","type":"Literal","bound_global_parameter":null},{"name":"bq_graph","value":"True","type":"Literal","bound_global_parameter":null}],"input_ports":[{"name":"bq_graph_port","node_id":"-8138"},{"name":"input_1","node_id":"-8138"},{"name":"input_2","node_id":"-8138"},{"name":"input_3","node_id":"-8138"}],"output_ports":[{"name":"result","node_id":"-8138"}],"cacheable":false,"seq_num":1,"comment":"","comment_collapsed":true}],"node_layout":"<node_postions><node_position Node='-8138' Position='270.59375,297,200,200'/></node_postions>"},"nodes_readonly":false,"studio_version":"v2"}
    In [ ]:
    # 本代码由可视化策略环境自动生成 2022年9月13日 11:59
    # 本代码单元只能在可视化模式下编辑。您也可以拷贝代码,粘贴到新建的代码单元或者策略,然后修改。
    
    
    g = T.Graph({
    })
    
    # g.run({})
    
    
    def m1_run_bigquant_run(
        bq_graph,
        inputs,
        trading_days_market='CN', # 使用那个市场的交易日历, TODO
        train_instruments_mid='m1', # 训练数据 证券代码列表 模块id
        test_instruments_mid='m9', # 测试数据 证券代码列表 模块id
        predict_mid='m8', # 预测 模块id
        trade_mid='m19', # 回测 模块id
        start_date='2014-01-01', # 数据开始日期
        end_date=T.live_run_param('trading_date', '2017-01-01'), # 数据结束日期
        train_update_days=250, # 更新周期,按交易日计算,每多少天更新一次
        train_update_days_for_live=None, #模拟实盘模式下的更新周期,按交易日计算,每多少天更新一次。如果需要在模拟实盘阶段使用不同的模型更新周期,可以设置这个参数
        train_data_min_days=250, # 最小数据天数,按交易日计算,所以第一个滚动的结束日期是 从开始日期到开始日期+最小数据天数
        train_data_max_days=250, # 最大数据天数,按交易日计算,0,表示没有限制,否则每一个滚动的开始日期=max(此滚动的结束日期-最大数据天数, 开始日期
        rolling_count_for_live=1, #实盘模式下滚动次数,模拟实盘模式下,取最后多少次滚动。一般在模拟实盘模式下,只用到最后一次滚动训练的模型,这里可以设置为1;如果你的滚动训练数据时间段很短,以至于期间可能没有训练数据,这里可以设置大一点。0表示没有限制
    ):
        def merge_datasources(input_1):
            df_list = [ds[0].read_df().set_index('date').loc[ds[1]:].reset_index() for ds in input_1]
            df = pd.concat(df_list)
            instrument_data = {
                'start_date': df['date'].min().strftime('%Y-%m-%d'),
                'end_date': df['date'].max().strftime('%Y-%m-%d'),
                'instruments': list(set(df['instrument'])),
            }
            return Outputs(data=DataSource.write_df(df), instrument_data=DataSource.write_pickle(instrument_data))
    
        def gen_rolling_dates(trading_days_market, start_date, end_date, train_update_days, train_update_days_for_live, train_data_min_days, train_data_max_days, rolling_count_for_live):
            # 是否实盘模式
            tdays = list(D.trading_days(market=trading_days_market, start_date=start_date, end_date=end_date)['date'])
            is_live_run = T.live_run_param('trading_date', None) is not None
    
            if is_live_run and train_update_days_for_live:
                train_update_days = train_update_days_for_live
    
            rollings = []
            train_end_date = train_data_min_days
            while train_end_date < len(tdays):
                if train_data_max_days is not None and train_data_max_days > 0:
                    train_start_date = max(train_end_date - train_data_max_days, 0)
                else:
                    train_start_date = 0
                rollings.append({
                    'train_start_date': tdays[train_start_date].strftime('%Y-%m-%d'),
                    'train_end_date': tdays[train_end_date - 1].strftime('%Y-%m-%d'),
                    'test_start_date': tdays[train_end_date].strftime('%Y-%m-%d'),
                    'test_end_date': tdays[min(train_end_date + train_update_days, len(tdays)) - 1].strftime('%Y-%m-%d'),
                })
                train_end_date += train_update_days
    
            if not rollings:
                raise Exception('没有滚动需要执行,请检查配置')
    
            if is_live_run and rolling_count_for_live:
                rollings = rollings[-rolling_count_for_live:]
    
            return rollings
    
        g = bq_graph
    
        rolling_dates = gen_rolling_dates(
            trading_days_market, start_date, end_date, train_update_days, train_update_days_for_live, train_data_min_days, train_data_max_days, rolling_count_for_live)
    
        # 训练和预测
        results = []
        for rolling in rolling_dates:
            parameters = {}
            # 先禁用回测
            parameters[trade_mid + '.__enabled__'] = False
            parameters[train_instruments_mid + '.start_date'] = rolling['train_start_date']
            parameters[train_instruments_mid + '.end_date'] = rolling['train_end_date']
            parameters[test_instruments_mid + '.start_date'] = rolling['test_start_date']
            parameters[test_instruments_mid + '.end_date'] = rolling['test_end_date']
            # print('------ rolling_train:', parameters)
            results.append(g.run(parameters))
    
        # 合并预测结果并回测
        mx = M.cached.v3(run=merge_datasources, input_1=[[result[predict_mid].predictions, result[test_instruments_mid].data.read_pickle()['start_date']] for result in results])
        parameters = {}
        parameters['*.__enabled__'] = False
        parameters[trade_mid + '.__enabled__'] = True
        parameters[trade_mid + '.instruments'] = mx.instrument_data
        parameters[trade_mid + '.options_data'] = mx.data
    
        trade = g.run(parameters)
    
        return {'rollings': results, 'trade': trade}
    
    
    m1 = M.hyper_rolling_train.v1(
        run=m1_run_bigquant_run,
        run_now=True,
        bq_graph=g
    )