BigQuant使用文档

FAI - 分布式云计算加速集群

由qxiao创建,最终由small_q 被浏览 563 用户

FAI是BigQuant研发的云加速集群调度应用,可以动态调用服务器集群级的算力,加速海量数据处理、高频因子挖掘、并行超参搜索、滚动训练等复杂任务的运行。

FAI 快速入门

安装FAI插件

启动AIStudio AIStudio 快速入门

在扩展商店中找到FAI并安装,如果已经默认安装则可以忽略这一步。

安装 Fai AIStudio Extension{w:90}{w:100}{w:100}{w:100}{w:100}{w:100}

FAI入口与集群列表

FAI集群入口位于左侧活动栏,点击入口图标,会在左侧侧边栏展开FAI分布式计算集群列表,列表内容展示了您拥有的集群和它们的运行状态。

FAI入口与集群列表{w:100}{w:100}{w:100}{w:100}{w:100}

新建集群

点击FAI侧边栏面板顶部的 【集群 +】 按钮进入新建集群流程。

新建集群入口{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}

新建集群面板如下图,需要关注三件事:

  1. 单节点资源规格
  2. 节点数量
  3. 是否启用智能管理

FAI新建集群面板{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}

  • 一个集群调用的总资源量等于单节点规格×节点数量,消费宽币的速度也等于单节点价格×节点数量
  • 您能调用的总资源量上限受您的用户账户等级以及账户余额的双重限制,目前每个普通用户都可以单独创建和调用总量为100CPU/400G内存的计算集群,这个限制会视实际需求在以后再次调整。
  • 已启动的集群会开始计费,停止的集群其资源会被释放并不再计费,最小计费周期是1分钟。
  • 智能管理会在集群没有任务时自动停止集群并释放集群资源、停止计费。

==强烈建议勾选(已默认勾选)智能管理选项,否则需要手动管理集群启停,有可能在忘记停止集群时产生不希望的费用。==

\

管理集群

新建集群后,点击集群列表中的集群名,即可进入集群管理面板。

进入集群管理面板{w:100}{w:100}{w:100}{w:100}{w:100}

在集群管理面板中,您可以:

  1. 启动、停止、删除集群;
  2. 查看集群基本信息;
  3. 查看日志;
  4. 查看集群使用情况,进行实时监控。

FAI集群管理{w:100}{w:100}{w:100}{w:100}{w:100}

\

使用FAI执行计算任务

使用FAI集群加速运行最简单的方法就是在可视化环境中使用**“自定义运行FAI加速**”模块。

FAI加速模块{w:100}{w:100}{w:100}{w:100}{w:100}

  • 首先在模块搜索中找到“自定义运行FAI加速”模块,将模块拖到画布中,这个模块不需要输入输出连线。

{w:100}{w:100}{w:100}{w:100}{w:100}

  • 单击画布中的“自定义运行FAI加速”模块,在右侧的属性配置栏中的加速集群选项卡中,为当前画布选择一个加速集群。

运行{w:100}{w:100}{w:100}{w:100}{w:100}

  • 点击全部运行,整个画布就会被提交给对应的FAI加速集群中运行,对应集群会自动启动并运行任务。

    可以在集群面板观察任务运行情况。

{w:100}{w:100}{w:100}{w:100}{w:100}

  • 执行完成后,结果会被返回到画布中,如果勾选了“智能管理”,对应的FAI集群将在任务结束后几分钟自动停止。

\

FAI使用案例

连接集群

如果我们的集群开启了智能管理,那么我们在使用完集群后无需对集群进行任何处理,他将在默认5分钟无使用的情况下自动停止,也不再收费,当我们下次调用init函数时,会自动拉起并连接。

import fai

# 连接和初始化fai
fai.init(cluster="fai-youwu-gduptjuu.fai-cluster",token="Wn56crstJbe9PluNytYmvl7g9NwTvPcI")

使用fai执行远程函数

下面会简单的介绍fai.remotefai.get 的使用

  • 只需要在原有的函数上添加@fai.remote 装饰器,其中装饰器可以有参数如 @fai.remote(num_gpus=1,num_cpus=0.2) 也可以没有参数@fai.remote,没有参数的情况下等价于@fai.remote(num_cpus=1)
  • 执行函数的时候我们通过func.inner(*args,**kwargs)来提交远程执行队列,此时会返回一个执行引用ref,我们可以通过fai.get(ref)来获取实际的返回结果,这个结果会通过网络传递到本地来
  • fai.get的参数除了一个ref,也可以是List[ref]一个ref的列表,返回值是一个对象或者对象列表

下面是一个简单例子

@fai.remote
def func1(arg1,arg2,kw1=1,kw2=5):
    return arg1+arg2+kw1+kw2

ref = func1.remote(1,2,kw1=3,kw2=4)
result = fai.get(ref)
assert result == 10

refs = [func1.remote(1,2,kw1=3,kw2=4) for _ in range(3)]
results = fai.get(refs)
assert results == [10,10,10]

\

使用fai.remote的参数控制并发

我们在启动节点的时候会有资源规格和节点数,比如我们启动的节点是8C,32G,2卡,一共启动了2个节点,这个时候我们的@fai.remote()参数最大为@fai.remote(num_cpus=8,num_gpus=2,memory="32G")

  • num_cpus: 表示这个任务需要多少多少cpu能执行,默认是1,可以是小数,如果是0.1,表示一个核能跑10个任务,但是这个参数只是表示当分配的资源后还剩余0.1核就可以执行这个任务,但是这个任务实际消耗多少cpu,是不受控制的
  • num_gpus: 表示这个任务需要多少显卡来执行,默认是0,当我们声明了这个参数的时候,函数内获取的显卡的顺序都是从0开始的,这样方便使用显卡,而不用知道显卡的实际顺序
  • memory: 这个参数表示当节点还有多少内存剩余的时候调用这个任务,默认是None,一般不填
  • 这些资源参数同时存在时,要所有参数都满足才会调度,下面的代码我们按照8C,32G,2卡,2节点的资源来演示
  • 当资源不满足的情况戏,任务会在后台等待,直到其他任务退出之后,空余资源满足的情况下才能执行
@fai.remote(num_cpus=8,num_gpus=2) # 我们申明了两张卡
def func2():
    import torch
    device_name = torch.cuda.get_device_name(0)
    device_name2 = torch.cuda.get_device_name(1)
    return device_name,device_name2

print("func2:",fai.get(func2.remote()))

@fai.remote(num_cpus=4,num_gpus=1) # 我们申明了一张卡,但是我们并发4个
def func3():
    import torch
    device_name = torch.cuda.get_device_name(0)
    return device_name

refs = [func3.remote() for _ in range(4)]
print("func3:",fai.get(refs))

# 下面测试任务等待
@fai.remote(num_cpus=5) 
def func4():
    import datetime
    import time
    time.sleep(10)
    return datetime.datetime.now().second

refs = [func4.remote() for _ in range(4)]
# 因为我们的一个节点只有8cpu,我们申明的任务需要划走5cpu,所以一个节点只能执行一个该任务,
# 最后我们能观察到我们返回的结果数字之间是两个差不多同时,另外两个差不多同时,他们相差10秒左右
# 总共执行20秒
print("func4:",fai.get(refs)) 

\

给任务传递参数

我们的任务大多数情况下都是需要传递参数的,这个时候有一些注意点

  • 任务参数不能太大,比如我们传一个非常大的dataframe,比如1G的数据,我们不建议这么做,会非常慢,同时会导致共享内存锐减,导致oom程序崩溃
    • 这个过程中,会先将数据pickle,然后通过网络发送到对应的节点,对应的节点会反序列化在执行,这个过程中会消耗大量的算力和网络,后台也会划分对应的存储来处理这个对象,因此我们建议将数据落盘在读取。可以吧数据写到这个目录上来共享文件
  • 最好在远程函数内 import 包,这样能避免传输相关的包到节点上
  • 尽可能的减少函数外部的变量,这些变量也会pickle到节点上,有些对象是不能进行pickle序列化的,这样会导致错误
  • 不要再远程函数内使用多进程,使用fai.remote 来代替
import pandas as pd
import numpy as np

# 计算每个列所需的行数以达到1GB
bytes_per_gb = 1 * (1024 ** 3)  # 1GB
num_columns = 10  # 假设我们需要10列
bytes_per_float = 8  # float64占用8字节
rows_per_column = bytes_per_gb // (num_columns * bytes_per_float)

# 创建一个1GB的DataFrame
data = np.random.rand(rows_per_column, num_columns)
large_df = pd.DataFrame(data, columns=[f'col_{i}' for i in range(num_columns)])

save_path = "/home/aiuser/work/test.csv"
large_df.to_csv(save_path)

@fai.remote
def func5():
    import pandas as pd # 在远程函数应用包
    df = pd.read_csv(save_path) # 小对象`save_path`可以直接读取,large_df大对象最好落盘
    return df.shape


print(fai.get(func5.remote()))

或者直接返回这个函数的ref作为参数另外一个远程函数的参数,他们会尽可能的在一个节点间执行

@fai.remote
def return_large_data():
    import pandas as pd
    import numpy as np

    # 计算每个列所需的行数以达到1GB
    bytes_per_gb = 1 * (1024 ** 3)  # 1GB
    num_columns = 10  # 假设我们需要10列
    bytes_per_float = 8  # float64占用8字节
    rows_per_column = bytes_per_gb // (num_columns * bytes_per_float)

    # 创建一个1GB的DataFrame
    data = np.random.rand(rows_per_column, num_columns)
    large_df = pd.DataFrame(data, columns=[f'col_{i}' for i in range(num_columns)])
    return large_df

@fai.remote
def func5(df):
    return df.shape


ref = return_large_data.remote() 
print(fai.get(func5.remote(ref))) # 直接传递ref

fai.remote 函数之间的调用

  • fai.remote函数之间也可以相互调用,他们可以组织称一个DAG执行队列。
  • ref 可以直接作为另外一个远程函数的参数,另外的远程函数可自动获取他的值。
@fai.remote
def node1():
    return "node1"

@fai.remote
def node2(node):
    return f"{node}->node2"

@fai.remote
def node3(node):
    return f"{node}->node3"

@fai.remote
def node4(node_1,node_2):
    return f"{node_1}->node4,{node_2}->node4"

@fai.remote
def main():
    ref1 = node1.remote()
    ref2 = node2.remote(ref1)
    ref3 = node3.remote(ref2)
    ref4 = node4.remote(ref1,ref3)
    return fai.get(ref4)

print(fai.get(main.remote()))

\

自定义运行FAI加速

模块介绍

FAI是BigQuant研发的云加速集群调度应用,其对海量数据的处理能力和计算能力是有目共睹的。

”自定义运行FAI加速”模块只是FAI用法的冰山一角,但这冰山一角所实现的功能十分强大,它可以对不同模块下的参数进行调整,而且能够达到并行调整的程度。纵向能够对模块的运行起到加速效果,横向能够并行运行得到不同参数下结果。

应用举例

  1. 数据处理方面, 我们都知道抽取特定日期的股票数据是从”代码列表”模块中设定的, 所以我们可以通过设定不同的时间段去取数据, 而平时如果我们想要对比不同时间段下数据对策略的效果, 我们必须要反复调整模块中的时间段, 多次运行后得到结果, 如此这样费时费力, 但如果引入"自定义运行FAI加速"模块,不同时间段能够并行运行, 实现了横向并行加速的效果;
  2. 模型调参方面, BigQuant平台有封装好各种机器学习深度学习模块, 模型调参使用的最多的是随机撒点结合启发式搜索或者网格搜索的方式调参, 随机撒点和网格搜索有个共同的特点——如果需要调节的参数种类很多, 则需要检验的参数组合就会以倍数的量级增长。这时"自定义运行FAI加速"模块的优势体现出来了, 使用该模块能够达到多组参数并行计算的效果, 除此之外还能起到横向运算加速的效果。


先搭建好你的模型和策略, 并在此基础上添加“自定义运行FAI加速模块”

让我们点开这个模块, 看看其中有那些参数

可以发现有两个参数需要你填写:

  1. 加速集群: 选择你开启的集群;
  2. run函数: 在这里面编写你要调节参数的相关代码。


我们先来介绍一下如何开启集群,开启集群有两种方法:

  • 方法一:图片从左到右, 首先打开集群按钮, 新建集群。进一步设置集群的各种部件(节点规格, 节点数等), 然后保存并启动。

镜像管理只需要设置成默认镜像即可。


重新回到自定义加速模块,我们将新建的集群在模块中选中:

现在我们开始进行调参程序的编写, 以随机森林的树的数量和每个叶子节点最小样本数为例;

我们需要以代码的方式表达调参逻辑, 所以我们需要点开代码模式, 看看这两个参数是如何用代码表示的, 首先我们观察到随机森林名称是m12, 所以我们只要找到m12的代码区域即可。

直到了这两个参数的代码表示后, 我们可以开始编写run函数中的代码了:


run函数中只需要在方框的基础上对内容进行变换即可, 接下来我来剖析红色框框中的代码逻辑:

针对随机森林的模块, 我们要调节的有两个参数——树的数量, 叶子最小样本数。

树的数量的取值为4或9, 叶子最小样本数取值为50或200. 所以这是个网格搜索的问题, 所以将它们组合起来为

[{'m12.iterations': 4, 'm12.min_samples_per_leaf': 50},

{'m12.iterations': 4, 'm12.min_samples_per_leaf': 200},

{'m12.iterations': 9, 'm12.min_samples_per_leaf': 50},

{'m12.iterations': 9, 'm12.min_samples_per_leaf': 200}]


**注意:**不同都为参数一定要在同一个字典中作为键各出现一次, 不同的参数不允许单独出现在一个字典里作为键, 如

[...{'m12.iteration': 4},    {'m12.min_samples_per_leaf': 50}...]

所以上述嵌套循环的目的就是为了构造出上述字典列表,之后直接运行即可。

由于这里设了两个参数, 参数取值都为2, 所以他会输出2*2=4个结果。

m13.result是FAI调参加速模块的结果, 由于我们调了4组参数, 所以它会有四种结果, 每种结果下找到回测模块(m15)调用display即可查看不同参数下的回测效果。


{{heading_numbering_zhCN}}

标签

AIStudio

文档

FAI 使用案例使用"自定义运行FAI加速"模块进行调参的教程FAI 快速入门
{link}