BigQuant使用文档

BigQuant-SDK API 手册

由small_q创建,最终由small_q 被浏览 707 用户

BigQuant Financial Quantitative Toolbox - 金融量化工具箱 Python SDK


目录

  1. 顶层模块 (bigquant)
  2. 数据模块 (dai)
  3. 模拟交易模块 (papertrading)(暂未开放)
  4. AIStudio 模块 (aistudio)(暂未开放)
  5. 分布式计算模块 (fai)(暂未开放)
  6. 错误处理

1. 顶层模块 (bigquant)

import bigquant

bigquant.init()

bigquant.init(
    *,
    ak: str | None = None,
    sk: str | None = None,
    token: str | None = None,
) -> None

显式初始化 SDK。

参数:

参数 类型 说明
ak str Access Key
sk str Secret Key
token str 认证 Token(仅支持 HTTP,不支持 Arrow Flight)

ak/sktoken 二选一,ak/sk 支持全部功能。

示例:

bigquant.init(ak="YOUR_AK", sk="YOUR_SK")

bigquant.init_from_config()

bigquant.init_from_config(path: str | None = None) -> None

从配置文件初始化 SDK。

参数:

参数 类型 默认值 说明
path str | None None 配置文件路径,默认 ~/.bigquant/config.json

异常:

  • FileNotFoundError:配置文件不存在
  • ValueError:配置文件中没有 ak/sk 或 token

bigquant.init_from_token()

bigquant.init_from_token(token: str) -> None

使用 Token 初始化 SDK(仅支持 HTTP 接口)。


bigquant.init_from_password()

bigquant.init_from_password(username: str, password: str) -> None

使用用户名密码登录初始化 SDK。


bigquant.whoami()

bigquant.whoami() -> dict | None

获取当前登录用户信息。

返回值:

{
    "id": 12345,
    "username": "your_name",
    "email": "you@example.com",
    # ... 其他字段
}

bigquant.close()

bigquant.close() -> None

关闭所有客户端连接(HTTP + Arrow Flight)。


2. 数据模块 (dai)

from bigquant import dai

dai.query()

dai.query(
    sql: str,
    *,
    full_db_scan: bool = False,
    filters: dict[str, list] | None = None,
) -> QueryResult

执行 SQL 查询,通过 Arrow Flight 直连云端数据(高性能)。

参数:

参数 类型 默认值 说明
sql str 必填 SQL 查询语句
full_db_scan bool False 是否允许全表扫描(无 filters 时必须设为 True)
filters dict | None None 分区过滤条件,格式 {"column": ["val1", "val2"]}

返回值: QueryResult

异常:

  • AssertionError:Token 认证时使用 Arrow Flight 模式(需要 AK/SK)

示例:

# 基础查询(带 filters 避免全表扫描)
result = dai.query(
    "SELECT * FROM cn_stock_bar1d WHERE close > 10",
    filters={"date": ["2024-01-01", "2024-12-31"]},
)

QueryResult

dai.query() 的返回值,提供多种格式转换方法。

QueryResult.df()

result.df() -> pd.DataFrame

转换为 pandas DataFrame。重复列名自动添加 _N 后缀。

QueryResult.arrow()

result.arrow() -> pa.Table

返回 PyArrow Table。

QueryResult.pl()

result.pl() -> polars.DataFrame

转换为 Polars DataFrame。

QueryResult.fetchall()

result.fetchall() -> list[dict]

返回所有行的列表,每行为字典。

QueryResult.fetch_arrow_reader()

result.fetch_arrow_reader(batch_size: int = 1_000_000) -> pa.RecordBatchReader

获取流式读取器,适合大数据场景分批读取。


DataSource

数据源管理类,支持读写自定义数据。

from bigquant import dai

DataSource(datasource_id)

DataSource(datasource_id: str)

通过 ID 引用已有数据源。

DataSource.read_bdb()

ds.read_bdb(
    as_type: type = pa.Table,
    partition_filter: dict[str, tuple | set] | None = None,
    columns: list[str] | None = None,
) -> pd.DataFrame | pa.Table

读取数据。

参数:

参数 类型 默认值 说明
as_type type pa.Table 返回类型,支持 pd.DataFramepa.Table
partition_filter dict | None None 分区过滤:tuple 表示范围,set 表示枚举值
columns list[str] | None None 指定读取的列,None 读取全部

示例:

ds = DataSource("my_signal_data")
df = ds.read_bdb(
    as_type=pd.DataFrame,
    partition_filter={
        "date": ("2024-01-01", "2024-12-31"),   # 范围
        "instrument": {"000001.SZ", "600519.SH"}, # 枚举
    },
    columns=["date", "instrument", "my_signal"],
)

DataSource.write_bdb()

DataSource.write_bdb(
    data: pd.DataFrame | pa.Table,
    *,
    id: str | None = None,
    partitioning: list[str] | None = None,
    indexes: list[str] | None = None,
    overwrite: bool = False,
    on_duplicates: str = "last",
    sort_by: list[tuple[str, str]] | None = None,
    docs: dict | None = None,
    timeout: int = 300,
    update_msg: str | None = None,
) -> DataSource

写入数据,返回 DataSource 对象。

参数:

参数 类型 默认值 说明
data DataFrame | Table 必填 要写入的数据
id str | None None 数据源 ID,None 创建临时数据源
partitioning list[str] | None None 分区列
indexes list[str] | None None 索引列
overwrite bool False 是否覆盖已有数据
on_duplicates str "last" 重复处理策略:"last", "first", "error", "none"
sort_by list[tuple] | None None 排序,如 [("date", "ascending")]
timeout int 300 写入锁超时(秒)

异常:

  • TypeErrordata 类型不是 DataFrame 或 Table
  • ValueErrordata 为空

dai.search_datasources()

dai.search_datasources(
    keyword: str,
    category: str | None = None,
    page: int = 1,
    size: int = 20,
    fetch_all: bool = False,
) -> list[dict]

搜索平台数据表,支持中文/英文关键词。

参数:

参数 类型 默认值 说明
keyword str 必填 搜索关键词
category str | None None 按分类前缀过滤,如 /金融数据/股票
page int 1 页码
size int 20 每页数量
fetch_all bool False 是否自动翻页获取全部结果

dai.get_datasource_schema()

dai.get_datasource_schema(datasource_id: str) -> dict

获取数据表字段 schema。

返回值:

{
    "datasource_id": "cn_stock_bar1d",
    "cn_name": "股票日线行情",
    "description": "...",
    "fields": [
        {
            "name": "date",
            "type": "string",
            "description": "交易日期",
            "primary": True,
            "rank": 1,
        },
        # ...
    ]
}

dai.get_datasource_updates()

dai.get_datasource_updates(
    datasource_id: str,
    page: int = 1,
    size: int = 20,
) -> list[dict]

获取数据表更新记录。每条记录包含 created_atdata.rowsdata.cols


dai.list_datasources()

dai.list_datasources(
    constraints: dict | None = None,
    page: int = 1,
    size: int = 100,
    order_by: str = "-updated_at",
) -> list[dict]

获取当前用户创建的 DataSource 列表。


dai.list_user_datasources()

dai.list_user_datasources(
    constraints: dict | None = None,
    page: int = 1,
    size: int = 100,
    order_by: str = "-updated_at",
) -> list[dict]

获取当前用户订阅的 DataSource 列表。


dai.list_space_datasources()

dai.list_space_datasources(
    space_id: str | None = None,
    constraints: dict | None = None,
    page: int = 1,
    size: int = 100,
    order_by: str = "-updated_at",
) -> list[dict]

获取指定空间已上架的 DataSource 列表。


3. 模拟交易模块 (papertrading)

暂未开放:PaperTrading SDK 接口正在开发中,以下 API 文档仅供参考,暂不可用。

from bigquant import papertrading

papertrading.list()

papertrading.list(
    constraints: dict | None = None,
    order_by: list[str] | None = None,
    include_fields: list[str] | None = None,
    page: int = 1,
    size: int = 1000,
) -> dict

获取策略列表。

返回值:

{
    "items": [...],   # 策略列表
    "count": 10,      # 总数量
    "page": 1,
    "size": 20,
    "total": 1,       # 总页数
}

papertrading.get()

papertrading.get(strategy_id: str) -> Strategy

获取策略对象。

异常:

  • ValueError:策略不存在

papertrading.delete()

papertrading.delete(strategy_id: str) -> dict

删除策略(含取消订阅数据、删除任务)。已分享的策略需先调用 unshare()


papertrading.unshare()

papertrading.unshare(strategy_id: str) -> dict

取消策略分享。


Strategy 对象

通过 papertrading.get(strategy_id) 获取。

strategy.strategy_info

策略基本信息字典,包含 idstrategy_namestatusfrequencybenchmark_instrument 等字段。

strategy.get_positions()

strategy.get_positions(
    constraints: dict | None = None,
    order_by: list[str] | None = None,
    page: int = 1,
    size: int = 100,
) -> dict

获取持仓信息。返回 {"items": [...], "count": N, ...}

每条持仓包含:instrumentnamecurrent_qtycost_pricelast_pricemarket_valueposition_pnlprofit_ratioposi_direction

strategy.get_orders()

strategy.get_orders(
    constraints: dict | None = None,
    order_by: list[str] | None = None,
    page: int = 1,
    size: int = 100,
) -> dict

获取已成交订单。每条订单包含:instrumentdirection(1=买/2=卖)、order_qtyorder_priceaverage_pricefilled_qtystatus_msg

strategy.get_planned_orders()

strategy.get_planned_orders(
    constraints: dict | None = None,
    order_by: list[str] | None = None,
    page: int = 1,
    size: int = 100,
) -> dict

获取计划订单(待交易信号)。

strategy.get_performances()

strategy.get_performances(fields: list[str] | None = None) -> list

获取绩效数据列表,每条为按字段顺序排列的列表:

索引 字段 说明
0 trading_day 交易日
1 today_return 当日收益
2 max_drawdown 最大回撤
3 annualized_return 年化收益
4 risk_indicator 风险指标(dict,含 sharpe
5 win_percent 胜率
6 cumulative_return 累计收益
7 benchmark 基准收益
8 total_market_value 总市值
9 balance 账户余额
10 month_return 近一月收益
11 week_return 近一周收益
12 three_month_return 近三月收益
13 six_month_return 近六月收益
14 created_at 记录创建时间
15 portfolio_value 组合总价值

4. AIStudio 模块 (aistudio)

暂未开放:AIStudio SDK 接口正在开发中,以下 API 文档仅供参考,暂不可用。

from bigquant import aistudio

aistudio.list()

aistudio.list(space_id: str = DEFAULT_SPACE_ID) -> list[dict]

获取可用资源规格列表。

返回值每条包含:

字段 说明
id 规格 ID(UUID)
name 规格名称,如 D0(1C/6G)
cpu CPU 核数
memory 内存(GB)
gpu GPU 数量
price 价格(BQC/分钟)
type 类型:free / saas / bigpro

aistudio.start()

aistudio.start(
    resource_spec_id: str = DEFAULT_RESOURCE_SPEC_ID,
    space_id: str = DEFAULT_SPACE_ID,
) -> AIStudio

启动 AIStudio 实例。默认使用免费规格 D0(1C/6G)。

注意: 需要 AK/SK 认证,Token 认证不支持。

异常:

  • RuntimeError:60 秒内无法启动

AIStudio 对象

通过 aistudio.start() 获取。

studio.run()

studio.run(
    script_or_code: str,
    *,
    is_code: bool = False,
    timeout: int = 300,
) -> str

在云端执行脚本或代码。

参数:

参数 类型 默认值 说明
script_or_code str 必填 文件绝对路径或代码字符串
is_code bool False True=代码字符串,False=文件路径
timeout int 300 执行超时(秒)

支持 .py.ipynb 文件。

异常:

  • ValueError:文件路径不是绝对路径
  • FileNotFoundError:文件不存在
  • RuntimeError:执行失败或超时

studio.close()

studio.close() -> None

关闭 AIStudio 实例,释放云端资源。


5. 分布式计算模块 (fai)

暂未开放:FAI 分布式计算 SDK 接口正在开发中,以下 API 文档仅供参考,暂不可用。

from bigquant import fai

需要额外安装:pip install 'bigquant[fai]' -i https://pypi.bigquant.com/simple/

fai.create_cluster()

fai.create_cluster(
    cluster_name: str,
    num_workers: int,
    worker_cpus: int,
    worker_memory: str | int,
    worker_gpus: int = 0,
    image: str | None = "",
    max_no_fai_run_time: int = 300,
) -> Cluster

创建分布式计算集群。

参数:

参数 类型 默认值 说明
cluster_name str 必填 集群名称
num_workers int 必填 Worker 数量
worker_cpus int 必填 每个 Worker 的 CPU 核数
worker_memory str | int 必填 每个 Worker 的内存,如 "8G" 或字节数
worker_gpus int 0 每个 Worker 的 GPU 数量
max_no_fai_run_time int 300 最大空闲时间(秒),超时自动停止

fai.get_cluster()

fai.get_cluster(cluster_id: str) -> Cluster

获取已有集群对象。


fai.list_clusters()

fai.list_clusters(all_space: bool = False) -> list[dict]

获取集群列表。


fai.remote

@fai.remote
def my_func(x):
    return x * 2

# 带参数
@fai.remote(num_cpus=2, memory="4G")
def heavy_func(data):
    ...

将函数标记为远程执行。支持 Ray 的所有 remote 选项(num_cpusnum_gpusmemorymax_retries 等)。


fai.get()

fai.get(object_refs) -> Any

获取远程函数执行结果(阻塞等待)。


fai.wait()

fai.wait(
    object_refs: list,
    num_returns: int = 1,
    timeout: float | None = None,
) -> tuple[list, list]

等待部分远程任务完成,返回 (ready, not_ready) 元组。


fai.put()

fai.put(value) -> ObjectRef

将对象存入分布式对象存储,返回引用。


fai.shutdown()

fai.shutdown() -> None

关闭 Ray 连接。


Cluster 对象

通过 fai.create_cluster()fai.get_cluster() 获取。

cluster.init()

cluster.init() -> None

连接到集群(自动启动并等待就绪)。必须在使用 @fai.remote 前调用。

cluster.start_cluster() / stop_cluster()

cluster.start_cluster() -> dict
cluster.stop_cluster() -> dict

启动/停止集群。

cluster.delete_cluster()

cluster.delete_cluster() -> dict

删除集群,释放所有资源。

cluster.wait_cluster()

cluster.wait_cluster(
    status: str,
    min_available_workers: int = 1,
    timeout: int = 120,
) -> None

等待集群达到指定状态("Running""Stopped")。


6. 错误处理

HTTP 状态码 原因 解决办法
401 认证失败 检查 AK/SK 配置,重新执行 bq auth configure
403 无权限 联系管理员开通权限
404 资源不存在 检查资源 ID 是否正确
500+ 服务器错误 重试或联系技术支持

常见 Python 异常:

异常类型 触发场景
FileNotFoundError 配置文件或脚本文件不存在
ValueError 参数格式错误,如 AK/SK 格式不对
AssertionError Token 认证时使用了需要 AK/SK 的功能
RuntimeError 网络错误、服务超时、执行失败
TypeError 传入了错误类型的数据

\

文档

Auth 用户管理 APIDai 数据管理 APIPapertrading 模拟交易 API
评论
  • 这是怎么回事: AttributeError: module 'bigquant' has no attribute 'dai',是需要开通什么权限吗?
  • 请问您找到原因了吗,我也出现这个情况
  • 联系管理员开通权限
{link}