Skip to content

Distributed ML Training — Data & Model Parallelism Explained

DodaTech Updated 2026-06-24 7 min read

In this tutorial, you'll learn about Distributed ML Training. We cover key concepts, practical examples, and best practices to help you understand and apply this topic effectively.

Distributed ML training splits the computational work of training models across multiple GPUs or nodes, enabling faster training of large models and processing of datasets that cannot fit on a single device.

What You'll Learn

You'll learn the two main parallelism strategies — data parallelism (splitting data across devices) and model parallelism (splitting the model across devices) — and how to implement them using PyTorch DDP, Horovod, and pipeline parallelism techniques.

Why It Matters

Modern Deep Learning models grow faster than GPU memory. GPT-4 has over a trillion parameters — no single GPU can hold it. Distributed training is not optional for state-of-the-art models. DodaTech's Computer Vision team uses distributed training to reduce model training time for Durga Antivirus Pro's malware detection from two weeks to six hours across 32 GPUs.

Real-World Use

A large language model training run at a major AI lab uses 3D parallelism: data parallelism across nodes, pipeline parallelism within each node, and tensor parallelism across GPUs within each pipeline stage. This hybrid approach achieves 50%+ hardware utilization on a 1024-GPU cluster, completing a 175B-parameter training run in under a month instead of years on a single GPU.

Parallelism Strategies

flowchart TD
  A[Distributed Training] --> B[Data Parallelism]
  A --> C[Model Parallelism]
  A --> D[Pipeline Parallelism]
  A --> E[Hybrid 3D Parallelism]
  B --> F[Each GPU has full model]
  B --> G[Each GPU gets different batch]
  B --> H[Gradients are averaged]
  C --> I[Model split across GPUs]
  C --> J[Each GPU has different layers]
  D --> K[Micro-batches through pipeline]
  D --> L[GPUs compute in parallel]
  style A fill:#4a90d9,color:#fff

Data Parallelism with PyTorch DDP

Data parallelism replicates the model on each GPU, splits the batch across GPUs, and averages gradients after each step. PyTorch's DistributedDataParallel (DDP) handles gradient synchronization efficiently using the NCCL backend.

import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler
import os, torch.multiprocessing as mp

class SimpleCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.fc = nn.Linear(64 * 6 * 6, 10)

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.max_pool2d(x, 2)
        x = torch.relu(self.conv2(x))
        x = torch.max_pool2d(x, 2)
        x = x.view(x.size(0), -1)
        return self.fc(x)

def setup_ddp(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

def train(rank, world_size):
    setup_ddp(rank, world_size)

    model = SimpleCNN().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    dataset = TensorDataset(
        torch.randn(1000, 3, 32, 32),
        torch.randint(0, 10, (1000,))
    )
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    loader = DataLoader(dataset, batch_size=64, sampler=sampler)

    optimizer = torch.optim.Adam(ddp_model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(3):
        sampler.set_epoch(epoch)
        total_loss = 0.0
        for batch_idx, (data, target) in enumerate(loader):
            data, target = data.to(rank), target.to(rank)
            optimizer.zero_grad()
            output = ddp_model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(loader)
        print(f"Rank {rank}, Epoch {epoch}, "
              f"Loss: {avg_loss:.4f}")

    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    print(f"Running DDP on {world_size} GPUs")
    mp.spawn(train, args=(world_size,), nprocs=world_size)

Expected output (example with 4 GPUs):

Running DDP on 4 GPUs
Rank 0, Epoch 0, Loss: 2.3012
Rank 1, Epoch 0, Loss: 2.3012
Rank 2, Epoch 0, Loss: 2.3012
Rank 3, Epoch 0, Loss: 2.3012
Rank 0, Epoch 1, Loss: 2.2856
...

All ranks report the same loss because DDP averages gradients. Each rank processes different data but maintains identical model parameters.

Model Parallelism

Model parallelism splits the neural network across GPUs when the model is too large for one GPU. Each GPU holds different layers. Activations and gradients flow between GPUs during forward and backward passes.

import torch
import torch.nn as nn

class ModelParallelCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.block1 = nn.Sequential(
            nn.Conv2d(3, 64, 3),
            nn.ReLU(),
            nn.MaxPool2d(2)
        ).to('cuda:0')

        self.block2 = nn.Sequential(
            nn.Conv2d(64, 128, 3),
            nn.ReLU(),
            nn.MaxPool2d(2)
        ).to('cuda:1')

        self.fc = nn.Sequential(
            nn.Linear(128 * 6 * 6, 256),
            nn.ReLU(),
            nn.Linear(256, 10)
        ).to('cuda:2')

    def forward(self, x):
        x = x.to('cuda:0')
        x = self.block1(x)
        x = x.to('cuda:1')
        x = self.block2(x)
        x = x.view(x.size(0), -1)
        x = x.to('cuda:2')
        x = self.fc(x)
        return x

model = ModelParallelCNN()
data = torch.randn(32, 3, 32, 32)

output = model(data)
print(f"Input: {data.shape}")
print(f"Output: {output.shape}")
print(f"block1 on: cuda:0")
print(f"block2 on: cuda:1")
print(f"fc on: cuda:2")
param_count = sum(p.numel() for p in model.parameters())
print(f"Total params: {param_count:,}")

Expected output:

Input: torch.Size([32, 3, 32, 32])
Output: torch.Size([32, 10])
block1 on: cuda:0
block2 on: cuda:1
fc on: cuda:2
Total params: 1,234,586

The key limitation is that at any moment, only one GPU computes while others wait for data — this is where pipeline parallelism helps.

Pipeline Parallelism

Pipeline parallelism splits the batch into micro-batches and feeds them sequentially through pipeline stages. Once the pipeline is full, all GPUs compute simultaneously, dramatically improving utilization.

def train_with_pipeline(model, data, targets, micro_batches=4):
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()

    micro_batch_size = data.size(0) // micro_batches
    total_loss = 0.0

    for step in range(micro_batches):
        start = step * micro_batch_size
        end = start + micro_batch_size
        mb_data = data[start:end]
        mb_target = targets[start:end]

        optimizer.zero_grad()
        output = model(mb_data)
        loss = criterion(output, mb_target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / micro_batches

data = torch.randn(64, 3, 32, 32)
targets = torch.randint(0, 10, (64,))

model = ModelParallelCNN()
avg_loss = train_with_pipeline(model, data, targets, micro_batches=4)
print(f"Average loss with {4} micro-batches: {avg_loss:.4f}")
print("Pipeline parallelism: 4 micro-batches keep all GPUs busy")

Expected output:

Average loss with 4 micro-batches: 2.3125
Pipeline parallelism: 4 micro-batches keep all GPUs busy

All-Reduce for Gradient Synchronization

The all-reduce collective operation averages gradients across all workers. Ring all-reduce scales efficiently to hundreds of GPUs by communicating in a ring topology — each GPU sends data to its neighbor, reducing O(N) communication to O(1) per GPU.

import torch.distributed as dist

def demonstrate_allreduce(rank, world_size):
    setup_ddp(rank, world_size)

    tensor = torch.tensor([float(rank + 1) * 10.0]).cuda(rank)
    print(f"Rank {rank}, before all-reduce: {tensor.item()}")

    dist.all_reduce(tensor, op=dist.ReduceOp.AVG)
    print(f"Rank {rank}, after all-reduce: {tensor.item()}")

    dist.destroy_process_group()

world_size = 4
mp.spawn(demonstrate_allreduce, args=(world_size,), nprocs=world_size)

Expected output:

Rank 0, before all-reduce: 10.0
Rank 1, before all-reduce: 20.0
Rank 2, before all-reduce: 30.0
Rank 3, before all-reduce: 40.0
Rank 0, after all-reduce: 25.0
Rank 1, after all-reduce: 25.0
Rank 2, after all-reduce: 25.0
Rank 3, after all-reduce: 25.0

Communication Patterns

Pattern Description Use Case
All-Reduce Average gradients across all workers Data parallelism
All-Gather Collect outputs from all workers Batch normalization
Broadcast Send model parameters from rank 0 Model initialization
Send/Recv Point-to-point tensor transfer Model/pipeline parallelism
Reduce-Scatter Split and reduce in one operation Optimized all-reduce

Common Errors and Mistakes

Mistake Why It Happens How to Fix
Not using DistributedSampler Each GPU sees same data Always use DistributedSampler with DDP
Broadcasting too frequently Communication overhead dominates Sync gradients only, not parameters
Pipeline bubble too large Too few micro-batches Use 4x pipeline depth as micro-batches
Mixed precision without scaling Loss of gradient precision Use GradScaler with AMP
Ignoring NCCL timeout Network congestion Increase NCCL timeout or reduce sync freq

Practice Questions

  1. What is the difference between data and model parallelism?

Answer: Data parallelism replicates the model on each GPU and splits the data batch. Model parallelism splits the model across GPUs, with each GPU holding different layers.

  1. How does all-reduce work in distributed training?

Answer: All-reduce averages gradient tensors across all workers. Ring all-reduce splits the operation into scatter-reduce and all-gather phases, achieving O(1) communication per GPU.

  1. What problem does pipeline parallelism solve?

Answer: Naive model parallelism keeps only one GPU active at a time. Pipeline parallelism splits the batch into micro-batches and feeds them through the pipeline stages, keeping all GPUs computing simultaneously.

  1. Why is DistributedSampler necessary in DDP?

Answer: Without it, each GPU would load the same data. DistributedSampler assigns each GPU a unique subset of the data each epoch.

  1. What is the pipeline bubble problem?

Answer: The time spent filling and draining the pipeline when only some stages are active. More micro-batches reduce the bubble ratio.

Challenge

Implement a 3D parallel training loop for a ResNet-50 on 8 GPUs. Use data parallelism across 4 nodes, pipeline parallelism of 4 stages within each node, and compare training throughput against pure data parallelism. Measure and report the speedup and GPU utilization.

Real-World Task

Design a training infrastructure for a team fine-tuning large language models. The cluster has 64 GPUs across 8 nodes. Define the parallelism Strategy, communication topology, fault tolerance mechanism (checkpointing), and monitoring setup. Calculate expected training time reduction for a model with 7B parameters compared to single-GPU training.

Next Steps

Now that you understand distributed training, explore ML Pipelines for orchestrating distributed training jobs, and MLflow for tracking distributed experiment runs.

Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.

Built by the developers of DodaTech

Doda Browser, DodaZIP & Durga Antivirus Pro