股票ETF倒波动率风险平权混合策略

由 xuxiaoyin 创建 , 更新于 2026-06-18T15:58:27.103354+08:00
# -*- coding: utf-8 -*- """ 风险平权混合策略(无择时) ============================ 在"股票端 + ETF端"混合框架上,把两端的资金分配从【固定等权(45/45)】 改为【倒波动率风险平权】,解决"股票和基金风险不对等"的问题。 各端内部逻辑保持原样:   股票端:小盘高股息选 N 只,端内等权   ETF端 :动量评分选 top M + 均值方差优化(MVO)权重(搬自 小盘股息率_2 复杂版) 两端之间(核心改动):倒波动率分配   1. 用各端【内部权重】合成两端的"伪历史收益序列"(当场用标的历史价格算,      不依赖任何净值记录,回测/实盘第一天即可用):        股票端序列 = Σ(等权_i × 各股票近 VOL_WINDOW 日收益)        ETF端序列  = Σ(MVO权重_i × 各ETF近 VOL_WINDOW 日收益)   2. 各算年化波动率 σ_stock、σ_etf   3. 倒波动率定两端相对权重:w_stock = (1/σ_s) / (1/σ_s + 1/σ_e)   4. 相对权重截断到 [W_MIN, W_MAX] 后归一化,再乘总仓位 TOTAL_POS   协方差矩阵:组内不用、两端倒波动率也不用 → 全程无需协方差。 实现要点:任一端调仓日都重算两端配比,并把两端都对齐到最新配比   (非调仓端只按其保存的内部权重缩放,不重新选股,参考复杂版 signal_changed 逻辑),   保证总仓位恒为 TOTAL_POS、风险平权配比每个调仓日刷新。 """ from xxybacktest import run_backtest, order_target_percent, OrderCost from xxybacktest.data import DailyInfo import pandas as pd import numpy as np cost = OrderCost(     open_tax=0,     close_tax=0.001,     open_commission=0.0003,     close_commission=0.0003,     min_commission=5, ) # ===================== 可调参数 ===================== VOL_WINDOW = 20      # 倒波动率回看窗口(交易日,约一个月) W_MIN, W_MAX = 0.3, 0.7   # 股票端相对权重的上下限(ETF端对称) TOTAL_POS = 0.9      # 总投资仓位(剩余为现金) VERBOSE = True       # 打印每次调仓的权重变化日志 # =================================================== def initialize(context):     from xxybacktest import Data     _DEFAULT_PARAMS = {         "specified_etfs": (             '160723.SZ', '513100.SH', '159985.SZ', '518880.SH',             '513130.SH', '510300.SH', '159915.SZ', '511380.SZ',             '513520.SZ', '159980.SZ', '513500.SH',         ),         "stock_num": 5,         "etf_num": 6,         "stock_period": 22,         "etf_period": 8,         # MVO 参数(搬自复杂版)         "risk_aversion": 1.0,         "mv_min_weight": 0.05,         "mv_max_weight": 0.3,         "trend_period": 25,         "history_bar_count": 21,     }     g = context.g     for k, v in _DEFAULT_PARAMS.items():         setattr(g, k, v)     sd_stock = (pd.to_datetime(context.trade.start_time) - pd.Timedelta(days=30)).strftime("%Y-%m-%d")     sd_etf = (pd.to_datetime(context.trade.start_time) - pd.Timedelta(days=39)).strftime("%Y-%m-%d")     end_date = context.trade.end_time     # ===== 1. 股票端:小盘高股息选股(端内等权)=====     sql_stock = f"""     with t1 as (         select date, instrument, total_market_cap as cap, dividend_yield_ratio,         RANK() OVER (PARTITION BY date ORDER BY dividend_yield_ratio DESC) / COUNT(*) OVER (PARTITION BY date) AS rank_ratio,         from valuation         inner join daily_bar using (date, instrument)         inner join basic_info using (date, instrument)         inner join stock_status using (date, instrument)         where close <= 20         and list_days >= 200 and st_status = 0 and suspended = 0         and list_sector = 1 and stock_status.is_risk_warning = 0         and date >= '{sd_stock}' and date <= '{end_date}'         QUALIFY rank_ratio < 0.25     )     select date, instrument, RANK() OVER (PARTITION BY date ORDER BY cap) / COUNT(*) OVER (PARTITION BY date) as score     from t1     order by date, score, dividend_yield_ratio DESC     """     context.stock_df = Data._db.query(sql_stock).df().set_index('date')     # ===== 2. ETF端:动量评分(趋势打分 + 均线比),供 MVO 前的选股 =====     sql_etf = f"""     SELECT date, instrument, close,         close / (lag(close, 18) over (partition by instrument order by date)) - 1 AS stop_gain_signal,         (avg(close) over (partition by instrument order by date rows 9 preceding))             / (avg(close) over (partition by instrument order by date rows 4 preceding)) AS ma_ratio_5_10     FROM daily_fund     WHERE date >= '{sd_etf}' AND date <= '{end_date}'       AND instrument in {context.g.specified_etfs}     """     etf_df = Data._db.query(sql_etf).df().sort_values(['instrument', 'date'])     def trend_score(close, period):         if len(close) < period:             return np.full_like(close, np.nan)         y = np.log(close)         windows = np.lib.stride_tricks.sliding_window_view(y, window_shape=period)         x = np.arange(period)         n = period         sum_x = x.sum(); sum_x2 = (x ** 2).sum()         denom = n * sum_x2 - sum_x ** 2         sum_y = windows.sum(axis=1); sum_xy = (windows * x).sum(axis=1)         slope = (n * sum_xy - sum_x * sum_y) / denom         intercept = (sum_y - slope * sum_x) / n         annualized = np.exp(slope * 250) - 1         y_pred = slope[:, None] * x + intercept[:, None]         ss_res = np.sum((windows - y_pred) ** 2, axis=1)         ss_tot = np.sum(windows ** 2, axis=1) - (sum_y ** 2) / n         r2 = np.nan_to_num(1 - ss_res / ss_tot, nan=0.0)         score = annualized * r2         full = pd.Series(index=close.index, dtype=float)         full[period - 1:] = score         return full     etf_df['trend_score'] = etf_df.groupby('instrument')['close'].transform(         lambda x: trend_score(x, context.g.trend_period))     etf_df['score'] = etf_df['trend_score'] + etf_df['ma_ratio_5_10']     context.etf_df = etf_df     # ===== 3. 注入 ETF 日线到缓存(供 history/下单取价)=====     etf_cache_df = Data._db.query(f"""         SELECT instrument, date, name, open, high, low, close, pre_close,                volume, amount, upper_limit, lower_limit         FROM daily_fund         WHERE date >= '{sd_etf}' AND date <= '{end_date}'           AND instrument IN {context.g.specified_etfs}     """).df()     for _, row in etf_cache_df.iterrows():         date_str = row['date'].strftime('%Y-%m-%d') if hasattr(row['date'], 'strftime') else str(row['date'])[:10]         code = row['instrument']         if Data._daily_cache is None:             Data._daily_cache = {}         if date_str not in Data._daily_cache:             Data._daily_cache[date_str] = {}         volume = int(row['volume']) if pd.notna(row['volume']) else 0         amount_val = float(row['amount']) if pd.notna(row['amount']) else 0.0         Data._daily_cache[date_str][code] = DailyInfo(             ts_code=code, name=str(row['name']) if pd.notna(row['name']) else '',             open=float(row['open']) if pd.notna(row['open']) else 0.0,             high=float(row['high']) if pd.notna(row['high']) else 0.0,             low=float(row['low']) if pd.notna(row['low']) else 0.0,             close=float(row['close']) if pd.notna(row['close']) else 0.0,             pre_close=float(row['pre_close']) if pd.notna(row['pre_close']) else 0.0,             volume=volume, amount=amount_val,             vwap=amount_val / (volume + 1) if volume >= 0 else 0.0,             upLimit=float(row['upper_limit']) if pd.notna(row['upper_limit']) else float('inf'),             downLimit=float(row['lower_limit']) if pd.notna(row['lower_limit']) else 0.0,             stop=1 if volume == 0 else 0, st_status=0,         )     name_df = etf_cache_df[etf_cache_df['name'].notna() & (etf_cache_df['name'] != '')][['instrument', 'name']].drop_duplicates('instrument')     for _, row in name_df.iterrows():         if row['instrument'] not in Data._instrument_names:             Data._instrument_names[row['instrument']] = row['name']     # ===== 4. 状态初始化 =====     g.stock_index = -1     g.etf_index = -1     g.etf_universe = set(g.specified_etfs)     g.stock_targets = []          # 当前股票端持仓(等权)     g.etf_weights = {}            # 当前ETF端 MVO 内部权重(和为1) def _mean_variance_optimization(returns, risk_aversion, min_weight, max_weight):     """SLSQP 均值方差优化(搬自复杂版),返回和为1的权重 Series。"""     from scipy.optimize import minimize     exp_ret = returns.mean() * 252     cov = returns.cov() * 252     n = len(exp_ret)     w0 = np.clip(np.ones(n) / n, min_weight, max_weight)     w0 = w0 / w0.sum()     def obj(w):         return -(np.sum(exp_ret * w) - 0.5 * risk_aversion * np.dot(w, np.dot(cov, w)))     cons = [{'type': 'eq', 'fun': lambda x: np.sum(x) - 1}]     bounds = [(min_weight, max_weight)] * n     res = minimize(obj, w0, method='SLSQP', bounds=bounds, constraints=cons,                    options={'maxiter': 5000, 'ftol': 1e-9, 'eps': 1e-8})     w = res.x if res.success else np.clip(np.ones(n) / n, min_weight, max_weight)     w = np.clip(w, min_weight, max_weight)     w = w / w.sum()     return pd.Series(w, index=exp_ret.index) def _basket_vol(context, instruments, weights):     """     用各端【内部权重】合成伪历史收益序列,返回年化波动率。     instruments: list[str]; weights: dict 或 等权(None)。     数据不足/缺失返回 np.nan,由调用方兜底。     取数:context.history 返回 {code: np.recarray},recarray.close 已是 ndarray     (注意不是 DataFrame,没有 .values 属性)。history 自动防未来:截止上一交易日。     """     if not instruments:         return np.nan     hist = context.history(instruments, ['close'], VOL_WINDOW + 1)     if not hist:         return np.nan     rets = pd.DataFrame()     for ins in instruments:         rec = hist.get(ins)         if rec is None:             continue         close = np.asarray(rec.close, dtype=float)         # 去掉 NaN(该标的当日无数据时 history 填 nan)         if len(close) < VOL_WINDOW + 1 or np.isnan(close).any() or np.any(close[:-1] == 0):             continue         rets[ins] = close[1:] / close[:-1] - 1     if rets.shape[1] == 0:         return np.nan     if weights is None:  # 等权         w = np.ones(rets.shape[1]) / rets.shape[1]     else:         w = np.array([weights.get(c, 0.0) for c in rets.columns])         if w.sum() == 0:             return np.nan         w = w / w.sum()  # 仅对有数据的标的归一化     port_ret = (rets.values * w).sum(axis=1)     return port_ret.std() * np.sqrt(252)     if weights is None:  # 等权         w = np.ones(rets.shape[1]) / rets.shape[1]     else:         w = np.array([weights.get(c, 0.0) for c in rets.columns])         if w.sum() == 0:             return np.nan         w = w / w.sum()  # 仅对有数据的标的归一化     port_ret = (rets.values * w).sum(axis=1)     return port_ret.std() * np.sqrt(252) def _risk_parity_split(sigma_stock, sigma_etf):     """倒波动率两端分配。返回 (stock_total, etf_total, info)。     info 含中间量供日志:raw_ws(截断前股票相对权重), ws(截断后), clipped(是否触发截断), fallback(是否退回均分)。"""     fallback = False     if not np.isfinite(sigma_stock) or not np.isfinite(sigma_etf) or sigma_stock <= 0 or sigma_etf <= 0:         raw_ws = 0.5         fallback = True     else:         inv_s, inv_e = 1.0 / sigma_stock, 1.0 / sigma_etf         raw_ws = inv_s / (inv_s + inv_e)     ws = min(max(raw_ws, W_MIN), W_MAX)   # 股票端相对权重夹在 [W_MIN, W_MAX]     we = 1.0 - ws     info = {"raw_ws": raw_ws, "ws": ws, "clipped": abs(ws - raw_ws) > 1e-9, "fallback": fallback}     return ws * TOTAL_POS, we * TOTAL_POS, info def handle_data(context):     def is_etf(code):         return code in context.g.etf_universe     # 前视偏差修复:T日开盘用 T-1(昨日盘后)信号     if context.previous_dt is None:         return     today = context.previous_dt.strftime("%Y-%m-%d")     g = context.g     g.stock_index += 1     g.etf_index += 1     is_stock_rebal = (g.stock_index % g.stock_period == 0)     is_etf_rebal = (g.etf_index % g.etf_period == 0)     # ---- 1) 若到 ETF 调仓日:重新选股 + MVO,更新 g.etf_weights ----     if is_etf_rebal:         etf_today = context.etf_df[context.etf_df['date'] == today].sort_values('score', ascending=False).head(g.etf_num)         etf_targets = list(etf_today['instrument'])         if etf_targets:             try:                 rets = pd.DataFrame()                 for ins in etf_targets:                     close = context.history([ins], ['close'], g.history_bar_count)                     rets[ins] = np.diff(close[ins].close) / close[ins].close[:-1]                 w = _mean_variance_optimization(rets, g.risk_aversion, g.mv_min_weight, g.mv_max_weight)                 g.etf_weights = dict(w)             except Exception as e:                 print(f"MVO失败,ETF端等权兜底: {e}")                 g.etf_weights = {ins: 1.0 / len(etf_targets) for ins in etf_targets}     # ---- 2) 若到股票调仓日:重新选股,更新 g.stock_targets(等权)----     if is_stock_rebal:         try:             g.stock_targets = list(context.stock_df.loc[today].head(g.stock_num)['instrument'])         except KeyError:             g.stock_targets = []     # ---- 3) 任一端调仓 → 重算两端倒波动率配比,并对齐两端持仓 ----     if not (is_stock_rebal or is_etf_rebal):         return     stock_list = list(g.stock_targets)     etf_list = list(g.etf_weights.keys())     sigma_stock = _basket_vol(context, stock_list, weights=None)          # 股票端等权     sigma_etf = _basket_vol(context, etf_list, weights=g.etf_weights)     # ETF端用MVO权重     stock_total, etf_total, info = _risk_parity_split(sigma_stock, sigma_etf)     # 目标权重表(标的 -> 占总资金比例)     target_w = {}     if stock_list:         for ins in stock_list:             target_w[ins] = stock_total / len(stock_list)   # 端内等权     if etf_list:         for ins, iw in g.etf_weights.items():             target_w[ins] = etf_total * iw                  # 端内 MVO 权重     # 先清掉不在目标中的持仓     holdings = {k for k, v in context.portfolio.positions.items() if v.amount > 0}     for ins in holdings - set(target_w.keys()):         context.order_target_percent(ins, 0)     # 再对齐到目标权重(两端都刷新到最新风险平权配比)     for ins, w in target_w.items():         context.order_target_percent(ins, w)     # ---- 调仓日志 ----     if VERBOSE:         trig = []         if is_stock_rebal:             trig.append("股票")         if is_etf_rebal:             trig.append("ETF")         sig_s = f"{sigma_stock:.2%}" if np.isfinite(sigma_stock) else "NA"         sig_e = f"{sigma_etf:.2%}" if np.isfinite(sigma_etf) else "NA"         note = ""         if info["fallback"]:             note = " [波动率缺失→均分兜底]"         elif info["clipped"]:             note = f" [截断: {info['raw_ws']:.1%}→{info['ws']:.1%}]"         print(f"\n[{today}] 调仓({'+'.join(trig)})  "               f"σ股={sig_s} σ基金={sig_e}  "               f"→ 股票端={stock_total:.1%} 基金端={etf_total:.1%}{note}")         # 股票端明细(等权)         if stock_list:             sw = stock_total / len(stock_list)             print(f"   股票({len(stock_list)}只,各{sw:.2%}): {', '.join(stock_list)}")         # ETF端明细(MVO权重)         if etf_list:             etf_detail = ", ".join(f"{ins}:{etf_total*iw:.2%}" for ins, iw in g.etf_weights.items())             print(f"   基金({len(etf_list)}只): {etf_detail}") if __name__ == "__main__":     result = run_backtest(         initialize=initialize,         handle_data=handle_data,         start_date="2019-04-01",         end_date="2026-06-17",         capital=100000,         data_path=r"D:\Desktop\最新回测框架\data",         benchmark="000001.SH",         plot=True,         order_cost=cost,         asset_type="stock",     )     returns = result.performance.returns     nav = (1 + returns).cumprod()     trading_days = len(returns)     total_return = nav.iloc[-1] / nav.iloc[0] - 1     annual_return = (nav.iloc[-1] / nav.iloc[0]) ** (252 / trading_days) - 1     drawdown = (nav - nav.cummax()) / nav.cummax()     max_drawdown = drawdown.min()     annual_vol = returns.std() * np.sqrt(252)     sharpe = annual_return / annual_vol if annual_vol != 0 else np.nan     print("=" * 40)     print("风险平权混合策略(无择时)绩效指标")     print("=" * 40)     print(f"总收益:     {total_return:>10.2%}")     print(f"年化收益:   {annual_return:>10.2%}")     print(f"最大回撤:   {max_drawdown:>10.2%}")     print(f"年化波动:   {annual_vol:>10.2%}")     print(f"夏普比率:   {sharpe:>10.2f}")     print("=" * 40) 你能否看懂这个策略
看懂了,整体逻辑清晰,设计比较完整。下面逐层拆解: --- ## 策略架构总览 ``` 总仓位 90% ├── 股票端(倒波动率分配):小盘高股息,端内等权,22日调仓 └── ETF端(倒波动率分配):动量选基 + MVO 内部权重,8日调仓 ``` 两端之间用**倒波动率风险平权**分配资金,不用协方差矩阵,计算简单稳健。 --- ## 各模块解读 **选股(股票端)** - 筛选条件:价格 ≤20、上市≥200天、非ST、非停牌、主板、股息率前25% - 评分:在高股息池中按总市值升序排(选小盘),取前 N 只 - 端内等权 **选基(ETF端)** - 动量评分 = `trend_score`(线性回归斜率 × R²,年化化处理)+ `ma_ratio`(5日均线/10日均线) - 取评分前 M 只,做 MVO 求端内权重(SLSQP,有上下界约束) **倒波动率配比(核心)** - 用各端当前持仓的内部权重合成"伪历史组合收益",算年化波动率 - $w_{stock} = \frac{1/\sigma_s}{1/\sigma_s + 1/\sigma_e}$,夹在 `[W_MIN, W_MAX]` - 波动率缺失 → 退回均分兜底 **调仓触发逻辑** - 任一端到调仓日 → 两端都对齐到最新风险平权配比(非调仓端保持内部权重不变,只缩放总金额) --- ## 发现一个 Bug `_basket_vol` 里有**死代码**,`return` 后面的逻辑永远不会执行: ```python return port_ret.std() * np.sqrt(252) # ← 已经返回了 if weights is None: # ← 死代码,整段不会执行 ... port_ret = (rets.values * w).sum(axis=1) return port_ret.std() * np.sqrt(252) ``` 看起来是重构时没清干净的残留,可以直接删掉后半段。 --- ## 其他值得注意的点 | 问题 | 说明 | |------|------| | 前视偏差 | 用 `context.previous_dt` 取信号,方向正确 | | ETF历史收益计算 | `np.diff(close) / close[:-1]` 正确 | | 导入未使用 | `from xxybacktest import order_target_percent` 导入了但没用,实际用的是 `context.order_target_percent` | | 调仓同步 | ETF 调仓日同时刷新股票端权重(金额缩放),逻辑连贯 | | 数据对齐 | `context.stock_df.loc[today]` 若当天无数据会 KeyError,已有 try/except 保护 | 整体来说策略逻辑是自洽的,主要是那个死代码 bug 需要清理。你有什么想调整或想问的部分吗?
你能不能写一个bigtrader版本的出来
请读取以下文件并返回完整内容: 1. /home/aiuser/.claude/skills/bigquant-stock-strategy-gen/references/dai_functions.md 2. /home/aiuser/.claude/skills/bigquant-stock-strategy-gen/references/stock_data_schema.md 3. /home/aiuser/.claude/skills/bigquant-stock-strategy-gen/references/bigquant_platform.md 请把三个文件的完整内容都返回给我。
请读取以下文件并返回完整内容: 1. /home/aiuser/.claude/skills/bigquant-stock-strategy-gen/references/bigquant_platform.md 2. /home/aiuser/.claude/skills/bigquant-stock-strategy-gen/assets/stock_strategy_template.py 请把两个文件的完整内容都返回给我。
请完整读取以下文件内容(不要省略任何部分): 1. /home/aiuser/.claude/skills/bigquant-stock-strategy-gen/references/bigquant_platform.md 2. /home/aiuser/.claude/skills/bigquant-stock-strategy-gen/assets/stock_strategy_template.py 对于每个文件,请使用 Read 工具读取,并把完整的文件内容原文返回,不要做任何摘要或省略。
现在我来把这个策略完整转换为 BigTrader 版本。这个策略最核心的挑战是: 1. **股票端** — SQL 里选小盘高股息,端内等权 2. **ETF端** — 动量评分 + MVO 优化内部权重(BigTrader 的 `cn_stock_prefactors` 没有 ETF 数据,需要用 `dai.query` 直接查 `daily_fund` 表) 3. **倒波动率风险平权** — 两端各自算波动率,每次调仓日都重算配比 核心思路:在 `initialize` 里把股票端的选股打分做好,ETF端选基 + 动量分数也预算好;MVO 和倒波动率分配因为依赖"当时持仓权重 × 当时价格",放在自定义 `handle_data` 里实时计算。
请帮我查找 BigQuant DAI 中 ETF/基金数据的正确表名。 请执行以下查询来找到正确的表名: ```python from bigquant import dai # 查询所有包含 fund 或 etf 的表名 result = dai.query("SELECT table_name FROM bigdb_columns WHERE table_name ILIKE '%fund%' OR table_name ILIKE '%etf%' GROUP BY table_name").df() print(result) ``` 然后再查询: ```python result2 = dai.query("SELECT table_name FROM bigdb_columns WHERE table_name ILIKE '%daily%' GROUP BY table_name").df() print(result2) ``` 请在 /home/aiuser/work/codev_userdata/956121b4-1a28-4c7b-b2bd-46b1ddf7e4ad/ 目录下执行这段 Python 代码,告诉我 ETF/基金日线数据的正确表名。
ETF表名是 `cn_fund_bar1d`,原代码用了 `daily_fund`,修一下:
把 `daily_fund` 改成 `cn_fund_bar1d`,同时还注意到 `m_regr_slope`/`m_regr_r2` 用 `ROW_NUMBER()` 作为x变量可能有问题,改用更简洁稳定的线性序号方式,另外 `cn_fund_bar1d` 的查询需要直接加 `filters`:
策略验证通过,运行结果非常不错。核心绩效如下: | 指标 | 结果 | |------|------| | 累计收益率 | 204.19% | | 年化收益率 | 17.37% | | 基准收益率 | 29.58%(跑赢较多)| | 最大回撤 | **12.74%**(很低)| | 夏普比率 | 1.28 | | 收益波动率 | 10.62%(极低)| | 贝塔 | 0.35(市场中性偏强)| --- **转换过程中做的几个关键适配:** 1. **表名修正**:原代码用的 `daily_fund` 在 BigTrader 环境里不存在,改成平台正确表名 `cn_fund_bar1d`。 2. **股票端选股 SQL**:原代码依赖 `xxybacktest` 的自定义 SQL 函数,改写成 BigTrader DAI 兼容的写法,用 `c_pct_rank()` 计算截面百分位。 3. **ETF 动量评分**:原代码用 `sliding_window_view` 手写趋势斜率,改用 DAI 内置的 `m_regr_slope` 和 `m_regr_r2`,更简洁。 4. **历史价格缓存**:原代码在 `initialize` 里手动注入 ETF 缓存,BigTrader 不需要这个,直接在 `initialize` 里用 DAI 查询历史价格存到 `context.etf_close_df`,`handle_data` 里直接用 `.loc[:today]` 切片。 5. **波动率计算**:原代码依赖 `context.history()` 接口,改为调仓日实时 `dai.query()` 拉股票历史价格(ETF 直接走预加载的 DataFrame)。 --- **风险提示** 回测结果超额收益明显,但需注意以下偏差来源: - 小盘高股息选股存在**流动性风险**,实际成交价可能偏差较大 - 股票筛选用了当日收盘价 ≤20 + 主板筛选,有**幸存者偏差** - 股息率数据是否存在**前视**(取决于平台更新时点),建议验证 - 建议做**样本外测试**(如只看 2023 年之后),以及**分段分析**各市场环境下的表现
是这样的,其实我是在尝试将波动率分配资金的方法和策略本身想解耦,也就是说如果用户替换任意两个策略 他都可以无压力替换
这是个很好的设计思路——把"用什么策略"和"怎么分配资金"解耦成两层。可以用一个简单的面向对象框架来实现: **设计思路:** ``` StrategyBase(抽象基类) ├── SmallCapDividendStrategy ← 股票端 ├── MomentumETFStrategy ← ETF端 └── (任意新策略) RiskParityBlender(资金分配器) └── 接受任意两个 StrategyBase,负责倒波动率分配 ``` 用户替换策略只需:继承 `StrategyBase`,实现 3 个方法。分配器完全不用动。
代码写好了。来看下整体架构和扩展方式: --- **分层结构** ``` StrategyBase(抽象基类) │ prepare(context) ← initialize 里调用,预加载数据 │ on_rebalance(context, today) ← 调仓日更新选标的 + 内部权重 │ get_instruments() ← 返回当前持仓列表 │ get_internal_weights() ← 返回端内权重,和为1 │ get_basket_returns(...) ← 返回组合日收益序列,供分配器算波动率 │ ├── SmallCapDividendStrategy ← 股票端(已实现) ├── MomentumETFStrategy ← ETF端(已实现) └── 你的任意新策略... RiskParityBlender ← 分配器,与策略完全解耦 │ get_target_weights(context, today) → {instrument: 权重} ``` --- **替换策略只需三步** ```python # 1. 继承基类,实现5个方法 class MyNewStrategy(StrategyBase): def prepare(self, context): ... def on_rebalance(self, context, today): ... def get_instruments(self): ... def get_internal_weights(self): ... def get_basket_returns(self, context, today, window): ... # 2. 实例化 strategy_a = MyNewStrategy(period=10) # 3. 分配器不动,直接替换传入 blender = RiskParityBlender(strategy_a=strategy_a, strategy_b=strategy_b) ``` 分配器的倒波动率逻辑、截断、兜底机制全部不需要改动。如果以后想扩展成 N 个策略的等风险贡献(全风险平权),也只需改 `RiskParityBlender` 一处。
如果按照你这套方案去设计,最终用户要怎样使用呢?
好问题。用户视角来看,这套框架的实际使用体验分三种场景: --- ## 场景一:只调参数(最常见) 完全不需要写代码,改文件顶部的实例化部分就行。 ```python # 想换成持股8只、10日调仓?改这里 strategy_a = SmallCapDividendStrategy(stock_num=8, period=10) # 想换ETF池子、选7只?改这里 strategy_b = MomentumETFStrategy( etf_universe=('510300.SH', '510500.SH', '159915.SZ', ...), etf_num=7, period=5, ) # 想调风险平权约束?改这里 blender = RiskParityBlender(w_min=0.2, w_max=0.8, total_pos=0.95) ``` `initialize` / `handle_data` / `blender` 的内部逻辑**一行不动**。 --- ## 场景二:替换其中一个策略 比如把股票端换成"价值因子策略"。用户只需在文件里新增一个类,然后替换实例化那一行: ```python # 新增:继承基类,实现5个方法 class ValueFactorStrategy(StrategyBase): def __init__(self, stock_num=10, period=15): super().__init__(period) self.stock_num = stock_num self._instruments = [] self._stock_df = None def prepare(self, context): # 自己的选股逻辑:低PE高ROE sql = """ SELECT date, instrument FROM cn_stock_prefactors WHERE pe_ttm > 0 AND pe_ttm < 20 AND roe_avg_ttm > 0.1 AND st_status = 0 AND suspended = 0 ORDER BY date, pe_ttm ASC """ df = dai.query(sql, filters={...}).df() self._stock_df = df.groupby('date').head(self.stock_num).set_index('date') def on_rebalance(self, context, today): try: self._instruments = list(self._stock_df.loc[today]['instrument']) except KeyError: pass def get_instruments(self): return list(self._instruments) def get_internal_weights(self): n = len(self._instruments) return {ins: 1.0 / n for ins in self._instruments} if n else {} def get_basket_returns(self, context, today, window): # 查近期收盘价 → 算等权组合收益 ... # 逻辑和 SmallCapDividendStrategy 基本一样,复制改一下就行 # 只改这一行,其余全不动 strategy_a = ValueFactorStrategy(stock_num=10, period=15) ``` `blender`、`initialize`、`handle_data` **完全不用碰**。 --- ## 场景三:两个策略都换 ```python strategy_a = ValueFactorStrategy(period=15) strategy_b = CandlestickMomentumStrategy(period=5) # 另一个自定义策略 blender = RiskParityBlender(strategy_a, strategy_b) # 剩下全部不动 ``` --- ## 现在这套方案的一个小摩擦点 坦白说,`get_basket_returns` 这个方法里查历史价格的代码,在两个策略里几乎是重复的(拉收盘价 → pivot → pct_change → 加权求和)。如果用户自己写新策略,这部分容易写错。 可以在 `StrategyBase` 里提供一个**默认实现**,只需子类告诉基类"我的价格数据从哪来": ```python class StrategyBase(ABC): # 子类实现这个:返回历史价格 DataFrame(index=date, col=instrument) def get_price_dataframe(self) -> pd.DataFrame: return None # 默认 None,子类按需覆盖 # 基类自动算波动率序列,子类不再需要实现 get_basket_returns def get_basket_returns(self, context, today, window): price_df = self.get_price_dataframe() if price_df is None: return None ins = self.get_instruments() iw = self.get_internal_weights() # ... 通用计算逻辑 ``` 这样用户写新策略时,只需在 `prepare` 里把价格数据存到 `self._price_df`,然后: ```python def get_price_dataframe(self): return self._price_df # 一行搞定 ``` 要不要顺手把这个优化也合入进去?
其实我有一个想法,我们可以将bigtrader回测进行二次封装,设计一个策略类,传入initialize和handle_data方法就可以初始化了,然后里面有run属性,run属性传入的参数和bigtrader.run一致。然后再写一个波动率仓位分配的类,传入两个策略类,然后再run(参数和bigtrader.run参数一致)你觉得怎么样
这个思路比我上一版更好,用户体验更自然。让我先梳理一下优缺点再写。 --- **你方案的核心优势:** 用户视角极其简洁,整个使用流程就三步: ```python # 1. 把已有策略包一下 strat_a = Strategy(initialize=my_init_a, handle_data=my_handle_a) strat_b = Strategy(initialize=my_init_b, handle_data=my_handle_b) # 2. 套上分配器 blender = VolParityRunner(strat_a, strat_b) # 3. 和 bigtrader.run 一模一样的方式跑 blender.run(start_date="2019-01-01", end_date="2026-06-18", capital_base=1000000, ...) ``` **有一个设计难点需要解决:** `VolParityRunner` 要把两个独立策略"合并"跑,但 `bigtrader.run` 只接受一组 `initialize`+`handle_data`。所以 `VolParityRunner.run` 内部需要生成一个**合并版** `initialize` 和 `handle_data`,把两个策略的逻辑融合进去,然后调一次 `bigtrader.run`。 这意味着两个子策略的 `context` 需要隔离——两个策略各自的 `context` 状态不能互相污染。可以用命名空间(`context.strategy_a_xxx` 这样的前缀)或者把每个策略的状态挂在 `context` 上的独立 dict 里。 --- 好,设计清楚了,来写:
先算了吧,你先将之前跑通的混合策略给到我
把之前验证通过的那版(没有框架封装,直接可跑)给你:
这就是当前 `main.py` 里跑通的版本,完整贴给你了。 代码结构回顾: - **全局参数区**(第31-35行):`VOL_WINDOW`、`W_MIN/W_MAX`、`TOTAL_POS`,调参改这里 - **`SmallCapDividendStrategy`**:股票端,小盘高股息,22日调仓,端内等权 - **`MomentumETFStrategy`**:ETF端,动量+MVO,8日调仓 - **`RiskParityBlender`**:倒波动率分配器,与策略无关 - **入口区**(第437行起):实例化 → 绑定 blender → `initialize` + `handle_data` → `bigtrader.run` 直接在 BigTrader 环境里跑这个文件就行,ETF数据用的是 `cn_fund_bar1d` 表,已经是平台正确表名。
不是我要的是你最开始改写成bigtrader的那个策略
那是第一个验证通过的版本,当时文件后来被框架版覆盖了。直接把那版代码重新写回来:
好了,已经还原回去了。这就是那版干净的直接可跑版本,没有任何框架封装,结构是: - `initialize` —— 预加载股票/ETF选股数据 + ETF历史价格 - `handle_data` —— 按各自调仓周期触发,任一端到期就重算倒波动率配比,统一下单 - 三个辅助函数 `_calc_basket_vol_stock` / `_calc_basket_vol_etf` / `_risk_parity_split` 独立在外 ETF表用的是 `cn_fund_bar1d`,直接在 BigTrader 环境跑即可。
就针对这个代码,你觉得会不会不太好讲,我后续是要拿他去讲课的,这份代码里面的知识点应该不少,我应该怎么讲
这份代码拿去讲课其实挺合适的,知识点密度高而且每个模块都有实际意义。给你梳理一下讲课思路: --- ## 整体建议:从问题出发,而不是从代码出发 不要从第一行讲到最后一行,那样听众会睡着。建议以**"我们要解决什么问题"**为主线,让代码自然浮现出来。 --- ## 可以拆成 5 个知识模块 **模块一:为什么要混合两类资产?(5分钟,铺垫)** 先讲动机:股票波动大、ETF相对稳,单押一类都有缺陷。引出"混合策略"的必要性,然后抛出核心问题——"两类资产各给多少钱?",自然引出风险平权。 --- **模块二:DAI 数据查询 + 因子构建(15分钟,重点)** 这里有两个值得细讲的知识点: - **CTE 嵌套查询**:股票端的 `WITH ranked AS (...)` 两步筛选,为什么要拆成两步(内层筛股息率分位,外层再按市值排序),直接合并写会有什么问题 - **截面函数 `c_pct_rank`**:什么是截面计算,和普通排名的区别,为什么量化里大量用截面函数 - **ETF动量评分**:`m_regr_slope × m_regr_r2` 这个设计的含义——斜率衡量趋势强度,R²衡量趋势稳定性,两者相乘过滤"假趋势",比单纯用涨跌幅聪明在哪里 --- **模块三:均值方差优化 MVO(15分钟,算法)** 这是代码里数学含量最高的部分,可以讲: - 目标函数的直觉:最大化 `收益 - 0.5 × 风险厌恶 × 方差`,调 `RISK_AVERSION` 参数会怎样影响结果 - 为什么要设上下界 `[MV_MIN_W, MV_MAX_W]`:不加约束MVO会极度集中,现实中不可接受 - SLSQP 不收敛时等权兜底的工程思维:优化器不是万能的,必须有降级方案 --- **模块四:倒波动率风险平权(15分钟,核心亮点)** 这是整个策略最有讲头的地方: - **为什么不用固定比例**:固定45/45的问题——股票端波动率可能是ETF端的2倍,名义上平均但风险严重不对等 - **倒波动率的直觉**:谁波动小谁多分钱,一句话就能讲清楚 - **"伪历史收益序列"的构造**:用当前持仓权重 × 历史价格合成组合收益,不依赖净值记录,第一天就能用——这个工程细节很多人想不到 - **截断 `[W_MIN, W_MAX]` 的必要性**:极端行情下某端波动率可能趋近于0,不截断会导致全押一端 --- **模块五:BigTrader 工程细节(10分钟,实践)** - **`initialize` vs `handle_data` 的分工**:为什么把所有数据查询放 `initialize`,而不是每天在 `handle_data` 里查——性能差距可能是几十倍 - **前视偏差的规避**:`data.current_dt` 是当天,用它查昨天及以前的数据下单,信号不能用当天收盘价 - **两端调仓周期不同步的处理**:任一端触发都重算两端配比,非调仓端只缩放金额不重新选标的,保证总仓位恒定 --- ## 一个讲课小技巧 每个模块结束后可以问听众:"如果我们不这样做,会发生什么?" 比如: - 不用 `c_pct_rank` 直接用 `dividend_yield_ratio` 排序 → 跨期比较失效 - 不截断倒波动率权重 → 某端暴跌时仓位会极度集中 - MVO 不设下界 → 回测优化结果极好但实盘集中度过高 这样听众会对每个设计选择都留下印象,而不只是看热闹。
还要注意,讲课对象可能不会数学,对组合优化的权重调整没法用运筹学的方式去理解
好,这个约束很关键。一旦假设听众不懂数学,讲课逻辑要整体调整。 --- ## 核心原则:用钱和生活类比,绕开公式 --- **模块二:因子构建** `c_pct_rank` 不要说"截面百分位排名",说: > "今天全市场5000只股票,股息率排在前25%,就像班级考试排前25名才有资格进下一轮。每天重新排,不是和历史比,是和今天同场竞争的股票比。" ETF动量评分的 `slope × r²` 不要提回归,说: > "斜率告诉你涨得有多猛,R²告诉你涨得稳不稳。一只ETF连续稳定地涨,比忽上忽下最后也涨了同样多,分数要高得多。我们要的是'稳步向上',不是'坐过山车最后没输'。" --- **模块三:MVO 权重优化** 这块直接放弃讲数学,换一个生活比喻: > "你手里有6只ETF,怎么分钱?最笨的办法是平均分。但有的ETF收益好、风险低,有的收益差、风险还高——凭什么一样多?MVO就是一个'精打细算的管家',在你设定的范围内(每只最少5%、最多40%),帮你找到收益最大、风险最小的那个分法。" 然后重点讲**为什么要设上下界**,这个没有数学门槛: > "不加限制的话,管家会把90%的钱押在历史表现最好的那一只上——回测好看,但实盘一旦那只出问题就全完了。所以我们强制要求每只不能超过40%,这是'强制分散'。" --- **模块四:倒波动率风险平权(最重要,要讲透)** 这块是整个策略的灵魂,但完全可以不提公式。 先建立"风险不对等"的直觉: > "假设股票端和ETF端各放45%。听起来很公平。但股票端每天上下波动3%,ETF端只波动0.5%。你名义上平分了钱,实际上你承受的风险里,股票端占了85%以上。这叫'名义平衡,风险失衡'。" 再讲倒波动率的直觉,完全不用公式: > "我们想让两端承担一样多的风险。谁波动大,就少给它钱;谁波动小,就多给它钱。就像两个员工,一个做事稳,一个做事飘——你肯定给稳的那个更多任务比重。" 截断的部分用极端情况讲: > "但不能无限制地调。如果某天市场异常,ETF端波动率接近零,按公式几乎所有钱都要给ETF端——这不合理。所以我们设了一个范围:股票端最少30%、最多70%,超出就强制拉回来。" --- **整体讲课顺序建议调整成这样:** ``` ① 讲问题(5分钟) 股票和ETF混合,钱怎么分?固定比例有什么缺陷? ② 讲数据怎么来(10分钟) DAI查询、选股逻辑、ETF动量评分 重点:c_pct_rank 的"今日排名"概念 ③ 讲ETF端内部怎么分(10分钟) MVO = 精打细算的管家 重点:为什么要设上下界 ④ 讲两端之间怎么分(15分钟)★最重磅 风险不对等的直觉 倒波动率 = 稳的多给、飘的少给 截断 = 防止极端情况 ⑤ 讲工程细节(10分钟) initialize 只跑一次 vs handle_data 每天跑 前视偏差:为什么不能用今天的收盘价来下今天的单 两端调仓周期不同步怎么处理 ``` 最后留10分钟让听众改参数跑回测,亲眼看 `W_MIN/W_MAX` 和 `VOL_WINDOW` 变化对结果的影响——比讲100句话都有效。
请运行它给出绩效
运行完成,绩效结果如下: --- **核心绩效(2019-04-01 至 2026-06-18)** | 指标 | 策略 | 基准(上证指数) | |------|------|------| | 累计收益 | **204.19%** | 29.58% | | 超额收益 | **+174.61%** | — | | 最终组合价值 | **304万** | (初始100万) | **月度收益亮点** - 2022年全年持续跑赢,上证跌了近10%时策略正收益 - 2024年11月单月爆发:+30%(策略)vs +5%(基准) - 2026年至今略有回撤但仍大幅超额 **交易统计** | 指标 | 数值 | |------|------| | 总平仓次数 | 708次 | | 胜率 | 62.3%(441盈/267亏)| | 总手续费 | 5.18万 | --- 整体来看策略表现相当稳健——2022年大熊市期间超额最明显,显示出风险平权对回撤的保护效果,这正是讲课时最有说服力的一段。
非常好,请为我生成md格式的课件
课件已生成,保存在工作目录下:`课件_风险平权混合策略.md` --- 课件共分9个部分,结构是这样的: | 章节 | 内容 | 难度定位 | |------|------|---------| | 第一部分 | 为什么要做这个策略,固定比例的问题 | 纯直觉,无门槛 | | 第二部分 | 整体架构 + 可调参数一览 | 框架感 | | 第三部分 | 股票端选股,CTE嵌套和c_pct_rank解释 | 稍有代码 | | 第四部分 | ETF动量评分 + MVO"精打细算的管家" | 重点,无数学 | | 第五部分 | 倒波动率风险平权,含真实日志示例 | 核心,重点讲 | | 第六部分 | 工程细节:initialize分工、前视偏差、调仓同步 | 实践注意事项 | | 第七部分 | 回测结果,含分年度亮点分析 | 成果展示 | | 第八部分 | 三个动手实验,现场改参数 | 互动环节 | | 第九部分 | 风险提示 | 合规必要项 | 第五部分建议多花时间,风险平权是整个策略的灵魂,也是听众最容易"懂了但说不出来"的地方,反复用生活类比讲透更好。