DataSource/数据源

在 BigQuant 平台上,DataSource提供了底层数据读写的封装,用户不用关心数据在底层是通过内存、本地文件还是分布式文件存储。DataSource用作模块间传递数据,并可以作为输入数据和最终输出。

DataSource支持任何数据的读写,我们推荐尽量使用pandas DataFrame,并对DataFrame做了良好的接口设计和性能优化。

DataSource表示的数据是不可变的,可以新建写和读,但不能修改和增加。

接口

构造DataSource

class DataSource(id=None)

通过构造函数创建DataSource实例。

参数:id (字符串) –
  • 如果id为None,则会生成一个新的DataSource,可写入数据
  • 否则,则会指向id对应的DataSource,只可读取数据

静态函数

static DataSource.write_df(df, key='data')

通过构造函数创建DataSource实例。

参数:
  • df (DataFrame) – 需要写入的数据
  • key (字符串) – 数据表名,一般用默认的就可以了
返回:

写入之后的DataSource

返回类型:

DataSource

成员变量

DataSource.id

DataSource id。用于唯一标识一个DataSource

成员函数

classmethod read_df(key=None)

读取DataSource到DataFrame

参数:key (字符串) – 读取的数据表名。如果为None,这读取所有的表,并且按表名升序排列后连接为一个DataFrame
返回:DataFrame数据
返回类型:DataFrame
classmethod iter_df()

遍历读取DataSource为DataFrame。参考代码 遍历读取多个DataFrame

返回:迭代器,<key, DataFrame>
返回类型:generator
classmethod open_df_store()

打开DataSource为pandas HDFStore。如果DataSource为写权限,则以store可写;否则,可读。参考代码 写入多个DataFrame

返回:用于读或者写的store
返回类型:HDFStore
classmethod close_df_store()

在使用 open_df_store 后,必须调用 close_df_store 关闭文件句柄。否则数据可能损坏。

classmethod open_file(binary=False)

打开DataSource为文件句柄。如果DataSource为写权限,则以句柄可写;否则,可读。参考代码 写入任意数据

参数:binary (boolean) – 是否以二进制模式打开
返回:文件句柄
classmethod close_file()

在使用 open_file 后,必须调用 close_file 关闭文件句柄。否则数据可能损坏。

示例代码

写入一个DataFrame
In [19]:
def foo1():
    df = pd.DataFrame({'abc': range(0, 10)})
    ds = DataSource.write_df(df)
    return Outputs(data=ds)

# 使用 M.cached 实现 DataSource 复用
m1 = M.cached.v2(run=foo1)
print(m1)
[2017-06-03 10:30:17.349879] INFO: bigquant: cached.v2 start ..
[2017-06-03 10:30:17.353461] INFO: bigquant: hit cache
[2017-06-03 10:30:17.354797] INFO: bigquant: cached.v2 end [0.004939s].
{'data': DataSource(5299662a475311e7a4880242ac110009, v2), 'version': 'v2'}
读取一个DataFrame
In [20]:
m1.data.read_df().head()
Out[20]:
abc
0 0
1 1
2 2
3 3
4 4
写入多个DataFrame
In [21]:
def foo2():
    ds = DataSource()

    df_store = ds.open_df_store()
    df_store['df_a'] = pd.DataFrame({'abc': range(0, 10)})
    df_store['df_b'] = pd.DataFrame({'def': range(0, 5)})
    ds.close_df_store()

    return Outputs(data=ds)

# 使用 M.cached 实现 DataSource 复用
m2 = M.cached.v2(run=foo2)
print(m2)
[2017-06-03 10:30:17.422107] INFO: bigquant: cached.v2 start ..
[2017-06-03 10:30:17.425044] INFO: bigquant: hit cache
[2017-06-03 10:30:17.426426] INFO: bigquant: cached.v2 end [0.004329s].
{'data': DataSource(6a9e03b6478511e7a4880242ac110009, v2), 'version': 'v2'}
读取多个DataFrame
In [22]:
df_store = m2.data.open_df_store()
print(df_store['df_a'])
print(df_store['df_b'])
m2.data.close_df_store()
   abc
0    0
1    1
2    2
3    3
4    4
5    5
6    6
7    7
8    8
9    9
   def
0    0
1    1
2    2
3    3
4    4
遍历读取多个DataFrame
In [23]:
for key, df in m2.data.iter_df():
    print(key)
    print(df)
/df_a
   abc
0    0
1    1
2    2
3    3
4    4
5    5
6    6
7    7
8    8
9    9
/df_b
   def
0    0
1    1
2    2
3    3
4    4
写入任意数据
In [24]:
def foo3():
    ds = DataSource()
    writer = ds.open_file()
    writer.write('Hello, AISaaS')
    ds.close_file()

    return Outputs(data=ds)

# 使用 M.cached 实现 DataSource 复用
m3 = M.cached.v2(run=foo3)
print(m3)
[2017-06-03 10:30:17.523899] INFO: bigquant: cached.v2 start ..
[2017-06-03 10:30:17.526858] INFO: bigquant: hit cache
[2017-06-03 10:30:17.528375] INFO: bigquant: cached.v2 end [0.004493s].
{'data': DataSource(5ec961fa480411e7a4880242ac110009, v2), 'version': 'v2'}
读取任意数据
In [27]:
reader = m3.data.open_file()
text = reader.read()
m3.data.close_file()
print(text)
Hello, AISaaS