【平台使用】新编的交易策略运行后 无任何错误提醒和文字指示
由bqdqmmm3创建,最终由hxgre 被浏览 10 用户
# 导入聚宽库
import jqdata
import numpy as np
import pandas as pd
from jqlib.technical_analysis import *
from jqlib.alpha101 import *
from datetime import timedelta
# 初始化函数
def initialize(context):
set_params(context)
set_backtest()
set_scheduler(context)
# 初始化行业分析数据
g.sector_analysis = {}
# 参数设置新增行业分析相关参数
def set_params(context):
g = globals()
# 原有参数...
g['sector_lookback'] = 20 # 行业分析回溯周期
g['news_window'] = 7 # 新闻分析时间窗口
g['policy_keywords'] = ['十四五规划', '补贴政策', '产业升级', '国产替代'] # 政策关键词
# 在定时任务中添加行业分析
def set_scheduler(context):
run_daily(trade_main, time='14:30')
run_daily(check_stop_loss, time='every_bar')
run_daily(analyze_sector_trend, time='9:30') # 开盘前更新行业数据
### 新增行业主线分析模块 ###
def analyze_sector_trend(context):
# 获取申万一级行业数据
sectors = get_industries(name='sw_l1')
sector_codes = list(sectors.index)
# 计算行业动量指标
momentum_scores = {}
for code in sector_codes:
# 获取行业指数价格
prices = get_price(code, end_date=context.current_dt,
count=g.sector_lookback, fields=['close'])
# 计算相对动量
momentum = (prices['close'][-1]/prices['close'][0] - 1) * 0.4 + \
(prices['close'][-1]/prices['close'][-5] - 1) * 0.3 + \
(prices['close'][-1]/prices['close'][-1] - 1) * 0.3
# 计算资金流向
money_flow = get_money_flow(code, g.sector_lookback)
# 计算新闻热度
news_score = get_news_score(code, context)
# 综合评分
total_score = momentum*0.5 + money_flow*0.3 + news_score*0.2
momentum_scores[code] = total_score
# 选出最强主线
sorted_sectors = sorted(momentum_scores.items(), key=lambda x: x[1], reverse=True)
g.main_sector = sorted_sectors[0][0]
# 分析细分板块
sub_sectors = analyze_sub_sectors(g.main_sector, context)
g.top_sub_sectors = sub_sectors[:3]
# 记录分析结果
log.info(f"当前主线:{sectors.loc[g.main_sector,'name']}")
log.info(f"Top3细分板块:{[s[0] for s in g.top_sub_sectors]}")
def get_money_flow(code, period):
# 计算行业资金流入强度
df = get_price(code, end_date=context.current_dt,
count=period, fields=['money_inflow','money_outflow'])
net_inflow = (df['money_inflow'] - df['money_outflow']).sum()
return net_inflow / df['money_inflow'].sum()
def get_news_score(code, context):
# 获取近期新闻并分析政策利好
news = get_news(code, end_date=context.current_dt,
count=g.news_window)
keyword_count = 0
for n in news:
content = n['content']
keyword_count += sum([1 for k in g.policy_keywords if k in content])
return keyword_count / len(g.policy_keywords) if news else 0
def analyze_sub_sectors(main_sector, context):
# 获取细分板块(申万二级行业)
sub_sectors = get_industries(name='sw_l2')
sub_sectors = sub_sectors[sub_sectors['parent'] == main_sector]
# 计算细分板块景气度
scores = []
for code in sub_sectors.index:
# 获取成分股
stocks = get_industry_stocks(code)
# 计算板块净利润增长率
q = query(valuation.code, income.net_profit
).filter(valuation.code.in_(stocks))
df = get_fundamentals(q, date=context.current_dt)
growth_rate = df['net_profit'].pct_change().mean()
# 计算技术面强度
sector_index = get_price(code, end_date=context.current_dt,
count=30, fields=['close'])
momentum = sector_index['close'][-1]/sector_index['close'][0] - 1
# 综合评分
scores.append((code, growth_rate*0.6 + momentum*0.4))
return sorted(scores, key=lambda x: x[1], reverse=True)
### 修改候选股筛选逻辑 ###
def get_candidates(context):
# 仅选择主线板块的股票
main_stocks = get_industry_stocks(g.main_sector)
filtered_pool = [s for s in g.stock_pool if s in main_stocks]
# 原有筛选逻辑增加细分板块过滤
candidates = []
for stock in filtered_pool:
# 检查是否属于Top3细分板块
industry = get_industry(stock, 'sw_l2')
if industry in [s[0] for s in g.top_sub_sectors]:
# 原有技术指标筛选
if check_technical(stock):
candidates.append(stock)
return candidates[:g.n_stocks*3]
def check_technical(stock):
# 原有技术指标校验
close = attribute_history(stock, g.ema_slow+1, '1d', ['close'])
ema_fast = EMA(close['close'], g.ema_fast)
ema_slow = EMA(close['close'], g.ema_slow)
upper, mid, lower = Bollinger_Bands(close['close'], g.boll_period, 2)
return ema_fast[-1] > ema_slow[-1] and close['close'][-1] < lower[-1]
### 修改仓位管理模块 ###
\