期货现货价差实时因子加工
由bq7zuymm创建,最终由bq7zuymm 被浏览 30 用户
本文以中证1000的股指期货(IM2503.CFE)与指数(000852.SH)价差为例, 我们来加工股指与期货的价差因子并进行实时可视化操作。以下涉及到的流数据暂未开放,后期我们会为大家提供流数据获取服务。
数据定义
因子构造思路较为简单,我们需要用到期货l1快照数据以及指数快照数据,首先计算快照上的价差,最后将价差用last
函数聚合成分钟频的数据。
因子加工代码
首先导入第三方库,并将数据推送至中间表:
import dai
import time
import plotly.graph_objects as go
from IPython.display import display
from plotly.subplots import make_subplots
dai.pull_data_to_table(datasource='cn_future_level1_snapshot', table_name='future_table', overwrite=True, lookback_time=8*24*60*60)
dai.pull_data_to_table(datasource='cn_stock_index_snapshot', table_name='index_table', overwrite=True, lookback_time=8*24*60*60)
我们需要期货和股指数据, 所以我们需要订阅两中数据: cn_future_level1_snapshot和cn_stock_index_snapshot.
进一步我们需要分别取出中证1000期货(IM2503.CFE)以及股指(000852.SH)的价格数据:
future_instrument = 'IM2503.CFE'
index_instrument = '000852.SH'
sql = f"""
SELECT date_trunc('minute', to_timestamp(datetime * 1.0 / 1000 + 8 * 60 * 60)) AS date,
instrument, price AS future_price
FROM future_table
WHERE instrument = '{future_instrument}'
"""
future_df = dai.stream_factor(sql, 'future', overwrite=True)
sql = f"""
SELECT date_trunc('minute', to_timestamp(datetime * 1.0 / 1000 + 8 * 60 * 60)) AS date,
instrument, price AS index_price
FROM index_table
WHERE instrument = '{index_instrument}'
"""
index_df = dai.stream_factor(sql, 'index', overwrite=True)
引擎冷启动,防止取出来的实时数据为空:
# 引擎冷启动
for _ in range(1000):
future_data = future_df.df()
index_data = index_df.df()
if len(future_data) != 0 and len(index_data) != 0:
break
需要注意的是, 我们取出来的数据依旧是快照数据, 我们在sql的时间上做了一些运算, 我们将毫秒时间转换成精确到秒的日期, 进一步将秒数全部变为0(我们在加工分钟因子时一直都是这样处理的, 我们可以通过抹去秒钟来获取分钟日期),这样我们能够方便使用分钟聚合。
我们可以使用pandas聚合得到分钟的收盘价:
future_df.df().groupby(['date', 'instrument']).agg({'future_price': 'last'}).reset_index()
明白了这一点后, 我们接下来尝试将期货价格,指数价格,价差给画出来:
# 往画布中添加数据
def fig_append(f, data1, data2, window_size:int=20):
import pandas as pd
data = pd.merge(data1, data2, on=['date'], how='inner')
# 股指价格
current_new_x = tuple(data['date'].to_list())
current_new_y = tuple(data['index_price'].to_list())
# 期货價格
future_new_x = tuple(data['date'].to_list())
future_new_y = tuple(data['future_price'].to_list())
# 价差
delta_new_x = tuple(data['date'].to_list())
delta_new_y = tuple((data['index_price'] - data['future_price']).to_list())
# 将上述放入两个列表
new_x = [current_new_x, future_new_x, delta_new_x]
new_y = [current_new_y, future_new_y, delta_new_y]
for i in range(3):
if len(f.data[i].x)==window_size and len(f.data[i].y)==window_size:
x = f.data[i].x[1:] + (new_x[i][-1],)
y = f.data[i].y[1:] + (new_y[i][-1],)
f.data[i].x = x
f.data[i].y = y
else:
f.data[i].x += (new_x[i][-1],)
f.data[i].y += (new_y[i][-1],)
return f.data[0].x, f.data[0].y
# 定义画布和曲线数据
fig = make_subplots(rows=3, cols=1)
fig.add_trace(go.Scatter(x=(), y=(), mode='lines+markers', name='股指'), row=1, col=1)
fig.add_trace(go.Scatter(x=(), y=(), mode='lines+markers', name='股指期货'), row=2, col=1)
fig.add_trace(go.Scatter(x=(), y=(), mode='lines+markers', name='差价'), row=3, col=1)
# 更新布局
fig.update_layout(title_text='动态数据点示例', height=600)
f = go.FigureWidget(fig)
display(f)
# 数据更新(由于是分钟数据, 所以这里不需要异步, 只要设置一个timesleep即可)
while True:
data2 = future_df.df().groupby(['date', 'instrument']).agg({'future_price': 'last'}).reset_index()
data1 = index_df.df().groupby(['date', 'instrument']).agg({'index_price': 'last'}).reset_index()
x, y = fig_append(f, data1, data2)
time.sleep(60)
结果展示
\