实时k线缓存
由xuxiaoyin创建,最终由xuxiaoyin 被浏览 1 用户
应用场景
在计算形如均线这样的时序因子时,需要历史的k线数据,所以,我们结合实时数据合成出实时分钟线这篇帖子设计出一个k线缓存的机制。
效果
快速开始
基本用法
# 1. 初始化 Array,保留最近100根K线
array = Array(lookback=100)
# 2. 在策略回调中更新数据
def on_five_minute_bar(context, bar):
context.array.update(bar) # bar 是字典格式
# 3. 等待数据填充完成
if not context.array.inited:
return
# 4. 访问历史数据
close_prices = context.array.close # 获取缓存的收盘价序列
last_close = context.array[0]['close'] # 最新一根K线的收盘价
在bigtrader引擎中的完整实例
import bigtrader
import talib
def initialize(context):
# 创建 Array 实例
context.donchain_array = Array(lookback=20)
context.atr_array = Array(lookback=14)
# 创建 BarGenerator 生成5分钟K线
context.bg = BarGenerator(
trader=context,
window=5,
on_bar=on_five_minute_bar
)
def on_five_minute_bar(context, bar):
"""5分钟K线回调"""
# 项数组推送5分钟k线
context.donchain_array.update(bar)
context.atr_array.update(bar)
# 检查是否初始化完成
if not context.donchain_array.inited:
return
# 计算唐奇安通道
upper = max(context.donchain_array.high)
lower = min(context.donchain_array.low)
# 计算ATR
atr = talib.ATR(
context.atr_array.high,
context.atr_array.low,
context.atr_array.close,
timeperiod=14
)[-1]
print(f"上轨: {upper}, 下轨: {lower}, ATR: {atr}")
def handle_tick(context, tick):
# 将tick数据喂给BarGenerator
context.bg.update_tick(tick)
应用场景
1. 唐奇安通道突破
def callback_function(context, bar):
"""回调函数为BarGenerator指定的回调函数"""
context.array.update(bar)
if not context.array.inited:
return
# 计算20日通道
upper = max(context.array.high[-20:])
lower = min(context.array.low[-20:])
current_price = bar['close']
# 突破上轨做多
if current_price > upper:
context.order_target_percent(context.symbol, 1.0)
# 跌破下轨做空
elif current_price < lower:
context.order_target_percent(context.symbol, -1.0)
2. 多周期指标计算
def initialize(context):
context.array_1m = Array(lookback=60) # 1分钟
context.array_5m = Array(lookback=100) # 5分钟
# 创建 BarGenerator 生成分钟K线
context.bg_1m = BarGenerator(
trader=context,
window=1,
on_bar=on_bar_1m
)
context.bg_5m = BarGenerator(
trader=context,
window=1,
on_bar=on_bar_5m
)
def on_bar_1m(context, bar):
context.array_1m.update(bar)
if context.array_1m.inited:
# 计算1分钟RSI
rsi_1m = talib.RSI(context.array_1m.close, timeperiod=14)[-1]
def on_bar_5m(context, bar):
context.array_5m.update(bar)
if context.array_5m.inited:
# 计算5分钟MACD
macd, signal, hist = talib.MACD(context.array_5m.close)
def handle_tick(context, tick):
context.bg_1m.update_tick(tick)
context.bg_5m.update_tick(tick)
进阶用法
扩展Array类添加自定义字段
class ExtendedArray(Array):
@property
def vwap(self):
"""成交量加权平均价"""
amount = self._get_series('amount')
volume = self._get_series('volume')
return amount / volume
# 使用
array = ExtendedArray(lookback=100)
vwap = array.vwap # 直接访问 VWAP
完整代码
https://bigquant.com/codesharev3/2cf31c0c-806f-4662-95d5-b2f2a463112d
\