复制链接
克隆策略
In [1]:
# Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端
def m8_run_bigquant_run(input_1, input_index):
    # 示例代码如下。在这里编写您的代码
    start_date=input_1.read_pickle()['start_date']
    end_date=input_1.read_pickle()['end_date']
    df = DataSource('bar1d_index_CN_STOCK_A').read(instruments=[input_index],start_date=start_date,end_date=end_date,fields=['close'])
    data_1 = DataSource.write_df(df)
    return Outputs(data_1=data_1, data_2=None, data_3=None)

# 后处理函数,可选。输入是主函数的输出,可以在这里对数据做处理,或者返回更友好的outputs数据格式。此函数输出不会被缓存。
def m8_post_run_bigquant_run(outputs):
    return outputs

# Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端
def m22_run_bigquant_run(input_1, input_2, input_3):
    # 示例代码如下。在这里编写您的代码
    df = input_1.read_df()
    # 缺失值处理
    # if len(df)!=0:
    #     df.dropna(inplace=True)
        
    # 选股条件
    if len(df)!=0:
        df_filter1 = df[df['cond1']>0]
    else:
        df_filter1 = df
    
    # 指标排序
    if len(df_filter1)!=0:
        df_filter2 = df_filter1.groupby('date').apply(lambda x:x.sort_values(by=['cond2'],ascending=True))
    else:
        df_filter2 = df_filter1
    
    #输出条件过滤股票池
    data_1 = DataSource.write_df(df_filter2)

    
    # 进场条件
    if len(df)!=0:
        df_buy = df[df['cond3']>0]
    else:
        df_buy = df
    # 输出满足进场条件的股票池
    data_2 = DataSource.write_df(df_buy)

    
    # 出场条件
    if len(df)!=0:
        df_sell = df[df['cond4']>0]
    else:
        df_sell = df
    # 输出满足出场条件的股票池
    data_3 = DataSource.write_df(df_sell)    
    
    return Outputs(data_1=data_1, data_2=data_2, data_3=data_3)

# 后处理函数,可选。输入是主函数的输出,可以在这里对数据做处理,或者返回更友好的outputs数据格式。此函数输出不会被缓存。
def m22_post_run_bigquant_run(outputs):
    return outputs

# Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端
def m17_run_bigquant_run(input_1, input_2, input_3):
    df1 = input_1.read_df()
    df2 = input_2.read_df()
    df3 = input_3.read_df()

    if len(df1.index.names) == 2:
        df1.index.names = [None, None]
    else:
        df1.index.names = [None]
    
    df = {'df1':df1,'df2':df2,'df3':df3}
    ds = DataSource.write_pickle(df)
    return Outputs(data_1=ds)

# 后处理函数,可选。输入是主函数的输出,可以在这里对数据做处理,或者返回更友好的outputs数据格式。此函数输出不会被缓存。
def m17_post_run_bigquant_run(outputs):
    return outputs


def prepare_index_data(context):
    """准备指数数据"""
    if context.market_risk_conf != []:
        if len(context.market_risk_conf) == 1:
            index_code =  context.market_risk_conf[0]['params']['index_code']
            start_date = '2005-01-01'
            end_date = context.end_date
            index_data = DataSource('bar1d_index_CN_STOCK_A').read(instruments=[index_code], start_date=start_date, end_date=end_date).set_index('date')
            
            if  context.market_risk_conf[0]['method'] == 'market_ma_stoploss':
                ma_periods = int(context.market_risk_conf[0]['params']['ma_periods'])
                index_data['ma_%s'%ma_periods] = index_data['close'].rolling(ma_periods).mean()
                index_data['signal'] = np.where(index_data['close'] > index_data['ma_%s'%ma_periods], 'long', 'short')

            elif context.market_risk_conf[0]['method'] == 'market_fallrange_stoploss':
                days = context.market_risk_conf[0]['params']['days']
                fallrange = context.market_risk_conf[0]['params']['fallrange']
                index_data['signal'] = np.where(index_data['close']/index_data['close'].shift(days)-1 <= fallrange, 'long', 'short')
            context.index_signal_data = index_data 
            
        if  len(context.market_risk_conf) == 2:
            start_date = '2005-01-01'
            end_date = context.end_date 
            if context.market_risk_conf[0]['method'] == 'market_ma_stoploss':              
                index_code_1 = context.market_risk_conf[0]['params']['index_code']
                index_data_1 = DataSource('bar1d_index_CN_STOCK_A').read(instruments=[index_code_1], start_date=start_date, end_date=end_date).set_index('date')
                ma_periods = int(context.market_risk_conf[0]['params']['ma_periods'])
                
                index_code_2 =  context.market_risk_conf[1]['params']['index_code']
                index_data_2 = DataSource('bar1d_index_CN_STOCK_A').read(instruments=[index_code_2], start_date=start_date, end_date=end_date).set_index('date')
                days = context.market_risk_conf[1]['params']['days']
                fallrange = context.market_risk_conf[1]['params']['fallrange']
            else:
                index_code_1 = context.market_risk_conf[1]['params']['index_code']
                index_data_1 = DataSource('bar1d_index_CN_STOCK_A').read(instruments=[index_code_1], start_date=start_date, end_date=end_date).set_index('date')
                ma_periods = int(context.market_risk_conf[1]['params']['ma_periods'])
                
                index_code_2 =  context.market_risk_conf[0]['params']['index_code']
                index_data_2 = DataSource('bar1d_index_CN_STOCK_A').read(instruments=[index_code_2], start_date=start_date, end_date=end_date).set_index('date')
                days = context.market_risk_conf[0]['params']['days']
                fallrange = context.market_risk_conf[0]['params']['fallrange'] 
                
            index_data_1['ma_%s'%ma_periods] = index_data_1['close'].rolling(ma_periods).mean()
            index_data_1['signal_1'] = np.where(index_data_1['close'] > index_data_1['ma_%s'%ma_periods], 1, 0)
            signal_1 = index_data_1[['signal_1']].reset_index()                
            index_data_2['signal_2'] = np.where(index_data_2['close']/index_data_2['close'].shift(days)-1 <= fallrange, 1, 0)
            signal_2 = index_data_2[['signal_2']].reset_index()
            signal = pd.merge(signal_1,signal_2).set_index('date')
            signal['signal_sum'] = signal['signal_1'] + signal['signal_2']
            signal['signal'] = np.where(signal['signal_sum']>0,'long','short')     
            context.index_signal_data = signal
    else:
        context.index_signal_data = None  

def m4_initialize_bigquant_run(context):
    context.set_commission(PerOrder(buy_cost=0.003, sell_cost=0.004, min_cost=5))
    context.selected_stock = []
    context.trade_mode = '轮动'

    if context.trade_mode == '轮动':
        context.buy_frequency = 1
        context.sell_frequency = 1
        context.rebalance_periods = 1 # 调仓周期
        context.max_stock_count = 5 # 最大持仓股票数量
        context.order_weight_method = 'equal_weight' # 买入方式
        context.is_sell_willbuy_stock = False  # 卖出欲买进股票 
    else:
        # 买入条件参数
        context.stock_select_frequency = 1 # 选股频率
        context.order_weight_method = 'equal_weight' # 买入方式
        context.buy_frequency = 2 # 买入频率
        context.can_duplication_buy = False  # 是否可重复买入
        context.max_stock_count = 5 # 最大持仓股票数量
        context.max_stock_weight = 1 # 个股最大持仓比重

        # 卖出条件参数
        context.sell_frequency = 10 # 卖出频率
        context.is_sell_willbuy_stock = False  # 卖出欲买进股票 

    # 风控参数 
    context.stock_risk_conf = [{'method':'stock_percent_stopwin', 'params':{'percent': 0.2}}, {'method':'stock_percent_stoploss', 'params':{'percent': 0.1}}] # 支持多选  无:[]
    context.strategy_risk_conf = []  # 支持多选 无:[]
    context.market_risk_conf = [] # 支持多选, 无: []
    
    prepare_index_data(context)
    slippage_type = 'price'
    from zipline.finance.slippage import SlippageModel
    class FixedPriceSlippage(SlippageModel):
        # 指定初始化函数
        def __init__(self, spreads, price_field_buy, price_field_sell):
            # 存储spread的字典,用股票代码作为key
            self.spreads = spreads
            self._price_field_buy = price_field_buy
            self._price_field_sell = price_field_sell
        def process_order(self, data, order, bar_volume=0, trigger_check_price=0):
            if order.limit is None:
                price_field = self._price_field_buy if order.amount > 0 else self._price_field_sell
                price_base = data.current(order.asset, price_field)
                if slippage_type == 'price':
                    price = price_base + (self.spreads / 2) if order.amount > 0 else price_base - (self.spreads / 2)
                else:
                    price = price_base * (1.0 + self.spreads / 2) if order.amount > 0 else price_base * (1.0 - self.spreads / 2)
            else:
                price = order.limit
                # 返回希望成交的价格和数量
            return (price, order.amount)
    # 设置price_field
    fix_slippage = FixedPriceSlippage(price_field_buy='open', price_field_sell='open', spreads=0.02)
    context.set_slippage(us_equities=fix_slippage)
#--------------------------------------------------------------------
# 卖出条件
#--------------------------------------------------------------------     
def sell_action(context, data):
    date = data.current_dt.strftime('%Y-%m-%d')
    hit_stop_stock = context.stock_hit_stop 
    
    try:
        today_enter_stock = context.enter_daily_df.loc[date] 
    except KeyError as e:
        today_enter_stock = []
    try:
        today_exit_stock = context.exit_daily_df.loc[date] 
    except KeyError as e:
        today_exit_stock = []
        
    target_stock_to_buy = [i for i in  context.selected_stock  if i in today_enter_stock ]   
    stock_hold_now = [equity.symbol for equity in context.portfolio.positions] # 当前持仓股票
    
    if  context.trading_day_index % context.sell_frequency == 0:
        stock_to_sell = [i for i in stock_hold_now if i in today_exit_stock] # 要卖出的股票
        stock_buy_and_sell = [i for i in stock_to_sell if i in target_stock_to_buy]
        if context.is_sell_willbuy_stock == False: # 要买入的股票不卖出,但该票也不再买入
            stock_to_sell.extend(hit_stop_stock) # 将触发个股风控的股票融入到卖出票池
            stock_to_sell = [i for i in stock_to_sell if i not in stock_buy_and_sell] # 进行更新而已
        elif context.is_sell_willbuy_stock == True: # 要买入的股票依然要卖出,该票不再买入
            stock_to_sell.extend(hit_stop_stock)
        
        # 买入时需要过滤的股票
        context.cannot_buy_stock = stock_buy_and_sell
            
        for stock in stock_to_sell:
            if data.can_trade(context.symbol(stock)):
                context.order_target_percent(context.symbol(stock), 0)
                s = context.symbol(stock)
                del context.portfolio.positions[s]
                

#--------------------------------------------------------------------
# 买入条件
#--------------------------------------------------------------------     
def buy_action(context, data):
    date = data.current_dt.strftime('%Y-%m-%d')
    
    try:
        today_enter_stock = context.enter_daily_df.loc[date] 
    except KeyError as e:
        today_enter_stock = []
    try:
        today_exit_stock = context.exit_daily_df.loc[date] 
    except KeyError as e:
        today_exit_stock = []
    
    target_stock_to_buy = [i for i in  context.selected_stock if i in today_enter_stock]  
    target_stock_to_buy = [s for s in target_stock_to_buy if s not in context.cannot_buy_stock] # 进行更新,不能买入的股票要过滤
    
    stock_hold_now = [equity.symbol for equity in context.portfolio.positions] # 当前持仓股票
    
    # 确定股票权重
    if context.order_weight_method == 'equal_weight':
        equal_weight =  1 / context.max_stock_count
        
    portfolio_value = context.portfolio.portfolio_value
    position_current_value = {pos.sid: pos.amount* pos.last_sale_price for i,pos in context.portfolio.positions.items()}
    
    # 买入
    if  context.trading_day_index % context.buy_frequency == 0:
        if len(stock_hold_now) >= context.max_stock_count:
            return 
        
        today_buy_count = 0
        if context.trade_mode == '轮动':
            for s in target_stock_to_buy:
                if today_buy_count + len(stock_hold_now) >= context.max_stock_count: # 超出最大持仓数量
                    break
                if data.can_trade(context.symbol(s)):
                    order_target_percent(context.symbol(s), equal_weight)
                    today_buy_count += 1
        else:
            if context.can_duplication_buy == True: # 可以重复买入,多一份买入
                for s in target_stock_to_buy:
                    if today_buy_count + len(stock_hold_now) >= context.max_stock_count: # 超出最大持仓数量
                        break
                        
                    if data.can_trade(context.symbol(s)):
                        if context.symbol(s) in position_current_weight:
                            curr_value = position_current_value.get(context.symbol(s)) 
                            order_value(context.symbol(s), min(context.max_stock_weight * portfolio_value - curr_value, equal_weight*portfolio_value))
                        else:
                            order_value(context.symbol(s), equal_weight*portfolio_value)
                        today_buy_count += 1

            elif context.can_duplication_buy == False: # 不可以重复买入,不买
                for s in target_stock_to_buy:
                    if today_buy_count + len(stock_hold_now) >= context.max_stock_count: # 超出最大持仓数量
                        break
                    if s in stock_hold_now:
                        continue
                    else:
                        if data.can_trade(context.symbol(s)):
                            order_target_percent(context.symbol(s), equal_weight)
                            today_buy_count += 1

                        
#--------------------------------------------------------------------
# 风控体系
#--------------------------------------------------------------------                         
def market_risk_manage(context, data):
    """大盘风控"""
    date = data.current_dt.strftime('%Y-%m-%d')
    if type(context.index_signal_data) == pd.DataFrame:
        current_signal = context.index_signal_data.loc[date]['signal']
        if current_signal == 'short': 
            stock_hold_now = [equity.symbol for equity in context.portfolio.positions]  
            # 平掉所有股票
            for stock in stock_hold_now:
                if data.can_trade(context.symbol(stock)):
                    context.order_target_percent(context.symbol(stock), 0) 
            print('大盘出现止损信号, 平掉全部仓位,并关闭交易!')
            context.market_risk_signal = 'short'
    else:
        context.market_risk_signal = 'long'

        
        
def strategy_risk_manage(context, data):
    """策略风控"""
    if context.strategy_risk_conf == []: # 没有设置策略风控
        context.strategy_risk_signal = 'long'
    
    else:
        for rm in context.strategy_risk_conf:
            if rm['method'] == 'strategy_percent_stopwin':
                pct = rm['params']['percent']
                portfolio_value = context.portfolio.portfolio_value 
                if  portfolio_value / context.capital_base - 1 > pct: 
                    stock_hold_now = [equity.symbol for equity in context.portfolio.positions]  
                    # 平掉所有股票
                    for stock in stock_hold_now:
                        if data.can_trade(context.symbol(stock)):
                            context.order_target_percent(context.symbol(stock), 0) 
                    print('策略出现止盈信号, 平掉全部仓位,并关闭交易!')
                    context.strategy_risk_signal = 'short'  
                
            
            if rm['method'] == 'strategy_percent_stoploss':
                pct = rm['params']['percent']
                portfolio_value = context.portfolio.portfolio_value 
                if  portfolio_value / context.capital_base -1 < pct:
                    stock_hold_now = [equity.symbol for equity in context.portfolio.positions]  
                    # 平掉所有股票
                    for stock in stock_hold_now:
                        if data.can_trade(context.symbol(stock)):
                            context.order_target_percent(context.symbol(stock), 0) 
                    print('策略出现止损信号, 平掉全部仓位,并关闭交易!')
                    context.strategy_risk_signal = 'short'

        
def stock_risk_manage(context, data):
    """个股风控"""
    position_current_pnl = {pos.sid: (pos.last_sale_price-pos.cost_basis)/pos.cost_basis for i,pos in context.portfolio.positions.items()}
    
    for rm in context.stock_risk_conf:
        params_pct = rm['params']['percent']
        if rm['method'] == 'stock_percent_stopwin':
            for sid,pnl_pct in position_current_pnl.items():  
                if pnl_pct > params_pct:
                    context.stock_hit_stop.append(sid.symbol)
          
        if rm['method'] == 'stock_percent_stoploss':
            for sid,pnl_pct in position_current_pnl.items():
                if pnl_pct < params_pct:
                    context.stock_hit_stop.append(sid.symbol)




# 回测引擎:每日数据处理函数,每天执行一次
def m4_handle_data_bigquant_run(context, data):
    """每日运行策略逻辑"""
    market_risk_manage(context, data)
    strategy_risk_manage(context, data)
    
    if context.market_risk_signal == 'short': return
    if context.strategy_risk_signal == 'short': return

    stock_risk_manage(context, data)
    if context.trading_day_index % context.rebalance_periods == 0:
        sell_action(context, data)
        buy_action(context, data)

# 回测引擎:准备数据,只执行一次
def m4_prepare_bigquant_run(context):
      
    load_data = context.options['data'].read_pickle()
    context.signal_daily_stock = load_data['df1'].groupby('date').apply(lambda x:list(x.instrument))
    context.enter_daily_df = load_data['df2'].groupby('date').apply(lambda x:list(x.instrument))
    context.exit_daily_df = load_data['df3'].groupby('date').apply(lambda x:list(x.instrument))

# 回测引擎:每个单位时间开始前调用一次,即每日开盘前调用一次。
def m4_before_trading_start_bigquant_run(context, data):
    
    """每日盘前更新股票池"""
    frequency = context.rebalance_periods if context.trade_mode == '轮动' else context.stock_select_frequency
    if context.trading_day_index % frequency == 0:
        date = data.current_dt.strftime('%Y-%m-%d')
        try:
            context.selected_stock = context.signal_daily_stock[date] 
        except KeyError as e:
            context.selected_stock = []
    
    """初始化风控参数"""
    context.strategy_risk_signal = 'long'
    context.market_risk_signal = 'long' 
    context.stock_hit_stop = []
    context.cannot_buy_stock = []


m1 = M.instruments.v2(
    start_date='2022-01-01',
    end_date='2022-05-27',
    market='CN_STOCK_A',
    instrument_list=''
)

m8 = M.cached.v3(
    input_1=m1.data,
    run=m8_run_bigquant_run,
    post_run=m8_post_run_bigquant_run,
    input_ports='input_1',
    params='{\'input_index\':\'000300.HIX\'}',
    output_ports='data_1'
)

m3 = M.input_features.v1(
    features="""
in_csi300_0
in_csi500_0
in_sse50_0
industry_sw_level1_0
st_status_0

close_0
ta_sma_5_0
ta_sma_10_0
ta_sma_60_0

# 选股条件
cond1=((ta_sma_5_0 - ta_sma_10_0) / ta_sma_10_0 < 0.02) & ((ta_sma_5_0 - ta_sma_10_0) / ta_sma_10_0 > 0)

# 排序选股
cond2=pe_ttm_0

# 进场条件
cond3=(close_0 > close_1)
              
# 卖出条件
cond4=close_0 < ta_sma_10_0
"""
)

m15 = M.general_feature_extractor.v7(
    instruments=m1.data,
    features=m3.data,
    start_date='',
    end_date='',
    before_start_days=300
)

m16 = M.derived_feature_extractor.v3(
    input_data=m15.data,
    features=m3.data,
    date_col='date',
    instrument_col='instrument',
    drop_na=False,
    remove_extra_columns=False
)

m10 = M.input_features.v1(
    features="""concept
"""
)

m5 = M.use_datasource.v1(
    instruments=m1.data,
    features=m10.data,
    datasource_id='industry_CN_STOCK_A',
    start_date='',
    end_date=''
)

m7 = M.join.v3(
    data1=m5.data,
    data2=m16.data,
    on='date,instrument',
    how='inner',
    sort=False
)

m6 = M.input_features.v1(
    features='suspended'
)

m19 = M.use_datasource.v1(
    instruments=m1.data,
    features=m6.data,
    datasource_id='stock_status_CN_STOCK_A',
    start_date='',
    end_date=''
)

m20 = M.join.v3(
    data1=m7.data,
    data2=m19.data,
    on='date,instrument',
    how='inner',
    sort=False
)

m2 = M.stockpool_select.v6(
    input_1=m20.data,
    self_instruments=[],
    input_concepts=[],
    input_industrys=[360000,710000,220000,460000,370000,330000,340000,720000,240000,630000,280000,420000,510000,640000,610000,620000,650000,230000,410000,350000,490000,110000,210000,730000,450000,270000,430000,480000],
    input_indexs=['全A股'],
    input_st='过滤',
    input_suspend='过滤'
)

m22 = M.cached.v3(
    input_1=m2.data,
    run=m22_run_bigquant_run,
    post_run=m22_post_run_bigquant_run,
    input_ports='',
    params='{}',
    output_ports=''
)

m17 = M.cached.v3(
    input_1=m22.data_1,
    input_2=m22.data_2,
    input_3=m22.data_3,
    run=m17_run_bigquant_run,
    post_run=m17_post_run_bigquant_run,
    input_ports='',
    params='{}',
    output_ports=''
)

m4 = M.trade.v4(
    instruments=m1.data,
    options_data=m17.data_1,
    benchmark_ds=m8.data_1,
    start_date='',
    end_date='',
    initialize=m4_initialize_bigquant_run,
    handle_data=m4_handle_data_bigquant_run,
    prepare=m4_prepare_bigquant_run,
    before_trading_start=m4_before_trading_start_bigquant_run,
    volume_limit=0.025,
    order_price_field_buy='open',
    order_price_field_sell='open',
    capital_base=1000000,
    auto_cancel_non_tradable_orders=True,
    data_frequency='daily',
    price_type='后复权',
    product_type='股票',
    plot_charts=True,
    backtest_only=False,
    benchmark='000300.HIX'
)
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-1-9fc0724e8fb3> in <module>
    537 )
    538 
--> 539 m4 = M.trade.v4(
    540     instruments=m1.data,
    541     options_data=m17.data_1,

<ipython-input-1-9fc0724e8fb3> in m4_handle_data_bigquant_run(context, data)
    376     stock_risk_manage(context, data)
    377     if context.trading_day_index % context.rebalance_periods == 0:
--> 378         sell_action(context, data)
    379         buy_action(context, data)
    380 

<ipython-input-1-9fc0724e8fb3> in sell_action(context, data)
    223                 context.order_target_percent(context.symbol(stock), 0)
    224                 s = context.symbol(stock)
--> 225                 del context.portfolio.positions[s]
    226 
    227 

KeyError: Equity(3263 [002248.SZA])