利用CNN对股票“图片”进行涨跌分类——一次尝试

深度学习
cnn
卷积神经网络
标签: #<Tag:0x00007fb00e55c290> #<Tag:0x00007fb00e55c0d8> #<Tag:0x00007fb006bb3ec0>

(Arthas) #1

首先解释一下标题:
CNN:卷积神经网络(Convolutional Neural Network), 在图像处理方面有出色表现,不是被川普怒怼的那个新闻网站;
股票涨跌:大家都懂的,呵呵;
股票图片:既然使用CNN,那么如果输入数据是股票某个周期的K线图片就太好了。当然,本文中使用的图片并不是在看盘软件上一张一张截下来的,而是利用OHLC数据“画”出来的;
尝试:这个词委婉一点说就是“一个很好的想法:slight_smile:",比较直白的说法是“没啥效果T_T”。


进入正题:
首先是画出图片。本文目前是仿照柱线图画的。

大致的想法是:
1. 对每个样本,将32time_steps×4features(OHLC)数据归一化处理,即所有取值均在[0,1]之间;
2. 构建一个128×128像素的全0数组,将[0,1]区间等分为128份,分到每列的128个像素点上;
3. 然后使用每四列构建一根K线(前三列画柱状线,第四列作为间隔行):第一列描绘开盘价,开盘价与该列的哪个像素点最近,那么这个像素点取值就由0变为1;第二列描绘高低价区间,将 最高价至 最低价 范围内的像素点取值由0变为1;第三列描绘收盘价,收盘价与该列的哪个像素点最近,那么这个像素点取值就由0变为1。

这样每个样本就构建了一张由32根K线组成,类似柱线图的“图片”,下面是一个样本画的一张图(为了便于观看,将0替换成空格,将1替换成圆点):


其实还蛮像柱线图的。

测试阶段:
原始数据:最终选择的数据是100只2005年以前上市的股票。1只股票数据太少,全部股票数据又太多,所以 股票三千,我只取一百;“上市时间前于2005年”这个条件 主要考虑在每只股票上取样数量不会太少。

生成样本:每只股票每32根K线生成一个样本,每隔8根K线取一次样。然后按照上述作图方法将其变成图片。标签:若未来五日收益为正,标签为[1,0],否则,标签为[0,1]。15年1月1日之前数据用作train和evaluate,之后数据用作test。

构建模型:本文所用模型共5层,先后顺序为 卷积层-池化层-卷积层-池化层-全连接层,中间还夹杂了两个Dropout和一个Flatten,用来防止过拟合和一维化数据,不过由于他们是无权重的,所以没将他们算作一层。

预测效果
在train和evaluate阶段,看起来还是不错的:

但是在test阶段:

第一个值是loss,第二个值是准确率,不要看反。。。呵呵

听说有一种很厉害的操作——去除label不明显的样本——可能会提高模型效果。所以本文又对训练样本进行了一次筛选,只保留了未来五日收益在最前30%和最后30%的样本。然后input到模型做训练。最终test集上效果:

有(mei)所(sha)改(xiao)善(guo)!

问题分析
目前发现的一个问题:一幅图中被标记的像素点太少了。下面两张图为train-evaluate样本和test样本中值为1的像素点占总像素点(128×128=16384)比重的分布。

值为1的像素点占总像素点比重平均不到5%,最大的比重也未超过10%。这说明在模型训练阶段有些像素点对应的weights仅仅被训练了很少的次数甚至未经训练,这影响了模型在test数据上的表现。之后可能会针对这一问题做一些改进,以增加每张图值为1的像素点占总像素点比例

代码部分
欢迎讨论!

克隆策略
In [1]:
import matplotlib.pyplot as plt
import keras
from keras.models import Sequential
from keras.layers.convolutional import Conv2D
from keras.layers.pooling import MaxPooling2D
from keras.layers.core import Dense, Dropout, Flatten
Using TensorFlow backend.
In [2]:
def df_to_picture(data, picture_len=128, gap_len=8, future_len=5):
    ### 参数说明 ###
    # picture_len为图像多少行/列像素点,picture_len=128即为共128×128=16384个像素点
    # gap_len为每隔几根K线取一次样本
    # future_len为计算未来几天收益率做为label
    
    # 每根K线要占4列,故picture_len必须为4的倍数
    if picture_len%4 != 0:
        raise ValueError('picture_len must be 4*n where n is positive integer')
    n = int(picture_len/4)
    # 计算label
    data = data[data.amount>0]
    data['label'] = (data['close'].shift(-1*future_len)/data['close']).apply(lambda x: math.log(x))
    data.dropna(inplace=True)
    data.reset_index(drop=True, inplace=True)
    # 处理数据,用每个样本的['open', 'low', 'high', 'close']的data生成一幅0,1图
    data_to_pic = []
    output_x = []
    output_y = []
    output_date = []
    unit = picture_len-1
    features = ['open', 'low', 'high', 'close']
    for i in range(n-1, len(data)):
        if (i+1) % gap_len == 0:
            # 每个样本数据归一化
            HH = max(data['high'][i+1-n:i+1])
            LL = min(data['low'][i+1-n:i+1])
            data_to_pic = (data[features][i+1-n:i+1].values-LL)/(HH-LL)
            picture = np.zeros((picture_len, picture_len))
            for j in range(n):
                picture[4*j][int(np.round(data_to_pic[j][0]*unit))] = 1
                picture[4*j+1][int(np.round(data_to_pic[j][1]*unit)):int(np.round(data_to_pic[j][2]*unit))+1] = 1
                picture[4*j+2][int(np.round(data_to_pic[j][3]*unit))] = 1
            output_x.append(picture)
            output_y.append(data['label'][i])
            output_date.append(data['date'][i])
    return np.transpose(np.array(output_x), (0,2,1)), np.array(output_y), np.array(output_date)
In [3]:
# 先选100只上市时间前于2005年的‘老股票’(主要是因为这样可以保障每只股票的数据量不会太少)做尝试。
wholeinstruments = D.history_data(D.instruments(start_date='2017-07-03', end_date='2017-07-03') , start_date='2017-07-03', end_date='2017-07-03', fields=['list_date'])
oldinstruments = wholeinstruments[wholeinstruments.list_date < '2005-01-01']
numpy.random.seed(111)
instruments = np.random.choice(oldinstruments.instrument, size=100, replace=False)
In [4]:
# 处理数据,并划分训练预测集
split_date = datetime.datetime.strptime('2015-01-01', '%Y-%m-%d')
x_train = []
train_y = []
x_test = []
test_y = []
data_ = D.history_data(instruments, '2005-01-01', '2017-07-01', ['open', 'low', 'high', 'close', 'amount'])
for i in range(len(instruments)):
    instrument = instruments[i]
    data0 = data_[data_.instrument==instrument]
    x, y, z = df_to_picture(data0, picture_len=128, gap_len=8, future_len=5)
    x_train = x_train + list(x[z<split_date])
    train_y = train_y + list(y[z<split_date])
    x_test = x_test + list(x[z>=split_date])
    test_y = test_y + list(y[z>=split_date])
    if (i+1) % 20 == 0:
        print(i)
19
39
59
79
99
In [5]:
# 没有设置阈值
y_train = []
y_test = []
for i in range(len(train_y)):
    if train_y[i]<=0:
        y_train.append(0)
    else:
        y_train.append(1)
        
for i in range(len(test_y)):
    if test_y[i]<=0:
        y_test.append(0)
    else:
        y_test.append(1)
        
y_train = np.array(y_train)
y_test = np.array(y_test)
In [6]:
# 设置阈值,取涨跌较明显的样本,并保证正负样本数据量接近(threshold1,threshold2取值使所取样本为未来收益率在最前最后30%)
threshold1 = 0.03
threshold2 = -0.025
x_train_with_threshold = []
y_train_with_threshold = []
for i in range(len(train_y)):
    if train_y[i]<=threshold2:
        x_train_with_threshold.append(x_train[i])
        y_train_with_threshold.append(0)
    if train_y[i]>=threshold1:
        x_train_with_threshold.append(x_train[i])
        y_train_with_threshold.append(1)

x_train_with_threshold = np.array(x_train_with_threshold)
y_train_with_threshold = np.array(y_train_with_threshold)
In [7]:
# 利用keras中的to_categorical函数将label转换为映射为二值类别矩阵
y_train = keras.utils.to_categorical(y_train, num_classes=2)
y_test = keras.utils.to_categorical(y_test, num_classes=2)
y_train_with_threshold = keras.utils.to_categorical(y_train_with_threshold, num_classes=2)
In [8]:
# 模型接受数组输入
x_train = np.array(x_train)
x_test = np.array(x_test)
In [9]:
# shuffle训练数据
def shuffle_data(data_x, data_y):
    rand = np.random.choice(len(data_x), size=len(data_x), replace=False)
    x = data_x[rand]
    y = data_y[rand]
    return x, y

x_train, y_train = shuffle_data(x_train, y_train)
x_train_with_threshold, y_train_with_threshold = shuffle_data(x_train_with_threshold, y_train_with_threshold)
In [10]:
x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], x_train.shape[2], 1))
x_train_with_threshold = np.reshape(x_train_with_threshold, (x_train_with_threshold.shape[0], x_train_with_threshold.shape[1], x_train_with_threshold.shape[2], 1))
x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], x_test.shape[2], 1))
In [11]:
model = Sequential()
# 卷积层1
model.add(Conv2D(input_shape=(x_train.shape[1],x_train.shape[2],x_train.shape[3]), filters=32, kernel_size=5, name='con1', activation='relu'))
# 池化层1
model.add(MaxPooling2D(pool_size=4))
# dropout层1(减轻训练阶段过拟合)
model.add(Dropout(0.3))
# 卷积层2
model.add(Conv2D(filters=64, kernel_size=4, name='con2', activation='relu'))
# 池化层2
model.add(MaxPooling2D(pool_size=4))
# Flatten用于将多维输入一维化,常用在从卷积层到全连接层的过渡
model.add(Flatten())
# dropout层2(减轻训练阶段过拟合)
model.add(Dropout(0.3))
# 全连接层
model.add(Dense(units=2, activation='sigmoid'))
# 设置优化器,损失函数,性能评估
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['binary_accuracy'])
In [74]:
# 画模型
# from IPython.display import SVG
# from keras.utils.vis_utils import model_to_dot

# SVG(model_to_dot(model, show_shapes=True, show_layer_names=False).create(prog='dot', format='svg'))
In [ ]:
# 训练(未设置阈值)
model.fit(x_train, y_train, batch_size=128, epochs=10, verbose=2, validation_split=0.1, shuffle=True)
Train on 25284 samples, validate on 2810 samples
Epoch 1/10
252s - loss: 0.6916 - binary_accuracy: 0.5269 - val_loss: 0.6916 - val_binary_accuracy: 0.5270
Epoch 2/10
252s - loss: 0.6897 - binary_accuracy: 0.5345 - val_loss: 0.6928 - val_binary_accuracy: 0.5265
Epoch 3/10
257s - loss: 0.6887 - binary_accuracy: 0.5379 - val_loss: 0.6903 - val_binary_accuracy: 0.5297
Epoch 4/10
260s - loss: 0.6872 - binary_accuracy: 0.5452 - val_loss: 0.6912 - val_binary_accuracy: 0.5233
Epoch 5/10
262s - loss: 0.6850 - binary_accuracy: 0.5518 - val_loss: 0.6910 - val_binary_accuracy: 0.5285
Epoch 6/10
275s - loss: 0.6836 - binary_accuracy: 0.5557 - val_loss: 0.6901 - val_binary_accuracy: 0.5294
Epoch 7/10
296s - loss: 0.6815 - binary_accuracy: 0.5628 - val_loss: 0.6888 - val_binary_accuracy: 0.5420
Epoch 8/10
300s - loss: 0.6804 - binary_accuracy: 0.5636 - val_loss: 0.6908 - val_binary_accuracy: 0.5388
Epoch 9/10
316s - loss: 0.6779 - binary_accuracy: 0.5707 - val_loss: 0.6900 - val_binary_accuracy: 0.5420
Epoch 10/10
314s - loss: 0.6753 - binary_accuracy: 0.5747 - val_loss: 0.6892 - val_binary_accuracy: 0.5441
Out[ ]:
<keras.callbacks.History at 0x7f1e07ed3160>
In [ ]:
# 预测
model.evaluate(x_test, y_test, batch_size=128, verbose=0)
Out[ ]:
[0.69782731860288261, 0.51609076086410199]
In [ ]:
# 训练
model.fit(x_train_with_threshold, y_train_with_threshold, batch_size=128, epochs=10, verbose=2, validation_split=0.1, shuffle=True)
Train on 16084 samples, validate on 1788 samples
Epoch 1/10
202s - loss: 0.6701 - binary_accuracy: 0.5863 - val_loss: 0.6635 - val_binary_accuracy: 0.5965
Epoch 2/10
178s - loss: 0.6657 - binary_accuracy: 0.5933 - val_loss: 0.6656 - val_binary_accuracy: 0.5833
Epoch 3/10
177s - loss: 0.6630 - binary_accuracy: 0.5998 - val_loss: 0.6663 - val_binary_accuracy: 0.5870
Epoch 4/10
197s - loss: 0.6579 - binary_accuracy: 0.6056 - val_loss: 0.6691 - val_binary_accuracy: 0.5819
Epoch 5/10
202s - loss: 0.6572 - binary_accuracy: 0.6047 - val_loss: 0.6681 - val_binary_accuracy: 0.5777
Epoch 6/10
197s - loss: 0.6512 - binary_accuracy: 0.6175 - val_loss: 0.6713 - val_binary_accuracy: 0.5766
Epoch 7/10
199s - loss: 0.6499 - binary_accuracy: 0.6191 - val_loss: 0.6716 - val_binary_accuracy: 0.5677
Epoch 8/10
216s - loss: 0.6423 - binary_accuracy: 0.6271 - val_loss: 0.6729 - val_binary_accuracy: 0.5727
Epoch 9/10
211s - loss: 0.6404 - binary_accuracy: 0.6295 - val_loss: 0.6727 - val_binary_accuracy: 0.5867
Epoch 10/10
199s - loss: 0.6375 - binary_accuracy: 0.6320 - val_loss: 0.6734 - val_binary_accuracy: 0.5685
Out[ ]:
<keras.callbacks.History at 0x7f1ea79c66d8>
In [ ]:
# 预测
model.evaluate(x_test, y_test, batch_size=128, verbose=0)
Out[ ]:
[0.71082615618109612, 0.52142963077677973]
In [58]:
p = []
for i in range(len(x_train)):
    p.append(np.sum(x_train[i])/16384)
In [59]:
plt.hist(p, 100)
plt.show()
In [60]:
q = []
for i in range(len(x_test)):
    q.append(np.sum(x_test[i])/16384)
In [61]:
plt.hist(q, 100)
plt.show()
In [80]:
# # 画出 “柱状线”
# for i in range(len(x_train[0])):
#     for j in range(len(x_train[0][0])):
#         if x_train[4][i][j][0] == 1:
#             print('* ', end='')
#         else:
#             print('  ', end='')
#     print(' ')

(stalkerggyy) #2

想法很好,但是切入的点不太合适。我觉得其实目前的各种技术指标,就是从图形中提取更有价值的特征,直接从图形学习来预测,可能还是有些太模糊了。


(shinefuture) #3

楼主可以尝试下 LSTM 模型,这个更适合股票


(Arthas) #4

嗯,目前来看处理时间序列数据LSTM确实比CNN好


(qqqxxx99) #5

的确是好想法! 赞


(xuqiang) #6

1.感谢楼主分享

2.个人想法(我知之甚少,权且供大家探讨)
如果把k线图,换为均线的频谱图(建议用小波包),做带通滤波的预处理,效果可能好一些


(Arthas) #7

嗯嗯,有很多可以修改的地方。
代码已分享,你可以克隆后进行多角度的修改和尝试,脑洞大开的时候到了~~~


(Miles) #8

赞,谢谢楼主!楼主,二次尝试是不是做个CNN + LSTM啊? 期待!


(xfcxfc0312) #9

楼主用CNN来预测的什么呀??输入输出是什么,然后用哪些原始数据经过什么计算得到的


(PAYNE) #10

简单粗暴…… 直接预测图形……

这个想法可以和分形理论结合还有hurst 函数加在一起做试试吧


(lu0817) #11

大神,很想请教下.怎么把走势数据做成图片数据


(mhzdlm) #12

我做过类似的思路。比你效果稍微好点。你这个实验的主要问题在,1、不应该用这么小的图片,基本数据信息都被压缩没了。2、不该用这么浅的cnn,基本提不出什么有效信息。当然还有很多可以改进的地方,但是最明显的问题显然是这两点。


(HYL25537) #13

可以请教一下,代码中
wholeinstruments = D.history_data(D.instruments(start_date=‘2017-07-03’, end_date=‘2017-07-03’) , start_date=‘2017-07-03’, end_date=‘2017-07-03’, fields=[‘list_date’])
这个D是引用的哪个包嘛,初学python,想先跑一下这套demo在慢慢吃透


(iQuant) #14

您好,可以参考文档板块:https://bigquant.com/docs/develop/datasource/deprecated/history_data.html