FSDP Tutorial¶
In this tutorial, we will walk through the implementation of Fully Sharded Data Parallel (FSDP) with Zero2 sharding strategy in thunder
.
Introduction¶
In recent times, the LLM models have grown so large that all the model parameters don’t fit on a single GPU. To circumvent this problem, there are various strategies like Tensor Parallel, Pipeline Parallel, Fully Sharded Data Parallel, etc to train these large models. In this tutorial, we discuss and implement Zero2 strategy for Fully Sharded Data Parallel (FSDP).
What is Zero2 strategy for FSDP?¶
In this strategy, we shard the model parameters across all the availabe GPUs. That is each GPU holds onto only a chunk of the parameter. During the forward pass, all GPUs call all_gather
communication primitive to gather the parameters from other GPUs. Unlike Zero3 strategy which frees the parameter after forward pass, we save these unsharded parameters for backward pass. This is to save the overhead of extra communication. In the backward pass, we utilize the saved parameters and compute
the gradients. Once the gradients are computed, we use reduce_scatter
communication primitive to reduce (average) the gradients across all GPUs and scatter those gradients so that a given GPU holds only a chunk of gradient.
For more information on FSDP, we recommend reading
Example Model¶
For this example we will have a simple model Linear(Tanh(Linear(x)))
which will be sharded over 2 GPUs
NOTE: We are generating the abstract trace so we don’t actually need a system with 2 GPUs for this. It is only required when we execute this trace.
[ ]:
import torch
import torch.distributed
import thunder
import thunder.distributed
from looseversion import LooseVersion
from IPython.display import Code
[ ]:
device='cuda'
dim = 64
def create_model():
layers = [torch.nn.Linear(dim, dim, bias=False),
torch.nn.Tanh(),
torch.nn.Linear(dim, dim, bias=False)]
return torch.nn.Sequential(*layers).to(device)
# Model
model = create_model()
# Input
x = torch.randn(dim, dim, device=device)
# we want to obtain a functional version of our model. The JIT does that internally and we reach into those
# internals here
thunder_model = thunder.jit(model)
cache_rec, i_, _ = thunder.compile_data(thunder_model).get_computation_and_inputs(x)
computation_trace = cache_rec.computation_traces[0]
[ ]:
def wrap_as_highlighted_code(trace):
return Code(str(trace), language="python")
We can show the functional version:
[ ]:
wrap_as_highlighted_code(computation_trace)
Step 1 : Configuration¶
For our implementation of FSDP, we will generate the trace where we are sharding our model over 2 GPU
[ ]:
# FSDP Config
# Usually these values are set in the environment by `torchrun` but for this example
# we will set them ourselves
world_size = 2 # We have two processes.
global_rank = 0 # Current process is the very first process.
Step 2: Function to shard parameters¶
Next step is to write a function which will actually shard the parameters over 0-dim.
[ ]:
# NOTE: We shard over 0th dimension of the param.
def shard_param(param: torch.Tensor, rank: int, world_size: int, name: str) -> None:
# We will keep it simple and error if param's 0th dim is not divisible by ``world_size``.
# Alternative is that we can pad our parameters so that they are divisible by `world_size`.
assert param.shape[0] % world_size == 0,(
f"Current sharding requires the first dimension of the parameter {name!r} ({param.shape[0]})"
f" to be divisible by the world size ({world_size})"
)
chunk_size = param.shape[0] // world_size
# rank helps us determine which chunk of the parameter we will hold.
shard = param.data.narrow(0, chunk_size * rank, chunk_size).clone()
param.data = shard
# Shard each parameter of the model
for param_name, param in model.named_parameters():
shard_param(param, global_rank, world_size, param_name)
# Mark the param to denote that it is sharded.
# This is required by the synchronization primitive we will use below.
param.distparallel_type = thunder.core.proxies.DistParallelType.FULLY_SHARDED
[ ]:
# Verify our model looks as expected
model
[ ]:
# Let us verify that we have actually sharded the parameters.
# Checking if the weight of 1st Linear layer is sharded over 0th dim.
assert model[0].weight.shape == (dim / world_size, dim)
Step 3: Add an operation to synchronize the parameters before calling the model.forward.¶
We have to create a process group. This is needed because the synchronization primitive synchronize
that we will use to gather and scatter our weights in forward and backward requires a process group.
[ ]:
# Create a process group
if LooseVersion(torch.__version__) > LooseVersion("2.6"):
# ProcessGroup constructor has been updated since https://github.com/pytorch/pytorch/pull/135653/
process_group = torch.distributed.distributed_c10d.ProcessGroup(torch.distributed.distributed_c10d.Store(),
global_rank, world_size)
else:
options = torch.distributed.distributed_c10d.ProcessGroup.Options(backend="nccl")
process_group = torch.distributed.distributed_c10d.ProcessGroup(torch.distributed.distributed_c10d.Store(),
global_rank, world_size, options)
torch.distributed.distributed_c10d.GroupMember.WORLD = process_group
[ ]:
# now we have a functional version of the model which
# takes as inputs the expected arguments and all the parameters.
functional_forward = computation_trace.python_callable()
# This function creates a model with synchronization
# before calling the forward pass.
def model_with_syncs(x, *params):
# We call `prims.synchronize` on all the parameters.
# This is essentially calling `all_gather` so that we have the complete
# parameter before we actually to the forward computation.
unsharded_params = []
for param in params:
unsharded_params.append(thunder.distributed.prims.synchronize(param, process_group))
return functional_forward(x, *unsharded_params)
Let us now see what the trace of our model looks like with all the synchronization.
Two main observations regarding the below trace
We can observe the
prims.synchronize
that we inserted usingmodel_with_syncs
.Output of the
prims.synchronize
have the shape of unsharded (original) parameter.
With this, we have implemented the FSDP for the forward pass of our model.
[ ]:
trace = thunder.trace()(model_with_syncs, x, *model.parameters())
wrap_as_highlighted_code(trace)
For backward, we don’t have to do anything because thunder
already knows how to compute the backward of prims.synchronize
. We can verify that by using the value_and_grad
transform to generate the complete forward and backward trace together.
Observations for the trace below:
prims.synchronize
from previous trace is now decomposed intoprims.all_gather
andprims.wait
. So, we can clearly see that we make a communication call to gather the parameter (which is asynchronous) and wait till we have the complete parameter.At the end of the trace (after the forward and the backward computation), we see calls to
prims.reduce_scatter
andprims.wait
. This takes care of reducing the gradients across all the GPUs and sharding them. One thing to note, for averaging gradients with low dynamic range dtype likefloat16
, if we naively sum the gradients across GPUs before dividing byworld_size
, it can lead to overflows. So we scale the gradient tensor withworld_size
, before callingreduce_scatter
withsum
reduction to effectively average the gradients without overflow.
[ ]:
from thunder.core.transforms import value_and_grad
forward_and_backward_model = value_and_grad(model_with_syncs)
forward_backward_trace = thunder.trace()(forward_and_backward_model, x, *model.parameters())
wrap_as_highlighted_code(forward_backward_trace)
The above trace, only contains primitive which specifies the semantic of an operation abstractly but doesn’t perform the actual computation.
Now we will generate the execution trace which can actually perform the compute.
In the execution trace generated below, we can see that all the primitives have been replaced with actually PyTorch operations. Also, our synchronization primitives have been replaced with PyTorch implementation provided by thunder i.e. torch_all_gather_prim_impl
, torch_reduce_scatter_prim_impl
, torch_wait_prim_impl
.
[ ]:
optimized_trace = thunder.transform_for_execution(forward_backward_trace, executors_list=thunder.get_always_executors())
# Grab the final trace
exec_trace = optimized_trace[-1]
wrap_as_highlighted_code(exec_trace)
Step 4 : Running the actual computation¶
Running the actual computation will require setting up 2 processes and running our above code in both those processes (which can be tricky with Jupyter Notebook). Instead, we will write a small script and run it with torchrun
which takes care of setting up the processes and relevant state.
NOTE: This requires device running this notebook to have at least 2-GPUs
In the example below, we will use thunder.distributed.fsdp
which does the same as what we did above (with some extra checks). The code below should look familiar as it is roughly all the above pieces in a single script.
[ ]:
%%writefile thunder_fsdp_simple_example.py
# imports
from thunder.tests.litgpt_model import GPT, Config
import torch
import torch.distributed
import thunder
import thunder.distributed
import os
# # # # # # # #
# Create Model
# # # # # # # #
# NOTE: We create the model on CPU.
device='cpu'
dim = 64
def create_model():
layers = []
layers.append(torch.nn.Linear(dim, dim))
layers.append(torch.nn.ReLU())
layers.append(torch.nn.Linear(dim, dim))
return torch.nn.Sequential(*layers).to(device)
# Model
model = create_model()
# Input
x = torch.randn(dim, dim, device=device)
# # # # # # # #
# Setup for distributed
# # # # # # # #
torch.distributed.init_process_group(backend='nccl')
rank = int(os.environ["LOCAL_RANK"])
device = f"cuda:{rank}"
# # # # # # # #
# Move inputs to correct device
# # # # # # # #
x = x.to(device)
# # # # # # # #
# Wrap the model in thunder.distributed.fsdp
# # # # # # # #
# thunder.distributed.fsdp takes care of moving the parameter
# shard to the correct GPU for the current process.
cmodel = thunder.jit(thunder.distributed.fsdp(model))
# Run the forward pass.
cmodel(x)
# # # # # # # #
# Check the traces
# # # # # # # #
fwd_traces = thunder.last_traces(cmodel)
bwd_traces = thunder.last_backward_traces(cmodel)
# # # # # # # #
# Print and check to see if they match ours
# # # # # # # #
if rank == 0:
print(fwd_traces[-1])
print("*******"* 8)
print(bwd_traces[-1])
Let us run the above script and check what the trace looks like.
We can observe that forward trace has torch_all_gather_prim_impl
to gather the parameter before forward pass and the backward trace has torch_reduce_scatter_prim_impl
to reduce and scatter the gradients back to different GPUs. This is similar to our implementation above.
[ ]:
!torchrun --nproc_per_node=2 thunder_fsdp_simple_example.py
Conclusion¶
We have created our implementation of FSDP to shard our model across multiple GPUs. In the process, we also learned that:
thunder
provides us with primitives for synchronization across mutiple GPUs.thunder
also takes care of implementing the backward support for the synchronization primitives, so we don’t have to explicitly do anything to get the backward working.We can just easily apply
thunder.distributed.fsdp
to our model and it will take care of sharding the parameters and also adding synchronizations to our model. Also, we can easily check the modifications by inspecting the traces.