# 0. Basic parameter configuration
class lstm_conf:
    """Constants shared by every stage of the LSTM pipeline.

    Used purely as a namespace: the visible code reads class attributes such
    as ``lstm_conf.instrument`` and never instantiates the class.
    """

    # Instrument to model with the LSTM and to trade: the CSI 300 index.
    instrument = '000300.HIX'

    # Experiment period. Samples dated on or before ``split_date`` train the
    # model; later samples are scored and back-tested.
    start_date, split_date, end_date = '2005-01-01', '2015-01-01', '2017-05-01'

    # Daily bar fields used as LSTM features, in this column order.
    fields = 'close open high low amount volume'.split()

    # Length of each input window: the previous 30 trading days.
    feature_back_days = 30

    # Samples per mini-batch; each batch produces one gradient-descent step.
    batch_size = 100
# 1. Load data
def load_and_label_data(instrument, start_date, end_date, fields):
    """Load daily bars for ``instrument`` and attach the training target.

    Parameters
    ----------
    instrument : str
        Instrument code passed to ``D.history_data``.
    start_date, end_date : str
        Date range passed to ``D.history_data``.
    fields : list of str
        Columns to request. Must include ``'close'``, ``'open'`` and
        ``'amount'``, which are used below.

    Returns
    -------
    Outputs
        ``data`` is a DataSource holding the bars plus two new columns:

        * ``return`` is the regression target: the clipped, x10-scaled
          forward return described below.
        * ``label`` is ``round(close / 500)``, used as the auxiliary model
          input.

        Rows containing any NaN are dropped and the index is reset.
    """
    df = D.history_data(instrument, start_date, end_date, fields)
    # Keep only days that actually traded. ``.copy()`` makes ``df`` an
    # independent frame, so the column assignments below do not raise
    # pandas' SettingWithCopyWarning on a filtered slice.
    df = df[df.amount > 0].copy()
    # Return from buying at the next row's open and selling at the close
    # five rows ahead.
    df['return'] = df['close'].shift(-5) / df['open'].shift(-1) - 1
    # Clip extreme values to +/-20%.
    df['return'] = df['return'].clip(-0.2, 0.2)
    # Widen the target range to make LSTM training easier.
    df['return'] = df['return'] * 10
    # Auxiliary input: a coarse price level.
    df['label'] = np.round(df['close'] / 500)
    # The last five rows have no forward return; they are dropped here,
    # along with any other row containing NaN.
    df.dropna(inplace=True)
    df.reset_index(drop=True, inplace=True)
    return Outputs(data=DataSource.write_df(df))
# Run stage 1 through M.cached.v2; downstream steps consume ``lstm_m1.data``.
lstm_m1 = M.cached.v2(
    run=load_and_label_data,
    kwargs={
        'instrument': lstm_conf.instrument,
        'start_date': lstm_conf.start_date,
        'end_date': lstm_conf.end_date,
        'fields': lstm_conf.fields,
    },
)
[2020-05-06 16:55:20.453084] INFO: moduleinvoker: cached.v2 开始运行..
[2020-05-06 16:55:28.505047] INFO: moduleinvoker: cached.v2 运行完成[8.052002s].
# 2. Build the dataset
def generate_datasets(input_ds, x_fields, feature_back_days):
    """Turn labelled daily rows into fixed-length, per-window scaled samples.

    Each output row corresponds to one input row ``t`` (starting at index
    ``feature_back_days - 1``) and holds:

    * ``date``: the ``date`` value at ``t``.
    * ``X``: the ``feature_back_days x len(x_fields)`` window of rows
      ending at ``t``, passed through ``sklearn.preprocessing.scale``
      (column-wise standardisation within that window).
    * ``Y``: the ``return`` value at ``t``.
    * ``X_aux``: the ``label`` value at ``t``.

    Returns ``Outputs`` whose ``data`` is a DataSource of that frame.
    """
    from numpy.lib.stride_tricks import as_strided
    from sklearn.preprocessing import scale

    frame = input_ds.read_df()
    values = frame[x_fields].values
    n_rows, n_cols = values.shape
    step_row, step_col = values.strides
    n_windows = n_rows - feature_back_days + 1

    # Sliding-window view without copying: window i starts at row i and
    # spans feature_back_days consecutive rows of every column.
    windows = as_strided(
        values,
        shape=(n_windows, feature_back_days, n_cols),
        strides=(step_row, step_row, step_col),
    )
    # Copy the strided view into a regular array before processing, then
    # standardise each window on its own.
    scaled = [scale(window) for window in np.array(windows)]

    first = feature_back_days - 1
    return Outputs(data=DataSource.write_df(pd.DataFrame({
        'date': frame['date'].values[first:],
        'X': scaled,
        'Y': frame['return'].values[first:],
        'X_aux': frame['label'].values[first:],
    })))
# Run stage 2 on stage 1's output; downstream steps consume ``lstm_m2.data``.
lstm_m2 = M.cached.v2(
    run=generate_datasets,
    kwargs={
        'input_ds': lstm_m1.data,
        'x_fields': lstm_conf.fields,
        'feature_back_days': lstm_conf.feature_back_days,
    },
)
[2020-05-06 16:55:44.110608] INFO: moduleinvoker: cached.v2 开始运行..
[2020-05-06 16:55:45.963135] INFO: moduleinvoker: cached.v2 运行完成[1.852516s].
# 3. Split into training and evaluation data by date: rows on or before
# split_date go to training, rows strictly after it go to evaluation, so
# the two sets are disjoint.
lstm_m3_train = M.filter.v2(
    data=lstm_m2.data,
    expr='date <= "{}"'.format(lstm_conf.split_date),
)
lstm_m3_evaluation = M.filter.v2(
    data=lstm_m2.data,
    expr='date > "{}"'.format(lstm_conf.split_date),
)
[2020-05-06 16:55:50.514641] INFO: moduleinvoker: filter.v2 开始运行..
[2020-05-06 16:55:50.522155] INFO: filter: filter with expr date <= "2015-01-01"
[2020-05-06 16:55:50.613300] INFO: filter: filter /data, 2398/2958
[2020-05-06 16:55:50.670605] INFO: moduleinvoker: filter.v2 运行完成[0.155955s].
[2020-05-06 16:55:50.672880] INFO: moduleinvoker: filter.v2 开始运行..
[2020-05-06 16:55:50.706424] INFO: filter: filter with expr date > "2015-01-01"
[2020-05-06 16:55:50.767170] INFO: filter: filter /data, 560/2958
[2020-05-06 16:55:50.871515] INFO: moduleinvoker: filter.v2 运行完成[0.198612s].
# 4. LSTM model training
# Custom activation function
def activation_atan(x):
    """Element-wise arctangent activation, mapping inputs into (-pi/2, pi/2).

    ``atan`` is a core TensorFlow op and is not exposed by
    ``tensorflow.keras``, so it is taken from the top-level ``tensorflow``
    package.
    """
    import tensorflow as tf
    return tf.atan(x)
def lstm_train(input_ds, batch_size, activation):
    """Build and fit a regression model: one LSTM layer plus three Dense layers.

    Parameters
    ----------
    input_ds : DataSource
        Frame with columns ``X`` (one 2-D window array per row), ``X_aux``
        (scalar auxiliary input) and ``Y`` (target).
    batch_size : int
        Mini-batch size passed to ``Model.fit``.
    activation : callable
        Accepted for interface compatibility. NOTE(review): it is not
        applied to any layer; the LSTM uses its default activation.

    Returns
    -------
    Outputs
        ``data`` is a pickled dict with three keys:

        * ``model_graph``: the YAML architecture.
        * ``model_weights``: the list from ``get_weights()``.
        * ``history``: the ``History.history`` dict from training.
    """
    from tensorflow.keras.layers import Input, Dense, LSTM, concatenate
    from tensorflow.keras.models import Model

    df = input_ds.read_df()
    x_seq = np.array(df['X'].values.tolist())
    x_aux = np.array(df['X_aux'].values)
    y = np.array(df['Y'].values)

    # Take the (window, n_features) input shape from the data rather than
    # hard-coding (30, 6), so changing feature_back_days or fields still works.
    lstm_input = Input(shape=x_seq.shape[1:], name='lstm_input')
    lstm_output = LSTM(128)(lstm_input)
    aux_input = Input(shape=(1,), name='aux_input')
    merged_data = concatenate([lstm_output, aux_input], axis=-1)
    dense_output_1 = Dense(64, activation='linear')(merged_data)
    dense_output_2 = Dense(8, activation='linear')(dense_output_1)
    predictions = Dense(1)(dense_output_2)
    model = Model(inputs=[lstm_input, aux_input], outputs=predictions)
    model.compile(optimizer='adam', loss='mse', metrics=['mse'])

    # ``epochs`` is the Keras 2 argument name. ``nb_epoch`` was only a
    # deprecated alias and is rejected by newer tf.keras releases.
    history = model.fit(
        [x_seq, x_aux],
        y,
        batch_size=batch_size,
        epochs=2,
        verbose=2,
    )

    # Persist the architecture and weights separately so both can be pickled.
    # NOTE(review): Model.to_yaml was removed in TF 2.6. Moving to to_json
    # requires the matching change where the graph is reloaded.
    ds = DataSource.write_pickle({
        'model_graph': model.to_yaml(),
        'model_weights': model.get_weights(),
        'history': history.history,
    })
    return Outputs(data=ds)
# Train on the training split; the pickled model is exposed as ``lstm_m4.data``.
lstm_m4 = M.cached.v2(
    run=lstm_train,
    kwargs={
        'input_ds': lstm_m3_train.data,
        'batch_size': lstm_conf.batch_size,
        'activation': activation_atan,
    },
)
[2020-05-06 18:23:39.723401] INFO: moduleinvoker: cached.v2 开始运行..
[2020-05-06 18:23:54.376515] INFO: moduleinvoker: cached.v2 运行完成[14.653086s].
# Bare expression; presumably a notebook cell that echoes the training module's output.
lstm_m4
# 5. LSTM prediction
# Two-slot placeholder; not referenced anywhere in the visible code.
dd = [None] * 2
def lstm_predict(model_ds, data_ds, activation):
    """Rebuild the trained model, score ``data_ds`` and plot score vs. target.

    Parameters
    ----------
    model_ds : DataSource
        Pickle with keys ``model_graph`` (YAML architecture) and
        ``model_weights``.
    data_ds : DataSource
        Frame with ``date``, ``X``, ``X_aux`` and ``Y`` columns.
    activation : callable
        Custom activation made resolvable under the name
        ``'activation_atan'`` when the graph is rebuilt.

    Returns
    -------
    Outputs
        ``data`` is a DataSource with the ``date`` and ``score`` columns.
    """
    from tensorflow.keras import activations
    from tensorflow.keras.models import model_from_yaml

    # Make the custom activation resolvable *before* the graph is rebuilt.
    # It is passed explicitly via ``custom_objects``; the module attribute is
    # kept for any code that looks it up there.
    activations.activation_atan = activation
    model_dict = model_ds.read_pickle()
    model = model_from_yaml(
        model_dict['model_graph'],
        custom_objects={'activation_atan': activation},
    )
    model.set_weights(model_dict['model_weights'])

    df = data_ds.read_df()
    predictions = model.predict(
        [np.array(df['X'].values.tolist()), np.array(df['X_aux'].values)])
    df['score'] = predictions.flatten()
    # Scatter plot of predicted vs. actual values.
    T.plot(
        df,
        x='Y', y=['score'], chart_type='scatter',
        title='LSTM预测结果:实际值 vs. 预测值'
    )
    return Outputs(data=DataSource.write_df(df[['date', 'score']]))
# Score the evaluation split; the predictions are exposed as ``lstm_m5.data``.
lstm_m5 = M.cached.v2(
    run=lstm_predict,
    kwargs={
        'model_ds': lstm_m4.data,
        'data_ds': lstm_m3_evaluation.data,
        'activation': activation_atan,
    },
)
[2020-05-06 18:30:32.578549] INFO: moduleinvoker: cached.v2 开始运行..
[2020-05-06 18:30:34.406733] INFO: moduleinvoker: cached.v2 运行完成[1.828172s].
# 6. Backtest: trade the CSI 300 index
def initialize(context):
    """Back-test setup hook: override commissions and load the predictions.

    Stores the frame read from ``context.options['prediction_ds']`` as
    ``context.predictions`` for use by the daily handler.
    """
    # The engine already applies default commission and slippage; this
    # overrides the commission. NOTE(review): the values are presumably
    # rates on traded value (buy 0.03%, sell 0.13%) with a per-order minimum
    # of 5; confirm against the PerOrder documentation.
    commission = PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5)
    context.set_commission(commission)
    source = context.options['prediction_ds']
    context.predictions = source.read_df()
# Back-test engine: daily data handler, called once per trading day
def handle_data(context, data):
    """Daily trading rule driven by the LSTM score.

    Days without a prediction row are skipped. Otherwise:

    * Flat and score > 0: call ``order_target_percent(asset, 0.9)`` and
      record today as ``context.extension['last_buy_date']``.
    * Holding and score < 0: close the position once
      ``trading_calendar.session_distance(last_buy_date, today)`` is at
      least 5.
    """
    today = data.current_dt.strftime('%Y-%m-%d')
    preds = context.predictions
    todays_rows = preds[preds.date == today]
    if len(todays_rows) <= 0:
        return
    signal = todays_rows.score.iloc[0]

    asset = context.symbol(context.options['instrument'])
    held_amount = context.portfolio.positions[asset].amount

    if held_amount == 0:
        # Flat: enter when the model predicts a rise.
        if signal > 0:
            context.order_target_percent(asset, 0.9)
            context.extension['last_buy_date'] = today
    elif signal < 0:
        # Holding and the model predicts a fall: exit only after the
        # minimum holding period has elapsed.
        sessions_held = context.trading_calendar.session_distance(
            pd.Timestamp(context.extension['last_buy_date']),
            pd.Timestamp(today),
        )
        if sessions_held >= 5:
            context.order_target(asset, 0)
# Run the back-test engine over the evaluation period (split_date to end_date).
lstm_m6 = M.trade.v4(
    # Universe, period and strategy callbacks.
    instruments=DataSource().write_pickle(lstm_conf.instrument),
    start_date=lstm_conf.split_date,
    end_date=lstm_conf.end_date,
    initialize=initialize,
    handle_data=handle_data,
    options={'instrument': lstm_conf.instrument, 'prediction_ds': lstm_m5.data},
    # Execution settings.
    capital_base=1000000,
    data_frequency='daily',
    order_price_field_buy='open',
    order_price_field_sell='close',
    volume_limit=0.025,
    auto_cancel_non_tradable_orders=True,
    price_type='真实价格',
    product_type='股票',
    benchmark='000300.SHA',
    # Output settings.
    plot_charts=True,
    backtest_only=False,
)
[2020-05-06 18:56:30.832318] INFO: moduleinvoker: backtest.v8 开始运行..
[2020-05-06 18:56:30.836476] INFO: backtest: biglearning backtest:V8.3.4
[2020-05-06 18:56:30.837392] INFO: backtest: product_type:stock by specified
[2020-05-06 18:56:30.838451] INFO: backtest: 其它市场:{'HIX'}
[2020-05-06 18:56:31.005372] INFO: moduleinvoker: cached.v2 开始运行..
[2020-05-06 18:56:31.014759] INFO: moduleinvoker: 命中缓存
[2020-05-06 18:56:31.016227] INFO: moduleinvoker: cached.v2 运行完成[0.01087s].
[2020-05-06 18:56:31.181486] INFO: algo: TradingAlgorithm V1.6.7
[2020-05-06 18:56:31.423098] INFO: algo: trading transform...
[2020-05-06 18:56:32.629203] INFO: Performance: Simulated 565 trading days out of 565.
[2020-05-06 18:56:32.630321] INFO: Performance: first open: 2015-01-05 09:30:00+00:00
[2020-05-06 18:56:32.631172] INFO: Performance: last close: 2017-04-28 15:00:00+00:00
[2020-05-06 18:56:40.708567] INFO: moduleinvoker: backtest.v8 运行完成[9.876247s].
[2020-05-06 18:56:40.709728] INFO: moduleinvoker: trade.v4 运行完成[9.91252s].