class conf:
    """Shared experiment settings: date window, instrument universe and feature list.

    Every attribute is evaluated once, when the class body runs, so the two
    platform modules (``M.instruments.v2`` and ``M.input_features.v1``) are
    invoked at definition time.
    """

    # Passed as ``m_deps`` to the cached price-history step in DragonLabel.gen_Y.
    version = 5

    # Number of consecutive rows in each labelling window (see DragonLabel).
    WIN_HOLD = 20

    # Overall data window, plus the date separating train rows from test rows
    # (rows with date < split_date are "train", the rest are "test").
    start_date = '2017-01-01'
    end_date = '2017-12-31'
    split_date = '2017-11-01'

    # Instrument universe for the window above.
    instruments = M.instruments.v2(
        start_date=start_date,
        end_date=end_date,
        market='CN_STOCK_A',
        instrument_list='',
        max_count=0,
    )

    # Feature expressions, one per line; the text is handed to the platform verbatim.
    features = M.input_features.v1(
        features="""
in_csi300_0
daily_return_1
return_5
return_20
rank_amount_5
avg_turn_10
market_cap_float_0
pe_ttm_0
pb_lf_0
mf_net_pct_main_0
fs_roa_ttm_0
fs_cash_ratio_0
sh_holder_avg_pct_0
ta_sma_10_0
ta_sma_30_0
ta_sar_0
volatility_60_0
"""
    )
# Report how many instruments the configured universe contains.
print("instruments count %d" % len(conf.instruments.data.read()['instruments']))
class DragonLabel(conf):
    """Assemble model inputs (X) and look-ahead labels (Y) for the instruments in ``conf``.

    Typical use: ``DL = DragonLabel(); DL.gen(); X, y = DL.get('train')``.
    ``gen`` stores three frames on the instance: ``X`` (features), ``Y``
    (labelled price history) and ``XY`` (their inner join on date/instrument).
    """

    def __init__(self):
        """Create the builder; nothing is computed until ``gen`` is called."""
        pass

    @staticmethod
    def _gen_data_M():
        """Fetch price history for the configured instruments, wrapped for ``M.cached``.

        Returns:
            ``Outputs`` whose ``data`` is a DataSource written from the
            close/open/high/low/amount history between ``conf.start_date`` and
            ``conf.end_date``.
        """
        instruments = conf.instruments.data.read()['instruments']
        df = D.history_data(
            instruments,
            start_date=conf.start_date,
            end_date=conf.end_date,
            fields=['close', 'open', 'high', 'low', 'amount'],
        )
        return Outputs(data=DataSource.write_df(df))

    def gen(self):
        """Build ``X``, ``Y`` and their inner join ``XY`` and store them on the instance.

        Returns:
            The merged frame ``self.XY`` (rows containing NaN removed), so that
            ``data = DL.gen()`` yields the data set rather than ``None``.
        """
        self.X = self.gen_X()
        self.Y = self.gen_Y()
        self.XY = pd.merge(self.X, self.Y, on=['date', 'instrument'], how='inner', sort=True).dropna()
        print('X size {0}, Y size {1}, XY join size {2}'.format(len(self.X), len(self.Y), len(self.XY)))
        return self.XY

    def get(self, scope='all'):
        """Return ``(X, Y)`` for one slice of the merged data; ``gen`` must have run first.

        Args:
            scope: ``'train'`` selects rows dated before ``conf.split_date``,
                ``'test'`` selects rows on or after it; any other value
                returns every row.

        Returns:
            Tuple of the feature columns and the ``label`` column of the slice.
        """
        if scope == 'train':
            XY = self.XY.query('date < "%s"' % conf.split_date)
        elif scope == 'test':
            XY = self.XY.query('date >= "%s"' % conf.split_date)
        else:
            XY = self.XY
        # Presumably a list of feature column names -- verify against the platform API.
        features = conf.features.data.read()
        X = XY[features]
        Y = XY['label']
        return (X, Y)

    def gen_X(self):
        """Run the platform feature extractors and return the feature DataFrame."""
        M_features_general = M.general_feature_extractor.v7(
            instruments=conf.instruments.data,
            features=conf.features.data,
            start_date='',
            end_date='',
            before_start_days=0
        )
        M_features_derived = M.derived_feature_extractor.v3(
            input_data=M_features_general.data,
            features=conf.features.data,
            date_col='date',
            instrument_col='instrument',
            drop_na=True,
            remove_extra_columns=False
        )
        return M_features_derived.data.read_df()

    def gen_Y(self):
        """Load cached price history, label every instrument and score the labels.

        Returns:
            The frame produced by ``calc_score``; it still carries the raw
            price columns and the intermediate ``hr``/``lr``/``wt``/``score``
            columns next to ``label``.
        """
        m_data = M.cached.v3(run=self._gen_data_M, kwargs=None, m_deps=conf.version)
        df = m_data.data.read_df().dropna()
        # group_keys=False keeps the original row index on every pandas version;
        # otherwise newer releases prepend 'instrument' as an index level, which
        # clashes with the 'instrument' column in the later merge.
        df = df.groupby('instrument', group_keys=False).apply(self.calc_label_all)
        df = self.calc_score(df)
        return df

    def rolling_apply(self, df, N, f, nn=1):
        """Apply ``f`` to consecutive ``N``-row windows of ``df``, stepping ``nn`` rows.

        Not called by the other methods of this class.

        Returns:
            Series of the window results, indexed by the label of each
            window's first row.
        """
        starts = range(0, df.shape[0] - N + 1, nn)
        out = pd.Series([f(df.iloc[i:(i + N)]) for i in starts])
        out.index = df.index[0::nn][:len(out)]
        return out

    def calc_label_all(self, iall):
        """Attach ``hr``/``lr``/``wt`` to the rows of one instrument.

        Each row that starts a full ``conf.WIN_HOLD``-row window gets the
        values computed by ``calc_label`` for that window; the remaining rows
        (the tail, or every row when the instrument is shorter than one
        window) get NaN.

        Args:
            iall: rows of a single instrument, as passed by ``groupby.apply``.

        Returns:
            A labelled copy of ``iall``; the input frame is left untouched
            because mutating the object handed over by ``groupby.apply`` is
            not supported by pandas.
        """
        window = conf.WIN_HOLD
        labelled = iall.copy()
        n_windows = max(labelled.shape[0] - window + 1, 0)
        rows = [self.calc_label(labelled.iloc[i:(i + window)]) for i in range(n_windows)]
        if rows:
            labels = pd.DataFrame(rows)
            # Each label row is named after the index label of its window's
            # first row, so column assignment aligns on the index.
            for col in labels.columns:
                labelled[col] = labels[col]
        else:
            # Too few rows for a single window: keep the schema consistent so
            # the later query on hr/lr/wt still finds its columns.
            for col in ('hr', 'lr', 'wt'):
                labelled[col] = float('nan')
        return labelled

    def calc_label(self, iw):
        """Compute the raw label components for one window of rows.

        With ``pc`` the close of the window's first row:

        * ``hr``: (highest ``high`` in the window - pc) / pc
        * ``lr``: (lowest ``low`` up to and including the highest-high row - pc) / pc
        * ``wt``: position, within that same leading span, of the row whose
          close is nearest to ``pc * (1 + hr / 2)``

        Args:
            iw: the window rows of one instrument.

        Returns:
            Series with keys ``hr``, ``lr``, ``wt``, named after the index
            label of the window's first row.
        """
        name = iw.index[0]
        # Positional index from here on, so idxmax/idxmin yield row positions.
        iw = iw.reset_index(drop=True)
        pc = iw.iloc[0]['close']
        hri = iw['high'].idxmax() + 1
        hr = (iw['high'].max() - pc) / pc
        # Rows up to and including the highest-high row.
        iwh = iw.iloc[:hri]
        lr = (iwh['low'].min() - pc) / pc
        wtr = (iwh['close'] - (pc * (1 + (hr / 2)))).abs()
        wt = wtr.idxmin()
        label = pd.Series({'hr': hr, 'lr': lr, 'wt': wt})
        label.name = name
        return label

    def calc_score(self, data):
        """Filter labelled rows, z-score the components and derive ``score`` and ``label``.

        Rows are kept only when ``hr > 0``, ``lr > -0.03`` and ``wt > 0``.
        ``score`` is ``0.45*hr - 0.2*lr - 0.35*wt`` on the standardised
        columns; ``label`` rescales it so that a score of ``-std`` maps to 0
        and ``+std`` maps to 100.

        Args:
            data: frame carrying ``hr``, ``lr`` and ``wt`` columns.

        Returns:
            A new frame with the surviving rows plus ``score`` and ``label``;
            ``hr``/``lr``/``wt`` hold their standardised values.
        """
        # Explicit copy: the column assignments below must write to an
        # independent frame, not to a possible view of ``data``.
        reduced = data.dropna().query('hr>0 & lr>-0.03 & wt>0').copy()
        print('Label Data size {0} reduced size {1}'.format(len(data), len(reduced)))
        for factor in ['hr', 'lr', 'wt']:
            reduced[factor] = (reduced[factor] - reduced[factor].mean()) / reduced[factor].std()
        reduced['score'] = reduced['hr'] * 0.45 - reduced['lr'] * 0.2 - reduced['wt'] * 0.35
        # Map the score onto a roughly 0..100 range.
        std = reduced['score'].std()
        reduced['label'] = (reduced['score'] + std) * (50 / std)
        return reduced
# Instantiate the label builder and run the feature/label pipeline; gen() stores
# its results on the instance as DL.X, DL.Y and DL.XY.
DL = DragonLabel()
# `data` receives whatever gen() returns.
data = DL.gen()
import matplotlib.pyplot as plt
from sklearn import ensemble
from sklearn.metrics import mean_squared_error
class DragonTrain():
    """Fit a gradient-boosting regressor, report its MSE and plot diagnostics.

    NOTE(review): the ``'ls'`` loss name and the estimator's ``loss_``
    attribute are tied to the installed scikit-learn release -- confirm both
    still exist before upgrading the library.
    """

    def __init__(self):
        """Store the hyper-parameters handed to ``GradientBoostingRegressor``."""
        self.params = dict(
            n_estimators=200,
            max_depth=3,
            min_samples_split=2,
            learning_rate=0.03,
            loss='ls',
        )

    def gen(self):
        """Placeholder; does nothing."""
        return None

    def train(self, train, test, plot=True):
        """Fit the model on ``train``, print train/test MSE and keep the model.

        Args:
            train: ``(X, y)`` pair used for fitting.
            test: ``(X, y)`` pair used for evaluation.
            plot: when truthy, call ``plot(test)`` after fitting.
        """
        X_train, y_train = train
        X_test, y_test = test

        model = ensemble.GradientBoostingRegressor(**self.params)
        model.fit(X_train, y_train)

        train_mse = mean_squared_error(y_train, model.predict(X_train))
        test_mse = mean_squared_error(y_test, model.predict(X_test))
        print("train data MSE %.4f test data MSE: %.4f" % (train_mse, test_mse))

        # The fitted estimator is read back by plot() and by callers.
        self.clf = model
        if plot:
            self.plot(test)

    def plot(self, test):
        """Draw per-iteration deviance and relative feature importance.

        Args:
            test: ``(X, y)`` pair; ``X`` must expose ``.columns`` for the
                importance-chart tick labels.
        """
        model = self.clf
        X_test, y_test = test
        n_stages = self.params['n_estimators']
        column_names = X_test.columns

        # Left panel: training vs. test deviance after every boosting stage.
        test_deviance = np.zeros((n_stages,), dtype=np.float64)
        for stage, staged_pred in enumerate(model.staged_predict(X_test)):
            test_deviance[stage] = model.loss_(y_test, staged_pred)

        plt.figure(figsize=(12, 6))
        plt.subplot(1, 2, 1)
        plt.title('Deviance')
        plt.plot(np.arange(n_stages) + 1, model.train_score_, 'b-',
                 label='Training Set Deviance')
        plt.plot(np.arange(n_stages) + 1, test_deviance, 'r-',
                 label='Test Set Deviance')
        plt.legend(loc='upper right')
        plt.xlabel('Boosting Iterations')
        plt.ylabel('Deviance')

        # Right panel: importances scaled so that the largest one equals 100.
        importance = model.feature_importances_
        importance = 100.0 * (importance / importance.max())
        order = np.argsort(importance)
        bar_pos = np.arange(order.shape[0]) + .5
        plt.subplot(1, 2, 2)
        plt.barh(bar_pos, importance[order], align='center')
        plt.yticks(bar_pos, column_names[order])
        plt.xlabel('Relative Importance')
        plt.title('Variable Importance')
        plt.show()
# Fit on the rows before conf.split_date and evaluate on the rows from it onward.
DT = DragonTrain()
train_set = DL.get('train')
test_set = DL.get('test')
DT.train(train=train_set, test=test_set)
from sklearn.metrics import r2_score
# X_test / y_test were only ever locals inside DragonTrain.train and plot, so
# referencing them here raised NameError; unpack the held-out split explicitly.
X_test, y_test = test_set
# Left as a bare expression so a notebook cell displays the R^2 value.
r2_score(y_test, DT.clf.predict(X_test))