In [ ]:
from sklearn.linear_model import LinearRegression
import pandas as pd
import numpy as np
from biglearning.module2.common.data import Outputs
from zipline.finance.commission import PerOrder
import datetime
from datetime import date,timedelta
from bigdatasource.api import DataSource
from biglearning.api import M
from biglearning.api import tools as T
from biglearning.module2.common.data import Outputs

import warnings
warnings.filterwarnings('ignore')
import xgboost as xgb

定义因子提取起始和结束日期

In [ ]:
# Inclusive start / end dates of the factor-extraction window ('YYYY-MM-DD').
sd, ed = '2016-01-01', '2023-07-31'

数据准备

使用虚拟表构建训练数据

In [ ]:
import dai

# Register the view "data_001": one row per (instrument, date) carrying the
# selected alpha columns, obtained by joining the eight hf_fzzq_* tables on
# (instrument, date).
# The date bounds are taken from `sd` / `ed` (defined in the configuration
# cell) rather than being repeated as literals, so changing the configured
# window cannot leave the view silently restricted to the old range.
dai.DataSource.save_view("data_001",
f'''
SELECT
    t1.instrument, t1.date, t1.alpha91001,
    t2.alpha91007,
    t3.alpha91012,
    t4.alpha91019,t4.alpha91018,
    t5.alpha91022,
    t6.alpha91026,
    t7.alpha91030,
    t8.alpha91036,t8.alpha91034,t8.alpha91037


FROM
    hf_fzzq_20230328 t1
JOIN
    hf_fzzq_20230421 t2 ON t1.instrument = t2.instrument AND t1.date = t2.date
JOIN
    hf_fzzq_20220805 t3 ON t1.instrument = t3.instrument AND t1.date = t3.date
JOIN
    hf_fzzq_20220531 t4 ON t1.instrument = t4.instrument AND t1.date = t4.date
JOIN
    hf_fzzq_20220613 t5 ON t1.instrument = t5.instrument AND t1.date = t5.date
JOIN
    hf_fzzq_20220922 t6 ON t1.instrument = t6.instrument AND t1.date = t6.date
JOIN
    hf_fzzq_20230612 t7 ON t1.instrument = t7.instrument AND t1.date = t7.date
JOIN
    hf_fzzq_20230215 t8 ON t1.instrument = t8.instrument AND t1.date = t8.date



WHERE
    t1.date >= '{sd}' AND t1.date <= '{ed}';

''')

读取虚拟表

计算虚拟表的label

对虚拟表股票进行过滤处理 (如st 非北交所等)

In [ ]:
# Load the saved view, restricted to the configured [sd, ed] date window.
df = dai.query(
    "SELECT * FROM data_001",
    filters={"date": [sd, ed]},
).df()
In [ ]:
# Instrument universe for market 'CN_STOCK_A' over the configured [sd, ed] window.
m1 = M.instruments.v2(
    start_date=sd,
    end_date=ed,
    market='CN_STOCK_A',
    instrument_list='',
    max_count=0
)

# Feature expressions (alpha001..alpha013) plus the training label.
# The label divides the open price shifted by -2 by the open price shifted by -1
# (presumably the open two days ahead over the open one day ahead — confirm the
# sign convention of the platform's `shift`).
m2 = M.input_features.v1(
    features="""
alpha001=return_5
alpha002=return_10
alpha003=return_20
alpha004=avg_amount_0/avg_amount_5
alpha005=avg_amount_5/avg_amount_20
alpha006=rank_avg_amount_0/rank_avg_amount_5
alpha007=rank_avg_amount_5/rank_avg_amount_10
alpha008=rank_return_0
alpha009=rank_return_5
alpha010=rank_return_10
alpha011=rank_return_0/rank_return_5
alpha012=rank_return_5/rank_return_10
alpha013=pe_ttm_0

label= shift(open_0,-2)/shift(open_0,-1)

"""
)


# Extract the base fields needed by the expressions for the instruments in m1.
# Empty start/end dates are passed here; presumably the range is then taken
# from the instruments input — verify against the module documentation.
m3 = M.general_feature_extractor.v7(
    instruments=m1.data,
    features=m2.data,
    start_date='',
    end_date='',
    before_start_days=0
)

# Evaluate the derived expressions (including `label`).
# drop_na=False: rows with missing values are kept at this stage.
# remove_extra_columns=True: presumably only the requested expression columns
# (plus date/instrument) are retained — TODO confirm.
m4 = M.derived_feature_extractor.v3(
    input_data=m3.data,
    features=m2.data,
    date_col='date',
    instrument_col='instrument',
    drop_na=False,
    remove_extra_columns=True,
    user_functions={}
)

# Stock filter: restrict to the four listed boards, to stocks with ST status
# '正常' (normal) and delisting status '非退市' (not delisted); no constraint on
# index membership or industry ('全部' = all).
m5 = M.chinaa_stock_filter.v1(
    input_data=m4.data,
    index_constituent_cond=['全部'],
    board_cond=['上证主板', '深证主板', '创业板', '科创板'],
    industry_cond=['全部'],
    st_cond=['正常'],
    delist_cond=['非退市'],
    output_left_data=False
)

# Materialise the filtered feature / label table produced by the stock filter.
df_ = m5.data.read()

# Right join: keep every (instrument, date) row of the filtered table and attach
# the view's factor columns where available, then order the rows by date.
df = (
    pd.merge(df, df_, on=['instrument', 'date'], how='right')
    .sort_values(by='date', ascending=True)
)

根据时间跨度划分训练集

time_interval = 时间跨度
train_interval = 训练天数,每一个模型使用多久数据训练
test_interval = 预测天数,每一个模型预测多少天,预测天数同时等于模型更新频率(天为单位)
label_shift = 标签漂移天数(例如使用 open.shift(-2)/open.shift(-1) 作为未来收益率,实际使用了未来2天的数据,则标签漂移天数为2)
train_col = 因子列表
In [ ]:
## Rolling-window configuration

time_interval = (sd,ed)
train_interval = 180  # rows of the trading calendar allotted to each training window
test_interval = 20    # stride between consecutive models (model refresh frequency, in trading days)
label_shift = 2       # the label reads data up to this many trading days ahead
group_length = train_interval + test_interval + label_shift - 1

# Trading calendar for the whole period, restricted to rows with country_code "CN".
td_list = DataSource("trading_days").read(start_date=time_interval[0], end_date=time_interval[-1])
# .copy() so the column assignment below writes to an independent frame,
# not to a filtered slice of the original.
td_list = td_list[td_list.country_code=="CN"].copy()
# Builtin `str` instead of `np.str`: the alias was deprecated in NumPy 1.20 and
# removed in 1.24; it was always the builtin, so the result is unchanged.
td_list["date"] = td_list["date"].astype(str)

# One entry per model; consecutive models are `test_interval` calendar rows apart.
# Row offsets of the first model: train = [0, train_interval - label_shift - 1],
# test = [train_interval, group_length - label_shift - 1].
# The training window stops early so that its labels never read data from the
# test window.
# NOTE(review): the test window ends at row train_interval + test_interval - 2,
# i.e. it spans test_interval - 1 rows while the stride is test_interval, so one
# trading day per cycle gets no prediction — confirm this is intended.
train_start_date = list(td_list[::test_interval].date)  # training start dates
train_end_date = list(td_list[(train_interval - label_shift - 1):][::test_interval].date)  # training end dates
test_start_date = list(td_list[train_interval:][::test_interval].date)  # test start dates
test_end_date = list(td_list[(group_length - label_shift - 1):][::test_interval].date)  # test end dates

定义训练特征列名

In [ ]:
# Every column whose name contains 'alpha' is used as a model feature
# (sorted so the feature order is stable).
train_col = sorted(name for name in df.columns if 'alpha' in name)

# Discard rows that are missing any feature value.
df = df.dropna(subset=train_col)

自定义训练模型

示例仅以LGBM为例

In [ ]:
def train(df,train_sd,train_ed,test_sd,test_ed,train_col):
    """Fit one LightGBM regressor on a training window and score a test window.

    Args:
        df: Frame with a ``date`` column, a ``label`` column and every column
            named in ``train_col``. It is not modified.
        train_sd: First day of the training window ('YYYY-MM-DD').
        train_ed: Last day of the training window ('YYYY-MM-DD'), inclusive.
        test_sd: First day of the test window ('YYYY-MM-DD').
        test_ed: Last day of the test window ('YYYY-MM-DD'), inclusive.
        train_col: Names of the feature columns.

    Returns:
        A copy of the test-window rows with an added ``pred`` column, or
        ``None`` when the window cannot be processed (no usable training or
        test rows, or an error while fitting/predicting). ``None`` entries are
        skipped by ``pd.concat``, so one bad window does not abort the run.

    Raises:
        ImportError: If lightgbm or scikit-learn is not installed; a missing
            dependency is a setup problem, not a property of one window.
    """
    import datetime
    import logging
    import warnings

    warnings.filterwarnings('ignore')
    logger = logging.getLogger(__name__)

    try:
        rows = df.dropna(subset=train_col, axis=0)

        # Move both end dates forward one calendar day and compare with `<`,
        # so the end day itself stays included in the selection.
        one_day = datetime.timedelta(days=1)
        train_ed = (datetime.datetime.strptime(train_ed, '%Y-%m-%d') + one_day).strftime('%Y-%m-%d')
        test_ed = (datetime.datetime.strptime(test_ed, '%Y-%m-%d') + one_day).strftime('%Y-%m-%d')

        train_rows = rows[(rows['date'] >= train_sd) & (rows['date'] < train_ed)]
        # A regression target of NaN carries no information; keep such rows out
        # of the fit (they remain eligible for prediction below).
        train_rows = train_rows.dropna(subset=['label'])

        # .copy() so that adding 'pred' writes to an independent frame rather
        # than to a slice of the caller's data.
        test_rows = rows[(rows['date'] >= test_sd) & (rows['date'] < test_ed)].copy()

        if train_rows.empty or test_rows.empty:
            logger.warning(
                'skipping window train[%s, %s) test[%s, %s): %d training rows, %d test rows',
                train_sd, train_ed, test_sd, test_ed, len(train_rows), len(test_rows))
            return None

        # Heavy dependencies are imported only once there is something to fit.
        import lightgbm as lgb
        from sklearn.model_selection import train_test_split

        # Random 80/20 split of the training rows; the held-out part is used
        # only for early stopping.
        X_fit, X_val, y_fit, y_val = train_test_split(
            train_rows[train_col], train_rows['label'], test_size=0.2, random_state=42)

        lgb_train = lgb.Dataset(X_fit, y_fit)
        lgb_eval = lgb.Dataset(X_val, y_val, reference=lgb_train)

        params = {
            'boosting_type': 'gbdt',
            'objective': 'regression',
            # A list (not a set) so the metric order is the same in every process.
            'metric': ['l2', 'l1'],
            'num_leaves': 100,
            'learning_rate': 0.05,
            'feature_fraction': 0.5,
            'bagging_fraction': 0.5,
            'bagging_freq': 5,
            'verbose': 0
        }

        # Early stopping goes through the callback API: the
        # `early_stopping_rounds` keyword of lgb.train was removed in
        # LightGBM 4, while the callback works on old and new releases.
        gbm = lgb.train(params,
                        lgb_train,
                        num_boost_round=100,
                        valid_sets=lgb_eval,
                        callbacks=[lgb.early_stopping(stopping_rounds=5)])

        test_rows['pred'] = gbm.predict(test_rows[train_col], num_iteration=gbm.best_iteration)
        return test_rows

    except ImportError:
        raise
    except Exception:
        # Best effort per window: report the failure instead of hiding it,
        # then let the remaining windows proceed.
        logger.exception('window train[%s, %s) test[%s, %s) failed',
                         train_sd, train_ed, test_sd, test_ed)
        return None

并行滚动训练

In [ ]:
from joblib import Parallel, delayed

# Fit one model per rolling window, using all available cores. Each window is a
# (train_sd, train_ed, test_sd, test_ed) tuple; zip stops at the shortest list.
res = Parallel(n_jobs=-1)(
    delayed(train)(df, *window, train_col)
    for window in zip(train_start_date, train_end_date, test_start_date, test_end_date)
)

# Stack the per-window prediction frames into a single table.
df = pd.concat(res)

定义回测时间

In [ ]:
# Backtest over exactly the span for which predictions exist.
df.sort_values(by='date',inplace=True)
pred_dates = df.date.unique()
# Builtin `str` instead of `np.str`: the alias was deprecated in NumPy 1.20 and
# removed in 1.24; it was always the builtin, so the strings are unchanged.
sd_ = str(pred_dates[0])
ed_ = str(pred_dates[-1])

back_test_time = M.instruments.v2(
    start_date=sd_,
    end_date=ed_,
    market='CN_STOCK_A',
    instrument_list='',
    max_count=0
)


# Hand the prediction table to the platform so the trading engine can read it.
df_out = DataSource.write_df(df)

开始回测

In [ ]:
# Trading engine: initialisation hook, executed once.
def m4_initialize_bigquant_run(context):
    """Load the prediction table and keep it on the context, indexed by date.

    Reads the frame behind ``context.options['data']`` and stores it as
    ``context.ranker_prediction`` with the ``date`` column as its index.
    """
    predictions = context.options['data'].read_df()
    context.ranker_prediction = predictions.set_index('date')
    


# Trading engine: called once before the open of every time unit.
def m4_before_trading_start_bigquant_run(context, data):
    """Pre-open hook; intentionally does nothing.

    Pre-market work such as subscribing to quotes would go here, e.g.
    ``context.subscribe_bar(context.instruments, '1m')``.
    """
    return None

# Trading engine: tick handler, executed once per tick.
def m4_handle_tick_bigquant_run(context, tick):
    """Tick hook; intentionally does nothing."""
    return None

def handle_bar(context, bar):
    """Bar hook; intentionally does nothing."""
    return None


# Trading engine: bar handler, executed once per time unit.
def m4_handle_data_bigquant_run(context,data):
    """Rebalance into the top-ranked stocks every 20th trading day.

    On a rebalance day the rows of ``context.ranker_prediction`` for the
    current date are ranked by ``pred`` (descending) and the first 50
    instruments become the target list. Currently held instruments (positions
    with ``avail_qty > 0``) that are not targets are sold to zero; targets not
    yet held are bought at 2% of the portfolio each. Positions that are
    already held and still targeted are left as they are.

    Nothing happens on non-rebalance days or when there are no predictions for
    the current date.

    Args:
        context: Engine context; must provide ``trading_day_index``,
            ``ranker_prediction`` (indexed by date), ``get_account_positions``,
            ``order_target`` and ``order_target_percent``.
        data: Engine bar data; only ``current_dt`` is read.
    """
    rebalance_period = 20  # trading days between rebalances
    max_holdings = 50      # number of top-ranked stocks to target
    target_weight = 0.02   # portfolio fraction per newly bought stock

    # Not a rebalance day: nothing to do.
    if context.trading_day_index % rebalance_period != 0:
        return

    today = data.current_dt.strftime('%Y-%m-%d')

    account_pos = context.get_account_positions()
    holding_list = [ins for ins, pos in account_pos.items() if pos.avail_qty > 0]

    # Predictions for today; a date without predictions raises KeyError.
    try:
        today_data = context.ranker_prediction.loc[today, :]
    except KeyError:
        return

    # A date with exactly one prediction row may come back as a Series;
    # turn it back into a one-row frame so the code below handles it uniformly.
    if today_data.ndim == 1:
        today_data = today_data.to_frame().T

    # Only a missing prediction disqualifies a row. Dropping on every column
    # would also remove rows whose forward-looking `label` is NaN, i.e. it
    # would filter today's candidates using information from the future.
    ranked = today_data.dropna(subset=['pred']).sort_values(by='pred', ascending=False)

    target_list = ranked.instrument.to_list()[:max_holdings]

    held = set(holding_list)
    wanted = set(target_list)
    sell_list = [ins for ins in holding_list if ins not in wanted]
    buy_list = [ins for ins in target_list if ins not in held]

    # Place the sell orders before the buy orders.
    for ins in sell_list:
        context.order_target(ins,0)

    for ins in buy_list:
        context.order_target_percent(ins, target_weight)

    

# Trading engine: trade-report handler, executed once per fill.
def m4_handle_trade_bigquant_run(context, trade):
    """Trade-report hook; intentionally does nothing."""
    return None

# Trading engine: order-report handler, executed on every order status change.
def m4_handle_order_bigquant_run(context, order):
    """Order-report hook; intentionally does nothing."""
    return None

# Trading engine: after-close handler, executed once per day after the close.
def m4_after_trading_bigquant_run(context, data):
    """After-close hook; intentionally does nothing."""
    return None
In [ ]:
# Run the backtest: the engine calls the m4_* hooks defined above, and the
# prediction table is supplied through `options_data`.
trade = M.hftrade.v2(
    # Instruments / date span of the backtest and the prediction table.
    instruments=back_test_time.data,
    options_data=df_out,
    # Empty strings are passed for the dates; presumably the range then comes
    # from `instruments` — verify against the module documentation.
    start_date='',
    end_date='',
    # Strategy callbacks.
    initialize=m4_initialize_bigquant_run,
    before_trading_start=m4_before_trading_start_bigquant_run,
    handle_tick=m4_handle_tick_bigquant_run,
    handle_data=m4_handle_data_bigquant_run,
    handle_trade=m4_handle_trade_bigquant_run,
    handle_order=m4_handle_order_bigquant_run,
    after_trading=m4_after_trading_bigquant_run,
    # Starting capital and bar frequency.
    capital_base=100000000,
    frequency='daily',
    # '真实价格' = real prices; '股票' = stocks.
    price_type='真实价格',
    product_type='股票',
    before_start_days='0',
    volume_limit=0,
    # Both buy and sell orders are priced at the open.
    order_price_field_buy='open',
    order_price_field_sell='open',
    # Benchmark instrument code (presumably the CSI 500 index — confirm).
    benchmark='000905.HIX',
    plot_charts=True,
    disable_cache=True,
    replay_bdb=False,
    show_debug_info=False,
    backtest_only=False
)