BigQuant使用文档

DataSource—通用数据类型

由small_q创建,最终由small_q 被浏览 287 用户

DataSource

DataSource是bigmodule原生支持的一种泛用数据类型,在底层实现了许多优化机制,以确保数据准确、安全、便捷地传输和使用是。

\

导入DataSource

DataSource相关的方法和属性,定义在库 dai 中,通过以下代码进行导入:

import dai

\

DataSource对象与datasource_id

每个DataSource对象都有一个与其相匹配的唯一ID——datasource_id。

可以通过datasource_id来获取相应的DataSource对象。

DataSource对象 = dai.DataSource("datasource_id")

\

DataSource中的属性

属性 说明
type 获取DataSource的类型
metadata 获取DataSource的元数据

\

type

获取DataSource的类型。

基本用法:

DataSource对象.type

返回值类型:str

使用例:

dai.DataSource("ds_bdb").type   # bdb
dai.DataSource("ds_pickle").type   # pickle
dai.DataSource("ds_binary").type   # binary
dai.DataSource("ds_text").type   # text
dai.DataSource("ds_json").type   # json

\

metadata

获取DataSource的元数据。

基本用法:

DataSource对象.metadata

返回值类型:json

使用例:

dai.DataSource("holidays").metadata
# {'type': 'bdb', 'schema': {'date': 'timestamp[ns]', 'instrument': 'string', 'score': 'double', 'position': 'int64'}, 'partitioning': [], 'indexes': [], 'unique_together': ['date', 'instrument'], 'on_duplicates': 'last', 'preserve_pandas_index': False, 'sort_by': [], 'extra': ''}

\

DataSource中的方法

方法 说明
read() 自动判断DataSource的类型,获取相应的数据
exists() 判断DataSource是否存在,存在返回True,不存在则返回False
write_bdb() 创建bdb新表
read_bdb() 读取bdb表
insert_bdb() 更新和插入数据行到bdb表
apply_bdb() 按分区(partition)处理数据,可以用于数据增删查改
check_bdb() 检查数据源是否存在异常
write_pickle() 写入pickle数据
read_pickle() 读取pickle数据
write_text() 写入文本数据
read_text() 读取文本数据
write_json() 写入json数据
read_json() 读取json数据
write_binary() 写入二进制数据
read_binary() 读取二进制数据
delete() 删除数据
write_zero() 创建一个空的DataSource,并由您自己管理
mount() 挂载DataSource目录,只支持 zero DataSource挂载
unmount() 卸载挂载的DataSource目录
save_view() 保存视图类DataSource
persist() 持久化缓存DataSource

\

read

自动判断DataSource的类型,返回相应的数据。

基本用法:

变量标识符 = DataSource对象.read()

使用例:

a = ds_bdb.read()  # 读取bdb类型的dataSource对象, 返回一个具有表结构的数据对象
b = ds_pickle.read()  # 读取pickle类型的dataSource对象, 返回一个pickle数据对象
c = ds_binary.read()  # 读取二进制类型的dataSource对象, 返回一个二进制数据对象
d = ds_text.read()  # 读取文本类型的dataSource对象, 返回一个文本数据对象
e = ds_json.read()  # 读取json类型的dataSource对象, 返回一个json数据对象

\

exists

判断DataSource是否存在,存在返回True,不存在则返回False。

基本用法:

DataSource对象.exists()

返回值类型:bool

使用例:

dai.DataSource("ds_test").exists()  # True or False 

\

write_bdb

创建bdb新表。

参数列表:

参数 类型 说明
data Union[pa.Table, pd.DataFrame, ds.Dataset] 数据
id str 数据源ID,默认为None
partitioning List[str] 分区字段,默认为None
indexes List[str] 索引字段,默认为None
excludes Set[str] 排除字段,默认为None
unique_together List[str] 唯一约束字段,默认为None
on_duplicates str 重复数据处理策略,默认为last
sort_by List[Tuple[str, str]] 排序字段,默认为None
preserve_pandas_index bool 是否保留pandas索引,默认为False
docs Dict[str, Any] 数据源文档,默认为None。
timeout int 锁超时时间,默认为300秒。
extra str 写入元数据的额外信息,默认为空字符串。
overwrite bool 覆盖已有数据,分区数据不支持覆盖。

返回值类型:DataSource对象

使用例:

import numpy as np
import pandas as pd

import dai

instruments = ["000001.SZ", "000002.SZ", "000003.SZ"]
dates = list(pd.date_range("2021-12-29", "2022-01-02"))

df = pd.DataFrame({"instrument": instruments * len(dates), "date": dates * len(instruments), "value1": np.random.random(len(dates) * len(instruments))})
# 可选:定义分区,可以用于数据访问加速
df[dai.DEFAULT_PARTITION_FIELD] = df["date"].dt.year

# 如果这里抛出 ArrowInvalid: Object is not allowed to access 的错误,请您修改id
dai.DataSource.write_bdb(
    data=df,
    # datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
    id="ds_test",
    # 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
    unique_together=["date", "instrument"],
    indexes=["date"],
)

\

read_bdb

读取bdb表。

参数列表:

参数 类型 说明
as_type Union[pa.Table, pd.DataFrame, ds.Dataset] 返回类型,默认为pa.Table
partition_filter Dict[str, Union[tuple, set]] 分区过滤条件,默认为None
columns List[str] 返回部分列,默认为None

返回值类型:

返回值类型 说明
pa.Table 当as_type为pa.Table时返回。
pd.DataFrame 当as_type为pd.DataFrame时返回。
ds.Dataset 当as_type为ds.Dataset时返回。

使用例:

import pandas as pd

import dai

df_result = dai.DataSource("ds_test").read_bdb(as_type=pd.DataFrame)
df_result_with_filter = dai.DataSource("ds_test").read_bdb(as_type=pd.DataFrame, partition_filter={
    "date": ("2023-01-01", "2023-01-15"),
    "instrument": {"000002.SZ"}
})
df_result_with_columns = dai.DataSource("ds_test").read_bdb(as_type=pd.DataFrame, partition_filter={
    "date": ("2023-01-01", "2023-01-15"),
    "instrument": {"000002.SZ"}
}, columns=["instrument", "date"])

\

insert_bdb

更新和插入数据行到bdb表。

参数列表:

参数 类型 说明
data Union[pa.Table, pd.DataFrame, ds.Dataset] 数据,支持类型为pa.Table、pd.DataFrame、ds.Dataset。
excludes Set[str] 需要排除的字段,默认为None。
timeout int 超时时间,默认为300秒

返回值类型: DataSource对象

使用例:

import numpy as np
import pandas as pd
import dai

ds = dai.DataSource("ds_test")

instruments = ["000001.SZ", "000002.SZ", "000003.SZ"]
dates = list(pd.date_range("2022-01-05", "2022-01-15"))

df = pd.DataFrame({"instrument": instruments * len(dates), "date": dates * len(instruments), "value1": np.random.random(len(dates) * len(instruments))})
df[dai.DEFAULT_PARTITION_FIELD] = df["date"].dt.year

ds.insert_bdb(df)

\

apply_bdb

按partition处理数据,可以用于数据增删查改。

参数列表:

参数 类型 说明
func Callable[[Union[pa.Table, pd.DataFrame, ds.Dataset]], Optional[Union[pa.Table, pd.DataFrame, ds.Dataset]]] 数据处理函数。输入为当前分区的数据。返回数据,如果长度大于0,则使用返回数据更新分区;如果长度为0,则删除当前分区;如果为None,则不对分区做处理。
as_type Union[pa.Table, pd.DataFrame, ds.Dataset] 数据插入类型,默认为pa.Table
partition_filter Dict[str, Union[tuple, set]] 分区过滤条件,默认为None。
timeout int 超时时间,默认为300秒

返回值类型: DataSource对象

使用例:

import numpy as np
import pandas as pd
import dai

ds = dai.DataSource("ds_test")

def delete_000001(df):
    df = df[df["instrument"] != "000001.SZ"]
    return df

def insert_value2(df):
    df["value2"] = np.random.random(len(df))
    return df

def remove_value2(df):
    del df["value2"]
    return df

def delete_all(df):
    return pd.DataFrame()

# 删除行
ds.apply_bdb(delete_000001, as_type=pd.DataFrame)
# 插入列
ds.apply_bdb(insert_value2, as_type=pd.DataFrame)
# 删除列
ds.apply_bdb(remove_value2, as_type=pd.DataFrame)
# 指定 partition_filter 可以只处理某些分区的数据以提高效率,例如删除 2022 分区的所有数据
ds.apply_bdb(
    delete_all,
    as_type=pd.DataFrame,
    partition_filter={
        "date": ("2022-01-01", "2023-01-01")
    }
)

\

check_bdb

检查数据源是否存在异常。

参数列表:

参数 类型 说明
delete_invalid_data bool 是否删除异常数据,默认为False。

返回值类型:无返回值

使用例:

dai.DataSource("ds_test").check_bdb()
# 删除异常数据,后续再补充数据
dai.DataSource("ds_test").check_bdb(delete_invalid_data=True)

\

write_pickle

写入pickle数据。

参数列表:

参数 类型 说明
data PyObject 数据,可以任意的python数据类型
id str 数据id,默认为None,写入到cache数据
overwrite bool 覆盖已有数据,默认为False

返回值类型:DataSource对象

使用例:

import dai

data = {"a": 1, "b": 2}
dai.DataSource.write_pickle(data, id="ds_pickle")

\

read_pickle

读取pickle数据。

基本用法:

DataSource对象.read_pickle()

返回值类型:PyObject

使用例:

import dai

dai.DataSource("ds_pickle").read_pickle()
# {'a': 1, 'b': 2}

\

write_text

写入文本数据。

参数列表:

参数 类型 说明
data str 数据
id str 数据id,默认为None,写入到cache数据
overwrite bool 覆盖已有数据,默认为False

返回值类型:DataSource对象

使用例:

import dai

text = "Hello, world!"
dai.DataSource.write_text(text, id="ds_text")

\

read_text

读取文本数据。

基本用法:

DataSource对象.read_text()

返回值类型:str

使用例:

import dai

dai.DataSource("ds_text").read_text()
# 'Hello, world!'

\

write_json

写入json数据。

参数列表:

参数 类型 说明
data any 数据,具有 json 的结构
id str 数据id,默认为None,写入到cache数据
overwrite bool 覆盖已有数据,默认为False

返回值类型:DataSource对象

使用例:

import dai

json = {"a": 1, "b": 2, "c": [1, 2, 3]}
dai.DataSource.write_json(json, id="ds_json")

\

read_json

读取json数据。

基本用法:

DataSource对象.read_json()

返回值类型:any

使用例:

import dai

dai.DataSource("ds_json").read_json()
# {"a": 1, "b": 2, "c": [1, 2, 3]}

\

write_binary

写入binary数据。

参数列表:

参数与 类型 说明
data bytes 数据,具有 json 的结构
id str 数据id,默认为None,写入到cache数据
overwrite bool 覆盖已有数据,默认为False

返回值类型:DataSource对象

使用例:

import dai

binary = b"Hello, world!"
dai.DataSource.write_binary(binary, id="ds_binary")

\

read_binary

读取binary数据。

基本用法:

DataSource对象.read_binary()

返回值类型:bytes

使用例:

import dai

dai.DataSource("ds_binary").read_binary()
# b'Hello, world!'

\

delete

删除数据。

基本用法:

DataSource对象.delete()

使用例:

import dai

dai.DataSource("ds_binary").delete()

\

write_zero

创建一个空的DataSource,并由您自己管理。

参数列表:

参数 类型 说明
id str 数据id

返回值类型:DataSource对象

使用例:

import dai

ds = dai.DataSource.write_zero("ds_zero")

\

mount

挂载DataSource目录,只支持 zero DataSource挂载。

参数列表:

参数 类型 说明
with_write bool 可写模式挂载,只有DataSource创建者可以执行此操作,默认为False。
symlink_to str 创建软链到指定地址,默认为None。
symlink_force bool 强制创建软链,如果目标地址文件存在,则删除,默认为False。

返回值类型: DataSourceMount

使用例:

import dai

with dai.DataSource("ds_zero").mount(with_write=True) as path:
    # mount到本地 path,并自动 unmount
    print(f"{path=}")
    # 往挂载目录可以写入数据
    with open(f"{path}/a.txt", "w") as f:
        f.write("Hello, world!")

\

unmount

卸载挂载的DataSource目录。

参数列表:

参数 类型 说明
path str 挂载目录

返回值类型: 无返回值

使用例:

import dai

ds = dai.DataSource("ds_zero")
mnt = ds.mount()
print(f"{mnt.path=}")
with open(f"{mnt.path}/a.txt", "r") as f:
    print(f.read())
ds.unmount(mnt.path)

\

save_view

保存视图类DataSource。

参数列表:

参数与 类型 说明
id str 数据id
sql str 视图sql
update_if_exists bool 如果视图存在,则更新,默认为True。
docs Dict[str, Any] 视图文档,默认为None。
timeout int 超时时间,默认为300秒。

返回值类型: DataSource对象

使用例:

import dai

sql = """
select * from cn_stock_bar1d prune join cn_stock_valuation using (instrument, date)
"""
dai.DataSource.save_view("ds_view", sql)
result_df = dai.query("select * from test_for_fun_view where date > '2023-01-01' order by instrument, date").df()

\

persist

持久化缓存DataSource。

参数列表:

参数 类型 说明
id str 数据id
overwrite bool 是否覆盖已有数据源,默认为False。

返回值类型: DataSource对象

使用例:

import dai

text = "Hello, world!"
cached_datasource = dai.DataSource.write_text(text)
cached_datasource.persist("ds_persist")

\

⭐为什么推荐使用DataSource

🔥使用DataSource作为统一的数据格式,其具有以下优点:🔥

  1. DataSource作为bigmodule原生的标准且通用的数据格式,拥有更加底层的技术支持,可以方便地在模块之间进行传输,让数据流动得更自由、迅速;
  2. DataSource提供了许多方法,能够方便地转化为其他类型的数据,也可以由其他类型转化而来,具有强大的兼容性和通用性,可以应对实际开发和使用过程中数据类型复杂多变的情况;
  3. DataSource底层实现了缓存共享机制,
    • 🚀缓存机制可以帮助加快线性策略的重复执行。例如,当可视化策略中仅有部分下游模块发生了变动,而上游模块不受影响时,再次执行该策略,上游模块先前输出的结果会被直接获取,并非被重新执行一次,只有那些发生改动的模块会被执行。这样一来,便可以加快策略的执行效率,特别是在尝试调整策略的时候,将节省下许多调试等待的时间。
    • 🚀共享机制可以加速数据在模块之间的传递。在调用DataSource相关方法封装数据时,都会得到一个对应的对象实例,该对象实例会带有一个特殊的属性——datasource_id(相当于数据的标记编号)。通过对象实例让数据在模块之间传递,在这个过程中,实际上仅是传递了datasource_id,而具体的数据则是保存在 redis 缓存数据库中。当模块在接收到来自另一个模块的数据datasource_id时,就可以通过该ID访问数据库来获取数据,避免了传递海量数据时带宽资源紧张的问题,有效地节约了数据传输的时间。

\

标签

数据类型
{link}