复制链接
克隆策略
In [2]:
class conf:
    start_date = '2010-01-01'#2016-04-20   '2016-10-27'  2017-03-16  2016-02-26
    end_date = '2015-01-01'
    start_trade = '2017-01-01'
    end_trade='2018-10-22'
    filter1='market_cap<30000000000'#市值设置成300亿,股票才可能有有大的变化
    rankStocks=100 #选预测的前100个股票
    isUseCepTaoTai=False #是否使用cep加权值判断
    isUseCepShaiXuan=False # 是否筛选不在核心Cep的股票
    cepShaiXuan=0.05#筛选分位
    isUseRisk=True
    isUseGoOnHold=True
    
# 回测引擎:每日数据处理函数,每天执行一次
def m19_handle_data_bigquant_run(context, data):
#     # 1. 获取今日的日期
#     today = data.current_dt.strftime('%Y-%m-%d')  
#     # 2. 按日期过滤得到今日的预测数据
    ranker_prediction = context.ranker_prediction[
        context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]
#     # 3. 获取当前持仓的股票
#     stock_hold_now = {e.symbol: p.amount * p.last_sale_price
#                       for e, p in context.perf_tracker.position_tracker.positions.items()} 
#     # 4. 获取可用现金,需要确认卖出是早盘还是尾盘
#     cash_for_buy = context.portfolio.cash   
    
    today = data.current_dt.strftime('%Y-%m-%d')
    stock_hold_now = [equity.symbol for equity in context.portfolio.positions ]
    
    positions = {e.symbol: p.amount * p.last_sale_price
                 for e, p in context.portfolio.positions.items()}
    #大盘风控模块,读取风控数据   

    print('data='+str(today))
#     交易量均值下降
  

     #------------------------------------------止损模块START--------------------------------------------
    equities = {e.symbol: p for e, p in context.portfolio.positions.items() if p.amount>0}
    
    # 新建当日止损股票列表是为了handle_data 策略逻辑部分不再对该股票进行判断
    stoploss_stock = [] 
    dontsell = [] 
    
    #-------------------------------------------止损模块END---------------------------------------------    
#     print('rank='+str(ranker_prediction))
    mydict = {}
    for i, cep in enumerate(ranker_prediction['concept']):
        array = cep.split(";")
#         myindex = ranker_prediction['concept'].index
#         print('myindex='+str(myindex))
        for cepName in array:
                if cepName in mydict.keys():
                    try:
                        if ranker_prediction['open'].iloc[i] == 0:
                            continue
                        if (ranker_prediction['close'].iloc[i] - ranker_prediction['open'].iloc[i]) == 0:
                            continue
                        oldvalue = mydict[cepName]
                        if oldvalue == 0:
                            newvalue = (ranker_prediction['close'].iloc[i] - ranker_prediction['open'].iloc[i])/ranker_prediction['open'].iloc[i]
                        else:
                            newvalue = (oldvalue + ((ranker_prediction['close'].iloc[i] - ranker_prediction['open'].iloc[i])/ranker_prediction['open'].iloc[i]))/2
                        
                        mydict[cepName] = newvalue
                    except Exception as e:
                        print('1--!='+str(e.args))
                else:
                    try:
                        if ranker_prediction['open'].iloc[i] == 0:
                            continue
                        if (ranker_prediction['close'].iloc[i] - ranker_prediction['open'].iloc[i]) == 0:
                            continue
                        mydict[cepName] =  ((ranker_prediction['close'].iloc[i] - ranker_prediction['open'].iloc[i])/ranker_prediction['open'].iloc[i])
                    except Exception as e:
                        print('2--!='+str(e.args)+'   '+cepName + '   ' )
                          
#                         print('open:'+str(ranker_prediction['open']))
                        continue
#                         print('2--!='+str(e.args)+'   '+cepName + '   '+ )
   
    a1 = dict(sorted(mydict.items(), key=lambda x: x[1]) )
#     del a1['超涨']
    a2 = dict(reversed(a1.items()))
    
    ceparray = a2.keys()
    coreCep = []
    limitCount = 0
    for key in a2.keys():
        coreCep.append(key)
        limitCount = limitCount + 1
        if a2[key] < conf.cepShaiXuan:
            break
        
    #增加这种方式的意思是,如果今天得分不高,就不买了,如果得分都高,选一个能买入的买
    biggerThanOneNum = 0;
    for score in ranker_prediction['score'].values:
#         print('buy='+str(score))
        if score > 0.8:
            biggerThanOneNum = biggerThanOneNum + 1
#     print('buy='+str(biggerThanOneNum))
   

    # 读取数据  默认会返回全部证券代码数据, 通过指定参数 instruments 可以读取到指定的证券代码数据 d.has_key('name')  reversed(test1.items())
#     df = DataSource("net_amount_CN_STOCK_A").read()
    
    # 1. 资金分配
    # 平均持仓时间是hold_days,每日都将买入股票,每日预期使用 1/hold_days 的资金
    # 实际操作中,会存在一定的买入误差,所以在前hold_days天,等量使用资金;之后,尽量使用剩余资金(这里设置最多用等量的1.5倍)
    is_staging = context.trading_day_index < context.options['hold_days'] # 是否在建仓期间(前 hold_days 天)
    cash_avg = context.portfolio.portfolio_value / context.options['hold_days']
    cash_for_buy = min(context.portfolio.cash, (1 if is_staging else 1.5) * cash_avg)
    cash_for_sell = cash_avg - (context.portfolio.cash - cash_for_buy)
#     positions = {e.symbol: p.amount * p.last_sale_price
#                  for e, p in context.portfolio.positions.items()}
    
    # 2. 生成卖出订单:hold_days天之后才开始卖出;对持仓的股票,按机器学习算法预测的排序末位淘汰
    if not is_staging and cash_for_sell > 0:
#         equities = {e.symbol: e for e, p in context.portfolio.positions.items()}
        instruments = list(reversed(list(ranker_prediction.instrument[ranker_prediction.instrument.apply(
                lambda x: x in equities)])))
#         print('instruments='+str(instruments))
        for instrument in positions.keys():
            if instrument in dontsell:
                print("本次不卖出"+str(instrument))
                continue
            if instrument in ranker_prediction.instrument:
                for i, ins in ranker_prediction.instrument:
                    if ins == instrument:
                        print('====sell_low:'+str(ranker_prediction['concept'].iloc[i]))  
            context.order_target(context.symbol(instrument), 0)
            
            
#         for instrument in instruments:
#             if list(ranker_prediction.instrument).index(instrument) < 5:
#                 print('继续持有:'+str(instrument))
#                 break
#             print('sell='+str(instrument))
#             context.order_target(context.symbol(instrument), 0)
#             cash_for_sell -= positions[instrument]
#             if cash_for_sell <= 0:
#                 break

    # 3. 生成买入订单:按机器学习算法预测的排序,买入前面的stock_count只股票
    buy_cash_weights = context.stock_weights

    buy_instruments = list(ranker_prediction.instrument[:conf.rankStocks])
    
    max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
    for i, instrument in enumerate(buy_instruments):
#         buy_low = ranker_prediction[instrument,'low'];
        buy_cep = ranker_prediction['concept'].iloc[i]
        buy_turn = ranker_prediction['turn'].iloc[i]
        buy_score = ranker_prediction['score'].iloc[i]
#         if buy_score < 0.9:
#             continue
#         if buy_turn < 0.8:
#             continue
        buy_array = buy_cep.split(";")
        countAvg = 0
        countNum = 0
        for buyName in array:
            if buyName in mydict.keys():
                countNum = countNum + 1
                countAvg = countAvg + mydict[buyName]
            else:
                print('no found:'+buyName)
        if countAvg < 0:
            if conf.isUseCepTaoTai:
                print('淘汰cep:'+str(instrument)+buy_cep)
                break   
        if conf.isUseCepShaiXuan:
            isinCep = 0
            for ccep in coreCep:
                if ccep in ranker_prediction['concept'].iloc[i]:
                    isinCep = 1
            if  isinCep == 0 and len(coreCep) > 3:
                print('未命中:'+str(coreCep))
                break
        cash = cash_for_buy * 1
#         print('equities='+str(instrument))
        if cash > max_cash_per_instrument - positions.get(instrument, 0):
            # 确保股票持仓量不会超过每次股票最大的占用资金量
            cash = max_cash_per_instrument - positions.get(instrument, 0)
            if benckmark_risk2 > 0:
                cash = cash/2;
        if cash > 0:
#             print('buy='+str(df.get(instrument)))
            context.order_value(context.symbol(instrument), cash)
            break
        

# 回测引擎:准备数据,只执行一次 增加一个风险控制模块,风险条件还得研究一下怎么写
# ta_macd_dif(close, fastperiod=12, slowperiod=26, signalperiod=9),#指数平滑移动平均线

# ta_macd_dea(close, fastperiod=12, slowperiod=26, signalperiod=9),#DIF的N日(默认9日)指数平滑移动平均线
def m19_prepare_bigquant_run(context):
    #在数据准备函数中一次性计算每日的大盘风控条件相比于在handle中每日计算风控条件可以提高回测速度
    # 多取50天的数据便于计算均值(保证回测的第一天均值不为Nan值),其中context.start_date和context.end_date是回测指定的起始时间和终止时间
    start_date= (pd.to_datetime(context.start_date) - datetime.timedelta(days=50)).strftime('%Y-%m-%d') 
    df=DataSource('bar1d_index_CN_STOCK_A').read(start_date=start_date,end_date=context.end_date,fields=['close'])
    benckmark_data=df[df.instrument=='000001.HIX']
#     where(ta_macd_dif(close,2,4,4)-ta_macd_dea(close,2,4,4)<0,1,0)
    #计算上证指数5日涨幅
    benckmark_data['ret5']=benckmark_data['close']/benckmark_data['close'].shift(5)-1
    #计算大盘风控条件,如果5日涨幅小于-4%则设置风险状态risk为1,否则为0
    benckmark_data['risk'] = np.where(benckmark_data['ret5']<-0.04,1,0)
    #修改日期格式为字符串(便于在handle中使用字符串日期索引来查看每日的风险状态)
    benckmark_data['date']=benckmark_data['date'].apply(lambda x:x.strftime('%Y-%m-%d'))
    #设置日期为索引
    benckmark_data.set_index('date',inplace=True)
    #把风控序列输出给全局变量context.benckmark_risk
    context.benckmark_risk=benckmark_data['risk']

# 回测引擎:初始化函数,只执行一次
def m19_initialize_bigquant_run(context):
    # 加载预测数据
    context.ranker_prediction = context.options['data'].read_df()

    # 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
    context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
    # 预测数据,通过options传入进来,使用 read_df 函数,加载到内存 (DataFrame)
    # 设置买入的股票数量,这里买入预测股票列表排名靠前的5只
    stock_count = 1
    # 每只的股票的权重,如下的权重分配会使得靠前的股票分配多一点的资金,[0.339160, 0.213986, 0.169580, ..]
    context.stock_weights = [1]
    # 设置每只股票占用的最大资金比例
    context.max_cash_per_instrument = 0.6
    # 5天下降至3天,从56%增长至76%
    context.options['hold_days'] = 1
    ##########################




g = T.Graph({

    'm1': 'M.instruments.v2',
    'm1.start_date': conf.start_date,
    'm1.end_date': conf.end_date,
    'm1.market': 'CN_STOCK_A',
    'm1.instrument_list': '',
    'm1.max_count': 0,
    
    'm61': 'M.use_datasource.v1',
    'm61.instruments': T.Graph.OutputPort('m1.data'),
    'm61.datasource_id':'market_value_CN_STOCK_A',
    'm61.start_date': '',
    'm61.end_date': '',
   
    'm62': 'M.filter.v3',
    'm62.input_data': T.Graph.OutputPort('m61.data'),
    'm62.expr':conf.filter1,
    'm62.output_left_data':False,
    
    'm63' :'M.select_columns.v3',
    'm63.input_ds': T.Graph.OutputPort('m62.data'),
    'm63.columns':'date,instrument',
    'm63.reverse_select':False,


    'm2': 'M.advanced_auto_labeler.v2',
    'm2.instruments': T.Graph.OutputPort('m1.data'),
    'm2.label_expr': """# #号开始的表示注释
# 0. 每行一个,顺序执行,从第二个开始,可以使用label字段
# 1. 可用数据字段见 https://bigquant.com/docs/data_history_data.html
#   添加benchmark_前缀,可使用对应的benchmark数据
# 2. 可用操作符和函数见 `表达式引擎 <https://bigquant.com/docs/big_expr.html>`_

# 计算收益:5日收盘价(作为卖出价格)除以明日开盘价(作为买入价格)
shift(close, -5) / shift(open, -1)

# 极值处理:用1%和99%分位的值做clip
clip(label, all_quantile(label, 0.01), all_quantile(label, 0.99))

# 将分数映射到分类,这里使用20个分类
all_wbins(label, 20)

# 过滤掉一字涨停的情况 (设置label为NaN,在后续处理和训练中会忽略NaN的label)
where(shift(high, -1) == shift(low, -1), NaN, label)
""",
    'm2.start_date': '',
    'm2.end_date': '',
    'm2.benchmark': '000300.SHA',
    'm2.drop_na_label': True,
    'm2.cast_label_int': True,

    'm3': 'M.input_features.v1',
    'm3.features': """# #号开始的表示注释
# 多个特征,每行一个,可以包含基础特征和衍生特征
rank_avg_mf_net_amount_0/rank_avg_mf_net_amount_5
rank_avg_mf_net_amount_5/rank_avg_mf_net_amount_10
return_5/return_20

rank(mean(mf_net_amount_l_0,5))/rank(mean(mf_net_amount_l_0,10))
correlation(sqrt(volume_0),return_0,5)
correlation(log(volume_0),abs(return_0-1),5)
(close_0-close_30)/close_30>1.25
(close_0-close_5)/close_5>1.16
ta_bbands_middleband_28_0""",
    

    'm25': 'M.input_features.v1',
    'm25.features_ds': T.Graph.OutputPort('m3.data'),
    'm25.features': """# #号开始的表示注释
# 多个特征,每行一个,可以包含基础特征和衍生特征
open_1
close_1
close_0
high_1
open_0
low_0
price_limit_status_0
volume_0
open_0/close_1
cond3=low_0 > mean(close_0,20)
#(今日收盘价-昨日收盘价)/昨日收盘价*100%
cond1=ta_trix(close_0, derive='long')
cond2=ta_dma(close_0, 'long')
#----当日最低价 站稳60日线
cond3=low_0 > mean(close_0,20)
#(今日收盘价-昨日收盘价)/昨日收盘价*100%
#cond4=(close_0-close_1)/close_1 >0.04
cond4=ta_dma(close_0, 'long')
cond6=low_0 > mean(close_0,20)
cond7=ta_macd(close_0,'long')
cond8=ta_ma(close_0,5, derive='long')
buy_condition=(close_0>=ts_max(close_0,20))
""",
    
    
    'm15': 'M.general_feature_extractor.v7',
    'm15.instruments': T.Graph.OutputPort('m1.data'),
    'm15.features': T.Graph.OutputPort('m25.data'),
    'm15.start_date': '',
    'm15.end_date': '',
    'm15.before_start_days': 120,

    'm16': 'M.derived_feature_extractor.v3',
    'm16.input_data': T.Graph.OutputPort('m15.data'),
    'm16.features': T.Graph.OutputPort('m25.data'),
    'm16.date_col': 'date',
    'm16.instrument_col': 'instrument',
    'm16.drop_na': False,
    'm16.remove_extra_columns': False,

    'm7': 'M.join.v3',
    'm7.data1': T.Graph.OutputPort('m2.data'),
    'm7.data2': T.Graph.OutputPort('m16.data'),
    'm7.on': 'date,instrument',
    'm7.how': 'inner',
    'm7.sort': False,
    
    
    'm5': 'M.chinaa_stock_filter.v1',
    'm5.input_data':T.Graph.OutputPort('m7.data'),
    'm5.index_constituent_cond':['全部'],
    'm5.board_cond':[ '上证主板', '深证主板'],
    'm5.industry_cond':['全部'],
    'm5.st_cond':['正常'],
    'm5.delist_cond':['非退市'],
    'm5.output_left_data':False,
    
    'm72': 'M.filter.v3',
    'm72.input_data':T.Graph.OutputPort('m5.data'),
    'm72.expr':' cond4 and  cond6 and cond7 and cond8 ',
    'm72.output_left_data':False,


    'm74': 'M.dropnan.v1',
    'm74.input_data':T.Graph.OutputPort('m5.data'),

    'm6': 'M.stock_ranker_train.v6',
    'm6.training_ds': T.Graph.OutputPort('m74.data'),
    'm6.features': T.Graph.OutputPort('m3.data'),
    'm6.learning_algorithm': '排序',
    'm6.number_of_leaves': 30,
    'm6.minimum_docs_per_leaf': 1000,
    'm6.number_of_trees': 20,
    'm6.learning_rate': 0.1,
    'm6.max_bins': 1023,
    'm6.feature_fraction': 1,
    'm6.data_row_fraction': 1,
    'm6.plot_charts': True,
    'm6.ndcg_discount_base': 1,
    'm6.m_lazy_run': False,

    'm9': 'M.instruments.v2',
    'm9.start_date': T.live_run_param('trading_date', conf.start_trade),
    'm9.end_date': T.live_run_param('trading_date', conf.end_trade),
    'm9.market': 'CN_STOCK_A',
    'm9.instrument_list': '',
    'm9.max_count': 0,
    
    'm17': 'M.general_feature_extractor.v7',
    'm17.instruments': T.Graph.OutputPort('m9.data'),
    'm17.features': T.Graph.OutputPort('m25.data'),
    'm17.start_date': '',
    'm17.end_date': '',
    'm17.before_start_days': 120,

    'm18': 'M.derived_feature_extractor.v3',
    'm18.input_data': T.Graph.OutputPort('m17.data'),
    'm18.features': T.Graph.OutputPort('m25.data'),
    'm18.date_col': 'date',
    'm18.instrument_col': 'instrument',
    'm18.drop_na': False,
    'm18.remove_extra_columns': False,
    
    'm45': 'M.filter.v3',
    'm45.input_data':T.Graph.OutputPort('m18.data'),
    'm45.expr':' cond4 and  cond6 and cond7 and cond8 ',
    'm45.output_left_data':False,
    
    'm41': 'M.use_datasource.v1',
    'm41.instruments': T.Graph.OutputPort('m9.data'),
    'm41.datasource_id':'market_value_CN_STOCK_A',
    'm41.start_date': '',
    'm41.end_date': '',
    

    'm42': 'M.filter.v3',
    'm42.input_data': T.Graph.OutputPort('m41.data'),
    'm42.expr':conf.filter1,
    'm42.output_left_data':False,

    'm55': 'M.chinaa_stock_filter.v1',
    'm55.input_data':T.Graph.OutputPort('m42.data'),
    'm55.index_constituent_cond':['全部'],
    'm55.board_cond':[ '上证主板', '深证主板'],
    'm55.industry_cond':['全部'],
    'm55.st_cond':['正常'],
    'm55.delist_cond':['非退市'],
    'm55.output_left_data':False,
    
    'm43':'M.select_columns.v3',
    'm43.input_ds':T.Graph.OutputPort('m55.data'),
    'm43.columns':'date,instrument',
    'm43.reverse_select':False,
    
    'm48': 'M.join.v3',
    'm48.data1': T.Graph.OutputPort('m45.data'),
    'm48.data2': T.Graph.OutputPort('m43.data'),
    'm48.on': 'date,instrument',
    'm48.how': 'inner',
    'm48.sort': False,
    'm49': 'M.dropnan.v1',
    'm49.input_data':T.Graph.OutputPort('m48.data'),
    

    'm8': 'M.stock_ranker_predict.v5',
    'm8.model': T.Graph.OutputPort('m6.model'),
    'm8.data': T.Graph.OutputPort('m49.data'),
    'm8.m_lazy_run': False,
    
    'm84': 'M.input_features.v1',
    'm84.features': """# #号开始的表示注释
# 多个特征,每行一个,可以包含基础特征和衍生特征
bm_0=where(mean(volume, 5)-mean(volume, 10)<0,1,0)
bm_2=where(ta_macd_dif(close,2,4,4)-ta_macd_dea(close,2,4,4)<0,1,0)
bm_1=where(mean(amount, 5)-mean(amount, 10)<0,1,0)
bm_3=where(ta_3red_soldiers(high, low, close, open),1,0)
bm_4=where(ta_3red_soldiers(high, low, close, open),1,0)
bm_5=where(ta_hammer(high, low, close, open),1,0)
bm_6=where(ta_inverted_hammer(high, low, close, open),1,0)
bm_7=where(ta_2crows(high, low, close, open),1,0)
bm_8=where(ta_3black_crows(high, low, close, open),1,0)
bm_9=where(ta_morning_star(high, low, close, open),1,0)
bm_10=where(ta_evening_star(high, low, close, open),1,0)
bm_11=where(ta_dark_cloud_cover(high, low, close, open),1,0)
bm_12=where(ta_shooting_star(high, low, close, open),1,0)
""",
    
    'm85':'M.index_feature_extract.v3',
    'm85.input_1':T.Graph.OutputPort('m9.data'),
    'm85.input_2':T.Graph.OutputPort('m84.data'),
    'm85.before_days':90,
    'm85.index':'000300.HIX',
    
    'm71': 'M.use_datasource.v1',
    'm71.instruments': T.Graph.OutputPort('m9.data'),
    'm71.datasource_id': 'industry_CN_STOCK_A',
    'm41.start_date': '',
    'm71.end_date': '',
    
    'm73' :'M.select_columns.v3',
    'm73.input_ds': T.Graph.OutputPort('m71.data'),
    'm73.columns':'date,instrument,concept',
    'm73.reverse_select':False,
   
    'm78': 'M.join.v3',
    'm78.data1': T.Graph.OutputPort('m8.predictions'),
    'm78.data2': T.Graph.OutputPort('m73.data'),
    'm78.on': 'date,instrument',
    'm78.how': 'left',
    'm78.sort': False,
    
    'm61': 'M.use_datasource.v1',
    'm61.instruments': T.Graph.OutputPort('m9.data'),
    'm61.datasource_id': 'bar1d_CN_STOCK_A',
    'm61.start_date': '',
    'm61.end_date': '',
    
    'm63' :'M.select_columns.v3',
    'm63.input_ds': T.Graph.OutputPort('m61.data'),
    'm63.columns':'date,instrument,open,close,high,low,turn',
    'm63.reverse_select':False,
    
    'm68': 'M.join.v3',
    'm68.data1': T.Graph.OutputPort('m78.data'),
    'm68.data2': T.Graph.OutputPort('m63.data'),
    'm68.on': 'date,instrument',
    'm68.how': 'left',
    'm68.sort': False,
    
    'm90': 'M.join.v3',
    'm90.data1': T.Graph.OutputPort('m68.data'),
    'm90.data2': T.Graph.OutputPort('m85.data_1'),
    'm90.on': 'date',
    'm90.how': 'left',
    'm90.sort': False,
    
    
    
    'm19': 'M.trade.v4',
    'm19.instruments': T.Graph.OutputPort('m9.data'),
    'm19.options_data': T.Graph.OutputPort('m90.data'),
    'm19.start_date': '',
    'm19.end_date': '',
    'm19.initialize': m19_initialize_bigquant_run,
    'm19.handle_data': m19_handle_data_bigquant_run,
    'm19.prepare': m19_prepare_bigquant_run,
    'm19.volume_limit': 0.025,
    'm19.order_price_field_buy': 'open',
    'm19.order_price_field_sell': 'close',
    'm19.capital_base': 30000,
    'm19.auto_cancel_non_tradable_orders': True,
    'm19.data_frequency': 'daily',
    'm19.price_type': '后复权',
    'm19.product_type': '股票',
    'm19.plot_charts': True,
    'm19.backtest_only': False,
    'm19.benchmark': '000300.HIX',
})


def m20_run_bigquant_run(
    bq_graph,
    inputs,
    trading_days_market='CN', # 使用那个市场的交易日历
    train_instruments_mid='m1', # 训练数据 证券代码列表 模块id
    test_instruments_mid='m9', # 测试数据 证券代码列表 模块id
    predict_mid='m8', # 预测 模块id
    trade_mid='m19', # 回测 模块id
    start_date='2016-01-01', # 数据开始日期
    end_date=T.live_run_param('trading_date',conf.end_trade), # 数据结束日期
    train_update_days=250, # 更新周期,按交易日计算,每多少天更新一次
    train_update_days_for_live=None, #模拟实盘模式下的更新周期,按交易日计算,每多少天更新一次。如果需要在模拟实盘阶段使用不同的模型更新周期,可以设置这个参数
    train_data_min_days=250, # 最小数据天数,按交易日计算,所以第一个滚动的结束日期是 从开始日期到开始日期+最小数据天数
    train_data_max_days=250, # 最大数据天数,按交易日计算,0,表示没有限制,否则每一个滚动的开始日期=max(此滚动的结束日期-最大数据天数, 开始日期
    rolling_count_for_live=1, #实盘模式下滚动次数,模拟实盘模式下,取最后多少次滚动。一般在模拟实盘模式下,只用到最后一次滚动训练的模型,这里可以设置为1;如果你的滚动训练数据时间段很短,以至于期间可能没有训练数据,这里可以设置大一点。0表示没有限制
):
    def merge_datasources(input_1):
        df_list = [ds[0].read_df().set_index('date').loc[ds[1]:].reset_index() for ds in input_1]
        df = pd.concat(df_list)
        instrument_data = {
            'start_date': df['date'].min().strftime('%Y-%m-%d'),
            'end_date': df['date'].max().strftime('%Y-%m-%d'),
            'instruments': list(set(df['instrument'])),
        }
        return Outputs(data=DataSource.write_df(df), instrument_data=DataSource.write_pickle(instrument_data))

    def gen_rolling_dates(trading_days_market, start_date, end_date, train_update_days, train_update_days_for_live, train_data_min_days, train_data_max_days, rolling_count_for_live):
        # 是否实盘模式
        tdays = list(D.trading_days(market=trading_days_market, start_date=start_date, end_date=end_date)['date'])
        is_live_run = T.live_run_param('trading_date', None) is not None

        if is_live_run and train_update_days_for_live:
            train_update_days = train_update_days_for_live

        rollings = []
        train_end_date = train_data_min_days
        while train_end_date < len(tdays):
            if train_data_max_days is not None:
                train_start_date = max(train_end_date - train_data_max_days, 0)
            else:
                train_start_date = start_date
            rollings.append({
                'train_start_date': tdays[train_start_date].strftime('%Y-%m-%d'),
                'train_end_date': tdays[train_end_date - 1].strftime('%Y-%m-%d'),
                'test_start_date': tdays[train_end_date].strftime('%Y-%m-%d'),
                'test_end_date': tdays[min(train_end_date + train_update_days, len(tdays)) - 1].strftime('%Y-%m-%d'),
            })
            train_end_date += train_update_days

        if not rollings:
            raise Exception('没有滚动需要执行,请检查配置')

        if is_live_run and rolling_count_for_live:
            rollings = rollings[-rolling_count_for_live:]

        return rollings

    g = bq_graph

    rolling_dates = gen_rolling_dates(
        trading_days_market, start_date, end_date, train_update_days, train_update_days_for_live, train_data_min_days, train_data_max_days, rolling_count_for_live)

    # 训练和预测
    results = []
    for rolling in rolling_dates:
        parameters = {}
        # 先禁用回测
        parameters[trade_mid + '.__enabled__'] = False
        parameters[train_instruments_mid + '.start_date'] = rolling['train_start_date']
        parameters[train_instruments_mid + '.end_date'] = rolling['train_end_date']
        parameters[test_instruments_mid + '.start_date'] = rolling['test_start_date']
        parameters[test_instruments_mid + '.end_date'] = rolling['test_end_date']
        print('------ rolling_train:', str(parameters))
        results.append(g.run(parameters))

    # 合并预测结果并回测
    mx = M.cached.v3(run=merge_datasources, input_1=[[result[predict_mid].predictions, result[test_instruments_mid].data.read_pickle()['start_date']] for result in results])
    parameters = {}
    parameters['*.__enabled__'] = False
    parameters[trade_mid + '.__enabled__'] = True
    parameters[trade_mid + '.instruments'] = mx.instrument_data
    parameters[trade_mid + '.options_data'] = mx.data

    trade = g.run(parameters)

    return {'rollings': results, 'trade': trade}


m20 = M.hyper_rolling_train.v1(
    run=m20_run_bigquant_run,
    run_now=True,
    bq_graph=g
)
------ rolling_train: {'m19.__enabled__': False, 'm1.start_date': '2016-01-04', 'm1.end_date': '2017-01-10', 'm9.start_date': '2017-01-11', 'm9.end_date': '2018-01-17'}
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
Exception: Unknown market_cap

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
KeyError: 'market_cap'

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
KeyError: 'market_cap'

The above exception was the direct cause of the following exception:

UndefinedVariableError                    Traceback (most recent call last)
UndefinedVariableError: name 'market_cap' is not defined

During handling of the above exception, another exception occurred:

Exception                                 Traceback (most recent call last)
Exception: Unknown market_cap

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
KeyError: 'market_cap'

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
KeyError: 'market_cap'

The above exception was the direct cause of the following exception:

UndefinedVariableError                    Traceback (most recent call last)
<ipython-input-2-03726a33752a> in <module>
    633 
    634 
--> 635 m20 = M.hyper_rolling_train.v1(
    636     run=m20_run_bigquant_run,
    637     run_now=True,

<ipython-input-2-03726a33752a> in m20_run_bigquant_run(bq_graph, inputs, trading_days_market, train_instruments_mid, test_instruments_mid, predict_mid, trade_mid, start_date, end_date, train_update_days, train_update_days_for_live, train_data_min_days, train_data_max_days, rolling_count_for_live)
    618         parameters[test_instruments_mid + '.end_date'] = rolling['test_end_date']
    619         print('------ rolling_train:', str(parameters))
--> 620         results.append(g.run(parameters))
    621 
    622     # 合并预测结果并回测

UndefinedVariableError: name 'market_cap' is not defined