从单机到千卡集群:Ray 如何让 Python 代码拥有超能力?

Posted on May 13, 2026 · 17 min read

引言:一个让 Python 程序员"无痛"进入分布式计算的框架

为什么训练 AI 模型越来越像"喂显卡吃数据"?

2023 年,GPT-4 的参数规模达到了 1.8 万亿。这个数字意味着什么?如果把它想象成一座图书馆,那大约是 1800 亿本书——而这些"书"需要在成百上千张 GPU 上同时被"阅读"和"理解"。

训练这样的模型,单张显卡已经远远不够。你需要分布式计算:把任务拆分到多台机器、多张显卡上并行执行。但传统的分布式编程方式,对大多数 Python 开发者来说,简直是一场噩梦。

Ray 是什么?

Ray 是一个让 Python 代码自动具备分布式能力的开源框架。

它的核心魔法极其简单:给你的 Python 函数加上 @ray.remote 装饰器,这个函数就能在集群中的任意节点上并行执行。从单机笔记本到千卡集群,同一套代码,无需重写。

这不是魔法,这是来自 UC Berkeley RISELab 的工程师们,为 AI 时代的 Python 开发者精心设计的分布式抽象。

为什么 OpenAI、Uber、Shopify 都选择了它?

  • OpenAI:用 Ray 训练 ChatGPT,从 MPI 迁移过来后,基础设施复杂度降低了十倍以上
  • Uber:基于 Ray 构建实时供需预测系统,每天处理数百万订单
  • Shopify:基于 Ray 搭建 ML 平台 Merlin,实现从 Jupyter 到生产的无缝衔接

这些公司的共同点是什么?他们都在解决同一个问题:如何让 AI 研发和部署变得更简单、更快速、更便宜。

这篇文章,我们就来聊聊 Ray 到底是什么,它解决了 AI 工程的哪些痛点,以及为什么它正在成为分布式 AI 计算的工业标准。

第一部分:Ray 是什么——给 Python 的分布式超能力

1. Ray 诞生的故事:UC Berkeley 的学术项目如何变成工业标准

从 RISELab 走出

2016 年,加州大学伯克利分校的 RISELab(Real-Time Intelligent Systems and Engineering Laboratory)诞生了一个新项目。这个实验室的前身是著名的 AMPLab,也就是 Spark 和 Mesos 的发源地。

项目的创始人是三位博士生:Robert NishiharaPhilipp Moritz 和他们的导师 Ion Stoica。他们的目标很简单:创建一个新的分布式计算框架,专门解决 AI 和强化学习的独特需求。

当时的分布式计算领域,Spark 统治着大数据批处理,MPI 统治着高性能计算。但 AI 工作负载有着完全不同的特征:

  • 异构计算:一个 AI pipeline 可能需要 CPU 做预处理、GPU 做训练、CPU 再做服务
  • 动态任务:强化学习的任务图是运行时才确定的,无法像 Spark 那样静态规划
  • 毫秒级延迟:在线服务需要极低的响应延迟,批处理的秒级延迟 unacceptable

Spark 和 MPI 都不是为这种场景设计的。

从研究工具到企业级框架

2017 年,Ray 的第一篇论文《Ray: A Distributed Framework for Emerging AI Applications》发表在 arXiv 上。同年,Ray 开源。

2019 年,Anyscale 成立——由 Ray 的核心创始团队创办的公司,致力于提供 Ray 的商业支持和企业级服务。

2020-2023 年,Ray 的生态系统迅速扩展:Ray Train、Ray Serve、Ray RLlib、Ray Tune、Ray Data 相继发布,形成了一个完整的 AI 计算平台。

2024 年,Ray 成为 PyTorch 生态的一部分,Anyscale 与 Meta 达成深度合作。

今天,Ray 已经成为包括 OpenAI、Uber、Shopify、蚂蚁集团、字节跳动在内的数百家公司的基础设施核心组件。

为什么选择 Python?

Ray 选择 Python 作为核心语言,不是偶然。

Python 已经是 AI 领域的事实标准语言:PyTorch、TensorFlow、JAX、HuggingFace、scikit-learn……几乎所有主流 AI 框架都提供 Python API。

但 Python 有一个致命弱点:原生的多进程/多线程支持非常糟糕。

全局解释器锁(GIL)让真正的并行计算变得困难,multiprocessing 库虽然有,但 API 复杂、序列化问题频发、调试困难。

Ray 的定位非常精准:让 Python 开发者无需关心底层分布式细节,就能享受分布式的性能优势。

2. Ray 的核心魔法:两行代码变分布式

对比传统方式:用 MPI/Spark 写分布式代码有多痛苦?

让我们先看一个最简单的例子:并行计算一个列表中每个数字的平方。

原生 Python(串行):

def square(x):
    return x * x

results = [square(x) for x in range(10000)]

Python multiprocessing:

from multiprocessing import Pool

def square(x):
    return x * x

with Pool(4) as p:
    results = p.map(square, range(10000))

看起来还行?但这只是最简单的场景。一旦涉及以下情况,代码复杂度会急剧上升:

  • 需要跨机器运行(multiprocessing 只能单机)
  • 函数参数/返回值无法被 pickle 序列化
  • 需要在不同任务之间共享状态
  • 需要动态决定执行哪些任务

MPI(消息传递接口):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    data = list(range(10000))
    chunks = [data[i::size] for i in range(size)]
else:
    chunks = None

chunk = comm.scatter(chunks, root=0)
results = [x * x for x in chunk]
gathered = comm.gather(results, root=0)

if rank == 0:
    print(flatten(gathered))

代码量暴增,概念复杂(rank、scatter、gather),而且要求所有节点运行完全相同的代码。

Spark:

from pyspark import SparkContext

sc = SparkContext("local", "Square App")
data = sc.parallelize(range(10000))
results = data.map(lambda x: x * x).collect()

Spark 的 API 设计得很好,但它有严格的限制:

  • 必须是数据并行(DataFrame/RDD 操作)
  • 不支持细粒度的任务调度
  • 启动开销大,不适合毫秒级任务
  • 主要用于批处理,不适合交互式计算

@ray.remote 装饰器:函数级别的分布式抽象

现在看看 Ray 的做法:

import ray

ray.init()

@ray.remote
def square(x):
    return x * x

# 提交 10000 个并行任务
futures = [square.remote(x) for x in range(10000)]
results = ray.get(futures)

看到了吗?

  • 只需加上 @ray.remote 装饰器
  • square.remote(x) 提交异步任务
  • ray.get(futures) 获取结果

Ray 自动处理:

  • 任务调度到哪个节点执行
  • 数据如何序列化和传输
  • 失败任务的自动重试
  • 内存管理和垃圾回收

这背后的设计理念是:让分布式计算像写普通函数一样简单。

从单机笔记本到 1000 张 GPU

更神奇的是代码的可移植性。

# 本地开发测试
ray.init()

# 连接到 1000 张 GPU 的集群(只需改这一行)
ray.init(address="ray://head-node:10001")

# 你的业务代码完全不变
@ray.remote
def train_model(config):
    # 训练代码
    return model

futures = [train_model.remote(config) for config in configs]
results = ray.get(futures)

同一套代码,可以在笔记本上调试,可以提交到本地集群测试,可以无缝扩展到生产环境的大规模集群。

这解决了 AI 工程的一个核心痛点:研究和生产的代码不需要重写。

3. Ray 的技术架构极简图

要理解 Ray 为什么能做到这些,我们需要简单了解它的架构。

Head Node + Worker Node 的基本拓扑

一个 Ray 集群由两类节点组成:

  • Head Node:负责集群管理、元数据存储、任务调度
  • Worker Node:负责任务的实际执行
┌─────────────────────────────────────────────┐
│              Head Node                      │
│  ┌──────────────┐  ┌──────────────────┐     │
│  │   GCS        │  │ Global Scheduler │     │
│  │ (metadata)   │  │ (task dispatch)  │     │
│  └──────────────┘  └──────────────────┘     │
└─────────────────────┬───────────────────────┘
                      │
      ┌───────────────┼───────────────┐
      │               │               │
┌─────▼─────┐   ┌────▼────┐   ┌──────▼──────┐
│ Worker 1  │   │ Worker 2│   │  Worker N   │
│ ┌───────┐ │   │ ┌─────┐ │   │  ┌───────┐  │
│ │Tasks  │ │   │ │Tasks│ │   │  │Tasks  │  │
│ │       │ │   │ │     │ │   │  │       │  │
│ ├───────┤ │   │ ├─────┤ │   │  ├───────┤  │
│ │Actors │ │   │ │Actors││   │  │Actors │  │
│ └───────┘ │   │ └─────┘ │   │  └───────┘  │
└───────────┘   └─────────┘   └─────────────┘

当你提交一个任务时:

  1. 请求到达 Head Node 的全局调度器
  2. 调度器根据资源需求(CPU/GPU/内存)选择最优的 Worker
  3. 任务被分发到目标 Worker 执行
  4. 执行结果写回分布式对象存储

GCS(全局控制存储)与 Plasma 对象存储

Ray 有两个关键的数据层:

GCS(Global Control Store)

  • 存储集群的元数据:有哪些节点、有哪些 Actor、任务的状态
  • 基于 Redis 实现,保证高可用
  • 是集群的"大脑",知道一切在哪里

Plasma Object Store

  • 高性能的共享内存对象存储
  • 使用 Apache Arrow 格式,支持零拷贝传输
  • 同节点进程间共享数据无需序列化
  • 跨节点传输自动序列化和压缩

这意味着:

  • 任务 A 在 Worker 1 生成的结果,可以被同节点的任务 B 直接读取(零拷贝)
  • 任务 C 在 Worker 2 需要这个结果,Ray 自动通过网络传输
  • 你不需要关心数据在哪里,Ray 自动管理

分布式调度器:如何将任务分配到最优节点

Ray 的调度器有几个关键特性:

资源感知:每个任务可以声明资源需求

@ray.remote(num_cpus=4, num_gpus=1, memory=1024*1024*1024)
def train_task(data):
    # 这个任务需要 4 CPU + 1 GPU + 1GB 内存
    pass

数据本地性:调度器会优先将任务调度到已有数据的节点,减少网络传输

自动负载均衡:根据节点负载动态调整任务分配

故障恢复:Worker 故障时,任务自动重试到其他节点

这些能力,开发者完全不需要关心,Ray 在背后自动处理。

第二部分:Ray 核心解决什么问题——AI 工程的三座大山

Ray 之所以被众多顶级科技公司采用,不是因为它技术先进,而是因为它精准击中了 AI 工程中最痛的三个问题。

4. 问题一:AI 训练代码只能在单机上跑,想扩容要全部重写

传统困境:从研究到生产的鸿沟

想象一下这个场景:

你是一个机器学习工程师,用 PyTorch 在笔记本上写了一个图像分类模型。代码大概是这样的:

import torch
import torch.nn as nn

# 定义模型
class ImageClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        # ... 网络结构
    
    def forward(self, x):
        # ... 前向传播

# 训练循环
model = ImageClassifier()
optimizer = torch.optim.Adam(model.parameters())

for epoch in range(100):
    for batch in dataloader:
        loss = model(batch).backward()
        optimizer.step()

代码跑得挺好,但现在要上生产环境了。生产环境有 8 张 A100 GPU,你需要分布式训练。

你会怎么做?

方案一:PyTorch DDP(Distributed Data Parallel)

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# 初始化进程组
dist.init_process_group(backend='nccl')

# 包装模型
model = ImageClassifier().to(local_rank)
model = DDP(model, device_ids=[local_rank])

# 需要使用 DistributedSampler
dataloader = DataLoader(
    dataset, 
    sampler=DistributedSampler(dataset)
)

# 训练代码要改很多地方...

你需要:

  • 理解 init_process_grouprankworld_size 等概念
  • 使用 DistributedSampler 确保每个进程读取不同数据
  • 修改保存/加载模型的逻辑(只在 rank 0 进程保存)
  • 处理进程同步和通信问题
  • 使用 torchrunmp.spawn 启动多进程

这还没完——如果要从单机 8 卡扩展到多机 64 卡,还要配置网络、处理节点发现、同步环境……

方案二:Horovod

Horovod 是 Uber 开源的分布式训练框架,API 相对简洁:

import horovod.torch as hvd

# 初始化
hvd.init()

# 包装优化器
optimizer = hvd.DistributedOptimizer(optimizer)

# 广播参数
hvd.broadcast_parameters(model.state_dict(), root_rank=0)

但依然需要理解 Horovod 的概念,而且代码还是要改很多。

Ray 的解法:同一套代码,从笔记本到千卡集群无缝扩展

看看 Ray Train 怎么做:

import ray
from ray import train
from ray.train.torch import TorchTrainer
import torch

def train_func(config):
    # 这就是你原来的训练代码!
    model = ImageClassifier()
    optimizer = torch.optim.Adam(model.parameters())
    
    # Ray Train 自动帮你处理了 DDP 的包装
    model = train.torch.prepare_model(model)
    optimizer = train.torch.prepare_optimizer(optimizer)
    
    for epoch in range(100):
        for batch in train.get_dataset_shard("train"):
            loss = model(batch).backward()
            optimizer.step()

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=train.ScalingConfig(
        num_workers=4,      # 扩展到 4 个 worker
        use_gpu=True,        # 每个 worker 用 GPU
    ),
)
result = trainer.fit()

注意几点:

  1. train_func 里的代码几乎就是你原来的代码
  2. prepare_modelprepare_optimizer 自动处理分布式包装
  3. num_workers=64 就可以扩展到 64 个 worker
  4. 单机多卡和多机多卡,代码完全一样

Ray Train 在背后做了什么?

  • 自动设置进程组和环境变量
  • 自动处理数据分片(DistributedSampler)
  • 自动处理模型同步和通信
  • 自动处理检查点保存和恢复

不需要知道 DDP、Horovod、或者 NCCL 是什么,Ray 自动帮你搞定。

案例:一个图像分类脚本,如何在不改动逻辑的情况下提速 100 倍

假设我们有一个简单的图像分类训练脚本:

原始单机代码:

import torch
from torchvision import datasets, transforms

# 数据加载
transform = transforms.Compose([...])
trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=128, shuffle=True)

# 模型
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False)
model = model.cuda()

# 训练
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
for epoch in range(10):
    for images, labels in trainloader:
        images, labels = images.cuda(), labels.cuda()
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

在单张 V100 上,这个脚本跑完 10 个 epoch 大约需要 30 分钟。

Ray 分布式版本(改动不到 10 行):

import ray
from ray import train
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
import torch

def train_func(config):
    # 数据加载
    trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
    # 使用 Ray 的数据分片
    trainloader = torch.utils.data.DataLoader(
        trainset, 
        sampler=train.get_dataset_shard("train"),
        batch_size=128
    )
    
    # 模型(Ray 自动处理 GPU 分配)
    model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False)
    model = train.torch.prepare_model(model)  # ← 关键修改 1
    
    # 训练(代码几乎不变)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    optimizer = train.torch.prepare_optimizer(optimizer)  # ← 关键修改 2
    
    for epoch in range(10):
        for images, labels in trainloader:
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

# 启动分布式训练(扩展到 4 个 GPU)
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
result = trainer.fit()

结果:

  • 单机 1x V100:30 分钟
  • 分布式 4x V100:8 分钟(接近线性加速)
  • 分布式 8x V100:4 分钟

代码改动不到 10 行,性能提升 100 倍(如果扩展到更多 GPU)。

这就是 Ray 解决的第一座大山:消除从研究到生产的代码重写成本。

5. 问题二:训练和服务使用两套系统,模型上线要"翻译"一次

传统困境:训练用 Horovod,服务用 Flask,模型格式转换 headache

继续上面的例子。你的图像分类模型训练好了,现在要部署成 API 服务,让前端可以调用。

传统的流程是这样的:

训练阶段(PyTorch/Horovod):

# train.py - 分布式训练
import horovod.torch as hvd
# ... 训练代码 ...
torch.save(model.state_dict(), 'model.pth')

服务阶段(Flask/FastAPI):

# serve.py - 模型服务
from flask import Flask, request
import torch

app = Flask(__name__)
model = ImageClassifier()
model.load_state_dict(torch.load('model.pth'))

@app.route('/predict', methods=['POST'])
def predict():
    image = request.files['image']
    # 预处理
    tensor = preprocess(image)
    # 推理
    with torch.no_grad():
        output = model(tensor)
    return {'prediction': output.argmax().item()}

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

看起来挺简单?但当你真正做生产部署时,问题接踵而至:

问题 1:模型格式转换

  • PyTorch 的 .pth 格式适合训练,但生产环境可能需要 ONNX 或 TorchScript
  • 转换过程可能丢失精度或兼容性

问题 2:服务框架与训练框架脱节

  • Flask/FastAPI 与 Horovod/PyTorch 是完全独立的系统
  • 需要额外的胶水代码处理模型加载、内存管理、并发请求

问题 3:资源管理困难

  • 单进程 Flask 无法利用多核 CPU
  • 多进程部署需要手动管理 GPU 显存分配
  • 如何实现自动扩缩容?

问题 4:版本管理与 A/B 测试

  • 模型更新了,如何无缝切换?
  • 如何做 A/B 测试比较两个模型版本?

问题 5:批处理与实时服务的割裂

  • 批量预测用 Spark?实时预测用 Flask?两套系统,两套代码

Ray 的解法:Train → Serving 一站式完成

Ray Serve 的设计哲学是:训练和 serving 应该是同一套代码、同一个运行时。

from ray import serve
from fastapi import FastAPI
import torch

app = FastAPI()

@serve.deployment(
    num_replicas=4,  # 启动 4 个副本
    ray_actor_options={"num_gpus": 0.25}  # 每个副本用 0.25 GPU
)
@serve.ingress(app)
class ImageClassifierService:
    def __init__(self):
        # 加载模型(和训练时完全一样的代码)
        self.model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18')
        self.model.load_state_dict(torch.load('model.pth'))
    
    @app.post("/predict")
    async def predict(self, image: bytes):
        # 预处理(和训练时完全一样的预处理代码)
        tensor = self.preprocess(image)
        # 推理
        with torch.no_grad():
            output = self.model(tensor)
        return {"prediction": output.argmax().item()}

# 部署服务
serve.run(ImageClassifierService.bind())

看关键点:

  1. 同一套模型代码:加载模型的代码和训练时完全一样
  2. 声明式部署:用装饰器声明需要多少副本、多少资源
  3. 自动扩缩容:流量大了自动增加副本,流量小了自动减少
  4. 支持复合服务:可以把多个模型组合成一个服务链路

案例:训练好的模型,如何一行代码变成可调用的 REST API

完整的训练到部署流程:

import ray
from ray import train, serve
from ray.train.torch import TorchTrainer
from ray.serve import PredictorDeployment

# ========== 第一步:训练模型 ==========
def train_func(config):
    # ... 训练代码 ...
    train.save_checkpoint(model=model.state_dict())

trainer = TorchTrainer(train_func, scaling_config=train.ScalingConfig(num_workers=4, use_gpu=True))
result = trainer.fit()
checkpoint = result.checkpoint

# ========== 第二步:部署模型(只需这几行) ==========
from ray.train.torch import TorchCheckpoint, TorchPredictor

class ImagePredictor(TorchPredictor):
    def preprocess(self, data):
        # 和训练时一样的预处理
        return transforms(data)

# 部署为 HTTP 服务
serve.run(
    PredictorDeployment.options(num_replicas=2).bind(
        ImagePredictor,
        checkpoint
    )
)

# 服务现在运行在 http://localhost:8000/
# 可以直接调用!

训练好的模型,通过 serve.run() 一行代码就变成了可调用的 HTTP 服务。

Ray Serve 还解决了哪些传统痛点?

多模型组合服务

# 构建一个推荐系统的服务链路
# 请求 → Embedding 模型 → 粗排模型 → 精排模型 → 结果

embedding = EmbeddingModel.bind()
coarse_rank = CoarseRanker.bind(embedding)
fine_rank = FineRanker.bind(coarse_rank)

serve.run(fine_rank)  # 部署整个链路

自动扩缩容

@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 100,
        "target_num_ongoing_requests_per_replica": 10,
    }
)
class AutoScalingService:
    ...

A/B 测试

# 90% 流量走模型 A,10% 流量走模型 B
serve.run(
    Router.bind({
        "model_a": ModelA.bind(),
        "model_b": ModelB.bind()
    }, weights={"model_a": 0.9, "model_b": 0.1})
)

这就是 Ray 解决的第二座大山:消除训练到服务的部署鸿沟。

6. 问题三:Python 多进程难用,并行编程门槛高

传统困境:multiprocessing 的坑

Python 的 multiprocessing 库提供了多进程支持,但实际用起来坑很多。

坑 1:序列化噩梦

from multiprocessing import Pool

class MyModel:
    def __init__(self):
        self.data = some_complex_object
    
    def predict(self, x):
        return process(x, self.data)

model = MyModel()

def task(x):
    return model.predict(x)

with Pool(4) as p:
    results = p.map(task, range(100))  # ❌ PicklingError!

报错:PicklingError: Can't pickle <function task at 0x...>

原因:multiprocessing 用 pickle 序列化函数和参数,但很多 Python 对象无法被 pickle(特别是包含复杂状态的类实例、lambda 函数等)。

坑 2:内存爆炸

def process(data):
    result = heavy_computation(data)
    return result

with Pool(4) as p:
    results = p.map(process, large_dataset)  # ❌ 内存不足!

原因:Pool.map 会把所有输入数据一次性序列化发送给子进程,如果数据集很大,内存会瞬间爆炸。

坑 3:调试困难

def task(x):
    result = do_something(x)
    # 如果这里出错了,traceback 几乎无法阅读
    return result

with Pool(4) as p:
    results = p.map(task, data)  # ❌ 子进程崩溃,主进程只知道"某个进程挂了"

多进程的错误处理、日志收集、调试都是非常痛苦的体验。

坑 4:无法跨机器 multiprocessing 只能单机多进程,无法扩展到多台机器。

Ray 的解法:函数级别的并行,自动序列化、自动内存管理

Ray 通过以下机制解决这些问题:

1. 云 Pickle(Cloudpickle)

Ray 使用 cloudpickle 替代标准 pickle,支持序列化更多 Python 对象:

  • Lambda 函数
  • 嵌套函数
  • 类实例(只要类定义可访问)
  • 甚至在 Jupyter Notebook 中定义的函数
import ray

ray.init()

# 这个 lambda 可以被序列化!
@ray.remote
def process(x):
    return (lambda y: y ** 2)(x)

futures = [process.remote(x) for x in range(100)]
results = ray.get(futures)  # ✅ 成功!

2. 分布式对象存储

Ray 的 Plasma 对象存储自动管理内存:

import ray
import numpy as np

ray.init()

# 创建大数组(自动放入共享内存)
large_array = np.zeros(1024 * 1024 * 100)  # 800MB

@ray.remote
def process(data):
    # data 是从对象存储读取的,零拷贝
    return np.sum(data)

# 把大数组放入对象存储(只需一次拷贝)
array_ref = ray.put(large_array)

# 多个任务共享同一个数据引用
futures = [process.remote(array_ref) for _ in range(100)]
results = ray.get(futures)
  • ray.put() 把数据放入共享内存
  • 多个任务通过引用(reference)访问同一数据,无需重复拷贝
  • 对象不再被引用时,自动垃圾回收

3. Actor:有状态并行

对于需要维护状态的场景,Ray 提供了 Actor:

import ray

@ray.remote
class Counter:
    def __init__(self):
        self.count = 0
    
    def increment(self):
        self.count += 1
        return self.count

# 创建 Actor 实例
counter = Counter.remote()

# 调用 Actor 方法(状态自动维护)
print(ray.get(counter.increment.remote()))  # 1
print(ray.get(counter.increment.remote()))  # 2
print(ray.get(counter.increment.remote()))  # 3

Actor 解决了什么问题?

  • 每个 Actor 是独立进程,维护自己的状态
  • Actor 方法调用是异步的(返回 future)
  • Actor 可以分布在集群的任何节点上

这在强化学习、在线服务、流处理等场景中非常有用。

4. 丰富的调试工具

Ray 提供了 Dashboard 来监控和调试分布式应用:

  • 查看每个任务的状态和日志
  • 分析资源使用情况
  • 追踪任务依赖关系
  • 自动收集错误信息
# 启动时开启 Dashboard
ray.init(dashboard_host='0.0.0.0')

# 访问 http://localhost:8265 查看 Dashboard

案例:用 Ray 实现简单的并行爬虫

对比原生多进程和 Ray 的实现:

原生 multiprocessing(代码复杂,容易出错):

from multiprocessing import Pool, Manager
import requests

def fetch(url, result_queue):
    try:
        response = requests.get(url, timeout=5)
        result_queue.put((url, response.status_code))
    except Exception as e:
        result_queue.put((url, str(e)))

if __name__ == '__main__':
    urls = [...]  # 1000 个 URL
    
    # 复杂的队列管理
    manager = Manager()
    result_queue = manager.Queue()
    
    with Pool(10) as p:
        p.starmap(fetch, [(url, result_queue) for url in urls])
    
    # 收集结果
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())

Ray(简洁、高效、可扩展):

import ray
import requests

ray.init()

@ray.remote
def fetch(url):
    try:
        response = requests.get(url, timeout=5)
        return url, response.status_code
    except Exception as e:
        return url, str(e)

urls = [...]  # 1000 个 URL

# 一行代码并行抓取
futures = [fetch.remote(url) for url in urls]
results = ray.get(futures)  # 自动等待所有任务完成

优势:

  • 代码简洁一半以上
  • 自动处理错误和重试
  • 可以轻松扩展到多台机器(只需连接到 Ray 集群)
  • 通过 Dashboard 可以实时监控抓取进度

这就是 Ray 解决的第三座大山:让并行编程像写普通代码一样简单。


第三部分:Ray 生态能做什么——五大核心场景

Ray 不仅仅是一个分布式计算框架,它已经发展成为一个完整的 AI 计算平台。下面介绍 Ray 的五大核心库。

7. Ray Train:分布式深度学习,一行代码搞定

Ray Train 是 Ray 的分布式训练库,支持 PyTorch、TensorFlow、Horovod、HuggingFace 等主流框架。

核心特性

框架无关的统一 API

from ray.train.torch import TorchTrainer      # PyTorch
from ray.train.tensorflow import TensorflowTrainer  # TensorFlow
from ray.train.huggingface import HuggingFaceTrainer  # HuggingFace
from ray.train.xgboost import XGBoostTrainer    # XGBoost
from ray.train.lightgbm import LightGBMTrainer  # LightGBM

所有框架使用相同的 ScalingConfig 配置分布式参数:

scaling_config=ScalingConfig(
    num_workers=8,      # 8 个 worker
    use_gpu=True,       # 使用 GPU
    resources_per_worker={"CPU": 4, "GPU": 1}  # 每个 worker 的资源
)

弹性训练(Fault Tolerance)

  • 训练过程中某个 worker 挂了?自动重试
  • 支持从检查点恢复,不浪费已完成的训练进度

与 Ray Tune 无缝集成

  • 超参数搜索时,每个试验自动使用分布式训练
  • 自动管理 GPU 资源分配,最大化集群利用率

实战:把单机 Stable Diffusion 训练变成分布式

单机代码:

from diffusers import StableDiffusionPipeline
import torch

model = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5")
model = model.to("cuda")

# 训练循环...
for step, batch in enumerate(dataloader):
    loss = model(batch).loss
    loss.backward()
    optimizer.step()

Ray 分布式版本:

from ray import train
from ray.train.torch import TorchTrainer
from diffusers import StableDiffusionPipeline

def train_func(config):
    # Ray 自动分配 GPU
    model = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5")
    model = train.torch.prepare_model(model)
    
    for step, batch in enumerate(train.get_dataset_shard("train")):
        loss = model(batch).loss
        loss.backward()
        optimizer.step()
        
        if step % 100 == 0:
            train.report({"loss": loss.item()})

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=train.ScalingConfig(num_workers=4, use_gpu=True),
    datasets={"train": ray_dataset},
)
result = trainer.fit()

改动点:

  1. train.torch.prepare_model() 包装模型
  2. 使用 train.get_dataset_shard() 获取数据分片
  3. train.report() 上报训练指标

性能提升:

  • 单机 1x A100:约 6 小时/epoch
  • 分布式 4x A100:约 1.5 小时/epoch(接近 4 倍加速)

8. Ray Serve:大模型时代的在线推理服务

Ray Serve 是一个可扩展的模型服务框架,专为 LLM(大语言模型)和生成式 AI 设计。

核心特性

多模型组合(Model Composition)

@serve.deployment
class EmbeddingModel:
    def __init__(self):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
    
    async def __call__(self, text):
        return self.model.encode(text)

@serve.deployment
class LLMModel:
    def __init__(self, embedding_handle):
        self.embedding = embedding_handle
        self.llm = load_llm()
    
    async def __call__(self, request):
        # 先调用 Embedding 服务
        embedding = await self.embedding.remote(request.text)
        # 再调用 LLM
        return self.llm.generate(embedding)

# 构建服务图
embedding = EmbeddingModel.bind()
llm = LLMModel.bind(embedding)
serve.run(llm)

自动扩缩容(Auto-scaling)

@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 100,
        "target_num_ongoing_requests_per_replica": 10,
        "upscale_delay_s": 30,   # 30 秒后扩容
        "downscale_delay_s": 600,  # 10 分钟后缩容
    }
)
class LLMService:
    ...

批量推理优化(Batching)

@serve.deployment
class BatchingService:
    @serve.batch(max_batch_size=16, batch_wait_timeout_s=0.01)
    async def __call__(self, requests):
        # 自动合并多个请求为 batch
        batched_input = [req.text for req in requests]
        return self.model(batched_input)

案例:用 Ray Serve 部署一个 GPT 风格的 API 服务

import ray
from ray import serve
from fastapi import FastAPI
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

app = FastAPI()

@serve.deployment(
    num_replicas=2,
    ray_actor_options={"num_gpus": 1}
)
@serve.ingress(app)
class GPTService:
    def __init__(self):
        # 加载模型
        model_name = "microsoft/DialoGPT-medium"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name)
        self.model = self.model.to("cuda")
    
    @app.post("/chat")
    async def chat(self, message: str, max_length: int = 100):
        # 编码输入
        input_ids = self.tokenizer.encode(
            message + self.tokenizer.eos_token,
            return_tensors='pt'
        ).to("cuda")
        
        # 生成回复
        with torch.no_grad():
            output = self.model.generate(
                input_ids,
                max_length=max_length,
                pad_token_id=self.tokenizer.eos_token_id
            )
        
        # 解码输出
        response = self.tokenizer.decode(
            output[:, input_ids.shape[-1]:][0],
            skip_special_tokens=True
        )
        
        return {"response": response}

# 部署服务
serve.run(GPTService.bind())

部署完成后,可以直接调用:

curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "Hello, how are you?", "max_length": 50}'

9. Ray RLlib:强化学习的工业级方案

Ray RLlib 是业界最广泛使用的强化学习库之一,支持各种 RL 算法。

为什么 RL 训练天然需要分布式?

强化学习的训练流程是这样的:

  1. 智能体(Agent)在环境中执行动作
  2. 环境返回状态和奖励
  3. 智能体根据奖励更新策略
  4. 重复

这个循环需要:

  • 并行环境:同时运行多个环境实例产生数据
  • 分布式策略更新:多个 worker 并行计算梯度
  • 异构计算:环境模拟用 CPU,策略更新用 GPU

RLlib 的核心特性

丰富的算法支持

  • 经典算法:DQN、A3C、PPO、IMPALA
  • 离线 RL:CQL、CQL、DT
  • 多智能体:QMIX、MADDPG
  • 模型-based:Dreamer、MB-MPO

多智能体训练

from ray import tune
from ray.rllib.algorithms.ppo import PPO

config = {
    "env": "multi_agent_env",
    "multiagent": {
        "policies": {
            "agent_1": (..., obs_space, action_space, {}),
            "agent_2": (..., obs_space, action_space, {}),
        },
        "policy_mapping_fn": lambda agent_id, **kwargs: agent_id,
    },
    "num_workers": 8,
}

tune.run(PPO, config=config)

OpenAI 如何用 Ray RLlib 做 RLHF

ChatGPT 的训练过程分为三步:

  1. 预训练:在大规模语料上训练基础模型
  2. SFT(Supervised Fine-Tuning):用人工标注数据微调
  3. RLHF(Reinforcement Learning from Human Feedback):用强化学习优化

RLHF 的核心就是强化学习:

  • 环境:语言模型生成回复
  • 奖励模型:人类反馈训练的模型,给回复打分
  • 策略:语言模型的参数

OpenAI 使用 Ray RLlib 来分布式执行 RLHF 训练:

  • 多个 worker 并行生成回复
  • 奖励模型并行打分
  • PPO 算法分布式更新策略

这种规模的任务,没有分布式框架几乎不可能完成。

10. Ray Tune:超参数搜索的自动化

超参数调优是机器学习中最耗时、最需要算力的环节之一。

炼丹痛点

  • 参数空间大:学习率、批量大小、网络层数、激活函数……
  • 试错成本高:每个配置都要完整训练一轮
  • 资源利用率低:串行搜索太慢,手动并行容易冲突

Ray Tune 的核心特性

分布式搜索

from ray import tune
from ray.tune.tune_config import TuneConfig
from ray.train.torch import TorchTrainer

def trainable(config):
    # 训练代码使用 config 中的超参数
    lr = config["lr"]
    batch_size = config["batch_size"]
    # ... 训练 ...
    return {"accuracy": accuracy}

tune_config = TuneConfig(
    metric="accuracy",
    mode="max",
    num_samples=100,  # 尝试 100 组配置
)

# 定义搜索空间
param_space = {
    "lr": tune.loguniform(1e-4, 1e-1),
    "batch_size": tune.choice([32, 64, 128, 256]),
    "num_layers": tune.randint(2, 6),
}

# 启动分布式搜索(自动使用所有可用 GPU)
tuner = tune.Tuner(
    trainable,
    param_space=param_space,
    tune_config=tune_config,
    run_config=train.RunConfig(...)
)
results = tuner.fit()

早停机制(Early Stopping)

from ray.tune.schedulers import ASHAScheduler

scheduler = ASHAScheduler(
    metric="accuracy",
    mode="max",
    max_t=100,  # 最大训练 100 个 epoch
    grace_period=10,  # 至少训练 10 个 epoch
    reduction_factor=3,  # 淘汰率
)

# 效果:表现不好的配置会被提前终止,节省 60-80% 的算力

与 Train 无缝集成

from ray.train.torch import TorchTrainer
from ray.tune import Tuner, TuneConfig

# 每个试验都是分布式训练
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)

tuner = Tuner(
    trainer,
    param_space={
        "train_loop_config": {
            "lr": tune.loguniform(1e-4, 1e-1),
            "batch_size": tune.choice([32, 64, 128]),
        }
    },
    tune_config=TuneConfig(num_samples=100, scheduler=ASHAScheduler()),
)

每个超参数试验都自动使用 4 个 GPU 进行分布式训练,最大化集群利用率。

11. Ray Data:海量数据的预处理流水线

数据预处理往往比训练本身更耗时。Ray Data 提供了分布式数据处理能力。

核心特性

标准数据格式支持

  • Parquet、CSV、JSON、TFRecord
  • HuggingFace Datasets
  • Databricks、Snowflake 集成

流式处理

import ray

# 从 S3 流式读取 Parquet
ds = ray.data.read_parquet("s3://bucket/dataset/")

# 链式转换(惰性求值,自动优化)
ds = ds.map(transform_fn)     # 数据转换
ds = ds.filter(filter_fn)     # 数据过滤
ds = ds.repartition(100)      # 重分区

# 训练时流式消费(不占用大量内存)
for batch in ds.iter_torch_batches(batch_size=32):
    # batch 直接从 S3 流式读取到 GPU
    model(batch)

零拷贝数据传输

  • 使用 Apache Arrow 格式
  • 同节点进程间共享无需序列化
  • 跨节点自动压缩传输

与 Databricks、Snowflake 的友好集成

# 从 Databricks 读取
from ray.data import read_databricks
ds = read_databricks(
    cluster_id="cluster-id",
    table="catalog.schema.table"
)

# 从 Snowflake 读取
from ray.data import read_snowflake
ds = read_snowflake(
    query="SELECT * FROM users WHERE created_at > '2024-01-01'",
    connection_parameters={...}
)

# 预处理后写回
ds.write_parquet("s3://processed-data/output/")

第四部分:谁在用 Ray——真实企业案例

12. OpenAI:Ray 如何支撑 ChatGPT 的训练

OpenAI 是 Ray 最早的企业用户之一。

从 MPI 迁移到 Ray

在 Ray 之前,OpenAI 使用 MPI(Message Passing Interface)进行分布式训练。MPI 是高性能计算领域的标准,但有几个问题:

  1. 编程复杂:需要显式管理进程通信
  2. 灵活性差:难以处理动态任务图
  3. 容错性差:一个进程挂了,整个作业失败
  4. 扩展困难:从单机到多机需要大量配置

为什么迁移到 Ray?

OpenAI 的工程师发现,AI 工作负载(特别是强化学习)与 MPI 假设的 “SPMD(Single Program Multiple Data)” 模式非常不同:

  • RL 的任务图是动态的,取决于环境的反馈
  • 需要同时运行大量仿真环境
  • 需要频繁与环境交互

Ray 的 Actor 模型完美契合这个需求:

  • 每个仿真环境是一个 Actor,维护自己的状态
  • 训练进程与仿真环境通过消息传递通信
  • 动态任务调度,无需预定义通信模式

ChatGPT 的 RLHF 训练

ChatGPT 的 RLHF(基于人类反馈的强化学习)训练流程:

  1. 收集人类反馈:人工标注者对模型生成的多个回复进行排序
  2. 训练奖励模型:用排序数据训练一个奖励模型,学习人类偏好
  3. 强化学习优化:用 PPO 算法微调语言模型,最大化奖励模型打分

第三步需要:

  • 大量并行环境生成回复
  • 奖励模型并行打分
  • 分布式策略更新

OpenAI 使用 Ray RLlib 执行这个流程:

  • 数百个 worker 并行运行语言模型生成回复
  • 奖励模型分布式打分
  • PPO 分布式更新策略

据公开报道,使用 Ray 后,OpenAI 的训练基础设施复杂度降低了十倍以上,工程师可以更专注于算法而非分布式细节。

13. Uber:打车定价背后的 Ray 集群

Uber 是 Ray 的重度用户,官方博客有多篇文章介绍其应用。

实时供需预测

Uber 的核心业务之一是动态定价:根据实时的供需关系调整价格。

这需要:

  • 处理数百万订单数据
  • 实时预测供需趋势
  • 毫秒级响应延迟

Uber 使用 Ray 构建实时预测系统:

  • Ray Serve 部署预测模型服务
  • Ray Data 处理实时数据流
  • Ray Train 训练预测模型

弹性训练:高峰期自动扩容,低谷期自动缩容

Uber 的训练负载有明显的峰谷:

  • 高峰期:晚上和周末,更多订单数据需要训练
  • 低谷期:白天工作时间,训练需求减少

使用 Ray on Kubernetes,Uber 实现了:

  • 自动扩缩容:根据队列长度自动增加/减少训练 worker
  • 资源隔离:不同类型的任务使用独立的资源池
  • GPU 共享:多个小任务共享 GPU,提高利用率

Uber 在 Ray Summit 2025 的演讲中分享了具体数据:

  • 训练成本降低 40%
  • 资源利用率提升 60%
  • 模型迭代速度提升 3 倍

从 MADLJ 到 Ray on Kubernetes

2023 年前,Uber 使用自研的 MADLJ(Michelangelo Deep Learning Jobs)系统管理 ML 训练。迁移到 Ray on Kubernetes 后:

  • 开发效率:工程师可以在本地用同样的代码调试,直接提交到集群运行
  • 资源管理:Kubernetes 统一调度,避免资源碎片化
  • 生态丰富:使用 Ray 生态的各种库,无需重复造轮子

14. Shopify:电商平台的多模态 AI 基础设施

Shopify 是全球领先的电商平台,服务数百万商家。

Merlin:基于 Ray 的统一 ML 平台

Shopify 构建了名为 Merlin 的内部 ML 平台,完全基于 Ray:

核心理念:让 ML 工程师从笔记本到生产的体验尽可能无缝。

技术栈

  • Ray on Kubernetes
  • Ray Train:分布式训练
  • Ray Serve:模型服务
  • Ray Tune:超参数优化
  • Ray Data:数据预处理
  • Airflow:工作流编排

从 Jupyter 笔记本到生产环境

Shopify 的开发流程:

  1. 研究阶段:数据科学家在 Jupyter Notebook 上实验
  2. 开发阶段:把 Notebook 代码整理成 Python 脚本
  3. 测试阶段:在小数据集上用 Ray 本地模式测试
  4. 生产阶段:直接提交到 Merlin 平台,自动分布式运行

关键优势:代码无需重写。同一套代码,从笔记本到千卡集群无缝扩展。

多模态 AI:商品图片、描述、评论的统一处理

Shopify 的 AI 场景包括:

  • 商品分类:根据图片和描述自动分类
  • 搜索推荐:理解用户查询意图
  • 评论分析:情感分析、关键词提取
  • 智能客服:大语言模型对话

这些场景需要处理多模态数据

  • 图片(CNN/ViT)
  • 文本(BERT/GPT)
  • 结构化数据(表格)

Merlin 平台使用 Ray 统一处理:

  • Ray Data 并行加载多模态数据
  • Ray Train 分布式训练多模态模型
  • Ray Serve 部署统一的服务接口

Shopify 在 Ray Summit 2024 的演讲《Shopify’s Ray-Powered Approach to Multimodal AI in E-commerce》中分享了具体架构。

15. 其他用户:蚂蚁集团、字节跳动、百度

蚂蚁集团:金融风控模型的分布式训练

蚂蚁集团使用 Ray 训练大规模金融风控模型:

  • 特征工程:数十亿特征,TB 级数据
  • 模型训练:XGBoost、深度神经网络
  • 实时推理:毫秒级风险评分

挑战:

  • 数据隐私合规,需要在本地数据中心处理
  • 高可用要求,系统不能宕机
  • 模型需要频繁更新(应对新型欺诈)

Ray 的解决方案:

  • 私有化部署 Ray 集群
  • 弹性训练,快速迭代模型
  • Ray Serve 高可用服务部署

字节跳动:推荐系统的超大规模特征工程

字节跳动的推荐系统(抖音、今日头条)需要处理:

  • 数十亿用户、数千亿内容
  • 实时特征提取和更新
  • 毫秒级推荐延迟

使用场景:

  • Ray Data:分布式特征工程
  • Ray Train:分布式模型训练
  • Ray Serve:A/B 测试和多模型实验

具体实践可参考字节跳动在 Ray Summit 的演讲。

百度:自动驾驶仿真数据的并行生成

百度 Apollo 自动驾驶平台使用 Ray:

  • 仿真环境:并行车道级仿真
  • 数据生成:生成海量训练数据
  • 策略训练:强化学习训练驾驶策略

挑战:

  • 仿真环境计算密集,需要大量 CPU
  • 策略训练需要 GPU
  • 异构计算资源调度

Ray 的 Actor 模型完美解决:

  • 每个仿真环境是一个 Actor,并行运行
  • 训练进程动态收集仿真数据
  • 自动调度 CPU/GPU 资源

第五部分:Ray vs 其他方案——什么时候选 Ray?

16. Ray vs Spark:AI 时代的分工

Spark 擅长什么?

  • 批处理:ETL、数据仓库、报表生成
  • SQL 查询:DataFrame API 非常成熟
  • 数据规模:PB 级数据处理

Spark 不擅长什么?

  • 交互式计算:启动延迟高(秒级),不适合毫秒级任务
  • 异构计算:主要设计用于 CPU 批处理,GPU 支持较弱
  • 动态任务图:DAG 需要静态定义
  • 在线服务:Spark Streaming 延迟较高

Ray 擅长什么?

  • AI/ML 工作负载:原生支持 GPU,与 PyTorch/TensorFlow 无缝集成
  • 交互式计算:毫秒级任务启动延迟
  • 动态任务图:运行时决定任务依赖
  • 在线服务:Ray Serve 专为低延迟服务设计
  • 异构计算:CPU/GPU 混合调度

什么时候选 Spark?

  • 传统的 ETL 数据管道
  • SQL 分析查询
  • 海量日志处理
  • 数据仓库应用

什么时候选 Ray?

  • 机器学习训练和推理
  • 强化学习
  • 交互式/实时应用
  • 需要 GPU 的计算任务

两者可以共存吗?

可以!很多公司同时使用两者:

  • Spark 做数据预处理和 ETL
  • Ray 做模型训练和服务

Ray 也可以读取 Spark 生成的数据(通过 Parquet、Delta Lake 等格式)。

17. Ray vs Kubernetes:互补而非竞争

很多人问:Ray 和 Kubernetes 是什么关系?是竞争还是互补?

答案是:互补。

Kubernetes 擅长什么?

  • 容器编排:管理容器的生命周期
  • 服务发现:DNS 和负载均衡
  • 资源管理:为容器分配 CPU/内存
  • 故障恢复:自动重启失败的容器

Kubernetes 不擅长什么?

  • 分布式计算:需要额外框架(如 Spark、Flink)
  • 细粒度任务调度:Pod 启动需要秒级,不适合毫秒级任务
  • AI 特定优化:如 GPU 共享、数据本地性

KubeRay:在 K8s 上运行 Ray 的最佳实践

KubeRay 是 Ray 社区维护的 Kubernetes Operator,用于在 K8s 上部署 Ray 集群。

架构

Kubernetes 层:
├─ 管理 Ray Head Node Pod
├─ 管理 Ray Worker Node Pods
└─ 自动扩缩容 (Ray Autoscaler + K8s HPA)

Ray 层:
├─ 在 K8s Pod 内运行 Ray 进程
├─ 使用 Ray 的调度器做细粒度任务调度
└─ 使用 Ray 的对象存储做数据共享

工作流程

  1. 用户提交 Ray 应用到 K8s 集群
  2. KubeRay 创建 Ray Head Node
  3. Ray Autoscaler 根据任务需求,向 K8s 申请 Worker Node
  4. K8s 创建新的 Pod 作为 Worker
  5. Ray 在这些 Pod 上调度任务

Ray 管计算,K8s 管资源

分工明确:

  • Kubernetes:资源管理、服务发现、故障恢复
  • Ray:分布式计算、任务调度、数据管理

这种架构的好处:

  • 利用 K8s 成熟的生态(监控、日志、安全)
  • 保留 Ray 的高性能计算能力
  • 统一资源池,提高利用率

18. Ray vs DeepSpeed/Horovod:分布式训练的多种选择

DeepSpeed:专为大模型训练设计

DeepSpeed 是 Microsoft 开源的深度学习优化库,专为训练超大模型(百亿到万亿参数)设计。

核心特性

  • ZeRO(Zero Redundancy Optimizer):将优化器状态分片到多个 GPU,减少显存占用
  • 模型并行:层间并行,支持超大模型
  • 混合精度训练:FP16/BF16 加速

适用场景

  • 训练 GPT-3/4 级别的超大语言模型
  • 显存不足以容纳完整模型的场景
  • 需要极致训练效率

Horovod:Uber 开源的分布式训练框架

Horovod 是 Uber 开源的分布式深度学习框架,基于 MPI 实现 all-reduce 通信。

核心特性

  • 简洁的 API:只需几行代码改造现有训练脚本
  • 高性能:基于 NCCL 的 all-reduce
  • 框架支持:PyTorch、TensorFlow、Keras

适用场景

  • 数据并行训练
  • 对通信效率要求高
  • 已有训练脚本,希望最小改动分布式化

Ray Train 的定位

Ray Train 与这些框架的关系:

Ray Train 不是一个新的训练框架,而是分布式训练框架的"统一接口"。

实际上,Ray Train 可以在内部使用 DeepSpeed、Horovod 或 PyTorch DDP:

# 使用 Horovod
from ray.train.horovod import HorovodTrainer
trainer = HorovodTrainer(...)

# 使用 DeepSpeed
from ray.train.deepspeed import DeepSpeedTrainer
trainer = DeepSpeedTrainer(...)

# 使用 PyTorch DDP
from ray.train.torch import TorchTrainer
trainer = TorchTrainer(...)

Ray Train 提供的额外价值

  1. 统一 API:无论底层用什么,上层接口一致
  2. 资源管理:自动分配 GPU,避免资源冲突
  3. 弹性训练:失败自动重试,从检查点恢复
  4. 与 Ray 生态集成:Tune、Serve、Data 无缝协作

选型建议

直接用 PyTorch DDP

  • 单机多卡,简单场景
  • 不想引入额外依赖

用 Horovod

  • 多机多卡,需要高性能通信
  • 已有代码,希望最小改动

用 DeepSpeed

  • 超大模型,显存不够
  • 需要 ZeRO、模型并行等高级特性

用 Ray Train

  • 需要与超参数搜索(Tune)、模型服务(Serve)集成
  • 需要弹性训练和容错
  • 希望统一的研究到生产流程
  • 希望自动资源管理

结语:分布式计算的未来是"无感"

Ray 的终极愿景

Ray 的创始团队有一个愿景:

让分布式计算对开发者透明。开发者应该专注于算法和业务逻辑,而不是分布式的复杂性。

这听起来很理想主义,但 Ray 正在一步步实现:

  • 2016:提出 Actor + Task 的统一抽象
  • 2019:发布 Ray Train、Ray Tune、Ray RLlib
  • 2020:发布 Ray Serve
  • 2021:发布 Ray Data
  • 2023:加入 PyTorch 生态
  • 2024+:持续优化性能和易用性

给 Python 工程师的建议:什么时候该考虑引入 Ray?

如果符合以下任何一条,Ray 可能会给你带来很大价值:

  1. 训练太慢:单机训练需要数小时甚至数天
  2. 数据太大:内存装不下数据集
  3. 服务需要扩展:流量增长,单实例无法承载
  4. 资源利用率低:GPU 经常空闲等待
  5. 研究到生产的鸿沟:研究代码和生产代码是两套
  6. 多步骤 pipeline:数据预处理 → 训练 → 调参 → 服务,步骤复杂

从小处开始:

不需要一开始就全面迁移到 Ray。可以:

  1. 先用 Ray 实现简单的并行任务(如数据预处理)
  2. 用 Ray Train 改造训练代码
  3. 用 Ray Serve 部署模型服务
  4. 逐步扩展到完整 pipeline

参考资料

  1. How Ray, a Distributed AI Framework, Helps Power ChatGPT - The New Stack
  2. Ray: A Distributed Framework for Emerging AI Applications - arXiv
  3. How Uber Uses Ray to Optimize the Rides Business - Uber Engineering
  4. Uber’s Journey to Ray on Kubernetes - Uber Engineering
  5. The Magic of Merlin: Shopify’s New Machine Learning Platform - Shopify Engineering
  6. Shopify’s Ray-Powered Approach to Multimodal AI in E-commerce - Ray Summit 2024
  7. Ray Use Cases - Ray Documentation
  8. Anyscale Official Website
  9. Ray Summit 2025 - Scaling Model Training with Ray at Uber
  10. Elastic Distributed Training with XGBoost on Ray - Uber