# Practice Notes: Large-Scale Multi-GPU Training

## DataParallel vs. DistributedDataParallel
| Dimension | DataParallel (DP) | DistributedDataParallel (DDP) |
| --- | --- | --- |
| Architecture | Single process, multiple threads | Multiple processes (one per GPU) |
| Communication | Main GPU gathers → scatters (serial) | Direct inter-process communication (Ring All-Reduce) |
| Load balancing | Main GPU carries extra load | Balanced across all GPUs |
| Performance | Slow (GIL contention, communication bottleneck) | Fast (no GIL, efficient communication) |
| Typical use | Small single-node experiments | Single-node multi-GPU / multi-node production training |
| Code complexity | Simple (one line) | Slightly more complex (multi-process initialization required) |
| Data loading | One DataLoader | One independent DataLoader per GPU process |
| Batch size | `batch_size` is the total batch size | `batch_size` is the per-GPU batch size |
| Gradient sync | Main GPU gathers all gradients (traffic grows linearly with GPU count) | Ring All-Reduce (bandwidth-optimal; per-GPU traffic nearly independent of GPU count) |
| Parameter update | Main GPU updates, then broadcasts (serial) | Every GPU updates independently (parallel) |
| Data transfer | GPU0↔GPU1, GPU0↔GPU2, … (star around GPU 0) | Ring topology, bandwidth fully utilized |
| GPU memory | Main GPU uses noticeably more memory | Memory usage balanced across GPUs |
| DataLoader `batch_size` (total batch of 64 samples on 4 GPUs) | `DataLoader(dataset, batch_size=64)`: DP splits it internally, 16 per GPU | `DataLoader(dataset, batch_size=16)`: set per GPU, so total = 16 × world_size |
| Learning rate | `batch_size=64, lr=0.01` | `batch_size_per_gpu=16` on 4 GPUs → total batch is still 64, so keep `lr=0.01` |
| Saving weights | Save as usual | `if rank == 0: torch.save(model.module.state_dict(), "model.pth")` (save from the main process only) |
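To make the gradient-sync row concrete: after `backward()`, DDP averages gradients across ranks with All-Reduce, automatically, in buckets overlapped with the backward pass. The helper below is a hypothetical hand-rolled sketch of the same idea (not DDP's actual API), assuming the default process group is initialized:

```python
import torch.distributed as dist

def average_gradients(model, world_size):
    # After backward(), every rank holds its local gradients; All-Reduce
    # sums them across ranks so every rank ends up with the same average.
    for p in model.parameters():
        if p.grad is not None:
            dist.all_reduce(p.grad, op=dist.ReduceOp.SUM)
            p.grad /= world_size
```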
### DP

```text
                 [GPU 0 (main)]
                       ↑
[CPU DataLoader] ─→ [data batch] ─→ [scatter to GPU 1–3]
                       ↓
                [forward pass]
                       ↓
 [each GPU computes its loss and gradients]
                       ↓
      [gradients gathered onto GPU 0]
                       ↓
            [parameter update]
                       ↓
 [broadcast new parameters to the other GPUs]
```
```python
import torch.nn as nn
from torch.utils.data import DataLoader

dataloader = DataLoader(
    dataset,            # assumes `dataset` is defined elsewhere
    batch_size=64,      # total batch size; DP splits it across GPUs
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)

model = nn.DataParallel(model)  # one line is enough to wrap the model
model.cuda()

for batch in dataloader:
    batch = batch.cuda()        # lands on GPU 0 first, then is scattered
    output = model(batch)
```
- Heavy load on the main GPU: it gathers inputs, scatters them, and accumulates gradients
- Serial communication: GPU 0 talks to the other GPUs one at a time
- GIL contention: Python's global interpreter lock limits multi-threading efficiency
- Batch size ceiling: bounded by the main GPU's memory
- A single DataLoader instance: only one process loads data
- `batch_size` is the total: 64 samples are split evenly across 4 GPUs (16 each); see the probe sketch after this list
- The main GPU (GPU 0) does the scattering: batches load onto GPU 0 first, then go to the other GPUs
- Data-loading bottleneck: every GPU waits on the same DataLoader
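To watch the DP split happen, a throwaway probe (a sketch assuming 4 visible GPUs; `Probe` is a made-up module for illustration only):

```python
import torch
import torch.nn as nn

class Probe(nn.Module):
    def forward(self, x):
        # With batch_size=64 on 4 GPUs, each replica sees a chunk of 16
        print(x.device, tuple(x.shape))
        return x.sum(dim=1, keepdim=True)

model = nn.DataParallel(Probe()).cuda()
out = model(torch.randn(64, 8).cuda())  # prints one line per replica
```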
### DDP

```text
Process 0 (GPU0)   Process 1 (GPU1)   Process 2 (GPU2)   Process 3 (GPU3)
      ↓                  ↓                  ↓                  ↓
 [load data]        [load data]        [load data]        [load data]
      ↓                  ↓                  ↓                  ↓
 [forward pass]     [forward pass]     [forward pass]     [forward pass]
      ↓                  ↓                  ↓                  ↓
[compute grads]    [compute grads]    [compute grads]    [compute grads]
      ↓                  ↓                  ↓                  ↓
      └─────────────────→ [All-Reduce] ←─────────────────────┘
                               ↓
          [every GPU holds the averaged gradients]
                               ↓
       [each GPU updates its parameters independently]
```
DistributedSampler is PyTorch's sampler built specifically for distributed training: it partitions the dataset evenly across the GPU processes, and via `set_epoch` it reshuffles so that each epoch's partition differs.
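A minimal sketch of the partitioning itself; no process group is needed when `num_replicas` and `rank` are passed explicitly, and `shuffle=False` keeps the split easy to read:

```python
import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

dataset = TensorDataset(torch.arange(10))
for rank in range(2):
    sampler = DistributedSampler(dataset, num_replicas=2, rank=rank, shuffle=False)
    print(rank, list(sampler))
# rank 0 -> [0, 2, 4, 6, 8]
# rank 1 -> [1, 3, 5, 7, 9]
```

In a real loop the sampler is handed to the DataLoader, as in the training sketch below.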
```python
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def train(rank, world_size):
    # One process per GPU; the process group must exist before DDP is built
    # (assumes MASTER_ADDR / MASTER_PORT are set in the environment).
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    sampler = DistributedSampler(
        dataset,                 # assumes `dataset` is defined elsewhere
        num_replicas=world_size,
        rank=rank,
        shuffle=True,
    )
    dataloader = DataLoader(
        dataset,
        batch_size=16,           # per-GPU batch size
        sampler=sampler,         # no shuffle= here; the sampler shuffles
        num_workers=4,
        pin_memory=True,
    )

    ddp_model = DDP(model.cuda(rank), device_ids=[rank])

    for epoch in range(epochs):
        sampler.set_epoch(epoch)  # different shuffle every epoch
        for batch in dataloader:
            batch = batch.cuda(rank)
            output = ddp_model(batch)
```
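One way to launch `train()` on a single node (a sketch; the address and port values are placeholders):

```python
import os
import torch
import torch.multiprocessing as mp

if __name__ == "__main__":
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    world_size = torch.cuda.device_count()
    # spawn calls train(rank, world_size) once per process
    mp.spawn(train, args=(world_size,), nprocs=world_size)
```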
- No main GPU: all GPUs are peers
- Parallel communication: the Ring All-Reduce algorithm communicates efficiently
- No GIL limit: each process runs its own Python interpreter
- Data parallelism: every GPU loads its own data
- Multiple DataLoader instances: one per GPU process
- `batch_size` is the per-GPU size: batch = 64 / 4 = 16
- Worker count multiplies: total workers = num_workers × number of GPU processes
## Choosing num_workers

GPU count and `num_workers` have no direct correspondence; they are two independent dimensions. A poor setting won't crash the program, but it will hurt training throughput.

- GPU count: sets the degree of compute parallelism (how many model replicas run)
- `num_workers`: sets the degree of data-loading parallelism; `num_workers = 4` is a conservative starting point
- The two values don't need to match
```text
num_workers = 0, GPUs = 4   # valid: the main process loads data itself (GPUs may starve)
num_workers = 4, GPUs = 4   # valid: a common default
num_workers = 8, GPUs = 4   # valid: more workers than GPUs
num_workers = 2, GPUs = 8   # valid: fewer workers than GPUs
```
The CPU workers handle preprocessing: each worker assembles complete batches into a prefetch queue, and the main process consumes them in turn and dispatches work to the GPUs (a runnable probe follows the two diagrams below):
```text
[CPU Worker 0] ─→ [batch queue] ─┐                     [GPU 0] ← model replica 0
[CPU Worker 1] ─→ [batch queue] ─┼─→ [main process] ─→ [GPU 1] ← model replica 1
[CPU Worker 2] ─→ [batch queue] ─┤                     [GPU 2] ← model replica 2
[CPU Worker 3] ─→ [batch queue] ─┘                     [GPU 3] ← model replica 3
        ↑                                 ↑
 data-loading processes          data-dispatch process
```
By default each worker builds a *whole* batch (workers are used round-robin), and under DP each consumed batch of 32 is then split across the 4 GPUs:

```text
Worker0 → batch #0 (32 samples) ┐
Worker1 → batch #1 (32 samples) ├→ main process takes one batch at a time
Worker2 → batch #2 (32 samples) │   → DP splits it: GPU0 (0–7),  GPU1 (8–15),
Worker3 → batch #3 (32 samples) ┘                   GPU2 (16–23), GPU3 (24–31)
```
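To verify the whole-batch behavior, a throwaway probe using `get_worker_info()` (`ProbeDataset` is made up for illustration; the `__main__` guard matters on spawn-based platforms):

```python
import torch
from torch.utils.data import DataLoader, Dataset, get_worker_info

class ProbeDataset(Dataset):
    def __len__(self):
        return 32
    def __getitem__(self, i):
        info = get_worker_info()          # None in the main process
        return i, -1 if info is None else info.id

if __name__ == "__main__":
    loader = DataLoader(ProbeDataset(), batch_size=8, num_workers=4)
    for idx, worker in loader:
        # Each printed batch comes from exactly one worker id
        print(idx.tolist(), "built by worker", worker.unique().tolist())
```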
## num_workers strategies for multi-GPU training

### Strategy 1: DataParallel (single process, multiple GPUs)
```python
import torch.nn as nn
from torch.utils.data import DataLoader

model = nn.DataParallel(model, device_ids=[0, 1, 2, 3])
dataloader = DataLoader(
    dataset,
    batch_size=128,     # total across the 4 GPUs (32 each)
    num_workers=8,      # one shared loader, so give it more workers
    pin_memory=True,
)
```
### Strategy 2: DistributedDataParallel (multi-process, multiple GPUs)
```python
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# LOCAL_RANK, RANK, and WORLD_SIZE are set by the launcher (e.g. torchrun)
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)
dist.init_process_group("nccl")

dataloader = DataLoader(
    dataset,
    batch_size=32,      # per GPU
    num_workers=4,      # per process → 4 × 4 = 16 workers on a 4-GPU node
    sampler=DistributedSampler(dataset),
)
```
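For picking the per-process value, one rough heuristic (an assumption, not an official rule): budget total workers against CPU cores and leave headroom for the training processes themselves.

```python
import os
import torch

cpu_cores = os.cpu_count() or 8
gpu_procs = max(torch.cuda.device_count(), 1)
# Heuristic: split cores across GPU processes, keep one core per process
# for the training loop itself, and cap at a sane maximum.
num_workers = max(0, min(8, cpu_cores // gpu_procs - 1))
print(f"suggested num_workers per process: {num_workers}")
```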
### Probing num_workers automatically
```python
import time
import torch
from torch.utils.data import DataLoader

def find_optimal_workers(dataset, batch_size, gpu_count, max_workers=16):
    """Benchmark a few num_workers values and return the fastest."""
    print(f"Testing num_workers for {gpu_count} GPUs...")
    best_throughput = 0.0
    best_workers = 0
    for workers in [0, 2, 4, 8, 12, 16]:
        if workers > max_workers:
            break
        dataloader = DataLoader(
            dataset,
            batch_size=batch_size * gpu_count,  # DP-style total batch
            num_workers=workers,
            pin_memory=True,
        )
        start = time.time()
        for i, batch in enumerate(dataloader):
            if i >= 50:                         # time the first 50 batches
                break
        elapsed = time.time() - start
        throughput = 50 / elapsed
        print(f"  workers={workers}: {throughput:.2f} batches/sec")
        if throughput > best_throughput:
            best_throughput = throughput
            best_workers = workers
    return best_workers

best_workers = find_optimal_workers(dataset, batch_size=32, gpu_count=4)
print(f"Recommended num_workers: {best_workers}")
```