Clone Strategy

    {"Description":"实验创建于2020/2/14","Summary":"","Graph":{"EdgesInternal":[{"DestinationInputPortId":"-28:features","SourceOutputPortId":"-592:data"},{"DestinationInputPortId":"-235:input_2","SourceOutputPortId":"-592:data"},{"DestinationInputPortId":"-127:input_1","SourceOutputPortId":"-135:data"},{"DestinationInputPortId":"-135:features","SourceOutputPortId":"-151:data"},{"DestinationInputPortId":"-235:input_1","SourceOutputPortId":"-127:data_1"},{"DestinationInputPortId":"-28:factors_df","SourceOutputPortId":"-235:data_1"},{"DestinationInputPortId":"-28:performance_data","SourceOutputPortId":"-235:data_2"},{"DestinationInputPortId":"-638:input_1","SourceOutputPortId":"-404:data"},{"DestinationInputPortId":"-404:input_data","SourceOutputPortId":"-408:data"},{"DestinationInputPortId":"-135:instruments","SourceOutputPortId":"-638:data_1"}],"ModuleNodes":[{"Id":"-28","ModuleId":"BigQuantSpace.factorlens_preservation.factorlens_preservation-v1","ModuleParameters":[{"Name":"factor_column","Value":"['Trend_Strength', 'ret_skew', 'ret_kurt']","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"factor_name","Value":"hf_Trend_Strength, hf_ret_skew, hf_ret_kurt","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"factors_df","NodeId":"-28"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"performance_data","NodeId":"-28"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"features","NodeId":"-28"}],"OutputPortsInternal":[{"Name":"data","NodeId":"-28","OutputType":null}],"UsePreviousResults":false,"moduleIdForCode":13,"IsPartOfPartialRun":null,"Comment":"","CommentCollapsed":true},{"Id":"-592","ModuleId":"BigQuantSpace.input_features.input_features-v1","ModuleParameters":[{"Name":"features","Value":"Trend_Strength\nret_skew\nret_kurt","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"features_ds","NodeId":"-592"}],"OutputPortsInternal":[{"Name":"data","NodeId":"-592","OutputType":null}],"UsePreviousResults":false,"moduleIdForCode":14,"IsPartOfPartialRun":null,"Comment":"","CommentCollapsed":true},{"Id":"-135","ModuleId":"BigQuantSpace.feature_extractor_1m.feature_extractor_1m-v1","ModuleParameters":[{"Name":"start_date","Value":"","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"end_date","Value":"","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"before_start_days","Value":"0","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"workers","Value":"4","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"parallel_mode","Value":"集群","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"table_1m","Value":"level2_bar1m_CN_STOCK_A","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"instruments","NodeId":"-135"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"features","NodeId":"-135"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"user_functions","NodeId":"-135"}],"OutputPortsInternal":[{"Name":"data","NodeId":"-135","OutputType":null}],"UsePreviousResults":true,"moduleIdForCode":15,"IsPartOfPartialRun":null,"Comment":"","CommentCollapsed":true},{"Id":"-151","ModuleId":"BigQuantSpace.input_features.input_features-v1","ModuleParameters":[{"Name":"fea
tures","Value":"_last_price = close.iloc[-1] \nclose = _last_price ##因子分析必须\n\n## 动量类因子\n# 趋势强度\n_p1 = close.iloc[-1] - close.iloc[0] #一日收盘价差值\n_p2 = abs(close-close.shift(1))[1:].sum() #今昨两日收盘价分钟序列差值求和\nTrend_Strength = _p1 / _p2 #日内价格位移与路程之比\n\n## 收益率分布因子\n# 高频偏度和峰度\n_ret_log = log(close.pct_change().fillna(method='bfill') + 1) #对数收益率\nret_skew = _ret_log.skew() #收益率偏度\nret_kurt = _ret_log.kurt() #收益率峰度","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"features_ds","NodeId":"-151"}],"OutputPortsInternal":[{"Name":"data","NodeId":"-151","OutputType":null}],"UsePreviousResults":true,"moduleIdForCode":16,"IsPartOfPartialRun":null,"Comment":"","CommentCollapsed":true},{"Id":"-127","ModuleId":"BigQuantSpace.cached.cached-v3","ModuleParameters":[{"Name":"run","Value":"# Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端\ndef bigquant_run(input_1, input_2, input_3):\n # 示例代码如下。在这里编写您的代码\n df = input_1.read()\n data_1 = DataSource.write_df(df[['date','instrument','close', 'Trend_Strength', 'ret_skew', 'ret_kurt']])\n return Outputs(data_1=data_1, data_2=None, data_3=None)\n","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"post_run","Value":"# 后处理函数,可选。输入是主函数的输出,可以在这里对数据做处理,或者返回更友好的outputs数据格式。此函数输出不会被缓存。\ndef bigquant_run(outputs):\n return outputs\n","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"input_ports","Value":"","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"params","Value":"{}","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"output_ports","Value":"","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_1","NodeId":"-127"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_2","NodeId":"-127"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_3","NodeId":"-127"}],"OutputPortsInternal":[{"Name":"data_1","NodeId":"-127","OutputType":null},{"Name":"data_2","NodeId":"-127","OutputType":null},{"Name":"data_3","NodeId":"-127","OutputType":null}],"UsePreviousResults":true,"moduleIdForCode":20,"IsPartOfPartialRun":null,"Comment":"","CommentCollapsed":true},{"Id":"-235","ModuleId":"BigQuantSpace.cached.cached-v3","ModuleParameters":[{"Name":"run","Value":"# Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端\ndef bigquant_run(input_1, input_2, start_date, end_date, rabalance_period, buy_commission_rate, sell_commission_rate, \n ic_method, quantile_num, is_standardlize, is_winsorize):\n import time\n import bigexpr\n \n from bigshared.common.biglogger import BigLogger\n \n # print(start_date,end_date)\n class FuturesPerformance:\n def __init__(\n self,\n log,\n start_date=None,\n end_date=None,\n rabalance_period=22,\n buy_commission_rate=0.0005,\n sell_commission_rate=0.0005,\n ic_method=\"Rank_IC\",\n quantile_num=5,\n is_standardlize=True,\n is_winsorize=True,\n ):\n self.log = log\n self.start_date = start_date\n self.end_date = end_date\n self.rabalance_period = rabalance_period # 调仓天数\n self.buy_commission_rate = buy_commission_rate # 买入佣金(百分比)\n self.sell_commission_rate = sell_commission_rate # 卖出佣金(百分比)\n self.ic_method = ic_method\n self.quantile_num = quantile_num\n self.is_standardlize = is_standardlize # 是否标准化\n self.is_winsorize = is_winsorize # 是否去极值\n \n def data_processing(self, continus_contract_df, factor_expr):\n # 表达式抽取\n start_time = time.time()\n 
self.log.info(\"data_processing start ...\")\n\n def _handle_data(df, assignment_value, price_type):\n # 计算当期因子和未来一段时间收益率\n # df[\"factor\"] = df[\"close\"] / df[\"close\"].shift(44) - 1 # 构建因子\n df[\"factor\"] = bigexpr.evaluate(df, assignment_value, None)\n # 持有期收益率\n df[\"ret\"] = df[price_type].shift(-1 * self.rabalance_period) / df[price_type] - 1\n df['ret'] = df.ret.shift(-1) # 下一期的收益率\n df['daily_ret_1'] = df['close'].pct_change().shift(-1) # 次日收益率\n return df\n\n # 极值数据处理\n def _winsorize(df):\n df = df.copy()\n factor_columns = [\"factor\"]\n for factor in factor_columns:\n mean = df[factor].mean()\n sigma = df[factor].std()\n df[factor] = df[factor].clip(mean - 3 * sigma, mean + 3 * sigma)\n return df\n\n # 标准数据处理\n def _standardlize(df):\n df = df.copy()\n factor_columns = [\"factor\"]\n for factor in factor_columns:\n mean = df[factor].mean()\n sigma = df[factor].std()\n df[factor] = (df[factor] - mean) / sigma\n return df\n\n assignment_targets, assignment_value = bigexpr.parse_assignment(factor_expr)\n factor_df = continus_contract_df.groupby(\"instrument\").apply(\n _handle_data, assignment_value=assignment_value, price_type=\"close\")\n\n base_factor_df = factor_df[[\"date\", \"instrument\", \"close\", \"ret\", \"factor\", \"daily_ret_1\"]]\n # 标准化,去极值处理\n if self.is_standardlize and not self.is_winsorize:\n base_factor_df = base_factor_df.groupby(\"date\").apply(\n lambda x: _standardlize(x)).reset_index(drop=True)\n elif self.is_winsorize and not self.is_standardlize:\n base_factor_df = base_factor_df.groupby(\"date\").apply(\n lambda x: _winsorize(x)).reset_index(drop=True)\n elif self.is_winsorize and self.is_standardlize:\n base_factor_df = base_factor_df.groupby(\"date\").apply(\n lambda x: _standardlize(_winsorize(x))).reset_index(drop=True)\n # 对数据根据时间进行过滤\n base_factor_df = base_factor_df[(base_factor_df['date']>self.start_date) & ((\n base_factor_df['date']<self.end_date))]\n\n # 对应用户抽取的列名\n if assignment_targets:\n for target in assignment_targets:\n base_factor_df[target] = base_factor_df[\"factor\"]\n\n # if not self.is_roll_rebalance:\n # td = D.trading_days(\n # start_date=base_factor_df.date.min().strftime(\"%Y-%m-%d\"))\n # rebalance_days = td[:: self.rabalance_period] # 调仓期\n # rebalance_days_df = pd.DataFrame(\n # {\"date\": rebalance_days[\"date\"], \"ix\": range(len(rebalance_days))})\n # rebalance_days_df.index = range(len(rebalance_days_df))\n # merge_df = pd.merge(\n # base_factor_df, rebalance_days_df, on=\"date\", how=\"inner\")\n # else:\n # merge_df = base_factor_df\n td = D.trading_days(start_date=base_factor_df.date.min().strftime('%Y-%m-%d'))\n rebalance_days = td[::self.rabalance_period] # 调仓期\n rebalance_days_df = pd.DataFrame({'date': rebalance_days['date'], 'ix': range(len(rebalance_days))})\n rebalance_days_df.index = range(len(rebalance_days_df))\n merge_df = pd.merge(base_factor_df, rebalance_days_df, on='date', how='inner')\n\n # 将因子名或因子表达式抽取出来做展示处理\n factor_name = assignment_targets[0] if assignment_targets else assignment_value\n self.log.info(\"data_processing process %.3fs\" % (time.time() - start_time))\n return merge_df, base_factor_df, factor_name\n \n def ic_processing(self, merge_df, factor_name):\n start_time = time.time()\n self.log.info(\"ic_processing start ...\")\n\n def _cal_IC(df, method=\"Rank_IC\"):\n \"\"\"计算IC系数\"\"\"\n from scipy.stats import pearsonr, spearmanr\n\n df = df.dropna()\n if df.shape[0] == 0:\n return np.nan\n\n if method == \"Rank_IC\":\n return spearmanr(df[\"factor\"], df[\"ret\"])[0]\n if method 
== \"IC\":\n return pearsonr(df[\"factor\"], df[\"ret\"])[0]\n\n ic = merge_df.groupby(\"date\").apply(_cal_IC, method=self.ic_method)\n # if self.is_roll_rebalance:\n # ic = ic.rolling(self.rabalance_period).mean()[\n # ic.index[:: self.rabalance_period]]\n\n # ic相关指标\n ic_mean = np.round(ic.mean(), 4)\n ic_std = np.round(ic.std(), 4)\n ic_ir = np.round(ic_mean / ic_std, 4)\n positive_ic_cnt = len(ic[ic > 0])\n negative_ic_cnt = len(ic[ic < 0])\n ic_skew = np.round(ic.skew(), 4)\n ic_kurt = np.round(ic.kurt(), 4)\n\n # IC指标展示\n results = {\n \"stats\": {\n \"ic_mean\": ic_mean,\n \"ic_std\": ic_std,\n \"ic_ir\": ic_ir,\n \"positive_ic_cnt\": positive_ic_cnt,\n \"negative_ic_cnt\": negative_ic_cnt,\n \"ic_skew\": ic_skew,\n \"ic_kurt\": ic_kurt,\n },\n \"title\": f\"{factor_name}: IC分析\",\n }\n\n ic.name = \"ic\"\n ic_df = ic.to_frame()\n ic_df[\"ic_cumsum\"] = ic_df[\"ic\"].cumsum()\n self.log.info(\"ic_processing process %.3fs\" % (time.time() - start_time))\n return ic_df, results\n\n def ols_stats_processing(self, merge_df, factor_name):\n start_time = time.time()\n self.log.info(\"ols_stats_processing start ...\")\n\n def _get_model_stats(X, y):\n from pyfinance import ols\n\n model = ols.OLS(y=y, x=X)\n return [model.beta, model.tstat_beta, model.pvalue_beta, model.se_beta]\n\n ols_stats = merge_df.dropna().groupby(\"date\").apply(\n lambda df: _get_model_stats(df[[\"factor\"]], df[\"ret\"]))\n ols_stats_df = pd.DataFrame(ols_stats)\n ols_stats_df.rename(columns={0: \"ols_result\"}, inplace=True)\n ols_stats_df[\"beta\"] = ols_stats_df[\"ols_result\"].apply(lambda x: x[0])\n ols_stats_df[\"tstat_beta\"] = ols_stats_df[\"ols_result\"].apply(lambda x: x[1])\n ols_stats_df[\"pvalue_beta\"] = ols_stats_df[\"ols_result\"].apply(lambda x: x[2])\n ols_stats_df[\"se_beta\"] = ols_stats_df[\"ols_result\"].apply(lambda x: x[3])\n ols_stats_df = ols_stats_df[[\"beta\", \"tstat_beta\", \"pvalue_beta\", \"se_beta\"]]\n\n # if self.is_roll_rebalance:\n # ols_stats_df = ols_stats_df.rolling(self.rabalance_period).mean(\n # ).loc[ols_stats_df.index[:: self.rabalance_period]]\n\n roll_beta_period = 12\n ols_stats_df[\"cum_beta\"] = ols_stats_df[\"beta\"].cumsum()\n ols_stats_df[\"roll_beta\"] = ols_stats_df[\"beta\"].rolling(\n roll_beta_period).mean()\n\n # 因子收益率数据加工\n ols_stats_df[\"abs_t_value\"] = ols_stats_df[\"tstat_beta\"].abs()\n # 相应指标\n beta_mean = np.round(ols_stats_df[\"beta\"].mean(), 4)\n beta_std = np.round(ols_stats_df[\"beta\"].std(), 4)\n positive_beta_ratio = np.round(\n len(ols_stats_df[\"beta\"][ols_stats_df[\"beta\"] > 0]) / len(ols_stats_df), 4) * 100\n abs_t_mean = np.round(ols_stats_df[\"abs_t_value\"].mean(), 4)\n abs_t_value_over_two_ratio = np.round(len(\n ols_stats_df[\"abs_t_value\"][ols_stats_df[\"abs_t_value\"] > 2]) / len(ols_stats_df[\"abs_t_value\"]), 4)\n p_value_less_ratio = np.round(len(\n ols_stats_df[\"pvalue_beta\"][ols_stats_df[\"pvalue_beta\"] < 0.05]) / len(ols_stats_df[\"pvalue_beta\"]), 4)\n\n results = {\n \"stats\": {\n \"beta_mean\": beta_mean,\n \"beta_std\": beta_std,\n \"positive_beta_ratio\": positive_beta_ratio,\n \"abs_t_mean\": abs_t_mean,\n \"abs_t_value_over_two_ratio\": abs_t_value_over_two_ratio,\n \"p_value_less_ratio\": p_value_less_ratio,\n },\n \"title\": f\"{factor_name}: 因子收益率分析\",\n }\n self.log.info(\"ols_stats_processing process %.3fs\" %\n (time.time() - start_time))\n return ols_stats_df, results\n\n def group_processing(self, merge_df, base_factor_df, factor_name):\n start_time = time.time()\n self.log.info(\"group_processing 
start ...\")\n\n def _fill_ix_na(df):\n df['rebalance_index'] = df['ix'].fillna(method='ffill')\n return df\n\n def _unify_factor(tmp):\n \"\"\"因子以调仓期首日因子为准\"\"\"\n tmp['factor'] = list(tmp['factor'])[0]\n return tmp\n\n def _cut_box(df, quantile_num=5):\n if df.factor.isnull().sum() == len(df): # 因子值全是nan的话\n df[\"factor_group\"] = [np.nan] * len(df)\n else:\n labels = [str(i) for i in range(quantile_num)]\n df[\"factor_group\"] = pd.qcut(\n df[\"factor\"], quantile_num, labels=labels) # 升序排序,分成5组\n return df\n\n # 计算绩效指标\n def _get_stats(results, col_name):\n import empyrical\n\n return_ratio = np.round(\n empyrical.cum_returns_final(results[col_name]), 4)\n annual_return_ratio = np.round(\n empyrical.annual_return(results[col_name]), 4)\n sharp_ratio = np.round(empyrical.sharpe_ratio(\n results[col_name], 0.035/252), 4)\n return_volatility = np.round(\n empyrical.annual_volatility(results[col_name]), 4)\n max_drawdown = np.round(empyrical.max_drawdown(results[col_name]), 4)\n\n res = {'收益率': [return_ratio]}\n date_dict = {1: \"1日\", 5: \"1周\", 22: \"1月\"}\n for n in [1, 5, 22]:\n res['近{}收益率'.format(date_dict[n])] = np.round(results[col_name.replace('ret', 'pv')][-1] / results[col_name.replace('ret', 'pv')][-(n+1)] -1, 4)\n res.update({\n '年化收益率': [annual_return_ratio],\n '夏普比率': [sharp_ratio],\n '收益波动率': [return_volatility],\n '最大回撤': [max_drawdown]})\n return pd.DataFrame(res)\n\n merge_df2 = pd.merge(base_factor_df[['date', 'instrument', 'factor', 'daily_ret_1']],\n merge_df[['date', 'instrument', 'ix']], how='left', on=['date', 'instrument'])\n\n merge_df2 = merge_df2.groupby('instrument').apply(_fill_ix_na)\n unify_factor_df = merge_df2.groupby(['rebalance_index', 'instrument']).apply(_unify_factor)\n\n group_df = unify_factor_df.groupby(\"date\").apply(_cut_box, quantile_num=self.quantile_num)\n \n # 计算每组每天的收益率\n result = group_df[['date', 'factor_group', 'daily_ret_1', 'rebalance_index', 'ix']].groupby(\n ['factor_group', 'date']).mean().reset_index()\n # 调仓日的收益率需要扣除交易成本\n result['daily_ret_1'] -= (self.buy_commission_rate + self.sell_commission_rate) * \\\n np.where(result['ix'].isna(), 0, 1)\n\n result_table = result.pivot(\n values=\"daily_ret_1\", columns=\"factor_group\", index=\"date\")\n \n result_table.rename(\n columns={i: 'top%s_ret' % i for i in result_table.columns}, inplace=True)\n\n # if self.is_roll_rebalance:\n # result_table = result_table.rolling(self.rabalance_period).mean(\n # ).loc[result_table.index[:: self.rabalance_period]]\n\n small_quantile_name = result_table.columns.min()\n big_quantile_name = result_table.columns.max()\n result_table[\"LS_ret\"] = result_table[small_quantile_name] - result_table[big_quantile_name]\n # 移除na值,防止收益计算为难\n result_table.dropna(inplace=True)\n\n for i in result_table.columns:\n col_name = i.split(\"_\")[0] + \"_\" + \"pv\"\n result_table[col_name] = (1 + result_table[i]).cumprod()\n\n small_quantile_perf = _get_stats(result_table, small_quantile_name)\n big_quantile_perf = _get_stats(result_table, big_quantile_name)\n df = pd.concat([small_quantile_perf, big_quantile_perf])\n df.index = [small_quantile_name, big_quantile_name]\n results = {\n \"stats\": df.T.to_dict(),\n \"title\": f\"{factor_name}: 因子绩效分析\",\n }\n self.log.info(\"group_processing process %.3fs\" %\n (time.time() - start_time))\n return result_table, results\n\n def process(self, continus_contract_df, factor_exprs):\n factor_data = []\n performance_data = []\n # 更新结束日期\n is_live_run = T.live_run_param(\"trading_date\", None) is not None\n if is_live_run:\n 
self.end_date = T.live_run_param(\"trading_date\", \"trading_date\")\n # 进行因子计算\n for factor_expr in factor_exprs:\n # continus_contract_df = self.load_continus_instrument(df)\n merge_df, base_factor_df, factor_name = self.data_processing(\n continus_contract_df, factor_expr)\n \n ic_data = self.ic_processing(merge_df, factor_name)\n ols_data = self.ols_stats_processing(merge_df, factor_name)\n group_data = self.group_processing(merge_df, base_factor_df, factor_name)\n\n # 保存因子相关信息\n options_data = {\n \"start_date\": self.start_date,\n \"end_date\": self.end_date,\n \"rabalance_period\": self.rabalance_period,\n \"buy_commission_rate\": self.buy_commission_rate,\n \"sell_commission_rate\": self.sell_commission_rate,\n # \"is_roll_rebalance\": self.is_roll_rebalance,\n \"ic_method\": self.ic_method,\n \"quantile_num\": self.quantile_num,\n }\n result = {\n \"summary\": {\"IC\": ic_data[1], \"FactorReturns\": ols_data[1], \"QuantileReturns\": group_data[1]},\n \"data\": {\"IC\": ic_data[0], \"FactorReturns\": ols_data[0], \"QuantileReturns\": group_data[0]},\n \"options\": options_data,\n }\n factor_data.append(base_factor_df)\n performance_data.append(result)\n\n return factor_data, performance_data\n \n# print(input_2.read())\n df = input_1.read()\n factor_exprs = input_2.read()\n log = BigLogger('FuturesPerformance')\n fp = FuturesPerformance(log, start_date, end_date, rabalance_period, buy_commission_rate, sell_commission_rate, \n ic_method, quantile_num, is_standardlize, is_winsorize)\n data_1, data_2 = fp.process(df, factor_exprs)\n data_1 = DataSource.write_pickle(data_1)\n data_2 = DataSource.write_pickle(data_2)\n# data_1 = DataSource('994493c3e5164362b7dec1793d6a466aT').read()\n# data_1 = DataSource.write_pickle(data_1)\n\n return Outputs(data_1=data_1, data_2=data_2, data_3=None)\n","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"post_run","Value":"# 后处理函数,可选。输入是主函数的输出,可以在这里对数据做处理,或者返回更友好的outputs数据格式。此函数输出不会被缓存。\ndef bigquant_run(outputs):\n from jinja2 import Template\n from biglearning.module2.common.utils import display_html\n \n class RenderHtml:\n ic_stats_template = \"\"\"\n <div style=\"width:100%;text-align:center;color:#333333;margin-bottom:16px;font-size:12px;\"><h2>{{ title }}</h2></div>\n <div class='kpicontainer'>\n <ul class='kpi'>\n <li><span class='title'>IC均值</span><span class='value'>{{ stats.ic_mean }}</span></li>\n <li><span class='title'>IC标准差</span><span class='value'>{{ stats.ic_std }}</span></li>\n <li><span class='title'>ICIR</span><span class='value'>{{ stats.ic_ir }}</span></li>\n <li><span class='title'>IC正值次数</span><span class='value'>{{ stats.positive_ic_cnt }}次</span></li>\n <li><span class='title'>IC负值次数</span><span class='value'>{{ stats.negative_ic_cnt }}次</span></li>\n <li><span class='title'>IC偏度</span><span class='value'>{{ stats.ic_skew }}</span></li>\n <li><span class='title'>IC峰度</span><span class='value'>{{ stats.ic_kurt }}</span></li>\n </ul>\n </div>\n \"\"\"\n ols_stats_template = \"\"\"\n <div style=\"width:100%;text-align:center;color:#333333;margin-bottom:16px;font-size:12px;\"><h2>{{ title }}</h2></div>\n <div class='kpicontainer'>\n <ul class='kpi'>\n <li><span class='title'>因子收益均值</span><span class='value'>{{ stats.beta_mean }}</span></li>\n <li><span class='title'>因子收益标准差</span><span class='value'>{{ stats.beta_std }}</span></li>\n <li><span class='title'>因子收益为正比率</span><span class='value'>{{ stats.positive_beta_ratio }}%</span></li>\n <li><span class='title'>t值绝对值的均值</span><span class='value'>{{ stats.abs_t_mean 
}}</span></li>\n <li><span class='title'>t值绝对值大于2的比率</span><span class='value'>{{ stats.abs_t_value_over_two_ratio }}</span></li>\n <li><span class='title'>因子收益t检验p值小于0.05的比率</span><span class='value'>{{ stats.p_value_less_ratio }}</span></li>\n </ul>\n </div>\n \"\"\"\n group_stats_template = \"\"\"\n <div style=\"width:100%;text-align:center;color:#333333;margin-bottom:16px;font-size:12px;\"><h2>{{ title }}</h2></div>\n <div class='kpicontainer'>\n <ul class='kpi'>\n <li><span class='title'>&nbsp;</span>\n {% for k in stats%}\n <span class='value'>{{ k }}</span>\n {% endfor %}\n </li>\n <li><span class='title'>收益率</span>\n {% for k in stats%}\n <span class='value'>{{ (stats[k].收益率 | string)[0:10] }}</span>\n {% endfor %}\n </li>\n <li><span class='title'>近1日收益率</span>\n {% for k in stats%}\n <span class='value'>{{ (stats[k].近1日收益率 | string)[0:10] }}</span>\n {% endfor %}\n </li>\n <li><span class='title'>近1周收益率</span>\n {% for k in stats%}\n <span class='value'>{{ (stats[k].近1周收益率 | string)[0:10] }}</span>\n {% endfor %}\n </li>\n <li><span class='title'>近1月收益率</span>\n {% for k in stats%}\n <span class='value'>{{ (stats[k].近1月收益率 | string)[0:10] }}</span>\n {% endfor %}\n </li>\n <li><span class='title'>年化收益率</span>\n {% for k in stats%}\n <span class='value'>{{ (stats[k].年化收益率 | string)[0:10] }}</span>\n {% endfor %}\n </li>\n <li><span class='title'>夏普比率</span>\n {% for k in stats%}\n <span class='value'>{{ (stats[k].夏普比率 | string)[0:10] }}</span>\n {% endfor %}\n </li>\n <li><span class='title'>收益波动率</span>\n {% for k in stats%}\n <span class='value'>{{ (stats[k].收益波动率 | string)[0:10] }}</span>\n {% endfor %}\n </li>\n <li><span class='title'>最大回撤</span>\n {% for k in stats%}\n <span class='value'>{{ (stats[k].最大回撤 | string)[0:10] }}</span>\n {% endfor %}\n </li>\n </ul>\n </div>\n \"\"\"\n\n def __init__(self, ic_data, ic_summary, factor_returns_data, factor_returns_summary, quantile_returns_data, quantile_returns_summary):\n self.ic_df = ic_data\n self.ic_results = ic_summary\n self.ols_stats_df = factor_returns_data\n self.ols_stats_results = factor_returns_summary\n self.group_df = quantile_returns_data\n self.group_df_results = quantile_returns_summary\n\n def render_results(self, stats_template, results):\n \"\"\" 展示模板信息 \"\"\"\n\n def render(stats_template, results):\n html = Template(stats_template).render(stats=results[\"stats\"], title=results[\"title\"])\n display_html(html)\n\n render(stats_template, results)\n\n def show_ic(self):\n self.render_results(self.ic_stats_template, self.ic_results)\n T.plot(\n self.ic_df,\n title=\"IC分析\",\n panes=[[\"ic\", \"40%\"], [\"ic_cumsum\", \"20%\"]],\n # height=500,设置高度为500\n options={\n \"chart\": {\"height\": 500},\n # 设置颜色\n \"series\": [\n {\n \"name\": \"ic\",\n \"color\": \"#8085e8\",\n \"type\": \"column\",\n \"yAxis\": 0,\n },\n {\n \"name\": \"ic_cumsum\",\n \"color\": \"#8d4653\",\n \"type\": \"spline\",\n \"yAxis\": 0,\n },\n ],\n },\n )\n\n def show_ols(self):\n self.render_results(self.ols_stats_template, self.ols_stats_results)\n T.plot(\n self.ols_stats_df[[\"beta\", \"cum_beta\", \"roll_beta\"]],\n title=\"因子收益率\",\n # high、low显示在第一栏,高度40%,open、close显示在第二栏,其他的在最后一栏\n panes=[[\"beta\", \"cum_beta\", \"40%\"], [\"roll_beta\", \"20%\"]],\n # height=500,设置高度为500\n options={\n \"chart\": {\"height\": 500},\n # 设置颜色\n \"series\": [\n {\n \"name\": \"beta\",\n \"color\": \"#8085e8\",\n \"type\": \"column\",\n \"yAxis\": 0,\n },\n {\n \"name\": \"cum_beta\",\n \"color\": \"#8d4653\",\n \"type\": \"column\",\n \"yAxis\": 0,\n 
},\n {\n \"name\": \"roll_beta\",\n \"color\": \"#91e8e1\",\n \"type\": \"spline\",\n \"yAxis\": 1,\n },\n ],\n },\n )\n\n def show_group(self):\n self.render_results(self.group_stats_template, self.group_df_results)\n T.plot(self.group_df[[i for i in self.group_df.columns if \"_pv\" in i]])\n \n def show(self):\n self.show_ic()\n self.show_ols()\n self.show_group()\n \n # 读取 IC,FactorReturns,QuantileReturns用作展示\n performance_data = outputs.data_2.read()\n for data in performance_data:\n ic_data = data[\"data\"][\"IC\"]\n factor_returns_data = data[\"data\"][\"FactorReturns\"]\n quantile_returns_data = data[\"data\"][\"QuantileReturns\"]\n ic_summary = data[\"summary\"][\"IC\"]\n factor_returns_summary = data[\"summary\"][\"FactorReturns\"]\n quantile_returns_summary = data[\"summary\"][\"QuantileReturns\"]\n renderhtml = RenderHtml(ic_data, ic_summary, factor_returns_data, factor_returns_summary, quantile_returns_data, quantile_returns_summary)\n renderhtml.show()\n return outputs\n","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"input_ports","Value":"input_1, input_2","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"params","Value":"{\n 'start_date': '2020-01-01',\n 'end_date': '2021-05-15',\n 'rabalance_period': 5,\n 'buy_commission_rate': 0.0005,\n 'sell_commission_rate': 0.0005,\n 'ic_method': 'Rank_IC',\n 'quantile_num': 5,\n 'is_standardlize': True,\n 'is_winsorize': True\n}","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"output_ports","Value":"data_1, data_2","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_1","NodeId":"-235"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_2","NodeId":"-235"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_3","NodeId":"-235"}],"OutputPortsInternal":[{"Name":"data_1","NodeId":"-235","OutputType":null},{"Name":"data_2","NodeId":"-235","OutputType":null},{"Name":"data_3","NodeId":"-235","OutputType":null}],"UsePreviousResults":true,"moduleIdForCode":2,"IsPartOfPartialRun":null,"Comment":"期货因子分析","CommentCollapsed":false},{"Id":"-404","ModuleId":"BigQuantSpace.filter.filter-v3","ModuleParameters":[{"Name":"expr","Value":"in_csi300==1","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"output_left_data","Value":"False","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_data","NodeId":"-404"}],"OutputPortsInternal":[{"Name":"data","NodeId":"-404","OutputType":null},{"Name":"left_data","NodeId":"-404","OutputType":null}],"UsePreviousResults":true,"moduleIdForCode":3,"IsPartOfPartialRun":null,"Comment":"","CommentCollapsed":true},{"Id":"-408","ModuleId":"BigQuantSpace.datahub_load_datasource.datahub_load_datasource-v1","ModuleParameters":[{"Name":"table","Value":"index_constituent_CN_STOCK_A","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"start_date","Value":"2020-05-01","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"end_date","Value":"2020-12-01","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"instruments","Value":"# #号开始的表示注释,注释需单独一行\n# 每行一条\n","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"fields","Value":"# #号开始的表示注释,注释需单独一行\n# 
每行一条\n","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[],"OutputPortsInternal":[{"Name":"data","NodeId":"-408","OutputType":null}],"UsePreviousResults":false,"moduleIdForCode":4,"IsPartOfPartialRun":null,"Comment":"","CommentCollapsed":true},{"Id":"-638","ModuleId":"BigQuantSpace.cached.cached-v3","ModuleParameters":[{"Name":"run","Value":"# Python 代码入口函数,input_1/2/3 对应三个输入端,data_1/2/3 对应三个输出端\ndef bigquant_run(input_1, input_2, input_3):\n # 示例代码如下。在这里编写您的代码\n dt = input_1.read()\n start_date = dt['date'].iloc[0].strftime('%Y-%m-%d')\n end_date = dt['date'].iloc[-1].strftime('%Y-%m-%d')\n instruments = dt['instrument'].unique()\n dic = {'instruments':instruments,'start_date':start_date,'end_date':end_date}\n data_1 = DataSource.write_pickle(dic)\n return Outputs(data_1=data_1, data_2=None, data_3=None)\n","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"post_run","Value":"# 后处理函数,可选。输入是主函数的输出,可以在这里对数据做处理,或者返回更友好的outputs数据格式。此函数输出不会被缓存。\ndef bigquant_run(outputs):\n return outputs\n","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"input_ports","Value":"","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"params","Value":"{}","ValueType":"Literal","LinkedGlobalParameter":null},{"Name":"output_ports","Value":"","ValueType":"Literal","LinkedGlobalParameter":null}],"InputPortsInternal":[{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_1","NodeId":"-638"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_2","NodeId":"-638"},{"DataSourceId":null,"TrainedModelId":null,"TransformModuleId":null,"Name":"input_3","NodeId":"-638"}],"OutputPortsInternal":[{"Name":"data_1","NodeId":"-638","OutputType":null},{"Name":"data_2","NodeId":"-638","OutputType":null},{"Name":"data_3","NodeId":"-638","OutputType":null}],"UsePreviousResults":true,"moduleIdForCode":5,"IsPartOfPartialRun":null,"Comment":"","CommentCollapsed":true}],"SerializedClientData":"<?xml version='1.0' encoding='utf-16'?><DataV1 xmlns:xsd='http://www.w3.org/2001/XMLSchema' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'><Meta /><NodePositions><NodePosition Node='-28' Position='-143,352,200,200'/><NodePosition Node='-592' Position='68,102,200,200'/><NodePosition Node='-135' Position='-358,-19,200,200'/><NodePosition Node='-151' Position='-158,-155,200,200'/><NodePosition Node='-127' Position='-339,105,200,200'/><NodePosition Node='-235' Position='-216,231,200,200'/><NodePosition Node='-404' Position='-559.7350463867188,-207.0115203857422,200,200'/><NodePosition Node='-408' Position='-556.758056640625,-268.9884796142578,200,200'/><NodePosition Node='-638' Position='-555.758056640625,-140.0921630859375,200,200'/></NodePositions><NodeGroups /></DataV1>"},"IsDraft":true,"ParentExperimentId":null,"WebService":{"IsWebServiceExperiment":false,"Inputs":[],"Outputs":[],"Parameters":[{"Name":"交易日期","Value":"","ParameterDefinition":{"Name":"交易日期","FriendlyName":"交易日期","DefaultValue":"","ParameterType":"String","HasDefaultValue":true,"IsOptional":true,"ParameterRules":[],"HasRules":false,"MarkupType":0,"CredentialDescriptor":null}}],"WebServiceGroupId":null,"SerializedClientData":"<?xml version='1.0' encoding='utf-16'?><DataV1 xmlns:xsd='http://www.w3.org/2001/XMLSchema' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'><Meta /><NodePositions></NodePositions><NodeGroups /></DataV1>"},"DisableNodesUpdate":false,"Category":"user","Tags":[],"IsPartialRun":true}
    In [2]:
    # This code was auto-generated by the visual strategy environment on 2021-05-27 17:41
    # This code cell can only be edited in visual mode. You can also copy the code into a new code cell or strategy and modify it there.
    
    
    # Python entry function: input_1/2/3 correspond to the three input ports, data_1/2/3 to the three output ports
    def m5_run_bigquant_run(input_1, input_2, input_3):
        # Sample code; write your code here. This step derives the instrument list and date range from the upstream constituent data.
        dt = input_1.read()
        start_date = dt['date'].iloc[0].strftime('%Y-%m-%d')
        end_date = dt['date'].iloc[-1].strftime('%Y-%m-%d')
        instruments = dt['instrument'].unique()
        dic = {'instruments':instruments,'start_date':start_date,'end_date':end_date}
        data_1 = DataSource.write_pickle(dic)
        return Outputs(data_1=data_1, data_2=None, data_3=None)
    
    # Post-processing function (optional). Its input is the main function's output; you can process the data here or return a friendlier outputs format. This function's output is not cached.
    def m5_post_run_bigquant_run(outputs):
        return outputs
    
    # Python entry function: input_1/2/3 correspond to the three input ports, data_1/2/3 to the three output ports
    def m20_run_bigquant_run(input_1, input_2, input_3):
        # Sample code; write your code here. This step keeps only the columns needed for the factor analysis.
        df = input_1.read()
        data_1 = DataSource.write_df(df[['date','instrument','close', 'Trend_Strength', 'ret_skew', 'ret_kurt']])
        return Outputs(data_1=data_1, data_2=None, data_3=None)
    
    # Post-processing function (optional). Its input is the main function's output; you can process the data here or return a friendlier outputs format. This function's output is not cached.
    def m20_post_run_bigquant_run(outputs):
        return outputs
    
    # Python entry function: input_1/2/3 correspond to the three input ports, data_1/2/3 to the three output ports
    def m2_run_bigquant_run(input_1, input_2, start_date, end_date, rabalance_period, buy_commission_rate, sell_commission_rate, 
                     ic_method, quantile_num, is_standardlize, is_winsorize):
        import time
        import bigexpr
        
        from bigshared.common.biglogger import BigLogger
        
        # print(start_date,end_date)
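        # Overview of the FuturesPerformance helper defined below:
        #   - data_processing:      evaluates the factor expression per instrument, computes the
        #                           holding-period and next-day forward returns, applies cross-sectional
        #                           winsorization/standardization, and marks rebalancing dates
        #   - ic_processing:        computes the daily Rank IC / IC between factor and forward return,
        #                           plus summary statistics
        #   - ols_stats_processing: cross-sectional OLS of forward return on the factor for each date
        #                           (factor returns, t-statistics, p-values)
        #   - group_processing:     splits each cross-section into factor quantiles and tracks quantile
        #                           and long-short portfolio returns net of transaction costs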
        class FuturesPerformance:
            def __init__(
                self,
                log,
                start_date=None,
                end_date=None,
                rabalance_period=22,
                buy_commission_rate=0.0005,
                sell_commission_rate=0.0005,
                ic_method="Rank_IC",
                quantile_num=5,
                is_standardlize=True,
                is_winsorize=True,
            ):
                self.log = log
                self.start_date = start_date
                self.end_date = end_date
                self.rabalance_period = rabalance_period  # rebalancing period (trading days)
                self.buy_commission_rate = buy_commission_rate  # buy commission (as a decimal fraction)
                self.sell_commission_rate = sell_commission_rate  # sell commission (as a decimal fraction)
                self.ic_method = ic_method
                self.quantile_num = quantile_num
                self.is_standardlize = is_standardlize  # whether to standardize the factor
                self.is_winsorize = is_winsorize  # whether to winsorize the factor
            
            def data_processing(self, continus_contract_df, factor_expr):
                # Parse the factor expression and build the factor/return columns
                start_time = time.time()
                self.log.info("data_processing start ...")
    
                def _handle_data(df, assignment_value, price_type):
                    # Compute the current factor value and the forward return over the holding period
                    # df["factor"] = df["close"] / df["close"].shift(44) - 1  # example: build a factor
                    df["factor"] = bigexpr.evaluate(df, assignment_value, None)
                    # Holding-period return
                    df["ret"] = df[price_type].shift(-1 * self.rabalance_period) / df[price_type] - 1
                    df['ret'] = df.ret.shift(-1)  # shift to the next period's return
                    df['daily_ret_1'] = df['close'].pct_change().shift(-1)  # next-day return
                    return df
    
                # Winsorization: clip factor values to mean ± 3 standard deviations
                def _winsorize(df):
                    df = df.copy()
                    factor_columns = ["factor"]
                    for factor in factor_columns:
                        mean = df[factor].mean()
                        sigma = df[factor].std()
                        df[factor] = df[factor].clip(mean - 3 * sigma, mean + 3 * sigma)
                    return df
    
                # Standardization: convert factor values to z-scores
                def _standardlize(df):
                    df = df.copy()
                    factor_columns = ["factor"]
                    for factor in factor_columns:
                        mean = df[factor].mean()
                        sigma = df[factor].std()
                        df[factor] = (df[factor] - mean) / sigma
                    return df
    
                assignment_targets, assignment_value = bigexpr.parse_assignment(factor_expr)
                factor_df = continus_contract_df.groupby("instrument").apply(
                    _handle_data, assignment_value=assignment_value, price_type="close")
    
                base_factor_df = factor_df[["date", "instrument", "close", "ret", "factor", "daily_ret_1"]]
                # Cross-sectional standardization / winsorization by date
                if self.is_standardlize and not self.is_winsorize:
                    base_factor_df = base_factor_df.groupby("date").apply(
                        lambda x: _standardlize(x)).reset_index(drop=True)
                elif self.is_winsorize and not self.is_standardlize:
                    base_factor_df = base_factor_df.groupby("date").apply(
                        lambda x: _winsorize(x)).reset_index(drop=True)
                elif self.is_winsorize and self.is_standardlize:
                    base_factor_df = base_factor_df.groupby("date").apply(
                        lambda x: _standardlize(_winsorize(x))).reset_index(drop=True)
                # Filter the data to the analysis date range
                base_factor_df = base_factor_df[(base_factor_df['date']>self.start_date) & ((
                    base_factor_df['date']<self.end_date))]
    
                # Write the factor back under the user's assignment target column name(s)
                if assignment_targets:
                    for target in assignment_targets:
                        base_factor_df[target] = base_factor_df["factor"]
    
                # if not self.is_roll_rebalance:
                #     td = D.trading_days(
                #         start_date=base_factor_df.date.min().strftime("%Y-%m-%d"))
                #     rebalance_days = td[:: self.rabalance_period]  # rebalancing dates
                #     rebalance_days_df = pd.DataFrame(
                #         {"date": rebalance_days["date"], "ix": range(len(rebalance_days))})
                #     rebalance_days_df.index = range(len(rebalance_days_df))
                #     merge_df = pd.merge(
                #         base_factor_df, rebalance_days_df, on="date", how="inner")
                # else:
                #     merge_df = base_factor_df
                td = D.trading_days(start_date=base_factor_df.date.min().strftime('%Y-%m-%d'))
                rebalance_days = td[::self.rabalance_period]  # rebalancing dates
                rebalance_days_df = pd.DataFrame({'date': rebalance_days['date'], 'ix': range(len(rebalance_days))})
                rebalance_days_df.index = range(len(rebalance_days_df))
                merge_df = pd.merge(base_factor_df, rebalance_days_df, on='date', how='inner')
    
                # Use the assignment target (or the raw expression) as the factor's display name
                factor_name = assignment_targets[0] if assignment_targets else assignment_value
                self.log.info("data_processing process %.3fs" % (time.time() - start_time))
                return merge_df, base_factor_df, factor_name
            
            def ic_processing(self, merge_df, factor_name):
                start_time = time.time()
                self.log.info("ic_processing start ...")
    
                def _cal_IC(df, method="Rank_IC"):
                    """计算IC系数"""
                    from scipy.stats import pearsonr, spearmanr
    
                    df = df.dropna()
                    if df.shape[0] == 0:
                        return np.nan
    
                    if method == "Rank_IC":
                        return spearmanr(df["factor"], df["ret"])[0]
                    if method == "IC":
                        return pearsonr(df["factor"], df["ret"])[0]
    
                ic = merge_df.groupby("date").apply(_cal_IC, method=self.ic_method)
                # if self.is_roll_rebalance:
                #     ic = ic.rolling(self.rabalance_period).mean()[
                #         ic.index[:: self.rabalance_period]]
    
                # IC summary statistics
                ic_mean = np.round(ic.mean(), 4)
                ic_std = np.round(ic.std(), 4)
                ic_ir = np.round(ic_mean / ic_std, 4)
                positive_ic_cnt = len(ic[ic > 0])
                negative_ic_cnt = len(ic[ic < 0])
                ic_skew = np.round(ic.skew(), 4)
                ic_kurt = np.round(ic.kurt(), 4)
    
                # IC statistics for display
                results = {
                    "stats": {
                        "ic_mean": ic_mean,
                        "ic_std": ic_std,
                        "ic_ir": ic_ir,
                        "positive_ic_cnt": positive_ic_cnt,
                        "negative_ic_cnt": negative_ic_cnt,
                        "ic_skew": ic_skew,
                        "ic_kurt": ic_kurt,
                    },
                    "title": f"{factor_name}: IC分析",
                }
    
                ic.name = "ic"
                ic_df = ic.to_frame()
                ic_df["ic_cumsum"] = ic_df["ic"].cumsum()
                self.log.info("ic_processing process  %.3fs" % (time.time() - start_time))
                return ic_df, results
    
            def ols_stats_processing(self, merge_df, factor_name):
                start_time = time.time()
                self.log.info("ols_stats_processing start ...")
    
                def _get_model_stats(X, y):
                    from pyfinance import ols
    
                    model = ols.OLS(y=y, x=X)
                    return [model.beta, model.tstat_beta, model.pvalue_beta, model.se_beta]
    
                ols_stats = merge_df.dropna().groupby("date").apply(
                    lambda df: _get_model_stats(df[["factor"]], df["ret"]))
                ols_stats_df = pd.DataFrame(ols_stats)
                ols_stats_df.rename(columns={0: "ols_result"}, inplace=True)
                ols_stats_df["beta"] = ols_stats_df["ols_result"].apply(lambda x: x[0])
                ols_stats_df["tstat_beta"] = ols_stats_df["ols_result"].apply(lambda x: x[1])
                ols_stats_df["pvalue_beta"] = ols_stats_df["ols_result"].apply(lambda x: x[2])
                ols_stats_df["se_beta"] = ols_stats_df["ols_result"].apply(lambda x: x[3])
                ols_stats_df = ols_stats_df[["beta", "tstat_beta", "pvalue_beta", "se_beta"]]
    
                # if self.is_roll_rebalance:
                #     ols_stats_df = ols_stats_df.rolling(self.rabalance_period).mean(
                #     ).loc[ols_stats_df.index[:: self.rabalance_period]]
    
                roll_beta_period = 12
                ols_stats_df["cum_beta"] = ols_stats_df["beta"].cumsum()
                ols_stats_df["roll_beta"] = ols_stats_df["beta"].rolling(
                    roll_beta_period).mean()
    
                # Factor-return derived columns
                ols_stats_df["abs_t_value"] = ols_stats_df["tstat_beta"].abs()
                # Summary statistics
                beta_mean = np.round(ols_stats_df["beta"].mean(), 4)
                beta_std = np.round(ols_stats_df["beta"].std(), 4)
                positive_beta_ratio = np.round(
                    len(ols_stats_df["beta"][ols_stats_df["beta"] > 0]) / len(ols_stats_df), 4) * 100
                abs_t_mean = np.round(ols_stats_df["abs_t_value"].mean(), 4)
                abs_t_value_over_two_ratio = np.round(len(
                    ols_stats_df["abs_t_value"][ols_stats_df["abs_t_value"] > 2]) / len(ols_stats_df["abs_t_value"]), 4)
                p_value_less_ratio = np.round(len(
                    ols_stats_df["pvalue_beta"][ols_stats_df["pvalue_beta"] < 0.05]) / len(ols_stats_df["pvalue_beta"]), 4)
    
                results = {
                    "stats": {
                        "beta_mean": beta_mean,
                        "beta_std": beta_std,
                        "positive_beta_ratio": positive_beta_ratio,
                        "abs_t_mean": abs_t_mean,
                        "abs_t_value_over_two_ratio": abs_t_value_over_two_ratio,
                        "p_value_less_ratio": p_value_less_ratio,
                    },
                    "title": f"{factor_name}: 因子收益率分析",
                }
                self.log.info("ols_stats_processing process  %.3fs" %
                              (time.time() - start_time))
                return ols_stats_df, results
    
            def group_processing(self, merge_df, base_factor_df, factor_name):
                start_time = time.time()
                self.log.info("group_processing start ...")
    
                def _fill_ix_na(df):
                    df['rebalance_index'] = df['ix'].fillna(method='ffill')
                    return df
    
                def _unify_factor(tmp):
                    """因子以调仓期首日因子为准"""
                    tmp['factor'] = list(tmp['factor'])[0]
                    return tmp
    
                def _cut_box(df, quantile_num=5):
                    if df.factor.isnull().sum() == len(df):  # all factor values are NaN
                        df["factor_group"] = [np.nan] * len(df)
                    else:
                        labels = [str(i) for i in range(quantile_num)]
                        df["factor_group"] = pd.qcut(
                            df["factor"], quantile_num, labels=labels)  # 升序排序,分成5组
                    return df
    
                # Compute performance statistics
                def _get_stats(results, col_name):
                    import empyrical
    
                    return_ratio = np.round(
                        empyrical.cum_returns_final(results[col_name]), 4)
                    annual_return_ratio = np.round(
                        empyrical.annual_return(results[col_name]), 4)
                    sharp_ratio = np.round(empyrical.sharpe_ratio(
                        results[col_name], 0.035/252), 4)
                    return_volatility = np.round(
                        empyrical.annual_volatility(results[col_name]), 4)
                    max_drawdown = np.round(empyrical.max_drawdown(results[col_name]), 4)
    
                    res = {'收益率': [return_ratio]}
                    date_dict = {1: "1日", 5: "1周", 22: "1月"}
                    for n in [1, 5, 22]:
                        res['近{}收益率'.format(date_dict[n])] = np.round(results[col_name.replace('ret', 'pv')][-1] / results[col_name.replace('ret', 'pv')][-(n+1)] -1, 4)
                    res.update({
                    '年化收益率': [annual_return_ratio],
                    '夏普比率': [sharp_ratio],
                    '收益波动率': [return_volatility],
                    '最大回撤': [max_drawdown]})
                    return pd.DataFrame(res)
    
                merge_df2 = pd.merge(base_factor_df[['date', 'instrument', 'factor', 'daily_ret_1']],
                                     merge_df[['date', 'instrument', 'ix']], how='left', on=['date', 'instrument'])
    
                merge_df2 = merge_df2.groupby('instrument').apply(_fill_ix_na)
                unify_factor_df = merge_df2.groupby(['rebalance_index', 'instrument']).apply(_unify_factor)
    
                group_df = unify_factor_df.groupby("date").apply(_cut_box, quantile_num=self.quantile_num)
                
                # Compute the mean daily return of each factor group
                result = group_df[['date', 'factor_group', 'daily_ret_1', 'rebalance_index', 'ix']].groupby(
                    ['factor_group', 'date']).mean().reset_index()
                # Deduct transaction costs from returns on rebalancing days
                result['daily_ret_1'] -= (self.buy_commission_rate + self.sell_commission_rate) * \
                    np.where(result['ix'].isna(), 0, 1)
    
                result_table = result.pivot(
                    values="daily_ret_1", columns="factor_group", index="date")
                
                result_table.rename(
                    columns={i: 'top%s_ret' % i for i in result_table.columns}, inplace=True)
    
                # if self.is_roll_rebalance:
                #     result_table = result_table.rolling(self.rabalance_period).mean(
                #     ).loc[result_table.index[:: self.rabalance_period]]
    
                small_quantile_name = result_table.columns.min()
                big_quantile_name = result_table.columns.max()
                result_table["LS_ret"] = result_table[small_quantile_name] - result_table[big_quantile_name]
                # Drop NaN rows so they do not break the return calculation
                result_table.dropna(inplace=True)
    
                for i in result_table.columns:
                    col_name = i.split("_")[0] + "_" + "pv"
                    result_table[col_name] = (1 + result_table[i]).cumprod()
    
                small_quantile_perf = _get_stats(result_table, small_quantile_name)
                big_quantile_perf = _get_stats(result_table, big_quantile_name)
                df = pd.concat([small_quantile_perf, big_quantile_perf])
                df.index = [small_quantile_name, big_quantile_name]
                results = {
                    "stats": df.T.to_dict(),
                    "title": f"{factor_name}: 因子绩效分析",
                }
                self.log.info("group_processing process  %.3fs" %
                              (time.time() - start_time))
                return result_table, results
    
            def process(self, continus_contract_df, factor_exprs):
                factor_data = []
                performance_data = []
                # Update the end date when running live
                is_live_run = T.live_run_param("trading_date", None) is not None
                if is_live_run:
                    self.end_date = T.live_run_param("trading_date", "trading_date")
                # Analyze each factor expression
                for factor_expr in factor_exprs:
                    # continus_contract_df = self.load_continus_instrument(df)
                    merge_df, base_factor_df, factor_name = self.data_processing(
                        continus_contract_df, factor_expr)
             
                    ic_data = self.ic_processing(merge_df, factor_name)
                    ols_data = self.ols_stats_processing(merge_df, factor_name)
                    group_data = self.group_processing(merge_df, base_factor_df, factor_name)
    
                    # Save the analysis options alongside the results
                    options_data = {
                        "start_date": self.start_date,
                        "end_date": self.end_date,
                        "rabalance_period": self.rabalance_period,
                        "buy_commission_rate": self.buy_commission_rate,
                        "sell_commission_rate": self.sell_commission_rate,
                        # "is_roll_rebalance": self.is_roll_rebalance,
                        "ic_method": self.ic_method,
                        "quantile_num": self.quantile_num,
                    }
                    result = {
                        "summary": {"IC": ic_data[1], "FactorReturns": ols_data[1], "QuantileReturns": group_data[1]},
                        "data": {"IC": ic_data[0], "FactorReturns": ols_data[0], "QuantileReturns": group_data[0]},
                        "options": options_data,
                    }
                    factor_data.append(base_factor_df)
                    performance_data.append(result)
    
                return factor_data, performance_data
            
    #     print(input_2.read())
        df = input_1.read()
        factor_exprs = input_2.read()
        log = BigLogger('FuturesPerformance')
        fp = FuturesPerformance(log, start_date, end_date, rabalance_period, buy_commission_rate, sell_commission_rate, 
                                ic_method, quantile_num, is_standardlize, is_winsorize)
        data_1, data_2 = fp.process(df, factor_exprs)
        data_1 = DataSource.write_pickle(data_1)
        data_2 = DataSource.write_pickle(data_2)
    #     data_1 = DataSource('994493c3e5164362b7dec1793d6a466aT').read()
    #     data_1 = DataSource.write_pickle(data_1)
    
        return Outputs(data_1=data_1, data_2=data_2, data_3=None)
    
    # Post-processing function (optional). Its input is the main function's output; you can process the data here or return a friendlier outputs format. This function's output is not cached.
    def m2_post_run_bigquant_run(outputs):
        from jinja2 import Template
        from biglearning.module2.common.utils import display_html
        
        class RenderHtml:
            ic_stats_template = """
            <div style="width:100%;text-align:center;color:#333333;margin-bottom:16px;font-size:12px;"><h2>{{ title }}</h2></div>
            <div class='kpicontainer'>
                <ul class='kpi'>
                    <li><span class='title'>IC均值</span><span class='value'>{{ stats.ic_mean }}</span></li>
                    <li><span class='title'>IC标准差</span><span class='value'>{{ stats.ic_std }}</span></li>
                    <li><span class='title'>ICIR</span><span class='value'>{{ stats.ic_ir }}</span></li>
                    <li><span class='title'>IC正值次数</span><span class='value'>{{ stats.positive_ic_cnt }}次</span></li>
                    <li><span class='title'>IC负值次数</span><span class='value'>{{ stats.negative_ic_cnt }}次</span></li>
                    <li><span class='title'>IC偏度</span><span class='value'>{{ stats.ic_skew }}</span></li>
                    <li><span class='title'>IC峰度</span><span class='value'>{{ stats.ic_kurt }}</span></li>
                </ul>
            </div>
            """
            ols_stats_template = """
            <div style="width:100%;text-align:center;color:#333333;margin-bottom:16px;font-size:12px;"><h2>{{ title }}</h2></div>
            <div class='kpicontainer'>
                <ul class='kpi'>
                    <li><span class='title'>因子收益均值</span><span class='value'>{{ stats.beta_mean }}</span></li>
                    <li><span class='title'>因子收益标准差</span><span class='value'>{{ stats.beta_std }}</span></li>
                    <li><span class='title'>因子收益为正比率</span><span class='value'>{{ stats.positive_beta_ratio }}%</span></li>
                    <li><span class='title'>t值绝对值的均值</span><span class='value'>{{ stats.abs_t_mean }}</span></li>
                    <li><span class='title'>t值绝对值大于2的比率</span><span class='value'>{{ stats.abs_t_value_over_two_ratio }}</span></li>
                    <li><span class='title'>因子收益t检验p值小于0.05的比率</span><span class='value'>{{ stats.p_value_less_ratio }}</span></li>
                </ul>
            </div>
            """
            group_stats_template = """
            <div style="width:100%;text-align:center;color:#333333;margin-bottom:16px;font-size:12px;"><h2>{{ title }}</h2></div>
            <div class='kpicontainer'>
                <ul class='kpi'>
                    <li><span class='title'>&nbsp;</span>
                        {% for k in stats%}
                            <span class='value'>{{ k }}</span>
                        {% endfor %}
                    </li>
                    <li><span class='title'>收益率</span>
                        {% for k in stats%}
                            <span class='value'>{{ (stats[k].收益率 | string)[0:10] }}</span>
                        {% endfor %}
                    </li>
                    <li><span class='title'>近1日收益率</span>
                        {% for k in stats%}
                            <span class='value'>{{ (stats[k].近1日收益率 | string)[0:10] }}</span>
                        {% endfor %}
                    </li>
                    <li><span class='title'>近1周收益率</span>
                        {% for k in stats%}
                            <span class='value'>{{ (stats[k].近1周收益率 | string)[0:10] }}</span>
                        {% endfor %}
                    </li>
                    <li><span class='title'>近1月收益率</span>
                        {% for k in stats%}
                            <span class='value'>{{ (stats[k].近1月收益率 | string)[0:10] }}</span>
                        {% endfor %}
                    </li>
                    <li><span class='title'>年化收益率</span>
                        {% for k in stats%}
                            <span class='value'>{{ (stats[k].年化收益率 | string)[0:10] }}</span>
                        {% endfor %}
                    </li>
                    <li><span class='title'>夏普比率</span>
                        {% for k in stats%}
                            <span class='value'>{{ (stats[k].夏普比率 | string)[0:10] }}</span>
                        {% endfor %}
                    </li>
                    <li><span class='title'>收益波动率</span>
                        {% for k in stats%}
                            <span class='value'>{{ (stats[k].收益波动率 | string)[0:10] }}</span>
                        {% endfor %}
                    </li>
                    <li><span class='title'>最大回撤</span>
                        {% for k in stats%}
                            <span class='value'>{{ (stats[k].最大回撤 | string)[0:10] }}</span>
                        {% endfor %}
                    </li>
                 </ul>
            </div>
            """
    
            def __init__(self, ic_data, ic_summary, factor_returns_data, factor_returns_summary, quantile_returns_data, quantile_returns_summary):
                self.ic_df = ic_data
                self.ic_results = ic_summary
                self.ols_stats_df = factor_returns_data
                self.ols_stats_results = factor_returns_summary
                self.group_df = quantile_returns_data
                self.group_df_results = quantile_returns_summary
    
            def render_results(self, stats_template, results):
                """ 展示模板信息 """
    
                def render(stats_template, results):
                    html = Template(stats_template).render(stats=results["stats"], title=results["title"])
                    display_html(html)
    
                render(stats_template, results)
    
            def show_ic(self):
                self.render_results(self.ic_stats_template, self.ic_results)
                T.plot(
                    self.ic_df,
                    title="IC分析",
                    panes=[["ic", "40%"], ["ic_cumsum", "20%"]],
                    # set chart height to 500 px
                    options={
                        "chart": {"height": 500},
                        # series colors
                        "series": [
                            {
                                "name": "ic",
                                "color": "#8085e8",
                                "type": "column",
                                "yAxis": 0,
                            },
                            {
                                "name": "ic_cumsum",
                                "color": "#8d4653",
                                "type": "spline",
                                "yAxis": 0,
                            },
                        ],
                    },
                )
    
            def show_ols(self):
                self.render_results(self.ols_stats_template, self.ols_stats_results)
                T.plot(
                    self.ols_stats_df[["beta", "cum_beta", "roll_beta"]],
                    title="因子收益率",
                    # beta and cum_beta in the first pane (40% height), roll_beta in the second pane
                    panes=[["beta", "cum_beta", "40%"], ["roll_beta", "20%"]],
                    # set chart height to 500 px
                    options={
                        "chart": {"height": 500},
                        # series colors
                        "series": [
                            {
                                "name": "beta",
                                "color": "#8085e8",
                                "type": "column",
                                "yAxis": 0,
                            },
                            {
                                "name": "cum_beta",
                                "color": "#8d4653",
                                "type": "column",
                                "yAxis": 0,
                            },
                            {
                                "name": "roll_beta",
                                "color": "#91e8e1",
                                "type": "spline",
                                "yAxis": 1,
                            },
                        ],
                    },
                )
    
            def show_group(self):
                self.render_results(self.group_stats_template, self.group_df_results)
                T.plot(self.group_df[[i for i in self.group_df.columns if "_pv" in i]])
            
            def show(self):
                self.show_ic()
                self.show_ols()
                self.show_group()
        
        # Read IC, FactorReturns, and QuantileReturns for display
        performance_data = outputs.data_2.read()
        for data in performance_data:
            ic_data = data["data"]["IC"]
            factor_returns_data = data["data"]["FactorReturns"]
            quantile_returns_data = data["data"]["QuantileReturns"]
            ic_summary = data["summary"]["IC"]
            factor_returns_summary = data["summary"]["FactorReturns"]
            quantile_returns_summary = data["summary"]["QuantileReturns"]
            renderhtml = RenderHtml(ic_data, ic_summary, factor_returns_data, factor_returns_summary, quantile_returns_data, quantile_returns_summary)
            renderhtml.show()
        return outputs
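
    # Module wiring below (generated from the visual graph):
    #   m14 lists the factor names; m16 defines the intraday factor expressions
    #   m4  loads index_constituent_CN_STOCK_A between 2020-05-01 and 2020-12-01
    #   m3  keeps CSI 300 constituents (in_csi300==1)
    #   m5  derives the instrument list and date range from the filtered constituents
    #   m15 extracts the 1-minute features defined in m16 for those instruments
    #   m20 keeps only the columns needed for the factor analysis
    #   m2  runs the factor analysis (IC, factor returns, quantile returns) and renders the report
    #   m13 preserves the computed factors (factorlens_preservation)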
    
    
    m14 = M.input_features.v1(
        features="""Trend_Strength
    ret_skew
    ret_kurt""",
        m_cached=False
    )
    
    m16 = M.input_features.v1(
        features="""_last_price = close.iloc[-1] 
    close = _last_price  ##因子分析必须
    
    ## 动量类因子
    # 趋势强度
    _p1 = close.iloc[-1] - close.iloc[0] #一日收盘价差值
    _p2 = abs(close-close.shift(1))[1:].sum() #今昨两日收盘价分钟序列差值求和
    Trend_Strength = _p1 / _p2 #日内价格位移与路程之比
    
    ## 收益率分布因子
    # 高频偏度和峰度
    _ret_log = log(close.pct_change().fillna(method='bfill') + 1) #对数收益率
    ret_skew = _ret_log.skew() #收益率偏度
    ret_kurt = _ret_log.kurt() #收益率峰度"""
    )
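    # Illustrative sketch (not part of the generated strategy): the three intraday factors
    # defined in m16 above, written in plain pandas for a single day's 1-minute close series.
    # The helper name below is hypothetical and is never called by this strategy; it only
    # documents what the expression engine is assumed to compute.
    def _intraday_factor_sketch(close_1m):
        """close_1m: pandas Series of one trading day's 1-minute close prices."""
        import numpy as np
        displacement = close_1m.iloc[-1] - close_1m.iloc[0]        # _p1: net intraday move
        path_length = close_1m.diff().abs().iloc[1:].sum()         # _p2: sum of |minute-to-minute changes|
        trend_strength = displacement / path_length                # displacement-to-path ratio
        # .bfill() mirrors fillna(method='bfill') in the original expression
        log_ret = np.log(close_1m.pct_change().bfill() + 1)        # 1-minute log returns
        return {
            'Trend_Strength': trend_strength,
            'ret_skew': log_ret.skew(),                            # skewness of intraday log returns
            'ret_kurt': log_ret.kurt(),                            # kurtosis of intraday log returns
        }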
    
    m4 = M.datahub_load_datasource.v1(
        table='index_constituent_CN_STOCK_A',
        start_date='2020-05-01',
        end_date='2020-12-01',
        instruments="""# #号开始的表示注释,注释需单独一行
    # 每行一条
    """,
        fields="""# #号开始的表示注释,注释需单独一行
    # 每行一条
    """
    )
    
    m3 = M.filter.v3(
        input_data=m4.data,
        expr='in_csi300==1',
        output_left_data=False
    )
    
    m5 = M.cached.v3(
        input_1=m3.data,
        run=m5_run_bigquant_run,
        post_run=m5_post_run_bigquant_run,
        input_ports='',
        params='{}',
        output_ports=''
    )
    
    m15 = M.feature_extractor_1m.v1(
        instruments=m5.data_1,
        features=m16.data,
        start_date='',
        end_date='',
        before_start_days=0,
        workers=4,
        parallel_mode='集群',  # cluster mode
        table_1m='level2_bar1m_CN_STOCK_A'
    )
    
    m20 = M.cached.v3(
        input_1=m15.data,
        run=m20_run_bigquant_run,
        post_run=m20_post_run_bigquant_run,
        input_ports='',
        params='{}',
        output_ports=''
    )
    
    m2 = M.cached.v3(
        input_1=m20.data_1,
        input_2=m14.data,
        run=m2_run_bigquant_run,
        post_run=m2_post_run_bigquant_run,
        input_ports='input_1, input_2',
        params="""{
        'start_date': '2020-01-01',
        'end_date': '2021-05-15',
        'rabalance_period': 5,
        'buy_commission_rate': 0.0005,
        'sell_commission_rate': 0.0005,
        'ic_method': 'Rank_IC',
        'quantile_num': 5,
        'is_standardlize': True,
        'is_winsorize': True
    }""",
        output_ports='data_1, data_2'
    )
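    # Note: the keys in the params string above (start_date, end_date, rabalance_period,
    # buy/sell commission rates, ic_method, quantile_num, is_standardlize, is_winsorize)
    # match the extra keyword arguments of m2_run_bigquant_run; this is where the analysis
    # window, rebalancing period, trading costs, IC method and quantile settings are configured.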
    
    m13 = M.factorlens_preservation.v1(
        factors_df=m2.data_1,
        performance_data=m2.data_2,
        features=m14.data,
        factor_column='[\'Trend_Strength\', \'ret_skew\', \'ret_kurt\']',
        factor_name='hf_Trend_Strength, hf_ret_skew, hf_ret_kurt',
        m_cached=False
    )