from bigdatasource.api import DataSource
from biglearning.api import M
from biglearning.api import tools as T
from biglearning.module2.common.data import Outputs
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
# Parallel task execution (joblib)
from joblib import Parallel, delayed
import copy
# Instrument universe: market CN_STOCK_A between start_date and end_date.
# NOTE(review): instrument_list='' and max_count=0 presumably mean "no restriction" - confirm against the platform docs.
m1 = M.instruments.v2(
start_date='2020-04-01',
end_date='2023-04-28',
market='CN_STOCK_A',
instrument_list='',
max_count=0
)
# Feature definitions. Each non-comment line of the string has the form
# "<alias>=<platform field>"; the aliases (turnover, volume, float market cap,
# close, open, amount, high, low) are the column names used by the analysis below.
# The string is consumed at runtime by the platform, so it is left untouched.
m2 = M.input_features.v1(
features="""
# #号开始的表示注释,注释需单独一行
# 多个特征,每行一个,可以包含基础特征和衍生特征,特征须为本平台特征
换手=turn_0
成交量=volume_0
流通市值=market_cap_float_0
收盘价=close_0
开盘价=open_0
成交额=amount_0
最高价=high_0
最低价=low_0"""
)
# Extract the features of m2 for the instruments of m1.
# NOTE(review): before_start_days=500 presumably loads 500 extra days of history
# before start_date (calc_chips uses a 400-row look-back by default) - confirm.
m3 = M.general_feature_extractor.v7(
instruments=m1.data,
features=m2.data,
start_date='',
end_date='',
before_start_days=500
)
# Evaluate the feature expressions on the extracted data; rows with missing
# values and extra columns are kept (drop_na=False, remove_extra_columns=False).
m4 = M.derived_feature_extractor.v3(
input_data=m3.data,
features=m2.data,
date_col='date',
instrument_col='instrument',
drop_na=False,
remove_extra_columns=False,
user_functions={}
)
# Stock filter: boards listed in board_cond (Shanghai main board, Shenzhen main
# board, ChiNext), st_cond '正常' (normal status), delist_cond '非退市' (not delisted);
# index constituents and industries are set to '全部' (all).
m5 = M.chinaa_stock_filter.v1(
input_data=m4.data,
index_constituent_cond=['全部'],
board_cond=['上证主板', '深证主板', '创业板'],
industry_cond=['全部'],
st_cond=['正常'],
delist_cond=['非退市'],
output_left_data=False
)
# Materialise the filtered data; the code below uses `df` as a pandas DataFrame.
df = m5.data.read()
# Bare expression: displays the frame when run as a notebook cell.
df
def calc_chips(df, instrument, window=400, chip_ratio=1.5):
    """Compute the daily winner ratio (share of chips in profit) of one instrument.

    For every trading day that has enough history, the trailing ``window``
    rows are taken. Each day's turnover is decayed by the turnover of the days
    that followed it, the decayed turnover weights that day's traded amount,
    and the winner ratio of the observation day is the weighted amount traded
    at or below its close divided by the total weighted amount.

    Args:
        df: DataFrame with at least the columns 'instrument', 'date',
            '换手' (turnover, in percent), '成交额' (traded amount) and
            '收盘价' (close price).
        instrument: Value of the 'instrument' column to process.
        window: Number of trailing rows per observation day. When the
            instrument has fewer rows than this, a single window covering all
            of its rows is evaluated for its last day.
        chip_ratio: Cumulative decayed-turnover level (newest day first) at
            which the look-back is cut. With turnover values inside 0-100 %
            the decayed turnover of a window sums to at most 1, so the default
            of 1.5 keeps the whole window; pass e.g. 0.9 to keep only the most
            recent days that make up about 90 % of the chips.

    Returns:
        DataFrame with one row per observation day (the input columns of that
        day plus 'adj_turn', 'adj_amo', 'cum_amo', 'winner_ratio',
        'future_return_5' and 'future_return_1'), ordered by date with a fresh
        0..k-1 index before rows containing any NaN are dropped. Empty when
        the instrument has no rows.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    # Imported and configured here so the setting also applies inside joblib workers.
    import warnings
    warnings.filterwarnings('ignore')

    if window < 1:
        raise ValueError('window must be a positive number of rows')

    # Boolean indexing already yields a new frame, so the full input is never
    # copied. A stable sort by date guarantees that positional windows and the
    # shift-based future returns below follow calendar order.
    df_ = df[df['instrument'] == instrument]
    df_ = df_.sort_values(by='date', kind='mergesort').reset_index(drop=True)

    def calc_one_day(data):
        """Return the observation-day row (last row of ``data``) with its winner ratio.

        ``data`` must be ordered by date, oldest first.
        """
        data = data.reset_index(drop=True)
        # Turnover is quoted in percent; convert to a fraction.
        turn_array = np.asarray(data['换手'], dtype=float) / 100
        diff_array = 1 - turn_array
        # Share of a day's chips that is still held on the observation day:
        # the product of (1 - turnover) over all LATER days. The observation
        # day itself has nothing after it, hence the trailing 1.
        survive = np.cumprod(diff_array[::-1])[::-1]
        mul = np.append(survive[1:], 1.0)
        mul_turn = turn_array * mul
        # Accumulate from the newest day backwards and find where the
        # cumulative share is closest to the requested level.
        turn_cumsum = mul_turn[::-1].cumsum()
        pos = int(np.argmin(np.abs(turn_cumsum - chip_ratio)))
        # Position `pos` counts from zero, so pos + 1 days are kept
        # (this also avoids the `[-0:]` whole-array slice).
        keep = pos + 1
        tmp = data.iloc[-keep:].reset_index(drop=True)
        tmp['adj_turn'] = mul_turn[-keep:]  # historical turnover expressed in today's terms
        tmp['adj_amo'] = tmp['adj_turn'] * tmp['成交额']  # amount weighted by surviving turnover
        # Rank by close price: everything traded at or below a row's close
        # counts as being in profit at that close.
        tmp = tmp.sort_values(by='收盘价', ascending=True)
        tmp['cum_amo'] = tmp['adj_amo'].cumsum()
        tmp['winner_ratio'] = tmp['cum_amo'] / tmp['cum_amo'].iloc[-1]
        # After the reset above the observation day carries label keep - 1.
        return tmp.loc[[keep - 1], :]

    n_rows = len(df_)
    rows = []
    if n_rows:
        n = min(window, n_rows)
        # One window per day, ending at every position that has n rows of history.
        for end in range(n - 1, n_rows):
            try:
                rows.append(calc_one_day(df_.iloc[end - n + 1:end + 1]))
            except Exception:
                # Best-effort: a day whose window cannot be evaluated is
                # skipped so that it does not abort the whole instrument.
                continue

    if rows:
        data = pd.concat(rows)
    else:
        derived = ['adj_turn', 'adj_amo', 'cum_amo', 'winner_ratio']
        extra = [col for col in derived if col not in df_.columns]
        data = df_.iloc[:0].reindex(columns=list(df_.columns) + extra)

    # Forward returns for the expectation study; remove if not needed.
    data.reset_index(inplace=True, drop=True)
    data['future_return_5'] = data['收盘价'].shift(-5) / data['收盘价']
    data['future_return_1'] = data['收盘价'].shift(-1) / data['收盘价']
    data.dropna(inplace=True)
    return data
# Instruments in order of first appearance.
lst = df.instrument.unique().tolist()
# Group once and hand every joblib task only its own instrument's rows, so the
# full frame is neither shipped to nor re-filtered by each of the tasks.
# sort=False keeps the groups in first-appearance order.
results = Parallel(n_jobs=32)(
    delayed(calc_chips)(sub_df, ins)
    for ins, sub_df in df.groupby('instrument', sort=False)
)
# Stack the per-instrument results and order them chronologically.
df_ = pd.concat(results)
df_.sort_values(by='date', inplace=True)
df_.reset_index(inplace=True, drop=True)
# Bare expression: displays the frame when run as a notebook cell.
df_
def calc_mr(df):
    """Collapse one bucket of rows into a single (label, mean return) row.

    Args:
        df: Rows of one bucket; needs the columns 'groups' and
            'future_return_1'.

    Returns:
        One-row DataFrame (index 0) with the columns 'label' (first distinct
        value of 'groups') and 'mr' (mean of 'future_return_1').
    """
    avg_return = df['future_return_1'].mean()
    bucket_label = df.groups.unique()[0]
    summary = {'label': bucket_label, 'mr': avg_return}
    return pd.DataFrame(
        summary,
        columns=['label', 'mr'],
        index=pd.RangeIndex(start=0, stop=1),
    )
# Split the observations into 7 equal-width winner-ratio buckets (integer labels).
df_['groups'] = pd.cut(df_['winner_ratio'], bins=7, labels=False)
# One row per bucket: its label and the mean of future_return_1.
df_plot = df_.groupby(['groups']).apply(calc_mr)
df_plot.reset_index(drop=True, inplace=True)
# Bare expression: displays the frame when run as a notebook cell.
df_plot
import plotly.graph_objects as go
# Bar chart of the mean excess return (mean ratio - 1) per bucket.
# calc_mr averages 'future_return_1', so the title names the 1-day horizon.
fig = go.Figure(
    data=[go.Bar(x=df_plot.label, y=df_plot.mr-1, text=df_plot.mr-1, textposition='auto')],
    layout_title_text='获利盘分层未来1日收益期望'
)
fig.show()
def calc_ic(df):
    """Collapse one (bucket, date) group into a single summary row.

    Args:
        df: Rows of one group; needs the columns 'date', 'groups',
            'future_return_5' and 'winner_ratio'.

    Returns:
        One-row DataFrame (index 0) with the columns 'date' and 'label' (first
        distinct value of 'date' / 'groups'), 'mr' (mean of 'future_return_5')
        and 'winner' (mean of 'winner_ratio').
    """
    avg_return = df['future_return_5'].mean()
    bucket_label = df.groups.unique()[0]
    first_date = df.date.unique()[0]
    avg_winner = df['winner_ratio'].mean()
    column_order = ['date', 'label', 'mr', 'winner']
    summary = {
        'date': first_date,
        'label': bucket_label,
        'mr': avg_return,
        'winner': avg_winner,
    }
    return pd.DataFrame(
        summary,
        columns=column_order,
        index=pd.RangeIndex(start=0, stop=1),
    )
# Split the observations into 7 equal-width winner-ratio buckets (integer labels).
df_['groups'] = pd.cut(df_['winner_ratio'], bins=7, labels=False)
# One row per (bucket, date): mean 5-day forward return and mean winner ratio.
df_plot_ic = df_.groupby(['groups', 'date']).apply(calc_ic)
df_plot_ic.reset_index(drop=True, inplace=True)
from plotly.subplots import make_subplots
# Per bucket, correlate the daily mean winner ratio with the daily mean
# forward return. The rows are collected first and the frame is built once,
# which keeps the columns numeric instead of growing an object-dtype frame
# cell by cell.
corr_rows = []
for num in range(7):
    cut = df_plot_ic[df_plot_ic['label'] == num]
    corr = cut['winner'].corr(cut['mr'])
    corr_rows.append({'label': num, 'corr': corr})
plot = pd.DataFrame(corr_rows, columns=['label', 'corr'])
fig = go.Figure(
    data=[go.Bar(x=plot['label'], y=plot['corr'])],
    layout_title_text='不同层,获利盘与未来收益率相关性'
)
fig.show()
# Bare expression: displays the frame when run as a notebook cell.
df_
def calc_return(df):
    """Attach the group's mean 1-day forward return to every row.

    A new frame is returned and the argument is left untouched, because
    pandas does not support functions passed to ``groupby(...).apply`` that
    mutate the group they receive.

    Args:
        df: Rows of one group; needs the column 'future_return_1'.

    Returns:
        Copy of ``df`` with an added column 'return' that holds the mean of
        'future_return_1' in every row.
    """
    return df.assign(**{'return': df['future_return_1'].mean()})
# Mean 1-day forward return per (date, bucket), broadcast onto every row.
# group_keys=False keeps 'date' and 'groups' as plain columns: with group keys
# added to the index they would be both index levels and columns, which makes
# a later groupby on 'groups' ambiguous.
plot_ic = df_.groupby(['date', 'groups'], group_keys=False).apply(calc_return)
# Keep one representative row per (date, bucket).
test = plot_ic.drop_duplicates(subset=['date', 'groups'])
def cal_cum_prod(df):
    """Attach the running product of 'return' as an equity curve.

    A new frame is returned and the argument is left untouched, because
    pandas does not support functions passed to ``groupby(...).apply`` that
    mutate the group they receive.

    Args:
        df: Rows of one group in chronological order; needs the column
            'return'.

    Returns:
        Copy of ``df`` with an added column 'equity' (cumulative product of
        'return' in row order).
    """
    return df.assign(equity=df['return'].cumprod())
# Equity curve per bucket: running product of the daily mean return.
# group_keys=False keeps 'groups' a plain column so the filters below work on
# every pandas version.
test = test.groupby('groups', group_keys=False).apply(cal_cum_prod)
# Bare expression: displays bucket 0's equity when run as a notebook cell.
test[test['groups']==0]['equity']
import plotly.graph_objects as go
# Line chart with one trace per bucket.
fig = go.Figure()
for group_id in range(7):
    curve = test[test['groups'] == group_id]
    # Use the bucket's own dates as x: a bucket may have no observation on
    # some dates, in which case the global date list would not line up with
    # its equity values.
    fig.add_trace(go.Scatter(x=curve['date'], y=curve['equity'], mode='lines', name='group_{}'.format(group_id)))
# Chart title and axis labels.
fig.update_layout(title='分层测试', xaxis_title='X Axis Label', yaxis_title='Y Axis Label')
# Render the chart.
fig.show()