import pandas as pd
import numpy as np
from biglearning.module2.common.data import Outputs
from zipline.finance.commission import PerOrder
import os
from bigdatasource.api import DataSource
from biglearning.api import M
from biglearning.api import tools as T
from biglearning.module2.common.data import Outputs
import warnings
warnings.filterwarnings('ignore')
from datetime import date
from joblib import Parallel, delayed
# Date window of the update, as 'YYYY-MM-DD' strings: fixed start date,
# end date is whatever "today" is when the script runs.
sd = '2023-05-15'
ed = date.today().isoformat()
# Previously used manual windows, kept for reference:
#   '2023-05-14'
#   sd = '2023-05-12'
#   ed = '2023-05-15'
def calc_one_day(df):
    """Compute the volume-peak factors for one day of minute bars.

    The frame must have ``volume`` and ``close`` columns, one row per bar in
    chronological order.  The steps are:

    1. ``邻域`` ("neighbourhood") is the sum of ``volume`` over a bar and its
       four predecessors and four successors.  Rows for which that window is
       incomplete (or that contain any NaN) are dropped and the frame is
       re-indexed from 0.
    2. The *peak* is the row with the largest neighbourhood volume.
    3. If the peak is not the first remaining row, ``m`` is the row with the
       smallest neighbourhood at or before the peak and ``n`` the one at or
       after it.  Otherwise ``m`` is the very first bar of the day (before
       any rows were dropped) and ``n`` is the row with the smallest
       neighbourhood overall.
    4. ``vm``/``cm`` and ``vn``/``cn`` are the volume/close at ``m`` and ``n``,
       ``nm`` is the row distance used as the denominator of ``factor``:
       ``factor = (cn - cm) / cm / nm``.
    5. ``w_factor`` / ``s_factor`` are per-bar returns before and after the
       peak; which side is labelled "strong" depends on whether ``vm`` or
       ``vn`` is larger.  When ``vm == vn`` both are NaN.

    Args:
        df: One day of bars.  It is not modified; a re-indexed copy is used.

    Returns:
        The trimmed copy with the constant columns ``vm``, ``cm``, ``vn``,
        ``cn``, ``nm``, ``w_factor``, ``s_factor`` and ``factor`` added.  If
        no row survives step 1 (fewer than nine usable bars) the empty frame
        is returned without those columns.
    """
    # Work on a fresh, 0-based copy so the caller's frame is never mutated.
    df = df.reset_index(drop=True)
    if df.empty:
        return df

    # Volume/close of the first bar of the day, captured before trimming.
    vs = df.loc[0, 'volume']
    cs = df.loc[0, 'close']

    # Centred 9-bar volume sum; the offsets keep the original summation order.
    volume = df['volume']
    neighbourhood = volume
    for offset in (1, 2, 3, 4, -1, -2, -3, -4):
        neighbourhood = neighbourhood + volume.shift(offset)
    df['邻域'] = neighbourhood

    df = df.dropna().reset_index(drop=True)
    if df.empty:
        # Too few bars for a full window: nothing to compute for this day
        # (idxmax() would raise ValueError on an empty column).
        return df

    idxmax = df['邻域'].idxmax()  # peak
    price_top = df.loc[idxmax, 'close']
    first_price = df['close'].iloc[0]
    last_price = df['close'].iloc[-1]

    if idxmax != 0:
        # Label slices are inclusive, so the peak row belongs to both halves.
        idxmin_m = df.loc[:idxmax, '邻域'].idxmin()
        idxmin_n = df.loc[idxmax:, '邻域'].idxmin()
        vm = df.loc[idxmin_m, 'volume']
        cm = df.loc[idxmin_m, 'close']
        vn = df.loc[idxmin_n, 'volume']
        cn = df.loc[idxmin_n, 'close']
        df['vm'] = vm
        df['cm'] = cm
        df['vn'] = vn
        df['cn'] = cn
        df['nm'] = idxmin_n - idxmin_m

        # Per-bar return from the first row up to the peak, and from the
        # peak to the last row.
        rise = (price_top - first_price) / first_price / idxmax
        fall = (last_price - price_top) / price_top / (len(df) - idxmax)
        if vm > vn:
            weak, strong = rise, fall
        elif vn > vm:
            weak, strong = fall, rise
        else:
            # Neither side dominates: make the undefined value explicit.
            weak = strong = float('nan')
        df['w_factor'] = weak
        df['s_factor'] = strong
    else:
        # The peak is the first remaining row: compare the day's first bar
        # with the overall neighbourhood minimum instead.
        idxmin = df['邻域'].idxmin()
        vn = df.loc[idxmin, 'volume']
        cn = df.loc[idxmin, 'close']
        df['vm'] = vs
        df['cm'] = cs
        df['vn'] = vn
        df['cn'] = cn
        df['nm'] = idxmin
        fac = (last_price - first_price) / first_price / len(df)
        df['w_factor'] = fac
        df['s_factor'] = fac

    # Kept as column arithmetic so that nm == 0 yields inf/NaN, not an error.
    df['factor'] = (df['cn'] - df['cm']) / df['cm'] / df['nm']
    return df


def calc_data(instrument, sd, ed):
    """Compute the daily factors of one instrument and store them.

    Reads the instrument's bars from the ``bar1m_CN_STOCK_A`` data source for
    the window ``[sd, ed]``, runs :func:`calc_one_day` on every calendar day,
    keeps one row per day and writes ``instrument``, ``date``, ``w_factor``,
    ``s_factor`` and ``factor`` (plus a year partition column) to the ``dai``
    data source ``hf_tide``.

    Failures are best-effort: they are reported with ``print`` (including the
    underlying error) and the function returns without raising, so one bad
    instrument does not abort a batch run.

    Args:
        instrument: Instrument identifier passed to ``DataSource.read``.
        sd: Start date passed to ``DataSource.read``.
        ed: End date passed to ``DataSource.read``.

    Returns:
        None.
    """
    # Re-applied inside the function because it may run in a separate
    # worker process that does not share the parent's warning filters.
    import warnings
    warnings.filterwarnings('ignore')

    print(instrument)
    df = DataSource('bar1m_CN_STOCK_A').read(
        start_date=sd, end_date=ed, instruments=instrument)
    try:
        df_ = df.copy()
        df_['day'] = df_['date'].dt.to_period('D')
    except Exception as exc:
        # e.g. the read returned nothing usable for this instrument.
        print(instrument, '数据读取失败', repr(exc))
        return

    try:
        # Explicit iteration instead of groupby.apply: every day is processed
        # exactly once and the 'day' column is always part of the result.
        daily = pd.concat(
            [calc_one_day(day_bars) for _, day_bars in df_.groupby('day')],
            ignore_index=True,
        )
        # All factor columns are constant within a day; keep the first row.
        daily = daily.drop_duplicates(subset=['day'])
        # Truncate the bar timestamp to its calendar date.
        daily['date'] = pd.to_datetime(daily['date'].dt.strftime('%Y-%m-%d'))
        # .copy() so that adding the partition column does not write into a
        # slice of the intermediate frame.
        daily = daily[['instrument', 'date', 'w_factor', 's_factor', 'factor']].copy()

        import dai
        # Optional: define a partition, which can speed up data access.
        daily[dai.DEFAULT_PARTITION_FIELD] = daily['date'].dt.year.astype(str)
        dai.DataSource.write_bdb(
            data=daily,
            id="hf_tide",
            unique_together=["date", "instrument"],
            indexes=["date"],
        )
    except Exception as exc:
        print(instrument, '数据更新失败', repr(exc))
    return
# The daily bars are read only to enumerate the instruments present in the
# [sd, ed] window.
tmp = DataSource('bar1d_CN_STOCK_A').read(start_date=sd, end_date=ed)
ins_list = tmp['instrument'].unique()

# One calc_data call per instrument, spread over all available cores
# (n_jobs=-1).  Each call stores its own output, so `results` only collects
# the calls' return values.
tasks = (delayed(calc_data)(ins, sd, ed) for ins in ins_list)
results = Parallel(n_jobs=-1)(tasks)