从单机到千卡集群:Ray 如何让 Python 代码拥有超能力?
引言:一个让 Python 程序员"无痛"进入分布式计算的框架
为什么训练 AI 模型越来越像"喂显卡吃数据"?
2023 年,GPT-4 的参数规模达到了 1.8 万亿。这个数字意味着什么?如果把它想象成一座图书馆,那大约是 1800 亿本书——而这些"书"需要在成百上千张 GPU 上同时被"阅读"和"理解"。
训练这样的模型,单张显卡已经远远不够。你需要分布式计算:把任务拆分到多台机器、多张显卡上并行执行。但传统的分布式编程方式,对大多数 Python 开发者来说,简直是一场噩梦。
Ray 是什么?
Ray 是一个让 Python 代码自动具备分布式能力的开源框架。
它的核心魔法极其简单:给你的 Python 函数加上 @ray.remote 装饰器,这个函数就能在集群中的任意节点上并行执行。从单机笔记本到千卡集群,同一套代码,无需重写。
这不是魔法,这是来自 UC Berkeley RISELab 的工程师们,为 AI 时代的 Python 开发者精心设计的分布式抽象。
为什么 OpenAI、Uber、Shopify 都选择了它?
- OpenAI:用 Ray 训练 ChatGPT,从 MPI 迁移过来后,基础设施复杂度降低了十倍以上
- Uber:基于 Ray 构建实时供需预测系统,每天处理数百万订单
- Shopify:基于 Ray 搭建 ML 平台 Merlin,实现从 Jupyter 到生产的无缝衔接
这些公司的共同点是什么?他们都在解决同一个问题:如何让 AI 研发和部署变得更简单、更快速、更便宜。
这篇文章,我们就来聊聊 Ray 到底是什么,它解决了 AI 工程的哪些痛点,以及为什么它正在成为分布式 AI 计算的工业标准。
第一部分:Ray 是什么——给 Python 的分布式超能力
1. Ray 诞生的故事:UC Berkeley 的学术项目如何变成工业标准
从 RISELab 走出
2016 年,加州大学伯克利分校的 RISELab(Real-Time Intelligent Systems and Engineering Laboratory)诞生了一个新项目。这个实验室的前身是著名的 AMPLab,也就是 Spark 和 Mesos 的发源地。
项目的创始人是三位博士生:Robert Nishihara、Philipp Moritz 和他们的导师 Ion Stoica。他们的目标很简单:创建一个新的分布式计算框架,专门解决 AI 和强化学习的独特需求。
当时的分布式计算领域,Spark 统治着大数据批处理,MPI 统治着高性能计算。但 AI 工作负载有着完全不同的特征:
- 异构计算:一个 AI pipeline 可能需要 CPU 做预处理、GPU 做训练、CPU 再做服务
- 动态任务:强化学习的任务图是运行时才确定的,无法像 Spark 那样静态规划
- 毫秒级延迟:在线服务需要极低的响应延迟,批处理的秒级延迟 unacceptable
Spark 和 MPI 都不是为这种场景设计的。
从研究工具到企业级框架
2017 年,Ray 的第一篇论文《Ray: A Distributed Framework for Emerging AI Applications》发表在 arXiv 上。同年,Ray 开源。
2019 年,Anyscale 成立——由 Ray 的核心创始团队创办的公司,致力于提供 Ray 的商业支持和企业级服务。
2020-2023 年,Ray 的生态系统迅速扩展:Ray Train、Ray Serve、Ray RLlib、Ray Tune、Ray Data 相继发布,形成了一个完整的 AI 计算平台。
2024 年,Ray 成为 PyTorch 生态的一部分,Anyscale 与 Meta 达成深度合作。
今天,Ray 已经成为包括 OpenAI、Uber、Shopify、蚂蚁集团、字节跳动在内的数百家公司的基础设施核心组件。
为什么选择 Python?
Ray 选择 Python 作为核心语言,不是偶然。
Python 已经是 AI 领域的事实标准语言:PyTorch、TensorFlow、JAX、HuggingFace、scikit-learn……几乎所有主流 AI 框架都提供 Python API。
但 Python 有一个致命弱点:原生的多进程/多线程支持非常糟糕。
全局解释器锁(GIL)让真正的并行计算变得困难,multiprocessing 库虽然有,但 API 复杂、序列化问题频发、调试困难。
Ray 的定位非常精准:让 Python 开发者无需关心底层分布式细节,就能享受分布式的性能优势。
2. Ray 的核心魔法:两行代码变分布式
对比传统方式:用 MPI/Spark 写分布式代码有多痛苦?
让我们先看一个最简单的例子:并行计算一个列表中每个数字的平方。
原生 Python(串行):
def square(x):
return x * x
results = [square(x) for x in range(10000)]
Python multiprocessing:
from multiprocessing import Pool
def square(x):
return x * x
with Pool(4) as p:
results = p.map(square, range(10000))
看起来还行?但这只是最简单的场景。一旦涉及以下情况,代码复杂度会急剧上升:
- 需要跨机器运行(multiprocessing 只能单机)
- 函数参数/返回值无法被 pickle 序列化
- 需要在不同任务之间共享状态
- 需要动态决定执行哪些任务
MPI(消息传递接口):
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
if rank == 0:
data = list(range(10000))
chunks = [data[i::size] for i in range(size)]
else:
chunks = None
chunk = comm.scatter(chunks, root=0)
results = [x * x for x in chunk]
gathered = comm.gather(results, root=0)
if rank == 0:
print(flatten(gathered))
代码量暴增,概念复杂(rank、scatter、gather),而且要求所有节点运行完全相同的代码。
Spark:
from pyspark import SparkContext
sc = SparkContext("local", "Square App")
data = sc.parallelize(range(10000))
results = data.map(lambda x: x * x).collect()
Spark 的 API 设计得很好,但它有严格的限制:
- 必须是数据并行(DataFrame/RDD 操作)
- 不支持细粒度的任务调度
- 启动开销大,不适合毫秒级任务
- 主要用于批处理,不适合交互式计算
@ray.remote 装饰器:函数级别的分布式抽象
现在看看 Ray 的做法:
import ray
ray.init()
@ray.remote
def square(x):
return x * x
# 提交 10000 个并行任务
futures = [square.remote(x) for x in range(10000)]
results = ray.get(futures)
看到了吗?
- 只需加上
@ray.remote装饰器 square.remote(x)提交异步任务ray.get(futures)获取结果
Ray 自动处理:
- 任务调度到哪个节点执行
- 数据如何序列化和传输
- 失败任务的自动重试
- 内存管理和垃圾回收
这背后的设计理念是:让分布式计算像写普通函数一样简单。
从单机笔记本到 1000 张 GPU
更神奇的是代码的可移植性。
# 本地开发测试
ray.init()
# 连接到 1000 张 GPU 的集群(只需改这一行)
ray.init(address="ray://head-node:10001")
# 你的业务代码完全不变
@ray.remote
def train_model(config):
# 训练代码
return model
futures = [train_model.remote(config) for config in configs]
results = ray.get(futures)
同一套代码,可以在笔记本上调试,可以提交到本地集群测试,可以无缝扩展到生产环境的大规模集群。
这解决了 AI 工程的一个核心痛点:研究和生产的代码不需要重写。
3. Ray 的技术架构极简图
要理解 Ray 为什么能做到这些,我们需要简单了解它的架构。
Head Node + Worker Node 的基本拓扑
一个 Ray 集群由两类节点组成:
- Head Node:负责集群管理、元数据存储、任务调度
- Worker Node:负责任务的实际执行
┌─────────────────────────────────────────────┐
│ Head Node │
│ ┌──────────────┐ ┌──────────────────┐ │
│ │ GCS │ │ Global Scheduler │ │
│ │ (metadata) │ │ (task dispatch) │ │
│ └──────────────┘ └──────────────────┘ │
└─────────────────────┬───────────────────────┘
│
┌───────────────┼───────────────┐
│ │ │
┌─────▼─────┐ ┌────▼────┐ ┌──────▼──────┐
│ Worker 1 │ │ Worker 2│ │ Worker N │
│ ┌───────┐ │ │ ┌─────┐ │ │ ┌───────┐ │
│ │Tasks │ │ │ │Tasks│ │ │ │Tasks │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ ├───────┤ │ │ ├─────┤ │ │ ├───────┤ │
│ │Actors │ │ │ │Actors││ │ │Actors │ │
│ └───────┘ │ │ └─────┘ │ │ └───────┘ │
└───────────┘ └─────────┘ └─────────────┘
当你提交一个任务时:
- 请求到达 Head Node 的全局调度器
- 调度器根据资源需求(CPU/GPU/内存)选择最优的 Worker
- 任务被分发到目标 Worker 执行
- 执行结果写回分布式对象存储
GCS(全局控制存储)与 Plasma 对象存储
Ray 有两个关键的数据层:
GCS(Global Control Store):
- 存储集群的元数据:有哪些节点、有哪些 Actor、任务的状态
- 基于 Redis 实现,保证高可用
- 是集群的"大脑",知道一切在哪里
Plasma Object Store:
- 高性能的共享内存对象存储
- 使用 Apache Arrow 格式,支持零拷贝传输
- 同节点进程间共享数据无需序列化
- 跨节点传输自动序列化和压缩
这意味着:
- 任务 A 在 Worker 1 生成的结果,可以被同节点的任务 B 直接读取(零拷贝)
- 任务 C 在 Worker 2 需要这个结果,Ray 自动通过网络传输
- 你不需要关心数据在哪里,Ray 自动管理
分布式调度器:如何将任务分配到最优节点
Ray 的调度器有几个关键特性:
资源感知:每个任务可以声明资源需求
@ray.remote(num_cpus=4, num_gpus=1, memory=1024*1024*1024)
def train_task(data):
# 这个任务需要 4 CPU + 1 GPU + 1GB 内存
pass
数据本地性:调度器会优先将任务调度到已有数据的节点,减少网络传输
自动负载均衡:根据节点负载动态调整任务分配
故障恢复:Worker 故障时,任务自动重试到其他节点
这些能力,开发者完全不需要关心,Ray 在背后自动处理。
第二部分:Ray 核心解决什么问题——AI 工程的三座大山
Ray 之所以被众多顶级科技公司采用,不是因为它技术先进,而是因为它精准击中了 AI 工程中最痛的三个问题。
4. 问题一:AI 训练代码只能在单机上跑,想扩容要全部重写
传统困境:从研究到生产的鸿沟
想象一下这个场景:
你是一个机器学习工程师,用 PyTorch 在笔记本上写了一个图像分类模型。代码大概是这样的:
import torch
import torch.nn as nn
# 定义模型
class ImageClassifier(nn.Module):
def __init__(self):
super().__init__()
# ... 网络结构
def forward(self, x):
# ... 前向传播
# 训练循环
model = ImageClassifier()
optimizer = torch.optim.Adam(model.parameters())
for epoch in range(100):
for batch in dataloader:
loss = model(batch).backward()
optimizer.step()
代码跑得挺好,但现在要上生产环境了。生产环境有 8 张 A100 GPU,你需要分布式训练。
你会怎么做?
方案一:PyTorch DDP(Distributed Data Parallel)
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
# 初始化进程组
dist.init_process_group(backend='nccl')
# 包装模型
model = ImageClassifier().to(local_rank)
model = DDP(model, device_ids=[local_rank])
# 需要使用 DistributedSampler
dataloader = DataLoader(
dataset,
sampler=DistributedSampler(dataset)
)
# 训练代码要改很多地方...
你需要:
- 理解
init_process_group、rank、world_size等概念 - 使用
DistributedSampler确保每个进程读取不同数据 - 修改保存/加载模型的逻辑(只在 rank 0 进程保存)
- 处理进程同步和通信问题
- 使用
torchrun或mp.spawn启动多进程
这还没完——如果要从单机 8 卡扩展到多机 64 卡,还要配置网络、处理节点发现、同步环境……
方案二:Horovod
Horovod 是 Uber 开源的分布式训练框架,API 相对简洁:
import horovod.torch as hvd
# 初始化
hvd.init()
# 包装优化器
optimizer = hvd.DistributedOptimizer(optimizer)
# 广播参数
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
但依然需要理解 Horovod 的概念,而且代码还是要改很多。
Ray 的解法:同一套代码,从笔记本到千卡集群无缝扩展
看看 Ray Train 怎么做:
import ray
from ray import train
from ray.train.torch import TorchTrainer
import torch
def train_func(config):
# 这就是你原来的训练代码!
model = ImageClassifier()
optimizer = torch.optim.Adam(model.parameters())
# Ray Train 自动帮你处理了 DDP 的包装
model = train.torch.prepare_model(model)
optimizer = train.torch.prepare_optimizer(optimizer)
for epoch in range(100):
for batch in train.get_dataset_shard("train"):
loss = model(batch).backward()
optimizer.step()
trainer = TorchTrainer(
train_loop_per_worker=train_func,
scaling_config=train.ScalingConfig(
num_workers=4, # 扩展到 4 个 worker
use_gpu=True, # 每个 worker 用 GPU
),
)
result = trainer.fit()
注意几点:
train_func里的代码几乎就是你原来的代码prepare_model和prepare_optimizer自动处理分布式包装- 改
num_workers=64就可以扩展到 64 个 worker - 单机多卡和多机多卡,代码完全一样
Ray Train 在背后做了什么?
- 自动设置进程组和环境变量
- 自动处理数据分片(DistributedSampler)
- 自动处理模型同步和通信
- 自动处理检查点保存和恢复
你不需要知道 DDP、Horovod、或者 NCCL 是什么,Ray 自动帮你搞定。
案例:一个图像分类脚本,如何在不改动逻辑的情况下提速 100 倍
假设我们有一个简单的图像分类训练脚本:
原始单机代码:
import torch
from torchvision import datasets, transforms
# 数据加载
transform = transforms.Compose([...])
trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=128, shuffle=True)
# 模型
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False)
model = model.cuda()
# 训练
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
for epoch in range(10):
for images, labels in trainloader:
images, labels = images.cuda(), labels.cuda()
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
在单张 V100 上,这个脚本跑完 10 个 epoch 大约需要 30 分钟。
Ray 分布式版本(改动不到 10 行):
import ray
from ray import train
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
import torch
def train_func(config):
# 数据加载
trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
# 使用 Ray 的数据分片
trainloader = torch.utils.data.DataLoader(
trainset,
sampler=train.get_dataset_shard("train"),
batch_size=128
)
# 模型(Ray 自动处理 GPU 分配)
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False)
model = train.torch.prepare_model(model) # ← 关键修改 1
# 训练(代码几乎不变)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
optimizer = train.torch.prepare_optimizer(optimizer) # ← 关键修改 2
for epoch in range(10):
for images, labels in trainloader:
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# 启动分布式训练(扩展到 4 个 GPU)
trainer = TorchTrainer(
train_loop_per_worker=train_func,
scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
result = trainer.fit()
结果:
- 单机 1x V100:30 分钟
- 分布式 4x V100:8 分钟(接近线性加速)
- 分布式 8x V100:4 分钟
代码改动不到 10 行,性能提升 100 倍(如果扩展到更多 GPU)。
这就是 Ray 解决的第一座大山:消除从研究到生产的代码重写成本。
5. 问题二:训练和服务使用两套系统,模型上线要"翻译"一次
传统困境:训练用 Horovod,服务用 Flask,模型格式转换 headache
继续上面的例子。你的图像分类模型训练好了,现在要部署成 API 服务,让前端可以调用。
传统的流程是这样的:
训练阶段(PyTorch/Horovod):
# train.py - 分布式训练
import horovod.torch as hvd
# ... 训练代码 ...
torch.save(model.state_dict(), 'model.pth')
服务阶段(Flask/FastAPI):
# serve.py - 模型服务
from flask import Flask, request
import torch
app = Flask(__name__)
model = ImageClassifier()
model.load_state_dict(torch.load('model.pth'))
@app.route('/predict', methods=['POST'])
def predict():
image = request.files['image']
# 预处理
tensor = preprocess(image)
# 推理
with torch.no_grad():
output = model(tensor)
return {'prediction': output.argmax().item()}
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
看起来挺简单?但当你真正做生产部署时,问题接踵而至:
问题 1:模型格式转换
- PyTorch 的
.pth格式适合训练,但生产环境可能需要 ONNX 或 TorchScript - 转换过程可能丢失精度或兼容性
问题 2:服务框架与训练框架脱节
- Flask/FastAPI 与 Horovod/PyTorch 是完全独立的系统
- 需要额外的胶水代码处理模型加载、内存管理、并发请求
问题 3:资源管理困难
- 单进程 Flask 无法利用多核 CPU
- 多进程部署需要手动管理 GPU 显存分配
- 如何实现自动扩缩容?
问题 4:版本管理与 A/B 测试
- 模型更新了,如何无缝切换?
- 如何做 A/B 测试比较两个模型版本?
问题 5:批处理与实时服务的割裂
- 批量预测用 Spark?实时预测用 Flask?两套系统,两套代码
Ray 的解法:Train → Serving 一站式完成
Ray Serve 的设计哲学是:训练和 serving 应该是同一套代码、同一个运行时。
from ray import serve
from fastapi import FastAPI
import torch
app = FastAPI()
@serve.deployment(
num_replicas=4, # 启动 4 个副本
ray_actor_options={"num_gpus": 0.25} # 每个副本用 0.25 GPU
)
@serve.ingress(app)
class ImageClassifierService:
def __init__(self):
# 加载模型(和训练时完全一样的代码)
self.model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18')
self.model.load_state_dict(torch.load('model.pth'))
@app.post("/predict")
async def predict(self, image: bytes):
# 预处理(和训练时完全一样的预处理代码)
tensor = self.preprocess(image)
# 推理
with torch.no_grad():
output = self.model(tensor)
return {"prediction": output.argmax().item()}
# 部署服务
serve.run(ImageClassifierService.bind())
看关键点:
- 同一套模型代码:加载模型的代码和训练时完全一样
- 声明式部署:用装饰器声明需要多少副本、多少资源
- 自动扩缩容:流量大了自动增加副本,流量小了自动减少
- 支持复合服务:可以把多个模型组合成一个服务链路
案例:训练好的模型,如何一行代码变成可调用的 REST API
完整的训练到部署流程:
import ray
from ray import train, serve
from ray.train.torch import TorchTrainer
from ray.serve import PredictorDeployment
# ========== 第一步:训练模型 ==========
def train_func(config):
# ... 训练代码 ...
train.save_checkpoint(model=model.state_dict())
trainer = TorchTrainer(train_func, scaling_config=train.ScalingConfig(num_workers=4, use_gpu=True))
result = trainer.fit()
checkpoint = result.checkpoint
# ========== 第二步:部署模型(只需这几行) ==========
from ray.train.torch import TorchCheckpoint, TorchPredictor
class ImagePredictor(TorchPredictor):
def preprocess(self, data):
# 和训练时一样的预处理
return transforms(data)
# 部署为 HTTP 服务
serve.run(
PredictorDeployment.options(num_replicas=2).bind(
ImagePredictor,
checkpoint
)
)
# 服务现在运行在 http://localhost:8000/
# 可以直接调用!
训练好的模型,通过 serve.run() 一行代码就变成了可调用的 HTTP 服务。
Ray Serve 还解决了哪些传统痛点?
多模型组合服务:
# 构建一个推荐系统的服务链路
# 请求 → Embedding 模型 → 粗排模型 → 精排模型 → 结果
embedding = EmbeddingModel.bind()
coarse_rank = CoarseRanker.bind(embedding)
fine_rank = FineRanker.bind(coarse_rank)
serve.run(fine_rank) # 部署整个链路
自动扩缩容:
@serve.deployment(
autoscaling_config={
"min_replicas": 1,
"max_replicas": 100,
"target_num_ongoing_requests_per_replica": 10,
}
)
class AutoScalingService:
...
A/B 测试:
# 90% 流量走模型 A,10% 流量走模型 B
serve.run(
Router.bind({
"model_a": ModelA.bind(),
"model_b": ModelB.bind()
}, weights={"model_a": 0.9, "model_b": 0.1})
)
这就是 Ray 解决的第二座大山:消除训练到服务的部署鸿沟。
6. 问题三:Python 多进程难用,并行编程门槛高
传统困境:multiprocessing 的坑
Python 的 multiprocessing 库提供了多进程支持,但实际用起来坑很多。
坑 1:序列化噩梦
from multiprocessing import Pool
class MyModel:
def __init__(self):
self.data = some_complex_object
def predict(self, x):
return process(x, self.data)
model = MyModel()
def task(x):
return model.predict(x)
with Pool(4) as p:
results = p.map(task, range(100)) # ❌ PicklingError!
报错:PicklingError: Can't pickle <function task at 0x...>
原因:multiprocessing 用 pickle 序列化函数和参数,但很多 Python 对象无法被 pickle(特别是包含复杂状态的类实例、lambda 函数等)。
坑 2:内存爆炸
def process(data):
result = heavy_computation(data)
return result
with Pool(4) as p:
results = p.map(process, large_dataset) # ❌ 内存不足!
原因:Pool.map 会把所有输入数据一次性序列化发送给子进程,如果数据集很大,内存会瞬间爆炸。
坑 3:调试困难
def task(x):
result = do_something(x)
# 如果这里出错了,traceback 几乎无法阅读
return result
with Pool(4) as p:
results = p.map(task, data) # ❌ 子进程崩溃,主进程只知道"某个进程挂了"
多进程的错误处理、日志收集、调试都是非常痛苦的体验。
坑 4:无法跨机器
multiprocessing 只能单机多进程,无法扩展到多台机器。
Ray 的解法:函数级别的并行,自动序列化、自动内存管理
Ray 通过以下机制解决这些问题:
1. 云 Pickle(Cloudpickle)
Ray 使用 cloudpickle 替代标准 pickle,支持序列化更多 Python 对象:
- Lambda 函数
- 嵌套函数
- 类实例(只要类定义可访问)
- 甚至在 Jupyter Notebook 中定义的函数
import ray
ray.init()
# 这个 lambda 可以被序列化!
@ray.remote
def process(x):
return (lambda y: y ** 2)(x)
futures = [process.remote(x) for x in range(100)]
results = ray.get(futures) # ✅ 成功!
2. 分布式对象存储
Ray 的 Plasma 对象存储自动管理内存:
import ray
import numpy as np
ray.init()
# 创建大数组(自动放入共享内存)
large_array = np.zeros(1024 * 1024 * 100) # 800MB
@ray.remote
def process(data):
# data 是从对象存储读取的,零拷贝
return np.sum(data)
# 把大数组放入对象存储(只需一次拷贝)
array_ref = ray.put(large_array)
# 多个任务共享同一个数据引用
futures = [process.remote(array_ref) for _ in range(100)]
results = ray.get(futures)
ray.put()把数据放入共享内存- 多个任务通过引用(reference)访问同一数据,无需重复拷贝
- 对象不再被引用时,自动垃圾回收
3. Actor:有状态并行
对于需要维护状态的场景,Ray 提供了 Actor:
import ray
@ray.remote
class Counter:
def __init__(self):
self.count = 0
def increment(self):
self.count += 1
return self.count
# 创建 Actor 实例
counter = Counter.remote()
# 调用 Actor 方法(状态自动维护)
print(ray.get(counter.increment.remote())) # 1
print(ray.get(counter.increment.remote())) # 2
print(ray.get(counter.increment.remote())) # 3
Actor 解决了什么问题?
- 每个 Actor 是独立进程,维护自己的状态
- Actor 方法调用是异步的(返回 future)
- Actor 可以分布在集群的任何节点上
这在强化学习、在线服务、流处理等场景中非常有用。
4. 丰富的调试工具
Ray 提供了 Dashboard 来监控和调试分布式应用:
- 查看每个任务的状态和日志
- 分析资源使用情况
- 追踪任务依赖关系
- 自动收集错误信息
# 启动时开启 Dashboard
ray.init(dashboard_host='0.0.0.0')
# 访问 http://localhost:8265 查看 Dashboard
案例:用 Ray 实现简单的并行爬虫
对比原生多进程和 Ray 的实现:
原生 multiprocessing(代码复杂,容易出错):
from multiprocessing import Pool, Manager
import requests
def fetch(url, result_queue):
try:
response = requests.get(url, timeout=5)
result_queue.put((url, response.status_code))
except Exception as e:
result_queue.put((url, str(e)))
if __name__ == '__main__':
urls = [...] # 1000 个 URL
# 复杂的队列管理
manager = Manager()
result_queue = manager.Queue()
with Pool(10) as p:
p.starmap(fetch, [(url, result_queue) for url in urls])
# 收集结果
results = []
while not result_queue.empty():
results.append(result_queue.get())
Ray(简洁、高效、可扩展):
import ray
import requests
ray.init()
@ray.remote
def fetch(url):
try:
response = requests.get(url, timeout=5)
return url, response.status_code
except Exception as e:
return url, str(e)
urls = [...] # 1000 个 URL
# 一行代码并行抓取
futures = [fetch.remote(url) for url in urls]
results = ray.get(futures) # 自动等待所有任务完成
优势:
- 代码简洁一半以上
- 自动处理错误和重试
- 可以轻松扩展到多台机器(只需连接到 Ray 集群)
- 通过 Dashboard 可以实时监控抓取进度
这就是 Ray 解决的第三座大山:让并行编程像写普通代码一样简单。
第三部分:Ray 生态能做什么——五大核心场景
Ray 不仅仅是一个分布式计算框架,它已经发展成为一个完整的 AI 计算平台。下面介绍 Ray 的五大核心库。
7. Ray Train:分布式深度学习,一行代码搞定
Ray Train 是 Ray 的分布式训练库,支持 PyTorch、TensorFlow、Horovod、HuggingFace 等主流框架。
核心特性
框架无关的统一 API:
from ray.train.torch import TorchTrainer # PyTorch
from ray.train.tensorflow import TensorflowTrainer # TensorFlow
from ray.train.huggingface import HuggingFaceTrainer # HuggingFace
from ray.train.xgboost import XGBoostTrainer # XGBoost
from ray.train.lightgbm import LightGBMTrainer # LightGBM
所有框架使用相同的 ScalingConfig 配置分布式参数:
scaling_config=ScalingConfig(
num_workers=8, # 8 个 worker
use_gpu=True, # 使用 GPU
resources_per_worker={"CPU": 4, "GPU": 1} # 每个 worker 的资源
)
弹性训练(Fault Tolerance):
- 训练过程中某个 worker 挂了?自动重试
- 支持从检查点恢复,不浪费已完成的训练进度
与 Ray Tune 无缝集成:
- 超参数搜索时,每个试验自动使用分布式训练
- 自动管理 GPU 资源分配,最大化集群利用率
实战:把单机 Stable Diffusion 训练变成分布式
单机代码:
from diffusers import StableDiffusionPipeline
import torch
model = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5")
model = model.to("cuda")
# 训练循环...
for step, batch in enumerate(dataloader):
loss = model(batch).loss
loss.backward()
optimizer.step()
Ray 分布式版本:
from ray import train
from ray.train.torch import TorchTrainer
from diffusers import StableDiffusionPipeline
def train_func(config):
# Ray 自动分配 GPU
model = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5")
model = train.torch.prepare_model(model)
for step, batch in enumerate(train.get_dataset_shard("train")):
loss = model(batch).loss
loss.backward()
optimizer.step()
if step % 100 == 0:
train.report({"loss": loss.item()})
trainer = TorchTrainer(
train_loop_per_worker=train_func,
scaling_config=train.ScalingConfig(num_workers=4, use_gpu=True),
datasets={"train": ray_dataset},
)
result = trainer.fit()
改动点:
- 用
train.torch.prepare_model()包装模型 - 使用
train.get_dataset_shard()获取数据分片 - 用
train.report()上报训练指标
性能提升:
- 单机 1x A100:约 6 小时/epoch
- 分布式 4x A100:约 1.5 小时/epoch(接近 4 倍加速)
8. Ray Serve:大模型时代的在线推理服务
Ray Serve 是一个可扩展的模型服务框架,专为 LLM(大语言模型)和生成式 AI 设计。
核心特性
多模型组合(Model Composition):
@serve.deployment
class EmbeddingModel:
def __init__(self):
self.model = SentenceTransformer('all-MiniLM-L6-v2')
async def __call__(self, text):
return self.model.encode(text)
@serve.deployment
class LLMModel:
def __init__(self, embedding_handle):
self.embedding = embedding_handle
self.llm = load_llm()
async def __call__(self, request):
# 先调用 Embedding 服务
embedding = await self.embedding.remote(request.text)
# 再调用 LLM
return self.llm.generate(embedding)
# 构建服务图
embedding = EmbeddingModel.bind()
llm = LLMModel.bind(embedding)
serve.run(llm)
自动扩缩容(Auto-scaling):
@serve.deployment(
autoscaling_config={
"min_replicas": 1,
"max_replicas": 100,
"target_num_ongoing_requests_per_replica": 10,
"upscale_delay_s": 30, # 30 秒后扩容
"downscale_delay_s": 600, # 10 分钟后缩容
}
)
class LLMService:
...
批量推理优化(Batching):
@serve.deployment
class BatchingService:
@serve.batch(max_batch_size=16, batch_wait_timeout_s=0.01)
async def __call__(self, requests):
# 自动合并多个请求为 batch
batched_input = [req.text for req in requests]
return self.model(batched_input)
案例:用 Ray Serve 部署一个 GPT 风格的 API 服务
import ray
from ray import serve
from fastapi import FastAPI
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
app = FastAPI()
@serve.deployment(
num_replicas=2,
ray_actor_options={"num_gpus": 1}
)
@serve.ingress(app)
class GPTService:
def __init__(self):
# 加载模型
model_name = "microsoft/DialoGPT-medium"
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForCausalLM.from_pretrained(model_name)
self.model = self.model.to("cuda")
@app.post("/chat")
async def chat(self, message: str, max_length: int = 100):
# 编码输入
input_ids = self.tokenizer.encode(
message + self.tokenizer.eos_token,
return_tensors='pt'
).to("cuda")
# 生成回复
with torch.no_grad():
output = self.model.generate(
input_ids,
max_length=max_length,
pad_token_id=self.tokenizer.eos_token_id
)
# 解码输出
response = self.tokenizer.decode(
output[:, input_ids.shape[-1]:][0],
skip_special_tokens=True
)
return {"response": response}
# 部署服务
serve.run(GPTService.bind())
部署完成后,可以直接调用:
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"message": "Hello, how are you?", "max_length": 50}'
9. Ray RLlib:强化学习的工业级方案
Ray RLlib 是业界最广泛使用的强化学习库之一,支持各种 RL 算法。
为什么 RL 训练天然需要分布式?
强化学习的训练流程是这样的:
- 智能体(Agent)在环境中执行动作
- 环境返回状态和奖励
- 智能体根据奖励更新策略
- 重复
这个循环需要:
- 并行环境:同时运行多个环境实例产生数据
- 分布式策略更新:多个 worker 并行计算梯度
- 异构计算:环境模拟用 CPU,策略更新用 GPU
RLlib 的核心特性
丰富的算法支持:
- 经典算法:DQN、A3C、PPO、IMPALA
- 离线 RL:CQL、CQL、DT
- 多智能体:QMIX、MADDPG
- 模型-based:Dreamer、MB-MPO
多智能体训练:
from ray import tune
from ray.rllib.algorithms.ppo import PPO
config = {
"env": "multi_agent_env",
"multiagent": {
"policies": {
"agent_1": (..., obs_space, action_space, {}),
"agent_2": (..., obs_space, action_space, {}),
},
"policy_mapping_fn": lambda agent_id, **kwargs: agent_id,
},
"num_workers": 8,
}
tune.run(PPO, config=config)
OpenAI 如何用 Ray RLlib 做 RLHF
ChatGPT 的训练过程分为三步:
- 预训练:在大规模语料上训练基础模型
- SFT(Supervised Fine-Tuning):用人工标注数据微调
- RLHF(Reinforcement Learning from Human Feedback):用强化学习优化
RLHF 的核心就是强化学习:
- 环境:语言模型生成回复
- 奖励模型:人类反馈训练的模型,给回复打分
- 策略:语言模型的参数
OpenAI 使用 Ray RLlib 来分布式执行 RLHF 训练:
- 多个 worker 并行生成回复
- 奖励模型并行打分
- PPO 算法分布式更新策略
这种规模的任务,没有分布式框架几乎不可能完成。
10. Ray Tune:超参数搜索的自动化
超参数调优是机器学习中最耗时、最需要算力的环节之一。
炼丹痛点
- 参数空间大:学习率、批量大小、网络层数、激活函数……
- 试错成本高:每个配置都要完整训练一轮
- 资源利用率低:串行搜索太慢,手动并行容易冲突
Ray Tune 的核心特性
分布式搜索:
from ray import tune
from ray.tune.tune_config import TuneConfig
from ray.train.torch import TorchTrainer
def trainable(config):
# 训练代码使用 config 中的超参数
lr = config["lr"]
batch_size = config["batch_size"]
# ... 训练 ...
return {"accuracy": accuracy}
tune_config = TuneConfig(
metric="accuracy",
mode="max",
num_samples=100, # 尝试 100 组配置
)
# 定义搜索空间
param_space = {
"lr": tune.loguniform(1e-4, 1e-1),
"batch_size": tune.choice([32, 64, 128, 256]),
"num_layers": tune.randint(2, 6),
}
# 启动分布式搜索(自动使用所有可用 GPU)
tuner = tune.Tuner(
trainable,
param_space=param_space,
tune_config=tune_config,
run_config=train.RunConfig(...)
)
results = tuner.fit()
早停机制(Early Stopping):
from ray.tune.schedulers import ASHAScheduler
scheduler = ASHAScheduler(
metric="accuracy",
mode="max",
max_t=100, # 最大训练 100 个 epoch
grace_period=10, # 至少训练 10 个 epoch
reduction_factor=3, # 淘汰率
)
# 效果:表现不好的配置会被提前终止,节省 60-80% 的算力
与 Train 无缝集成:
from ray.train.torch import TorchTrainer
from ray.tune import Tuner, TuneConfig
# 每个试验都是分布式训练
trainer = TorchTrainer(
train_loop_per_worker=train_func,
scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
tuner = Tuner(
trainer,
param_space={
"train_loop_config": {
"lr": tune.loguniform(1e-4, 1e-1),
"batch_size": tune.choice([32, 64, 128]),
}
},
tune_config=TuneConfig(num_samples=100, scheduler=ASHAScheduler()),
)
每个超参数试验都自动使用 4 个 GPU 进行分布式训练,最大化集群利用率。
11. Ray Data:海量数据的预处理流水线
数据预处理往往比训练本身更耗时。Ray Data 提供了分布式数据处理能力。
核心特性
标准数据格式支持:
- Parquet、CSV、JSON、TFRecord
- HuggingFace Datasets
- Databricks、Snowflake 集成
流式处理:
import ray
# 从 S3 流式读取 Parquet
ds = ray.data.read_parquet("s3://bucket/dataset/")
# 链式转换(惰性求值,自动优化)
ds = ds.map(transform_fn) # 数据转换
ds = ds.filter(filter_fn) # 数据过滤
ds = ds.repartition(100) # 重分区
# 训练时流式消费(不占用大量内存)
for batch in ds.iter_torch_batches(batch_size=32):
# batch 直接从 S3 流式读取到 GPU
model(batch)
零拷贝数据传输:
- 使用 Apache Arrow 格式
- 同节点进程间共享无需序列化
- 跨节点自动压缩传输
与 Databricks、Snowflake 的友好集成
# 从 Databricks 读取
from ray.data import read_databricks
ds = read_databricks(
cluster_id="cluster-id",
table="catalog.schema.table"
)
# 从 Snowflake 读取
from ray.data import read_snowflake
ds = read_snowflake(
query="SELECT * FROM users WHERE created_at > '2024-01-01'",
connection_parameters={...}
)
# 预处理后写回
ds.write_parquet("s3://processed-data/output/")
第四部分:谁在用 Ray——真实企业案例
12. OpenAI:Ray 如何支撑 ChatGPT 的训练
OpenAI 是 Ray 最早的企业用户之一。
从 MPI 迁移到 Ray
在 Ray 之前,OpenAI 使用 MPI(Message Passing Interface)进行分布式训练。MPI 是高性能计算领域的标准,但有几个问题:
- 编程复杂:需要显式管理进程通信
- 灵活性差:难以处理动态任务图
- 容错性差:一个进程挂了,整个作业失败
- 扩展困难:从单机到多机需要大量配置
为什么迁移到 Ray?
OpenAI 的工程师发现,AI 工作负载(特别是强化学习)与 MPI 假设的 “SPMD(Single Program Multiple Data)” 模式非常不同:
- RL 的任务图是动态的,取决于环境的反馈
- 需要同时运行大量仿真环境
- 需要频繁与环境交互
Ray 的 Actor 模型完美契合这个需求:
- 每个仿真环境是一个 Actor,维护自己的状态
- 训练进程与仿真环境通过消息传递通信
- 动态任务调度,无需预定义通信模式
ChatGPT 的 RLHF 训练
ChatGPT 的 RLHF(基于人类反馈的强化学习)训练流程:
- 收集人类反馈:人工标注者对模型生成的多个回复进行排序
- 训练奖励模型:用排序数据训练一个奖励模型,学习人类偏好
- 强化学习优化:用 PPO 算法微调语言模型,最大化奖励模型打分
第三步需要:
- 大量并行环境生成回复
- 奖励模型并行打分
- 分布式策略更新
OpenAI 使用 Ray RLlib 执行这个流程:
- 数百个 worker 并行运行语言模型生成回复
- 奖励模型分布式打分
- PPO 分布式更新策略
据公开报道,使用 Ray 后,OpenAI 的训练基础设施复杂度降低了十倍以上,工程师可以更专注于算法而非分布式细节。
13. Uber:打车定价背后的 Ray 集群
Uber 是 Ray 的重度用户,官方博客有多篇文章介绍其应用。
实时供需预测
Uber 的核心业务之一是动态定价:根据实时的供需关系调整价格。
这需要:
- 处理数百万订单数据
- 实时预测供需趋势
- 毫秒级响应延迟
Uber 使用 Ray 构建实时预测系统:
- Ray Serve 部署预测模型服务
- Ray Data 处理实时数据流
- Ray Train 训练预测模型
弹性训练:高峰期自动扩容,低谷期自动缩容
Uber 的训练负载有明显的峰谷:
- 高峰期:晚上和周末,更多订单数据需要训练
- 低谷期:白天工作时间,训练需求减少
使用 Ray on Kubernetes,Uber 实现了:
- 自动扩缩容:根据队列长度自动增加/减少训练 worker
- 资源隔离:不同类型的任务使用独立的资源池
- GPU 共享:多个小任务共享 GPU,提高利用率
Uber 在 Ray Summit 2025 的演讲中分享了具体数据:
- 训练成本降低 40%
- 资源利用率提升 60%
- 模型迭代速度提升 3 倍
从 MADLJ 到 Ray on Kubernetes
2023 年前,Uber 使用自研的 MADLJ(Michelangelo Deep Learning Jobs)系统管理 ML 训练。迁移到 Ray on Kubernetes 后:
- 开发效率:工程师可以在本地用同样的代码调试,直接提交到集群运行
- 资源管理:Kubernetes 统一调度,避免资源碎片化
- 生态丰富:使用 Ray 生态的各种库,无需重复造轮子
14. Shopify:电商平台的多模态 AI 基础设施
Shopify 是全球领先的电商平台,服务数百万商家。
Merlin:基于 Ray 的统一 ML 平台
Shopify 构建了名为 Merlin 的内部 ML 平台,完全基于 Ray:
核心理念:让 ML 工程师从笔记本到生产的体验尽可能无缝。
技术栈:
- Ray on Kubernetes
- Ray Train:分布式训练
- Ray Serve:模型服务
- Ray Tune:超参数优化
- Ray Data:数据预处理
- Airflow:工作流编排
从 Jupyter 笔记本到生产环境
Shopify 的开发流程:
- 研究阶段:数据科学家在 Jupyter Notebook 上实验
- 开发阶段:把 Notebook 代码整理成 Python 脚本
- 测试阶段:在小数据集上用 Ray 本地模式测试
- 生产阶段:直接提交到 Merlin 平台,自动分布式运行
关键优势:代码无需重写。同一套代码,从笔记本到千卡集群无缝扩展。
多模态 AI:商品图片、描述、评论的统一处理
Shopify 的 AI 场景包括:
- 商品分类:根据图片和描述自动分类
- 搜索推荐:理解用户查询意图
- 评论分析:情感分析、关键词提取
- 智能客服:大语言模型对话
这些场景需要处理多模态数据:
- 图片(CNN/ViT)
- 文本(BERT/GPT)
- 结构化数据(表格)
Merlin 平台使用 Ray 统一处理:
- Ray Data 并行加载多模态数据
- Ray Train 分布式训练多模态模型
- Ray Serve 部署统一的服务接口
Shopify 在 Ray Summit 2024 的演讲《Shopify’s Ray-Powered Approach to Multimodal AI in E-commerce》中分享了具体架构。
15. 其他用户:蚂蚁集团、字节跳动、百度
蚂蚁集团:金融风控模型的分布式训练
蚂蚁集团使用 Ray 训练大规模金融风控模型:
- 特征工程:数十亿特征,TB 级数据
- 模型训练:XGBoost、深度神经网络
- 实时推理:毫秒级风险评分
挑战:
- 数据隐私合规,需要在本地数据中心处理
- 高可用要求,系统不能宕机
- 模型需要频繁更新(应对新型欺诈)
Ray 的解决方案:
- 私有化部署 Ray 集群
- 弹性训练,快速迭代模型
- Ray Serve 高可用服务部署
字节跳动:推荐系统的超大规模特征工程
字节跳动的推荐系统(抖音、今日头条)需要处理:
- 数十亿用户、数千亿内容
- 实时特征提取和更新
- 毫秒级推荐延迟
使用场景:
- Ray Data:分布式特征工程
- Ray Train:分布式模型训练
- Ray Serve:A/B 测试和多模型实验
具体实践可参考字节跳动在 Ray Summit 的演讲。
百度:自动驾驶仿真数据的并行生成
百度 Apollo 自动驾驶平台使用 Ray:
- 仿真环境:并行车道级仿真
- 数据生成:生成海量训练数据
- 策略训练:强化学习训练驾驶策略
挑战:
- 仿真环境计算密集,需要大量 CPU
- 策略训练需要 GPU
- 异构计算资源调度
Ray 的 Actor 模型完美解决:
- 每个仿真环境是一个 Actor,并行运行
- 训练进程动态收集仿真数据
- 自动调度 CPU/GPU 资源
第五部分:Ray vs 其他方案——什么时候选 Ray?
16. Ray vs Spark:AI 时代的分工
Spark 擅长什么?
- 批处理:ETL、数据仓库、报表生成
- SQL 查询:DataFrame API 非常成熟
- 数据规模:PB 级数据处理
Spark 不擅长什么?
- 交互式计算:启动延迟高(秒级),不适合毫秒级任务
- 异构计算:主要设计用于 CPU 批处理,GPU 支持较弱
- 动态任务图:DAG 需要静态定义
- 在线服务:Spark Streaming 延迟较高
Ray 擅长什么?
- AI/ML 工作负载:原生支持 GPU,与 PyTorch/TensorFlow 无缝集成
- 交互式计算:毫秒级任务启动延迟
- 动态任务图:运行时决定任务依赖
- 在线服务:Ray Serve 专为低延迟服务设计
- 异构计算:CPU/GPU 混合调度
什么时候选 Spark?
- 传统的 ETL 数据管道
- SQL 分析查询
- 海量日志处理
- 数据仓库应用
什么时候选 Ray?
- 机器学习训练和推理
- 强化学习
- 交互式/实时应用
- 需要 GPU 的计算任务
两者可以共存吗?
可以!很多公司同时使用两者:
- Spark 做数据预处理和 ETL
- Ray 做模型训练和服务
Ray 也可以读取 Spark 生成的数据(通过 Parquet、Delta Lake 等格式)。
17. Ray vs Kubernetes:互补而非竞争
很多人问:Ray 和 Kubernetes 是什么关系?是竞争还是互补?
答案是:互补。
Kubernetes 擅长什么?
- 容器编排:管理容器的生命周期
- 服务发现:DNS 和负载均衡
- 资源管理:为容器分配 CPU/内存
- 故障恢复:自动重启失败的容器
Kubernetes 不擅长什么?
- 分布式计算:需要额外框架(如 Spark、Flink)
- 细粒度任务调度:Pod 启动需要秒级,不适合毫秒级任务
- AI 特定优化:如 GPU 共享、数据本地性
KubeRay:在 K8s 上运行 Ray 的最佳实践
KubeRay 是 Ray 社区维护的 Kubernetes Operator,用于在 K8s 上部署 Ray 集群。
架构:
Kubernetes 层:
├─ 管理 Ray Head Node Pod
├─ 管理 Ray Worker Node Pods
└─ 自动扩缩容 (Ray Autoscaler + K8s HPA)
Ray 层:
├─ 在 K8s Pod 内运行 Ray 进程
├─ 使用 Ray 的调度器做细粒度任务调度
└─ 使用 Ray 的对象存储做数据共享
工作流程:
- 用户提交 Ray 应用到 K8s 集群
- KubeRay 创建 Ray Head Node
- Ray Autoscaler 根据任务需求,向 K8s 申请 Worker Node
- K8s 创建新的 Pod 作为 Worker
- Ray 在这些 Pod 上调度任务
Ray 管计算,K8s 管资源
分工明确:
- Kubernetes:资源管理、服务发现、故障恢复
- Ray:分布式计算、任务调度、数据管理
这种架构的好处:
- 利用 K8s 成熟的生态(监控、日志、安全)
- 保留 Ray 的高性能计算能力
- 统一资源池,提高利用率
18. Ray vs DeepSpeed/Horovod:分布式训练的多种选择
DeepSpeed:专为大模型训练设计
DeepSpeed 是 Microsoft 开源的深度学习优化库,专为训练超大模型(百亿到万亿参数)设计。
核心特性:
- ZeRO(Zero Redundancy Optimizer):将优化器状态分片到多个 GPU,减少显存占用
- 模型并行:层间并行,支持超大模型
- 混合精度训练:FP16/BF16 加速
适用场景:
- 训练 GPT-3/4 级别的超大语言模型
- 显存不足以容纳完整模型的场景
- 需要极致训练效率
Horovod:Uber 开源的分布式训练框架
Horovod 是 Uber 开源的分布式深度学习框架,基于 MPI 实现 all-reduce 通信。
核心特性:
- 简洁的 API:只需几行代码改造现有训练脚本
- 高性能:基于 NCCL 的 all-reduce
- 框架支持:PyTorch、TensorFlow、Keras
适用场景:
- 数据并行训练
- 对通信效率要求高
- 已有训练脚本,希望最小改动分布式化
Ray Train 的定位
Ray Train 与这些框架的关系:
Ray Train 不是一个新的训练框架,而是分布式训练框架的"统一接口"。
实际上,Ray Train 可以在内部使用 DeepSpeed、Horovod 或 PyTorch DDP:
# 使用 Horovod
from ray.train.horovod import HorovodTrainer
trainer = HorovodTrainer(...)
# 使用 DeepSpeed
from ray.train.deepspeed import DeepSpeedTrainer
trainer = DeepSpeedTrainer(...)
# 使用 PyTorch DDP
from ray.train.torch import TorchTrainer
trainer = TorchTrainer(...)
Ray Train 提供的额外价值:
- 统一 API:无论底层用什么,上层接口一致
- 资源管理:自动分配 GPU,避免资源冲突
- 弹性训练:失败自动重试,从检查点恢复
- 与 Ray 生态集成:Tune、Serve、Data 无缝协作
选型建议
直接用 PyTorch DDP:
- 单机多卡,简单场景
- 不想引入额外依赖
用 Horovod:
- 多机多卡,需要高性能通信
- 已有代码,希望最小改动
用 DeepSpeed:
- 超大模型,显存不够
- 需要 ZeRO、模型并行等高级特性
用 Ray Train:
- 需要与超参数搜索(Tune)、模型服务(Serve)集成
- 需要弹性训练和容错
- 希望统一的研究到生产流程
- 希望自动资源管理
结语:分布式计算的未来是"无感"
Ray 的终极愿景
Ray 的创始团队有一个愿景:
让分布式计算对开发者透明。开发者应该专注于算法和业务逻辑,而不是分布式的复杂性。
这听起来很理想主义,但 Ray 正在一步步实现:
- 2016:提出 Actor + Task 的统一抽象
- 2019:发布 Ray Train、Ray Tune、Ray RLlib
- 2020:发布 Ray Serve
- 2021:发布 Ray Data
- 2023:加入 PyTorch 生态
- 2024+:持续优化性能和易用性
给 Python 工程师的建议:什么时候该考虑引入 Ray?
如果符合以下任何一条,Ray 可能会给你带来很大价值:
- 训练太慢:单机训练需要数小时甚至数天
- 数据太大:内存装不下数据集
- 服务需要扩展:流量增长,单实例无法承载
- 资源利用率低:GPU 经常空闲等待
- 研究到生产的鸿沟:研究代码和生产代码是两套
- 多步骤 pipeline:数据预处理 → 训练 → 调参 → 服务,步骤复杂
从小处开始:
不需要一开始就全面迁移到 Ray。可以:
- 先用 Ray 实现简单的并行任务(如数据预处理)
- 用 Ray Train 改造训练代码
- 用 Ray Serve 部署模型服务
- 逐步扩展到完整 pipeline
参考资料
- How Ray, a Distributed AI Framework, Helps Power ChatGPT - The New Stack
- Ray: A Distributed Framework for Emerging AI Applications - arXiv
- How Uber Uses Ray to Optimize the Rides Business - Uber Engineering
- Uber’s Journey to Ray on Kubernetes - Uber Engineering
- The Magic of Merlin: Shopify’s New Machine Learning Platform - Shopify Engineering
- Shopify’s Ray-Powered Approach to Multimodal AI in E-commerce - Ray Summit 2024
- Ray Use Cases - Ray Documentation
- Anyscale Official Website
- Ray Summit 2025 - Scaling Model Training with Ray at Uber
- Elastic Distributed Training with XGBoost on Ray - Uber