FAI 使用案例
由qxiao创建,最终由small_q 被浏览 133 用户
连接集群
如果我们的集群开启了智能管理,那么我们在使用完集群后无需对集群进行任何处理,他将在默认5分钟无使用的情况下自动停止,也不再收费,当我们下次调用init
函数时,会自动拉起并连接
import fai
# 连接和初始化fai
fai.init(cluster="fai-youwu-gduptjuu.fai-cluster",token="Wn56crstJbe9PluNytYmvl7g9NwTvPcI")
第一种情况,最简单的使用fai来执行远程函数
下面会简单的介绍fai.remote
和 fai.get
的使用
- 只需要在原有的函数上添加
@fai.remote
装饰器,其中装饰器可以有参数如@fai.remote(num_gpus=1,num_cpus=0.2)
也可以没有参数@fai.remote
,没有参数的情况下等价于@fai.remote(num_cpus=1)
- 执行函数的时候我们通过
func.inner(*args,**kwargs)
来提交远程执行队列,此时会返回一个执行引用ref
,我们可以通过fai.get(ref)
来获取实际的返回结果,这个结果会通过网络传递到本地来 fai.get
的参数除了一个ref
,也可以是List[ref]
一个ref
的列表,返回值是一个对象或者对象列表
下面是一个简单例子
@fai.remote
def func1(arg1,arg2,kw1=1,kw2=5):
return arg1+arg2+kw1+kw2
ref = func1.remote(1,2,kw1=3,kw2=4)
result = fai.get(ref)
assert result == 10
refs = [func1.remote(1,2,kw1=3,kw2=4) for _ in range(3)]
results = fai.get(refs)
assert results == [10,10,10]
使用fai.remote
的参数来控制并发
我们在启动节点的时候会有资源规格和节点数,比如我们启动的节点是8C,32G,2卡,一共启动了2个节点,这个时候我们的@fai.remote()
参数最大为@fai.remote(num_cpus=8,num_gpus=2,memory="32G")
- num_cpus: 表示这个任务需要多少多少cpu能执行,默认是1,可以是小数,如果是0.1,表示一个核能跑10个任务,但是这个参数只是表示当分配的资源后还剩余0.1核就可以执行这个任务,但是这个任务实际消耗多少cpu,是不受控制的
- num_gpus: 表示这个任务需要多少显卡来执行,默认是0,当我们声明了这个参数的时候,函数内获取的显卡的顺序都是从0开始的,这样方便使用显卡,而不用知道显卡的实际顺序
- memory: 这个参数表示当节点还有多少内存剩余的时候调用这个任务,默认是None,一般不填
- 这些资源参数同时存在时,要所有参数都满足才会调度,下面的代码我们按照8C,32G,2卡,2节点的资源来演示
- 当资源不满足的情况戏,任务会在后台等待,直到其他任务退出之后,空余资源满足的情况下才能执行
@fai.remote(num_cpus=8,num_gpus=2) # 我们申明了两张卡
def func2():
import torch
device_name = torch.cuda.get_device_name(0)
device_name2 = torch.cuda.get_device_name(1)
return device_name,device_name2
print("func2:",fai.get(func2.remote()))
@fai.remote(num_cpus=4,num_gpus=1) # 我们申明了一张卡,但是我们并发4个
def func3():
import torch
device_name = torch.cuda.get_device_name(0)
return device_name
refs = [func3.remote() for _ in range(4)]
print("func3:",fai.get(refs))
# 下面测试任务等待
@fai.remote(num_cpus=5)
def func4():
import datetime
import time
time.sleep(10)
return datetime.datetime.now().second
refs = [func4.remote() for _ in range(4)]
# 因为我们的一个节点只有8cpu,我们申明的任务需要划走5cpu,所以一个节点只能执行一个该任务,
# 最后我们能观察到我们返回的结果数字之间是两个差不多同时,另外两个差不多同时,他们相差10秒左右
# 总共执行20秒
print("func4:",fai.get(refs))
给任务传递参数
我们的任务大多数情况下都是需要传递参数的,这个时候有一些注意点
- 任务参数不能太大,比如我们传一个非常大的
dataframe
,比如1G的数据,我们不建议这么做,会非常慢,同时会导致共享内存锐减,导致oom程序崩溃- 这个过程中,会先将数据pickle,然后通过网络发送到对应的节点,对应的节点会反序列化在执行,这个过程中会消耗大量的算力和网络,后台也会划分对应的存储来处理这个对象,因此我们建议将数据落盘在读取。可以吧数据写到这个目录上来共享文件
- 最好在远程函数内 import 包,这样能避免传输相关的包到节点上
- 尽可能的减少函数外部的变量,这些变量也会pickle到节点上,有些对象是不能进行pickle序列化的,这样会导致错误
- 不要再远程函数内使用多进程,使用fai.remote 来代替
import pandas as pd
import numpy as np
# 计算每个列所需的行数以达到1GB
bytes_per_gb = 1 * (1024 ** 3) # 1GB
num_columns = 10 # 假设我们需要10列
bytes_per_float = 8 # float64占用8字节
rows_per_column = bytes_per_gb // (num_columns * bytes_per_float)
# 创建一个1GB的DataFrame
data = np.random.rand(rows_per_column, num_columns)
large_df = pd.DataFrame(data, columns=[f'col_{i}' for i in range(num_columns)])
save_path = "/home/aiuser/work/test.csv"
large_df.to_csv(save_path)
@fai.remote
def func5():
import pandas as pd # 在远程函数应用包
df = pd.read_csv(save_path) # 小对象`save_path`可以直接读取,large_df大对象最好落盘
return df.shape
print(fai.get(func5.remote()))
或者直接返回这个函数的ref作为参数另外一个远程函数的参数,他们会尽可能的在一个节点间执行
@fai.remote
def return_large_data():
import pandas as pd
import numpy as np
# 计算每个列所需的行数以达到1GB
bytes_per_gb = 1 * (1024 ** 3) # 1GB
num_columns = 10 # 假设我们需要10列
bytes_per_float = 8 # float64占用8字节
rows_per_column = bytes_per_gb // (num_columns * bytes_per_float)
# 创建一个1GB的DataFrame
data = np.random.rand(rows_per_column, num_columns)
large_df = pd.DataFrame(data, columns=[f'col_{i}' for i in range(num_columns)])
return large_df
@fai.remote
def func5(df):
return df.shape
ref = return_large_data.remote()
print(fai.get(func5.remote(ref))) # 直接传递ref
fai.remote 函数之间的调用
- fai.remote函数之间也可以相互调用,他们可以组织称一个DAG执行队列
- ref 可以直接作为另外一个远程函数的参数,另外的远程函数或自动获取他的值
@fai.remote
def node1():
return "node1"
@fai.remote
def node2(node):
return f"{node}->node2"
@fai.remote
def node3(node):
return f"{node}->node3"
@fai.remote
def node4(node_1,node_2):
return f"{node_1}->node4,{node_2}->node4"
@fai.remote
def main():
ref1 = node1.remote()
ref2 = node2.remote(ref1)
ref3 = node3.remote(ref2)
ref4 = node4.remote(ref1,ref3)
return fai.get(ref4)
print(fai.get(main.remote()))
\