导语:策略思想丰富多样,尤其是在买入和卖出方面,一千个投资者可能有一千个交易想法。因此,本文告诉大家怎样进行灵活地买入和卖出,以便于大家能够更高效地开发量化策略。
作者:bigquant
阅读时间:20分钟
本文由BigQuant宽客学院推出,难度标签:☆☆☆
BigQuant平台提供了很多策略生成器的模板策略,其买入和卖出的思想是确定了的。由于每个人交易的想法可能千差万别,因此如果能灵活地自定义买入和卖出岂不是更好。BigQuant的策略编写语言是Python,策略的交易主要是通过策略回测机制完成,因此一方面需要了解Python语言,另一方面,也是最重要的是需要熟悉BigQuant回测机制。
基础策略¶
## 基础配置
class conf:
start_date = '2014-01-01'
end_date='2017-07-17'
split_date = '2015-01-01'
instruments = D.instruments(start_date, end_date)
hold_days = 30
features = ['rank_pb_lf_0']
# 数据标注标注
label_expr = [
# 计算未来一段时间(hold_days)的相对收益
'shift(close, -5) / shift(open, -1) - shift(benchmark_close, -5) / shift(benchmark_open, -1)',
# 极值处理:用1%和99%分位的值做clip
'clip(label, all_quantile(label, 0.01), all_quantile(label, 0.99))',
# 将分数映射到分类,这里使用20个分类,这里采取等宽离散化
'all_wbins(label, 20)',
# 过滤掉一字涨停的情况 (设置label为NaN,在后续处理和训练中会忽略NaN的label)
'where(shift(high, -1) == shift(low, -1), NaN, label)'
]
## 量化回测 https://bigquant.com/docs/module_trade.html
# 回测引擎:准备数据,只执行一次
def prepare(context):
# context.start_date / end_date,回测的时候,为trader传入参数;在实盘运行的时候,由系统替换为实盘日期
instruments = D.instruments()
## 在样本外数据上进行预测
n0 = M.general_feature_extractor.v5(
instruments=D.instruments(),
start_date=context.start_date, end_date=context.end_date,
features=conf.features)
n1 = M.derived_feature_extractor.v1(
data=n0.data,
features= conf.features)
n2 = M.transform.v2(data=n1.data, transforms=None, drop_null=True)
n3 = M.stock_ranker_predict.v5(model=context.options['model'], data=n2.data)
context.instruments = n3.instruments
context.options['predictions'] = n3.predictions
# 回测引擎:初始化函数,只执行一次
def initialize(context):
# 加载预测数据
context.ranker_prediction = context.options['predictions'].read_df()
# 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
# 预测数据,通过options传入进来,使用 read_df 函数,加载到内存 (DataFrame)
# 设置买入的股票数量,这里买入预测股票列表排名靠前的5只
stock_count = 3
# 每只的股票的权重,如下的权重分配会使得靠前的股票分配多一点的资金,[0.339160, 0.213986, 0.169580, ..]
context.stock_weights = T.norm([1 / math.log(i + 2) for i in range(0, stock_count)])
# 设置每只股票占用的最大资金比例
context.max_cash_per_instrument = 0.2
# 回测引擎:每日数据处理函数,每天执行一次
def handle_data(context, data):
# 按日期过滤得到今日的预测数据
ranker_prediction = context.ranker_prediction[
context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]
# 1. 资金分配
# 平均持仓时间是hold_days,每日都将买入股票,每日预期使用 1/hold_days 的资金
# 实际操作中,会存在一定的买入误差,所以在前hold_days天,等量使用资金;之后,尽量使用剩余资金(这里设置最多用等量的1.5倍)
is_staging = context.trading_day_index < context.options['hold_days'] # 是否在建仓期间(前 hold_days 天)
cash_avg = context.portfolio.portfolio_value / context.options['hold_days']
cash_for_buy = min(context.portfolio.cash, (1 if is_staging else 1.5) * cash_avg)
cash_for_sell = cash_avg - (context.portfolio.cash - cash_for_buy)
positions = {e.symbol: p.amount * p.last_sale_price
for e, p in context.portfolio.positions.items()}
# 2. 生成卖出订单:hold_days天之后才开始卖出;对持仓的股票,按StockRanker预测的排序末位淘汰
if not is_staging and cash_for_sell > 0:
equities = {e.symbol: e for e, p in context.portfolio.positions.items()}
instruments = list(reversed(list(ranker_prediction.instrument[ranker_prediction.instrument.apply(
lambda x: x in equities and not context.has_unfinished_sell_order(equities[x]))])))
# print('rank order for sell %s' % instruments)
for instrument in instruments:
context.order_target(context.symbol(instrument), 0)
cash_for_sell -= positions[instrument]
if cash_for_sell <= 0:
break
# 3. 生成买入订单:按StockRanker预测的排序,买入前面的stock_count只股票
buy_cash_weights = context.stock_weights
buy_instruments = list(ranker_prediction.instrument[:len(buy_cash_weights)])
max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
for i, instrument in enumerate(buy_instruments):
cash = cash_for_buy * buy_cash_weights[i]
if cash > max_cash_per_instrument - positions.get(instrument, 0):
# 确保股票持仓量不会超过每次股票最大的占用资金量
cash = max_cash_per_instrument - positions.get(instrument, 0)
if cash > 0:
price = data.current(context.symbol(instrument), 'price') # 最新价格
stock_num = np.floor(cash/price/100)*100 # 向下取整
context.order(context.symbol(instrument), stock_num) # 整百下单
## 通过训练集数据训练模型
# 数据标注
m1 = M.advanced_auto_labeler.v1(
instruments=conf.instruments, start_date=conf.start_date, end_date=conf.split_date,
label_expr=conf.label_expr, benchmark='000300.SHA', cast_label_int=True)
# 抽取基础特征
m2_1 = M.general_feature_extractor.v5(
instruments=D.instruments(),
start_date=conf.start_date, end_date=conf.split_date,
features=conf.features)
# 抽取衍生特征
m2_2 = M.derived_feature_extractor.v1(
data=m2_1.data,
features= conf.features)
# 特征转换
m3 = M.transform.v2(data=m2_2.data, transforms=None, drop_null=True)
# 合并标注和特征数据
m4 = M.join.v2(data1=m1.data, data2=m3.data, on=['date', 'instrument'], sort=False)
# 开始训练模型
m5 = M.stock_ranker_train.v4(training_ds=m4.data, features=conf.features)
## 测试集上进行回测
m6 = M.trade.v3(
instruments=None,
start_date=conf.split_date,
end_date=conf.end_date,
prepare=prepare,
initialize=initialize,
handle_data=handle_data,
order_price_field_buy='open',
order_price_field_sell='close',
capital_base=50001,
benchmark='000300.SHA',
options={'hold_days': conf.hold_days, 'model': m5.model_id},
m_deps=np.random.rand() # 避免使用缓存
)
1.等权重买入股票¶
在基础策略中,排序靠前的股票买入资金权重会大一些,那么如果想等权重买入,应该怎样处理呢?
# 操作非常简单,如下处理:
# 假设买入股票为10只
weight = 1/10
context.order_target_percent(sid,1/10)
# 使用order_target_percent下单接口即可,更多下单接口请参看以下链接:
# https://bigquant.com/docs/module_trade.html#order-method-section
2.修改买入卖出时间¶
买入时间由order_price_field_buy 参数决定,如果为'open'表示以开盘价买入,买入时间就为开盘时间,'high','low','close'比较好理解
卖出时间由order_price_field_sell 参数决定,如果为'close'表示以收盘价卖出,卖出时间就为收盘时间,'high','low','open'比较好理解
m6 = M.trade.v3(
instruments=None,
start_date=conf.split_date,
end_date=conf.end_date,
prepare=prepare,
initialize=initialize,
handle_data=handle_data,
order_price_field_buy='close',
order_price_field_sell='close',
capital_base=50001,
benchmark='000300.SHA',
options={'hold_days': conf.hold_days, 'model': m5.model_id},
m_deps=np.random.rand() # 避免使用缓存
)
3.隔几天运行一次¶
由于handle_data函数是每个交易日都会运行一次,那么如果想隔几天运行一次,相当于隔几天进行一下买入卖出交易,此时应该怎样修改代码呢?
## 非常简单,这里我们的目的是不用每天运行handle_data函数,而是隔几天运行一次。一个极为简单的方法就是在handle_data函数中,加入一个判断条件,
## 如果达到判断条件就返回(return),这样就实现了当天不运行handle_data函数的目的。
def handle_data(context, data):
#------------------------START:加入下面if的两行代码到之前的handle_data函数的最前部分即可-------------------
# 相隔几天(以3天举例)运行一下handle_data函数
if context.trading_day_index % 3 != 0:
return
#------------------------END:加上这两句代码在handle_data函数就能实现隔几天运行下该函数的目的---------------------
# 按日期过滤得到今日的预测数据
ranker_prediction = context.ranker_prediction[
context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]
# 1. 资金分配
# 平均持仓时间是hold_days,每日都将买入股票,每日预期使用 1/hold_days 的资金
# 实际操作中,会存在一定的买入误差,所以在前hold_days天,等量使用资金;之后,尽量使用剩余资金(这里设置最多用等量的1.5倍)
is_staging = context.trading_day_index < context.options['hold_days'] # 是否在建仓期间(前 hold_days 天)
cash_avg = context.portfolio.portfolio_value / context.options['hold_days']
cash_for_buy = min(context.portfolio.cash, (1 if is_staging else 1.5) * cash_avg)
cash_for_sell = cash_avg - (context.portfolio.cash - cash_for_buy)
positions = {e.symbol: p.amount * p.last_sale_price
for e, p in context.portfolio.positions.items()}
# 2. 生成卖出订单:hold_days天之后才开始卖出;对持仓的股票,按StockRanker预测的排序末位淘汰
if not is_staging and cash_for_sell > 0:
equities = {e.symbol: e for e, p in context.portfolio.positions.items()}
instruments = list(reversed(list(ranker_prediction.instrument[ranker_prediction.instrument.apply(
lambda x: x in equities and not context.has_unfinished_sell_order(equities[x]))])))
# print('rank order for sell %s' % instruments)
for instrument in instruments:
context.order_target(context.symbol(instrument), 0)
cash_for_sell -= positions[instrument]
if cash_for_sell <= 0:
break
# 3. 生成买入订单:按StockRanker预测的排序,买入前面的stock_count只股票
buy_cash_weights = context.stock_weights
buy_instruments = list(ranker_prediction.instrument[:len(buy_cash_weights)])
max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
for i, instrument in enumerate(buy_instruments):
cash = cash_for_buy * buy_cash_weights[i]
if cash > max_cash_per_instrument - positions.get(instrument, 0):
# 确保股票持仓量不会超过每次股票最大的占用资金量
cash = max_cash_per_instrument - positions.get(instrument, 0)
if cash > 0:
price = data.current(context.symbol(instrument), 'price') # 最新价格
stock_num = np.floor(cash/price/100)*100 # 向下取整
context.order(context.symbol(instrument), stock_num) # 整百下单
4.持有固定天数卖出¶
在基础策略中。对持仓的股票,按StockRanker预测的排序末位淘汰,现在我们修改卖出规则,按照持有固定天数(hold_days)进行卖出,只需在handle_data函数中进行修改即可
# 回测引擎:每日数据处理函数,每天执行一次
def handle_data(context, data):
# 按日期过滤得到今日的预测数据
ranker_prediction = context.ranker_prediction[
context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]
# 1. 资金分配
# 平均持仓时间是hold_days,每日都将买入股票,每日预期使用 1/hold_days 的资金
# 实际操作中,会存在一定的买入误差,所以在前hold_days天,等量使用资金;之后,尽量使用剩余资金(这里设置最多用等量的1.5倍)
is_staging = context.trading_day_index < context.options['hold_days'] # 是否在建仓期间(前 hold_days 天)
cash_avg = context.portfolio.portfolio_value / context.options['hold_days']
cash_for_buy = min(context.portfolio.cash, (1 if is_staging else 1.5) * cash_avg)
positions = {e.symbol: p.amount * p.last_sale_price
for e, p in context.portfolio.positions.items()}
#----------------------------START:持有固定天数卖出---------------------------
today = data.current_dt
# 不是建仓期(在前hold_days属于建仓期)
if not is_staging:
equities = {e.symbol: p for e, p in context.portfolio.positions.items() if p.amount>0}
for instrument in equities:
# print('last_sale_date: ', equities[instrument].last_sale_date)
sid = equities[instrument].sid # 交易标的
# 今天和上次交易的时间相隔hold_days就全部卖出
if today-equities[instrument].last_sale_date>=datetime.timedelta(context.options['hold_days']) and data.can_trade(context.symbol(instrument)):
context.order_target_percent(sid, 0)
#--------------------------------END:持有固定天数卖出---------------------------
# 3. 生成买入订单:按StockRanker预测的排序,买入前面的stock_count只股票
buy_cash_weights = context.stock_weights
buy_instruments = list(ranker_prediction.instrument[:len(buy_cash_weights)])
max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
for i, instrument in enumerate(buy_instruments):
cash = cash_for_buy * buy_cash_weights[i]
if cash > max_cash_per_instrument - positions.get(instrument, 0):
# 确保股票持仓量不会超过每次股票最大的占用资金量
cash = max_cash_per_instrument - positions.get(instrument, 0)
if cash > 0:
price = data.current(context.symbol(instrument), 'price') # 最新价格
stock_num = np.floor(cash/price/100)*100 # 向下取整
context.order(context.symbol(instrument), stock_num) # 整百下单
5. 一次性全仓买入股票,使现金为0,然后持有固定天数,换仓¶
换仓:卖出持仓股票,买入新的股票
在基础策略中,如果hold_days = 5,那么每天都需要买入20%仓位的股票,每天也需要卖出一定股票,每天滚动进行。但是,如果我只想在第一天买入100%资金股票, 持有五天,这五天内什么都不操作,等到5天满了以后再进行全部换仓,这样如何修改策略代码呢?方法很简单,也是修改handle_data中的代码。
# 回测引擎:每日数据处理函数,每天执行一次
def handle_data(context, data):
# 相隔几天(hold_days)进行一下换仓
if context.trading_day_index % context.options['hold_days'] != 0:
return
# 按日期过滤得到今日的预测数据
ranker_prediction = context.ranker_prediction[
context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]
# 目前持仓
positions = {e.symbol: p.amount * p.last_sale_price
for e, p in context.portfolio.positions.items()}
# 权重
buy_cash_weights = context.stock_weights
# 今日买入股票列表
stock_to_buy = list(ranker_prediction.instrument[:len(buy_cash_weights)])
# 持仓上限
max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
# 通过positions对象,使用列表生成式的方法获取目前持仓的股票列表
stock_hold_now = [equity.symbol for equity in context.portfolio.positions ]
# 继续持有的股票:调仓时,如果买入的股票已经存在于目前的持仓里,那么应继续持有
no_need_to_sell = [i for i in stock_hold_now if i in stock_to_buy]
# 需要卖出的股票
stock_to_sell = [i for i in stock_hold_now if i not in no_need_to_sell]
# 卖出
for stock in stock_to_sell:
# 如果该股票停牌,则没法成交。因此需要用can_trade方法检查下该股票的状态
# 如果返回真值,则可以正常下单,否则会出错
# 因为stock是字符串格式,我们用symbol方法将其转化成平台可以接受的形式:Equity格式
if data.can_trade(context.symbol(stock)):
# order_target_percent是平台的一个下单接口,表明下单使得该股票的权重为0,
# 即卖出全部股票,可参考回测文档
context.order_target_percent(context.symbol(stock), 0)
# 如果当天没有买入的股票,就返回
if len(stock_to_buy) == 0:
return
# 买入
for i, instrument in enumerate(stock_to_buy):
cash = context.portfolio.portfolio_value * buy_cash_weights[i]
if cash > max_cash_per_instrument - positions.get(instrument, 0):
# 确保股票持仓量不会超过每次股票最大的占用资金量
cash = max_cash_per_instrument - positions.get(instrument, 0)
if cash > 0:
price = data.current(context.symbol(instrument), 'price') # 最新价格
stock_num = np.floor(cash/price/100)*100 # 向下取整
context.order(context.symbol(instrument), stock_num) # 整百下单
6.动态择时仓位调整¶
比如,根据市场状况,每天的买入股票的仓位并不是一成不变固定的,而是会依据市场情绪、新闻事件等进行动态调整
参考链接:
7.止盈止损处理¶
在交易中,如果需要加入更多的进场出场规则,比如止盈止损
参考链接:
8.修改策略回测价格模式¶
BigQuant平台现在支持三种价格模式,分别是前复权(forward_adjusted)、真实价格(original)、后复权(backward_adjusted)。目前默认是后复权模式
前复权回测模式
m6 = M.trade.v3(
instruments=None,
start_date=conf.split_date,
end_date=conf.end_date,
prepare=prepare,
initialize=initialize,
handle_data=handle_data,
order_price_field_buy='close',
order_price_field_sell='close',
capital_base=50001,
benchmark='000300.SHA',
price_type='foward_adjusted', # 前复权回测模式
options={'hold_days': conf.hold_days, 'model': m5.model_id},
m_deps=np.random.rand() # 避免使用缓存
)
真实价格回测模式
m6 = M.trade.v3(
instruments=None,
start_date=conf.split_date,
end_date=conf.end_date,
prepare=prepare,
initialize=initialize,
handle_data=handle_data,
order_price_field_buy='close',
order_price_field_sell='close',
capital_base=50001,
benchmark='000300.SHA',
price_type='original', # 真实价格回测模式
options={'hold_days': conf.hold_days, 'model': m5.model_id},
m_deps=np.random.rand() # 避免使用缓存
)
小结: 本文总结了实际策略开发过程中的几种需求,希望对大家有所帮助,同时希望大家对回测机制有更深入的理解,能够快速灵活开发策略,验证策略思想。
本文由BigQuant宽客学院推出,版权归BigQuant所有,转载请注明出处。