定义函数

In [1]:
import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import as_strided
import bottleneck as bn


class Functions:
    """ 设定在因子挖掘中会用到的所有函数
    要添加相关函数实现可继续在此类中以静态方法的方式添加具体实现('_'下划线开头的方法会被忽略,不会加入到遗传算法原语集中)
    注意一定要加上对应的输入输出类型注解,否则 deap 包无法正常 compile 进行因子值的计算
    """
    @staticmethod
    def _rolling_window(data, window_size):
        shape = (data.shape[0] - window_size + 1, window_size) + data.shape[1:]
        strides = (data.strides[0],) + data.strides
        return as_strided(data, shape=shape, strides=strides)

    @staticmethod
    def prod(x : np.ndarray, n : int) -> np.ndarray:
        res = np.full(x.shape, np.nan)
        rolling_data = Functions._rolling_window(x, window_size=n)
        rolling_res = np.prod(rolling_data, axis=1)
        res[n - 1:] = rolling_res
        return res

    @staticmethod
    def rank(x: np.ndarray) -> np.ndarray:
        res = bn.nanrankdata(x, axis=1)
        return res

    @staticmethod
    def max(x: np.ndarray, y: np.ndarray) -> np.ndarray:
        res = np.full(x.shape, np.nan)
        bool_ = x >= y
        res[bool_] = x[bool_]
        res[~bool_] = y[~bool_]
        return res

    @staticmethod
    def min(x: np.ndarray, y: np.ndarray) -> np.ndarray:
        res = np.full(x.shape, np.nan)
        bool_ = x >= y
        res[~bool_] = x[~bool_]
        res[bool_] = y[bool_]
        return res

    @staticmethod
    def delay(x: np.ndarray, n: int) -> np.ndarray:
        res = np.full(x.shape, np.nan)
        res[n:] = x[:-n]
        return res

    @staticmethod
    def ts_std(x: np.ndarray, n: int) -> np.ndarray:
        res = bn.move_std(x, n, min_count=max(1, n // 2), axis=0)
        return res

    @staticmethod
    def corr(x: np.ndarray,y: np.ndarray, n: int) -> np.ndarray:
        min_count = max(1, n // 2)

        c = x * y
        d_count = np.ones(c.shape)
        d_count[np.isnan(c)] = np.nan
        d_count = bn.move_sum(d_count, window=n, min_count=1, axis=0)

        ab_sum = bn.move_sum((c), window=n, min_count=min_count, axis=0)
        a_sum = bn.move_sum((x), window=n, min_count=min_count, axis=0)
        b_sum = bn.move_sum((y), window=n, min_count=min_count, axis=0)
        aa_sum = bn.move_sum((x * x), window=n, min_count=min_count, axis=0)
        bb_sum = bn.move_sum((y * y), window=n, min_count=min_count, axis=0)

        res = (ab_sum * d_count - a_sum * b_sum) / (np.sqrt(d_count * aa_sum - a_sum ** 2) * np.sqrt(d_count * bb_sum - b_sum ** 2))
        return res

    @staticmethod
    def delta(x: np.ndarray, n: int) -> np.ndarray:
        res = x - Functions.delay(x, n)
        return res

    @staticmethod
    def add(x: np.ndarray, y: np.ndarray) -> np.ndarray:
        res = x + y
        return res

    @staticmethod
    def sub(x: np.ndarray, y: np.ndarray) -> np.ndarray:
        res = x - y
        return res

    @staticmethod
    def mul(x: np.ndarray, y: np.ndarray) -> np.ndarray:
        res = x * y
        return res

    @staticmethod
    def div(x: np.ndarray, y: np.ndarray) -> np.ndarray:
        res = x / y
        res[np.isinf(res)] = np.nan
        return res

    @staticmethod
    def log(x: np.ndarray) -> np.ndarray:
        res = np.log(x + min(0,np.nanmin(x)) + 1e-20)
        return res

    @staticmethod
    def abs(x: np.ndarray) -> np.ndarray:
        res = np.abs(x)
        return res

    @staticmethod
    def neg(x: np.ndarray) -> np.ndarray:
        res = x * -1
        return res

    @staticmethod
    def sign(x: np.ndarray) -> np.ndarray:
        res = np.sign(x)
        return res
    @staticmethod
    # 反正切函数
    def arctan(x : np.ndarray) -> np.ndarray:
        res = np.arctan(x)
        return res
    
    @staticmethod
    # 工具函数,把数据转化为3d数据
    def rolling_to_3d(mat, window, chunk_num=1):
        s0, s1 = mat.strides
        r, c = mat.shape
        max_chunk_num = int(np.floor(r/window))
        chunk_num = min(max_chunk_num, chunk_num)

        def rolling(m):
            shape0 = m.shape[0]-window+1
            if shape0 <= 0:
                shape0 = m.shape[0]
            return as_strided(m, shape=(shape0, window, c), strides=(s0, s0, s1), writeable=False)

        if chunk_num == 1:
            yield rolling(mat)
        else:
            chunk_size = r // chunk_num
            first_chunk = mat[:chunk_size]
            yield rolling(first_chunk)

            chunks = as_strided(
                            mat[chunk_size-window+1:], 
                            shape=(chunk_num-1, chunk_size+window-1, c), 
                            strides=(s0*chunk_size, s0, s1), 
                            writeable=False
                            )
            for sub_mat in chunks:
                yield rolling(sub_mat)

            left_rows = r % chunk_num
            if left_rows > 0:
                sub_mat = mat[-(left_rows+window-1):]
                yield rolling(sub_mat)
    

    @staticmethod
    def decay_linear(x: np.ndarray, n: int) -> np.ndarray:
        res = bn.move_mean(x, window=n, min_count=max(1, n // 2), axis=0)
        return res

    @staticmethod
    def cov(x: np.ndarray, y: np.ndarray, n: int) -> np.ndarray:
        res = np.full(x.shape, np.nan)
        min_count = max(1, n // 2)
        a, b = x, y
        c = a * b
        d_count = np.ones(c.shape)
        d_count[np.isnan(c)] = np.nan
        d_count = bn.move_sum(d_count, window=n, min_count=1, axis=0)
        ab_sum = bn.move_sum((c), window=n, min_count=min_count, axis=0)
        a_sum = bn.move_sum(a, window=n, min_count=min_count, axis=0)
        b_sum = bn.move_sum(b, window=n, min_count=min_count, axis=0)
        res = (ab_sum * d_count - a_sum * b_sum) / ((d_count-1) * d_count)
        return res

    @staticmethod
    def ts_sum(x: np.ndarray, n: int) -> np.ndarray:
        res = bn.move_sum(x, window=n, min_count=max(1, n // 2), axis=0)
        return res

    @staticmethod
    def ts_mean(x: np.ndarray, n: int) -> np.ndarray:
        res = bn.move_mean(x, window=n, min_count=max(1, n // 2), axis=0)
        return res

    @staticmethod
    def ts_rank(x: np.ndarray, n: int) -> np.ndarray:
        res = bn.move_rank(x, window=n, min_count=max(1, n // 2), axis=0)
        return res

    @staticmethod
    def ts_min(x: np.ndarray, n: int) -> np.ndarray:
        res = bn.move_min(x, window=n, min_count=max(1, n // 2), axis=0)
        return res

    @staticmethod
    def ts_max(x: np.ndarray, n: int) -> np.ndarray:
        res = bn.move_max(x, window=n, min_count=max(1, n // 2), axis=0)
        return res

    @staticmethod
    def mean2(x: np.ndarray, y: np.ndarray) -> np.ndarray:
        res = (x + y) / 2
        return res

    @staticmethod
    def mean3(x: np.ndarray, y: np.ndarray, z: np.ndarray) -> np.ndarray:
        res = (x + y + z) / 3
        return res

    @staticmethod
    def argmax(x: np.ndarray, n: int) -> np.ndarray:
        res = bn.move_argmax(x, window=n, min_count=max(1, n // 2), axis=0)
        return res

    @staticmethod
    def argmin(x: np.ndarray, n: int) -> np.ndarray:
        res = bn.move_argmin(x, window=n, min_count=max(1, n // 2), axis=0)
        return res   

    @staticmethod
    def power(x: np.ndarray, y: np.ndarray) -> np.ndarray:
        y_a_min, y_a_max = bn.nanmin(y, axis=1), bn.nanmax(y, axis=1)
        z = ((y.T - y_a_min) / (y_a_max - y_a_min)).T
        x_a_min, x_a_max = bn.nanmin(x, axis=1), bn.nanmax(x, axis=1)
        x = ((x.T - x_a_min) / (x_a_max - x_a_min)).T + 1
        res = x ** z
        return res
 
    @staticmethod
    def constant(type_int : int) -> int:
        return type_int

    @staticmethod
    def standardation(x: np.ndarray) -> np.ndarray:
        """标准化算子"""
        mean = bn.nanmean(x, axis=1).reshape(-1, 1)
        std = bn.nanstd(x, axis=1, ddof=1).reshape(-1, 1)
        with np.errstate(invalid='ignore'):
            res = (x - mean) / std
        return res

    @staticmethod
    def normalization(x: np.ndarray) -> np.ndarray:
        """归一化算子"""
        x_min = bn.nanmin(x, axis=1).reshape(-1, 1)
        x_max = bn.nanmax(x, axis=1).reshape(-1 ,1)
        with np.errstate(invalid='ignore'):
            res = (x - x_min) / (x_max - x_min)
        return res
    
    @staticmethod
    def delay(x: np.ndarray, n: int) -> np.ndarray:
        res = np.full(x.shape, np.nan)
        res[n:] = x[:-n]
        return res
    
    @staticmethod
    def delta(x: np.ndarray, n: int) -> np.ndarray:
        res = x - Functions.delay(x, n)
        return res
    
#     @staticmethod
#     # 过去n天变化率
#     def pctchange_ts(n : int, x : np.ndarray) -> np.ndarray:
#         res = Functions.delta(n, x) / x
#         return res
    
    @staticmethod
    # 过去n天回归系数
    def ts_regbeta(n : int, x : np.ndarray, y : np.ndarray) -> np.ndarray:
        c = x * y
        d_count = np.ones(c.shape)
        d_count[np.isnan(c)] = np.nan
        d_count = bn.move_sum(d_count, window=n, min_count=1, axis=0)
        #bn.move_min(x_m, window=d, min_count=min_count, axis=0)
        ab_sum = bn.move_sum((c), window=n, min_count=None, axis=0)
        a_sum = bn.move_sum((x), window=n, min_count=None, axis=0)
        b_sum = bn.move_sum((y), window=n, min_count=None, axis=0)
        aa_sum = bn.move_sum((x * x), window=n, min_count=None, axis=0)

        beta = (ab_sum * d_count - a_sum * b_sum) / \
                 (d_count * aa_sum - (a_sum) ** 2)
        beta[np.isinf(beta)] = np.nan
        return beta

定义回测代码

In [2]:
import random
import time
import numpy as np
import pandas as pd
import inspect
import multiprocessing
import operator
from deap import base, creator, gp, tools
import empyrical as em
from biglearning.api import tools as T
from bigdatasource.api import DataSource
import bottleneck as bn
from numpy.lib.stride_tricks import as_strided
import warnings
warnings.filterwarnings('ignore') # 日志里忽视warning
# from biglearning.api.func import Functions
from bigtrader.sdk import *
print('start run ')

定义数据类

In [3]:
class DataProcessor:
    """数据加载及处理类""" 
    def __init__(self, config):
        self.bar1d_data_cols = ['close', 'high', 'low', 'open' , 'amount','volume']
        from bigdata.api.datareader import D
        self.ins = D.instruments(start_date= config.start_date, end_date= config.end_date)
    
        # 数据开始日期
        self.start_date = config.start_date
        # 数据结束日期
        self.end_date = config.end_date
        # 整个数据中的训练集比例
        self.train_test_data_retio = config.train_test_data_retio
        self.train_validate_data_ratio = config.train_validate_data_ratio
        self.config = config
        # 键值对存储的基础因子数据
        self.data = {}
        # 数据的 index 和 columns,方便后续进行对齐操作
        self.data_index = None
        self.data_cols = None
        
        # 原始收益率数据(dataframe 格式)
        self.raw_ret = pd.DataFrame()
        self._ret = None
        # 加载数据
        self._load_data()
        # 根据指定的训练集比例进行短区间内的训练集与测试集划分
        self._split_train_and_test(self.raw_ret.shape[0], self.train_test_data_retio, self.train_validate_data_ratio)

    def _split_train_and_test(self, length, train_test_ratio, train_validate_ratio):
        full_list = list(range(length))
        offset_train_test = int(length * train_test_ratio)
        offset_train_validate = int(offset_train_test * train_validate_ratio)
        if length == 0 or offset_train_test < 1:
            return [], full_list
        
        self.train_series = full_list[:offset_train_validate]
        self.val_series = full_list[offset_train_validate:offset_train_test]
        self.test_series = full_list[offset_train_test:]
        self.full_series = full_list

    @property
    def ret_values(self):
        self._ret = self.raw_ret.values.astype(np.float64)
        return self._ret

    @property
    def train_ret(self):
        return self.ret_values[self.train_series]

    @property
    def validate_ret(self):
        return self.ret_values[self.val_series]

    @property
    def test_ret(self):
        return self.ret_values[self.test_series]

    @property
    def not_nan_num(self):
        return pd.DataFrame(self.ret_values).count(axis=1).values
    # 这里面基本
    def _load_data(self):
        print(f'loading data from {self.start_date} to {self.end_date}...')
        # 读取基础因子数据
#         table_name = 'bar5m_CN_FUTURE' # 5分钟k线
        table_name = 'bar1d_CN_STOCK_A'
        bar_data = DataSource(table_name).read(instruments=self.ins, start_date=self.start_date, end_date=self.end_date, fields=self.bar1d_data_cols)
        bar_data['return'] = bar_data.groupby('instrument')['close'].apply(lambda x: x.pct_change().shift(-1).fillna(0))
        bar_data_pivot_table = bar_data.pivot_table(index='date', columns='instrument')
        raw_ret = bar_data_pivot_table['return']
        self.raw_ret = raw_ret.fillna(method="ffill", axis=0)
        
        # 判断是否已有 data_index 和 data_cols,没有则以收益数据的 index 和 columns 为标准,后续数据均以此进行 reindex
        if self.data_index is None or self.data_cols is None:
            self.data_index = self.raw_ret.index
            self.data_cols = self.raw_ret.columns
        
        # 将基础因子数据加入 data 中
        for col in self.bar1d_data_cols:
            self.data[col] = bar_data_pivot_table[col]
            self.data[col] = self.data[col].reindex(index=self.data_index, columns=self.data_cols)
        
    def outlier_limit(self, data, n_extremum=5):
        """对传入数据进行去极值"""
        median = bn.nanmedian(data, axis=1).reshape(-1, 1)
        Dmad = bn.nanmedian(abs(data - median), axis=1).reshape(-1, 1)
        upper = (median + n_extremum * Dmad)
        lower = (median - n_extremum * Dmad)
        with np.errstate(invalid='ignore'):
            res = np.clip(data, lower, upper)
        return res

class Fitnesses(object):
    """适应度函数类"""
    _methods = ["sharpe_ratio"]

    def __init__(self, config):
        # 适应度函数选取
        self.fitness = config.fitness
        self.window = 21 #  过去多少天的时序窗口
        if self.fitness not in self._methods:
            raise Exception("请输入正确的适应度函数类型")
    
    def _nan_drop(self, x, y):
        """删除缺失值的处理方法"""
        merged = np.vstack((x, y)).T
        merged = merged[~np.isnan(merged).any(1)].T
        if merged.size == 0:
            return None
        return merged

    def _nan_fill(self, arr):
        """向前填充的处理方法"""
        arr = arr.T
        mask = np.isnan(arr)
        idx = np.where(~mask, np.arange(mask.shape[1]), 0)
        return arr[np.arange(idx.shape[0])[:, None], idx].T
    
    def _calculate_daily_return(self, factor, ret):
        ## 适应度
        def rolling_window(a, window):
            shape = a.shape[:-1]+(a.shape[-1]-window+1, window)
            strides = a.strides+(a.strides[-1],)
            return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
        
        # print(self.window)
        global tmp 
        tmp = factor 

        roll_max = np.array([np.nanmax(rolling_window(factor[:,k], self.window),axis=1) for k in range(factor.shape[1])]).T
        roll_min = np.array([np.nanmin(rolling_window(factor[:,k], self.window),axis=1) for k in range(factor.shape[1])]).T

        condition = np.array([(factor[self.window:,k]>=roll_max[1:,k]).astype(int)+ (factor[self.window:,k]<=roll_min[1:,k]).astype(int)*(-1) for k in range(factor.shape[1])]).T
        position = np.array([np.insert(condition[:,k],0,[0]*(self.window)) for k in range(factor.shape[1])]).T 
        position = np.array([k/(abs(k).sum()) if abs(k).sum()>0 else k for k in position])
        daily_ret = position * ret - 0.0001*np.array([np.insert(abs(np.diff(position[:,k])),0,[0]) for k in range(position.shape[1])]).T
        daily_ret[daily_ret == np.nan] = 0.0
        counts = (np.array([abs(np.diff(position[:,k],1)).sum() for k in range(position.shape[1])])).sum() / position.shape[1]/2
        return np.nansum(daily_ret, axis=1), counts

    def calculate_longshort_index(self, factor, longshort, eval_type):
        """计算多空收益、多头收益的sharpe、总收益、波动率指标"""
        if eval_type == 'train':
            data = data_dp
            ret_values = data.train_ret
        elif eval_type == "val":
            data = data_dp
            ret_values = data.validate_ret
        elif eval_type == 'test':
            data = data_dp
            ret_values = data.test_ret

        returns, counts =  self._calculate_daily_return(factor, ret_values)   
        nav = np.cumsum(returns) + 1 
        nav_cummax = np.array([np.max(nav[:k]) for k in range(1, len(nav)+1)]) 
#         # 要求交易次数
#         if returns.size == 0 or counts < returns.size/(6*240/5) or counts > 10*returns.size/(6*240/5) or  max(1-nav/nav_cummax) >0.2:
#             return np.nan 
        returns = returns[~np.isnan(returns)]
        if returns.size == 0:
            return np.nan
        return  em.sharpe_ratio(returns, 0.035/252)
    
    def fitness_choose(self, factor, longshort, eval_type):  
        fit = self.calculate_longshort_index(factor=factor, longshort=longshort, eval_type=eval_type)  
        return fit
    
    def evaluate_factor(self, individual, eval_type):
        """计算传入个体的因子值"""
        if eval_type == 'train':  # 训练集
            data = data_dp
            index_series = data.train_series
            ret_values = data.train_ret
        elif eval_type == "val": # 验证集
            data = data_dp
            index_series = data.val_series
            ret_values = data.validate_ret
        elif eval_type == 'test': # 测试集
            data = data_dp
            index_series = data.test_series
            ret_values = data.test_ret
            
        # 将个体转换为表达式函数
        func = toolbox.compile(expr=individual)
        # 获取不同函数的参数
        func_names = list(inspect.signature(func).parameters.keys())
        # 根据获取的函数参数构建对应的键值对
        # 传入数据前记得转换数据类型为 np.float64
        param = {i: data.data[i].values.astype(np.float64) for i in func_names}
        # 将构建好的键值对解包传入 func 中,进行对应表达式的因子值计算
        factor = func(**param)[index_series]
        # 出现 np.inf 大部分是因为 float 类型的问题,numpy 默认转换出来的 float 是 float32,需要自行转换成 float64 避免溢出为 np.inf
        factor[np.isinf(factor)] = np.nan
        if (factor != factor).sum() == (factor.shape[0] * factor.shape[1]):
            return np.nan, None
        if len(np.unique(factor[~np.isnan(factor)])) < 10000:
            factor = factor
        else:
            factor = data.outlier_limit(factor)
        return factor
    
    def evaluate_fitness(self, individual, longshort, eval_type):
        """计算传入个体的因子值并计算 IR 值"""
        factor = self.evaluate_factor(individual=individual, eval_type=eval_type)
        if isinstance(factor, float):
            if factor != factor:
                return factor, individual
        if  type(factor) == tuple:
            return np.nan, individual
        
        factor = self._nan_fill(factor)
        fit = self.fitness_choose(factor, longshort, eval_type=eval_type)
        return fit, individual

    def compare_fitness(self, Threshold, fit):
        """比较阈值和适应度值"""
        if self.fitness.endswith("vol"): # 波动率要反着来 
            if Threshold > fit:
                return True
        elif Threshold < fit:
                return True
        else:
            return False
    
def cross_mutation_handle(population):
    """对传入的种群进行交叉、子树变异、提升变异以及点变异等操作"""
    offspring = [toolbox.clone(ind) for ind in population]
    
    # 交叉
    for i in range(1, len(offspring), 2):
        if random.random() < config.cross_prob:
            offspring[i - 1], offspring[i] = toolbox.mate(offspring[i - 1], offspring[i])
            del offspring[i - 1].fitness.values, offspring[i].fitness.values
    # 变异
    for i in range(len(offspring)):
        if random.random() < config.mutation_prob:
            offspring[i], = toolbox.mutate(offspring[i])
            del offspring[i].fitness.values
    # 提升变异
    for i in range(len(offspring)):
        if random.random() < config.boost_mutation_prob:
            offspring[i], = toolbox.mutate_shrink(offspring[i])
            del offspring[i].fitness.values
    # 点变异
    for i in range(len(offspring)):
        if random.random() < config.point_mutation_prob:
            offspring[i], = toolbox.mutate_NodeReplacement(offspring[i])
            del offspring[i].fitness.values
    return offspring

def drop_duplicates(individuals):
    """移除生成表达式相同的个体"""
    ind_dict = {}
    for ind in individuals:
        expr = str(ind)
        if expr in list(ind_dict.keys()):
            continue
        ind_dict[expr] = ind
    return list(ind_dict.values())

遗传算法配置

In [8]:
class Config:
    # 设定短区间数据读取时间范围
    start_date = '2021-01-01'
    end_date = '2021-10-01'
    return_field = 'close'
    train_test_data_retio = 3/4
    train_validate_data_ratio = 3/4
    # 初始种群数量
    init_ind_num = 200
    # 种群代数
    num_gen = 1
    # 训练集适应度标准
    train_fitness = 1
    # 验证集适应度标准
    val_fitness = 0.8
    # 测试集适应度标准
    test_fitness = 0.55  
    fitness = 'sharpe_ratio'
    # 交叉概率
    cross_prob = 0.8
    # 子树变异概率
    mutation_prob = 0.6
    # 提升变异概率
    boost_mutation_prob =  0.6
    # 点变异概率
    point_mutation_prob = 0.6
    # 常数项
    constant_ranges =  list(range(1, 11)) 
# 初始化相关实例,定义遗传算法结构
# 设置随机数种子
random_seed_num = 44 # should:49
random.seed(random_seed_num)
# config 配置类,遗传算法运行中的相关参数均由该 config 实例控制
config = Config()
data_dp = DataProcessor(config)
# 适应度函数类
fitness = Fitnesses(config)
# 创建个体
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)


# 构建原语集
# 设置遗传算法原语集,将相关函数、常数项及待计算的基础因子注入到 pset 对象中
pset = gp.PrimitiveSetTyped("MAIN", (np.ndarray,) * len(list(data_dp.data.keys())), np.ndarray)

funcs = inspect.getmembers(Functions)

for name, func in funcs:
    if not name.startswith('_'):
        bool_ = True
        sig = inspect.signature(func)
        params=sig.parameters
        pa = []
        for param in list(params.keys()):
            if params[param].annotation == inspect._empty:
                bool_ = False
            pa.append(params[param].annotation)
        if bool_:
            # print(pa, sig.return_annotation, name, func)
            pset.addPrimitive(func, pa, sig.return_annotation)
            
for i in config.constant_ranges:
    pset.addTerminal(i, int, str(i))
args_dict = {f'ARG{index}': key_name for index, key_name in enumerate(list(data_dp.data.keys()))}
pset.renameArguments(**args_dict)
In [9]:
# 构建工具箱
# 设置遗传算法中会用到的相关工具方法,注册到 toolbox 中,方便后续直接调用
toolbox = base.Toolbox()
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=5)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("mate", gp.cxOnePoint) # 交叉
toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset) #变异
toolbox.register("mutate_shrink", gp.mutShrink) # 提升变异
toolbox.register("mutate_NodeReplacement", gp.mutNodeReplacement, pset=pset) # 点变异

toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=10))
toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=10))
toolbox.decorate("mutate_shrink", gp.staticLimit(key=operator.attrgetter("height"), max_value=10))
toolbox.decorate("mutate_NodeReplacement", gp.staticLimit(key=operator.attrgetter("height"), max_value=10))

toolbox.register("evaluate_train", fitness.evaluate_fitness, longshort=True, eval_type='train')
toolbox.register("evaluate_val", fitness.evaluate_fitness, longshort=True, eval_type='val')
toolbox.register("evaluate_test", fitness.evaluate_fitness, longshort=True, eval_type='test')
# 开始使用定义的遗传算法进行因子挖掘
# 保存符合条件的因子表达式
saved_factor_exprs = {}

# 初始化种群
population = toolbox.population(n=config.init_ind_num)

# 声明统计指标
stats = tools.Statistics(key=lambda ind: ind.fitness.values)
stats.register("avg", bn.nanmean)
stats.register("std", bn.nanstd)
stats.register("min", bn.nanmin)
stats.register("max", bn.nanmax)

# 日志记录,与指标记录一起使用,可方便打印指标相关记录
logbook = tools.Logbook()
logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])

for gen in range(1, config.num_gen + 1):
    random.seed(random_seed_num)
    # 个体优选器(能自动按给定的最大长度将适应度最优的个体从大到小排序存放,根据你的目标进行判定)
    pass_hall_of_fame = tools.HallOfFame(config.init_ind_num)
    final_hall_of_fame = tools.HallOfFame(int(config.init_ind_num * 0.1))

    invalid_ind = [ind for ind in population if not ind.fitness.valid]
    # 开始计算之前预先针对相同表达式去重,减少重复计算量
    invalid_ind = drop_duplicates(invalid_ind)
    print('==============================第{0}代开始挖掘因子中:=============================='.format(gen))

    fitnesses_train_lst = []        
    for i in invalid_ind:
        fitnesses_train = toolbox.evaluate_train(i)        
        fitnesses_train_lst.append(fitnesses_train)

    pass1check_population = []
    record_population = []
    
    print('开始for loop ')
    # 根据指定的训练 IR 值筛选符合条件的个体并存入 pass_population 中
    for fit, ind in fitnesses_train_lst:
        ind.fitness.values = (fit,)
        record_population.append(ind)

        #  计算个体在验证集的适应度
        val_fitness = toolbox.evaluate_val(ind)[0]

        print(f"因子{ind}在训练集适应度值为{fit}")
        if fitness.compare_fitness(config.train_fitness, fit):
            if fitness.compare_fitness(config.val_fitness, val_fitness):
                print(f"通过第一层检查的因子 {ind} 在训练集适应度值为{fit} 在验证集适应度值为{val_fitness}")
                pass1check_population.append(ind)

    print(f'共「{len(pass1check_population)}」个表达式:{[str(pop) for pop in pass1check_population]}通过训练集检测')
    print('-- 使用训练数据计算表达式适应度完成 --')

    pass_hall_of_fame.update(pass1check_population)
    
    fitnesses_test_lst = []
    for j in pass1check_population:
        fitnesses_test = toolbox.evaluate_test(j)
        fitnesses_test_lst.append(fitnesses_test)

    pass2check_population = []
    pass2check_population_dict = {}

    # 根据指定的测试 IR 值筛选符合条件的个体并存入 final_population 中
    for fit, ind in fitnesses_test_lst:
        ind.fitness.values = (fit,)
        if fitness.compare_fitness(config.test_fitness, fit):
            print(f"通过第二层检查的因子 {ind} 在测试集适应度值为{fit}")
            pass2check_population.append(ind)
            pass2check_population_dict[str(ind)] = (ind,fit)

    # 记录每一代的双重通过检验的个体
    saved_factor_exprs[gen] = pass2check_population_dict

    print(f'共「{len(pass2check_population)}」个表达式:{[str(pop) for pop in pass2check_population]}通过测试数据检测')
    print('-- 使用测试数据计算表达式适应度完成 --')

    # Update the final_hall_of_fame with the generated individuals
    final_hall_of_fame.update(pass2check_population)

    # 使用通过训练集 IR 值检测的个体替换原有的随机个体
    population = pass2check_population
    # 判断当前种群是否为空,为空则拿到本代父代种群,进行交叉变异
    if not population:
        population = record_population

    print(f"pass:{len(pass2check_population)}, record:{len(record_population)}, population: {len(pass2check_population)}")

    record = stats.compile(population) if stats else {}
    logbook.record(gen=gen, nevals=len(invalid_ind), **record)
    print(f'因子挖掘过程中相关指标:{repr(logbook)}')

    print('-- 开始进行下一代因子挖掘 --')
    offspring = toolbox.select(population, config.init_ind_num)
    offspring = cross_mutation_handle(offspring)
    population[:] = offspring

"""绘制每代的统计指标折线图"""
df = pd.DataFrame(logbook)
df.set_index("gen", inplace=True)
df['avg'].plot(title='每代适应度统计指标') 

print(f'=====================因子挖掘结束=======================')
In [10]:
fitness_record_df  =pd.DataFrame() 
for j in range(1,config.num_gen+1):
    if len(saved_factor_exprs[j]) ==0:
        continue
    fit =  [saved_factor_exprs[j][i][1] for i  in saved_factor_exprs[j].keys()]
    max_f = np.max(fit)
    min_f = np.min(fit)
    mean_f = np.mean(fit)
    num_f = len(fit)
    exprs = saved_factor_exprs[j]  
    exprs_dict = {i:exprs[i][1] for i  in exprs.keys()}
    sorted_dict = dict(sorted(exprs_dict.items(), key=lambda x: x[1], reverse=True))
    top_expr = list(sorted_dict.keys())[0]

    tmp = pd.DataFrame({'代数':[j],'均值':[mean_f], '因子数':[num_f], '最大':[max_f], '最小':[min_f], '最佳因子':top_expr} ) 
    fitness_record_df = fitness_record_df.append(tmp)
    fitness_record_df.index = range(len(fitness_record_df))

print('本次挖掘结果:', fitness_record_df)
In [11]:
fitness_record_df
Out[11]:
代数 均值 因子数 最大 最小 最佳因子
0 1 2.177461 9 6.443255 0.57359 standardation(min(close, rank(argmin(power(low...
In [16]:
def stock_backtest(_ind, eval_type):
    factor_array_test_set = fitness.evaluate_factor(_ind, eval_type)
    factor_df_test_set  = pd.DataFrame(factor_array_test_set , columns=data_dp.data_cols, index=data_dp.data_index[data_dp.test_series])
    
    factor_df_stack = factor_df_test_set.stack().reset_index().rename(columns={0:'factor'}) 
    st = factor_df_stack.date.min().strftime('%Y-%m-%d')
    et =  factor_df_stack.date.max().strftime('%Y-%m-%d')

    from biglearning.api import M
    def m1_initialize_bigquant_run(context):
        msg = "initialize:" 
        context.PRINT = 1
        context.write_log(msg, stdout=context.PRINT)
        context.all_data = context.options["data"].read()

        context.rebalance_period = 1 # 调仓周期设置
        context.mono = True  # 因子单调性: True为做多因子小的做空因子大的; False则相反
        # 设置买入股票数量
        context.target_hold_count = 100
        # 每只股票的目标权重
        context.target_percent_per_instrument = 1.0 / context.target_hold_count


    def m1_before_trading_start_bigquant_run(context, data):
        pass
    

    # 交易引擎:tick数据处理函数,每个tick执行一次
    def m1_handle_tick_bigquant_run(context, tick):
        pass

    def m1_handle_data_bigquant_run(context, data):
        context.today = data.trading_day_dt.strftime('%Y-%m-%d')
        context.today_data = context.all_data[context.all_data.date==context.today]

        if context.trading_day_index % context.rebalance_period == 0:

            r1 = context.today_data
            r1.sort_values(by='factor', ascending=context.mono, inplace=True)
            # context.ins_to_long = r1.instrument[:context.num_trades].tolist()  # 做多因子排序后靠前的品种
            
            # 取前10只
            # 获取当日目标持有股票
            target_hold_instruments = set(r1.instrument[:context.target_hold_count].tolist())
            # 获取当前已持有股票
            current_hold_instruments = set(context.get_account_positions().keys())

            # 卖出不在目标持有列表中的股票
            for instrument in current_hold_instruments - target_hold_instruments:
                context.order_target_percent(instrument, 0)
                
            # 买入目标持有列表中的股票
            for instrument in target_hold_instruments - current_hold_instruments:
                context.order_target_percent(instrument, context.target_percent_per_instrument)
        
    # 交易引擎:盘后处理函数,每日盘后执行一次
    def m1_after_trading_bigquant_run(context, data):
        pass

    m6 = M.instruments.v2(
    start_date=T.live_run_param('trading_date', st),
    end_date=T.live_run_param('trading_date', et),
    market='CN_STOCK_A',
    instrument_list='',
    max_count=0)


    m1 = M.hftrade.v2(
        instruments=m6.data,
        options_data=DataSource.write_df(factor_df_stack),
        start_date='',
        end_date='',
        initialize=m1_initialize_bigquant_run,
        before_trading_start=m1_before_trading_start_bigquant_run,
        handle_data=m1_handle_data_bigquant_run,
        after_trading=m1_after_trading_bigquant_run,
        capital_base=10000000+np.random.int(),
        frequency='daily',
        price_type='真实价格',
        product_type='股票',
        before_start_days='0',
        volume_limit=1,
        order_price_field_buy='open',
        order_price_field_sell='open',
        plot_charts=True,
        disable_cache=True,
        replay_bdb=False,
        show_debug_info=False,
        backtest_only=False,
        m_cached=False
    ) 
    return m1
In [17]:
fine_inds =  fitness_record_df['最佳因子'].tolist() # 全部因子
fine_inds = list(set(fine_inds)) # 去重
tail_ind = fine_inds[-1] # 最后因子

def get_factor_data(expr):
    factor_array_test_set = fitness.evaluate_factor(gp.PrimitiveTree.from_string(tail_ind, pset), eval_type='test')
    factor_df_test_set  = pd.DataFrame(factor_array_test_set , columns=data_dp.data_cols, index=data_dp.data_index[data_dp.test_series])
    factor_df = factor_df_test_set.stack().reset_index().rename(columns={0:'factor'})
    return factor_df 

base_factor_df = get_factor_data(tail_ind) # 最后因子的因子数据

cnt = 0
for expr_ind in fine_inds:
    cnt += 1
    cur_factor_df  = get_factor_data(expr_ind)
    corr =  cur_factor_df['factor'].corr(base_factor_df['factor'])

    if corr < 0.3 or expr_ind == tail_ind: # 相关系数低于0.3
        print('当前回测因子:', expr_ind)
        _ind  = gp.PrimitiveTree.from_string(expr_ind, pset)
        from biglearning.api import M
        stock_backtest(_ind , 'test')
  • 收益率13.11%
  • 年化收益率91.23%
  • 基准收益率2.42%
  • 阿尔法0.86
  • 贝塔0.35
  • 夏普比率4.04
  • 胜率0.89
  • 盈亏比45.72
  • 收益波动率16.31%
  • 信息比率0.18
  • 最大回撤4.75%
日期 时间 证券代码 证券名称 买/卖 数量 成交价 成交金额 平仓盈亏 交易费用
Loading... (need help?)
日期 证券代码 证券名称 数量 持仓均价 收盘价 持仓市值 收益
Loading... (need help?)
时间 级别 内容
Loading... (need help?)
In [ ]: