DataSource/数据源¶
在 BigQuant 平台上,DataSource提供了底层数据读写的封装,用户不用关心数据在底层是通过内存、本地文件还是分布式文件存储。DataSource用作模块间传递数据,并可以作为输入数据和最终输出。
DataSource支持任何数据的读写,我们推荐尽量使用pandas DataFrame,并对DataFrame做了良好的接口设计和性能优化。
DataSource表示的数据是不可变的,可以新建写和读,但不能修改和增加。
接口¶
构造DataSource
-
class
DataSource
(id=None)¶ 通过构造函数创建DataSource实例。
参数: id (字符串) – - 如果id为None,则会生成一个新的DataSource,可写入数据
- 否则,则会指向id对应的DataSource,只可读取数据
静态函数
-
static
DataSource.
write_df
(df, key='data')¶ 通过构造函数创建DataSource实例。
参数: - df (DataFrame) – 需要写入的数据
- key (字符串) – 数据表名,一般用默认的就可以了
返回: 写入之后的DataSource
返回类型:
成员变量
-
DataSource.
id
¶ DataSource id。用于唯一标识一个DataSource
成员函数
-
classmethod
read_df
(key=None)¶ 读取DataSource到DataFrame
参数: key (字符串) – 读取的数据表名。如果为None,这读取所有的表,并且按表名升序排列后连接为一个DataFrame 返回: DataFrame数据 返回类型: DataFrame
-
classmethod
iter_df
()¶ 遍历读取DataSource为DataFrame。参考代码 遍历读取多个DataFrame
返回: 迭代器,<key, DataFrame> 返回类型: generator
-
classmethod
open_df_store
()¶ 打开DataSource为pandas HDFStore。如果DataSource为写权限,则以store可写;否则,可读。参考代码 写入多个DataFrame
返回: 用于读或者写的store 返回类型: HDFStore
-
classmethod
close_df_store
()¶ 在使用 open_df_store 后,必须调用 close_df_store 关闭文件句柄。否则数据可能损坏。
-
classmethod
open_file
(binary=False)¶ 打开DataSource为文件句柄。如果DataSource为写权限,则以句柄可写;否则,可读。参考代码 写入任意数据
参数: binary (boolean) – 是否以二进制模式打开 返回: 文件句柄
-
classmethod
close_file
()¶ 在使用 open_file 后,必须调用 close_file 关闭文件句柄。否则数据可能损坏。
示例代码¶
写入一个DataFrame¶
In [19]:
def foo1():
df = pd.DataFrame({'abc': range(0, 10)})
ds = DataSource.write_df(df)
return Outputs(data=ds)
# 使用 M.cached 实现 DataSource 复用
m1 = M.cached.v2(run=foo1)
print(m1)
[2017-06-03 10:30:17.349879] INFO: bigquant: cached.v2 start ..
[2017-06-03 10:30:17.353461] INFO: bigquant: hit cache
[2017-06-03 10:30:17.354797] INFO: bigquant: cached.v2 end [0.004939s].
{'data': DataSource(5299662a475311e7a4880242ac110009, v2), 'version': 'v2'}
写入多个DataFrame¶
In [21]:
def foo2():
ds = DataSource()
df_store = ds.open_df_store()
df_store['df_a'] = pd.DataFrame({'abc': range(0, 10)})
df_store['df_b'] = pd.DataFrame({'def': range(0, 5)})
ds.close_df_store()
return Outputs(data=ds)
# 使用 M.cached 实现 DataSource 复用
m2 = M.cached.v2(run=foo2)
print(m2)
[2017-06-03 10:30:17.422107] INFO: bigquant: cached.v2 start ..
[2017-06-03 10:30:17.425044] INFO: bigquant: hit cache
[2017-06-03 10:30:17.426426] INFO: bigquant: cached.v2 end [0.004329s].
{'data': DataSource(6a9e03b6478511e7a4880242ac110009, v2), 'version': 'v2'}
读取多个DataFrame¶
In [22]:
df_store = m2.data.open_df_store()
print(df_store['df_a'])
print(df_store['df_b'])
m2.data.close_df_store()
abc
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
def
0 0
1 1
2 2
3 3
4 4
遍历读取多个DataFrame¶
In [23]:
for key, df in m2.data.iter_df():
print(key)
print(df)
/df_a
abc
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
/df_b
def
0 0
1 1
2 2
3 3
4 4
写入任意数据¶
In [24]:
def foo3():
ds = DataSource()
writer = ds.open_file()
writer.write('Hello, AISaaS')
ds.close_file()
return Outputs(data=ds)
# 使用 M.cached 实现 DataSource 复用
m3 = M.cached.v2(run=foo3)
print(m3)
[2017-06-03 10:30:17.523899] INFO: bigquant: cached.v2 start ..
[2017-06-03 10:30:17.526858] INFO: bigquant: hit cache
[2017-06-03 10:30:17.528375] INFO: bigquant: cached.v2 end [0.004493s].
{'data': DataSource(5ec961fa480411e7a4880242ac110009, v2), 'version': 'v2'}
读取任意数据¶
In [27]:
reader = m3.data.open_file()
text = reader.read()
m3.data.close_file()
print(text)
Hello, AISaaS