# -*- coding: utf-8 -*-
"""
风险平权混合策略(无择时)
============================
在"股票端 + ETF端"混合框架上,把两端的资金分配从【固定等权(45/45)】
改为【倒波动率风险平权】,解决"股票和基金风险不对等"的问题。
各端内部逻辑保持原样:
股票端:小盘高股息选 N 只,端内等权
ETF端 :动量评分选 top M + 均值方差优化(MVO)权重(搬自 小盘股息率_2 复杂版)
两端之间(核心改动):倒波动率分配
1. 用各端【内部权重】合成两端的"伪历史收益序列"(当场用标的历史价格算,
不依赖任何净值记录,回测/实盘第一天即可用):
股票端序列 = Σ(等权_i × 各股票近 VOL_WINDOW 日收益)
ETF端序列 = Σ(MVO权重_i × 各ETF近 VOL_WINDOW 日收益)
2. 各算年化波动率 σ_stock、σ_etf
3. 倒波动率定两端相对权重:w_stock = (1/σ_s) / (1/σ_s + 1/σ_e)
4. 相对权重截断到 [W_MIN, W_MAX] 后归一化,再乘总仓位 TOTAL_POS
协方差矩阵:组内不用、两端倒波动率也不用 → 全程无需协方差。
实现要点:任一端调仓日都重算两端配比,并把两端都对齐到最新配比
(非调仓端只按其保存的内部权重缩放,不重新选股,参考复杂版 signal_changed 逻辑),
保证总仓位恒为 TOTAL_POS、风险平权配比每个调仓日刷新。
"""
from xxybacktest import run_backtest, order_target_percent, OrderCost
from xxybacktest.data import DailyInfo
import pandas as pd
import numpy as np
cost = OrderCost(
open_tax=0,
close_tax=0.001,
open_commission=0.0003,
close_commission=0.0003,
min_commission=5,
)
# ===================== 可调参数 =====================
VOL_WINDOW = 20 # 倒波动率回看窗口(交易日,约一个月)
W_MIN, W_MAX = 0.3, 0.7 # 股票端相对权重的上下限(ETF端对称)
TOTAL_POS = 0.9 # 总投资仓位(剩余为现金)
VERBOSE = True # 打印每次调仓的权重变化日志
# ===================================================
def initialize(context):
from xxybacktest import Data
_DEFAULT_PARAMS = {
"specified_etfs": (
'160723.SZ', '513100.SH', '159985.SZ', '518880.SH',
'513130.SH', '510300.SH', '159915.SZ', '511380.SZ',
'513520.SZ', '159980.SZ', '513500.SH',
),
"stock_num": 5,
"etf_num": 6,
"stock_period": 22,
"etf_period": 8,
# MVO 参数(搬自复杂版)
"risk_aversion": 1.0,
"mv_min_weight": 0.05,
"mv_max_weight": 0.3,
"trend_period": 25,
"history_bar_count": 21,
}
g = context.g
for k, v in _DEFAULT_PARAMS.items():
setattr(g, k, v)
sd_stock = (pd.to_datetime(context.trade.start_time) - pd.Timedelta(days=30)).strftime("%Y-%m-%d")
sd_etf = (pd.to_datetime(context.trade.start_time) - pd.Timedelta(days=39)).strftime("%Y-%m-%d")
end_date = context.trade.end_time
# ===== 1. 股票端:小盘高股息选股(端内等权)=====
sql_stock = f"""
with t1 as (
select date, instrument, total_market_cap as cap, dividend_yield_ratio,
RANK() OVER (PARTITION BY date ORDER BY dividend_yield_ratio DESC) / COUNT(*) OVER (PARTITION BY date) AS rank_ratio,
from valuation
inner join daily_bar using (date, instrument)
inner join basic_info using (date, instrument)
inner join stock_status using (date, instrument)
where close <= 20
and list_days >= 200 and st_status = 0 and suspended = 0
and list_sector = 1 and stock_status.is_risk_warning = 0
and date >= '{sd_stock}' and date <= '{end_date}'
QUALIFY rank_ratio < 0.25
)
select date, instrument, RANK() OVER (PARTITION BY date ORDER BY cap) / COUNT(*) OVER (PARTITION BY date) as score
from t1
order by date, score, dividend_yield_ratio DESC
"""
context.stock_df = Data._db.query(sql_stock).df().set_index('date')
# ===== 2. ETF端:动量评分(趋势打分 + 均线比),供 MVO 前的选股 =====
sql_etf = f"""
SELECT date, instrument, close,
close / (lag(close, 18) over (partition by instrument order by date)) - 1 AS stop_gain_signal,
(avg(close) over (partition by instrument order by date rows 9 preceding))
/ (avg(close) over (partition by instrument order by date rows 4 preceding)) AS ma_ratio_5_10
FROM daily_fund
WHERE date >= '{sd_etf}' AND date <= '{end_date}'
AND instrument in {context.g.specified_etfs}
"""
etf_df = Data._db.query(sql_etf).df().sort_values(['instrument', 'date'])
def trend_score(close, period):
if len(close) < period:
return np.full_like(close, np.nan)
y = np.log(close)
windows = np.lib.stride_tricks.sliding_window_view(y, window_shape=period)
x = np.arange(period)
n = period
sum_x = x.sum(); sum_x2 = (x ** 2).sum()
denom = n * sum_x2 - sum_x ** 2
sum_y = windows.sum(axis=1); sum_xy = (windows * x).sum(axis=1)
slope = (n * sum_xy - sum_x * sum_y) / denom
intercept = (sum_y - slope * sum_x) / n
annualized = np.exp(slope * 250) - 1
y_pred = slope[:, None] * x + intercept[:, None]
ss_res = np.sum((windows - y_pred) ** 2, axis=1)
ss_tot = np.sum(windows ** 2, axis=1) - (sum_y ** 2) / n
r2 = np.nan_to_num(1 - ss_res / ss_tot, nan=0.0)
score = annualized * r2
full = pd.Series(index=close.index, dtype=float)
full[period - 1:] = score
return full
etf_df['trend_score'] = etf_df.groupby('instrument')['close'].transform(
lambda x: trend_score(x, context.g.trend_period))
etf_df['score'] = etf_df['trend_score'] + etf_df['ma_ratio_5_10']
context.etf_df = etf_df
# ===== 3. 注入 ETF 日线到缓存(供 history/下单取价)=====
etf_cache_df = Data._db.query(f"""
SELECT instrument, date, name, open, high, low, close, pre_close,
volume, amount, upper_limit, lower_limit
FROM daily_fund
WHERE date >= '{sd_etf}' AND date <= '{end_date}'
AND instrument IN {context.g.specified_etfs}
""").df()
for _, row in etf_cache_df.iterrows():
date_str = row['date'].strftime('%Y-%m-%d') if hasattr(row['date'], 'strftime') else str(row['date'])[:10]
code = row['instrument']
if Data._daily_cache is None:
Data._daily_cache = {}
if date_str not in Data._daily_cache:
Data._daily_cache[date_str] = {}
volume = int(row['volume']) if pd.notna(row['volume']) else 0
amount_val = float(row['amount']) if pd.notna(row['amount']) else 0.0
Data._daily_cache[date_str][code] = DailyInfo(
ts_code=code, name=str(row['name']) if pd.notna(row['name']) else '',
open=float(row['open']) if pd.notna(row['open']) else 0.0,
high=float(row['high']) if pd.notna(row['high']) else 0.0,
low=float(row['low']) if pd.notna(row['low']) else 0.0,
close=float(row['close']) if pd.notna(row['close']) else 0.0,
pre_close=float(row['pre_close']) if pd.notna(row['pre_close']) else 0.0,
volume=volume, amount=amount_val,
vwap=amount_val / (volume + 1) if volume >= 0 else 0.0,
upLimit=float(row['upper_limit']) if pd.notna(row['upper_limit']) else float('inf'),
downLimit=float(row['lower_limit']) if pd.notna(row['lower_limit']) else 0.0,
stop=1 if volume == 0 else 0, st_status=0,
)
name_df = etf_cache_df[etf_cache_df['name'].notna() & (etf_cache_df['name'] != '')][['instrument', 'name']].drop_duplicates('instrument')
for _, row in name_df.iterrows():
if row['instrument'] not in Data._instrument_names:
Data._instrument_names[row['instrument']] = row['name']
# ===== 4. 状态初始化 =====
g.stock_index = -1
g.etf_index = -1
g.etf_universe = set(g.specified_etfs)
g.stock_targets = [] # 当前股票端持仓(等权)
g.etf_weights = {} # 当前ETF端 MVO 内部权重(和为1)
def _mean_variance_optimization(returns, risk_aversion, min_weight, max_weight):
"""SLSQP 均值方差优化(搬自复杂版),返回和为1的权重 Series。"""
from scipy.optimize import minimize
exp_ret = returns.mean() * 252
cov = returns.cov() * 252
n = len(exp_ret)
w0 = np.clip(np.ones(n) / n, min_weight, max_weight)
w0 = w0 / w0.sum()
def obj(w):
return -(np.sum(exp_ret * w) - 0.5 * risk_aversion * np.dot(w, np.dot(cov, w)))
cons = [{'type': 'eq', 'fun': lambda x: np.sum(x) - 1}]
bounds = [(min_weight, max_weight)] * n
res = minimize(obj, w0, method='SLSQP', bounds=bounds, constraints=cons,
options={'maxiter': 5000, 'ftol': 1e-9, 'eps': 1e-8})
w = res.x if res.success else np.clip(np.ones(n) / n, min_weight, max_weight)
w = np.clip(w, min_weight, max_weight)
w = w / w.sum()
return pd.Series(w, index=exp_ret.index)
def _basket_vol(context, instruments, weights):
"""
用各端【内部权重】合成伪历史收益序列,返回年化波动率。
instruments: list[str]; weights: dict 或 等权(None)。
数据不足/缺失返回 np.nan,由调用方兜底。
取数:context.history 返回 {code: np.recarray},recarray.close 已是 ndarray
(注意不是 DataFrame,没有 .values 属性)。history 自动防未来:截止上一交易日。
"""
if not instruments:
return np.nan
hist = context.history(instruments, ['close'], VOL_WINDOW + 1)
if not hist:
return np.nan
rets = pd.DataFrame()
for ins in instruments:
rec = hist.get(ins)
if rec is None:
continue
close = np.asarray(rec.close, dtype=float)
# 去掉 NaN(该标的当日无数据时 history 填 nan)
if len(close) < VOL_WINDOW + 1 or np.isnan(close).any() or np.any(close[:-1] == 0):
continue
rets[ins] = close[1:] / close[:-1] - 1
if rets.shape[1] == 0:
return np.nan
if weights is None: # 等权
w = np.ones(rets.shape[1]) / rets.shape[1]
else:
w = np.array([weights.get(c, 0.0) for c in rets.columns])
if w.sum() == 0:
return np.nan
w = w / w.sum() # 仅对有数据的标的归一化
port_ret = (rets.values * w).sum(axis=1)
return port_ret.std() * np.sqrt(252)
if weights is None: # 等权
w = np.ones(rets.shape[1]) / rets.shape[1]
else:
w = np.array([weights.get(c, 0.0) for c in rets.columns])
if w.sum() == 0:
return np.nan
w = w / w.sum() # 仅对有数据的标的归一化
port_ret = (rets.values * w).sum(axis=1)
return port_ret.std() * np.sqrt(252)
def _risk_parity_split(sigma_stock, sigma_etf):
"""倒波动率两端分配。返回 (stock_total, etf_total, info)。
info 含中间量供日志:raw_ws(截断前股票相对权重), ws(截断后), clipped(是否触发截断), fallback(是否退回均分)。"""
fallback = False
if not np.isfinite(sigma_stock) or not np.isfinite(sigma_etf) or sigma_stock <= 0 or sigma_etf <= 0:
raw_ws = 0.5
fallback = True
else:
inv_s, inv_e = 1.0 / sigma_stock, 1.0 / sigma_etf
raw_ws = inv_s / (inv_s + inv_e)
ws = min(max(raw_ws, W_MIN), W_MAX) # 股票端相对权重夹在 [W_MIN, W_MAX]
we = 1.0 - ws
info = {"raw_ws": raw_ws, "ws": ws, "clipped": abs(ws - raw_ws) > 1e-9, "fallback": fallback}
return ws * TOTAL_POS, we * TOTAL_POS, info
def handle_data(context):
def is_etf(code):
return code in context.g.etf_universe
# 前视偏差修复:T日开盘用 T-1(昨日盘后)信号
if context.previous_dt is None:
return
today = context.previous_dt.strftime("%Y-%m-%d")
g = context.g
g.stock_index += 1
g.etf_index += 1
is_stock_rebal = (g.stock_index % g.stock_period == 0)
is_etf_rebal = (g.etf_index % g.etf_period == 0)
# ---- 1) 若到 ETF 调仓日:重新选股 + MVO,更新 g.etf_weights ----
if is_etf_rebal:
etf_today = context.etf_df[context.etf_df['date'] == today].sort_values('score', ascending=False).head(g.etf_num)
etf_targets = list(etf_today['instrument'])
if etf_targets:
try:
rets = pd.DataFrame()
for ins in etf_targets:
close = context.history([ins], ['close'], g.history_bar_count)
rets[ins] = np.diff(close[ins].close) / close[ins].close[:-1]
w = _mean_variance_optimization(rets, g.risk_aversion, g.mv_min_weight, g.mv_max_weight)
g.etf_weights = dict(w)
except Exception as e:
print(f"MVO失败,ETF端等权兜底: {e}")
g.etf_weights = {ins: 1.0 / len(etf_targets) for ins in etf_targets}
# ---- 2) 若到股票调仓日:重新选股,更新 g.stock_targets(等权)----
if is_stock_rebal:
try:
g.stock_targets = list(context.stock_df.loc[today].head(g.stock_num)['instrument'])
except KeyError:
g.stock_targets = []
# ---- 3) 任一端调仓 → 重算两端倒波动率配比,并对齐两端持仓 ----
if not (is_stock_rebal or is_etf_rebal):
return
stock_list = list(g.stock_targets)
etf_list = list(g.etf_weights.keys())
sigma_stock = _basket_vol(context, stock_list, weights=None) # 股票端等权
sigma_etf = _basket_vol(context, etf_list, weights=g.etf_weights) # ETF端用MVO权重
stock_total, etf_total, info = _risk_parity_split(sigma_stock, sigma_etf)
# 目标权重表(标的 -> 占总资金比例)
target_w = {}
if stock_list:
for ins in stock_list:
target_w[ins] = stock_total / len(stock_list) # 端内等权
if etf_list:
for ins, iw in g.etf_weights.items():
target_w[ins] = etf_total * iw # 端内 MVO 权重
# 先清掉不在目标中的持仓
holdings = {k for k, v in context.portfolio.positions.items() if v.amount > 0}
for ins in holdings - set(target_w.keys()):
context.order_target_percent(ins, 0)
# 再对齐到目标权重(两端都刷新到最新风险平权配比)
for ins, w in target_w.items():
context.order_target_percent(ins, w)
# ---- 调仓日志 ----
if VERBOSE:
trig = []
if is_stock_rebal:
trig.append("股票")
if is_etf_rebal:
trig.append("ETF")
sig_s = f"{sigma_stock:.2%}" if np.isfinite(sigma_stock) else "NA"
sig_e = f"{sigma_etf:.2%}" if np.isfinite(sigma_etf) else "NA"
note = ""
if info["fallback"]:
note = " [波动率缺失→均分兜底]"
elif info["clipped"]:
note = f" [截断: {info['raw_ws']:.1%}→{info['ws']:.1%}]"
print(f"\n[{today}] 调仓({'+'.join(trig)}) "
f"σ股={sig_s} σ基金={sig_e} "
f"→ 股票端={stock_total:.1%} 基金端={etf_total:.1%}{note}")
# 股票端明细(等权)
if stock_list:
sw = stock_total / len(stock_list)
print(f" 股票({len(stock_list)}只,各{sw:.2%}): {', '.join(stock_list)}")
# ETF端明细(MVO权重)
if etf_list:
etf_detail = ", ".join(f"{ins}:{etf_total*iw:.2%}" for ins, iw in g.etf_weights.items())
print(f" 基金({len(etf_list)}只): {etf_detail}")
if __name__ == "__main__":
result = run_backtest(
initialize=initialize,
handle_data=handle_data,
start_date="2019-04-01",
end_date="2026-06-17",
capital=100000,
data_path=r"D:\Desktop\最新回测框架\data",
benchmark="000001.SH",
plot=True,
order_cost=cost,
asset_type="stock",
)
returns = result.performance.returns
nav = (1 + returns).cumprod()
trading_days = len(returns)
total_return = nav.iloc[-1] / nav.iloc[0] - 1
annual_return = (nav.iloc[-1] / nav.iloc[0]) ** (252 / trading_days) - 1
drawdown = (nav - nav.cummax()) / nav.cummax()
max_drawdown = drawdown.min()
annual_vol = returns.std() * np.sqrt(252)
sharpe = annual_return / annual_vol if annual_vol != 0 else np.nan
print("=" * 40)
print("风险平权混合策略(无择时)绩效指标")
print("=" * 40)
print(f"总收益: {total_return:>10.2%}")
print(f"年化收益: {annual_return:>10.2%}")
print(f"最大回撤: {max_drawdown:>10.2%}")
print(f"年化波动: {annual_vol:>10.2%}")
print(f"夏普比率: {sharpe:>10.2f}")
print("=" * 40)
你能否看懂这个策略