并行处理 深度学习等复杂计算任务
由bq1fuwkt创建,最终由bq1fuwkt 被浏览 2 用户
问题描述
我有大量数据需要处理(如批量计算因子、训练多个模型、参数调优等),单机执行太慢,如何使用 BigQuant SDK 进行分布式并行计算,加速处理过程?
详细解答
BigQuant SDK 提供了 fai 模块(FAI = Fast AI Computing),可以创建多节点集群,并行执行计算密集型任务。
1. 查看现有集群
首先查看是否已有可用的计算集群:
import bigquant
# 查看所有集群
clusters = bigquant.fai.list_clusters()
print(f"现有集群数量: {len(clusters)}")
for cluster in clusters:
print(f"集群ID: {cluster['id']}")
print(f"集群名称: {cluster.get('fullname', 'N/A')}")
print(f"状态: {cluster.get('status', 'N/A')}")
print(f"Worker数量: {cluster.get('num_workers', 0)}")
print(f"可用Worker: {cluster.get('available_workers', 0)}")
print("-" * 50)
- 关键说明
- 集群状态包括:Running(运行中)、Stopped(已停止)、Pending(启动中)、Stopping(停止中)
- num_workers 是集群总 Worker 数量
- available_workers 是当前可用的 Worker 数量
2. 创建计算集群
如果没有集群或需要新建,可以创建指定配置的集群:
# 创建集群
cluster = bigquant.fai.create_cluster(
cluster_name="my_compute_cluster", # 集群名称
num_workers=2, # Worker 数量
worker_cpus=4, # 每个 Worker 的 CPU 核数
worker_memory="8G", # 每个 Worker 的内存
worker_gpus=0, # 每个 Worker 的 GPU 数量(0表示不使用GPU)
max_no_fai_run_time=300 # 空闲超时时间(秒)
)
print(f"✓ 集群已创建")
print(f" 集群ID: {cluster.cluster_info['id']}")
print(f" 集群名称: {cluster.cluster_info['fullname']}")
print(f" Worker数量: {cluster.cluster_info['num_workers']}")
- 关键说明
- num_workers: Worker 节点数量,决定并行度
- worker_cpus: 每个 Worker 的 CPU 核数
- worker_memory: 支持字符串格式(如 "8G", "512M")或整数(字节数)
- worker_gpus: GPU 数量,用于深度学习任务
- max_no_fai_run_time: 空闲自动关闭时间(秒)
3. 启动集群并初始化连接
创建后需要启动集群并等待就绪:
# 等待集群启动完成
cluster.wait_cluster("Running")
print("✓ 集群已启动")
# 初始化 Ray 连接
cluster.init()
print("✓ Ray 连接已建立")
- 关键说明
- wait_cluster("Running") 会阻塞等待集群启动(默认超时120秒)
- init() 会自动处理集群状态,如果是 Stopped 会先启动
- init() 会关闭现有 Ray 连接并建立新连接
4. 定义远程执行函数
使用 @fai.remote 装饰器定义可并行执行的函数:
# 定义远程函数
@bigquant.fai.remote
def compute_factor(stock_code, start_date, end_date):
"""在远程 Worker 上计算单只股票的因子"""
import pandas as pd
import numpy as np
import time
# 模拟数据查询
time.sleep(0.5) # 模拟查询延迟
# 模拟因子计算
np.random.seed(hash(stock_code) % 2**32)
data = pd.DataFrame({
'date': pd.date_range(start_date, end_date, freq='D'),
'close': 10 + np.random.randn(10).cumsum()
})
# 计算技术指标
data['ma5'] = data['close'].rolling(5).mean()
data['ma20'] = data['close'].rolling(20).mean()
data['volatility'] = data['close'].rolling(10).std()
result = {
'stock_code': stock_code,
'data_rows': len(data),
'mean_close': data['close'].mean(),
'max_volatility': data['volatility'].max()
}
return result
print("✓ 远程函数已定义")
- 关键说明
- @fai.remote 装饰器标记函数为远程执行
- 函数会被序列化并发送到 Worker 节点执行
- 函数内的 import 语句必须在函数内部,不能使用外部导入
- 返回值会被序列化传回主节点
5. 提交并行任务
批量提交任务到集群执行:
# 准备任务列表
stock_codes = [f"00000{i}.SZ" for i in range(1, 11)] # 10只股票
start_date = "2024-01-01"
end_date = "2024-01-10"
# 提交并行任务(不会阻塞)
print(f"提交 {len(stock_codes)} 个任务...")
task_refs = [
compute_factor.remote(code, start_date, end_date)
for code in stock_codes
]
print(f"✓ 已提交 {len(task_refs)} 个任务")
print(" 任务正在并行执行...")
- 关键说明
- .remote() 返回的是 ObjectRef(引用),不是实际结果
- 任务立即提交,不会阻塞主程序
- 多个任务会自动分配到不同 Worker 并行执行
6. 获取执行结果
等待任务完成并获取结果:
# 获取所有结果(会阻塞等待)
print("等待任务完成...")
results = bigquant.fai.get(task_refs)
print(f"\n✓ 所有任务已完成")
print(f" 共处理 {len(results)} 只股票")
# 查看结果
for result in results:
print(f" {result['stock_code']}: {result['data_rows']} 行数据, "
f"均价={result['mean_close']:.2f}, "
f"最大波动={result['max_volatility']:.2f}")
- 关键说明
- fai.get() 会阻塞等待所有任务完成
- 也可以传入单个 ObjectRef 获取单个结果
- 结果顺序与提交顺序一致
7. 高级用法 - 使用 wait() 控制并发
对于大量任务,可以使用 wait() 控制并发度:
# 批量提交1000个任务
all_tasks = [compute_factor.remote(f"{i:06d}.SZ", "2024-01-01", "2024-01-10")
for i in range(1, 1001)]
# 每次等待100个任务完成
batch_size = 100
all_results = []
print(f"处理 {len(all_tasks)} 个任务...")
remaining = all_tasks
while remaining:
# 等待至少 batch_size 个任务完成(或全部完成)
num_returns = min(batch_size, len(remaining))
ready, remaining = bigquant.fai.wait(remaining, num_returns=num_returns)
# 获取已完成任务的结果
batch_results = bigquant.fai.get(ready)
all_results.extend(batch_results)
print(f" 已完成: {len(all_results)}/{len(all_tasks)}")
print(f"✓ 全部完成,共 {len(all_results)} 个结果")
- 关键说明
- fai.wait() 返回 (ready, not_ready) 元组
- num_returns 指定等待多少个任务完成
- 适用于超大批量任务,避免内存占用过高
8. 使用共享数据 - put() 和 get()
对于大对象,可以放入对象存储,避免重复传输:
import pandas as pd
# 准备大对象(如配置、参数等)
large_config = pd.DataFrame({
'param': ['alpha', 'beta', 'gamma'],
'value': [0.1, 0.2, 0.3]
})
# 放入对象存储
config_ref = bigquant.fai.put(large_config)
print("✓ 配置已放入对象存储")
# 定义使用共享数据的函数
@bigquant.fai.remote
def process_with_config(stock_code, config_ref):
"""使用共享配置处理数据"""
import ray
# 从对象存储获取配置(只传输引用,不重复传输数据)
config = ray.get(config_ref)
# 使用配置进行计算
alpha = config[config['param'] == 'alpha']['value'].values[0]
result = {
'stock_code': stock_code,
'alpha': alpha,
'computed_value': alpha * 100
}
return result
# 提交任务(传递引用而非数据)
tasks = [process_with_config.remote(f"{i:06d}.SZ", config_ref) for i in range(1, 11)]
results = bigquant.fai.get(tasks)
print(f"✓ 处理完成 {len(results)} 个任务")
- 关键说明
- fai.put() 将对象放入 Ray 对象存储
- 返回的引用可以在多个任务间共享,避免重复传输
- Worker 通过 ray.get() 获取实际数据
9. 指定资源需求
为任务指定 CPU、内存、GPU 需求:
# 定义需要更多资源的函数
@bigquant.fai.remote(num_cpus=2, memory="2G")
def heavy_computation(data_size):
"""需要 2 个 CPU 和 2G 内存的任务"""
import numpy as np
# 大规模计算
data = np.random.randn(data_size, data_size)
result = np.linalg.svd(data)
return {"size": data_size, "computed": True}
# GPU 任务示例(需要 GPU 集群)
@bigquant.fai.remote(num_gpus=1, memory="4G")
def train_model(model_id):
"""需要 1 个 GPU 和 4G 内存的训练任务"""
# 模拟模型训练
import time
time.sleep(2)
return {"model_id": model_id, "accuracy": 0.95}
- 关键说明
- num_cpus: 需要的 CPU 核数
- memory: 需要的内存(字符串格式,如 "2G", "512M")
- num_gpus: 需要的 GPU 数量
- Ray 会根据资源需求调度任务到合适的 Worker
10. 关闭集群
使用完毕后关闭连接并删除集群:
# 关闭 Ray 连接
cluster.shutdown()
print("✓ Ray 连接已关闭")
# 删除集群(释放云端资源)
result = cluster.delete_cluster()
print("✓ 集群已删除")
- 关键说明
- shutdown() 只关闭本地连接,集群仍在运行
- delete_cluster() 会删除集群,释放所有资源
- 建议使用完毕后及时删除,避免资源浪费
深度学习任务示例
使用 GPU 集群进行模型训练:
import bigquant
# 创建 GPU 集群
cluster = bigquant.fai.create_cluster(
cluster_name="dl_training",
num_workers=2,
worker_cpus=8,
worker_memory="16G",
worker_gpus=1 # 每个 Worker 1 个 GPU
)
cluster.wait_cluster("Running")
cluster.init()
# 定义训练函数
@bigquant.fai.remote(num_gpus=1, memory="8G")
def train_model(model_config):
"""训练深度学习模型"""
import torch
import time
# 检查 GPU 可用性
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")
# 模拟模型训练
model_name = model_config['name']
epochs = model_config['epochs']
# 训练循环
for epoch in range(epochs):
time.sleep(0.5) # 模拟训练时间
result = {
'model_name': model_name,
'epochs': epochs,
'final_loss': 0.05,
'accuracy': 0.95,
'device': str(device)
}
return result
# 准备多个模型配置
model_configs = [
{'name': 'LSTM_model_1', 'epochs': 10},
{'name': 'LSTM_model_2', 'epochs': 15},
{'name': 'Transformer_model', 'epochs': 20},
{'name': 'CNN_model', 'epochs': 12}
]
# 并行训练
print(f"开始训练 {len(model_configs)} 个模型...")
tasks = [train_model.remote(config) for config in model_configs]
results = bigquant.fai.get(tasks)
print("\n训练结果:")
for result in results:
print(f" {result['model_name']}: "
f"Accuracy={result['accuracy']:.2%}, "
f"Loss={result['final_loss']:.4f}, "
f"Device={result['device']}")
# 清理
cluster.shutdown()
cluster.delete_cluster()
关键概念解释
FAI (Fast AI Computing)
BigQuant 提供的分布式计算服务,基于 Ray 框架,支持 CPU 和 GPU 集群。
Cluster(集群)
一组计算节点的集合,包含:
- Head 节点: 调度和管理节点
- Worker 节点: 实际执行计算的节点
@fai.remote 装饰器
标记函数为远程执行函数,被装饰的函数会:
- 被序列化并发送到 Worker
- 在 Worker 上执行
- 返回结果的引用(ObjectRef)
ObjectRef
Ray 对象引用,指向存储在 Ray 对象存储中的数据。通过 fai.get() 获取实际数据。
资源需求
- num_cpus: CPU 核数需求
- memory: 内存需求(如 "2G", "512M")
- num_gpus: GPU 数量需求
对象存储
Ray 的共享内存系统,用于:
- 存储任务参数和返回值
- 在节点间共享大对象
- 避免重复序列化/反序列化
适用场景
- 批量因子计算
并行计算数千只股票的技术指标、基本面因子等。
- 参数调优
并行测试不同参数组合,快速找到最优参数。
- 模型训练
使用 GPU 集群并行训练多个模型或进行超参数搜索。
- 回测优化
并行执行多个策略的回测,加速策略开发。
- 数据处理
并行处理大规模数据清洗、特征工程等任务。
注意事项
- 函数独立性: 远程函数必须包含所有依赖,不能使用外部变量
- Import 位置: 所有 import 必须在函数内部
- 数据大小: 避免传递超大对象,使用 fai.put() 共享
- 资源管理: 及时删除集群,避免资源浪费
- 错误处理: 任务失败会抛出异常,需要捕获处理
- 版本一致: Worker 和主节点的 Python 环境应保持一致
最佳实践
- 合理设置 Worker 数量: 根据任务数量和单任务耗时选择
- 批量提交: 一次提交多个任务,减少调度开销
- 使用 wait(): 大批量任务使用 wait() 控制内存
- 复用集群: 多次计算复用同一集群,避免重复启动
\