BigQuant使用文档

Dai 数据管理 API

由small_q创建,最终由small_q 被浏览 2 用户

1 模块介绍

本模块提供数据查询、数据源读写、自定义函数等功能,支持远程查询和 BDB 数据管理

2 API 介绍

2.1 自定义 UDF 函数

类名: dai.DaiUDF

功能: 自定义 UDF 函数定义

参数:

  • name : str,必填,UDF 函数名称
  • function : Callable,必填,Python 函数对象
  • parameters : Optional[List],非必填,参数类型列表,默认为 None
  • return_type : Optional[Any],非必填,返回值类型,默认为 None
  • type : Optional[Any],非必填,UDF 类型,默认为 None
  • null_handling : Optional[Any],非必填,NULL 值处理方式,默认为 None
  • exception_handling : Optional[Any],非必填,异常处理方式,默认为 None
  • side_effects : bool,非必填,是否有副作用,默认为 False

说明: 用于在 dai.query() 中传递自定义函数,SDK 会自动序列化函数代码并在云端执行

2.2 远程查询数据

方法名:dai.query()\n功能:远程查询\n参数:

  • sql: str,必填,mysql 查询语句
  • udf_list:List[DaiUDF],非必填,UDF 函数列表,默认为 [ ]
  • full_db_scan: bool,非必填, 是否允许全表查询,默认为 False
  • filters:Dict[str, List[Any]],非必填,过滤条件 {"column": ["value1", "value2"]},默认为 {}
  • bind_relations:Dict[str, Any],非必填,绑定本地数据到 SQL 查询 {"name": DataFrame/Table},默认为 None
  • params:Dict[str, Any],非必填,查询参数 {"param_name": "value"},默认为 None
  • compression:bool,非必填,是否启用字符串压缩,默认为 False
  • resource_spec_id:str,指定 AIStudio 资源规格 ID(SDK 专属),默认为 D0(1C/6G) 免费
  • space_id:str,指定 AIStudio 空间 ID(SDK 专属),默认为 主空间

返回:QueryResult 对象

2.3 数据源管理

类名:dai.DataSource

功能:数据源管理

参数:

  • datasource_id : str,必填,数据源 ID

方法:见下

2.3.1 读取 BDB 数据

方法名: dai.DataSource.read_bdb()

功能: 读取 BDB 数据

参数:

  • as_type : Type,非必填,返回类型,默认为 pa.Table,支持 pd.DataFrame, pa.Table
  • partition_filter : Optional[Dict[str, Union[tuple, set]]],非必填,分区过滤条件,默认为 None
    • tuple: 表示范围,如 ("2024-01-01", "2024-12-31")
    • set: 表示特定值,如 {"000001.SZ", "600000.SH"}
  • columns : Optional[List[str]],非必填,要读取的列名列表,默认为 None(读取所有列)

返回: 根据 as_type 参数返回 pd.DataFrame 或 pa.Table

2.3.2 写入 BDB 数据

方法名:dai.DataSource.write_bdb()

功能: 写入 BDB 数据

参数:

data : Union[pd.DataFrame, pa.Table],必填,要写入的数据

update_logs : Optional[Union[bool, Dict]],非必填,是否记录更新日志,默认为 None

update_msg : Optional[str],非必填,更新备注信息,默认为 None

id : Optional[str],非必填,数据源 ID,默认为 None(创建临时数据源)

partitioning : Optional[List[str]],非必填,分区列,默认为 None

indexes : Optional[List[str]],非必填,索引列,默认为 None

excludes : Optional[Set[str]],非必填,排除字段,默认为 None

unique_together : Optional[List[str]],非必填,唯一约束,默认为 None

on_duplicates : str,非必填,冲突处理策略,默认为 "last",可选值:["last", "first", "error", "none"]

sort_by : Optional[List[Tuple[str, str]]],非必填,排序,格式为 [("field", "ascending/descending"), ...],默认为 None

preserve_pandas_index : bool,非必填,是否保留 pandas 索引,默认为 False

docs : Optional[Dict[str, Any]],非必填,文档,默认为 None

timeout : int,非必填,写入锁的超时时间(秒),默认为 300

extra : str,非必填,额外信息,默认为 ""

base_ds : Optional[DataSource],非必填,继承 extra 参数,默认为 None

overwrite : bool,非必填,是否覆盖已有数据,默认为 False

max_threads : Optional[int],非必填,最大线程数,默认为 None

preserve_order : bool,非必填,是否保持顺序,默认为 False

返回:DataSource 对象

2.4 查询结果

类名:dai.QueryResult

功能:查询结果包装类,提供多种数据格式转换方法

方法:见下

2.4.1 获取 Arrow Table

方法名:dai.QueryResult.arrow()

功能:返回 Arrow Table 格式的查询结果

参数:无

返回:pa.Table 对象

2.4.2 转换为 pandas DataFrame

方法名:dai.QueryResult.df()

功能:转换为 pandas DataFrame

参数:无

返回:pd.DataFrame 对象

2.4.3 转换为 Polars DataFrame

方法名:dai.QueryResult.pl()

功能:转换为 Polars DataFrame

参数:无

返回:polars.DataFrame 对象

2.4.4 获取所有行

方法名:dai.QueryResult.fetchall()

功能:获取所有行为列表

参数:无

返回:list,所有行的列表

2.4.5 获取流式读取器

方法名:dai.QueryResult.fetch_arrow_reader()

功能:获取 Arrow 流式读取器,用于大数据场景分批读取

参数:

  • batch_size : int,非必填,每批数据的行数,默认为 1000000

返回:pyarrow.RecordBatchReader 对象

{link}