In [2]:
import pandas as pd
import numpy as np
import warnings
import empyrical
import dai
import bigcharts
import time 
warnings.filterwarnings('ignore')
from biglearning.api import tools as T
print('导入包完成!')

Factor data

In [3]:
# instruments supports: 沪深300, 中证500, 中证1000, 全市场; benchmark supports: 沪深300, 中证500, 中证1000
params = {'group_num': 10, 'factor_field': 'fac_vol20', 'instruments': '全市场',
          'factor_direction': 1, 'benchmark': '沪深300', 'data_process': True}


sql = """
SELECT
    date, 
    instrument, 
    m_AVG(fac_volume, 20) AS fac_vol20
FROM
    tigerhf_fac_rcjlby
where date >= '2018-01-01'  and date <='2023-12-04'
ORDER BY
    date, instrument;
"""
factor_data = dai.query(sql).df()

# Factor data cleaning
factor_data.dropna(subset=[params['factor_field']], inplace=True)
factor_data = factor_data[['instrument', 'date', params['factor_field']]]
In [6]:
factor_data['date']= pd.to_datetime(factor_data['date'])
factor_data['fac_vol20']=factor_data['fac_vol20'].astype(float)
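
Before handing the frame to the analyzer it is worth confirming it matches the format that check_data_format (defined below) expects: a datetime64 date column, instrument codes ending in .SH/.SZ/.BJ, and a float64 factor column. A minimal sanity check, assuming the query above returned data:

print(factor_data.dtypes)
print(factor_data['instrument'].str[-3:].value_counts())
print(factor_data.head())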

Factor analysis framework

In [7]:
### Factor analysis toolkit
class AlphaMiner(object):
    def __init__(self, params, factor_data):

        # params: dict, e.g. {'group_num':10, 'factor_field':'hf_close_netinflow_rate_small_order_act', 'instruments':'中证500', 'factor_direction':-1, 'benchmark':'中证500','data_process':True}
        # group_num: number of quantile groups. type: int
        # factor_field: column name of the factor in the source table. type: str
        # instruments: stock pool, one of 沪深300, 中证500, 中证1000, 全市场. type: str
        # factor_direction: factor direction, 1 or -1; 1 means larger factor values are better, -1 means smaller values are better. type: int
        # benchmark: benchmark index, one of 沪深300, 中证500, 中证1000. type: str
        # data_process: whether to run the preprocessing pipeline. type: bool

        # factor_data: pandas.DataFrame, e.g.
        #   instrument	date	    hf_fz_ykws
        # 0	000001.SZ	2017-01-03	1.564644
        # 1	000001.SZ	2017-01-04	1.521567
        # 2	000001.SZ	2017-01-05	1.519973
        # 3	000001.SZ	2017-01-06	1.553225
        # 4	000001.SZ	2017-01-09	1.367971
        # where
        #     instrument: str, stock code suffixed with .SH (Shanghai) or .SZ (Shenzhen)
        #     date: datetime64
        #     hf_fz_ykws: float64
        
        print('==================因子分析开始==================')
        self.t0 = time.time()
        self.params = params
        self.top_n_ins = 5 # show 5 names by default
        self.factor_data = factor_data.rename(columns={self.params['factor_field']:'factor'}) 
        self.factor_data['factor'] *= self.params['factor_direction']

        # Check the factor data format
        try:
            self.check_data_format(self.factor_data)
        except ValueError as e:
            print("数据格式检查失败:" + str(e))
            raise  # the remaining steps need a valid frame, so stop here
        t1 = time.time()
        print("耗时:{0}秒 数据格式检查通过".format(np.round(t1-self.t0)))

        # Filter to the requested stock pool
        self.stock_pool_filter() 
        t2 = time.time()
        print('耗时:{0}秒 股票池过滤完成'.format(np.round(t2-t1)))

        # Factor preprocessing (winsorize / standardize / neutralize)
        if self.params['data_process'] == True:
            self.factor_data = self.factor_data_process('factor')
            t3 = time.time()
            print("耗时:{0}秒 数据预处理完成".format(np.round(t3-t2)))
        elif self.params['data_process'] == False:
            t3 = time.time()

        # Per-stock forward returns
        self.start_date = self.factor_data.date.min().strftime('%Y-%m-%d')
        self.end_date =  self.factor_data.date.max().strftime('%Y-%m-%d')
        self.price_data =  self.get_daily_ret(self.start_date, self.end_date) # daily return data
        t4 = time.time()
        print('耗时:{0}秒 个股日收益率计算完成'.format(np.round(t4-t3)))
        
        self.merge_data = pd.merge(self.factor_data.sort_values(['date', 'instrument']), \
                                   self.price_data.sort_values(['date', 'instrument']), on=['date','instrument'], how='left')
        self.group_data = self.get_group_data()  # quantile-group data
        t5 = time.time()
        print('耗时:{0}秒 因子分组完成'.format(np.round(t5-t4)))
        self.bm_ret = self.get_bm_ret(self.params['benchmark'])
        t6 = time.time()
        print('耗时:{0}秒 基准日收益率计算完成'.format(np.round(t6-t5)))
    
        self.group_cumret = self.get_group_cumret()  # cumulative return per group
        t7 = time.time()
        print('耗时:{0}秒 分组收益率计算完成'.format(np.round(t7-t6)))
        self.whole_perf = self.get_whole_perf()   # aggregate performance metrics
        t8 = time.time()
        print('耗时:{0}秒 整体绩效计算完成'.format(np.round(t8-t7)))
        
        self.yearly_perf =  self.get_yearly_perf() # yearly performance metrics
        t9 = time.time()
        print('耗时:{0}秒 年度绩效计算完成'.format(np.round(t9-t8)))
        self.ic = self.get_IC_data('all')  # IC metrics
        self.t10 = time.time()
        print('耗时:{0}秒 IC计算完成'.format(np.round(self.t10-t9)))

    def check_data_format(self, df):
        # date column must be datetime64
        if df['date'].dtype != 'datetime64[ns]':
            raise ValueError("date列的数据格式应为datetime格式")
        # instrument codes must end with .SH, .SZ or .BJ
        if not all(df['instrument'].str.endswith('.SH') | df['instrument'].str.endswith('.SZ') | df['instrument'].str.endswith('.BJ')):
            raise ValueError("instrument列的数据格式应为以.SH或.SZ或.BJ结尾的字符串")
        # factor column must be float64
        if df['factor'].dtype != 'float64':
            raise ValueError("factor列的数据格式应为浮点型")


    def stock_pool_filter(self):
        pools = self.params['instruments']
        if pools == "沪深300":
            index_code = '000300.SH'
        elif pools == "中证500":
            index_code = '000905.SH'
        elif pools == "中证1000":
            index_code = '000852.SH'
        elif pools  == "全市场":
            index_code = '全市场'
        else: 
            raise ValueError('请检查输入的指数池是否正确')  # an unknown pool would otherwise leave index_code undefined
        if index_code in ['000300.SH' , '000905.SH', '000852.SH']:
            index_com_df  = dai.query("select * from cn_stock_index_component where date >= '2015-01-01' and instrument == '%s' order by date, instrument "%index_code).df()
            factor_df = self.factor_data
            merge_df = pd.merge(factor_df, index_com_df, how='inner', left_on=['date','instrument'], right_on=['date', 'member_code'])[['instrument_x','date','factor']]
            merge_df.rename(columns={'instrument_x':'instrument'}, inplace=True)
        else:
            merge_df = self.factor_data

        # Additional filtering of the factor data: drop ST stocks, recently listed stocks and Beijing Stock Exchange tickers
        def factor_data_filter(factor_data):
            columns = factor_data.columns 
            start_date = factor_data.date.min().strftime('%Y-%m-%d')
            end_date = factor_data.date.max().strftime('%Y-%m-%d')
            factor_data['instrument'] = factor_data['instrument'].apply(lambda x:x[:9]) 
            base_info_df = dai.query("select date, instrument, st_status ,trading_days, amount from cn_stock_factors_base where date >= '%s' and date <= '%s'"%(start_date, end_date)).df()
            factor_data = pd.merge(factor_data, base_info_df, how='left', on=['date', 'instrument'])
            factor_data = factor_data[(factor_data['st_status'] == 0) & (factor_data['trading_days']> 252)]  # drop ST stocks and stocks listed for less than a year
            factor_data= factor_data[factor_data.instrument.apply(lambda x: True if x.endswith('SH') or x.endswith('SZ') else False)] # drop Beijing Stock Exchange tickers
            factor_data = factor_data[factor_data['amount'] > 0 ]            # drop suspended-trading days
            factor_data.replace([np.inf, -np.inf], np.nan, inplace=True)     # replace inf with NaN
            # drop rows that contain NaN
            factor_data.dropna(inplace=True)
            return factor_data[columns]

        self.factor_data  =  factor_data_filter(merge_df)

    def factor_data_process(self, col):
        """因子数据预处理函数,包括去极值、标准化、中性化"""
        
        def zscore(df, train_col):
            from sklearn.preprocessing import StandardScaler
            scaler = StandardScaler()
            df[train_col] = scaler.fit_transform(df[train_col])
            return df
        def remove_extreme_and_cut_zscore(df, col):
            col_list = [col]
            for fac in col_list:
                n = 3
                mean = df[fac].mean()  # cross-sectional mean of the factor
                std = df[fac].std() # cross-sectional standard deviation
                lower_bound = mean - n * std  # lower winsorization bound
                upper_bound = mean + n * std  # upper winsorization bound
                df.loc[df[fac]<lower_bound, fac] = lower_bound
                df.loc[df[fac]>upper_bound, fac] = upper_bound
        
            df = zscore(df, col_list)
            return df

        factor_data = self.factor_data.groupby('date').apply(remove_extreme_and_cut_zscore, col=col)
        factor_data = factor_data.fillna(0) # fill remaining NaN with 0

        start_date = factor_data.date.min().strftime("%Y-%m-%d")
        end_date = factor_data.date.max().strftime("%Y-%m-%d")

        sql = """
        SELECT date, instrument, industry_level1_code
        FROM cn_stock_industry_component 
        WHERE date >= '{0}' and date <= '{1}' and industry =='sw2021'
        ORDER BY instrument, date;
        """.format(start_date, end_date)

        df_industry = dai.query(sql).df()
        df_industry = df_industry.dropna()

        factor_data = factor_data.merge(df_industry, on=['date', 'instrument'])
        industry_list = df_industry['industry_level1_code'].unique()

        sql = """
        SELECT date, instrument, total_market_cap
        FROM cn_stock_valuation 
        WHERE date >= '{0}' and date <= '{1}' 
        ORDER BY instrument, date;
        """.format(start_date, end_date)

        df_market_cap = dai.query(sql).df()
        factor_data = factor_data.merge(df_market_cap, on=['date', 'instrument'])
        factor_data['log_cap'] = np.log(factor_data.total_market_cap)
            
        # Cross-sectional neutralization against industry dummies and log market cap
        def neutralize(df, col):
            import warnings
            warnings.filterwarnings('ignore')
            import statsmodels.api as sm

            col_list = [col]
            for fac in col_list:
                ind_dummies = pd.get_dummies(df['industry_level1_code'], prefix='industry_level1_code')
                mkcap = df['log_cap']
                train = pd.concat([ind_dummies,mkcap],axis=1)
                X = sm.add_constant(train)
                y = df[fac]
                model = sm.OLS(y, X).fit()
                df[fac] = model.resid
            return df

        res =  factor_data.dropna().groupby('date').apply(neutralize, col=col)
        res.sort_values(by='date', inplace=True)
        res.reset_index(inplace=True, drop=True)
        return res

    def get_daily_ret(self, start_date, end_date):
        """计算收益率. T0的因子对应的收益率是T+1日开盘买入,T+2开盘卖出"""
        sql = f"SELECT instrument,date, (m_lead(open, 2)/ m_lead(open, 1) - 1) AS daily_ret from cn_stock_bar1d ORDER BY date, instrument;"
        
        from datetime import datetime, timedelta
        ten_days_ago_start_date = pd.Timestamp(self.start_date) - timedelta(days=10) # 往前多取10天数据
        ten_days_ago_start_date = ten_days_ago_start_date.strftime('%Y-%m-%d')

        price_data = dai.query(sql, filters={"date": [ten_days_ago_start_date, self.end_date]}).df()
        return price_data
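
    # Alignment sketch: a rough pandas equivalent of the forward-return query above,
    # assuming m_lead(open, n) behaves like a per-instrument shift(-n):
    #
    #   bar = bar.sort_values(['instrument', 'date'])
    #   open_t1 = bar.groupby('instrument')['open'].shift(-1)   # T+1 open (buy price)
    #   open_t2 = bar.groupby('instrument')['open'].shift(-2)   # T+2 open (sell price)
    #   bar['daily_ret'] = open_t2 / open_t1 - 1                # return attributed to day T's factor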

    def get_group_data(self):
        """因子分组,因子值越大,组数越大,默认的多头组合是因子数值最大的组合"""
        def cut(df, group_num=10):
            """分组"""
            df = df.drop_duplicates('factor') # 删除重复值
            df['group'] = pd.qcut(df['factor'], q=group_num, labels=False, duplicates='drop')
            df = df.dropna(subset=['group'], how='any')
            df['group'] =  df['group'].apply(int).apply(str)
            return df

        group_data = self.merge_data.groupby('date', group_keys=False).apply(cut, group_num=self.params['group_num'])
        return group_data
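
    # Grouping sketch: within each day pd.qcut assigns equal-count bins, e.g. on hypothetical values
    #
    #   pd.qcut(pd.Series([1.0, 2.0, 3.0, 4.0, 5.0]), q=5, labels=False)   # -> 0, 1, 2, 3, 4
    #
    # so group str(group_num-1) holds the largest factor values (the default long leg) and group '0'
    # the smallest. duplicates='drop' merges bin edges when repeated factor values would otherwise
    # make the quantile edges non-unique, which is also why duplicate factor values are removed first.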

    def get_bm_ret(self, benchmark):

        if benchmark == "沪深300":
            bm_code = '000300.SH'
        elif benchmark == "中证500":
            bm_code = '000905.SH'
        elif benchmark == "中证1000":
            bm_code = '000852.SH'
        else: 
            raise ValueError('请检查输入的基准代码是否正确')  # an unknown benchmark would otherwise leave bm_code undefined

        # Benchmark daily returns
        bm_sql = """
        SELECT 
            date,instrument, (close - m_Lag(close,1))  / m_LAG(close, 1) as benchmark_ret
        FROM cn_stock_index_bar1d
        WHERE instrument = '%s'
        AND date >= '%s' and date <='%s' ;"""%(bm_code, self.start_date, self.end_date)

        bm_ret = dai.query(bm_sql).df()
        return bm_ret

    def get_group_cumret(self):
        # Daily return of each group (equal-weighted mean within the group)
        groupret_data = self.group_data[['date','group','daily_ret']].groupby(['date','group'], group_keys=False).apply(lambda x: np.nanmean(x['daily_ret'])).reset_index()
        groupret_data.rename(columns={0:'g_ret'}, inplace=True)

        groupret_pivotdata = groupret_data.pivot(index='date', values='g_ret', columns='group')
        groupret_pivotdata['ls'] = groupret_pivotdata[str(self.params['group_num']-1)] - groupret_pivotdata['0']  # daily long-short return

        bm_ret = self.bm_ret.set_index('date') # benchmark daily returns
        groupret_pivotdata['bm'] = bm_ret['benchmark_ret'] 
        groupret_pivotdata = groupret_pivotdata.shift(1) # lag by one day: the first row becomes NaN, the last row keeps a value
        self.groupret_pivotdata = groupret_pivotdata
        
        groupcumret_pivotdata = groupret_pivotdata.cumsum() # cumulative return under simple interest
        return groupcumret_pivotdata.round(4) # numeric outputs are rounded to four decimal places
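
    # Accumulation note: daily returns are summed, not compounded, so the curves are simple-interest
    # paths. A compounded alternative (not used here) would be:
    #
    #   groupcumret_pivotdata = (1 + groupret_pivotdata).cumprod() - 1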

    def get_Performance(self, data_type):
        def get_stats(series, bm_series):
            """
            series是日收益率数据, pandas.series 
            data_type是组合类型, 'long'、'short'、'long_short'
            """
            return_ratio =  series.sum() # 总收益
            annual_return_ratio = series.sum() * 242 / len(series)  #  年度收益

            ex_return_ratio =  (series-bm_series).sum() # 超额总收益
            ex_annual_return_ratio =  (series-bm_series).sum() * 242 / len( (series-bm_series))  #  超额年度收益
            
            sharp_ratio = empyrical.sharpe_ratio(series, 0.035/242)
            return_volatility = empyrical.annual_volatility(series)
            max_drawdown  = empyrical.max_drawdown(series)
            information_ratio=series.mean()/series.std()
            win_percent = len(series[series>0]) / len(series)
            trading_days = len(series)

            series = series.fillna(0)
            ret_3 = series.rolling(3).sum().iloc[-1]
            ret_10 = series.rolling(10).sum().iloc[-1]
            ret_21 = series.rolling(21).sum().iloc[-1]
            ret_63 = series.rolling(63).sum().iloc[-1]
            ret_126 = series.rolling(126).sum().iloc[-1]
            ret_252 = series.rolling(252).sum().iloc[-1]

            return {
                    'return_ratio': return_ratio,
                    'annual_return_ratio': annual_return_ratio,
                    'ex_return_ratio': ex_return_ratio,
                    'ex_annual_return_ratio': ex_annual_return_ratio,
                    'sharp_ratio': sharp_ratio,
                    'return_volatility': return_volatility,
                    'information_ratio':information_ratio,
                    'max_drawdown': max_drawdown,
                    'win_percent':win_percent,
                    'trading_days':trading_days,
                    'ret_3':ret_3,
                    'ret_10':ret_10,
                    'ret_21':ret_21,
                    'ret_63':ret_63,
                    'ret_126':ret_126,
                    'ret_252':ret_252
                    }

        if data_type == 'long':
            perf = get_stats(self.groupret_pivotdata[str(self.params['group_num']-1)], self.groupret_pivotdata['bm'])
        elif data_type =='short':
            perf = get_stats(self.groupret_pivotdata['0'], self.groupret_pivotdata['bm'])
        elif data_type =='long_short':
            perf = get_stats(self.groupret_pivotdata['ls'], self.groupret_pivotdata['bm'])
        return perf
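
    # Convention sketch (hypothetical numbers): with 242 trading days per year assumed, a leg with a
    # 0.04% mean daily return over 484 days has return_ratio = 0.0004 * 484 = 0.1936 and
    # annual_return_ratio = 0.1936 * 242 / 484 = 0.0968 (simple interest); the Sharpe ratio is computed
    # by empyrical with the 3.5% annual risk-free rate passed as a per-day rate, 0.035 / 242.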

    def get_IC_data(self, data_type):
            # Information coefficient (daily rank IC)
            def cal_ic(df):
                return df['daily_ret'].corr(df['factor'], method='spearman')

            if data_type == 'all':
                groupIC_data = self.group_data[['date','daily_ret','factor']].groupby('date', group_keys=False).apply(lambda x:cal_ic(x)).reset_index()
                groupIC_data.rename(columns={0:'g_ic'}, inplace=True)
                groupIC_data['g_ic'] = groupIC_data['g_ic'].shift(1) # lag the IC by one day (first row NaN) while keeping the dates in place
                groupIC_data['ic_cumsum'] = groupIC_data['g_ic'].cumsum()
                groupIC_data['ic_roll_ma'] = groupIC_data['g_ic'].rolling(22).mean()
                return groupIC_data.round(4).dropna() 

            elif data_type == 'long':
                data = self.group_data[self.group_data['group'] == str(self.params['group_num']-1)][['date','daily_ret','factor']]  
                groupIC_data = data.groupby('date', group_keys=False).apply(lambda x:cal_ic(x)).reset_index()
            elif data_type == 'short':
                data = self.group_data[self.group_data['group'] == '0'][['date','daily_ret','factor']]  
                groupIC_data = data.groupby('date', group_keys=False).apply(lambda x:cal_ic(x)).reset_index()
            elif data_type == 'long_short':
                data = self.group_data[self.group_data['group'].isin(['0',str(self.params['group_num']-1)])][['date','daily_ret','factor']]  
                groupIC_data = data.groupby('date', group_keys=False).apply(lambda x:cal_ic(x)).reset_index()
        
            IC_data = groupIC_data.rename(columns={0:'g_ic'}).dropna()
            
            ic_mean = np.nanmean(IC_data['g_ic'])
            ir = np.nanmean(IC_data['g_ic']) / np.nanstd(IC_data['g_ic'])
            ic_3 = IC_data['g_ic'].tail(3).mean()
            ic_10 = IC_data['g_ic'].tail(10).mean()
            ic_21 = IC_data['g_ic'].tail(21).mean()
            ic_63 = IC_data['g_ic'].tail(63).mean()
            ic_126 = IC_data['g_ic'].tail(126).mean()
            ic_252 = IC_data['g_ic'].tail(252).mean()

            return {
                    'ic':ic_mean,
                    'ir':ir,
                    'ic_3':ic_3,
                    'ic_10':ic_10,
                    'ic_21':ic_21,
                    'ic_63':ic_63,
                    'ic_126':ic_126,
                    'ic_252':ic_252
                    }
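
    # Rank-IC sketch (hypothetical one-day cross-section): the daily IC is the Spearman correlation
    # between factor values and forward returns across stocks, e.g.
    #
    #   day = pd.DataFrame({'factor':    [0.1, 0.5, 0.3, 0.9],
    #                       'daily_ret': [0.001, 0.004, 0.002, 0.010]})
    #   day['daily_ret'].corr(day['factor'], method='spearman')   # -> 1.0, the ranks agree perfectly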

    def get_Turnover_data(self, data_type):

        def cal_turnover(df):
            # Count how many of today's holdings were also held on the previous day in the sample
            def count_repeat(s):
                if s.name > 0:
                    return len(set(s['instrument']).intersection(set(df.loc[s.name - 1, 'instrument'])))
                else:
                    return 0

            s = df.groupby('date').apply(lambda x:x.instrument.tolist())
            df = s.to_frame('instrument').reset_index()
            # Number of instruments held each day
            df['instrument_count'] = df['instrument'].apply(len)

            df['repeat_count'] = df.apply(count_repeat, axis=1)
            df['turnover'] = 1 - df['repeat_count'] / df['instrument_count']
            return np.nanmean(df['turnover'])

        if data_type == 'long':
            df = self.group_data[self.group_data['group'] == str(self.params['group_num']-1)]
            return {'turnover':cal_turnover(df)}
        elif data_type == 'short':
            df = self.group_data[self.group_data['group'] == '0']
            return {'turnover':cal_turnover(df)}
        elif data_type == 'long_short':
            long_df = self.group_data[self.group_data['group'] == str(self.params['group_num']-1)]
            short_df = self.group_data[self.group_data['group'] == '0']
            return {'turnover':cal_turnover(long_df) + cal_turnover(short_df)}
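
    # Turnover sketch (hypothetical numbers): if the long leg holds 100 names and 20 of them are replaced
    # on a given day, repeat_count = 80 and that day's turnover = 1 - 80/100 = 0.2; the reported figure is
    # the mean over all days, and the long_short figure is simply the sum of the two legs' means.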

    ## Aggregate performance for the long, short and long-short legs
    def get_whole_perf(self):
        summary_df = pd.DataFrame() 
        for _type in ['long', 'short', 'long_short']:   
            dict_merged = {} 
            dict1 = self.get_IC_data(_type)
            dict2 = self.get_Performance(_type)
            dict3 = self.get_Turnover_data(_type)

            dict_merged.update(dict1)
            dict_merged.update(dict2)
            dict_merged.update(dict3)
            df = pd.DataFrame.from_dict(dict_merged, orient='index', columns=['value']).T
            df['portfolio'] = _type 
            summary_df = pd.concat([summary_df, df])  # DataFrame.append was removed in recent pandas versions
        
        summary_df.index = range(len(summary_df))
        return summary_df.round(4)

    # Yearly performance
    def get_yearly_perf(self):
        # Year-by-year metrics for the long leg
        year_df = self.groupret_pivotdata.reset_index('date')
        year_df['year'] = year_df['date'].apply(lambda x:x.year)
        
        def cal_Performance(data):
            series = data[str(self.params['group_num']-1)] # long leg only
            bm_series = data['bm']

            return_ratio =  series.sum() # total return
            annual_return_ratio = series.sum() * 242 / len(series)  #  annualized return
            ex_return_ratio =  (series-bm_series).sum() # total excess return
            ex_annual_return_ratio = (series-bm_series).sum() * 242 / len(series-bm_series)  #  annualized excess return

            sharp_ratio = empyrical.sharpe_ratio(series,0.035/242)
            return_volatility = empyrical.annual_volatility(series)
            max_drawdown  = empyrical.max_drawdown(series)
            information_ratio=series.mean()/series.std()
            win_percent = len(series[series>0]) / len(series)
            trading_days = len(series)
            perf =  pd.DataFrame({
                    'return_ratio': [return_ratio],
                    'annual_return_ratio': [annual_return_ratio],
                    'ex_return_ratio': [ex_return_ratio],
                    'ex_annual_return_ratio': [ex_annual_return_ratio],
                    'sharp_ratio': [sharp_ratio],
                    'return_volatility': [return_volatility],
                    'max_drawdown': [max_drawdown],
                    'win_percent':[win_percent],
                    'trading_days':[int(trading_days)],
                    })
            return perf
        yearly_perf = year_df.groupby(['year'], group_keys=True).apply(cal_Performance) 
        yearly_perf = yearly_perf.droplevel(1).round(4)   # 去掉一个level

        # Yearly IC (long leg only)
        data = self.group_data[self.group_data['group'] == str(self.params['group_num']-1)][['date','daily_ret','factor']]
        def cal_ic(df):
            return df['daily_ret'].corr(df['factor'])
        groupIC_data = data.groupby('date', group_keys=False).apply(lambda x:cal_ic(x)).reset_index()       
        IC_data = groupIC_data.rename(columns={0:'g_ic'}).dropna()
        IC_data['year'] = IC_data['date'].apply(lambda x:x.year)
        yearly_IC = IC_data.groupby('year').apply(lambda x:np.nanmean(x['g_ic']))

        yearly_perf['ic'] = yearly_IC.round(4)
        yearly_perf = yearly_perf.reset_index()
        yearly_perf['year'] = yearly_perf['year'].apply(str) 
        return yearly_perf

    def render(self):
        """图表展示因子分析结果"""
        from bigcharts import opts 

        fields = ['portfolio','ic', 'ir', 'turnover', 'return_ratio', 'annual_return_ratio','ex_return_ratio', 'ex_annual_return_ratio', 'sharp_ratio', 'return_volatility', 'information_ratio', 'max_drawdown', 'win_percent', 'ic_252', 'ret_252']
        whole_perf = self.whole_perf[fields] 
        c1 = bigcharts.Chart(
            data=whole_perf,
            type_="table",
            chart_options=dict(
                title_opts=opts.ComponentTitleOpts(title="整体绩效指标")
            ),
            y=list(whole_perf.columns))

        fields = ['year','ic', 'return_ratio', 'annual_return_ratio', 'ex_return_ratio', 'ex_annual_return_ratio', 'sharp_ratio', 'return_volatility',
        'max_drawdown', 'win_percent', 'trading_days']
        yearly_perf = self.yearly_perf[fields]
        c2 = bigcharts.Chart(
            data=yearly_perf,
            type_="table",
            chart_options=dict(
                title_opts=opts.ComponentTitleOpts(title="年度绩效指标(多头组合)")
            ),
            y=list(yearly_perf.columns))

        # 绘制累积收益图
        c3 = bigcharts.Chart(
            data=self.group_cumret,
            type_="line",
            x=self.group_cumret.index,
            y=self.group_cumret.columns)

        _IC = np.nanmean(self.ic['g_ic'])
        _IR = np.nanmean(self.ic['g_ic']) / np.nanstd(self.ic['g_ic'])
        abs_IC = self.ic['g_ic'].abs()
        significant_ic_ratio = abs_IC[abs_IC>=0.02].shape[0] / abs_IC.shape[0]
        c4 = bigcharts.Chart(
                    data=pd.DataFrame({'IC':[np.round(_IC,4)], '|IC|>0.02':[np.round(significant_ic_ratio,4)], 'IR':[np.round(_IR,4)]}),
                    type_="table",
                    chart_options=dict(
                        title_opts=opts.ComponentTitleOpts(title="IC分析指标")
                    ),
                    y=['IC','|IC|>0.02','IR'],
                )

        # 绘制每期IC时序图
        c5 = bigcharts.Chart(
            data=self.ic,
            type_="bar",
            x='date',
            y=['g_ic', 'ic_roll_ma'],
            chart_options=dict(
                title_opts=opts.TitleOpts(
                    title="IC曲线",
                    subtitle="每日IC、累计IC、近22日IC均值",
                    pos_left="center",
                    pos_top="top",
                ),
                legend_opts=opts.LegendOpts(
                    is_show=False,  # 不显示图例
                ),
                extend_yaxis=[opts.AxisOpts()]
                )
        )

        # 绘制IC累计曲线图 
        c6 = bigcharts.Chart(
            data=self.ic,
            type_="line",
            x='date',
            y=['ic_cumsum'],
            chart_options=dict(
                title_opts=opts.TitleOpts(
                    title="IC累积曲线",
                    pos_left="center",
                    pos_top="top",
                ),
                legend_opts=opts.LegendOpts(
                    is_show=False,  # 不显示图例
                )
                ),
            series_options={"ic_cumsum": {"yaxis_index": 1}}
        )
        c5_6 = bigcharts.Chart(data = [c5, c6], type_ = "overlap",)


        top_factor_df = self.factor_data[self.factor_data['date'] == self.end_date].round(4) # factor values on the last day
        top_factor_df['date'] = top_factor_df['date'].apply(lambda x:x.strftime('%Y-%m-%d'))
        # Sort ascending to get the top_n_ins smallest factor values
        df_sorted_min = top_factor_df.sort_values('factor').head(self.top_n_ins)
        # Sort descending to get the top_n_ins largest factor values
        df_sorted_max = top_factor_df.sort_values('factor', ascending=False).head(self.top_n_ins)

        c7 = bigcharts.Chart(
                    data=df_sorted_max,
                    type_="table",
                    chart_options=dict(
                        title_opts=opts.ComponentTitleOpts(title="因子值最大的%s只标的"%self.top_n_ins)
                    ),
                    y=['date','instrument','factor'],
                )
        c8 = bigcharts.Chart(
                    data=df_sorted_min[['date','instrument','factor']],
                    type_="table",
                    chart_options=dict(
                        title_opts=opts.ComponentTitleOpts(title="因子值最小的%s只标的"%self.top_n_ins)
                    ),
                    y=['date','instrument','factor'],
                )

        c_set = bigcharts.Chart([c1, c2, c3, c4, c5_6, c7, c8], type_="page").render(display=False)
        from IPython.display import display
        display(c_set)
        t11 = time.time() 
        print('耗时:{0}秒 可视化输出'.format(np.round(t11-self.t10)))
        print('=========因子分析结束,总耗时:{0}==========='.format(np.round(t11-self.t0)))
        return c_set.data

Instantiate the factor analyzer

In [8]:
alpha_instance = AlphaMiner(params=params, factor_data=factor_data)
report_html = alpha_instance.render()
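
Besides the rendered report, the intermediate results stay on the instance and can be inspected directly (a quick sketch, assuming the cell above completed):

print(alpha_instance.whole_perf)                            # aggregate long / short / long-short metrics
print(alpha_instance.yearly_perf)                           # per-year metrics for the long leg
print(alpha_instance.ic.tail())                             # daily rank IC, cumulative IC, 22-day rolling mean
print(alpha_instance.group_cumret.iloc[-1].sort_values())   # final cumulative return of each group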

Write the factor data to a datasource (optional)

Save the factor data; this step is optional

In [6]:
# df = alpha_instance.factor_data
# df[dai.DEFAULT_PARTITION_FIELD] = df["date"].apply(lambda x: f"{x.year}")


# ## write_bdb supports both creating and updating a datasource
# dai.DataSource.write_bdb(
#     data=df,
#     # the datasource id is globally unique: lowercase letters, digits and underscores, starting with a letter
#     id="datasource_id_001",
#     # rows that duplicate an existing (date, instrument) pair are de-duplicated on insert;
#     # partitioned data additionally requires the indexes argument
#     unique_together=["date", "instrument"],
#     indexes=["date"])
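
A quick read-back check after writing (a sketch; it assumes the new datasource id becomes queryable through dai.query like the built-in tables used above, with datasource_id_001 being the placeholder id from the cell above):

# saved = dai.query("SELECT date, instrument, factor FROM datasource_id_001 ORDER BY date, instrument").df()
# print(saved.shape, saved['date'].min(), saved['date'].max())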

Factor submission

In [7]:
# perf_df = alpha_instance.whole_perf  

# perf_dict = {}
# perf_dict['factor_direction'] = params['factor_direction']
# perf_dict['IC'] =  np.nanmean(alpha_instance.ic['g_ic'])

# for i in perf_df.index:
#     df = perf_df.iloc[i]
#     flag = df['portfolio'] 

#     for c in perf_df.columns:
#         perf_dict['%s_%s'%(flag, c)] = df[c]

# perf_dict.update(alpha_instance.params)

# for key, value in perf_dict.items():
#     if isinstance(value, (int, float)):
#         if np.isinf(value) or np.isnan(value):
#             perf_dict[key] = None
In [8]:
# from bigalpha import factors

# # Submit to the shared factor library
# factors.submit_factor(
# id = 'alpha_10000000',  # factor analysis ID
# performance_index = perf_dict, # factor performance metrics
# performance_report = report_html, # factor detail report page
# metadata = {}, # can be left empty for now
# docs = {"因子类型":"量价行情因子"},   # this docs field is not used yet
# name = "120日复权均价", # display name
# desc = "因子描述:120日移动平均价格, 因子类型:量价行情因子, 计算方式:最近120个交易日的复权价格的简单平均值, 因子方向:%s"%params['factor_direction'], 
# factor_sql = sql)
# print('因子入库完成!')
In [12]:
def backtest(start_date='2018-04-25', end_date='2023-10-27', sql=None, alpha_direction=None):
    import pandas as pd
    import numpy as np
    import warnings
    import dai
    import scipy.sparse as sp
    import bigcharts 
    from biglearning.api import M
    warnings.filterwarnings('ignore')
    from biglearning.api import tools as T
    from zipline.finance.commission import PerOrder
    import math
    from datetime import datetime, timedelta
    all_factors_df = dai.query(sql).df()
    print(sql)

 
    # #################################################### Load industry exposure data ########################################################
    sql = "select * from ccx_index_cne5_exposure where date >= '{0}' and date <= '{1}'  order by date, instrument".format(start_date, end_date)
    import dai 
    exposure_df = dai.query(sql).df()
 
    industry_list = ['AERODEF', 'AGRIFOREST', 'AUTO', 'BANK', 'BUILDDECO', 'CHEM',
        'COMMETRADE', 'COMPUTER', 'CONGLOMERATES', 'CONMAT', 'ELECEQP',
        'ELECTRONICS', 'FOODBEVER', 'HEALTH', 'HOUSEAPP', 'IRONSTEEL',
        'LEISERVICE', 'LIGHTINDUS', 'MACHIEQUIP', 'MEDIA', 'MINING',
        'NONBANKFINAN', 'NONFERMETAL', 'REALESTATE', 'TELECOM', 'TEXTILE',
        'TRANSPORTATION', 'UTILITIES']
 
    df_industry = exposure_df[industry_list+['date', 'instrument']]
    print('读取行业数据完成!')
    df = all_factors_df.merge(df_industry, on=['date', 'instrument'])
    field_columns = ['factor']
 
 
    # #################################################### Load market-cap data ########################################################
    sql = """
    SELECT date, instrument, total_market_cap
    FROM cn_stock_valuation 
    WHERE date >= '{0}' and date <= '{1}' 
    ORDER BY instrument, date;
    """.format(start_date, end_date)
 
    df_market_cap = dai.query(sql).df()
    df_market_cap['total_market_cap'] = np.log(df_market_cap.total_market_cap)
 
    merge_df = df.merge(df_market_cap, on=['date', 'instrument'])
    print('市值数据读取完成!')
 
 
    # #################################################### Winsorization and z-score standardization ########################################################
    def zscore(df, train_col):
        from sklearn.preprocessing import StandardScaler
        scaler = StandardScaler()
        df[train_col] = scaler.fit_transform(df[train_col])
        return df
 
    def remove_extreme_and_cut_zscore(df, col):
        for fac in col:
            n = 3
            mean = df[fac].mean()  # 计算因子值的均值
            std = df[fac].std() # 计算因子值的标准差
            lower_bound = mean - n * std  # 计算下边界
            upper_bound = mean + n * std  # 计算上边界
            df.loc[df[fac]<lower_bound, fac] = lower_bound
            df.loc[df[fac]>upper_bound, fac] = upper_bound
             
        df = zscore(df, col)
        return df 
 
    merge_df = merge_df.replace([np.inf, -np.inf], np.nan) # 将inf填充为nan 
 
 
    # df1 = merge_df.groupby('date').apply(remove_extreme_and_cut_zscore,col=field_columns)
    df1 = merge_df.fillna(0) # 用0进行填充,此处是否考虑用min-max归一化 
    print('前期极值、标准化处理完成')
 
    # #################################################### Industry / market-cap neutralization ########################################################
    def neutralize(df, col):
        import statsmodels.api as sm
        for fac in col:
            train = pd.concat([df[industry_list], df['total_market_cap']], axis=1)
            X = sm.add_constant(train)
            y = df[fac]
 
            model = sm.OLS(y, X).fit()
            df[fac] = model.resid
        return df
 
    df2 = df1.groupby('date').apply(neutralize, col=field_columns)
    print('中性化处理完成')
 
 
    # #################################################### Factor combination ########################################################
    factor_weight_map = {
    'factor':alpha_direction*1, 
    }
 
    field_columns = list(factor_weight_map.keys())
 
    df3 = df2.groupby('date').apply(remove_extreme_and_cut_zscore, col=field_columns)
 
    for j in factor_weight_map.keys():
        df3[j] = df3[j]*factor_weight_map[j]
 
    df3['score'] = df3[field_columns].mean(axis=1) # 简单平均合成,简单的线性合成
    df4 = df3.groupby('date').apply(remove_extreme_and_cut_zscore, col=['score'])
    print('因子加权合成完成') 
 
 
    pred = df4[['date','instrument', 'score']] # 因子alpha数据
    pred_df = pred[pred['date'] >= start_date]
 
    sql = "select date, instrument, st_status, list_days, close, adjust_factor from cn_stock_factors_base where date >= '%s' order by date, instrument"%start_date
    filter_df =  dai.query(sql).df()
    filter_df = filter_df[(filter_df['st_status'] == 0) & (filter_df['close'] / filter_df['adjust_factor'] > 1.2) &(filter_df['list_days']>=365)]
 
    # 构建一个组合优化函数,传入日期 ,当天的基准,当天的组合,当天的预测 、股票池参数就能拿到 组合优化的结果 
    pred_df['instrument'] = pred_df['instrument'].apply(lambda x: x[:9])
    pred_df = pd.merge(pred_df[['date','instrument','score']], filter_df[['date', 'instrument']], how='right', on=['date', 'instrument'])
    pred_df = pred_df.dropna() 
 
    # # 过滤北交所和科创板股票
    pred_df = pred_df[pred_df['instrument'].apply(lambda x: not x.endswith('.BJ') )]
    pred_df = pred_df[pred_df['instrument'].apply(lambda x:  not x.startswith('688') )]
    # print('预测数据:', pred_df)
 
    all_weight_dataframe = {}
    # Backtest engine: initialization, runs once
    def m19_initialize_bigquant_run(context):
        import math
        from zipline.finance.commission import PerOrder
 
        # 加载预测数据
        context.pred_df = pred_df
 
        # 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
        context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
        context.rebalance_day = 5
        context.cnt = 0
 
    # Backtest engine: daily handler, runs once per trading day
    def m19_handle_data_bigquant_run(context, data):
        context.cnt += 1
        if context.cnt % context.rebalance_day != 0:
            return
        dt = data.current_dt.strftime('%Y-%m-%d')
        # 按日期过滤得到今日的预测数据
        ranker_prediction =  context.pred_df[context.pred_df.date == dt]
 
        # Take the 100 highest-scoring stocks
        ranker_prediction.sort_values('score', ascending=False, inplace=True)
        context.weight_df = ranker_prediction.head(100) # portfolio of 100 names
        context.weight_df['weight'] = 1/100
 
        all_weight_dataframe[dt] = context.weight_df
        weight_df = context.weight_df.sort_values('weight', ascending=False)
        weight_df = weight_df[weight_df['weight'] > 0]
        weight_df['instrument'] =  weight_df['instrument'].apply(lambda x:x+'A')  # 下单 还是在SZA SHA上做的 
 
        # 根据日期获取调仓需要买入的股票的列表
        stock_to_buy = list(weight_df.instrument)
 
        # 通过positions对象,使用列表生成式的方法获取目前持仓的股票列表
        stock_hold_now = [equity.symbol for equity in context.portfolio.positions]
        # 继续持有的股票:调仓时,如果买入的股票已经存在于目前的持仓里,那么应继续持有
        no_need_to_sell = [i for i in stock_hold_now if i in stock_to_buy]
        # 需要卖出的股票
        stock_to_sell = [i for i in stock_hold_now if i not in no_need_to_sell]
        # 卖出
        for stock in stock_to_sell:
            if data.can_trade(context.symbol(stock)):
                context.order_target_percent(context.symbol(stock), 0)
         
        # 如果当天没有买入的股票,就返回
        if len(stock_to_buy) == 0:
            return
        # 买入
        for stock in stock_to_buy:
            weight = weight_df[weight_df['instrument']==stock]['weight'].values[0]
             
            try:
                if data.can_trade(context.symbol(stock)):
                    context.order_target_percent(context.symbol(stock), weight)
            except Exception as e:
                print('>>>>>>>>:', e, stock)
             
        print(dt, np.round(context.portfolio.portfolio_value, 2),  len(stock_to_buy) , weight_df['weight'].sum(), '卖出数量:', len(stock_to_sell))
 
    # Backtest engine: data preparation, runs once
    def m19_prepare_bigquant_run(context):
        pass
 
    m = M.trade.v4(
        instruments=[i+'A' for i in pred_df.instrument.unique()],
        start_date = start_date, 
        end_date = end_date, 
        initialize=m19_initialize_bigquant_run,
        handle_data=m19_handle_data_bigquant_run,
        prepare=m19_prepare_bigquant_run,
        volume_limit=0.02,
        order_price_field_buy='open',
        order_price_field_sell='open',
        capital_base=10000000.9,
        data_frequency='daily',
        price_type='真实价格',
        product_type='股票',
        plot_charts=True,
        backtest_only=False,
        benchmark='000852.HIX',
        m_deps=np.random.rand()) 
 
    #### 统计绩效数据 
    raw_perf = m.raw_perf.read() 
    return raw_perf,all_factors_df
In [13]:
import dai

sql = """
SELECT date, instrument, m_avg(turn, 20)  as factor FROM cn_stock_bar1d where date >= '2018-04-25 order by date, instrument'
"""

raw_perf, all_factors_df = backtest(start_date='2018-04-25', end_date='2023-11-21', sql=sql, alpha_direction=-1)
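
The call returns the raw daily performance frame together with the factor data, which can be summarized offline. A minimal sketch, assuming raw_perf follows the usual zipline layout with 'returns' and 'portfolio_value' columns:

print('total return:', raw_perf['portfolio_value'].iloc[-1] / raw_perf['portfolio_value'].iloc[0] - 1)
print('annualized sharpe:', empyrical.sharpe_ratio(raw_perf['returns']))
print('max drawdown:', empyrical.max_drawdown(raw_perf['returns']))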
In [ ]: