实践笔记_多卡大规模训练

DataParallel vs DistributedDataParallel

维度 DataParallel (DP) DistributedDataParallel (DDP)
架构 单进程多线程 多进程(每个GPU一个进程)
通信方式 主GPU收集→分发(串行) 进程间直接通信(Ring All-Reduce)
负载均衡 主GPU负载重 所有GPU负载均衡
性能 慢(GIL限制、通信瓶颈) 快(无GIL、高效通信)
适用场景 单机小规模实验 单机多卡/多机多卡生产环境
代码复杂度 简单(一行代码) 稍复杂(需多进程初始化)
数据加载 单DataLoader 每GPU独立DataLoader
Batch Size 总batch size 每GPU batch size
梯度同步 主GPU收集所有梯度(O(N)) Ring All-Reduce(O(log N))
参数更新 主GPU更新后广播(串行) 每个GPU独立更新(并行)
数据传输 GPU0↔GPU1, GPU0↔GPU2… 环形拓扑,带宽充分利用
GPU占用 主GPU显存占用更大 所有GPU显存占用均衡
Batchsize/DataloaderBatchsize
(总共64batches/4GPU的时候)
DataLoader(dataset, batch_size=64)
batch_per_gpu = total_batch
DataLoader(dataset, batch_size=16)
batch_per_gpu = total_batch // world_size
学习率设置 batch_size=64, lr=0.01 batch_size_per_gpu=16, 4 GPUs → total_batch=64, lr仍用0.01
保存权重 默认保存 if rank == 0: # 只在主进程保存 torch.save(model.module.state_dict(), “model.pth”)

DP

1
2
3
4
5
6
7
8
9
10
11
12
13
                    [GPU 0 (主)]

[CPU DataLoader] ─→ [数据batch] ─→ [分发给GPU1-3]

[前向传播]

[各GPU计算loss和梯度]

[梯度汇总到GPU0]

[参数更新]

[广播新参数到其他GPU]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# DP模式:单DataLoader
from torch.utils.data import DataLoader

# 创建一个DataLoader,为所有GPU服务
dataloader = DataLoader(
dataset,
batch_size=64, # 总batch size
shuffle=True,
num_workers=4,
pin_memory=True
)

model = nn.DataParallel(model)
model.cuda()

for batch in dataloader:
# batch直接加载到默认GPU(通常是GPU0)
batch = batch.cuda()
output = model(batch) # 自动分发到各GPU
  • 主GPU负载重:负责收集、分发、梯度汇总
  • 通信串行:GPU0需要依次与其他GPU通信
  • GIL限制:Python全局锁影响多线程效率
  • batch size限制:受限于主GPU显存
  • 单一DataLoader实例:只有一个进程加载数据
  • batch_size是总大小:64个样本会平均分到4个GPU(每个16个)
  • 主GPU(GPU0)负责分发:数据先加载到GPU0,再分发到其他GPU
  • 数据加载瓶颈:所有GPU等待同一个DataLoader

DDP

1
2
3
4
5
6
7
8
9
10
11
12
13
进程0 (GPU0)         进程1 (GPU1)         进程2 (GPU2)         进程3 (GPU3)
↓ ↓ ↓ ↓
[加载数据] [加载数据] [加载数据] [加载数据]
↓ ↓ ↓ ↓
[前向传播] [前向传播] [前向传播] [前向传播]
↓ ↓ ↓ ↓
[计算梯度] [计算梯度] [计算梯度] [计算梯度]
↓ ↓ ↓ ↓
└───────────────→ [All-Reduce] ←────────────────┘

[所有GPU获得平均梯度]

[每个GPU独立更新参数]

DistributedSampler 是 PyTorch 中专门为分布式训练设计的采样器,它的核心作用是将数据集均匀分配给多个GPU进程,并确保每个epoch的数据划分不同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# DDP模式:每GPU独立DataLoader
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def train(rank, world_size):
# 每个进程独立创建DataLoader
sampler = DistributedSampler(
dataset,
num_replicas=world_size, # 总GPU数
rank=rank, # 当前GPU编号
shuffle=True
)

dataloader = DataLoader(
dataset,
batch_size=16, # 每个GPU的batch size
sampler=sampler, # 使用分布式采样器
num_workers=4, # 每个GPU有4个worker
pin_memory=True
)

# 每个GPU有自己的模型副本
model = DDP(model.cuda(rank), device_ids=[rank])

for epoch in range(epochs):
sampler.set_epoch(epoch) # 重要:保证每个epoch shuffle不同
for batch in dataloader:
batch = batch.cuda(rank)
output = model(batch)
  • 无主GPU:所有GPU地位平等
  • 并行通信:Ring All-Reduce算法,通信效率高
  • 无GIL限制:每个进程独立Python解释器
  • 数据并行:每个GPU独立加载数据
  • 多个DataLoader实例:每个GPU进程独立加载数据
  • batch_size是每GPU大小:batch =64/4= 16
  • Worker总数是总共的worker数目:num_workers × GPU数

num_workers设置偏好

GPU数量和num_workers没有直接的对应关系,它们是两个独立的维度。设置错误不会导致程序崩溃,但会影响训练效率。

GPU数量:决定模型并行度

num_workers:决定数据加载并行度,保守起步num_workers = 4

两者没有对应关系

1
2
3
4
5
# 这些配置都是合法的
num_workers = 0, GPU数 = 4 # ✅ 单进程加载数据
num_workers = 4, GPU数 = 4 # ✅ 常见配置
num_workers = 8, GPU数 = 4 # ✅ 更多worker
num_workers = 2, GPU数 = 8 # ✅ 较少worker

CPU workers负责数据的预处理,然后打包成数据队列之后放一起再依次给GPU分组加载

1
2
3
4
5
6
7
8
                    [GPU 0] ← 模型副本0

[CPU Worker 0] ─→ [数据队列]
[CPU Worker 1] ─→ [数据队列] ←─ [主进程] ─→ [GPU 1] ← 模型副本1
[CPU Worker 2] ─→ [数据队列] [GPU 2]
[CPU Worker 3] ─→ [数据队列] [GPU 3]
↑ ↑
数据加载进程 数据分发进程
1
2
3
4
5
Worker0 → batch[0:8]  ┐
Worker1 → batch[8:16] ├→ 主进程合并 → batch(32) → GPU0(0-7)
Worker2 → batch[16:24]│ GPU1(8-15)
Worker3 → batch[24:32]┘ GPU2(16-23)
GPU3(24-32)

多GPU训练的num_workers设置策略

策略1:DataParallel(单进程多GPU)

1
2
3
4
5
6
7
8
# 单进程控制多个GPU
model = nn.DataParallel(model, device_ids=[0,1,2,3])
dataloader = DataLoader(
dataset,
batch_size=128, # 总batch size
num_workers=8, # 8个worker提供数据
pin_memory=True # 加速CPU→GPU传输
)

策略2:DistributedDataParallel(多进程多GPU)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 每个GPU一个进程
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# 每个进程独立
local_rank = int(os.environ['LOCAL_RANK'])
torch.cuda.set_device(local_rank)
dist.init_process_group('nccl')

# 每个进程有自己的dataloader
dataloader = DataLoader(
dataset,
batch_size=32, # 每个GPU的batch size
num_workers=4, # 每个GPU的worker数
sampler=DistributedSampler(dataset)
)

自动测试使用num_worker数目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import torch
import time
from torch.utils.data import DataLoader

def find_optimal_workers(dataset, batch_size, gpu_count, max_workers=16):
"""自动寻找最优num_workers"""

print(f"Testing num_workers for {gpu_count} GPUs...")
best_time = float('inf')
best_workers = 0

for workers in [0, 2, 4, 8, 12, 16]:
if workers > max_workers:
break

dataloader = DataLoader(
dataset,
batch_size=batch_size * gpu_count,
num_workers=workers,
pin_memory=True
)

# 测量数据加载速度(不训练)
start = time.time()
for i, batch in enumerate(dataloader):
if i >= 50: # 测试50个batch
break
elapsed = time.time() - start

# 计算每秒处理batch数
throughput = 50 / elapsed

print(f" workers={workers}: {throughput:.2f} batches/sec")

if throughput > best_throughput:
best_throughput = throughput
best_workers = workers

return best_workers

# 使用
best_workers = find_optimal_workers(dataset, batch_size=32, gpu_count=4)
print(f"Recommended num_workers: {best_workers}")