import numpy as np
import pandas as pd
import matplotlib.pylab as plt
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation, Flatten
from keras.layers.recurrent import LSTM, GRU
from keras.layers import Convolution1D, MaxPooling1D, AtrousConvolution1D, RepeatVector
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger
from keras.layers.wrappers import Bidirectional
from keras import regularizers
from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import *
from keras.optimizers import RMSprop, Adam, SGD, Nadam
from keras.initializers import *
from sklearn.model_selection import train_test_split
import seaborn as sns
sns.despine()
When working with time-series data, we cannot split the training and test sets at random.
Most machine learning workflows do use a random split, but that implicitly assumes every sample is drawn from one fixed distribution, which a financial time series clearly is not. Moreover, each of our samples is built from a window of historical data, so with a random split much of the test set's information is already hidden inside the training set. The original article used a random split and reported about 60% accuracy, yet the actual backtest performed far worse. This point deserves attention.
If you are curious, you can comment out the time-based split below and try the random one: the randomly split test set can even reach 70% accuracy, but that number is not genuinely usable. The small sketch after the split function illustrates how heavily the windows overlap.
# Split the training and test sets by time, not at random
def create_Xt_Yt(X, Y, ratio=0.9):
    p = int(len(X) * ratio)
    X_train = X[0:p]
    X_test = X[p:]
    Y_train = Y[0:p]
    Y_test = Y[p:]
    return X_train, X_test, Y_train, Y_test
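To see how much information a random split would leak, here is a small standalone sketch. It is not part of the strategy code, and the 1000-window setup is made up purely for illustration: it measures, for each randomly chosen test window, how many of its 60 days also appear in the nearest training window.
# Hypothetical illustration of window overlap under a random split (not used by the strategy)
demo_windows, demo_window_len = 1000, 60        # made-up sizes for the illustration
idx = np.arange(demo_windows)                   # window i covers days [i, i + demo_window_len)
np.random.shuffle(idx)
train_idx, test_idx = idx[:900], idx[900:]      # a 90/10 random split, mirroring ratio=0.9
overlaps = [max(0, demo_window_len - np.abs(train_idx - t).min()) for t in test_idx]
print('average overlap with the nearest training window: %.1f of %d days'
      % (np.mean(overlaps), demo_window_len))
With a random split, almost every test window shares nearly all of its days with some training window, which is exactly the leakage described above.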
# Training data start and end dates
start_date='2005-01-01'
end_date='2016-02-01'
#inst = D.instruments(start_date, end_date, market='CN_STOCK_A')
#print(inst)
instruments = ['000300.SHA']
features = ['close', 'open', 'high', 'low', 'volume']
hist = D.history_data(instruments, start_date, end_date, fields=features)
print(hist.head())
plt.plot(hist['date'], hist['close'])
plt.show()
# Convert to time-series samples
closep = hist['close'].tolist()
'''
data = []
for feature in features:
    data.append(hist[feature].tolist())
'''
WINDOW = 60
EMB_SIZE = len(features)
STEP = 1
FORECAST = 1
# Straightforward way for creating time windows
X, Y = [], []
for i in range(0, len(hist), STEP):
    try:
        o = hist['open'].tolist()[i:i+WINDOW]
        h = hist['high'].tolist()[i:i+WINDOW]
        l = hist['low'].tolist()[i:i+WINDOW]
        c = hist['close'].tolist()[i:i+WINDOW]
        v = hist['volume'].tolist()[i:i+WINDOW]
        # z-score normalization within each time window
        o = (np.array(o) - np.mean(o)) / np.std(o)
        h = (np.array(h) - np.mean(h)) / np.std(h)
        l = (np.array(l) - np.mean(l)) / np.std(l)
        c = (np.array(c) - np.mean(c)) / np.std(c)
        v = (np.array(v) - np.mean(v)) / np.std(v)
        '''
        x_i = []
        # normalization for one time window
        for arr in data:
            o = arr[i:i+WINDOW]
            o = (np.array(o) - np.mean(o)) / np.std(o)
            x_i.append(o)
            print(x_i)
        x_i = np.array(x_i)
        print(x_i.shape)
        temp_i = closep[i:i+WINDOW]
        y_i = closep[i+WINDOW+FORECAST]
        last_close = temp_i[-1]
        next_close = y_i
        '''
        x_i = closep[i:i+WINDOW]
        y_i = closep[i+WINDOW+FORECAST]    # future close used as the label target
        last_close = x_i[-1]
        next_close = y_i
        # one-hot label: [1, 0] = up, [0, 1] = down
        if last_close * (1 + 0.00) < next_close:
            y_i = [1, 0]
        elif last_close * (1 - 0.00) > next_close:
            y_i = [0, 1]
        else:
            y_i = [0, 0]
        x_i = np.column_stack((o, h, l, c, v))
    except Exception as e:
        # indexing past the end of the series raises here; stop building windows
        print(e)
        break
    X.append(x_i)
    Y.append(y_i)
X, Y = np.array(X), np.array(Y)
print(X.shape)
X_train, X_test, Y_train, Y_test = create_Xt_Yt(X, Y)  # split train/test by time
#X_train, X_test, Y_train, Y_test = train_test_split(X, Y)  # random split (leaks information, see above)
#print(X_test)
#X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[2], EMB_SIZE))
#X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[2], EMB_SIZE))
X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], EMB_SIZE))
X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], EMB_SIZE))
print(X_train.shape)
# set up model
model = Sequential()
model.add(Convolution1D(input_shape=(WINDOW, EMB_SIZE),
                        nb_filter=16,
                        filter_length=4,
                        border_mode='same'))
model.add(BatchNormalization())
model.add(LeakyReLU())
model.add(Dropout(0.5))
model.add(Convolution1D(nb_filter=8,
                        filter_length=4,
                        border_mode='same'))
model.add(BatchNormalization())
model.add(LeakyReLU())
model.add(Dropout(0.5))
model.add(Flatten())
model.add(Dense(64))
model.add(BatchNormalization())
model.add(LeakyReLU())
model.add(Dense(2))
model.add(Activation('softmax'))
# prepare and train
opt = Nadam(lr=0.002)
reduce_lr = ReduceLROnPlateau(monitor='val_acc', factor=0.9, patience=30, min_lr=0.000001, verbose=1)
checkpointer = ModelCheckpoint(filepath="lolkek.hdf5", verbose=1, save_best_only=True)
model.compile(optimizer=opt,
              loss='categorical_crossentropy',
              metrics=['accuracy'])
history = model.fit(X_train, Y_train,
                    nb_epoch=100,
                    batch_size=128,
                    verbose=1,
                    validation_data=(X_test, Y_test),
                    callbacks=[reduce_lr, checkpointer],
                    shuffle=True)
model.load_weights("lolkek.hdf5")
pred = model.predict(np.array(X_test))
#for prediction in pred:
# print(np.argmax(prediction))
# loss plot
plt.figure()
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='best')
plt.show()
If training-set accuracy keeps rising while test-set accuracy moves the other way, the model is probably overfitting and its ability to generalize is likely poor; see the early-stopping sketch after the accuracy plot below.
# accuracy plot
plt.figure()
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='best')
plt.show()
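If the accuracy curves above do show that divergence, one standard mitigation is to stop training once validation loss stops improving. The original training call does not do this; the snippet below is only a hedged sketch using Keras's EarlyStopping callback, and the patience value is an arbitrary choice.
# Optional variation (not part of the original run): stop when val_loss has not improved
# for `patience` epochs. To use it, re-run the fit above with this callback added, e.g.
# callbacks=[reduce_lr, checkpointer, early_stopping]
from keras.callbacks import EarlyStopping
early_stopping = EarlyStopping(monitor='val_loss', patience=20, verbose=1)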
# 1. Basic strategy parameters
# backtest start date
start_date_2 = '2016-02-06'
# backtest end date
end_date_2 = '2017-06-20'
# benchmark to compare the strategy against, here the CSI 300 index
benchmark = '000300.INDX'
# instrument pool, e.g. Kweichow Moutai
# instruments = ['000030.SZA']
# initial capital
capital_base = 100000
# fetch extra history before start_date_2 so the first backtest day already has a full WINDOW of data
hist = D.history_data(instruments, '2015-11-4', end_date_2, fields=features)
#print(hist)
# Straightforward way for creating time windows
prediction = {}
for i in range(0, len(hist) - WINDOW, STEP):
    try:
        o = hist['open'].tolist()[i:i+WINDOW]
        h = hist['high'].tolist()[i:i+WINDOW]
        l = hist['low'].tolist()[i:i+WINDOW]
        c = hist['close'].tolist()[i:i+WINDOW]
        v = hist['volume'].tolist()[i:i+WINDOW]
        date = hist['date'][i+WINDOW].date()
        o = (np.array(o) - np.mean(o)) / np.std(o)
        h = (np.array(h) - np.mean(h)) / np.std(h)
        l = (np.array(l) - np.mean(l)) / np.std(l)
        c = (np.array(c) - np.mean(c)) / np.std(c)
        v = (np.array(v) - np.mean(v)) / np.std(v)
        x_i = np.column_stack((o, h, l, c, v))
    except Exception as e:
        print(e)
        break
    X = np.array([x_i])
    pred = model.predict(X)
    prediction[date] = pred
#print(prediction)
# 2. Strategy main functions
# Initialize the virtual account; runs only on the first trading day
def initialize(context):
    # set the commission scheme
    context.set_commission(PerOrder(buy_cost=0.000, sell_cost=0.000, min_cost=5))

# Trading logic; runs once on every trading day
def handle_data(context, data):
    global prediction
    date = data.current_dt.date()
    # strategy code goes here
    for instrument in instruments:
        # convert the ticker string into the instrument object used by the BigQuant backtest engine
        instrument = context.symbol(instrument)
        if not data.can_trade(instrument):
            break
        try:
            pred = prediction[date]
            print(pred)
        except Exception as e:
            # no prediction stored for this date; skip it
            continue
        threshold = 0
        # class 0 = predicted up: go fully long; class 1 = predicted down: move to cash
        if np.argmax(pred[0]) == 0:
            order_target_percent(instrument, 1)
        elif np.argmax(pred[0]) == 1:
            order_target_percent(instrument, 0)
# 3. Run the backtest
# Backtest API: https://bigquant.com/docs/module_trade.html
m = M.trade.v1(
    instruments=instruments,
    start_date=start_date_2,
    end_date=end_date_2,
    initialize=initialize,
    handle_data=handle_data,
    # buy orders are filled at the open price
    order_price_field_buy='open',
    # sell orders are filled at the open price
    order_price_field_sell='open',
    capital_base=capital_base,
    benchmark=benchmark,
)
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
model.load_weights("model.hdf5")
pred = model.predict(np.array(X_test))
C = confusion_matrix([np.argmax(y) for y in Y_test], [np.argmax(y) for y in pred])
print(C / C.astype(np.float).sum(axis=1, keepdims=True))  # row-normalized confusion matrix
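classification_report is imported above but never called; a minimal usage on the same predictions, assuming class 0 corresponds to "up" and class 1 to "down" as in the labeling loop, would be:
# per-class precision/recall/F1; class names follow the [1, 0] = up / [0, 1] = down encoding above
y_true = [np.argmax(y) for y in Y_test]
y_pred = [np.argmax(y) for y in pred]
print(classification_report(y_true, y_pred, target_names=['up', 'down']))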