BigQuant 2026年度私享会

实时k线缓存

由xuxiaoyin创建,最终由xuxiaoyin 被浏览 1 用户

应用场景

在计算形如均线这样的时序因子时,需要历史的k线数据,所以,我们结合实时数据合成出实时分钟线这篇帖子设计出一个k线缓存的机制。

效果

快速开始

基本用法

# 1. 初始化 Array,保留最近100根K线
array = Array(lookback=100)

# 2. 在策略回调中更新数据
def on_five_minute_bar(context, bar):
    context.array.update(bar)  # bar 是字典格式

    # 3. 等待数据填充完成
    if not context.array.inited:
        return

    # 4. 访问历史数据
    close_prices = context.array.close      # 获取缓存的收盘价序列
    last_close = context.array[0]['close']  # 最新一根K线的收盘价

在bigtrader引擎中的完整实例

import bigtrader
import talib

def initialize(context):
    # 创建 Array 实例
    context.donchain_array = Array(lookback=20)
    context.atr_array = Array(lookback=14)

    # 创建 BarGenerator 生成5分钟K线
    context.bg = BarGenerator(
        trader=context,
        window=5,
        on_bar=on_five_minute_bar
    )

def on_five_minute_bar(context, bar):
    """5分钟K线回调"""
    # 项数组推送5分钟k线
    context.donchain_array.update(bar)
    context.atr_array.update(bar)

    # 检查是否初始化完成
    if not context.donchain_array.inited:
        return

    # 计算唐奇安通道
    upper = max(context.donchain_array.high)
    lower = min(context.donchain_array.low)

    # 计算ATR
    atr = talib.ATR(
        context.atr_array.high,
        context.atr_array.low,
        context.atr_array.close,
        timeperiod=14
    )[-1]

    print(f"上轨: {upper}, 下轨: {lower}, ATR: {atr}")

def handle_tick(context, tick):
    # 将tick数据喂给BarGenerator
    context.bg.update_tick(tick)

应用场景

1. 唐奇安通道突破

def callback_function(context, bar):
    """回调函数为BarGenerator指定的回调函数"""
    context.array.update(bar)

    if not context.array.inited:
        return

    # 计算20日通道
    upper = max(context.array.high[-20:])
    lower = min(context.array.low[-20:])

    current_price = bar['close']

    # 突破上轨做多
    if current_price > upper:
        context.order_target_percent(context.symbol, 1.0)

    # 跌破下轨做空
    elif current_price < lower:
        context.order_target_percent(context.symbol, -1.0)

2. 多周期指标计算

def initialize(context):
    context.array_1m = Array(lookback=60)   # 1分钟
    context.array_5m = Array(lookback=100)  # 5分钟
    
    # 创建 BarGenerator 生成分钟K线
    context.bg_1m = BarGenerator(
        trader=context,
        window=1,
        on_bar=on_bar_1m
    )
    
    context.bg_5m = BarGenerator(
        trader=context,
        window=1,
        on_bar=on_bar_5m
    )  

def on_bar_1m(context, bar):
    context.array_1m.update(bar)

    if context.array_1m.inited:
        # 计算1分钟RSI
        rsi_1m = talib.RSI(context.array_1m.close, timeperiod=14)[-1]

def on_bar_5m(context, bar):
    context.array_5m.update(bar)

    if context.array_5m.inited:
        # 计算5分钟MACD
        macd, signal, hist = talib.MACD(context.array_5m.close)
        
def handle_tick(context, tick):
    context.bg_1m.update_tick(tick)
    context.bg_5m.update_tick(tick)

进阶用法

扩展Array类添加自定义字段

class ExtendedArray(Array):
    @property
    def vwap(self):
        """成交量加权平均价"""
        amount = self._get_series('amount')
        volume = self._get_series('volume')
        return amount / volume

# 使用
array = ExtendedArray(lookback=100)
vwap = array.vwap  # 直接访问 VWAP

完整代码

https://bigquant.com/codesharev3/2cf31c0c-806f-4662-95d5-b2f2a463112d

\

{link}