数据源

# 数据源

在BigQuant环境上,DataSource提供了底层数据读写的封装,用户不用关心数据在底层是通过内存、本地文件还是分布式文件存储。DataSource用作模块间传递数据,并可以作为输入数据和最终输出。

DataSource支持任何数据的读写,我们推荐尽量使用pandas DataFrame,并对DataFrame做了良好的接口设计和性能优化。

DataSource表示的数据是不可变的,可以新建写和读,但不能修改和增加。

# 接口

构造DataSource

class DataSource(id=None)

通过构造函数创建DataSource实例。

参数: id (字符串) –

  • 如果id为None,则会生成一个新的DataSource,可写入数据
  • 否则,则会指向id对应的DataSource,只可读取数据

静态函数

static DataSource.write_df(df, key='data')

通过构造函数创建DataSource实例。

参数:

  • df (DataFrame) – 需要写入的数据
  • key (字符串) – 数据表名,一般用默认的就可以了
    返回: 写入之后的DataSource
    返回类型: DataSource

成员变量

DataSource.id

DataSource id。用于唯一标识一个DataSource

成员函数

classmethod read_df(key=None)

# 读取DataSource到DataFrame

参数:
key (字符串) – 读取的数据表名。
如果为None,这读取所有的表,并且按表名升序排列后连接为一个DataFrame

返回:
DataFrame数据

返回类型: DataFrame

classmethod iter_df()

# 遍历读取DataSource为DataFrame。

返回: 迭代器,

返回类型: generator

classmethod open_df_store()  

打开DataSource为pandas HDFStore。如果DataSource为写权限,则以store可写;否则,可读。参考代码 写入多个DataFrame

返回: 用于读或者写的store

返回类型: HDFStore

classmethod close_df_store()

在使用 open_df_store 后,必须调用 close_df_store 关闭文件句柄。否则数据可能损坏。

classmethod open_file(binary=False)

打开DataSource为文件句柄。如果DataSource为写权限,则以句柄可写;否则,可读。参考代码 写入任意数据

参数: binary (boolean) – 是否以二进制模式打开
返回: 文件句柄

classmethod close_file()     

在使用 open_file 后,必须调用 close_file 关闭文件句柄。否则数据可能损坏。

# 写入一个DataFrame

示例代码

def foo1():
    df = pd.DataFrame({'abc': range(0, 10)})
    ds = DataSource.write_df(df)
    return Outputs(data=ds)

# 使用 M.cached 实现 DataSource 复用
m1 = M.cached.v3(run=foo1)
print(m1)

# 读取一个DataFrame

示例代码

m1.data.read_df().head()

运行结果:

​    abc
0    0
1    1
2    2
3    3
4    4    

写入多个DataFrame

示例代码

def foo2():
    ds = DataSource()
    df_store = ds.open_df_store()
    df_store['df_a'] = pd.DataFrame({'abc': range(0, 10)})
    df_store['df_b'] = pd.DataFrame({'def': range(0, 5)})
    ds.close_df_store()

    return Outputs(data=ds)

#使用 M.cached 实现 DataSource 复用
m2 = M.cached.v3(run=foo2)
print(m2)

运行结果:

[2018-08-29 16:02:09.673889] INFO: bigquant: cached.v2 开始运行..
[2018-08-29 16:02:09.694587] INFO: bigquant: cached.v2 运行完成[0.020701s].
{'version': 'v3', 'data': DataSource(d44c80f8ab6111e8b62c0a580a8102e0, v3)}

# 读取多个DataFrame

示例代码

df_store = m2.data.open_df_store()
print(df_store['df_a'])
print(df_store['df_b'])
m2.data.close_df_store()

运行结果:

   abc
0    0
1    1
2    2
3    3
4    4
5    5
6    6
7    7
8    8
9    9
   def
0    0
1    1
2    2
3    3
4    4

# 遍历读取多个DataFrame

示例代码

for key, df in m2.data.iter_df():
    print(key)
    print(df)

运行结果:

/df_a
   abc
0    0
1    1
2    2
3    3
4    4
5    5
6    6
7    7
8    8
9    9
/df_b
   def
0    0
1    1
2    2
3    3
4    4

写入任意数据

示例代码

def foo3():
    ds = DataSource()
    writer = ds.open_file()
    writer.write('Hello, BigQuant')
    ds.close_file()
    return Outputs(data=ds)
#使用 M.cached 实现 DataSource 复用
m3 = M.cached.v3(run=foo3)
print(m3)

运行结果:

[2018-08-29 16:04:12.919633] INFO: bigquant: cached.v2 开始运行..
[2018-08-29 16:04:12.928131] INFO: bigquant: cached.v2 运行完成[0.008512s].
{'version': 'v3', 'data': DataSource(1dc25a32ab6211e8b62c0a580a8102e0, v3)}

# 读取任意数据

reader = m3.data.open_file()
text = reader.read()
m3.data.close_file()
print(text)

运行结果:

Hello, BigQuant