精华帖子

期货现货价差实时因子加工

由bq7zuymm创建,最终由bq7zuymm 被浏览 23 用户

本文以中证1000的股指期货(IM2503.CFE)与指数(000852.SH)价差为例, 我们来加工股指与期货的价差因子并进行实时可视化操作。以下涉及到的流数据暂未开放,后期我们会为大家提供流数据获取服务。

数据定义

因子构造思路较为简单,我们需要用到期货l1快照数据以及指数快照数据,首先计算快照上的价差,最后将价差用last函数聚合成分钟频的数据。

因子加工代码

首先导入第三方库,并将数据推送至中间表:

import dai
import time
import plotly.graph_objects as go
from IPython.display import display
from plotly.subplots import make_subplots
 
dai.pull_data_to_table(datasource='cn_future_level1_snapshot', table_name='future_table', overwrite=True, lookback_time=8*24*60*60)
dai.pull_data_to_table(datasource='cn_stock_index_snapshot', table_name='index_table', overwrite=True, lookback_time=8*24*60*60)

我们需要期货和股指数据, 所以我们需要订阅两中数据: cn_future_level1_snapshot和cn_stock_index_snapshot.

进一步我们需要分别取出中证1000期货(IM2503.CFE)以及股指(000852.SH)的价格数据:

future_instrument = 'IM2503.CFE'
index_instrument = '000852.SH'
 
sql = f"""
SELECT date_trunc('minute', to_timestamp(datetime * 1.0 / 1000 + 8 * 60 * 60)) AS date, 
instrument, price AS future_price
FROM future_table
WHERE instrument = '{future_instrument}'
"""
future_df = dai.stream_factor(sql, 'future', overwrite=True)
 
sql = f"""
SELECT date_trunc('minute', to_timestamp(datetime * 1.0 / 1000 + 8 * 60 * 60)) AS date,  
instrument, price AS index_price
FROM index_table
WHERE instrument = '{index_instrument}'
"""
 
index_df = dai.stream_factor(sql, 'index', overwrite=True)

引擎冷启动,防止取出来的实时数据为空:

# 引擎冷启动
for _ in range(1000):
    future_data = future_df.df()
    index_data = index_df.df()
    if len(future_data) != 0 and len(index_data) != 0:
        break

需要注意的是, 我们取出来的数据依旧是快照数据, 我们在sql的时间上做了一些运算, 我们将毫秒时间转换成精确到秒的日期, 进一步将秒数全部变为0(我们在加工分钟因子时一直都是这样处理的, 我们可以通过抹去秒钟来获取分钟日期),这样我们能够方便使用分钟聚合。

我们可以使用pandas聚合得到分钟的收盘价:

future_df.df().groupby(['date', 'instrument']).agg({'future_price': 'last'}).reset_index()

明白了这一点后, 我们接下来尝试将期货价格,指数价格,价差给画出来:

# 往画布中添加数据
def fig_append(f, data1, data2, window_size:int=20):
    import pandas as pd
    data = pd.merge(data1, data2, on=['date'], how='inner')
    # 股指价格
    current_new_x = tuple(data['date'].to_list())
    current_new_y = tuple(data['index_price'].to_list())
 
    # 期货價格
    future_new_x = tuple(data['date'].to_list())
    future_new_y = tuple(data['future_price'].to_list())
 
    # 价差
    delta_new_x = tuple(data['date'].to_list())
    delta_new_y = tuple((data['index_price'] - data['future_price']).to_list())
 
    # 将上述放入两个列表
    new_x = [current_new_x, future_new_x, delta_new_x]
    new_y = [current_new_y, future_new_y, delta_new_y]
 
    for i in range(3):
        if len(f.data[i].x)==window_size and len(f.data[i].y)==window_size:
            x = f.data[i].x[1:] + (new_x[i][-1],)
            y = f.data[i].y[1:] + (new_y[i][-1],)
            f.data[i].x = x
            f.data[i].y = y
        else:
            f.data[i].x += (new_x[i][-1],)
            f.data[i].y += (new_y[i][-1],)
    return f.data[0].x, f.data[0].y
 
# 定义画布和曲线数据
fig = make_subplots(rows=3, cols=1)
fig.add_trace(go.Scatter(x=(), y=(), mode='lines+markers', name='股指'), row=1, col=1)
fig.add_trace(go.Scatter(x=(), y=(), mode='lines+markers', name='股指期货'), row=2, col=1)
fig.add_trace(go.Scatter(x=(), y=(), mode='lines+markers', name='差价'), row=3, col=1)
 
# 更新布局
fig.update_layout(title_text='动态数据点示例', height=600)
 
f = go.FigureWidget(fig)
display(f)
 
# 数据更新(由于是分钟数据, 所以这里不需要异步, 只要设置一个timesleep即可)
while True:
    data2 = future_df.df().groupby(['date', 'instrument']).agg({'future_price': 'last'}).reset_index()
    data1 = index_df.df().groupby(['date', 'instrument']).agg({'index_price': 'last'}).reset_index()
    x, y = fig_append(f, data1, data2)
    time.sleep(60)

结果展示

\

标签

数据处理
{link}