import pandas as pd
import numpy as np
from biglearning.module2.common.data import Outputs
from zipline.finance.commission import PerOrder
import os
from bigdatasource.api import DataSource
from biglearning.api import M
from biglearning.api import tools as T
from biglearning.module2.common.data import Outputs
import warnings
warnings.filterwarnings('ignore')
from joblib import Parallel, delayed
import fai
# Connect to and initialize the fai distributed-compute cluster.
# NOTE(review): the cluster address and auth token are hard-coded credentials —
# consider moving them to environment variables or secret storage.
fai.init(cluster="fai-anthonywan-cptlfirs.fai-anthonywan",token="1VhioLNOp7BgqtDJAMsa4NuPwi7wGihZ")
# Backtest window (inclusive) used by every data read below.
sd = '2022-01-01'
ed = '2023-05-09'
3.1 “潮汐”的定义 我们观察个股分钟频成交量的高点与低点来定义“涨潮”与“退潮”,具体如下: 1)剔除开盘和收盘数据,仅考虑日内分钟频数据。为了减小个别异常点的影响,我们首先计算个股每分钟的成交量及其前后 4 分钟成交量的总和(共 9 分钟),作为该分钟“邻域成交量”。 2)假设“邻域成交量”最高点发生在第 t 分钟,这一分钟称为“顶峰时刻”。 3)第 5~t-1 分钟里,“邻域成交量”最低点发生在第 m 分钟,这一点的邻域成交量为 Vm,收盘价为 Cm,这一分钟称为“涨潮时刻”,从“涨潮时刻”到“顶峰时刻”的过程记为“涨潮”。 4)第 t+1~233 分钟里,“邻域成交量”最低点发生在第 n 分钟,这一点的邻域成交量为 Vn,收盘价为 Cn,这一分钟称为“退潮时刻”,从“顶峰时刻”到“退潮时刻”的过程记为“退潮”。 5)从“涨潮时刻”到“退潮时刻”的全过程记为一次“潮汐”。
3.2 “潮汐”过程的价格变动速率 我们首先来考察“潮汐”过程的价格变动速率,进而构造“全潮汐”因子,具体过程如下: 1)如上述定义,我们记“涨潮时刻”发生在第 m 分钟,收盘价为 Cm;“退潮时刻”发生在第 n 分钟,收盘价为 Cn。 2)则全部“潮汐”过程的价格变化率为 (Cn-Cm)/Cm。 3)进而全“潮汐”过程的价格变动速率为 (Cn-Cm)/Cm/(n-m),我们将此作为每日投资者出售或购买股票意愿强烈程度的代理变量。 4)我们计算最近 20 个交易日的价格变动速率的平均值,记为“全潮汐”因子。 接下来我们将对上述构建的“全潮汐”因子进行单因子测试:我们在全 A 样本中按照月度频率进行测试,测试中对因子进行市值和行业正交化处理,测试区间为 2013 年 4 月至 2022 年 2 月(下同)。因子表现如下所示。
@fai.remote
def calc_data(instrument, sd, ed):
    """Compute the daily "full tide" price-velocity factor for one stock.

    For each trading day the 1-minute bars are scanned for the "tide"
    pattern: the minute with the largest 9-minute neighbourhood volume is
    the peak ("顶峰时刻"); the neighbourhood-volume minima before and
    after the peak are the rising-tide ("涨潮") and ebbing-tide ("退潮")
    minutes.  The factor is the close-price change rate between those two
    minutes divided by their distance in minutes.

    Parameters
    ----------
    instrument : str
        Stock code understood by DataSource('bar1m_CN_STOCK_A').
    sd, ed : str
        Start / end date, 'YYYY-MM-DD'.

    Returns
    -------
    pandas.DataFrame with one row per day (includes 'day' and 'factor'
    columns), or None when the data cannot be read or processed
    (pd.concat downstream silently drops None entries).
    """
    import warnings
    warnings.filterwarnings('ignore')
    print(instrument)
    df = DataSource('bar1m_CN_STOCK_A').read(start_date=sd, end_date=ed, instruments=instrument)
    try:
        df_ = df.copy()
        df_['day'] = df_['date'].dt.to_period('D')
    except Exception:  # no minute data returned for this instrument/window
        return

    def calc_one_day(df):
        """Compute the tide factor for a single day's minute bars."""
        df.reset_index(inplace=True, drop=True)
        # Fallback "rising tide" point used when the peak is the first
        # usable minute: the day's very first bar.
        vs = df.loc[0, 'volume']
        cs = df.loc[0, 'close']
        # Neighbourhood volume: centred 9-minute rolling sum (t-4 .. t+4).
        # Equivalent to the sum of shift(-4)..shift(4); the first and last
        # 4 minutes become NaN and are dropped below, which also removes
        # the open/close bars as the factor definition requires.
        df['邻域'] = df['volume'].rolling(9, center=True).sum()
        df.dropna(inplace=True)
        df.reset_index(inplace=True, drop=True)
        idxmax = df['邻域'].idxmax()  # peak minute t
        if idxmax != 0:
            df_1 = df.loc[:idxmax]   # up to and including the peak
            df_2 = df.loc[idxmax:]   # from the peak onwards
            idxmin_m = df_1['邻域'].idxmin()  # rising-tide minute m
            idxmin_n = df_2['邻域'].idxmin()  # ebbing-tide minute n
            vm = df.loc[idxmin_m, 'volume']
            cm = df.loc[idxmin_m, 'close']
            vn = df.loc[idxmin_n, 'volume']
            cn = df.loc[idxmin_n, 'close']
            df['vm'] = vm
            df['cm'] = cm
            df['vn'] = vn
            df['cn'] = cn
            df['nm'] = idxmin_n - idxmin_m
        else:
            # Peak at the first usable minute: take the day's first bar as
            # the rising-tide point and the global minimum as the ebb.
            idxmin = df['邻域'].idxmin()
            vn = df.loc[idxmin, 'volume']
            cn = df.loc[idxmin, 'close']
            df['vm'] = vs
            df['cm'] = cs
            df['vn'] = vn
            df['cn'] = cn
            df['nm'] = idxmin
        # Price-change velocity (Cn - Cm) / Cm / (n - m).  Guard nm == 0
        # (possible in the fallback branch) — it previously produced inf;
        # map it to NaN instead so downstream averaging is not poisoned.
        df['factor'] = (df['cn'] - df['cm']) / df['cm'] / df['nm'].where(df['nm'] != 0)
        return df

    try:
        df_ = df_.groupby('day').apply(calc_one_day)
        df_.reset_index(inplace=True, drop=True)
        df_.drop_duplicates(subset=['day'], inplace=True)  # one row per day
        return df_
    except Exception:  # malformed day (e.g. too few bars) — skip instrument
        return
import time

# Universe: every instrument present in the daily bars over the window.
daily = DataSource('bar1d_CN_STOCK_A').read(start_date=sd, end_date=ed)
instruments = daily.instrument.unique()

t_start = time.time()
fai.log_silent(True)

# Fan one remote factor task out per instrument.
pending = [calc_data.remote(code, sd, ed) for code in instruments]
n_done = 0
finished = []
print('提交时间:', time.time() - t_start)

# Drain the task pool, logging progress roughly every 100 completions.
while pending:
    just_ready, pending = fai.wait(pending)
    finished.extend(just_ready)
    n_done += len(just_ready)
    if n_done % 100 == 0:
        print(f"{time.time() - t_start}, {n_done}/{len(pending) + n_done}")
print('计算时间:', time.time() - t_start)

t_merge = time.time()
# Pull every per-instrument frame back and stack into one table.
df = pd.concat(fai.get(finished))
print('子任务合并时间:', time.time() - t_merge)
df

# Convert the daily Period into a proper datetime column and keep only the
# columns the downstream merge needs.
df['date'] = pd.to_datetime(df['day'].astype(str), format='%Y-%m-%d')
df = df[['instrument', 'date', 'factor']]
df
# ---- Reference-data pipeline (BigQuant M modules) -----------------------
# m1: the full A-share instrument universe over the backtest window.
m1 = M.instruments.v2(
start_date=sd,
end_date=ed,
market='CN_STOCK_A',
instrument_list='',
max_count=0
)
# m2: fields to pull — close price, market cap, SW level-1 industry.
# The Chinese aliases on the left of '=' become column names downstream.
m2 = M.input_features.v1(
features="""
收盘=close_0
市值=market_cap_0
行业=industry_sw_level1_0
"""
)
# m3: raw feature extraction; before_start_days=500 pulls extra history
# ahead of start_date so derived/rolling features have warm-up data.
m3 = M.general_feature_extractor.v7(
instruments=m1.data,
features=m2.data,
start_date='',
end_date='',
before_start_days=500
)
# m4: evaluate derived-feature expressions (none beyond raw fields here);
# keep NaN rows and extra columns so the later left-merge loses nothing.
m4 = M.derived_feature_extractor.v3(
input_data=m3.data,
features=m2.data,
date_col='date',
instrument_col='instrument',
drop_na=False,
remove_extra_columns=False,
user_functions={}
)
# m5: keep normal-status, non-delisted stocks on the main boards / ChiNext.
m5 = M.chinaa_stock_filter.v1(
input_data=m4.data,
index_constituent_cond=['全部'],
board_cond=['上证主板', '深证主板', '创业板'],
industry_cond=['全部'],
st_cond=['正常'],
delist_cond=['非退市'],
output_left_data=False
)
# Attach close / market-cap / industry columns onto the factor table by
# (instrument, date); left join keeps every factor row.
df_ = m5.data.read()
df = pd.merge(df,df_[['instrument','date','收盘','市值','行业']],left_on=['instrument','date'],right_on=['instrument','date'],how='left')
df.sort_values(by='date',inplace=True)
df
import plotly.graph_objs as go
# (removed duplicate `import numpy as np` — numpy is already imported at the
# top of the file and is unused in this cell)

# Plot the probability distribution of the daily "full tide" factor values.
fig = go.Figure(data=[go.Histogram(x=df.factor, histnorm='probability')])
# Set figure title and axis labels.
fig.update_layout(title='Probability Distribution', xaxis_title='Value', yaxis_title='Probability')
# Render the chart.
fig.show()