【宽客学院】自定义买入卖出逻辑


(iQuant) #1

导语:策略思想丰富多样,尤其是在买入和卖出方面,一千个投资者可能有一千个交易想法。因此,本文告诉大家怎样进行灵活地买入和卖出,以便于大家能够更高效地开发量化策略。


作者:bigquant
阅读时间:20分钟
本文由BigQuant宽客学院推出,难度标签:☆☆☆

BigQuant平台提供了很多策略生成器的模板策略,其买入和卖出的思想是确定了的。由于每个人交易的想法可能千差万别,因此如果能灵活地自定义买入和卖出岂不是更好。BigQuant的策略编写语言是Python,策略的交易主要是通过策略回测机制完成,因此一方面需要了解Python语言,另一方面,也是最重要的是需要熟悉BigQuant回测机制

我们首先在编写策略界面中新建一个可视化AI策略,如下图所示。

我们点击最下方的Trade模块,可以看到右侧的属性栏,属性栏中可以设置策略基本参数

  • 成交率限制
  • 初始资金
  • 买入点
  • 卖出点
  • 回测采用的价格模式

此外,Trade模块属性栏中还包含了主函数数据准备函数初始化函数盘前处理函数

策略常用的功能和实现位置如下表所示

策略常用功能 实现位置
设置股票权重 初始化函数
每隔几天运行一次 主函数
指定时间执行 主函数和初始化函数
持有固定天数卖出 主函数
固定周期轮仓 主函数
止盈止损处理 主函数
大盘风控 数据准备函数和主函数
整百下单买入 主函数

我们首先新建一个可视化AI模板策略,AI模板策略的讲解可以查看AI模板策略逻辑解读
然后我们以默认的可视化AI模板策略为例,逐一介绍各功能对应函数的修改方式:

一、设置股票为等权重

修改初始化函数
# 回测引擎:初始化函数,只执行一次
def bigquant_run(context):
    # 加载预测数据
    context.ranker_prediction = context.options['data'].read_df()

    # 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
    context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
    # 预测数据,通过options传入进来,使用 read_df 函数,加载到内存 (DataFrame)
    # 设置买入的股票数量,这里买入预测股票列表排名靠前的5只
    stock_count = 5
    # 每只的股票的权重,修改为等权重买入
    context.stock_weights = [1/stock_count for i in range(stock_count)]
    # 设置每只股票占用的最大资金比例
    context.max_cash_per_instrument = 0.2
    context.options['hold_days'] = 5

二、每隔几天运行一次

修改主函数
# 回测引擎:每日数据处理函数,每天执行一次
def bigquant_run(context, data):
    #------------------------START:加入下面if的两行代码到之前到主函数的最前部分-------------------
    # 相隔几天(以3天举例)运行一下handle_data函数
    if context.trading_day_index % 3 != 0:
        return 
    #------------------------END:加上这两句代码在主函数就能实现隔几天运行---------------------

     # 按日期过滤得到今日的预测数据
    ranker_prediction = context.ranker_prediction[
        context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]
    # 1. 资金分配
    # 平均持仓时间是hold_days,每日都将买入股票,每日预期使用 1/hold_days 的资金
    # 实际操作中,会存在一定的买入误差,所以在前hold_days天,等量使用资金;之后,尽量使用剩余资金(这里设置最多用等量的1.5倍)
    is_staging = context.trading_day_index < context.options['hold_days'] # 是否在建仓期间(前 hold_days 天)
    cash_avg = context.portfolio.portfolio_value / context.options['hold_days']
    cash_for_buy = min(context.portfolio.cash, (1 if is_staging else 1.5) * cash_avg)
    cash_for_sell = cash_avg - (context.portfolio.cash - cash_for_buy)
    positions = {e.symbol: p.amount * p.last_sale_price
                 for e, p in context.portfolio.positions.items()}
    # 2. 生成卖出订单:hold_days天之后才开始卖出;对持仓的股票,按StockRanker预测的排序末位淘汰
    if not is_staging and cash_for_sell > 0:
        equities = {e.symbol: e for e, p in context.portfolio.positions.items()}
        instruments = list(reversed(list(ranker_prediction.instrument[ranker_prediction.instrument.apply(
                lambda x: x in equities )])))
        for instrument in instruments:
            context.order_target(context.symbol(instrument), 0)
            cash_for_sell -= positions[instrument]
            if cash_for_sell <= 0:
                break
    # 3. 生成买入订单:按StockRanker预测的排序,买入前面的stock_count只股票
    buy_cash_weights = context.stock_weights
    buy_instruments = list(ranker_prediction.instrument[:len(buy_cash_weights)])
    max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
    for i, instrument in enumerate(buy_instruments):
        cash = cash_for_buy * buy_cash_weights[i]
        if cash > max_cash_per_instrument - positions.get(instrument, 0):
            # 确保股票持仓量不会超过每次股票最大的占用资金量
            cash = max_cash_per_instrument - positions.get(instrument, 0)
        if cash > 0:
            context.order_value(context.symbol(instrument), cash)

三、指定时间执行

例如指定在每周起始日执行一次,我们修改主函数为pass,在初始化函数中通过schedule_function函数设置定时执行rebalance函数

修改主函数
# 回测引擎:每日数据处理函数,每天执行一次
def bigquant_run(context, data):
    pass
修改初始化函数
# 回测引擎:初始化函数,只执行一次
def bigquant_run(context):
    # 加载预测数据
    context.ranker_prediction = context.options['data'].read_df()

    # 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
    context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
    # 预测数据,通过options传入进来,使用 read_df 函数,加载到内存 (DataFrame)
    # 设置买入的股票数量,这里买入预测股票列表排名靠前的5只
    stock_count = 5
    # 每只的股票的权重,如下的权重分配会使得靠前的股票分配多一点的资金,[0.339160, 0.213986, 0.169580, ..]
    context.stock_weights = T.norm([1 / math.log(i + 2) for i in range(0, stock_count)])
    # 设置每只股票占用的最大资金比例
    context.max_cash_per_instrument = 0.2
    context.options['hold_days'] = 5
    schedule_function(func=rebalance,
        date_rule=date_rules.week_start())   # 例如每周开始时执行一次
    
def rebalance(context, data):
    # 按日期过滤得到今日的预测数据
    ranker_prediction = context.ranker_prediction[
        context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]

    # 1. 资金分配
    # 平均持仓时间是hold_days,每日都将买入股票,每日预期使用 1/hold_days 的资金
    # 实际操作中,会存在一定的买入误差,所以在前hold_days天,等量使用资金;之后,尽量使用剩余资金(这里设置最多用等量的1.5倍)
    is_staging = context.trading_day_index < context.options['hold_days'] # 是否在建仓期间(前 hold_days 天)
    cash_avg = context.portfolio.portfolio_value / context.options['hold_days']
    cash_for_buy = min(context.portfolio.cash, (1 if is_staging else 1.5) * cash_avg)
    cash_for_sell = cash_avg - (context.portfolio.cash - cash_for_buy)
    positions = {e.symbol: p.amount * p.last_sale_price
                 for e, p in context.portfolio.positions.items()}

    # 2. 生成卖出订单:hold_days天之后才开始卖出;对持仓的股票,按机器学习算法预测的排序末位淘汰
    if not is_staging and cash_for_sell > 0:
        equities = {e.symbol: e for e, p in  context.portfolio.positions.items()}
        instruments = list(reversed(list(ranker_prediction.instrument[ranker_prediction.instrument.apply(
                lambda x: x in equities )])))
        for instrument in instruments:
            context.order_target(context.symbol(instrument), 0)
            cash_for_sell -= positions[instrument]
            if cash_for_sell <= 0:
                break

    # 3. 生成买入订单:按机器学习算法预测的排序,买入前面的stock_count只股票
    buy_cash_weights = context.stock_weights
    buy_instruments = list(ranker_prediction.instrument[:len(buy_cash_weights)])
    max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
    for i, instrument in enumerate(buy_instruments):
        cash = cash_for_buy * buy_cash_weights[i]
        if cash > max_cash_per_instrument - positions.get(instrument, 0):
            # 确保股票持仓量不会超过每次股票最大的占用资金量
            cash = max_cash_per_instrument - positions.get(instrument, 0)
        if cash > 0:
            context.order_value(context.symbol(instrument), cash)

四、持有固定天数卖出

持有固定自然日天数修改主函数
# 回测引擎:每日数据处理函数,每天执行一次
def bigquant_run(context, data):
    # 按日期过滤得到今日的预测数据
    ranker_prediction = context.ranker_prediction[
        context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]

    # 1. 资金分配
    # 平均持仓时间是hold_days,每日都将买入股票,每日预期使用 1/hold_days 的资金
    # 实际操作中,会存在一定的买入误差,所以在前hold_days天,等量使用资金;之后,尽量使用剩余资金(这里设置最多用等量的1.5倍)
    is_staging = context.trading_day_index < context.options['hold_days'] # 是否在建仓期间(前 hold_days 天)
    cash_avg = context.portfolio.portfolio_value / context.options['hold_days']
    cash_for_buy = min(context.portfolio.cash, (1 if is_staging else 1.5) * cash_avg)
    positions = {e.symbol: p.amount * p.last_sale_price
                 for e, p in context.portfolio.positions.items()}
   
    #----------------------------START:持有固定自然日天数卖出---------------------------
    today = data.current_dt
    # 不是建仓期(在前hold_days属于建仓期)
    if not is_staging:
        equities = {e.symbol: p for e, p in context.portfolio.positions.items() if p.amount>0}
        for instrument in equities:
#          print('last_sale_date: ', equities[instrument].last_sale_date)
            sid = equities[instrument].sid  # 交易标的
            # 今天和上次交易的时间相隔hold_days就全部卖出
            if today-equities[instrument].last_sale_date>=datetime.timedelta(context.options['hold_days']) and data.can_trade(context.symbol(instrument)):
                context.order_target_percent(sid, 0)
    #--------------------------------END:持有固定天数卖出---------------------------    
          

    # 3. 生成买入订单:按StockRanker预测的排序,买入前面的stock_count只股票
    buy_cash_weights = context.stock_weights
    buy_instruments = list(ranker_prediction.instrument[:len(buy_cash_weights)])
    max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
    for i, instrument in enumerate(buy_instruments):
        cash = cash_for_buy * buy_cash_weights[i]
        if cash > max_cash_per_instrument - positions.get(instrument, 0):
            # 确保股票持仓量不会超过每次股票最大的占用资金量
            cash = max_cash_per_instrument - positions.get(instrument, 0)
        if cash > 0:
            context.order_value(context.symbol(instrument), cash)
持有固定交易日天数修改主函数
# 回测引擎:每日数据处理函数,每天执行一次
def bigquant_run(context, data):
    # 按日期过滤得到今日的预测数据
    ranker_prediction = context.ranker_prediction[
        context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]

    # 1. 资金分配
    # 平均持仓时间是hold_days,每日都将买入股票,每日预期使用 1/hold_days 的资金
    # 实际操作中,会存在一定的买入误差,所以在前hold_days天,等量使用资金;之后,尽量使用剩余资金(这里设置最多用等量的1.5倍)
    is_staging = context.trading_day_index < context.options['hold_days'] # 是否在建仓期间(前 hold_days 天)
    cash_avg = context.portfolio.portfolio_value / context.options['hold_days']
    cash_for_buy = min(context.portfolio.cash, (1 if is_staging else 1.5) * cash_avg)
    positions = {e.symbol: p.amount * p.last_sale_price
                 for e, p in context.portfolio.positions.items()}
   
    #----------------------------START:持有固定交易日天数卖出---------------------------
    today = data.current_dt.strftime('%Y-%m-%d')
    # 不是建仓期(在前hold_days属于建仓期)
    if not is_staging:
        equities = {e.symbol: p for e, p in context.portfolio.positions.items() if p.amount>0}
        for instrument in equities:
            sid = equities[instrument].sid  # 交易标的
            # 今天和上次交易的时间相隔hold_days就全部卖出
            dt = pd.to_datetime(D.trading_days(end_date = today).iloc[-context.options['hold_days']].values[0])
            if  pd.to_datetime(equities[instrument].last_sale_date.strftime('%Y-%m-%d')) <= dt and data.can_trade(context.symbol(instrument)):
                context.order_target_percent(sid, 0)
                cash_for_buy += positions[instrument]
    #--------------------------------END:持有固定天数卖出--------------------------- 
          

    # 3. 生成买入订单:按StockRanker预测的排序,买入前面的stock_count只股票
    buy_cash_weights = context.stock_weights
    buy_instruments = list(ranker_prediction.instrument[:len(buy_cash_weights)])
    max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
    for i, instrument in enumerate(buy_instruments):
        cash = cash_for_buy * buy_cash_weights[i]
        if cash > max_cash_per_instrument - positions.get(instrument, 0):
            # 确保股票持仓量不会超过每次股票最大的占用资金量
            cash = max_cash_per_instrument - positions.get(instrument, 0)
        if cash > 0:
            context.order_value(context.symbol(instrument), cash)

五、固定周期轮仓

修改主函数
# 回测引擎:每日数据处理函数,每天执行一次
def bigquant_run(context, data):
    
    # 相隔几天(hold_days)进行一下换仓
    if context.trading_day_index % context.options['hold_days'] != 0:
        return 
    
    # 按日期过滤得到今日的预测数据
    ranker_prediction = context.ranker_prediction[
        context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]
    # 目前持仓
    positions = {e.symbol: p.amount * p.last_sale_price
                 for e, p in context.portfolio.positions.items()}
    # 权重
    buy_cash_weights = context.stock_weights
    # 今日买入股票列表
    stock_to_buy = list(ranker_prediction.instrument[:len(buy_cash_weights)])
    # 持仓上限
    max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument

    # 通过positions对象,使用列表生成式的方法获取目前持仓的股票列表
    stock_hold_now = [equity.symbol for equity in context.portfolio.positions ]
    # 继续持有的股票:调仓时,如果买入的股票已经存在于目前的持仓里,那么应继续持有
    no_need_to_sell = [i for i in stock_hold_now if i in stock_to_buy]
    # 需要卖出的股票
    stock_to_sell = [i for i in stock_hold_now if i not in no_need_to_sell]
  
    # 卖出
    for stock in stock_to_sell:
        # 如果该股票停牌,则没法成交。因此需要用can_trade方法检查下该股票的状态
        # 如果返回真值,则可以正常下单,否则会出错
        # 因为stock是字符串格式,我们用symbol方法将其转化成平台可以接受的形式:Equity格式
        if data.can_trade(context.symbol(stock)):
            # order_target_percent是平台的一个下单接口,表明下单使得该股票的权重为0,
            #   即卖出全部股票,可参考回测文档
            context.order_target_percent(context.symbol(stock), 0)
    
    # 如果当天没有买入的股票,就返回
    if len(stock_to_buy) == 0:
        return
    
    # 买入
    for i, instrument in enumerate(stock_to_buy):
        cash = context.portfolio.portfolio_value * buy_cash_weights[i]
        if cash > max_cash_per_instrument - positions.get(instrument, 0):
            # 确保股票持仓量不会超过每次股票最大的占用资金量
            cash = max_cash_per_instrument - positions.get(instrument, 0)
        if cash > 0:
            context.order_value(context.symbol(instrument), cash)

六、止盈止损处理

该部分涉及内容较多,单独作为专题讨论,请点击链接查看具体功能的实现。

七、大盘风控

修改数据准备函数
# 回测引擎:准备数据,只执行一次
def bigquant_run(context):
    #在数据准备函数中一次性计算每日的大盘风控条件相比于在handle中每日计算风控条件可以提高回测速度
    # 多取50天的数据便于计算均值(保证回测的第一天均值不为Nan值),其中context.start_date和context.end_date是回测指定的起始时间和终止时间
    start_date= (pd.to_datetime(context.start_date) - datetime.timedelta(days=50)).strftime('%Y-%m-%d') 
    df=DataSource('bar1d_index_CN_STOCK_A').read(start_date=start_date,end_date=context.end_date,fields=['close'])

    #这里以上证指数000001.HIX为例
    benckmark_data=df[df.instrument=='000001.HIX']
    #计算上证指数5日涨幅
    benckmark_data['ret5']=benckmark_data['close']/benckmark_data['close'].shift(5)-1
    #计算大盘风控条件,如果5日涨幅小于-4%则设置风险状态risk为1,否则为0
    benckmark_data['risk'] = np.where(benckmark_data['ret5']<-0.04,1,0)
    #修改日期格式为字符串(便于在handle中使用字符串日期索引来查看每日的风险状态)
    benckmark_data['date']=benckmark_data['date'].apply(lambda x:x.strftime('%Y-%m-%d'))
    #设置日期为索引
    benckmark_data.set_index('date',inplace=True)
    #把风控序列输出给全局变量context.benckmark_risk
    context.benckmark_risk=benckmark_data['risk']
修改主函数
# 回测引擎:每日数据处理函数,每天执行一次
def bigquant_run(context, data):
    #获取当日日期
    today = data.current_dt.strftime('%Y-%m-%d')
    stock_hold_now = [equity.symbol for equity in context.portfolio.positions ]
    #大盘风控模块,读取风控数据    
    benckmark_risk=context.benckmark_risk[today]
    context.symbol
    #当risk为1时,市场有风险,全部平仓,不再执行其它操作
    if benckmark_risk > 0:
        for instrument in stock_hold_now:
            context.order_target(symbol(instrument), 0)
        print(today,'大盘风控止损触发,全仓卖出')
        return
 
    # 按日期过滤得到今日的预测数据
    ranker_prediction = context.ranker_prediction[
        context.ranker_prediction.date == data.current_dt.strftime('%Y-%m-%d')]

    # 1. 资金分配
    # 平均持仓时间是hold_days,每日都将买入股票,每日预期使用 1/hold_days 的资金
    # 实际操作中,会存在一定的买入误差,所以在前hold_days天,等量使用资金;之后,尽量使用剩余资金(这里设置最多用等量的1.5倍)
    is_staging = context.trading_day_index < context.options['hold_days'] # 是否在建仓期间(前 hold_days 天)
    cash_avg = context.portfolio.portfolio_value / context.options['hold_days']
    cash_for_buy = min(context.portfolio.cash, (1 if is_staging else 1.5) * cash_avg)
    cash_for_sell = cash_avg - (context.portfolio.cash - cash_for_buy)
    positions = {e.symbol: p.amount * p.last_sale_price
                 for e, p in context.portfolio.positions.items()}

    # 2. 生成卖出订单:hold_days天之后才开始卖出;对持仓的股票,按机器学习算法预测的排序末位淘汰
    if not is_staging and cash_for_sell > 0:
        equities = {e.symbol: e for e, p in context.portfolio.positions.items()}
        instruments = list(reversed(list(ranker_prediction.instrument[ranker_prediction.instrument.apply(
                lambda x: x in equities)])))
        for instrument in instruments:
            context.order_target(context.symbol(instrument), 0)
            cash_for_sell -= positions[instrument]
            if cash_for_sell <= 0:
                break

    # 3. 生成买入订单:按机器学习算法预测的排序,买入前面的stock_count只股票
    buy_cash_weights = context.stock_weights
    buy_instruments = list(ranker_prediction.instrument[:len(buy_cash_weights)])
    max_cash_per_instrument = context.portfolio.portfolio_value * context.max_cash_per_instrument
    for i, instrument in enumerate(buy_instruments):
        cash = cash_for_buy * buy_cash_weights[i]
        if cash > max_cash_per_instrument - positions.get(instrument, 0):
            # 确保股票持仓量不会超过每次股票最大的占用资金量
            cash = max_cash_per_instrument - positions.get(instrument, 0)
        if cash > 0:
            context.order_value(context.symbol(instrument), cash)

八、整百下单

修改主函数
  1. 平台策略默认为后复权价格,将价格修改为真实价格;

  2. 找到主函数中的买入下单代码,修改如下

    for i, instrument in enumerate(buy_instruments):
        cash = cash_for_buy * buy_cash_weights[i]
        if cash > max_cash_per_instrument - positions.get(instrument, 0):
            # 确保股票持仓量不会超过每次股票最大的占用资金量
            cash = max_cash_per_instrument - positions.get(instrument, 0)
        if cash > 0:
            price = data.current(context.symbol(instrument), 'price')  # 最新价格
            stock_num = np.floor(cash/price/100)*100  # 向下取整
            context.order(context.symbol(instrument), stock_num) # 整百下单

小结: 本文总结了实际策略开发过程中的几种需求,可以直接复制粘贴相应的功能代码替换策略中的各函数代码模块即可快速实现对应的功能。希望大家对回测机制有更深入的理解,能够快速灵活开发策略,验证策略思想。


   本文由BigQuant宽客学院推出,版权归BigQuant所有,转载请注明出处。


【宽客学院】AI模板策略交易逻辑解读
麻烦谁给我好好讲讲context和data,新手真的懵了
[量化学堂-策略开发]金叉死叉策略
麻烦谁给我好好讲讲context和data,新手真的懵了
如何在AI策略中加入大盘指标因素,比如,大盘走坏时,不开仓
在回测过程中,如何动态更改仓位上下限
AI量化策略开发第六步:回测
BigQuant平台常见问题汇总(持续更新)
[量化学堂-新手专区]BigQuant回测机制
(youke) #2

这个非常好,学习了。。
不过还是有一个问题,还是没太明白。默认是开盘买入,收盘卖出之前的持仓股,我想改成收盘买入,第二天开盘卖出,要如何修改?


(iQuant) #3

这里修改。


(oversky2003) #4

这个持有天数是自然日,应该换成交易日才对,怎么改?


(iQuant) #5

已经增加了固定交易日天数卖出,查看教程


(达达) #6

多笔买入时,只会记录最后一笔日期,所以最好增加逻辑,持仓中的股票不要再买入了。否则之前买入的股票的建仓时间会被后来买入的覆盖掉。


(oversky2003) #7

last_sale_date是指股票最后发生交易的时间,不仅是指最后卖出的时间,对吗?