BigQuant使用文档

FAI 使用案例

由qxiao创建,最终由small_q 被浏览 133 用户

连接集群

如果我们的集群开启了智能管理,那么我们在使用完集群后无需对集群进行任何处理,他将在默认5分钟无使用的情况下自动停止,也不再收费,当我们下次调用init函数时,会自动拉起并连接

import fai

# 连接和初始化fai
fai.init(cluster="fai-youwu-gduptjuu.fai-cluster",token="Wn56crstJbe9PluNytYmvl7g9NwTvPcI")

第一种情况,最简单的使用fai来执行远程函数

下面会简单的介绍fai.remotefai.get 的使用

  • 只需要在原有的函数上添加@fai.remote 装饰器,其中装饰器可以有参数如 @fai.remote(num_gpus=1,num_cpus=0.2) 也可以没有参数@fai.remote,没有参数的情况下等价于@fai.remote(num_cpus=1)
  • 执行函数的时候我们通过func.inner(*args,**kwargs)来提交远程执行队列,此时会返回一个执行引用ref,我们可以通过fai.get(ref)来获取实际的返回结果,这个结果会通过网络传递到本地来
  • fai.get的参数除了一个ref,也可以是List[ref]一个ref的列表,返回值是一个对象或者对象列表

下面是一个简单例子

@fai.remote
def func1(arg1,arg2,kw1=1,kw2=5):
    return arg1+arg2+kw1+kw2

ref = func1.remote(1,2,kw1=3,kw2=4)
result = fai.get(ref)
assert result == 10

refs = [func1.remote(1,2,kw1=3,kw2=4) for _ in range(3)]
results = fai.get(refs)
assert results == [10,10,10]

使用fai.remote的参数来控制并发

我们在启动节点的时候会有资源规格和节点数,比如我们启动的节点是8C,32G,2卡,一共启动了2个节点,这个时候我们的@fai.remote()参数最大为@fai.remote(num_cpus=8,num_gpus=2,memory="32G")

  • num_cpus: 表示这个任务需要多少多少cpu能执行,默认是1,可以是小数,如果是0.1,表示一个核能跑10个任务,但是这个参数只是表示当分配的资源后还剩余0.1核就可以执行这个任务,但是这个任务实际消耗多少cpu,是不受控制的
  • num_gpus: 表示这个任务需要多少显卡来执行,默认是0,当我们声明了这个参数的时候,函数内获取的显卡的顺序都是从0开始的,这样方便使用显卡,而不用知道显卡的实际顺序
  • memory: 这个参数表示当节点还有多少内存剩余的时候调用这个任务,默认是None,一般不填
  • 这些资源参数同时存在时,要所有参数都满足才会调度,下面的代码我们按照8C,32G,2卡,2节点的资源来演示
  • 当资源不满足的情况戏,任务会在后台等待,直到其他任务退出之后,空余资源满足的情况下才能执行
@fai.remote(num_cpus=8,num_gpus=2) # 我们申明了两张卡
def func2():
    import torch
    device_name = torch.cuda.get_device_name(0)
    device_name2 = torch.cuda.get_device_name(1)
    return device_name,device_name2

print("func2:",fai.get(func2.remote()))

@fai.remote(num_cpus=4,num_gpus=1) # 我们申明了一张卡,但是我们并发4个
def func3():
    import torch
    device_name = torch.cuda.get_device_name(0)
    return device_name

refs = [func3.remote() for _ in range(4)]
print("func3:",fai.get(refs))

# 下面测试任务等待
@fai.remote(num_cpus=5) 
def func4():
    import datetime
    import time
    time.sleep(10)
    return datetime.datetime.now().second

refs = [func4.remote() for _ in range(4)]
# 因为我们的一个节点只有8cpu,我们申明的任务需要划走5cpu,所以一个节点只能执行一个该任务,
# 最后我们能观察到我们返回的结果数字之间是两个差不多同时,另外两个差不多同时,他们相差10秒左右
# 总共执行20秒
print("func4:",fai.get(refs)) 

给任务传递参数

我们的任务大多数情况下都是需要传递参数的,这个时候有一些注意点

  • 任务参数不能太大,比如我们传一个非常大的dataframe,比如1G的数据,我们不建议这么做,会非常慢,同时会导致共享内存锐减,导致oom程序崩溃
    • 这个过程中,会先将数据pickle,然后通过网络发送到对应的节点,对应的节点会反序列化在执行,这个过程中会消耗大量的算力和网络,后台也会划分对应的存储来处理这个对象,因此我们建议将数据落盘在读取。可以吧数据写到这个目录上来共享文件
  • 最好在远程函数内 import 包,这样能避免传输相关的包到节点上
  • 尽可能的减少函数外部的变量,这些变量也会pickle到节点上,有些对象是不能进行pickle序列化的,这样会导致错误
  • 不要再远程函数内使用多进程,使用fai.remote 来代替
import pandas as pd
import numpy as np

# 计算每个列所需的行数以达到1GB
bytes_per_gb = 1 * (1024 ** 3)  # 1GB
num_columns = 10  # 假设我们需要10列
bytes_per_float = 8  # float64占用8字节
rows_per_column = bytes_per_gb // (num_columns * bytes_per_float)

# 创建一个1GB的DataFrame
data = np.random.rand(rows_per_column, num_columns)
large_df = pd.DataFrame(data, columns=[f'col_{i}' for i in range(num_columns)])

save_path = "/home/aiuser/work/test.csv"
large_df.to_csv(save_path)

@fai.remote
def func5():
    import pandas as pd # 在远程函数应用包
    df = pd.read_csv(save_path) # 小对象`save_path`可以直接读取,large_df大对象最好落盘
    return df.shape


print(fai.get(func5.remote()))

或者直接返回这个函数的ref作为参数另外一个远程函数的参数,他们会尽可能的在一个节点间执行

@fai.remote
def return_large_data():
    import pandas as pd
    import numpy as np

    # 计算每个列所需的行数以达到1GB
    bytes_per_gb = 1 * (1024 ** 3)  # 1GB
    num_columns = 10  # 假设我们需要10列
    bytes_per_float = 8  # float64占用8字节
    rows_per_column = bytes_per_gb // (num_columns * bytes_per_float)

    # 创建一个1GB的DataFrame
    data = np.random.rand(rows_per_column, num_columns)
    large_df = pd.DataFrame(data, columns=[f'col_{i}' for i in range(num_columns)])
    return large_df

@fai.remote
def func5(df):
    return df.shape


ref = return_large_data.remote() 
print(fai.get(func5.remote(ref))) # 直接传递ref

fai.remote 函数之间的调用

  • fai.remote函数之间也可以相互调用,他们可以组织称一个DAG执行队列
  • ref 可以直接作为另外一个远程函数的参数,另外的远程函数或自动获取他的值
@fai.remote
def node1():
    return "node1"

@fai.remote
def node2(node):
    return f"{node}->node2"

@fai.remote
def node3(node):
    return f"{node}->node3"

@fai.remote
def node4(node_1,node_2):
    return f"{node_1}->node4,{node_2}->node4"

@fai.remote
def main():
    ref1 = node1.remote()
    ref2 = node2.remote(ref1)
    ref3 = node3.remote(ref2)
    ref4 = node4.remote(ref1,ref3)
    return fai.get(ref4)

print(fai.get(main.remote()))

\

评论
  • fai.remote函数之间的调用代码有误,缺少fai.get()
  • 在fai.remote函数内,不需要使用fai.get(),会自动处理
{link}