【平台使用】因子回归分析为什么运行到cached就没办法继续运行下去了?
由bqga9pf6创建,最终由small_q 被浏览 28 用户
# 本代码由可视化策略环境自动生成 2023年5月11日 14:45
# 本代码单元只能在可视化模式下编辑。您也可以拷贝代码,粘贴到新建的代码单元或者策略,然后修改。
# Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端
def m7_run_bigquant_run(input_1, input_2, input_3):
    """Prepare stock-level and industry-index return tables for the factor test.

    Args:
        input_1: Handle whose ``read()`` returns a mapping containing the
            ``'start_date'`` and ``'end_date'`` strings (``%Y-%m-%d``).
        input_2: Handle whose ``read()`` returns the factor DataFrame; it is
            merged on ``date`` and ``instrument``.
        input_3: Not used by this function.

    Returns:
        ``Outputs`` where ``data_1`` is the factor table joined with industry
        and daily-bar data, a ``return`` column and one 0/1 column per
        industry (named ``SW<code>.HIX``), and ``data_2`` is a table with one
        row per date and one column of daily returns per industry index.
        ``data_3`` is always ``None``.
    """
    from datetime import datetime, timedelta

    # Read the date range once instead of once per key.
    date_range = input_1.read()
    start_date = date_range['start_date']
    end_date = date_range['end_date']

    # Load 20 extra calendar days before start_date; presumably so the first
    # in-range row of each instrument already has a previous close.
    warmup_start = (
        datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=20)
    ).strftime('%Y-%m-%d')

    df_index = DataSource("bar1d_index_CN_STOCK_A").read(
        start_date=warmup_start,
        end_date=end_date,
        fields=['date', 'instrument', 'close'],
    )
    df_comp = DataSource("industry_CN_STOCK_A").read(
        start_date=warmup_start, end_date=end_date
    )
    df_comp = df_comp[['date', 'instrument', 'industry_sw_level1']]
    # NOTE(review): this reads every bar field for every instrument, which is
    # likely the most expensive step here. Only 'close' is used below, but the
    # extra columns also take part in dropna() and end up in data_1, so they
    # are kept to preserve the output. Consider passing fields=[...].
    df_bar = DataSource("bar1d_CN_STOCK_A").read(
        start_date=warmup_start, end_date=end_date
    )

    df = input_2.read()
    df = pd.merge(df, df_comp, how='left', on=['date', 'instrument'])
    df = pd.merge(df, df_bar, how='left', on=['date', 'instrument'])
    df = df.dropna()

    def add_return(frame):
        """Return ``frame`` sorted by instrument and date with a ``return`` column.

        ``return`` is close / previous close - 1 within each instrument. This
        is a vectorised replacement for ``groupby('instrument').apply`` and
        leaves ``instrument`` as a plain column on every pandas version.
        """
        frame = frame.sort_values(['instrument', 'date'])
        previous_close = frame.groupby('instrument')['close'].shift(1)
        frame['return'] = frame['close'] / previous_close - 1
        return frame

    df = add_return(df)

    # One 0/1 membership column per industry code. Code 0 is skipped
    # (presumably "unclassified" - TODO confirm).
    industry_cols = []
    for code in df['industry_sw_level1'].dropna().unique():
        if code == 0:
            continue
        col_name = 'SW' + str(int(code)) + '.HIX'
        df[col_name] = (df['industry_sw_level1'] == code).astype(int)
        industry_cols.append(col_name)

    # Keep only the index instruments whose code matches an industry column,
    # then pivot their daily returns to one column per index.
    df_index = df_index[df_index['instrument'].isin(industry_cols)]
    df_index = add_return(df_index)
    df_index = pd.pivot_table(
        df_index[['date', 'instrument', 'return']],
        index='date',
        columns='instrument',
        values='return',
    )
    df_index.reset_index(inplace=True)

    # Drop the warm-up rows now that returns have been computed.
    df = df[(df['date'] >= start_date) & (df['date'] <= end_date)]
    df_index = df_index[
        (df_index['date'] >= start_date) & (df_index['date'] <= end_date)
    ]
    df_index = df_index.fillna(0)

    data_1 = DataSource.write_df(df)
    data_2 = DataSource.write_df(df_index)
    return Outputs(data_1=data_1, data_2=data_2, data_3=None)
# 后处理函数,可选。输入是主函数的输出,可以在这里对数据做处理,或者返回更友好的outputs数据格式。此函数输出不会被缓存。
def m7_post_run_bigquant_run(outputs):
    """Optional post-processing hook for module m7; returns ``outputs`` unchanged.

    Args:
        outputs: The value returned by ``m7_run_bigquant_run``.

    Returns:
        The same ``outputs`` object, untouched.
    """
    return outputs
# Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端
def m9_run_bigquant_run(input_1, input_2, input_3):
    """Regress industry-adjusted stock returns on one factor, date by date.

    For every date, the industry return of each stock is the dot product of
    its industry membership columns with that date's industry-index returns.
    ``return - industry_return`` is then regressed on the factor without an
    intercept, and the coefficient, t-value and R^2 are recorded.

    Args:
        input_1: Handle whose ``read()`` returns the stock table; it must
            contain ``date``, ``instrument``, ``return``, the factor column
            and every industry column present in ``input_2``.
        input_2: Handle whose ``read()`` returns the industry-index return
            table: a ``date`` column plus one column per industry index.
            One row per date is expected; only the first match is used.
        input_3: Handle whose ``read()`` returns a sequence; its first item
            is the factor column name.

    Returns:
        ``Outputs`` where ``data_1`` holds the per-date regression results
        (``date``, ``factor_name``, ``return``, ``t_values``, ``r2``, with
        ``t_values`` stored as absolute values) and ``data_2`` holds a one-row
        summary (``t_mean``, ``t_ratio``, ``t_significance``).
        ``data_3`` is always ``None``.

    Raises:
        ValueError: If no date produced a regression result.
    """
    import statsmodels.api as sm

    df = input_1.read()
    df_index = input_2.read()
    factor_name = input_3.read()[0]

    # Every column of the index table other than 'date' is an industry index.
    industry_cols = [col for col in df_index.columns.tolist() if col != 'date']

    def factor_test(x, factor_name):
        """Return one result row (dict) for a single date, or None if skipped."""
        c_date = x['date'].unique()[0]
        index_rows = df_index[df_index['date'] == c_date]
        if len(x) == 0 or len(index_rows) == 0:
            print("factor test wrong data:", c_date)
            return None

        # (n_stocks, n_industries) @ (n_industries,) -> industry return per stock.
        exposures = x[industry_cols].to_numpy(dtype=float)
        index_returns = index_rows[industry_cols].iloc[0].to_numpy(dtype=float)

        # Work on an explicit copy so the caller's group frame is not mutated.
        sample = x[['instrument', factor_name, 'return']].copy()
        sample['industry_return'] = exposures @ index_returns
        sample['y'] = sample['return'] - sample['industry_return']

        # "- 1" removes the intercept from the regression.
        fit = sm.formula.ols(f'y~{factor_name}-1', data=sample).fit()
        return {
            'date': c_date,
            'factor_name': factor_name,
            'return': fit.params.iloc[0],
            't_values': fit.tvalues.iloc[0],
            'r2': fit.rsquared,
        }

    rows = []
    for _, day_frame in df.groupby('date'):
        row = factor_test(day_frame, factor_name)
        if row is not None:
            rows.append(row)

    df_f_return = pd.DataFrame(
        rows, columns=['date', 'factor_name', 'return', 't_values', 'r2']
    )
    print(df_f_return)
    if df_f_return.empty:
        raise ValueError(
            "factor test produced no results: no date in the stock data has "
            "a matching row in the industry index returns"
        )

    # The summary statistics are computed on |t|.
    df_f_return['t_values'] = df_f_return['t_values'].abs()
    abs_t = df_f_return['t_values']
    n_obs = abs_t.count()
    t_mean = abs_t.mean()
    t_ratio = abs_t[abs_t >= 2].count() / n_obs
    fkt = t_mean / abs_t.std() * np.sqrt(n_obs - 1)
    df_f_test = pd.DataFrame([{
        'factor_name': factor_name,
        't_mean': t_mean,
        't_ratio': t_ratio,
        't_significance': fkt,
    }])

    data_1 = DataSource.write_df(df_f_return)
    data_2 = DataSource.write_df(df_f_test)
    print(factor_name+"_T_test", df_f_test)
    return Outputs(data_1=data_1, data_2=data_2, data_3=None)
# 后处理函数,可选。输入是主函数的输出,可以在这里对数据做处理,或者返回更友好的outputs数据格式。此函数输出不会被缓存。
def m9_post_run_bigquant_run(outputs):
    """Optional post-processing hook for module m9; returns ``outputs`` unchanged.

    Args:
        outputs: The value returned by ``m9_run_bigquant_run``.

    Returns:
        The same ``outputs`` object, untouched.
    """
    return outputs
# m1: the feature list shared by the modules below (a single feature here).
# The text inside the triple-quoted string is data passed to the module.
m1 = M.input_features.v1(
    features="""
# #号开始的表示注释,注释需单独一行
# 多个特征,每行一个,可以包含基础特征和衍生特征,特征须为本平台特征
hf_real_var
""",
)

# m2: instrument universe and date range for the CN_STOCK_A market.
m2 = M.instruments.v2(
    start_date='2019-1-1',
    end_date='2022-12-31',
    market='CN_STOCK_A',
    instrument_list='',
    max_count=0,
)

# m3: feature extraction for m2's instruments, with 90 days of look-back
# before the start date.
m3 = M.general_feature_extractor.v7(
    instruments=m2.data,
    features=m1.data,
    start_date='',
    end_date='',
    before_start_days=90,
)

# m4: derived-feature extraction on m3's output; NaN rows and extra columns
# are both kept.
m4 = M.derived_feature_extractor.v3(
    input_data=m3.data,
    features=m1.data,
    date_col='date',
    instrument_col='instrument',
    drop_na=False,
    remove_extra_columns=False,
)

# m5: winsorize the features, grouped by date.
m5 = M.winsorize.v7(
    input_data=m4.data,
    features=m1.data,
    columns_input='',
    function_name='3倍标准差',
    group='date',
)

# m6: standardize the features with ZScoreNorm.
m6 = M.standardlize.v12(
    input_1=m5.data,
    input_2=m1.data,
    standard_func='ZScoreNorm',
    columns_input='',
)

# m7: custom step that runs m7_run_bigquant_run on the date range (m2),
# the standardized factor data (m6) and the feature list (m1).
m7 = M.cached.v3(
    input_1=m2.data,
    input_2=m6.data,
    input_3=m1.data,
    run=m7_run_bigquant_run,
    post_run=m7_post_run_bigquant_run,
    input_ports='',
    params='{}',
    output_ports='',
)

# m8: stock filter applied to m7's first output; every condition is '全部'.
m8 = M.chinaa_stock_filter.v1(
    input_data=m7.data_1,
    index_constituent_cond=['全部'],
    board_cond=['全部'],
    industry_cond=['全部'],
    st_cond=['全部'],
    delist_cond=['全部'],
    output_left_data=False,
)

# m9: custom step that runs m9_run_bigquant_run on the filtered stock data
# (m8), m7's second output and the feature list (m1).
m9 = M.cached.v3(
    input_1=m8.data,
    input_2=m7.data_2,
    input_3=m1.data,
    run=m9_run_bigquant_run,
    post_run=m9_post_run_bigquant_run,
    input_ports='',
    params='{}',
    output_ports='',
)
\