BigQuant使用文档

本地生成因子 并提供给线上策略

由bq1fuwkt创建,最终由bq1fuwkt 被浏览 3 用户

问题描述

我想在本地计算自定义因子(如技术指标、基本面因子等),然后上传到 BigQuant 平台,供线上模拟交易策略使用,应该如何实现?

详细解答

BigQuant SDK 提供了 DataSource 类,可以将本地计算的因子数据上传为数据源,然后在线上策略中通过 SQL 查询使用。

方法一:本地计算因子并上传(推荐)

  1. 查询基础数据

首先从平台获取用于计算因子的基础数据,例如 从平安银行、万科A、浦发银行获取 2024 年的历史数据

  import bigquant
  import pandas as pd
  import numpy as np

  # 查询股票历史数据
  result = bigquant.dai.query("""
      SELECT date, instrument, open, high, low, close, volume
      FROM cn_stock_bar1d
      WHERE date >= '2024-01-01' AND date <= '2024-12-31'
        AND instrument IN ('000001.SZ', '000002.SZ', '600000.SH')
      ORDER BY instrument, date
  """)

  df = result.df()
  print(f"查询到 {len(df)} 条数据")
  print(df.head())
  • 关键说明
    • 建议按 instrument, date 排序,方便后续按股票分组计算
  1. 本地计算因子

在本地计算各种技术指标或自定义因子

  # 按股票分组计算移动平均线
  def calculate_factors(group):
      """计算技术指标因子"""
      # 5日均线
      group['ma5'] = group['close'].rolling(window=5).mean()

      # 20日均线
      group['ma20'] = group['close'].rolling(window=20).mean()

      # 5日涨跌幅
      group['return_5d'] = group['close'].pct_change(periods=5)

      # 波动率(20日标准差)
      group['volatility_20d'] = group['close'].rolling(window=20).std()

      # 相对强弱指标 RSI (简化版)
      delta = group['close'].diff()
      gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
      loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
      rs = gain / loss
      group['rsi_14'] = 100 - (100 / (1 + rs))

      return group

  # 按股票分组计算
  df_factors = df.groupby('instrument', group_keys=False).apply(calculate_factors)

  # 删除因滚动窗口产生的 NaN 值
  df_factors = df_factors.dropna()

  print(f"\n计算得到 {len(df_factors)} 条因子数据")
  print(df_factors.head())
  • 关键说明
    • 使用 groupby().apply() 对每只股票单独计算指标
    • 移动平均、标准差等需要足够的历史数据,会产生 NaN
    • 可以根据需求计算任意自定义因子
  1. 上传因子数据到平台

将计算好的因子数据上传为 DataSource:

  # 选择需要上传的列
  factor_cols = ['date', 'instrument', 'ma5', 'ma20', 'return_5d', 'volatility_20d', 'rsi_14']
  df_upload = df_factors[factor_cols].copy()

  # 写入数据源
  ds = bigquant.dai.DataSource.write_bdb(
      data=df_upload,
      id="my_custom_factors_2024",  # 数据源 ID(不指定则创建临时数据源)
      partitioning=["date"],  # 按日期分区(提升查询性能)
      indexes=["instrument"],  # 为 instrument 创建索引
      unique_together=["date", "instrument"],  # 复合唯一约束
      on_duplicates="last",  # 重复数据保留最新的
      sort_by=[("date", "ascending"), ("instrument", "ascending")],  # 排序
      docs={"description": "自定义技术指标因子", "version": "v1.0"},  # 文档说明
      overwrite=True  # 如果已存在则覆盖
  )

  print(f"\n✓ 因子数据已上传")
  print(f"  数据源 ID: {ds.id}")
  print(f"  数据行数: {len(df_upload)}")
  • 关键说明
    • id: 指定数据源 ID,便于在策略中引用;如果不指定则创建临时数据源
    • partitioning: 分区字段,通常使用日期分区以提升查询性能
    • indexes: 索引字段,用于加速查询
    • unique_together: 唯一约束,防止重复数据
    • on_duplicates: 处理重复数据的策略("last" 保留最新,"first" 保留最旧,"error" 报错)
    • overwrite=True: 如果数据源已存在,则覆盖旧数据

方法二:在查询中计算因子并上传

如果因子计算逻辑简单,可以在 SQL 查询时直接计算:

  # 使用 SQL 窗口函数计算移动平均
  result = bigquant.dai.query("""
      SELECT 
          date,
          instrument,
          close,
          AVG(close) OVER (
              PARTITION BY instrument 
              ORDER BY date 
              ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
          ) as ma5,
          AVG(close) OVER (
              PARTITION BY instrument 
              ORDER BY date 
              ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
          ) as ma20
      FROM cn_stock_bar1d
      WHERE date >= '2024-01-01' AND date <= '2024-12-31'
        AND instrument IN ('000001.SZ', '000002.SZ', '600000.SH')
      ORDER BY instrument, date
  """)

  df_factors = result.df()

  # 上传计算结果
  ds = bigquant.dai.DataSource.write_bdb(
      data=df_factors[['date', 'instrument', 'ma5', 'ma20']],
      id="my_sql_factors_2024",
      partitioning=["date"],
      indexes=["instrument"],
      unique_together=["date", "instrument"],
      overwrite=True
  )

  print(f"✓ 因子数据已上传: {ds.id}")
  • 关键说明
    • 使用 SQL 窗口函数 AVG() OVER() 计算移动平均
    • 适用于简单的聚合计算,复杂逻辑建议在本地计算

方法三:使用 UDF 在查询中计算因子

对于复杂的因子计算逻辑,可以定义 UDF(用户自定义函数):

  from bigquant.dai import DaiUDF

  # 定义因子计算函数
  def calculate_rsi(prices):
      """计算 RSI 指标"""
      # 注意:这是简化示例,实际 UDF 需要更复杂的实现
      return prices * 1.5  # 示例计算

  # 在查询中使用 UDF
  result = bigquant.dai.query(
      sql="""
          SELECT 
              date,
              instrument,
              close,
              calculate_rsi(close) as custom_rsi
          FROM cn_stock_bar1d
          WHERE date >= '2024-12-01' AND date <= '2024-12-31'
            AND instrument = '000001.SZ'
      """,
      udf_list=[
          DaiUDF(
              name="calculate_rsi",
              function=calculate_rsi,
              return_type="DOUBLE"
          )
      ]
  )

  df = result.df()
  print(df.head())
  • 关键说明
    • UDF 在云端执行,可以在 SQL 中直接调用
    • 适用于需要在查询时动态计算的场景
    • UDF 函数会被序列化并上传到云端执行

关键概念解释

DataSource

表示 BigQuant 平台上的一个数据源。可以是平台内置的数据(如 cn_stock_bar1d),也可以是用户上传的自定义数据。

DataSource.write_bdb()

将本地数据上传到平台,创建或更新数据源。关键参数:

  • data: pandas DataFrame 或 PyArrow Table
  • id: 数据源 ID(不指定则创建临时数据源)
  • partitioning: 分区字段(通常用日期)
  • indexes: 索引字段(用于加速查询)
  • unique_together: 复合唯一约束
  • on_duplicates: 重复数据处理策略
  • overwrite: 是否覆盖已有数据

DataSource.read_bdb()

从平台读取数据源。关键参数:

  • as_type: 返回类型(pd.DataFrame 或 pa.Table)
  • partition_filter: 分区过滤(元组表示范围,集合表示精确值)
  • columns: 指定读取的列

分区 (Partitioning)

将数据按某个字段(通常是日期)划分存储,可以大幅提升查询性能。查询时只读取相关分区的数据。

索引 (Indexes)

为某个字段创建索引,加速基于该字段的查询和关联操作。

唯一约束 (Unique Together)

确保指定字段的组合在数据中是唯一的,避免重复数据。

临时数据源 vs 持久数据源

  • 临时数据源: id=None,系统自动分配 ID(如 cache_xxx),一段时间后自动清理
  • 持久数据源: 指定 id,永久保存,可在策略中重复使用

数据更新策略

增量更新(推荐)

  # 只计算和上传新增的数据
  latest_date = "2024-12-31"  # 数据源中最新日期

  # 查询新数据
  result = bigquant.dai.query(f"""
      SELECT date, instrument, close, volume
      FROM cn_stock_bar1d
      WHERE date > '{latest_date}'
      ORDER BY instrument, date
  """)

  df_new = result.df()

  # 计算因子
  df_new_factors = df_new.groupby('instrument', group_keys=False).apply(calculate_factors)
  df_new_factors = df_new_factors.dropna()

  # 追加到数据源(on_duplicates="last" 确保更新覆盖)
  ds = bigquant.dai.DataSource.write_bdb(
      data=df_new_factors,
      id="my_custom_factors_2024",
      partitioning=["date"],
      indexes=["instrument"],
      unique_together=["date", "instrument"],
      on_duplicates="last",  # 如果日期+股票重复,保留新数据
      overwrite=False  # 不覆盖整个数据源,只追加/更新
  )

  print(f"✓ 增量更新完成,新增 {len(df_new_factors)} 条数据")

全量覆盖

  # 重新计算所有数据并覆盖
  ds = bigquant.dai.DataSource.write_bdb(
      data=df_all_factors,
      id="my_custom_factors_2024",
      partitioning=["date"],
      indexes=["instrument"],
      unique_together=["date", "instrument"],
      overwrite=True  # 完全覆盖旧数据
  )

  print(f"✓ 全量更新完成")

性能优化建议

  • 使用分区: 按日期分区可以大幅提升查询性能
  • 创建索引: 为常用查询字段(如 instrument)创建索引
  • 减少列数: 只上传策略中需要使用的因子列
  • 数据类型优化: 使用合适的数据类型,避免使用 object 类型
  • 批量上传: 一次上传多个因子,减少网络往返

注意事项

  • 数据源 ID 命名: 使用有意义的 ID,便于在策略中引用(如 my_factors_2024 而非 temp_123)
  • 权限管理: 上传的数据源默认只有自己可以访问


\

标签

模拟交易技术指标
{link}