从LSTM示例策略上直接克隆不做任何修改,运行报错

问答交流
标签: #<Tag:0x00007fcf6d459860>

(qci133) #1
克隆策略
In [1]:
class conf:
    instrument = '000300.SHA'  #股票代码
    #设置用于训练和回测的开始/结束日期
    start_date = '2005-01-01'  
    end_date='2017-07-19'
    field='close'
    seq_len=100 #每个input的长度
    prediction_len=20 #预测数据长度
    train_proportion=0.8 #训练数据占总数据量的比值,其余为测试数据
    normalise=True #数据标准化
    epochs  = 1 #LSTM神经网络迭代次数
    batch=100 #整数,指定进行梯度下降时每个batch包含的样本数,训练时一个batch的样本会被计算一次梯度下降,使目标函数优化一步
    validation_split=0.1 # 0~1之间的浮点数,用来指定训练集的一定比例数据作为验证集。
    lr=0.001 #学习效率
    

# 2. LSTM策略主体
import time
import numpy as np
import matplotlib.pyplot as plt
from numpy import newaxis
from keras.layers.core import Dense, Activation, Dropout
from keras.layers.recurrent import LSTM
from keras.models import Sequential
from keras import optimizers


def load_data(instrument,start_date,end_date,field,seq_len,prediction_len,train_proportion,normalise=True):
    # 加载数据,数据变化,提取数据模块
    fields=[field,'amount']
    data=D.history_data(instrument,start_date,end_date,fields)
    data=data[data.amount>0]
    datetime=list(data['date'])
    data=list(data[field])
    seq_len=seq_len+1  
    result=[]
    for index in range(len(data)-seq_len):
        result.append(data[index:index+seq_len])
        
    if normalise:
        norm_result=normalise_windows(result)
    else:
        norm_result=result
        
    result=np.array(result)
    norm_result=np.array(norm_result)
    
    row=round(train_proportion*norm_result.shape[0])
    
    data_test=result[int(row):,:]
    datetime=datetime[int(row):]

    test_datetime=[]
    for index in range(len(datetime)):
        if index % prediction_len==0 and index+seq_len<len(datetime)-prediction_len:
            test_datetime.append(datetime[index+seq_len])
    
    train=norm_result[:int(row),:]
    np.random.shuffle(train)   #随机打乱训练样本
    x_train = train[:, :-1]
    y_train = train[:, -1]
    x_test = norm_result[int(row):, :-1]
    y_test = norm_result[int(row):, -1]
    
    x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))
    x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], 1))  

    return [x_train, y_train, x_test, y_test, data_test, test_datetime]
    
def normalise_windows(window_data):
    #数据规范化
    normalised_data = []
    for window in window_data:
        normalised_window = [((float(p) / float(window[0])) - 1) for p in window]
        normalised_data.append(normalised_window)
    return normalised_data

def denormalise_windows(normdata,data,seq_len):
    #数据反规范化
    denormalised_data = []
    wholelen=0
    for i, rowdata in enumerate(normdata):
        denormalise=list()
        if isinstance(rowdata,float)|isinstance(rowdata,np.float32):
            denormalise = [(rowdata+1)*float(data[wholelen][0])]
            denormalised_data.append(denormalise)
            wholelen=wholelen+1
        else:       
            for j in range(len(rowdata)):
                denormalise.append((float(rowdata[j])+1)*float(data[wholelen][0]))
                wholelen=wholelen+1
            denormalised_data.append(denormalise)
    return denormalised_data

def build_model(layers):
    # LSTM神经网络层
    # 详细介绍请参考http://keras-cn.readthedocs.io/en/latest/
    model = Sequential()  

    model.add(LSTM(input_dim=layers[0],output_dim=layers[1],return_sequences=True))
    model.add(Dropout(0.2))
    
    model.add(LSTM(
        layers[1],
        return_sequences=False))
    model.add(Dropout(0.2))
    
    model.add(Dense(
        input_dim=layers[1],
        output_dim=layers[2]))
    model.add(Activation("linear"))

    rms=optimizers.RMSprop(lr=conf.lr, rho=0.9, epsilon=1e-06)
    model.compile(loss="mse", optimizer=rms)
    start = time.time()
    print("> Compilation Time : ", time.time() - start)
    return model

def predict_point_by_point(model, data):
    #每次只预测1步长
    predicted = model.predict(data)
    predicted = np.reshape(predicted, (predicted.size,))
    return predicted

def predict_sequence_full(model, data, seq_len):
    #根据训练模型和第一段用来预测的时间序列长度逐步预测整个时间序列
    curr_frame = data[0]
    predicted = []
    for i in range(len(data)):
        predicted.append(model.predict(curr_frame[newaxis,:,:])[0,0])
        curr_frame = curr_frame[1:]
        curr_frame = np.insert(curr_frame, [seq_len-1], predicted[-1], axis=0)
    return predicted

def predict_sequences_multiple(model, data, seq_len, prediction_len):
    #根据训练模型和每段用来预测的时间序列长度逐步预测prediction_len长度的序列
    prediction_seqs = []
    for i in range(int(len(data)/prediction_len)):
        curr_frame = data[i*prediction_len]
        predicted = []
        for j in range(prediction_len):
            predicted.append(model.predict(curr_frame[newaxis,:,:])[0,0])
            curr_frame = curr_frame[1:]
            curr_frame = np.insert(curr_frame, [seq_len-1], predicted[-1], axis=0)
        prediction_seqs.append(predicted)
    return prediction_seqs

def plot_results(predicted_data, true_data):
    #做图函数,用于predict_point_by_point和predict_sequence_full
    fig = plt.figure(facecolor='white')
    ax = fig.add_subplot(111)
    ax.plot(true_data, label='True Data')
    plt.plot(predicted_data)
    plt.legend()
    figure=plt.gcf()
    figure.set_size_inches(20,10)
    plt.show()

def plot_results_multiple(predicted_data, true_data, prediction_len):
    #做图函数,用于predict_sequences_multiple
    fig = plt.figure(facecolor='white')
    ax = fig.add_subplot(111)
    ax.plot(true_data, label='True Data')
    for i, data in enumerate(predicted_data):
        padding = [None for p in range(i * prediction_len)]
        plt.plot(padding + data)    
    plt.legend()
    figure=plt.gcf()
    figure.set_size_inches(20,10)
    plt.show()

#主程序
global_start_time = time.time() 

print('> Loading data... ')

X_train,y_train,X_test,y_test,data_test,test_datetime=load_data(conf.instrument,conf.start_date,conf.end_date,conf.field,conf.seq_len,conf.prediction_len,conf.train_proportion,normalise=True)

print('> Data Loaded. Compiling...')

model = build_model([1, conf.seq_len, 1])

model.fit(
    X_train,
    y_train,
    batch_size=conf.batch,
    nb_epoch=conf.epochs,
    validation_split=conf.validation_split)


predictions = predict_sequences_multiple(model, X_test, conf.seq_len, conf.prediction_len)
# predictions = predict_sequence_full(model, X_test, conf.seq_len)
# predictions = predict_point_by_point(model, X_test)  
   
if conf.normalise==True:   
    predictions=denormalise_windows(predictions,data_test,conf.seq_len)
    y_test=denormalise_windows(y_test,data_test,conf.seq_len)

print('Training duration (s) : ', time.time() - global_start_time)
plot_results_multiple(predictions, y_test, conf.prediction_len)
# plot_results(predictions, y_test)
Using TensorFlow backend.
> Loading data... 
> Data Loaded. Compiling...
> Compilation Time :  9.5367431640625e-07
Train on 2121 samples, validate on 236 samples
Epoch 1/1
2121/2121 [==============================] - 18s - loss: 0.0153 - val_loss: 0.0030
Training duration (s) :  40.46105647087097
In [2]:
len(predictions)
Out[2]:
29
In [3]:
# 1. 策略基本参数
# 3.回测
# 目前回测结构主要针对predict_sequences_multiple的预测结果,并且股票最好没有停牌
# 回测其他结果可自行修改handle_data
def initialize(context):
    # 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
    context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
    # 传入预测数据和真实数据
    context.predictions=predictions
    context.true_data=y_test
    context.date_time=test_datetime
    # 设置持仓比
    context.percent = 0.7
    # 设置持仓天数
    context.hold_days=conf.prediction_len
    # 传入起止时间
    context.start_date=context.date_time[0].strftime('%Y-%m-%d')
    context.end_date=context.date_time[-1].strftime('%Y-%m-%d')
    # 结束时间预计至少比开始时间多gap点才进场
    context.gap=0
    context.dt = 0

# 回测引擎:每日数据处理函数,每天执行一次
def handle_data(context, data):
    current_dt = data.current_dt.strftime('%Y-%m-%d') 
    
    can_do=pd.Timestamp(current_dt) in context.date_time
    if can_do:
        context.dt = current_dt
        
    sid = context.symbol(conf.instrument)
    cur_position = context.portfolio.positions[sid].amount    # 持仓

    row=context.date_time.index(pd.Timestamp(context.dt))
  
    prediction=context.predictions[row]
    # 满足开仓条件
    if prediction[-1]-prediction[0]>=context.gap and cur_position == 0 and data.can_trade(sid):
        context.order_target_percent(sid, 1)
    elif prediction[-1]-prediction[0]<context.gap and cur_position > 0 and data.can_trade(sid):
            context.order_target(sid, 0)

# 调用回测引擎
m8 = M.trade.v2(
    instruments=conf.instrument,
    start_date=test_datetime[0].strftime('%Y-%m-%d'),
    end_date=test_datetime[len(test_datetime)-1].strftime('%Y-%m-%d'),
    initialize=initialize,
    handle_data=handle_data,
    order_price_field_buy='open',       # 表示 开盘 时买入
    order_price_field_sell='close',     # 表示 收盘 前卖出
    capital_base=10000, 
    benchmark='000300.SHA', 
    # 通过 options 参数传递预测数据和参数给回测引擎
    options={'predictions': predictions}
)    
[2017-10-10 23:24:23.641440] INFO: bigquant: backtest.v7 开始运行..
[2017-10-10 23:24:23.646339] ERROR: bigquant: 输入股票列表有误,请检查确认!
!!WARNING: 回测结果返回为空
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-3-afd6007f62f4> in <module>()
     53     benchmark='000300.SHA',
     54     # 通过 options 参数传递预测数据和参数给回测引擎
---> 55     options={'predictions': predictions}
     56 )    

TypeError: cache_set, not supported type: <class 'NoneType'>

(iQuant) #2

你好,这是版本太旧的原因。
你在回测中,将M.trade.v2修改为M.trade.v3就没有问题了。
欢迎反馈,如果还有问题,及时交流。也欢迎你加入BigQuant官方交流群:537280168