import warnings
import datetime

import numpy as np
import pandas as pd

import dai
from bigdatasource.api import DataSource
from biglearning.api import M
from biglearning.api import tools as T
from biglearning.module2.common.data import Outputs

warnings.filterwarnings('ignore')

# Sample window shared by the factor view, the rolling training and the backtest
sd = '2016-01-01'
ed = '2023-07-31'
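# Save a DAI view that joins eight high-frequency factor tables (hf_fzzq_*) on
# (instrument, date), keeping only the alpha columns used below and restricting
# the rows to the sample window.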
dai.DataSource.save_view("data_001",
'''
SELECT
t1.instrument, t1.date, t1.alpha91001,
t2.alpha91007,
t3.alpha91012,
t4.alpha91019,t4.alpha91018,
t5.alpha91022,
t6.alpha91026,
t7.alpha91030,
t8.alpha91036,t8.alpha91034,t8.alpha91037
FROM
hf_fzzq_20230328 t1
JOIN
hf_fzzq_20230421 t2 ON t1.instrument = t2.instrument AND t1.date = t2.date
JOIN
hf_fzzq_20220805 t3 ON t1.instrument = t3.instrument AND t1.date = t3.date
JOIN
hf_fzzq_20220531 t4 ON t1.instrument = t4.instrument AND t1.date = t4.date
JOIN
hf_fzzq_20220613 t5 ON t1.instrument = t5.instrument AND t1.date = t5.date
JOIN
hf_fzzq_20220922 t6 ON t1.instrument = t6.instrument AND t1.date = t6.date
JOIN
hf_fzzq_20230612 t7 ON t1.instrument = t7.instrument AND t1.date = t7.date
JOIN
hf_fzzq_20230215 t8 ON t1.instrument = t8.instrument AND t1.date = t8.date
WHERE
t1.date >= '2016-01-01' AND t1.date <= '2023-07-31';
''')
# Pull the joined factor data from the view, filtered again to the sample window
df = dai.query("SELECT * FROM data_001", filters={"date": [sd, ed]}).df()
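# BigQuant feature pipeline: m1 defines the CN_STOCK_A universe, m2 declares
# expression-based factors and the label, m3/m4 extract and derive those
# features, and m5 filters the stock pool; the result is merged with the DAI
# factor view above.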
m1 = M.instruments.v2(
start_date=sd,
end_date=ed,
market='CN_STOCK_A',
instrument_list='',
max_count=0
)
m2 = M.input_features.v1(
features="""
alpha001=return_5
alpha002=return_10
alpha003=return_20
alpha004=avg_amount_0/avg_amount_5
alpha005=avg_amount_5/avg_amount_20
alpha006=rank_avg_amount_0/rank_avg_amount_5
alpha007=rank_avg_amount_5/rank_avg_amount_10
alpha008=rank_return_0
alpha009=rank_return_5
alpha010=rank_return_10
alpha011=rank_return_0/rank_return_5
alpha012=rank_return_5/rank_return_10
alpha013=pe_ttm_0
label= shift(open_0,-2)/shift(open_0,-1)
"""
)
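# The label shift(open_0,-2)/shift(open_0,-1) is the open-to-open return from
# T+1 to T+2 (buy at the next open, evaluate at the open after that), which
# matches label_shift = 2 and the open-price order fields in the backtest below.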
m3 = M.general_feature_extractor.v7(
instruments=m1.data,
features=m2.data,
start_date='',
end_date='',
before_start_days=0
)
m4 = M.derived_feature_extractor.v3(
input_data=m3.data,
features=m2.data,
date_col='date',
instrument_col='instrument',
drop_na=False,
remove_extra_columns=True,
user_functions={}
)
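# Filter to tradable A shares: all index constituents and industries, SSE/SZSE
# main boards plus ChiNext and STAR Market, normal (non-ST) status, not delisted.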
m5 = M.chinaa_stock_filter.v1(
input_data=m4.data,
index_constituent_cond=['全部'],
board_cond=['上证主板', '深证主板', '创业板', '科创板'],
industry_cond=['全部'],
st_cond=['正常'],
delist_cond=['非退市'],
output_left_data=False
)
df_ = m5.data.read()
# Right join keeps only the instrument/date pairs that pass the stock filter
df = pd.merge(df, df_, on=['instrument', 'date'], how='right')
df.sort_values(by='date', inplace=True, ascending=True)
# ==================== Rolling train/test window construction ====================
time_interval = (sd, ed)
train_interval = 180  # training window length, in trading days
test_interval = 20    # prediction (test) window length, in trading days
label_shift = 2       # the label looks 2 days ahead, so window ends are trimmed accordingly
group_length = train_interval + test_interval + label_shift - 1
# Trading calendar for the China market
td_list = DataSource("trading_days").read(start_date=time_interval[0], end_date=time_interval[-1])
td_list = td_list[td_list.country_code == "CN"]
td_list.date = td_list.date.astype(str)
train_start_date = list(td_list[::test_interval].date)                                     # training-window start dates
train_end_date = list(td_list[(train_interval - label_shift - 1):][::test_interval].date)  # training-window end dates
test_start_date = list(td_list[train_interval:][::test_interval].date)                     # test-window start dates
test_end_date = list(td_list[(group_length - label_shift - 1):][::test_interval].date)     # test-window end dates
# Feature columns: every alpha* factor from both the DAI view and the expression features
train_col = sorted([col_name for col_name in df.columns if 'alpha' in col_name])
df.dropna(subset=train_col, inplace=True)
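# The four date lists have different lengths; zip() in the parallel loop below
# truncates them to complete train/test windows, so a trailing partial window
# at the end of the sample is dropped.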
def train(df, train_sd, train_ed, test_sd, test_ed, train_col):
    """Fit a LightGBM regressor on one rolling window and predict its test window."""
    import lightgbm as lgb
    from sklearn.model_selection import train_test_split
    import warnings
    warnings.filterwarnings('ignore')
    try:
        df = df.dropna(subset=train_col, axis=0)
        # Shift both end dates forward by one day so the boundary date itself is
        # included by the strict '<' comparisons below.
        train_ed = datetime.datetime.strptime(train_ed, '%Y-%m-%d')
        train_ed = datetime.datetime.strftime(train_ed + datetime.timedelta(days=1), '%Y-%m-%d')
        test_ed = datetime.datetime.strptime(test_ed, '%Y-%m-%d')
        test_ed = datetime.datetime.strftime(test_ed + datetime.timedelta(days=1), '%Y-%m-%d')
        # Training slice for this window
        x_train = df[(df['date'] >= train_sd) & (df['date'] < train_ed)][train_col]
        y_train = df[(df['date'] >= train_sd) & (df['date'] < train_ed)]['label']
        # Split into training and validation sets
        X_train, X_valid, y_train, y_valid = train_test_split(x_train, y_train, test_size=0.2, random_state=42)
        lgb_train = lgb.Dataset(X_train, y_train)
        lgb_eval = lgb.Dataset(X_valid, y_valid, reference=lgb_train)
        # Model parameters
        params = {
            'boosting_type': 'gbdt',
            'objective': 'regression',
            'metric': {'l2', 'l1'},
            'num_leaves': 100,
            'learning_rate': 0.05,
            'feature_fraction': 0.5,
            'bagging_fraction': 0.5,
            'bagging_freq': 5,
            'verbose': 0
        }
        # Train with early stopping on the validation set (callback form; the
        # early_stopping_rounds keyword was removed in newer LightGBM releases)
        gbm = lgb.train(params,
                        lgb_train,
                        num_boost_round=100,
                        valid_sets=lgb_eval,
                        callbacks=[lgb.early_stopping(stopping_rounds=5, verbose=False)])
        # Predict the test window
        X_test = df[(df['date'] >= test_sd) & (df['date'] < test_ed)].copy()
        X_test['pred'] = gbm.predict(X_test[train_col], num_iteration=gbm.best_iteration)
        return X_test
    except Exception:
        return None
# Train each rolling window in parallel and stitch the out-of-sample predictions together
from joblib import Parallel, delayed
res = Parallel(n_jobs=-1)(delayed(train)(df, train_sd, train_ed, test_sd, test_ed, train_col)
                          for train_sd, train_ed, test_sd, test_ed in zip(train_start_date, train_end_date, test_start_date, test_end_date))
df = pd.concat([r for r in res if r is not None])
df.sort_values(by='date', inplace=True)
# Backtest window inferred from the prediction dates
sd_ = str(df.date.unique()[0])
ed_ = str(df.date.unique()[-1])
back_test_time = M.instruments.v2(
start_date=sd_,
end_date=ed_,
market='CN_STOCK_A',
instrument_list='',
max_count=0
)
df_out = DataSource.write_df(df)
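# Strategy logic for the engine callbacks below: rebalance every 20 trading days,
# hold the 50 stocks with the highest predicted label, target roughly 2% of the
# portfolio per name, and sell anything that drops out of the target list.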
# Trading engine: initialization, runs once
def m4_initialize_bigquant_run(context):
    context.ranker_prediction = context.options['data'].read_df()
    context.ranker_prediction.set_index('date', inplace=True)

# Trading engine: called once before the open of each bar
def m4_before_trading_start_bigquant_run(context, data):
    # Pre-market processing, e.g. subscribing to quotes
    # context.subscribe_bar(context.instruments, '1m')
    pass

# Trading engine: tick handler, runs once per tick
def m4_handle_tick_bigquant_run(context, tick):
    pass

def handle_bar(context, bar):
    pass
# Trading engine: bar handler, runs once per bar
def m4_handle_data_bigquant_run(context, data):
    # Rebalance every 20 trading days; otherwise do nothing
    remainder = context.trading_day_index % 20
    if remainder != 0:
        return
    # Buy/sell lists for this rebalance
    buy_list = []
    sell_list = []
    # ==================== data preparation
    today = data.current_dt.strftime('%Y-%m-%d')
    account_pos = context.get_account_positions()
    holding_list = list({key: value for key, value in account_pos.items() if value.avail_qty > 0}.keys())
    # Read today's predictions; skip the day if there are none
    try:
        today_data = context.ranker_prediction.loc[today, :]
        today_data.reset_index(inplace=True)
    except Exception:
        return
    today_data.dropna(inplace=True)
    today_data.sort_values(by='pred', ascending=False, inplace=True)
    # Target portfolio: the 50 stocks with the highest predicted label
    target_list = today_data.instrument.to_list()[:50]
    # Sell holdings that dropped out of the target list
    for ins in holding_list:
        if ins not in target_list:
            sell_list.append(ins)
    # Buy targets not currently held
    for ins in target_list:
        if ins not in holding_list:
            buy_list.append(ins)
    for ins in sell_list:
        context.order_target(ins, 0)
    for ins in buy_list:
        context.order_target_percent(ins, 0.02)
# Trading engine: trade (fill) callback, runs once per fill
def m4_handle_trade_bigquant_run(context, trade):
    pass

# Trading engine: order callback, runs once per order status change
def m4_handle_order_bigquant_run(context, order):
    pass

# Trading engine: post-market processing, runs once after each session
def m4_after_trading_bigquant_run(context, data):
    pass
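# Backtest configuration: daily bars over the prediction period, orders priced at
# the open, 100,000,000 initial capital, benchmarked against index 000905.HIX.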
trade = M.hftrade.v2(
instruments=back_test_time.data,
options_data=df_out,
start_date='',
end_date='',
initialize=m4_initialize_bigquant_run,
before_trading_start=m4_before_trading_start_bigquant_run,
handle_tick=m4_handle_tick_bigquant_run,
handle_data=m4_handle_data_bigquant_run,
handle_trade=m4_handle_trade_bigquant_run,
handle_order=m4_handle_order_bigquant_run,
after_trading=m4_after_trading_bigquant_run,
capital_base=100000000,
frequency='daily',
price_type='真实价格',
product_type='股票',
before_start_days='0',
volume_limit=0,
order_price_field_buy='open',
order_price_field_sell='open',
benchmark='000905.HIX',
plot_charts=True,
disable_cache=True,
replay_bdb=False,
show_debug_info=False,
backtest_only=False
)