Optimizing Data Transfer in Distributed AI/ML Training Workloads

This post is the third in a series on optimizing data transfer using the NVIDIA Nsight™ Systems (nsys) profiler. Part one focused on CPU-to-GPU data copies, and part two on GPU-to-CPU copies. In this post, we turn our attention to data transfer between GPUs.

Nowadays, it is quite common for AI/ML training — particularly of large models — to be distributed across multiple GPUs. While there are many different schemes for performing such distribution, what they all have in common is their reliance on the constant transfer of data — such as gradients, weights, statistics, and/or metrics — between the GPUs, throughout training. As with the other types of data transfer we analyzed in our previous posts, here too, a poor implementation could easily lead to under-utilization of compute resources and the unjustified inflation of training costs. Optimizing GPU-to-GPU communication is an active area of research and innovation involving both hardware and software development.

In this post, we will focus on the most common form of distributed training — data-distributed training. In data-distributed training, identical copies of the ML model are maintained on each GPU. Each input batch is evenly distributed among the GPUs, each of which executes a training step to calculate the local gradients. The local gradients are then shared and averaged across the GPUs, resulting in an identical gradient update to each of the model copies. Using NVIDIA Nsight™ Systems (nsys) profiler, we will analyze the effect of the GPU-to-GPU transfer of the gradients on the runtime performance of training a toy model and assess a few techniques for reducing its overhead.
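To make the mechanics concrete, the following is a minimal sketch (for illustration only; it is not part of our training script) of the per-parameter gradient averaging that DDP automates under the hood. It assumes torch.distributed has already been initialized and that loss.backward() has been called on each rank:

import torch.distributed as dist

def average_gradients(model, world_size):
    # Illustrative only: DDP performs this reduction automatically,
    # bucketed and overlapped with the backward pass.
    for param in model.parameters():
        if param.grad is not None:
            # Sum the local gradients across all replicas...
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            # ...and divide by the number of replicas to get the average.
            param.grad.div_(world_size)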

Disclaimers

The code we will share in this post is intended for demonstrative purposes; please do not rely on its accuracy or optimality. Please do not interpret our mention of any tool, framework, library, service, or platform as an endorsement of its use.

Thanks to Yitzhak Levi for his contributions to this post.

Instance Selection for Distributed Training

In a previous post, Instance Selection for Deep Learning, we discussed the importance of choosing an instance type that is most suitable for your AI/ML workload and the potential impact of that choice on the success of your project. When choosing an instance type for a workload that includes a lot of GPU-to-GPU traffic, you will want to pay attention to how such communication is carried out, including: the instance topology, GPU interconnects, maximal throughput, and latency.

In this post, we will limit our discussion to distribution between GPUs on a single instance. We will experiment with two instance types: an Amazon EC2 g6e.48xlarge with 8 NVIDIA L40S GPUs and an Amazon EC2 p4d.24xlarge with 8 NVIDIA A100 GPUs. Each will run an AWS Deep Learning (Ubuntu 24.04) AMI with PyTorch (2.8), nsys-cli profiler (version 2025.6.1), and the NVIDIA Tools Extension (NVTX) library.

One of the primary differences between the two instance types is how the GPUs are connected: On the g6e.48xlarge communication between the GPUs is over PCI Express (PCIe), whereas p4d.24xlarge includes NVIDIA NVLink™ — dedicated hardware for enabling high-throughput GPU-to-GPU communication. Communication over the PCIe bus is significantly slower than NVLink. While this may be sufficient for workloads with a low communication-to-compute ratio, it could be a performance killer for workloads with high communication rates.

To discover the topology of the instance types, we run the following command:

nvidia-smi topo -m

On the g6e.48xlarge, we get the following results:

GPU Topology of g6e.48xlarge (by Author)

GPUs on the same NUMA node are connected by a “NODE” link and GPUs on different NUMA nodes by a “SYS” link. Both link types traverse the PCIe bus as well as an additional interconnect; neither is a direct connection (a “PIX” link). We will see later on how this can impact throughput performance.

On the p4d.24xlarge, every pair of GPUs is linked by a dedicated NVLink connection:

GPU Topology of p4d.24xlarge (by Author)

A Toy Model

To facilitate our discussion, we build a toy data-distributed training experiment.

We choose a model with a relatively high communication-to-compute ratio — a Vision Transformer (ViT)-L/32 image-classification model with roughly 306 million parameters — and a synthetic dataset that we will use to train it:

import os, time, torch, nvtx
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import Dataset, DataLoader
from torchvision.models import vit_l_32

WORLD_SIZE = int(os.environ.get("WORLD_SIZE", 1))
BATCH_SIZE = 32
IMG_SIZE = 224
WARMUP_STEPS = 10
PROFILE_STEPS = 3
COOLDOWN_STEPS = 1
TOTAL_STEPS = WARMUP_STEPS + PROFILE_STEPS + COOLDOWN_STEPS
N_WORKERS = 8

def get_model():
    return vit_l_32(weights=None)

# A synthetic dataset with random images and labels
class FakeDataset(Dataset):
    def __len__(self):
        return TOTAL_STEPS * BATCH_SIZE * WORLD_SIZE

    def __getitem__(self, index):
        img = torch.randn((3, IMG_SIZE, IMG_SIZE))
        label = torch.randint(0, 1000, (1,)).item()
        return img, label

def get_data_iter(rank, world_size):
    dataset = FakeDataset()

    sampler = DistributedSampler(dataset, num_replicas=world_size,
                                 rank=rank, shuffle=True)

    train_loader = DataLoader(dataset, batch_size=BATCH_SIZE,
                              sampler=sampler, num_workers=N_WORKERS, 
                              pin_memory=True)

    return iter(train_loader)

We define a utility function that we will use to set up PyTorch DistributedDataParallel (DDP) training. We configure the PyTorch distributed communication package to use the NVIDIA Collective Communications Library (NCCL) as its communication backend and wrap the model in a DistributedDataParallel container.

def configure_ddp(model, rank):
    dist.init_process_group("nccl")
    model = DDP(model,
                device_ids=[rank],
                bucket_cap_mb=2000)
    return model

Note that we naively configure the DDP bucket capacity to 2 GB. DDP groups the model’s gradients into buckets and performs gradient reduction of each bucket in a separate command. The implication of setting the bucket capacity to 2 GB is that all of the ~306 million (FP32) gradients will fit in a single bucket (306 million x 4 bytes per gradient = ~1.22 GB) and reduction will only occur once all gradients have been calculated.

As in our previous posts, we schedule nsys profiler programmatically and wrap the different portions of the training step with color-coded NVTX annotations:

def train(use_ddp=False):
    # detect the env vars set by torchrun
    rank = int(os.environ.get("RANK", 0))
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    torch.cuda.set_device(local_rank)

    model = get_model().to(local_rank)
    criterion = nn.CrossEntropyLoss().to(local_rank)

    if use_ddp:
        model = configure_ddp(model, rank)

    optimizer = optim.SGD(model.parameters())

    data_iter = get_data_iter(rank, WORLD_SIZE)

    model.train()

    for i in range(TOTAL_STEPS):

        # Schedule Profiling
        if i == WARMUP_STEPS:
            torch.cuda.synchronize()
            start_time = time.perf_counter()
            torch.cuda.profiler.start()
        elif i == WARMUP_STEPS + PROFILE_STEPS:
            torch.cuda.synchronize()
            torch.cuda.profiler.stop()
            end_time = time.perf_counter()

        with nvtx.annotate(f"Batch {i}", color="blue"):

            with nvtx.annotate("get batch", color="red"):
                data, target = next(data_iter)
                data = data.to(local_rank, non_blocking=True)
                target = target.to(local_rank, non_blocking=True)
            with nvtx.annotate("forward", color="green"):
                output = model(data)
                loss = criterion(output, target)

            with nvtx.annotate("backward", color="purple"):
                optimizer.zero_grad()
                loss.backward()

            with nvtx.annotate("optimizer step", color="yellow"):
                optimizer.step()

    if use_ddp:
        dist.destroy_process_group()

    if rank == 0:
        total_time = end_time - start_time
        print(f"Throughput: {PROFILE_STEPS/total_time:.2f} steps/sec")


if __name__ == "__main__":
    # enable ddp if run with torchrun
    train(use_ddp="RANK" in os.environ)

We set the NCCL_DEBUG environment variable for visibility into how NCCL sets up the transport links between the GPUs.

export NCCL_DEBUG=INFO

Single-GPU Performance

We begin our experimentation by running our script on a single GPU without DDP:

nsys profile \
  --capture-range=cudaProfilerApi \
  --capture-range-end=stop \
  --trace=cuda,nvtx,osrt,nccl \
  --output=baseline \
  python train.py

Note the inclusion of nccl in the trace section; this will be critical for analyzing the GPU-to-GPU communication in the multi-GPU experiments.

The throughput of the baseline experiment is 8.91 steps per second on the L40S GPU and 5.17 steps per second on the A100: The newer NVIDIA Ada Lovelace Architecture performs better than the NVIDIA Ampere Architecture on our toy model. In the image below we show the timeline view for the L40S experiment.

Single L40S GPU Nsight Systems Trace (by Author)

In this post we will focus on the CUDA portion of the timeline. Looking at the NVTX row, we see the recurring red (CPU-to-GPU copy), green (forward), purple (backward), and yellow (optimizer update) pattern that makes up our train step. Note, however, that the purple appears as just a tiny blip while, in practice, the backward pass fills the entire gap between the forward and optimizer blocks: The NVTX library appears to have captured only the initial launch of the autograd graph.
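If finer visibility into the backward pass is desired, one option (not used in our experiments) is PyTorch’s torch.autograd.profiler.emit_nvtx context manager, which emits an NVTX range for each autograd operator and makes the backward kernels identifiable in the nsys timeline. A minimal sketch, assuming the same model, criterion, and data tensors as in our training step:

import torch

# Wrap only the profiled iterations; emit_nvtx adds overhead.
with torch.autograd.profiler.emit_nvtx():
    output = model(data)
    loss = criterion(output, target)
    loss.backward()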

We will use this trace as a comparative baseline for our next experiments.

DDP with 1 GPU

We assess the impact of the DDP wrapper by running torchrun on a single GPU:

torchrun --nproc_per_node=1 \
    --no-python \
    nsys profile \
    --capture-range=cudaProfilerApi \
    --capture-range-end=stop \
    --trace=cuda,nvtx,nccl,osrt \
    --output=ddp-1gpu \
    python train.py

The resulting throughput drops to 8.40 steps per second on the L40S GPU and 5.04 steps per second on the A100. Even in the absence of any cross-GPU communication, DDP introduces overhead that can decrease throughput by ~3–7%. The important lesson here is to always make sure that single-GPU training is run without DDP.

The nsys trace of the L40S experiment confirms the presence of the DDP overhead:

Single L40S GPU With DDP – Profiler Trace (by Author)

The main change from the training step in the previous trace is a large chunk of device-to-device (DtoD) memory copies at the end of the backward block and just before the optimizer block (highlighted in the trace above). This is DDP in action: even in the absence of a cross-GPU gradient reduction, DDP stages the local gradients in a dedicated memory block for reduction and then copies the results back into the grad property of each parameter in preparation for the gradient update. (Note that the DtoD copies are between two memory locations on the same GPU, not between two different GPUs.)

DDP With Multiple GPUs

Next, we assess the impact of gradient sharing between 2, 4, and 8 GPUs:

torchrun --nproc_per_node=8 \
    --no-python \
    nsys profile \
    --capture-range=cudaProfilerApi \
    --capture-range-end=stop \
    --trace=cuda,nvtx,nccl,osrt \
    --output=ddp-8gpu_%q{RANK} \
    python train.py

The table below captures impact on the training throughput on the g6e.48xlarge and the p4d.24xlarge:

Throughput Performance of DDP Training (by Author)

DDP training performance plummets on the g6e.48xlarge instance: The 8-GPU throughput is more than 6 times slower than the single-GPU result. The NCCL logs include multiple lines describing the communication paths between the GPUs, e.g.:

NCCL INFO Channel 00 : 0[0] -> 1[1] via SHM/direct/direct

This indicates that the data transfer between each pair of GPUs passes through CPU shared memory, which greatly limits the communication bandwidth.

On the p4d.24xlarge, in contrast, where the NVLink connections allow for direct peer-to-peer (P2P) communication, the 8-GPU throughput is just 8% lower than the baseline result:

NCCL INFO Channel 00/0 : 0[0] -> 1[1] via P2P/CUMEM/read

Even though each individual L40S outperforms the A100 on our toy model by 72%, the inclusion of NVLink makes the p4d.24xlarge a better choice than the g6e.48xlarge for running DDP over 8 GPUs.
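A quick, complementary way to check (independently of the NCCL logs) whether direct peer-to-peer access is available between GPU pairs is a short PyTorch snippet such as the sketch below; when peer access is unavailable, a fallback such as the shared-memory path above is to be expected:

import torch

# Print which GPU pairs lack direct peer-to-peer (P2P) access.
num_gpus = torch.cuda.device_count()
for i in range(num_gpus):
    for j in range(num_gpus):
        if i != j and not torch.cuda.can_device_access_peer(i, j):
            print(f"GPU {i} -> GPU {j}: no direct P2P access")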

The data transfer bottleneck on the g6e.48xlarge can be easily seen in the nsys trace. Here we use the “multiple view” option to display the activity of all the DDP processes in a single timeline:

8-GPU DDP on g6e.48xlarge – Profiler Trace (by Author)

The gradient reduction occurs in the ncclAllReduce call on the NCCL row. This comes just after the local gradient calculation has completed and just before the DtoD memory operation discussed above that copies the reduced gradients back to the grad field of each parameter. On the g6e.48xlarge the NCCL operation dominates the backward pass, accounting for approximately 84% of the overall step time (587 out of 701 milliseconds).

On the p4d.24xlarge, the NCCL call takes up a much smaller portion of the training step:

8-GPU DDP on p4d.24xlarge – Profiler Trace (by Author)

In the next sections we will discuss a few DDP optimization techniques and assess their impact on runtime performance. We will limit our scope to PyTorch-level optimizations, leaving other methods (e.g., NCCL/OS/network tuning) for another post.

A general approach for dealing with communication bottlenecks is to reduce the frequency of communication. In DDP workloads this can be achieved by increasing the data batch size (i.e., increasing the compute per step) or, if memory capacity forbids this, by applying gradient accumulation: instead of performing the gradient reduction and update every step, accumulate the local gradients for N steps and apply the reduction every Nth step. Both techniques increase the effective batch size (the overall number of samples between gradient updates), which can impact model convergence and may require tuning of the optimization parameters. In our discussion we assume the effective batch size is fixed.
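The sketch below illustrates a gradient-accumulation variant of the inner training loop. It is for illustration only: ACCUM_STEPS is a hypothetical accumulation factor, and the use of DDP’s no_sync context to skip the all-reduce on intermediate micro-steps is our assumption, not part of the script above. It reuses the model, criterion, optimizer, train_loader, and local_rank defined earlier and assumes the model is DDP-wrapped.

from contextlib import nullcontext

ACCUM_STEPS = 4  # hypothetical accumulation factor

for i, (data, target) in enumerate(train_loader):
    data = data.to(local_rank, non_blocking=True)
    target = target.to(local_rank, non_blocking=True)

    # Skip DDP's gradient all-reduce on all but the last micro-step.
    last_micro_step = (i + 1) % ACCUM_STEPS == 0
    sync_ctx = nullcontext() if last_micro_step else model.no_sync()
    with sync_ctx:
        loss = criterion(model(data), target) / ACCUM_STEPS
        loss.backward()

    # Reduce and apply the accumulated gradients every ACCUM_STEPS steps.
    if last_micro_step:
        optimizer.step()
        optimizer.zero_grad()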

We will cover four techniques. The first two focus on reducing DDP overhead. The final two directly address the data transfer bottleneck.

Optimization 1: Static Graph Declaration

Our first change is to pass static_graph=True to the DDP container. Generally speaking, models might include parameters for which gradients are not calculated at every step — referred to as “unused parameters” in the DDP documentation. This is common in models with conditional logic. DDP includes dedicated mechanisms for identifying and handling unused parameters. In the case of our toy model, all of the gradients are calculated in each step — our graph is “static”. Explicitly declaring our graph as static reduces the overhead associated with handling a dynamic gradient set.

def configure_ddp(model, rank):
    dist.init_process_group("nccl")
    model = DDP(model,
                device_ids=[rank],
                static_graph=True,
                bucket_cap_mb=2000)
    return model

In the case of our toy model, the impact of this change is negligible on both the g6e.48xlarge and the p4d.24xlarge. Without any further delay, we proceed to the next optimization.

Optimization 2: Increase Memory Efficiency

Our second technique addresses the large chunk of DtoD memory copying we identified above. Instead of copying the gradients to and from the NCCL communication memory blocks, we can explicitly set the parameter gradients to point directly to the NCCL communication buffers. Consequently, the same memory is used to store the local gradients, to perform the gradient reduction, and to apply the gradient updates. This is configured by passing gradient_as_bucket_view=True to the DDP container:

def configure_ddp(model, rank):
    dist.init_process_group("nccl")
    model = DDP(model,
                device_ids=[rank],
                static_graph=True,
                gradient_as_bucket_view=True,
                bucket_cap_mb=2000)
    return model

In the trace below, captured on the p4d.24xlarge, we no longer see the block of DtoD memory copy between the all-reduce and (yellow) optimizer steps:

Memory-Optimized DDP on p4d.24xlarge – Profiler Trace (by Author)

In the case of our toy example, this optimization boosts performance by a modest 1%.

Optimization 3: Gradient Compression

A common approach for addressing communication bottlenecks is to apply compression algorithms to reduce the size of the payload. PyTorch DDP provides a number of dedicated communication hooks that automate compression of gradients before NCCL reduction and decompression afterward. In the code block below, we apply bfloat16 gradient compression:

import torch.distributed.algorithms.ddp_comm_hooks.default_hooks as ddp_hks

def configure_ddp(model, rank):
    dist.init_process_group("nccl")
    model = DDP(model,
                device_ids=[rank],
                static_graph=True,
                gradient_as_bucket_view=True,
                bucket_cap_mb=2000)

    model.register_comm_hook(state=None, hook=ddp_hks.bf16_compress_hook)
    return model

On the heavily bottlenecked g6e.48xlarge instance, bfloat16 compression results in a considerable 65% speed-up! The nsys trace shows the much shorter NCCL call as well as the newly introduced compression operations:

BF16-Compressed DDP on g6e.48xlarge – Profiler Trace (by Author)

On the p4d.24xlarge, the overhead of the compression operations outweighs the gains in communication speed, leading to an overall reduction in throughput:

BF16-Compressed DDP on p4d.24xlarge – Profiler Trace (by Author)

PyTorch offers a more aggressive compression algorithm than bfloat16, called PowerSGD. Below we present a naive usage — in practice this can require a lot of tuning. Please see the documentation for details:

from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook 

def configure_ddp(model, rank):
    dist.init_process_group("nccl")
    model = DDP(model,
                device_ids=[rank],
                static_graph=True,
                gradient_as_bucket_view=True,
                bucket_cap_mb=2000)

    state = powerSGD_hook.PowerSGDState(
        process_group=None
    )

    model.register_comm_hook(
        state,
        hook=powerSGD_hook.powerSGD_hook
    )
    return model

PowerSGD has a dramatic impact on the g6e.48xlarge instance, increasing throughput all the way back up to 7.5 steps per second, more than five times faster than the unoptimized 8-GPU DDP result! Note the much shorter NCCL kernels in the resulting trace:

PowerSGD-Compressed DDP on g6e.48xlarge – Profiler Trace (by Author)

It is important to note that these compression algorithms are precision-lossy and should be used with caution, as they may impact your model convergence.
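For readers who want to experiment further, the main PowerSGD knobs are exposed through the PowerSGDState constructor. The sketch below is illustrative only; the values shown are arbitrary starting points, and the PyTorch documentation should be consulted for the full list of parameters and their defaults:

state = powerSGD_hook.PowerSGDState(
    process_group=None,
    # Higher rank => better approximation, less compression.
    matrix_approximation_rank=2,
    # Run vanilla all-reduce for the first N steps before compressing.
    start_powerSGD_iter=10,
    # Error feedback and warm start help recover accuracy at extra memory cost.
    use_error_feedback=True,
    warm_start=True,
)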

Optimization 4: Parallelize Gradient Reduction

DDP groups parameters into multiple buckets and triggers gradient reduction of each bucket independently, as soon as the bucket is filled. This allows gradient reduction of filled buckets to run while gradient calculation (of other buckets) is still ongoing. The degree of overlap depends on the number of DDP buckets, which we control via the bucket_cap_mb setting of the DDP container. Recall that in our initial implementation, we explicitly set this to 2000 (2 GB), which (given the size of our model) translated to a single DDP bucket. The table below shows the throughput for different values of bucket_cap_mb; the optimal setting will vary based on the details of the model and runtime environment.

DDP Throughput vs. Bucket Capacity (by Author)

Note the significant 12% improvement when using multiple buckets on the g6e.48xlarge with BF16 compression. PowerSGD, on the other hand, works best when applied to a single bucket.
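For reference, the corresponding configure_ddp variant is sketched below with bucket_cap_mb lowered to 100 MB (the value used in the trace that follows); the remaining settings are carried over from the previous optimizations:

def configure_ddp(model, rank):
    dist.init_process_group("nccl")
    # Smaller buckets allow NCCL reduction of completed buckets to overlap
    # with the ongoing backward pass; the optimal capacity is model- and
    # platform-dependent.
    model = DDP(model,
                device_ids=[rank],
                static_graph=True,
                gradient_as_bucket_view=True,
                bucket_cap_mb=100)
    return model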

In the image below, captured on a p4d.24xlarge with bucket_cap_mb set to 100, we can see the impact of this optimization on the profiler trace:

DDP With Multiple Buckets on p4d.24xlarge – Profiler Trace (by Author)

In place of the single NCCL all-reduce operation, we now have 11 smaller blocks running in parallel with the local gradient computation.

Results

We summarize the results of our experiments in the following table:

Experiment Results (by Author)

On the p4d.24xlarge, where the data is transferred over NVLink, the overall impact was a modest 4% speed-up. On the g6e.48xlarge, however, the gains were significant and mostly due to gradient compression: an 86% boost for the BF16 scheme and more than a 5X improvement for (the naive implementation of) PowerSGD.

Importantly, these results can vary considerably based on the model architecture and runtime environment.

Summary

This concludes our three-part series on identifying and solving data transfer bottlenecks with NVIDIA Nsight™ Systems (nsys) profiler. In each of the posts we demonstrated the tendency of data transfers to introduce performance bottlenecks and resource under-utilization. In each case, we used nsys profiler to identify the bottlenecks and measure the impact of different optimization techniques.

The accelerations we achieved in each of the use cases we studied underscore the importance of integrating the regular use of tools such as nsys profiler into the AI/ML development workflow, and highlight the opportunity, even for non-CUDA specialists, to achieve meaningful performance gains and AI/ML cost reductions.


