# 第二天课程:全因子
import pandas as pd
import numpy as np
from biglearning.module2.common.data import Outputs
from zipline.finance.commission import PerOrder
import time
from bigdatasource.api import DataSource
from biglearning.api import M
from biglearning.api import tools as T
from biglearning.module2.common.data import Outputs
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
import os
import dai
from joblib import Parallel, delayed
import fai
# Sample window for the factor computation (inclusive calendar dates).
sd, ed = '2015-01-01', '2023-12-13'
# Directory that receives one pickle per instrument.
# NOTE(review): the original author asked whether this path must be changed — confirm per environment.
root_path = '/home/aiuser/work/calc_data'
# 构建因子部分
def calc_data(sd, ed, ins, path):
    """Compute daily high-frequency factors for one instrument and pickle them.

    The query joins three pieces on (date, instrument):
      * t1 - intraday return from the daily bar table ``cn_stock_bar1d``;
      * t2 - realized variance / skewness / kurtosis, up/down variance ratios
        and trend ratio from ``cn_stock_bar1m``;
      * t3 - half-hour volume ratios, volume/price and volume/return
        correlations, first/last half-hour statistics from
        ``cn_stock_level2_bar1m``.

    Args:
        sd: start date string, formatted into the SQL as ``{0}``.
        ed: end date string, formatted into the SQL as ``{1}``.
        ins: instrument code, formatted into the SQL as ``{2}``.
        path: directory in which ``<ins>.pkl`` is written.

    Returns:
        None. The result frame is written to ``<path>/<ins>.pkl``.

    NOTE(review): every aggregate window is ``PARTITION BY day ORDER BY date``
    without an explicit frame, and ``QUALIFY _rn = 1`` keeps the first minute
    of each day. Under standard SQL default framing that row only sees
    itself, so the per-day aggregates may be degenerate. Confirm the framing
    semantics of the dai engine (or add
    ``ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING``).
    """
    # Imported inside the function, as the original did for ``dai``;
    # presumably so the body is self-contained when run through fai.remote.
    import os

    import dai

    sql = """
    WITH t1 AS(
    SELECT
    instrument,
    close/open -1 AS ret_intraday,
    date
    FROM
    cn_stock_bar1d
    WHERE
    date >='{0}'||' 00:00:00.000'
    AND
    date <= '{1}'||' 23:59:59.999'
    AND
    instrument = '{2}'
    ),
    t2_tmp AS(
    SELECT
    *,
    -- truncate the timestamp to the trading day, named day
    datetrunc('day', date) AS day,
    -- one-minute return, named _ret_min
    close/open -1 AS _ret_min,
    -- population variance of the minute returns
    nanvar_pop(_ret_min) OVER(PARTITION by day ORDER BY date) AS real_var,
    -- kurtosis / skewness of the minute returns
    kurtosis(_ret_min) OVER(PARTITION by day ORDER BY date) AS real_kurtosis,
    skewness(_ret_min) OVER(PARTITION by day ORDER BY date) AS real_skewness,
    nanvar_pop(_ret_min)FILTER(_ret_min>0) OVER(PARTITION by day ORDER BY date) AS real_upvar,
    nanvar_pop(_ret_min)FILTER(_ret_min<0) OVER(PARTITION by day ORDER BY date) AS real_downvar,
    real_upvar/real_var AS ratio_realupvar,
    real_downvar/real_var AS ratio_realdownvar,
    -- trend ratio: intraday price change / sum of absolute minute price changes
    first(open) OVER(PARTITION BY day ORDER BY date) AS _open,
    last(close) OVER(PARTITION BY day ORDER BY date) AS _close,
    SUM(ABS(close-open)) OVER(PARTITION BY day ORDER BY date) AS _sum,
    (_close-_open)/_sum AS trendratio,
    row_number() over (PARTITION BY day ORDER BY date) as _rn,
    FROM
    cn_stock_bar1m
    WHERE
    date >='{0}'||' 00:00:00.000'
    AND
    date <= '{1}'||' 23:59:59.999'
    AND
    instrument = '{2}'
    QUALIFY _rn = 1
    ORDER by date
    ),
    t2 AS(
    SELECT
    day AS date,
    instrument,
    real_var,
    real_kurtosis,
    real_skewness,
    real_upvar,
    real_downvar,
    ratio_realupvar,
    ratio_realdownvar,
    trendratio
    FROM
    t2_tmp
    WHERE
    date >='{0}'||' 00:00:00.000'
    AND
    date <= '{1}'||' 23:59:59.999'
    AND
    instrument = '{2}'
    ORDER by day
    ),
    t3_tmp AS(
    SELECT
    date,
    instrument,
    datetrunc('day', date) AS day,
    strftime('%H:%M', date) AS min,
    SUM(if(strftime('%H:%M', date) >= '09:30' and strftime('%H:%M', date) <= '10:00', volume, 0)) OVER(PARTITION BY day ORDER BY date)/SUM(volume) OVER(PARTITION BY day ORDER BY date) AS ratio_volumeH1,
    SUM(if(strftime('%H:%M', date) >= '10:00' and strftime('%H:%M', date) <= '10:30', volume, 0)) OVER(PARTITION BY day ORDER BY date)/SUM(volume) OVER(PARTITION BY day ORDER BY date) AS ratio_volumeH2,
    SUM(if(strftime('%H:%M', date) >= '10:30' and strftime('%H:%M', date) <= '11:00', volume, 0)) OVER(PARTITION BY day ORDER BY date)/SUM(volume) OVER(PARTITION BY day ORDER BY date) AS ratio_volumeH3,
    SUM(if(strftime('%H:%M', date) >= '11:00' and strftime('%H:%M', date) <= '11:30', volume, 0)) OVER(PARTITION BY day ORDER BY date)/SUM(volume) OVER(PARTITION BY day ORDER BY date) AS ratio_volumeH4,
    SUM(if(strftime('%H:%M', date) >= '13:00' and strftime('%H:%M', date) <= '13:30', volume, 0)) OVER(PARTITION BY day ORDER BY date)/SUM(volume) OVER(PARTITION BY day ORDER BY date) AS ratio_volumeH5,
    SUM(if(strftime('%H:%M', date) >= '13:30' and strftime('%H:%M', date) <= '14:00', volume, 0)) OVER(PARTITION BY day ORDER BY date)/SUM(volume) OVER(PARTITION BY day ORDER BY date) AS ratio_volumeH6,
    SUM(if(strftime('%H:%M', date) >= '14:00' and strftime('%H:%M', date) <= '14:30', volume, 0)) OVER(PARTITION BY day ORDER BY date)/SUM(volume) OVER(PARTITION BY day ORDER BY date) AS ratio_volumeH7,
    SUM(if(strftime('%H:%M', date) >= '14:30' and strftime('%H:%M', date) <= '15:00', volume, 0)) OVER(PARTITION BY day ORDER BY date)/SUM(volume) OVER(PARTITION BY day ORDER BY date) AS ratio_volumeH8,
    close/open AS _ret_min,
    CORR(volume,close) OVER(PARTITION BY day ORDER BY date) AS corr_VP,
    CORR(volume,_ret_min) OVER(PARTITION BY day ORDER BY date) AS corr_VR,
    COALESCE(LAG(_ret_min,1) OVER(PARTITION BY day ORDER BY date),0) AS _ret_LAG,
    COALESCE(LEAD(_ret_min,1) OVER(PARTITION BY day ORDER BY date),0) AS _ret_LEAD,
    CORR(volume,_ret_LAG) OVER(PARTITION BY day ORDER BY date) AS corr_VRlag,
    CORR(volume,_ret_LEAD) OVER(PARTITION BY day ORDER BY date) AS corr_VRlead,
    row_number() over (PARTITION BY day ORDER BY date) as _rn,
    SUM(if(strftime('%H:%M', date) = '09:31', open, 0)) OVER(PARTITION BY day ORDER BY date) AS _o1,
    SUM(if(strftime('%H:%M', date) = '10:00', close, 0)) OVER(PARTITION BY day ORDER BY date) AS _c1,
    SUM(if(strftime('%H:%M', date) = '10:00', open, 0)) OVER(PARTITION BY day ORDER BY date) AS _o2,
    SUM(if(strftime('%H:%M', date) = '15:00', close, 0)) OVER(PARTITION BY day ORDER BY date) AS _c2,
    _c1/_o1 AS ret_H1,
    _c2/_o2 AS ret_close2H1,
    CORR(volume,close) FILTER(min>='09:30' AND min <= '10:00') OVER(PARTITION BY day ORDER BY date) AS corr_VPH1,
    CORR(volume,_ret_min) FILTER(min>='09:30' AND min <= '10:00') OVER(PARTITION BY day ORDER BY date) AS corr_VRH1,
    CORR(volume,_ret_LEAD) FILTER(min>='09:30' AND min <= '10:00') OVER(PARTITION BY day ORDER BY date) AS corr_VRleadH1,
    CORR(volume,_ret_LAG) FILTER(min>='09:30' AND min <= '10:00') OVER(PARTITION BY day ORDER BY date) AS corr_VRlagH1,
    nanvar_pop(_ret_min) FILTER(min>='09:30' AND min <= '10:00') OVER(PARTITION by day ORDER BY date) AS real_varH1,
    kurtosis(_ret_min) FILTER(min>='09:30' AND min <= '10:00') OVER(PARTITION by day ORDER BY date) AS real_kurtosisH1,
    skewness(_ret_min) FILTER(min>='09:30' AND min <= '10:00') OVER(PARTITION by day ORDER BY date) AS real_skewnessH1,
    CORR(volume,close) FILTER(min>='14:30' AND min <= '15:00') OVER(PARTITION BY day ORDER BY date) AS corr_VPH8,
    CORR(volume,_ret_min) FILTER(min>='14:30' AND min <= '15:00') OVER(PARTITION BY day ORDER BY date) AS corr_VRH8,
    CORR(volume,_ret_LEAD) FILTER(min>='14:30' AND min <= '15:00') OVER(PARTITION BY day ORDER BY date) AS corr_VRleadH8,
    CORR(volume,_ret_LAG) FILTER(min>='14:30' AND min <= '15:00') OVER(PARTITION BY day ORDER BY date) AS corr_VRlagH8,
    nanvar_pop(_ret_min) FILTER(min>='14:30' AND min <= '15:00') OVER(PARTITION by day ORDER BY date) AS real_varH8,
    kurtosis(_ret_min) FILTER(min>='14:30' AND min <= '15:00') OVER(PARTITION by day ORDER BY date) AS real_kurtosisH8,
    skewness(_ret_min) FILTER(min>='14:30' AND min <= '15:00') OVER(PARTITION by day ORDER BY date) AS real_skewnessH8,
    FROM
    cn_stock_level2_bar1m
    WHERE
    date >='{0}'||' 00:00:00.000'
    AND
    date <= '{1}'||' 23:59:59.999'
    AND
    instrument = '{2}'
    QUALIFY _rn = 1
    ORDER by date
    ),
    t3 AS(
    SELECT
    day AS date,
    instrument,
    ratio_volumeH1,
    ratio_volumeH2,
    ratio_volumeH3,
    ratio_volumeH4,
    ratio_volumeH5,
    ratio_volumeH6,
    ratio_volumeH7,
    ratio_volumeH8,
    corr_VP,
    corr_VR,
    corr_VRlag,
    corr_VRlead,
    ret_H1,
    ret_close2H1,
    corr_VPH1,
    corr_VRH1,
    corr_VRlagH1,
    corr_VRleadH1,
    real_varH1,
    real_kurtosisH1,
    real_skewnessH1,
    corr_VPH8,
    corr_VRH8,
    corr_VRlagH8,
    corr_VRleadH8,
    real_varH8,
    real_kurtosisH8,
    real_skewnessH8,
    FROM
    t3_tmp
    WHERE
    date >='{0}'||' 00:00:00.000'
    AND
    date <= '{1}'||' 23:59:59.999'
    AND
    instrument = '{2}'
    ORDER by date
    )
    SELECT
    *
    FROM
    t1
    JOIN
    t2 USING(date,instrument)
    JOIN
    t3 USING(date,instrument)
    WHERE
    date >='{0}'||' 00:00:00.000'
    AND
    date <= '{1}'||' 23:59:59.999'
    AND
    instrument = '{2}'
    ORDER by date
    """
    df = dai.query(sql.format(sd, ed, ins)).df()
    # os.path.join avoids a doubled separator when ``path`` ends with '/'.
    df.to_pickle(os.path.join(path, f'{ins}.pkl'))
@fai.remote
def calc_data_remote(*args, **kwargs):
    """Remote-task entry point: forward every argument unchanged to ``calc_data``."""
    outcome = calc_data(*args, **kwargs)
    return outcome
# Universe query: every (date, instrument) row of cn_stock_factors in the
# sample window whose list_sector_0 is below 4.
sql_ins = """
SELECT
date,
instrument,
list_sector_0
FROM
cn_stock_factors
WHERE
date BETWEEN '{0}'||' 00:00:00.000' AND '{1}'||' 23:59:59.000'
AND
list_sector_0 <4
"""
# Distinct instrument codes that appear anywhere in the window.
ins_list = (
    dai.query(sql_ins.format(sd, ed))
    .df()["instrument"]
    .unique()
)
# Submit one remote calc_data task per instrument and wait for all of them.
# NOTE(review): the original author marked this section as "needs to be
# modified" without saying how — confirm the cluster settings below.
import fai

# Connect to and initialise the fai cluster.
# NOTE(review): a credential is hard-coded in source. FAI_TOKEN, when set,
# overrides it; the literal is kept only so existing runs keep working —
# rotate it and remove the fallback.
fai.init(
    cluster="fai-xiabinbin1987-nnojbytv.fai-cluster",
    token=os.environ.get("FAI_TOKEN", "4G2X5uiuQjf56VRkzfbZQh25e9Jkke3g"),
)

import time

# Create the output directory (and any missing parents); no error if it exists.
os.makedirs(root_path, exist_ok=True)

# Wall-clock reference for the timing prints below.
time0 = time.time()

# NOTE(review): presumably silences fai logging, which would also hide
# worker-side errors such as a failing SQL query — confirm.
fai.log_silent(True)

# One remote task handle per instrument.
remainings = [calc_data_remote.remote(sd, ed, ins, root_path) for ins in ins_list]

done = 0
ready_list = []
print('提交时间:', time.time() - time0)

# Drain the pending handles; report progress every 100 completions and once
# more when the last task finishes, so the final count is always shown.
while remainings:
    ready, remainings = fai.wait(remainings)
    ready_list += ready
    done += len(ready)
    if done % 100 == 0 or not remainings:
        print(f"{time.time() - time0}, {done}/{len(remainings) + done}")
print('计算时间:', time.time() - time0)

time1 = time.time()
# Everything found in the output directory, as full paths.
# NOTE(review): the tasks write to root_path wherever they execute; if the
# workers do not share this filesystem the listing will be empty.
entries = os.listdir(root_path)
full_paths = [os.path.join(root_path, entry) for entry in entries]
def reader(path):
    """Load one pickled DataFrame and delete the file afterwards.

    The file is removed whether or not it could be read, so a corrupt file
    is not picked up again on the next run.

    Args:
        path: path of the pickle file to load.

    Returns:
        The loaded DataFrame, or None when the file is missing or unreadable.
    """
    import contextlib

    try:
        frame = pd.read_pickle(path)
    except Exception:
        # Best-effort: report the bad file and continue with the rest.
        frame = None
        print(path)
        print('该文件出现损坏')
    # Deleting must never raise; the file may already be gone.
    with contextlib.suppress(OSError):
        os.remove(path)
    return frame
# Read every pickle in parallel. n_jobs=-1 asks joblib for all available CPU
# cores; each delayed(reader)(path) is one deferred reader call, and res
# holds one entry per path (None where reader could not load the file).
res = Parallel(n_jobs=-1)(delayed(reader)(path) for path in full_paths)
frames = [frame for frame in res if frame is not None]
# Report a count instead of dumping every frame to the output.
print(f'loaded {len(frames)} of {len(full_paths)} files')
if not frames:
    # pd.concat would fail here with the less helpful "No objects to concatenate".
    raise ValueError(f'no readable factor files found in {root_path}')
df = pd.concat(frames)
print('合并完毕', time.time() - time1)
提交时间: 1.4353306293487549 28.56333017349243, 100/5242 50.94886088371277, 200/5242 73.14129996299744, 300/5242 94.92110204696655, 400/5242 114.98591351509094, 500/5242 135.44996213912964, 600/5242 154.47978448867798, 700/5242 174.3638083934784, 800/5242 193.78399515151978, 900/5242 213.23874282836914, 1000/5242 231.36846804618835, 1100/5242 249.19734406471252, 1200/5242 266.22522735595703, 1300/5242 283.3724479675293, 1400/5242 299.9828655719757, 1500/5242 316.6593325138092, 1600/5242 332.49348855018616, 1700/5242 348.1883502006531, 1800/5242 363.33561635017395, 1900/5242 377.9405200481415, 2000/5242 391.66358280181885, 2100/5242 405.4709897041321, 2200/5242 418.9121539592743, 2300/5242 431.82685804367065, 2400/5242 444.6474711894989, 2500/5242 456.5827474594116, 2600/5242 468.49014043807983, 2700/5242 479.97183299064636, 2800/5242 490.36535477638245, 2900/5242 500.73371624946594, 3000/5242 511.0447840690613, 3100/5242 520.6318161487579, 3200/5242 529.4097776412964, 3300/5242 538.3031499385834, 3400/5242 546.5886290073395, 3500/5242 553.9415082931519, 3600/5242 561.2455270290375, 3700/5242 567.7631244659424, 3800/5242 574.1930766105652, 3900/5242 579.0342531204224, 4000/5242 584.0697567462921, 4100/5242 588.6049420833588, 4200/5242 592.8299810886383, 4300/5242 596.5362701416016, 4400/5242 600.2357585430145, 4500/5242 603.0863196849823, 4600/5242 605.6474657058716, 4700/5242 607.7946753501892, 4800/5242 609.6339881420135, 4900/5242 611.0546534061432, 5000/5242 612.061116695404, 5100/5242 612.7108898162842, 5200/5242 计算时间: 612.8520441055298 []
# Show what is left in the output directory (notebook cell display).
entries = os.listdir(root_path)
entries

# Write df to the platform database, then read it back through SQL.
import dai

# Datasource ids are globally unique: lowercase letters, digits and
# underscores, starting with a letter. Change this to your own id.
DATASOURCE_ID = "hf_gfzq_yz_xbb"

# Partition by calendar year, as a string.
df[dai.DEFAULT_PARTITION_FIELD] = df["date"].dt.strftime("%Y")
dai.DataSource.write_bdb(
    data=df,
    id=DATASOURCE_ID,
    # Rows that collide on unique_together are de-duplicated on insert;
    # partitioned data also needs the indexes argument.
    unique_together=["date", "instrument"],
    indexes=["date"],
)

# Read back the table that was just written. The original queried
# hf_gfzq_qx, which is not the id written above.
sql = f"""
SELECT
*
FROM
{DATASOURCE_ID}
WHERE
date>'2015-01-01'
ORDER BY date
"""
df = dai.query(sql).df()
df
# 下面是因子分析模块(因子分析 V27,代码精简版 copy)
import pandas as pd
import numpy as np
import warnings
import empyrical
import dai
import bigcharts
import time
warnings.filterwarnings('ignore')
from biglearning.api import tools as T
print('导入包完成!')
# Load the factor to analyse. The selected column is aliased to ``factor``.
# NOTE(review): this reads hf_gfzq_qx, which differs from the datasource id
# written earlier in this file (hf_gfzq_yz_xbb) — confirm which table is meant.
sql = """
SELECT
date,
instrument,
ratio_volumeH3 AS factor
FROM
hf_gfzq_qx
WHERE
date >='2017-01-01'
AND
date<= '2024-01-01'
ORDER BY date
"""
# Analysis parameters:
#   group_num        - number of quantile groups the stocks are sorted into
#   factor_field     - name of the factor column (update it for each factor)
#   instruments      - universe: 沪深300, 中证500, 中证1000 or 全市场
#   factor_direction - 1 or -1
#   benchmark        - 沪深300, 中证500 or 中证1000
#   data_process     - whether to run the factor pre-processing step
params = {'group_num':10, 'factor_field':'factor', 'instruments':'全市场', 'factor_direction':1, 'benchmark':'中证500', 'data_process':False}
# factor_data must point at the frame to analyse: the query result here, or
# an existing DataFrame (e.g. factor_data = df_1).
factor_data = dai.query(sql).df()
# Drop rows without a factor value and keep only the columns AlphaMiner reads.
factor_data.dropna(subset=[params['factor_field']], inplace=True)
factor_data = factor_data[['instrument', 'date', params['factor_field']]]
### 因子分析工具
class AlphaMiner(object):
    """Single-factor analysis: universe filter, grouping, returns, IC, charts.

    Constructing an instance runs the whole pipeline; ``render`` then draws
    the result tables and charts with bigcharts.
    """

    # Index names accepted in ``params`` mapped to their index codes.
    _INDEX_CODES = {
        "沪深300": '000300.SH',
        "中证500": '000905.SH',
        "中证1000": '000852.SH',
    }

    def __init__(self, params, factor_data):
        """Run the full analysis.

        Args:
            params: dict such as ``{'group_num': 10, 'factor_field': 'x',
                'instruments': '中证500', 'factor_direction': -1,
                'benchmark': '中证500', 'data_process': True}``.
                  group_num (int): number of quantile groups.
                  factor_field (str): name of the factor column.
                  instruments (str): 沪深300 / 中证500 / 中证1000 / 全市场.
                  factor_direction (int): 1 or -1; the factor is multiplied by it.
                  benchmark (str): 沪深300 / 中证500 / 中证1000.
                  data_process (bool): run ``factor_data_process`` or not.
            factor_data: DataFrame with columns ``instrument`` (str ending in
                .SH/.SZ/.BJ), ``date`` (datetime64) and the factor column (float).
        """
        print('==================因子分析开始==================')
        self.t0 = time.time()
        self.params = params
        self.top_n_ins = 5  # number of top / bottom names shown by render
        self.factor_data = factor_data.rename(columns={self.params['factor_field']: 'factor'})
        self.factor_data['factor'] *= self.params['factor_direction']

        # Format check is best-effort: a failure is reported, not fatal.
        try:
            self.check_data_format(self.factor_data)
        except ValueError as e:
            print("数据格式检查失败:" + str(e))
            # t1 must exist on this path too; the next timing print reads it.
            t1 = time.time()
        else:
            t1 = time.time()
            print("耗时:{0}秒 数据格式检查通过".format(np.round(t1-self.t0)))

        # Universe filter.
        self.stock_pool_filter()
        t2 = time.time()
        print('耗时:{0}秒 股票池过滤完成'.format(np.round(t2-t1)))

        # Optional factor pre-processing.
        if self.params['data_process']:
            self.factor_data = self.factor_data_process('factor')
            t3 = time.time()
            print("耗时:{0}秒 数据预处理完成".format(np.round(t3-t2)))
        else:
            t3 = time.time()

        # Per-stock daily returns over the factor's date range.
        self.start_date = self.factor_data.date.min().strftime('%Y-%m-%d')
        self.end_date = self.factor_data.date.max().strftime('%Y-%m-%d')
        self.price_data = self.get_daily_ret(self.start_date, self.end_date)
        t4 = time.time()
        print('耗时:{0}秒 个股日收益率计算完成'.format(np.round(t4-t3)))

        self.merge_data = pd.merge(
            self.factor_data.sort_values(['date', 'instrument']),
            self.price_data.sort_values(['date', 'instrument']),
            on=['date', 'instrument'], how='left')
        self.group_data = self.get_group_data()
        t5 = time.time()
        print('耗时:{0}秒 因子分组完成'.format(np.round(t5-t4)))

        self.bm_ret = self.get_bm_ret(self.params['benchmark'])
        t6 = time.time()
        print('耗时:{0}秒 基准日收益率计算完成'.format(np.round(t6-t5)))

        self.group_cumret = self.get_group_cumret()
        t7 = time.time()
        print('耗时:{0}秒 分组收益率计算完成'.format(np.round(t7-t6)))

        self.whole_perf = self.get_whole_perf()
        t8 = time.time()
        print('耗时:{0}秒 整体绩效计算完成'.format(np.round(t8-t7)))

        self.yearly_perf = self.get_yearly_perf()
        t9 = time.time()
        print('耗时:{0}秒 年度绩效计算完成'.format(np.round(t9-t8)))

        self.ic = self.get_IC_data('all')
        self.t10 = time.time()
        print('耗时:{0}秒 IC计算完成'.format(np.round(self.t10-t9)))

    @staticmethod
    def _apply_by_date(frame, func, **kwargs):
        """Call ``func`` on each per-date slice of ``frame`` and stack the results.

        Written as an explicit loop so every slice keeps its ``date`` column
        and no group key is added to the index, whatever the pandas version.
        """
        pieces = [func(group.copy(), **kwargs) for _, group in frame.groupby('date')]
        if not pieces:
            return frame.iloc[0:0].copy()
        return pd.concat(pieces)

    def check_data_format(self, df):
        """Raise ValueError when date / instrument / factor have the wrong format."""
        # date must be a datetime column.
        if not pd.api.types.is_datetime64_any_dtype(df['date']):
            raise ValueError("date列的数据格式应为datetime格式")
        # instrument must end with an exchange suffix.
        codes = df['instrument'].str
        if not all(codes.endswith('.SH') | codes.endswith('.SZ') | codes.endswith('.BJ')):
            raise ValueError("instrument列的数据格式应为以.SH或.SZ或.BJ结尾的字符串")
        # factor must be floating point.
        if not pd.api.types.is_float_dtype(df['factor']):
            raise ValueError("factor列的数据格式应为浮点型")

    def stock_pool_filter(self):
        """Restrict ``self.factor_data`` to the chosen universe and clean it.

        Raises:
            ValueError: ``params['instruments']`` is not a supported universe.
        """
        pools = self.params['instruments']
        if pools == "全市场":
            merge_df = self.factor_data
        elif pools in self._INDEX_CODES:
            index_code = self._INDEX_CODES[pools]
            index_com_df = dai.query("select * from cn_stock_index_component where date >= '2015-01-01' and instrument == '%s' order by date, instrument "%index_code).df()
            # Keep factor rows whose instrument is an index member on that date.
            merge_df = pd.merge(
                self.factor_data, index_com_df, how='inner',
                left_on=['date', 'instrument'], right_on=['date', 'member_code'],
            )[['instrument_x', 'date', 'factor']]
            merge_df = merge_df.rename(columns={'instrument_x': 'instrument'})
        else:
            # Previously only printed, then failed later with a NameError.
            raise ValueError('请检查输入的指数池是否正确')

        def factor_data_filter(factor_data):
            """Drop ST names, listings of 252 trading days or fewer, non-SH/SZ codes, zero-amount rows, inf/NaN."""
            columns = factor_data.columns
            start_date = factor_data.date.min().strftime('%Y-%m-%d')
            end_date = factor_data.date.max().strftime('%Y-%m-%d')
            # assign() leaves the caller's frame untouched.
            factor_data = factor_data.assign(instrument=factor_data['instrument'].str[:9])
            base_info_df = dai.query("select date, instrument, st_status ,trading_days, amount from cn_stock_factors_base where date >= '%s' and date <= '%s'"%(start_date, end_date)).df()
            factor_data = pd.merge(factor_data, base_info_df, how='left', on=['date', 'instrument'])
            factor_data = factor_data[(factor_data['st_status'] == 0) & (factor_data['trading_days'] > 252)]
            codes = factor_data['instrument'].str
            factor_data = factor_data[codes.endswith('SH') | codes.endswith('SZ')]
            factor_data = factor_data[factor_data['amount'] > 0]
            factor_data = factor_data.replace([np.inf, -np.inf], np.nan).dropna()
            return factor_data[columns]

        self.factor_data = factor_data_filter(merge_df)

    def factor_data_process(self, col):
        """Clip at 3 std, z-score and neutralise ``col`` per date; return the new frame."""
        def zscore(df, train_col):
            from sklearn.preprocessing import StandardScaler
            scaler = StandardScaler()
            df[train_col] = scaler.fit_transform(df[train_col])
            return df

        def remove_extreme_and_cut_zscore(df, col):
            col_list = [col]
            for fac in col_list:
                n = 3
                mean = df[fac].mean()
                std = df[fac].std()
                lower_bound = mean - n * std
                upper_bound = mean + n * std
                df.loc[df[fac] < lower_bound, fac] = lower_bound
                df.loc[df[fac] > upper_bound, fac] = upper_bound
            df = zscore(df, col_list)
            return df

        factor_data = self._apply_by_date(self.factor_data, remove_extreme_and_cut_zscore, col=col)
        factor_data = factor_data.fillna(0)
        start_date = factor_data.date.min().strftime("%Y-%m-%d")
        end_date = factor_data.date.max().strftime("%Y-%m-%d")

        sql = """
        SELECT date, instrument, industry_level1_code
        FROM cn_stock_industry_component
        WHERE date >= '{0}' and date <= '{1}' and industry =='sw2021'
        ORDER BY instrument, date;
        """.format(start_date, end_date)
        df_industry = dai.query(sql).df()
        df_industry = df_industry.dropna()
        factor_data = factor_data.merge(df_industry, on=['date', 'instrument'])

        sql = """
        SELECT date, instrument, total_market_cap
        FROM cn_stock_valuation
        WHERE date >= '{0}' and date <= '{1}'
        ORDER BY instrument, date;
        """.format(start_date, end_date)
        df_market_cap = dai.query(sql).df()
        factor_data = factor_data.merge(df_market_cap, on=['date', 'instrument'])
        factor_data['log_cap'] = np.log(factor_data.total_market_cap)

        def neutralize(df, col):
            """Replace ``col`` by the residual of an OLS on industry dummies and log market cap."""
            import statsmodels.api as sm
            col_list = [col]
            for fac in col_list:
                # dtype=float: newer pandas returns bool dummies by default.
                ind_dummies = pd.get_dummies(df['industry_level1_code'], prefix='industry_level1_code', dtype=float)
                mkcap = df['log_cap']
                train = pd.concat([ind_dummies, mkcap], axis=1)
                X = sm.add_constant(train)
                y = df[fac]
                model = sm.OLS(y, X).fit()
                df[fac] = model.resid
            return df

        res = self._apply_by_date(factor_data.dropna(), neutralize, col=col)
        res = res.sort_values(by='date').reset_index(drop=True)
        return res

    def get_daily_ret(self, start_date, end_date):
        """Return per-stock daily returns: open of T+2 over open of T+1, minus 1.

        Args:
            start_date: first factor date ('%Y-%m-%d'); the query starts 10
                calendar days earlier.
            end_date: last factor date ('%Y-%m-%d').
        """
        sql = "SELECT instrument,date, (m_lead(open, 2)/ m_lead(open, 1) - 1) AS daily_ret from cn_stock_bar1d ORDER BY date, instrument;"
        # The arguments are used here; the original ignored them and read
        # self.start_date / self.end_date instead.
        padded_start = (pd.Timestamp(start_date) - pd.Timedelta(days=10)).strftime('%Y-%m-%d')
        price_data = dai.query(sql, filters={"date": [padded_start, end_date]}).df()
        return price_data

    def get_group_data(self):
        """Assign each row a quantile group per date; a larger factor gets a larger group."""
        def cut(df, group_num=10):
            df = df.drop_duplicates('factor').copy()
            df['group'] = pd.qcut(df['factor'], q=group_num, labels=False, duplicates='drop')
            df = df.dropna(subset=['group'], how='any')
            df['group'] = df['group'].astype(int).astype(str)
            return df

        return self._apply_by_date(self.merge_data, cut, group_num=self.params['group_num'])

    def get_bm_ret(self, benchmark):
        """Return the benchmark's daily close-to-close returns.

        Raises:
            ValueError: ``benchmark`` is not a supported index name.
        """
        if benchmark not in self._INDEX_CODES:
            # Previously only printed, then failed later with a NameError.
            raise ValueError('请检查输入的基准代码是否正确')
        bm_code = self._INDEX_CODES[benchmark]
        bm_sql = """
        SELECT
        date,instrument, (close - m_Lag(close,1)) / m_LAG(close, 1) as benchmark_ret
        FROM cn_stock_index_bar1d
        WHERE instrument = '%s'
        AND date >= '%s' and date <='%s' ;"""%(bm_code, self.start_date, self.end_date)
        bm_ret = dai.query(bm_sql).df()
        return bm_ret

    def get_group_cumret(self):
        """Return cumulative (simple-sum) returns per group, long-short and benchmark."""
        # Mean return per (date, group); mean() skips NaN like np.nanmean.
        groupret_data = (
            self.group_data.groupby(['date', 'group'])['daily_ret']
            .mean().rename('g_ret').reset_index()
        )
        groupret_pivotdata = groupret_data.pivot(index='date', values='g_ret', columns='group')
        top = str(self.params['group_num'] - 1)
        groupret_pivotdata['ls'] = groupret_pivotdata[top] - groupret_pivotdata['0']
        bm_ret = self.bm_ret.set_index('date')
        groupret_pivotdata['bm'] = bm_ret['benchmark_ret']
        # Shift one row: the first date becomes NaN, the last date keeps a value.
        groupret_pivotdata = groupret_pivotdata.shift(1)
        self.groupret_pivotdata = groupret_pivotdata
        return groupret_pivotdata.cumsum().round(4)

    def get_Performance(self, data_type):
        """Return a dict of performance statistics for one portfolio.

        Args:
            data_type: 'long', 'short' or 'long_short'.

        Raises:
            ValueError: ``data_type`` is none of the three.
        """
        column_by_type = {
            'long': str(self.params['group_num'] - 1),
            'short': '0',
            'long_short': 'ls',
        }
        if data_type not in column_by_type:
            raise ValueError(f"unsupported data_type: {data_type!r}")

        def get_stats(series, bm_series):
            """``series`` and ``bm_series`` are daily-return Series."""
            excess = series - bm_series
            stats = {
                'return_ratio': series.sum(),
                'annual_return_ratio': series.sum() * 242 / len(series),
                'ex_return_ratio': excess.sum(),
                'ex_annual_return_ratio': excess.sum() * 242 / len(excess),
                'sharp_ratio': empyrical.sharpe_ratio(series, 0.035/242),
                'return_volatility': empyrical.annual_volatility(series),
                'information_ratio': series.mean() / series.std(),
                'max_drawdown': empyrical.max_drawdown(series),
                'win_percent': len(series[series > 0]) / len(series),
                'trading_days': len(series),
            }
            # Trailing-window sums are computed on the NaN-filled series.
            filled = series.fillna(0)
            for window in (3, 10, 21, 63, 126, 252):
                stats['ret_%d' % window] = filled.rolling(window).sum().iloc[-1]
            return stats

        return get_stats(self.groupret_pivotdata[column_by_type[data_type]],
                         self.groupret_pivotdata['bm'])

    def get_IC_data(self, data_type):
        """Return rank-IC results.

        Args:
            data_type: 'all' returns a DataFrame (date, g_ic, ic_cumsum,
                ic_roll_ma); 'long', 'short' or 'long_short' returns a dict
                of summary values for that portfolio's rows.

        Raises:
            ValueError: ``data_type`` is not one of the four.
        """
        def cal_ic(df):
            return df['daily_ret'].corr(df['factor'], method='spearman')

        fields = ['date', 'daily_ret', 'factor']
        top = str(self.params['group_num'] - 1)

        if data_type == 'all':
            groupIC_data = self.group_data[fields].groupby('date', group_keys=False).apply(lambda x: cal_ic(x)).reset_index()
            groupIC_data.rename(columns={0: 'g_ic'}, inplace=True)
            # NOTE(review): this shifts the date column together with g_ic, so
            # it only drops the last row — confirm that is the intent.
            groupIC_data = groupIC_data.shift(1)
            groupIC_data['ic_cumsum'] = groupIC_data['g_ic'].cumsum()
            groupIC_data['ic_roll_ma'] = groupIC_data['g_ic'].rolling(22).mean()
            return groupIC_data.round(4).dropna()

        if data_type == 'long':
            mask = self.group_data['group'] == top
        elif data_type == 'short':
            mask = self.group_data['group'] == '0'
        elif data_type == 'long_short':
            mask = self.group_data['group'].isin(['0', top])
        else:
            raise ValueError(f"unsupported data_type: {data_type!r}")

        data = self.group_data[mask][fields]
        groupIC_data = data.groupby('date', group_keys=False).apply(lambda x: cal_ic(x)).reset_index()
        IC_data = groupIC_data.rename(columns={0: 'g_ic'}).dropna()
        ic_values = IC_data['g_ic']
        summary = {
            'ic': np.nanmean(ic_values),
            'ir': np.nanmean(ic_values) / np.nanstd(ic_values),
        }
        for window in (3, 10, 21, 63, 126, 252):
            summary['ic_%d' % window] = ic_values.tail(window).mean()
        return summary

    def get_Turnover_data(self, data_type):
        """Return ``{'turnover': value}`` for 'long', 'short' or 'long_short'.

        Raises:
            ValueError: ``data_type`` is none of the three (the original
                silently returned None).
        """
        def cal_turnover(df):
            """Mean over dates of 1 - (names also held on the previous date) / (names held)."""
            turnovers = []
            previous = set()  # nothing is held before the first date
            for _, names in df.groupby('date')['instrument']:
                current = set(names)
                turnovers.append(1 - len(current & previous) / len(names))
                previous = current
            return np.nanmean(turnovers)

        top = str(self.params['group_num'] - 1)
        if data_type == 'long':
            return {'turnover': cal_turnover(self.group_data[self.group_data['group'] == top])}
        if data_type == 'short':
            return {'turnover': cal_turnover(self.group_data[self.group_data['group'] == '0'])}
        if data_type == 'long_short':
            long_df = self.group_data[self.group_data['group'] == top]
            short_df = self.group_data[self.group_data['group'] == '0']
            return {'turnover': cal_turnover(long_df) + cal_turnover(short_df)}
        raise ValueError(f"unsupported data_type: {data_type!r}")

    def get_whole_perf(self):
        """Return one row of IC, performance and turnover per portfolio."""
        rows = []
        for _type in ['long', 'short', 'long_short']:
            dict_merged = {}
            dict_merged.update(self.get_IC_data(_type))
            dict_merged.update(self.get_Performance(_type))
            dict_merged.update(self.get_Turnover_data(_type))
            df = pd.DataFrame.from_dict(dict_merged, orient='index', columns=['value']).T
            df['portfolio'] = _type
            rows.append(df)
        # pd.concat replaces DataFrame.append, which newer pandas no longer has.
        summary_df = pd.concat(rows)
        summary_df.index = range(len(summary_df))
        return summary_df.round(4)

    def get_yearly_perf(self):
        """Return per-year performance and IC for the long (top-group) portfolio."""
        top = str(self.params['group_num'] - 1)
        year_df = self.groupret_pivotdata.reset_index('date')
        year_df['year'] = year_df['date'].dt.year

        def cal_Performance(data):
            series = data[top]
            bm_series = data['bm']
            excess = series - bm_series
            return pd.DataFrame({
                'return_ratio': [series.sum()],
                'annual_return_ratio': [series.sum() * 242 / len(series)],
                'ex_return_ratio': [excess.sum()],
                'ex_annual_return_ratio': [excess.sum() * 242 / len(excess)],
                'sharp_ratio': [empyrical.sharpe_ratio(series, 0.035/242)],
                'return_volatility': [empyrical.annual_volatility(series)],
                'max_drawdown': [empyrical.max_drawdown(series)],
                'win_percent': [len(series[series > 0]) / len(series)],
                'trading_days': [int(len(series))],
            })

        yearly_perf = year_df.groupby(['year'], group_keys=True).apply(cal_Performance)
        yearly_perf = yearly_perf.droplevel(1).round(4)

        # Yearly IC of the long portfolio (default correlation method).
        data = self.group_data[self.group_data['group'] == top][['date', 'daily_ret', 'factor']]

        def cal_ic(df):
            return df['daily_ret'].corr(df['factor'])

        groupIC_data = data.groupby('date', group_keys=False).apply(lambda x: cal_ic(x)).reset_index()
        IC_data = groupIC_data.rename(columns={0: 'g_ic'}).dropna()
        IC_data['year'] = IC_data['date'].dt.year
        yearly_IC = IC_data.groupby('year')['g_ic'].mean()
        yearly_perf['ic'] = yearly_IC.round(4)
        yearly_perf = yearly_perf.reset_index()
        yearly_perf['year'] = yearly_perf['year'].astype(str)
        return yearly_perf

    def render(self):
        """Draw the result tables and charts; return the rendered page's ``data``."""
        from bigcharts import opts

        fields = ['portfolio','ic', 'ir', 'turnover', 'return_ratio', 'annual_return_ratio','ex_return_ratio', 'ex_annual_return_ratio', 'sharp_ratio', 'return_volatility', 'information_ratio', 'max_drawdown', 'win_percent', 'ic_252', 'ret_252']
        whole_perf = self.whole_perf[fields]
        c1 = bigcharts.Chart(
            data=whole_perf,
            type_="table",
            chart_options=dict(
                title_opts=opts.ComponentTitleOpts(title="整体绩效指标")
            ),
            y=list(whole_perf.columns))

        fields = ['year','ic', 'return_ratio', 'annual_return_ratio', 'ex_return_ratio', 'ex_annual_return_ratio', 'sharp_ratio', 'return_volatility',
                  'max_drawdown', 'win_percent', 'trading_days']
        yearly_perf = self.yearly_perf[fields]
        c2 = bigcharts.Chart(
            data=yearly_perf,
            type_="table",
            chart_options=dict(
                title_opts=opts.ComponentTitleOpts(title="年度绩效指标(多头组合)")
            ),
            y=list(yearly_perf.columns))

        # Cumulative return curves.
        c3 = bigcharts.Chart(
            data=self.group_cumret,
            type_="line",
            x=self.group_cumret.index,
            y=self.group_cumret.columns)

        # IC summary. Reads self.ic; the original read a global named
        # alpha_instance, which only worked if the caller used that name.
        ic_series = self.ic['g_ic']
        _IC = np.nanmean(ic_series)
        _IR = np.nanmean(ic_series) / np.nanstd(ic_series)
        abs_IC = ic_series.abs()
        significant_ic_ratio = abs_IC[abs_IC >= 0.02].shape[0] / abs_IC.shape[0]
        c4 = bigcharts.Chart(
            data=pd.DataFrame({'IC':[np.round(_IC,4)], '|IC|>0.02':[np.round(significant_ic_ratio,4)], 'IR':[np.round(_IR,4)]}),
            type_="table",
            chart_options=dict(
                title_opts=opts.ComponentTitleOpts(title="IC分析指标")
            ),
            y=['IC','|IC|>0.02','IR'],
        )

        # Per-period IC bars with the 22-day rolling mean.
        c5 = bigcharts.Chart(
            data=self.ic,
            type_="bar",
            x='date',
            y=['g_ic', 'ic_roll_ma'],
            chart_options=dict(
                title_opts=opts.TitleOpts(
                    title="IC曲线",
                    subtitle="每日IC、累计IC、近22日IC均值",
                    pos_left="center",
                    pos_top="top",
                ),
                legend_opts=opts.LegendOpts(
                    is_show=False,  # hide the legend
                ),
                extend_yaxis=[opts.AxisOpts()]
            )
        )

        # Cumulative IC line on the second y axis.
        c6 = bigcharts.Chart(
            data=self.ic,
            type_="line",
            x='date',
            y=['ic_cumsum'],
            chart_options=dict(
                title_opts=opts.TitleOpts(
                    title="IC累积曲线",
                    pos_left="center",
                    pos_top="top",
                ),
                legend_opts=opts.LegendOpts(
                    is_show=False,  # hide the legend
                )
            ),
            series_options={"ic_cumsum": {"yaxis_index": 1}}
        )
        c5_6 = bigcharts.Chart(data = [c5, c6], type_ = "overlap",)

        # Factor values on the last date.
        top_factor_df = self.factor_data[self.factor_data['date'] == self.end_date].round(4)
        top_factor_df['date'] = top_factor_df['date'].apply(lambda x: x.strftime('%Y-%m-%d'))
        # The top_n_ins smallest and largest factor values.
        df_sorted_min = top_factor_df.sort_values('factor').head(self.top_n_ins)
        df_sorted_max = top_factor_df.sort_values('factor', ascending=False).head(self.top_n_ins)
        c7 = bigcharts.Chart(
            data=df_sorted_max,
            type_="table",
            chart_options=dict(
                title_opts=opts.ComponentTitleOpts(title="因子值最大的%s只标的"%self.top_n_ins)
            ),
            y=['date','instrument','factor'],
        )
        c8 = bigcharts.Chart(
            data=df_sorted_min[['date','instrument','factor']],
            type_="table",
            chart_options=dict(
                title_opts=opts.ComponentTitleOpts(title="因子值最小的%s只标的"%self.top_n_ins)
            ),
            y=['date','instrument','factor'],
        )

        c_set = bigcharts.Chart([c1, c2, c3, c4, c5_6, c7, c8], type_="page").render(display=False)
        from IPython.display import display
        display(c_set)
        t11 = time.time()
        print('耗时:{0}秒 可视化输出'.format(np.round(t11-self.t10)))
        print('=========因子分析结束,总耗时:{0}==========='.format(np.round(t11-self.t0)))
        return c_set.data
# 下面的代码是上面SQL里面要删除的,怕删除错误。
t4_tmp AS(
SELECT
*,
price*volume AS amt,
datetrunc('day', date) AS day,
SUM(amt) FILTER(amt>1000000 AND bs_flag=66) OVER(PARTITION BY day ORDER BY date) AS amountbuy_xlarge,
SUM(amt) FILTER(amt>1000000 AND bs_flag=83) OVER(PARTITION BY day ORDER BY date) AS amountsell_xlarge,
SUM(amt) FILTER(amt<1000000 AND amt>200000 AND bs_flag=66) OVER(PARTITION BY day ORDER BY date) AS amountbuy_large,
SUM(amt) FILTER(amt<1000000 AND amt>200000 AND bs_flag=83) OVER(PARTITION BY day ORDER BY date) AS amountsell_large,
SUM(amt) FILTER(amt<200000 AND amt>40000 AND bs_flag=66) OVER(PARTITION BY day ORDER BY date) AS amountbuy_med,
SUM(amt) FILTER(amt<200000 AND amt>40000 AND bs_flag=83) OVER(PARTITION BY day ORDER BY date) AS amountsell_med,
SUM(amt) FILTER(amt<40000 AND bs_flag=66) OVER(PARTITION BY day ORDER BY date) AS amountbuy_small,
SUM(amt) FILTER(amt<40000 AND bs_flag=83) OVER(PARTITION BY day ORDER BY date) AS amountsell_small,
amountbuy_xlarge - amountsell_xlarge AS amountdiff_xlarge,
amountbuy_large - amountsell_large AS amountdiff_large,
amountbuy_med - amountsell_med AS amountdiff_med,
amountbuy_small - amountsell_small AS amountdiff_small,
row_number() over (PARTITION BY day ORDER by day) as _rn,
FROM
cn_stock_level2_order
WHERE
date >='{0}'||' 00:00:00.000'
AND
date <= '{1}'||' 23:59:59.999'
AND
instrument = '{2}'
QUALIFY _rn = 1
ORDER by date
),
t4 AS(
SELECT
day as date,
instrument,
amountbuy_xlarge,
amountsell_xlarge,
amountbuy_large,
amountsell_large,
amountbuy_med,
amountsell_med,
amountbuy_small,
amountsell_small,
amountdiff_xlarge,
amountdiff_large,
amountdiff_med,
amountdiff_small,
FROM
t4_tmp
WHERE
date >='{0}'||' 00:00:00.000'
AND
date <= '{1}'||' 23:59:59.999'
AND
instrument = '{2}'
ORDER by date
)