BigQuant-SDK API 手册
由small_q创建,最终由small_q 被浏览 707 用户
BigQuant Financial Quantitative Toolbox - 金融量化工具箱 Python SDK
目录
- 顶层模块 (bigquant)
- 数据模块 (dai)
- 模拟交易模块 (papertrading)(暂未开放)
- AIStudio 模块 (aistudio)(暂未开放)
- 分布式计算模块 (fai)(暂未开放)
- 错误处理
1. 顶层模块 (bigquant)
import bigquant
bigquant.init()
bigquant.init(
*,
ak: str | None = None,
sk: str | None = None,
token: str | None = None,
) -> None
显式初始化 SDK。
参数:
| 参数 | 类型 | 说明 |
|---|---|---|
ak |
str |
Access Key |
sk |
str |
Secret Key |
token |
str |
认证 Token(仅支持 HTTP,不支持 Arrow Flight) |
ak/sk 和 token 二选一,ak/sk 支持全部功能。
示例:
bigquant.init(ak="YOUR_AK", sk="YOUR_SK")
bigquant.init_from_config()
bigquant.init_from_config(path: str | None = None) -> None
从配置文件初始化 SDK。
参数:
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
path |
str | None |
None |
配置文件路径,默认 ~/.bigquant/config.json |
异常:
FileNotFoundError:配置文件不存在ValueError:配置文件中没有 ak/sk 或 token
bigquant.init_from_token()
bigquant.init_from_token(token: str) -> None
使用 Token 初始化 SDK(仅支持 HTTP 接口)。
bigquant.init_from_password()
bigquant.init_from_password(username: str, password: str) -> None
使用用户名密码登录初始化 SDK。
bigquant.whoami()
bigquant.whoami() -> dict | None
获取当前登录用户信息。
返回值:
{
"id": 12345,
"username": "your_name",
"email": "you@example.com",
# ... 其他字段
}
bigquant.close()
bigquant.close() -> None
关闭所有客户端连接(HTTP + Arrow Flight)。
2. 数据模块 (dai)
from bigquant import dai
dai.query()
dai.query(
sql: str,
*,
full_db_scan: bool = False,
filters: dict[str, list] | None = None,
) -> QueryResult
执行 SQL 查询,通过 Arrow Flight 直连云端数据(高性能)。
参数:
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
sql |
str |
必填 | SQL 查询语句 |
full_db_scan |
bool |
False |
是否允许全表扫描(无 filters 时必须设为 True) |
filters |
dict | None |
None |
分区过滤条件,格式 {"column": ["val1", "val2"]} |
返回值: QueryResult
异常:
AssertionError:Token 认证时使用 Arrow Flight 模式(需要 AK/SK)
示例:
# 基础查询(带 filters 避免全表扫描)
result = dai.query(
"SELECT * FROM cn_stock_bar1d WHERE close > 10",
filters={"date": ["2024-01-01", "2024-12-31"]},
)
QueryResult
dai.query() 的返回值,提供多种格式转换方法。
QueryResult.df()
result.df() -> pd.DataFrame
转换为 pandas DataFrame。重复列名自动添加 _N 后缀。
QueryResult.arrow()
result.arrow() -> pa.Table
返回 PyArrow Table。
QueryResult.pl()
result.pl() -> polars.DataFrame
转换为 Polars DataFrame。
QueryResult.fetchall()
result.fetchall() -> list[dict]
返回所有行的列表,每行为字典。
QueryResult.fetch_arrow_reader()
result.fetch_arrow_reader(batch_size: int = 1_000_000) -> pa.RecordBatchReader
获取流式读取器,适合大数据场景分批读取。
DataSource
数据源管理类,支持读写自定义数据。
from bigquant import dai
DataSource(datasource_id)
DataSource(datasource_id: str)
通过 ID 引用已有数据源。
DataSource.read_bdb()
ds.read_bdb(
as_type: type = pa.Table,
partition_filter: dict[str, tuple | set] | None = None,
columns: list[str] | None = None,
) -> pd.DataFrame | pa.Table
读取数据。
参数:
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
as_type |
type |
pa.Table |
返回类型,支持 pd.DataFrame 或 pa.Table |
partition_filter |
dict | None |
None |
分区过滤:tuple 表示范围,set 表示枚举值 |
columns |
list[str] | None |
None |
指定读取的列,None 读取全部 |
示例:
ds = DataSource("my_signal_data")
df = ds.read_bdb(
as_type=pd.DataFrame,
partition_filter={
"date": ("2024-01-01", "2024-12-31"), # 范围
"instrument": {"000001.SZ", "600519.SH"}, # 枚举
},
columns=["date", "instrument", "my_signal"],
)
DataSource.write_bdb()
DataSource.write_bdb(
data: pd.DataFrame | pa.Table,
*,
id: str | None = None,
partitioning: list[str] | None = None,
indexes: list[str] | None = None,
overwrite: bool = False,
on_duplicates: str = "last",
sort_by: list[tuple[str, str]] | None = None,
docs: dict | None = None,
timeout: int = 300,
update_msg: str | None = None,
) -> DataSource
写入数据,返回 DataSource 对象。
参数:
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
data |
DataFrame | Table |
必填 | 要写入的数据 |
id |
str | None |
None |
数据源 ID,None 创建临时数据源 |
partitioning |
list[str] | None |
None |
分区列 |
indexes |
list[str] | None |
None |
索引列 |
overwrite |
bool |
False |
是否覆盖已有数据 |
on_duplicates |
str |
"last" |
重复处理策略:"last", "first", "error", "none" |
sort_by |
list[tuple] | None |
None |
排序,如 [("date", "ascending")] |
timeout |
int |
300 |
写入锁超时(秒) |
异常:
TypeError:data类型不是 DataFrame 或 TableValueError:data为空
dai.search_datasources()
dai.search_datasources(
keyword: str,
category: str | None = None,
page: int = 1,
size: int = 20,
fetch_all: bool = False,
) -> list[dict]
搜索平台数据表,支持中文/英文关键词。
参数:
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
keyword |
str |
必填 | 搜索关键词 |
category |
str | None |
None |
按分类前缀过滤,如 /金融数据/股票 |
page |
int |
1 |
页码 |
size |
int |
20 |
每页数量 |
fetch_all |
bool |
False |
是否自动翻页获取全部结果 |
dai.get_datasource_schema()
dai.get_datasource_schema(datasource_id: str) -> dict
获取数据表字段 schema。
返回值:
{
"datasource_id": "cn_stock_bar1d",
"cn_name": "股票日线行情",
"description": "...",
"fields": [
{
"name": "date",
"type": "string",
"description": "交易日期",
"primary": True,
"rank": 1,
},
# ...
]
}
dai.get_datasource_updates()
dai.get_datasource_updates(
datasource_id: str,
page: int = 1,
size: int = 20,
) -> list[dict]
获取数据表更新记录。每条记录包含 created_at、data.rows、data.cols。
dai.list_datasources()
dai.list_datasources(
constraints: dict | None = None,
page: int = 1,
size: int = 100,
order_by: str = "-updated_at",
) -> list[dict]
获取当前用户创建的 DataSource 列表。
dai.list_user_datasources()
dai.list_user_datasources(
constraints: dict | None = None,
page: int = 1,
size: int = 100,
order_by: str = "-updated_at",
) -> list[dict]
获取当前用户订阅的 DataSource 列表。
dai.list_space_datasources()
dai.list_space_datasources(
space_id: str | None = None,
constraints: dict | None = None,
page: int = 1,
size: int = 100,
order_by: str = "-updated_at",
) -> list[dict]
获取指定空间已上架的 DataSource 列表。
3. 模拟交易模块 (papertrading)
暂未开放:PaperTrading SDK 接口正在开发中,以下 API 文档仅供参考,暂不可用。
from bigquant import papertrading
papertrading.list()
papertrading.list(
constraints: dict | None = None,
order_by: list[str] | None = None,
include_fields: list[str] | None = None,
page: int = 1,
size: int = 1000,
) -> dict
获取策略列表。
返回值:
{
"items": [...], # 策略列表
"count": 10, # 总数量
"page": 1,
"size": 20,
"total": 1, # 总页数
}
papertrading.get()
papertrading.get(strategy_id: str) -> Strategy
获取策略对象。
异常:
ValueError:策略不存在
papertrading.delete()
papertrading.delete(strategy_id: str) -> dict
删除策略(含取消订阅数据、删除任务)。已分享的策略需先调用 unshare()。
papertrading.unshare()
papertrading.unshare(strategy_id: str) -> dict
取消策略分享。
Strategy 对象
通过 papertrading.get(strategy_id) 获取。
strategy.strategy_info
策略基本信息字典,包含 id、strategy_name、status、frequency、benchmark_instrument 等字段。
strategy.get_positions()
strategy.get_positions(
constraints: dict | None = None,
order_by: list[str] | None = None,
page: int = 1,
size: int = 100,
) -> dict
获取持仓信息。返回 {"items": [...], "count": N, ...}。
每条持仓包含:instrument、name、current_qty、cost_price、last_price、market_value、position_pnl、profit_ratio、posi_direction。
strategy.get_orders()
strategy.get_orders(
constraints: dict | None = None,
order_by: list[str] | None = None,
page: int = 1,
size: int = 100,
) -> dict
获取已成交订单。每条订单包含:instrument、direction(1=买/2=卖)、order_qty、order_price、average_price、filled_qty、status_msg。
strategy.get_planned_orders()
strategy.get_planned_orders(
constraints: dict | None = None,
order_by: list[str] | None = None,
page: int = 1,
size: int = 100,
) -> dict
获取计划订单(待交易信号)。
strategy.get_performances()
strategy.get_performances(fields: list[str] | None = None) -> list
获取绩效数据列表,每条为按字段顺序排列的列表:
| 索引 | 字段 | 说明 |
|---|---|---|
| 0 | trading_day |
交易日 |
| 1 | today_return |
当日收益 |
| 2 | max_drawdown |
最大回撤 |
| 3 | annualized_return |
年化收益 |
| 4 | risk_indicator |
风险指标(dict,含 sharpe) |
| 5 | win_percent |
胜率 |
| 6 | cumulative_return |
累计收益 |
| 7 | benchmark |
基准收益 |
| 8 | total_market_value |
总市值 |
| 9 | balance |
账户余额 |
| 10 | month_return |
近一月收益 |
| 11 | week_return |
近一周收益 |
| 12 | three_month_return |
近三月收益 |
| 13 | six_month_return |
近六月收益 |
| 14 | created_at |
记录创建时间 |
| 15 | portfolio_value |
组合总价值 |
4. AIStudio 模块 (aistudio)
暂未开放:AIStudio SDK 接口正在开发中,以下 API 文档仅供参考,暂不可用。
from bigquant import aistudio
aistudio.list()
aistudio.list(space_id: str = DEFAULT_SPACE_ID) -> list[dict]
获取可用资源规格列表。
返回值每条包含:
| 字段 | 说明 |
|---|---|
id |
规格 ID(UUID) |
name |
规格名称,如 D0(1C/6G) |
cpu |
CPU 核数 |
memory |
内存(GB) |
gpu |
GPU 数量 |
price |
价格(BQC/分钟) |
type |
类型:free / saas / bigpro |
aistudio.start()
aistudio.start(
resource_spec_id: str = DEFAULT_RESOURCE_SPEC_ID,
space_id: str = DEFAULT_SPACE_ID,
) -> AIStudio
启动 AIStudio 实例。默认使用免费规格 D0(1C/6G)。
注意: 需要 AK/SK 认证,Token 认证不支持。
异常:
RuntimeError:60 秒内无法启动
AIStudio 对象
通过 aistudio.start() 获取。
studio.run()
studio.run(
script_or_code: str,
*,
is_code: bool = False,
timeout: int = 300,
) -> str
在云端执行脚本或代码。
参数:
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
script_or_code |
str |
必填 | 文件绝对路径或代码字符串 |
is_code |
bool |
False |
True=代码字符串,False=文件路径 |
timeout |
int |
300 |
执行超时(秒) |
支持 .py 和 .ipynb 文件。
异常:
ValueError:文件路径不是绝对路径FileNotFoundError:文件不存在RuntimeError:执行失败或超时
studio.close()
studio.close() -> None
关闭 AIStudio 实例,释放云端资源。
5. 分布式计算模块 (fai)
暂未开放:FAI 分布式计算 SDK 接口正在开发中,以下 API 文档仅供参考,暂不可用。
from bigquant import fai
需要额外安装:
pip install 'bigquant[fai]' -i https://pypi.bigquant.com/simple/
fai.create_cluster()
fai.create_cluster(
cluster_name: str,
num_workers: int,
worker_cpus: int,
worker_memory: str | int,
worker_gpus: int = 0,
image: str | None = "",
max_no_fai_run_time: int = 300,
) -> Cluster
创建分布式计算集群。
参数:
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
cluster_name |
str |
必填 | 集群名称 |
num_workers |
int |
必填 | Worker 数量 |
worker_cpus |
int |
必填 | 每个 Worker 的 CPU 核数 |
worker_memory |
str | int |
必填 | 每个 Worker 的内存,如 "8G" 或字节数 |
worker_gpus |
int |
0 |
每个 Worker 的 GPU 数量 |
max_no_fai_run_time |
int |
300 |
最大空闲时间(秒),超时自动停止 |
fai.get_cluster()
fai.get_cluster(cluster_id: str) -> Cluster
获取已有集群对象。
fai.list_clusters()
fai.list_clusters(all_space: bool = False) -> list[dict]
获取集群列表。
fai.remote
@fai.remote
def my_func(x):
return x * 2
# 带参数
@fai.remote(num_cpus=2, memory="4G")
def heavy_func(data):
...
将函数标记为远程执行。支持 Ray 的所有 remote 选项(num_cpus、num_gpus、memory、max_retries 等)。
fai.get()
fai.get(object_refs) -> Any
获取远程函数执行结果(阻塞等待)。
fai.wait()
fai.wait(
object_refs: list,
num_returns: int = 1,
timeout: float | None = None,
) -> tuple[list, list]
等待部分远程任务完成,返回 (ready, not_ready) 元组。
fai.put()
fai.put(value) -> ObjectRef
将对象存入分布式对象存储,返回引用。
fai.shutdown()
fai.shutdown() -> None
关闭 Ray 连接。
Cluster 对象
通过 fai.create_cluster() 或 fai.get_cluster() 获取。
cluster.init()
cluster.init() -> None
连接到集群(自动启动并等待就绪)。必须在使用 @fai.remote 前调用。
cluster.start_cluster() / stop_cluster()
cluster.start_cluster() -> dict
cluster.stop_cluster() -> dict
启动/停止集群。
cluster.delete_cluster()
cluster.delete_cluster() -> dict
删除集群,释放所有资源。
cluster.wait_cluster()
cluster.wait_cluster(
status: str,
min_available_workers: int = 1,
timeout: int = 120,
) -> None
等待集群达到指定状态("Running" 或 "Stopped")。
6. 错误处理
| HTTP 状态码 | 原因 | 解决办法 |
|---|---|---|
| 401 | 认证失败 | 检查 AK/SK 配置,重新执行 bq auth configure |
| 403 | 无权限 | 联系管理员开通权限 |
| 404 | 资源不存在 | 检查资源 ID 是否正确 |
| 500+ | 服务器错误 | 重试或联系技术支持 |
常见 Python 异常:
| 异常类型 | 触发场景 |
|---|---|
FileNotFoundError |
配置文件或脚本文件不存在 |
ValueError |
参数格式错误,如 AK/SK 格式不对 |
AssertionError |
Token 认证时使用了需要 AK/SK 的功能 |
RuntimeError |
网络错误、服务超时、执行失败 |
TypeError |
传入了错误类型的数据 |
\