问答交流

【平台使用】新编的交易策略运行后 无任何错误提醒和文字指示

由bqdqmmm3创建,最终由hxgre 被浏览 10 用户

# 导入聚宽库
import jqdata
import numpy as np
import pandas as pd
from jqlib.technical_analysis import *
from jqlib.alpha101 import *
from datetime import timedelta

# 初始化函数
def initialize(context):
    set_params(context)    
    set_backtest()         
    set_scheduler(context) 
    # 初始化行业分析数据
    g.sector_analysis = {}

# 参数设置新增行业分析相关参数
def set_params(context):
    g = globals()
    # 原有参数...
    g['sector_lookback'] = 20    # 行业分析回溯周期
    g['news_window'] = 7         # 新闻分析时间窗口
    g['policy_keywords'] = ['十四五规划', '补贴政策', '产业升级', '国产替代']  # 政策关键词

# 在定时任务中添加行业分析
def set_scheduler(context):
    run_daily(trade_main, time='14:30')
    run_daily(check_stop_loss, time='every_bar')
    run_daily(analyze_sector_trend, time='9:30')  # 开盘前更新行业数据

### 新增行业主线分析模块 ###
def analyze_sector_trend(context):
    # 获取申万一级行业数据
    sectors = get_industries(name='sw_l1')
    sector_codes = list(sectors.index)
    
    # 计算行业动量指标
    momentum_scores = {}
    for code in sector_codes:
        # 获取行业指数价格
        prices = get_price(code, end_date=context.current_dt, 
                          count=g.sector_lookback, fields=['close'])
        # 计算相对动量
        momentum = (prices['close'][-1]/prices['close'][0] - 1) * 0.4 + \
                  (prices['close'][-1]/prices['close'][-5] - 1) * 0.3 + \
                  (prices['close'][-1]/prices['close'][-1] - 1) * 0.3
        # 计算资金流向
        money_flow = get_money_flow(code, g.sector_lookback)
        # 计算新闻热度
        news_score = get_news_score(code, context)
        # 综合评分
        total_score = momentum*0.5 + money_flow*0.3 + news_score*0.2
        momentum_scores[code] = total_score
    
    # 选出最强主线
    sorted_sectors = sorted(momentum_scores.items(), key=lambda x: x[1], reverse=True)
    g.main_sector = sorted_sectors[0][0]
    
    # 分析细分板块
    sub_sectors = analyze_sub_sectors(g.main_sector, context)
    g.top_sub_sectors = sub_sectors[:3]
    
    # 记录分析结果
    log.info(f"当前主线:{sectors.loc[g.main_sector,'name']}")
    log.info(f"Top3细分板块:{[s[0] for s in g.top_sub_sectors]}")

def get_money_flow(code, period):
    # 计算行业资金流入强度
    df = get_price(code, end_date=context.current_dt, 
                 count=period, fields=['money_inflow','money_outflow'])
    net_inflow = (df['money_inflow'] - df['money_outflow']).sum()
    return net_inflow / df['money_inflow'].sum()

def get_news_score(code, context):
    # 获取近期新闻并分析政策利好
    news = get_news(code, end_date=context.current_dt, 
                  count=g.news_window)
    keyword_count = 0
    for n in news:
        content = n['content']
        keyword_count += sum([1 for k in g.policy_keywords if k in content])
    return keyword_count / len(g.policy_keywords) if news else 0

def analyze_sub_sectors(main_sector, context):
    # 获取细分板块(申万二级行业)
    sub_sectors = get_industries(name='sw_l2')
    sub_sectors = sub_sectors[sub_sectors['parent'] == main_sector]
    
    # 计算细分板块景气度
    scores = []
    for code in sub_sectors.index:
        # 获取成分股
        stocks = get_industry_stocks(code)
        # 计算板块净利润增长率
        q = query(valuation.code, income.net_profit
                 ).filter(valuation.code.in_(stocks))
        df = get_fundamentals(q, date=context.current_dt)
        growth_rate = df['net_profit'].pct_change().mean()
        # 计算技术面强度
        sector_index = get_price(code, end_date=context.current_dt, 
                               count=30, fields=['close'])
        momentum = sector_index['close'][-1]/sector_index['close'][0] - 1
        # 综合评分
        scores.append((code, growth_rate*0.6 + momentum*0.4))
    
    return sorted(scores, key=lambda x: x[1], reverse=True)

### 修改候选股筛选逻辑 ###
def get_candidates(context):
    # 仅选择主线板块的股票
    main_stocks = get_industry_stocks(g.main_sector)
    filtered_pool = [s for s in g.stock_pool if s in main_stocks]
    
    # 原有筛选逻辑增加细分板块过滤
    candidates = []
    for stock in filtered_pool:
        # 检查是否属于Top3细分板块
        industry = get_industry(stock, 'sw_l2')
        if industry in [s[0] for s in g.top_sub_sectors]:
            # 原有技术指标筛选
            if check_technical(stock):
                candidates.append(stock)
    return candidates[:g.n_stocks*3]

def check_technical(stock):
    # 原有技术指标校验
    close = attribute_history(stock, g.ema_slow+1, '1d', ['close'])
    ema_fast = EMA(close['close'], g.ema_fast)
    ema_slow = EMA(close['close'], g.ema_slow)
    upper, mid, lower = Bollinger_Bands(close['close'], g.boll_period, 2)
    return ema_fast[-1] > ema_slow[-1] and close['close'][-1] < lower[-1]

### 修改仓位管理模块 ###

\

标签

交易策略
评论
  • 你代码都是函数,没有执行过
{link}