class conf:
start_date = '2010-01-01'#2016-04-20 '2016-10-27' 2017-03-16 2016-02-26
end_date = '2015-01-01'
start_trade = '2017-01-01'
end_trade='2018-10-22'
filter1='market_cap<30000000000'#市值设置成300亿,股票才可能有有大的变化
rankStocks=100 #选预测的前100个股票
isUseCepTaoTai=False #是否使用cep加权值判断
isUseCepShaiXuan=False # 是否筛选不在核心Cep的股票
cepShaiXuan=0.05#筛选分位
isUseRisk=True
isUseGoOnHold=True
# 回测引擎:每日数据处理函数,每天执行一次
def m19_handle_data_bigquant_run(context, data):
# # 1. 获取今日的日期
# today = data.current_dt.strftime('%Y-%m-%d')
# # 2. 按日期过滤得到今日的预测数据
ranker_prediction = context.ranker_prediction[
context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]
# # 3. 获取当前持仓的股票
# stock_hold_now = {e.symbol: p.amount * p.last_sale_price
# for e, p in context.perf_tracker.position_tracker.positions.items()}
# # 4. 获取可用现金,需要确认卖出是早盘还是尾盘
# cash_for_buy = context.portfolio.cash
today = data.current_dt.strftime('%Y-%m-%d')
stock_hold_now = [equity.symbol for equity in context.portfolio.positions ]
positions = {e.symbol: p.amount * p.last_sale_price
for e, p in context.portfolio.positions.items()}
#大盘风控模块,读取风控数据
print('data='+str(today))
# 交易量均值下降
#------------------------------------------止损模块START--------------------------------------------
equities = {e.symbol: p for e, p in context.portfolio.positions.items() if p.amount>0}
# 新建当日止损股票列表是为了handle_data 策略逻辑部分不再对该股票进行判断
stoploss_stock = []
dontsell = []
#-------------------------------------------止损模块END---------------------------------------------
# print('rank='+str(ranker_prediction))
mydict = {}
for i, cep in enumerate(ranker_prediction['concept']):
array = cep.split(";")
# myindex = ranker_prediction['concept'].index
# print('myindex='+str(myindex))
for cepName in array:
if cepName in mydict.keys():
try:
if ranker_prediction['open'].iloc[i] == 0:
continue
if (ranker_prediction['close'].iloc[i] - ranker_prediction['open'].iloc[i]) == 0:
continue
oldvalue = mydict[cepName]
if oldvalue == 0:
newvalue = (ranker_prediction['close'].iloc[i] - ranker_prediction['open'].iloc[i])/ranker_prediction['open'].iloc[i]
else:
newvalue = (oldvalue + ((ranker_prediction['close'].iloc[i] - ranker_prediction['open'].iloc[i])/ranker_prediction['open'].iloc[i]))/2
mydict[cepName] = newvalue
except Exception as e:
print('1--!='+str(e.args))
else:
try:
if ranker_prediction['open'].iloc[i] == 0:
continue
if (ranker_prediction['close'].iloc[i] - ranker_prediction['open'].iloc[i]) == 0:
continue
mydict[cepName] = ((ranker_prediction['close'].iloc[i] - ranker_prediction['open'].iloc[i])/ranker_prediction['open'].iloc[i])
except Exception as e:
print('2--!='+str(e.args)+' '+cepName + ' ' )
# print('open:'+str(ranker_prediction['open']))
continue
# print('2--!='+str(e.args)+' '+cepName + ' '+ )
a1 = dict(sorted(mydict.items(), key=lambda x: x[1]) )
# del a1['超涨']
a2 = dict(reversed(a1.items()))
ceparray = a2.keys()
coreCep = []
limitCount = 0
for key in a2.keys():
coreCep.append(key)
limitCount = limitCount + 1
if a2[key] < conf.cepShaiXuan:
break
#增加这种方式的意思是,如果今天得分不高,就不买了,如果得分都高,选一个能买入的买
biggerThanOneNum = 0;
for score in ranker_prediction['score'].values:
# print('buy='+str(score))
if score > 0.8:
biggerThanOneNum = biggerThanOneNum + 1
# print('buy='+str(biggerThanOneNum))
# 读取数据 默认会返回全部证券代码数据, 通过指定参数 instruments 可以读取到指定的证券代码数据 d.has_key('name') reversed(test1.items())
# df = DataSource("net_amount_CN_STOCK_A").read()
# 1. 资金分配
# 平均持仓时间是hold_days,每日都将买入股票,每日预期使用 1/hold_days 的资金
# 实际操作中,会存在一定的买入误差,所以在前hold_days天,等量使用资金;之后,尽量使用剩余资金(这里设置最多用等量的1.5倍)
is_staging = context.trading_day_index < context.options['hold_days'] # 是否在建仓期间(前 hold_days 天)
cash_avg = context.portfolio.portfolio_value / context.options['hold_days']
cash_for_buy = min(context.portfolio.cash, (1 if is_staging else 1.5) * cash_avg)
cash_for_sell = cash_avg - (context.portfolio.cash - cash_for_buy)
# positions = {e.symbol: p.amount * p.last_sale_price
# for e, p in context.portfolio.positions.items()}
# 2. 生成卖出订单:hold_days天之后才开始卖出;对持仓的股票,按机器学习算法预测的排序末位淘汰
if not is_staging and cash_for_sell > 0:
# equities = {e.symbol: e for e, p in context.portfolio.positions.items()}
instruments = list(reversed(list(ranker_prediction.instrument[ranker_prediction.instrument.apply(
lambda x: x in equities)])))
# print('instruments='+str(instruments))
for instrument in positions.keys():
if instrument in dontsell:
print("本次不卖出"+str(instrument))
continue
if instrument in ranker_prediction.instrument:
for i, ins in ranker_prediction.instrument:
if ins == instrument:
print('====sell_low:'+str(ranker_prediction['concept'].iloc[i]))
context.order_target(context.symbol(instrument), 0)
# for instrument in instruments:
# if list(ranker_prediction.instrument).index(instrument) < 5:
# print('继续持有:'+str(instrument))
# break
# print('sell='+str(instrument))
# context.order_target(context.symbol(instrument), 0)
# cash_for_sell -= positions[instrument]
# if cash_for_sell <= 0:
# break
# 3. 生成买入订单:按机器学习算法预测的排序,买入前面的stock_count只股票
buy_cash_weights = context.stock_weights
buy_instruments = list(ranker_prediction.instrument[:conf.rankStocks])
max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
for i, instrument in enumerate(buy_instruments):
# buy_low = ranker_prediction[instrument,'low'];
buy_cep = ranker_prediction['concept'].iloc[i]
buy_turn = ranker_prediction['turn'].iloc[i]
buy_score = ranker_prediction['score'].iloc[i]
# if buy_score < 0.9:
# continue
# if buy_turn < 0.8:
# continue
buy_array = buy_cep.split(";")
countAvg = 0
countNum = 0
for buyName in array:
if buyName in mydict.keys():
countNum = countNum + 1
countAvg = countAvg + mydict[buyName]
else:
print('no found:'+buyName)
if countAvg < 0:
if conf.isUseCepTaoTai:
print('淘汰cep:'+str(instrument)+buy_cep)
break
if conf.isUseCepShaiXuan:
isinCep = 0
for ccep in coreCep:
if ccep in ranker_prediction['concept'].iloc[i]:
isinCep = 1
if isinCep == 0 and len(coreCep) > 3:
print('未命中:'+str(coreCep))
break
cash = cash_for_buy * 1
# print('equities='+str(instrument))
if cash > max_cash_per_instrument - positions.get(instrument, 0):
# 确保股票持仓量不会超过每次股票最大的占用资金量
cash = max_cash_per_instrument - positions.get(instrument, 0)
if benckmark_risk2 > 0:
cash = cash/2;
if cash > 0:
# print('buy='+str(df.get(instrument)))
context.order_value(context.symbol(instrument), cash)
break
# 回测引擎:准备数据,只执行一次 增加一个风险控制模块,风险条件还得研究一下怎么写
# ta_macd_dif(close, fastperiod=12, slowperiod=26, signalperiod=9),#指数平滑移动平均线
# ta_macd_dea(close, fastperiod=12, slowperiod=26, signalperiod=9),#DIF的N日(默认9日)指数平滑移动平均线
def m19_prepare_bigquant_run(context):
#在数据准备函数中一次性计算每日的大盘风控条件相比于在handle中每日计算风控条件可以提高回测速度
# 多取50天的数据便于计算均值(保证回测的第一天均值不为Nan值),其中context.start_date和context.end_date是回测指定的起始时间和终止时间
start_date= (pd.to_datetime(context.start_date) - datetime.timedelta(days=50)).strftime('%Y-%m-%d')
df=DataSource('bar1d_index_CN_STOCK_A').read(start_date=start_date,end_date=context.end_date,fields=['close'])
benckmark_data=df[df.instrument=='000001.HIX']
# where(ta_macd_dif(close,2,4,4)-ta_macd_dea(close,2,4,4)<0,1,0)
#计算上证指数5日涨幅
benckmark_data['ret5']=benckmark_data['close']/benckmark_data['close'].shift(5)-1
#计算大盘风控条件,如果5日涨幅小于-4%则设置风险状态risk为1,否则为0
benckmark_data['risk'] = np.where(benckmark_data['ret5']<-0.04,1,0)
#修改日期格式为字符串(便于在handle中使用字符串日期索引来查看每日的风险状态)
benckmark_data['date']=benckmark_data['date'].apply(lambda x:x.strftime('%Y-%m-%d'))
#设置日期为索引
benckmark_data.set_index('date',inplace=True)
#把风控序列输出给全局变量context.benckmark_risk
context.benckmark_risk=benckmark_data['risk']
# 回测引擎:初始化函数,只执行一次
def m19_initialize_bigquant_run(context):
# 加载预测数据
context.ranker_prediction = context.options['data'].read_df()
# 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
# 预测数据,通过options传入进来,使用 read_df 函数,加载到内存 (DataFrame)
# 设置买入的股票数量,这里买入预测股票列表排名靠前的5只
stock_count = 1
# 每只的股票的权重,如下的权重分配会使得靠前的股票分配多一点的资金,[0.339160, 0.213986, 0.169580, ..]
context.stock_weights = [1]
# 设置每只股票占用的最大资金比例
context.max_cash_per_instrument = 0.6
# 5天下降至3天,从56%增长至76%
context.options['hold_days'] = 1
##########################
g = T.Graph({
'm1': 'M.instruments.v2',
'm1.start_date': conf.start_date,
'm1.end_date': conf.end_date,
'm1.market': 'CN_STOCK_A',
'm1.instrument_list': '',
'm1.max_count': 0,
'm61': 'M.use_datasource.v1',
'm61.instruments': T.Graph.OutputPort('m1.data'),
'm61.datasource_id':'market_value_CN_STOCK_A',
'm61.start_date': '',
'm61.end_date': '',
'm62': 'M.filter.v3',
'm62.input_data': T.Graph.OutputPort('m61.data'),
'm62.expr':conf.filter1,
'm62.output_left_data':False,
'm63' :'M.select_columns.v3',
'm63.input_ds': T.Graph.OutputPort('m62.data'),
'm63.columns':'date,instrument',
'm63.reverse_select':False,
'm2': 'M.advanced_auto_labeler.v2',
'm2.instruments': T.Graph.OutputPort('m1.data'),
'm2.label_expr': """# #号开始的表示注释
# 0. 每行一个,顺序执行,从第二个开始,可以使用label字段
# 1. 可用数据字段见 https://bigquant.com/docs/data_history_data.html
# 添加benchmark_前缀,可使用对应的benchmark数据
# 2. 可用操作符和函数见 `表达式引擎 <https://bigquant.com/docs/big_expr.html>`_
# 计算收益:5日收盘价(作为卖出价格)除以明日开盘价(作为买入价格)
shift(close, -5) / shift(open, -1)
# 极值处理:用1%和99%分位的值做clip
clip(label, all_quantile(label, 0.01), all_quantile(label, 0.99))
# 将分数映射到分类,这里使用20个分类
all_wbins(label, 20)
# 过滤掉一字涨停的情况 (设置label为NaN,在后续处理和训练中会忽略NaN的label)
where(shift(high, -1) == shift(low, -1), NaN, label)
""",
'm2.start_date': '',
'm2.end_date': '',
'm2.benchmark': '000300.SHA',
'm2.drop_na_label': True,
'm2.cast_label_int': True,
'm3': 'M.input_features.v1',
'm3.features': """# #号开始的表示注释
# 多个特征,每行一个,可以包含基础特征和衍生特征
rank_avg_mf_net_amount_0/rank_avg_mf_net_amount_5
rank_avg_mf_net_amount_5/rank_avg_mf_net_amount_10
return_5/return_20
rank(mean(mf_net_amount_l_0,5))/rank(mean(mf_net_amount_l_0,10))
correlation(sqrt(volume_0),return_0,5)
correlation(log(volume_0),abs(return_0-1),5)
(close_0-close_30)/close_30>1.25
(close_0-close_5)/close_5>1.16
ta_bbands_middleband_28_0""",
'm25': 'M.input_features.v1',
'm25.features_ds': T.Graph.OutputPort('m3.data'),
'm25.features': """# #号开始的表示注释
# 多个特征,每行一个,可以包含基础特征和衍生特征
open_1
close_1
close_0
high_1
open_0
low_0
price_limit_status_0
volume_0
open_0/close_1
cond3=low_0 > mean(close_0,20)
#(今日收盘价-昨日收盘价)/昨日收盘价*100%
cond1=ta_trix(close_0, derive='long')
cond2=ta_dma(close_0, 'long')
#----当日最低价 站稳60日线
cond3=low_0 > mean(close_0,20)
#(今日收盘价-昨日收盘价)/昨日收盘价*100%
#cond4=(close_0-close_1)/close_1 >0.04
cond4=ta_dma(close_0, 'long')
cond6=low_0 > mean(close_0,20)
cond7=ta_macd(close_0,'long')
cond8=ta_ma(close_0,5, derive='long')
buy_condition=(close_0>=ts_max(close_0,20))
""",
'm15': 'M.general_feature_extractor.v7',
'm15.instruments': T.Graph.OutputPort('m1.data'),
'm15.features': T.Graph.OutputPort('m25.data'),
'm15.start_date': '',
'm15.end_date': '',
'm15.before_start_days': 120,
'm16': 'M.derived_feature_extractor.v3',
'm16.input_data': T.Graph.OutputPort('m15.data'),
'm16.features': T.Graph.OutputPort('m25.data'),
'm16.date_col': 'date',
'm16.instrument_col': 'instrument',
'm16.drop_na': False,
'm16.remove_extra_columns': False,
'm7': 'M.join.v3',
'm7.data1': T.Graph.OutputPort('m2.data'),
'm7.data2': T.Graph.OutputPort('m16.data'),
'm7.on': 'date,instrument',
'm7.how': 'inner',
'm7.sort': False,
'm5': 'M.chinaa_stock_filter.v1',
'm5.input_data':T.Graph.OutputPort('m7.data'),
'm5.index_constituent_cond':['全部'],
'm5.board_cond':[ '上证主板', '深证主板'],
'm5.industry_cond':['全部'],
'm5.st_cond':['正常'],
'm5.delist_cond':['非退市'],
'm5.output_left_data':False,
'm72': 'M.filter.v3',
'm72.input_data':T.Graph.OutputPort('m5.data'),
'm72.expr':' cond4 and cond6 and cond7 and cond8 ',
'm72.output_left_data':False,
'm74': 'M.dropnan.v1',
'm74.input_data':T.Graph.OutputPort('m5.data'),
'm6': 'M.stock_ranker_train.v6',
'm6.training_ds': T.Graph.OutputPort('m74.data'),
'm6.features': T.Graph.OutputPort('m3.data'),
'm6.learning_algorithm': '排序',
'm6.number_of_leaves': 30,
'm6.minimum_docs_per_leaf': 1000,
'm6.number_of_trees': 20,
'm6.learning_rate': 0.1,
'm6.max_bins': 1023,
'm6.feature_fraction': 1,
'm6.data_row_fraction': 1,
'm6.plot_charts': True,
'm6.ndcg_discount_base': 1,
'm6.m_lazy_run': False,
'm9': 'M.instruments.v2',
'm9.start_date': T.live_run_param('trading_date', conf.start_trade),
'm9.end_date': T.live_run_param('trading_date', conf.end_trade),
'm9.market': 'CN_STOCK_A',
'm9.instrument_list': '',
'm9.max_count': 0,
'm17': 'M.general_feature_extractor.v7',
'm17.instruments': T.Graph.OutputPort('m9.data'),
'm17.features': T.Graph.OutputPort('m25.data'),
'm17.start_date': '',
'm17.end_date': '',
'm17.before_start_days': 120,
'm18': 'M.derived_feature_extractor.v3',
'm18.input_data': T.Graph.OutputPort('m17.data'),
'm18.features': T.Graph.OutputPort('m25.data'),
'm18.date_col': 'date',
'm18.instrument_col': 'instrument',
'm18.drop_na': False,
'm18.remove_extra_columns': False,
'm45': 'M.filter.v3',
'm45.input_data':T.Graph.OutputPort('m18.data'),
'm45.expr':' cond4 and cond6 and cond7 and cond8 ',
'm45.output_left_data':False,
'm41': 'M.use_datasource.v1',
'm41.instruments': T.Graph.OutputPort('m9.data'),
'm41.datasource_id':'market_value_CN_STOCK_A',
'm41.start_date': '',
'm41.end_date': '',
'm42': 'M.filter.v3',
'm42.input_data': T.Graph.OutputPort('m41.data'),
'm42.expr':conf.filter1,
'm42.output_left_data':False,
'm55': 'M.chinaa_stock_filter.v1',
'm55.input_data':T.Graph.OutputPort('m42.data'),
'm55.index_constituent_cond':['全部'],
'm55.board_cond':[ '上证主板', '深证主板'],
'm55.industry_cond':['全部'],
'm55.st_cond':['正常'],
'm55.delist_cond':['非退市'],
'm55.output_left_data':False,
'm43':'M.select_columns.v3',
'm43.input_ds':T.Graph.OutputPort('m55.data'),
'm43.columns':'date,instrument',
'm43.reverse_select':False,
'm48': 'M.join.v3',
'm48.data1': T.Graph.OutputPort('m45.data'),
'm48.data2': T.Graph.OutputPort('m43.data'),
'm48.on': 'date,instrument',
'm48.how': 'inner',
'm48.sort': False,
'm49': 'M.dropnan.v1',
'm49.input_data':T.Graph.OutputPort('m48.data'),
'm8': 'M.stock_ranker_predict.v5',
'm8.model': T.Graph.OutputPort('m6.model'),
'm8.data': T.Graph.OutputPort('m49.data'),
'm8.m_lazy_run': False,
'm84': 'M.input_features.v1',
'm84.features': """# #号开始的表示注释
# 多个特征,每行一个,可以包含基础特征和衍生特征
bm_0=where(mean(volume, 5)-mean(volume, 10)<0,1,0)
bm_2=where(ta_macd_dif(close,2,4,4)-ta_macd_dea(close,2,4,4)<0,1,0)
bm_1=where(mean(amount, 5)-mean(amount, 10)<0,1,0)
bm_3=where(ta_3red_soldiers(high, low, close, open),1,0)
bm_4=where(ta_3red_soldiers(high, low, close, open),1,0)
bm_5=where(ta_hammer(high, low, close, open),1,0)
bm_6=where(ta_inverted_hammer(high, low, close, open),1,0)
bm_7=where(ta_2crows(high, low, close, open),1,0)
bm_8=where(ta_3black_crows(high, low, close, open),1,0)
bm_9=where(ta_morning_star(high, low, close, open),1,0)
bm_10=where(ta_evening_star(high, low, close, open),1,0)
bm_11=where(ta_dark_cloud_cover(high, low, close, open),1,0)
bm_12=where(ta_shooting_star(high, low, close, open),1,0)
""",
'm85':'M.index_feature_extract.v3',
'm85.input_1':T.Graph.OutputPort('m9.data'),
'm85.input_2':T.Graph.OutputPort('m84.data'),
'm85.before_days':90,
'm85.index':'000300.HIX',
'm71': 'M.use_datasource.v1',
'm71.instruments': T.Graph.OutputPort('m9.data'),
'm71.datasource_id': 'industry_CN_STOCK_A',
'm41.start_date': '',
'm71.end_date': '',
'm73' :'M.select_columns.v3',
'm73.input_ds': T.Graph.OutputPort('m71.data'),
'm73.columns':'date,instrument,concept',
'm73.reverse_select':False,
'm78': 'M.join.v3',
'm78.data1': T.Graph.OutputPort('m8.predictions'),
'm78.data2': T.Graph.OutputPort('m73.data'),
'm78.on': 'date,instrument',
'm78.how': 'left',
'm78.sort': False,
'm61': 'M.use_datasource.v1',
'm61.instruments': T.Graph.OutputPort('m9.data'),
'm61.datasource_id': 'bar1d_CN_STOCK_A',
'm61.start_date': '',
'm61.end_date': '',
'm63' :'M.select_columns.v3',
'm63.input_ds': T.Graph.OutputPort('m61.data'),
'm63.columns':'date,instrument,open,close,high,low,turn',
'm63.reverse_select':False,
'm68': 'M.join.v3',
'm68.data1': T.Graph.OutputPort('m78.data'),
'm68.data2': T.Graph.OutputPort('m63.data'),
'm68.on': 'date,instrument',
'm68.how': 'left',
'm68.sort': False,
'm90': 'M.join.v3',
'm90.data1': T.Graph.OutputPort('m68.data'),
'm90.data2': T.Graph.OutputPort('m85.data_1'),
'm90.on': 'date',
'm90.how': 'left',
'm90.sort': False,
'm19': 'M.trade.v4',
'm19.instruments': T.Graph.OutputPort('m9.data'),
'm19.options_data': T.Graph.OutputPort('m90.data'),
'm19.start_date': '',
'm19.end_date': '',
'm19.initialize': m19_initialize_bigquant_run,
'm19.handle_data': m19_handle_data_bigquant_run,
'm19.prepare': m19_prepare_bigquant_run,
'm19.volume_limit': 0.025,
'm19.order_price_field_buy': 'open',
'm19.order_price_field_sell': 'close',
'm19.capital_base': 30000,
'm19.auto_cancel_non_tradable_orders': True,
'm19.data_frequency': 'daily',
'm19.price_type': '后复权',
'm19.product_type': '股票',
'm19.plot_charts': True,
'm19.backtest_only': False,
'm19.benchmark': '000300.HIX',
})
def m20_run_bigquant_run(
bq_graph,
inputs,
trading_days_market='CN', # 使用那个市场的交易日历
train_instruments_mid='m1', # 训练数据 证券代码列表 模块id
test_instruments_mid='m9', # 测试数据 证券代码列表 模块id
predict_mid='m8', # 预测 模块id
trade_mid='m19', # 回测 模块id
start_date='2016-01-01', # 数据开始日期
end_date=T.live_run_param('trading_date',conf.end_trade), # 数据结束日期
train_update_days=250, # 更新周期,按交易日计算,每多少天更新一次
train_update_days_for_live=None, #模拟实盘模式下的更新周期,按交易日计算,每多少天更新一次。如果需要在模拟实盘阶段使用不同的模型更新周期,可以设置这个参数
train_data_min_days=250, # 最小数据天数,按交易日计算,所以第一个滚动的结束日期是 从开始日期到开始日期+最小数据天数
train_data_max_days=250, # 最大数据天数,按交易日计算,0,表示没有限制,否则每一个滚动的开始日期=max(此滚动的结束日期-最大数据天数, 开始日期
rolling_count_for_live=1, #实盘模式下滚动次数,模拟实盘模式下,取最后多少次滚动。一般在模拟实盘模式下,只用到最后一次滚动训练的模型,这里可以设置为1;如果你的滚动训练数据时间段很短,以至于期间可能没有训练数据,这里可以设置大一点。0表示没有限制
):
def merge_datasources(input_1):
df_list = [ds[0].read_df().set_index('date').loc[ds[1]:].reset_index() for ds in input_1]
df = pd.concat(df_list)
instrument_data = {
'start_date': df['date'].min().strftime('%Y-%m-%d'),
'end_date': df['date'].max().strftime('%Y-%m-%d'),
'instruments': list(set(df['instrument'])),
}
return Outputs(data=DataSource.write_df(df), instrument_data=DataSource.write_pickle(instrument_data))
def gen_rolling_dates(trading_days_market, start_date, end_date, train_update_days, train_update_days_for_live, train_data_min_days, train_data_max_days, rolling_count_for_live):
# 是否实盘模式
tdays = list(D.trading_days(market=trading_days_market, start_date=start_date, end_date=end_date)['date'])
is_live_run = T.live_run_param('trading_date', None) is not None
if is_live_run and train_update_days_for_live:
train_update_days = train_update_days_for_live
rollings = []
train_end_date = train_data_min_days
while train_end_date < len(tdays):
if train_data_max_days is not None:
train_start_date = max(train_end_date - train_data_max_days, 0)
else:
train_start_date = start_date
rollings.append({
'train_start_date': tdays[train_start_date].strftime('%Y-%m-%d'),
'train_end_date': tdays[train_end_date - 1].strftime('%Y-%m-%d'),
'test_start_date': tdays[train_end_date].strftime('%Y-%m-%d'),
'test_end_date': tdays[min(train_end_date + train_update_days, len(tdays)) - 1].strftime('%Y-%m-%d'),
})
train_end_date += train_update_days
if not rollings:
raise Exception('没有滚动需要执行,请检查配置')
if is_live_run and rolling_count_for_live:
rollings = rollings[-rolling_count_for_live:]
return rollings
g = bq_graph
rolling_dates = gen_rolling_dates(
trading_days_market, start_date, end_date, train_update_days, train_update_days_for_live, train_data_min_days, train_data_max_days, rolling_count_for_live)
# 训练和预测
results = []
for rolling in rolling_dates:
parameters = {}
# 先禁用回测
parameters[trade_mid + '.__enabled__'] = False
parameters[train_instruments_mid + '.start_date'] = rolling['train_start_date']
parameters[train_instruments_mid + '.end_date'] = rolling['train_end_date']
parameters[test_instruments_mid + '.start_date'] = rolling['test_start_date']
parameters[test_instruments_mid + '.end_date'] = rolling['test_end_date']
print('------ rolling_train:', str(parameters))
results.append(g.run(parameters))
# 合并预测结果并回测
mx = M.cached.v3(run=merge_datasources, input_1=[[result[predict_mid].predictions, result[test_instruments_mid].data.read_pickle()['start_date']] for result in results])
parameters = {}
parameters['*.__enabled__'] = False
parameters[trade_mid + '.__enabled__'] = True
parameters[trade_mid + '.instruments'] = mx.instrument_data
parameters[trade_mid + '.options_data'] = mx.data
trade = g.run(parameters)
return {'rollings': results, 'trade': trade}
m20 = M.hyper_rolling_train.v1(
run=m20_run_bigquant_run,
run_now=True,
bq_graph=g
)