# 基础参数配置
class conf:
start_date = '2010-01-01'
end_date='2017-08-01'
# split_date 之前的数据用于训练,之后的数据用作效果评估
split_date = '2015-01-01'
# D.instruments: https://bigquant.com/docs/data_instruments.html
instruments = D.instruments(start_date, split_date,market='HK_STOCK')
# 机器学习目标标注函数
# 如下标注函数等价于 min(max((持有期间的收益 * 100), -20), 20) + 20 (后面的M.fast_auto_labeler会做取整操作)
# 说明:max/min这里将标注分数限定在区间[-20, 20],+20将分数变为非负数 (StockRanker要求标注分数非负整数)
label_expr = ['return * 100', 'where(label > {0}, {0}, where(label < -{0}, -{0}, label)) + {0}'.format(20)]
# 持有天数,用于计算label_expr中的return值(收益)
hold_days = 5
# 特征 https://bigquant.com/docs/data_features.html,你可以通过表达式构造任何特征
features = [
'rank_return_20', # 20日收益排名
'ta_sma_10_0/ta_sma_20_0',
'ta_sma_20_0/ta_sma_30_0',
'ta_sma_30_0/ta_sma_60_0',
'ta_rsi_14_0',
'ta_rsi_28_0',
'close_5/close_0',
'close_1/open_0',
'close_0/open_0',
'high_0/low_0',
'close_1/close_0',
'close_2/close_0',
'close_3/close_0',
'close_4/close_0',
'amount_1/amount_0',
'amount_2/amount_0',
'amount_3/amount_0',
'amount_4/amount_0',
'amount_5/amount_0',
]
conf.label_expr = [
# 计算收益:5日收盘价(作为卖出价格)除以明日开盘价(作为买入价格)
'shift(close, - %s) / shift(open, -1) - 1'%conf.hold_days,
# 极值处理:用1%和99%分位的值做clip
'clip(label, all_quantile(label, 0.01), all_quantile(label, 0.99))',
# 将分数映射到分类,这里使用20个分类
'all_wbins(label, 20)',
# 过滤掉一字涨停的情况 (设置label为NaN,在后续处理和训练中会忽略NaN的label)
'where(shift(high, -1) == shift(low, -1), NaN, label)'
]
m1 = M.advanced_auto_labeler.v1(
instruments=conf.instruments, start_date=conf.start_date, end_date=conf.split_date,
label_expr=conf.label_expr, benchmark=None,cast_label_int=True)
# 将最近20天平均交易量较小的数据过滤掉
def filter_by_amount(ds,start_date,end_date):
filter_field='avg_amount_20'
filter_df = D.features(D.instruments(market='HK_STOCK'),start_date=start_date,end_date=end_date, fields=[filter_field])
filter_df = filter_df[filter_df[filter_field]>1000000]
base_df = ds.read_df()
print('原始数据行数是%s'% len(base_df))
base_df = filter_df.merge(base_df, on=['date','instrument'], how='inner')
base_df.drop(filter_field, inplace=True, axis=1)
print('过滤后数据行数是%s'% len(base_df))
new_ds = DataSource.write_df(base_df)
return Outputs(data=new_ds)
m2 = M.cached.v3(run=filter_by_amount, kwargs={'ds':m1.data, 'start_date':conf.start_date, 'end_date':conf.split_date})
# 计算特征数据
m3 = M.general_feature_extractor.v5(
instruments=conf.instruments, start_date=conf.start_date, end_date=conf.split_date,
features=conf.features)
# 衍生特征计算,比如 ta_sma_10_0/ta_sma_20_0 相除计算
m3_1 = M.derived_feature_extractor.v1(data=m3.data, features=conf.features)
# 数据预处理:缺失数据处理,数据规范化
m4 = M.transform.v2(data=m3_1.data, transforms=None, drop_null=True)
# 合并标注和特征数据
m5 = M.join.v2(data1=m2.data, data2=m4.data, on=['date', 'instrument'], sort=True)
# StockRanker机器学习训练
m6 = M.stock_ranker_train.v5(training_ds=m5.data, features=conf.features)
## 量化回测 https://bigquant.com/docs/module_trade.html
# 回测引擎:准备数据,只执行一次
def prepare(context):
# context.start_date / end_date,回测的时候,为trader传入参数;在实盘运行的时候,由系统替换为实盘日期
n1 = M.general_feature_extractor.v5(
instruments=D.instruments(market='HK_STOCK'),
start_date=context.start_date, end_date=context.end_date,
model_id=context.options['model_id'])
n2 = M.derived_feature_extractor.v1(data=n1.data, model_id=context.options['model_id'])
n3 = M.transform.v2(data=n2.data, transforms=None, drop_null=True)
n4 = M.stock_ranker_predict.v5(model=context.options['model_id'], data=n3.data)
context.instruments = n4.instruments
context.options['predictions'] = n4.predictions
# 回测引擎:初始化函数,只执行一次
def initialize(context):
# 加载预测数据
global filter_df
context.ranker_prediction = context.options['predictions'].read_df()
filter_field = 'avg_amount_20'
filter_df = D.features(context.instruments, fields=[filter_field],start_date=context.start_date, end_date=context.end_date)
filter_df = filter_df[filter_df[filter_field]>40000000]
context.filter_df = filter_df
# 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
# 预测数据,通过options传入进来,使用 read_df 函数,加载到内存 (DataFrame)
# 设置买入的股票数量,这里买入预测股票列表排名靠前的5只
stock_count = 2
# 每只的股票的权重,如下的权重分配会使得靠前的股票分配多一点的资金,[0.339160, 0.213986, 0.169580, ..]
context.stock_weights = T.norm([1 / math.log(i + 2) for i in range(0, stock_count)])
# 设置每只股票占用的最大资金比例
context.max_cash_per_instrument = 0.2
# 回测引擎:每日数据处理函数,每天执行一次
def handle_data(context, data):
# 按日期过滤得到今日的预测数据
ranker_prediction = context.ranker_prediction[
context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]
# 如果交易量过小,不买入
can_buy_instruments = set(context.filter_df[context.filter_df.date == data.current_dt.date()].instrument)
# 1. 资金分配
# 平均持仓时间是hold_days,每日都将买入股票,每日预期使用 1/hold_days 的资金
# 实际操作中,会存在一定的买入误差,所以在前hold_days天,等量使用资金;之后,尽量使用剩余资金(这里设置最多用等量的1.5倍)
is_staging = context.trading_day_index < context.options['hold_days'] # 是否在建仓期间(前 hold_days 天)
cash_avg = context.portfolio.portfolio_value / context.options['hold_days']
cash_for_buy = min(context.portfolio.cash, (1 if is_staging else 1.5) * cash_avg)
cash_for_sell = cash_avg - (context.portfolio.cash - cash_for_buy)
positions = {e.symbol: p.amount * p.last_sale_price
for e, p in context.perf_tracker.position_tracker.positions.items()}
# 2. 生成卖出订单:hold_days天之后才开始卖出;对持仓的股票,按StockRanker预测的排序末位淘汰
if not is_staging and cash_for_sell > 0:
equities = {e.symbol: e for e, p in context.perf_tracker.position_tracker.positions.items()}
instruments = list(reversed(list(ranker_prediction.instrument[ranker_prediction.instrument.apply(
lambda x: x in equities and not context.has_unfinished_sell_order(equities[x]))])))
# print('rank order for sell %s' % instruments)
for instrument in instruments:
context.order_target(context.symbol(instrument), 0)
cash_for_sell -= positions[instrument]
if cash_for_sell <= 0:
break
# 3. 生成买入订单:按StockRanker预测的排序,买入前面的stock_count只股票
buy_cash_weights = context.stock_weights
buy_instruments = [instrument for instrument in list(ranker_prediction.instrument) if instrument in can_buy_instruments][:len(buy_cash_weights)]
max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
for i, instrument in enumerate(buy_instruments):
cash = cash_for_buy * buy_cash_weights[i]
if cash > max_cash_per_instrument - positions.get(instrument, 0):
# 确保股票持仓量不会超过每次股票最大的占用资金量
cash = max_cash_per_instrument - positions.get(instrument, 0)
if cash > 0:
context.order_value(context.symbol(instrument), cash)
# 调用交易引擎
m7 = M.trade.v2(
instruments=None,
start_date=conf.split_date,
end_date=conf.end_date,
prepare=prepare,
initialize=initialize,
handle_data=handle_data,
order_price_field_buy='open', # 表示 开盘 时买入
order_price_field_sell='close', # 表示 收盘 前卖出
capital_base=100000, # 初始资金
benchmark='HSI.HKEX', # 比较基准,不影响回测结果
# 通过 options 参数传递预测数据和参数给回测引擎
options={'hold_days': conf.hold_days, 'model_id': m6.model_id},
)