From cb198ed6c6cccaa75e087d41fb300be71743896d Mon Sep 17 00:00:00 2001 From: Houss3m Date: Sun, 8 Feb 2026 10:40:56 +0300 Subject: [PATCH 1/2] feat(zipformer): Add multi-node DDP training support via torchrun/SLURM This commit enables multi-node distributed training for Zipformer using torchrun and SLURM. The changes are backward-compatible with existing single-node training workflows. Key changes: - train.py: Detect torchrun launch via RANK/WORLD_SIZE env vars - train.py: Use LOCAL_RANK for correct GPU device mapping across nodes - train.py: Pass use_ddp_launch flag to setup_dist for proper init - Add slurm_multinode_ddp.sh: Example SLURM script for multi-node training Usage: # Single-node (unchanged): ./zipformer/train.py --world-size 4 ... # Multi-node via SLURM: sbatch zipformer/slurm_multinode_ddp.sh --- .../ASR/zipformer/slurm_multinode_ddp.sh | 148 ++++++++++++++++++ egs/librispeech/ASR/zipformer/train.py | 35 +++-- 2 files changed, 174 insertions(+), 9 deletions(-) create mode 100755 egs/librispeech/ASR/zipformer/slurm_multinode_ddp.sh diff --git a/egs/librispeech/ASR/zipformer/slurm_multinode_ddp.sh b/egs/librispeech/ASR/zipformer/slurm_multinode_ddp.sh new file mode 100755 index 0000000000..01a7d1de81 --- /dev/null +++ b/egs/librispeech/ASR/zipformer/slurm_multinode_ddp.sh @@ -0,0 +1,148 @@ +#!/bin/bash -l +# +# Multi-node DDP training script for Zipformer using SLURM + torchrun +# +# This script demonstrates how to run distributed training across multiple +# nodes using SLURM as the job scheduler and PyTorch's torchrun for process +# management within each node. +# +# Usage: +# sbatch run_multinode_ddp.sh +# +# Requirements: +# - SLURM cluster with GPU nodes +# - PyTorch with NCCL backend support +# - Nodes must be able to communicate over TCP (for NCCL) +# +# Adjust SBATCH directives and training arguments below to match your setup. 
+ +#SBATCH -J zipformer-ddp +#SBATCH -o logs/zipformer_ddp_%N_%j.log +#SBATCH -p gpu # Partition name (adjust to your cluster) +#SBATCH --nodes=2 # Number of nodes +#SBATCH --ntasks-per-node=1 # 1 torchrun launcher per node +#SBATCH --gpus-per-node=8 # GPUs per node +#SBATCH -c 24 # CPU cores per task +#SBATCH --mem=0 # Use all available memory + +set -euo pipefail + +# ============================================================================ +# Environment setup +# ============================================================================ + +# Activate your conda environment (adjust path as needed) +source ~/miniconda3/etc/profile.d/conda.sh +conda activate k2-icefall + +# Set PYTHONPATH to include icefall +export PYTHONPATH=$PWD/../../..:${PYTHONPATH:-} + +# ============================================================================ +# Debugging options (optional, can be removed for production runs) +# ============================================================================ + +# Uncomment for verbose NCCL debugging +# export NCCL_DEBUG=INFO +# export TORCH_DISTRIBUTED_DEBUG=DETAIL + +# Unbuffered Python output for real-time logging +export PYTHONUNBUFFERED=1 + +# Disable InfiniBand if your cluster uses Ethernet +# (comment out if your cluster has InfiniBand support) +export NCCL_IB_DISABLE=1 + +# ============================================================================ +# Distributed training configuration +# ============================================================================ + +echo "Running on nodes: ${SLURM_JOB_NODELIST}" +HOSTS=($(scontrol show hostnames "${SLURM_JOB_NODELIST}")) +MASTER_NODE="${HOSTS[0]}" +echo "Master node is: ${MASTER_NODE}" + +# Get master node's IP address +MASTER_ADDR=$(srun -N1 -n1 -w "${MASTER_NODE}" bash -lc \ + "ip -o -4 addr show scope global | awk '{print \$4}' | cut -d/ -f1 | head -n1") + +# Use a job-unique port to avoid collisions with other jobs +MASTER_PORT=$((20000 + (SLURM_JOB_ID % 20000))) + +export 
MASTER_ADDR MASTER_PORT + +# Calculate world size +GPUS_PER_NODE=8 +WORLD_SIZE=$(( SLURM_NNODES * GPUS_PER_NODE )) + +echo "MASTER_ADDR=${MASTER_ADDR}" +echo "MASTER_PORT=${MASTER_PORT}" +echo "GPUS_PER_NODE=${GPUS_PER_NODE}" +echo "WORLD_SIZE=${WORLD_SIZE}" + +# Create logs directory if it doesn't exist +mkdir -p logs + +# ============================================================================ +# Training configuration - MODIFY THESE FOR YOUR EXPERIMENT +# ============================================================================ + +EXP_DIR="zipformer/exp-multinode" +BPE_MODEL="data/lang_bpe_500/bpe.model" +NUM_EPOCHS=30 +MAX_DURATION=1000 + +# For streaming model, set CAUSAL=1 +CAUSAL=0 +CHUNK_SIZE="16,32,64,-1" +LEFT_CONTEXT_FRAMES="64,128,256,-1" + +# ============================================================================ +# Launch training +# ============================================================================ + +# Launch exactly 1 torchrun process per node +# Each torchrun will spawn GPUS_PER_NODE worker processes +srun --ntasks=${SLURM_NNODES} --ntasks-per-node=1 --kill-on-bad-exit=1 --export=ALL bash -lc ' + set -euo pipefail + + # Re-activate environment in the srun context + source ~/miniconda3/etc/profile.d/conda.sh + conda activate k2-icefall + export PYTHONPATH='"$PWD"'/../../..:${PYTHONPATH:-} + + echo "Host=$(hostname) SLURM_PROCID=$SLURM_PROCID SLURM_NODEID=${SLURM_NODEID:-NA}" + + # Determine if this node should host the rendezvous server + # Only the master node (SLURM_PROCID=0) hosts the TCPStore + if [ "$SLURM_PROCID" -eq 0 ]; then + RDZV_IS_HOST=1 + else + RDZV_IS_HOST=0 + # Small delay to ensure master is ready + sleep 5 + fi + + torchrun \ + --nnodes='"$SLURM_NNODES"' \ + --node_rank="$SLURM_PROCID" \ + --nproc_per_node='"$GPUS_PER_NODE"' \ + --rdzv_id='"$SLURM_JOB_ID"' \ + --rdzv_backend=c10d \ + --rdzv_endpoint='"$MASTER_ADDR"':'"$MASTER_PORT"' \ + --rdzv_conf is_host="$RDZV_IS_HOST" \ + --max_restarts 0 \ + 
./zipformer/train.py \ + --world-size '"$WORLD_SIZE"' \ + --num-epochs '"$NUM_EPOCHS"' \ + --use-fp16 1 \ + --exp-dir '"$EXP_DIR"' \ + --max-duration '"$MAX_DURATION"' \ + --causal '"$CAUSAL"' \ + --chunk-size '"$CHUNK_SIZE"' \ + --left-context-frames '"$LEFT_CONTEXT_FRAMES"' \ + --full-libri 1 \ + --bpe-model '"$BPE_MODEL"' +' + +echo "Training complete!" diff --git a/egs/librispeech/ASR/zipformer/train.py b/egs/librispeech/ASR/zipformer/train.py index e7ea652ca6..7b5b5d640d 100755 --- a/egs/librispeech/ASR/zipformer/train.py +++ b/egs/librispeech/ASR/zipformer/train.py @@ -56,6 +56,7 @@ import argparse import copy import logging +import os import warnings from pathlib import Path from shutil import copyfile @@ -1262,7 +1263,12 @@ def run(rank, world_size, args): fix_random_seed(params.seed) if world_size > 1: - setup_dist(rank, world_size, params.master_port) + setup_dist( + rank=rank, + world_size=world_size, + master_port=params.master_port, + use_ddp_launch=(os.environ.get("RANK") is not None), + ) setup_logger(f"{params.exp_dir}/log/log-train") logging.info("Training started") @@ -1274,7 +1280,9 @@ def run(rank, world_size, args): device = torch.device("cpu") if torch.cuda.is_available(): - device = torch.device("cuda", rank) + # Use LOCAL_RANK for GPU device when launched via torchrun/SLURM + local_rank = int(os.environ.get("LOCAL_RANK", rank % torch.cuda.device_count())) + device = torch.device("cuda", local_rank) logging.info(f"Device: {device}") sp = spm.SentencePieceProcessor() @@ -1338,7 +1346,7 @@ def run(rank, world_size, args): model.to(device) if world_size > 1: logging.info("Using DDP") - model = DDP(model, device_ids=[rank], find_unused_parameters=True) + model = DDP(model, device_ids=[local_rank], find_unused_parameters=True) optimizer = ScaledAdam( get_parameter_groups_with_lrs(model, lr=params.base_lr, include_names=True), @@ -1584,13 +1592,22 @@ def main(): args = parser.parse_args() args.exp_dir = Path(args.exp_dir) - world_size = 
args.world_size - assert world_size >= 1 - if world_size > 1: - mp.spawn(run, args=(world_size, args), nprocs=world_size, join=True) - else: - run(rank=0, world_size=1, args=args) + # Check if we are being launched by torchrun/Slurm + # These environment variables are standard for distributed launchers + env_rank = int(os.environ.get("RANK", -1)) + env_world_size = int(os.environ.get("WORLD_SIZE", -1)) + if env_rank != -1: + # Multi-node/torchrun mode: bypass mp.spawn + # We use world_size from environment, not from args + run(rank=env_rank, world_size=env_world_size, args=args) + else: + # Single-node mode: use the original mp.spawn logic + world_size = args.world_size + if world_size > 1: + mp.spawn(run, args=(world_size, args), nprocs=world_size, join=True) + else: + run(rank=0, world_size=1, args=args) torch.set_num_threads(1) torch.set_num_interop_threads(1) From 4876ea7050d2c75c58707766c418617ded2495c0 Mon Sep 17 00:00:00 2001 From: Houss3m Date: Sun, 8 Feb 2026 11:13:41 +0300 Subject: [PATCH 2/2] fix: Address review feedback from Gemini and CodeRabbit SLURM script fixes: - Fix typo in usage comment (run_multinode_ddp.sh -> slurm_multinode_ddp.sh) - Use SLURM_GPUS_PER_NODE env var instead of hardcoded value - Remove fragile sleep/rdzv_conf block, rely on torchrun's built-in sync train.py fixes: - Add torch.cuda.set_device(device) to prevent GPU 0 allocation issues - Initialize local_rank before conditional to avoid potential NameError - Override params.world_size with actual world_size for correct scheduling --- .../ASR/zipformer/slurm_multinode_ddp.sh | 17 +++-------------- egs/librispeech/ASR/zipformer/train.py | 5 +++++ 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/egs/librispeech/ASR/zipformer/slurm_multinode_ddp.sh b/egs/librispeech/ASR/zipformer/slurm_multinode_ddp.sh index 01a7d1de81..807afe424f 100755 --- a/egs/librispeech/ASR/zipformer/slurm_multinode_ddp.sh +++ b/egs/librispeech/ASR/zipformer/slurm_multinode_ddp.sh @@ -7,7 +7,7 @@ # 
management within each node. # # Usage: -# sbatch run_multinode_ddp.sh +# sbatch zipformer/slurm_multinode_ddp.sh # # Requirements: # - SLURM cluster with GPU nodes @@ -71,8 +71,8 @@ MASTER_PORT=$((20000 + (SLURM_JOB_ID % 20000))) export MASTER_ADDR MASTER_PORT -# Calculate world size -GPUS_PER_NODE=8 +# Get GPUs per node from SLURM (set by --gpus-per-node directive) +GPUS_PER_NODE=${SLURM_GPUS_PER_NODE:-8} WORLD_SIZE=$(( SLURM_NNODES * GPUS_PER_NODE )) echo "MASTER_ADDR=${MASTER_ADDR}" @@ -113,16 +113,6 @@ srun --ntasks=${SLURM_NNODES} --ntasks-per-node=1 --kill-on-bad-exit=1 --export= echo "Host=$(hostname) SLURM_PROCID=$SLURM_PROCID SLURM_NODEID=${SLURM_NODEID:-NA}" - # Determine if this node should host the rendezvous server - # Only the master node (SLURM_PROCID=0) hosts the TCPStore - if [ "$SLURM_PROCID" -eq 0 ]; then - RDZV_IS_HOST=1 - else - RDZV_IS_HOST=0 - # Small delay to ensure master is ready - sleep 5 - fi - torchrun \ --nnodes='"$SLURM_NNODES"' \ --node_rank="$SLURM_PROCID" \ @@ -130,7 +120,6 @@ srun --ntasks=${SLURM_NNODES} --ntasks-per-node=1 --kill-on-bad-exit=1 --export= --rdzv_id='"$SLURM_JOB_ID"' \ --rdzv_backend=c10d \ --rdzv_endpoint='"$MASTER_ADDR"':'"$MASTER_PORT"' \ - --rdzv_conf is_host="$RDZV_IS_HOST" \ --max_restarts 0 \ ./zipformer/train.py \ --world-size '"$WORLD_SIZE"' \ diff --git a/egs/librispeech/ASR/zipformer/train.py b/egs/librispeech/ASR/zipformer/train.py index 7b5b5d640d..aeda43957e 100755 --- a/egs/librispeech/ASR/zipformer/train.py +++ b/egs/librispeech/ASR/zipformer/train.py @@ -1261,6 +1261,9 @@ def run(rank, world_size, args): params = get_params() params.update(vars(args)) + # Override world_size with actual value (important for torchrun launches) + params.world_size = world_size + fix_random_seed(params.seed) if world_size > 1: setup_dist( @@ -1279,10 +1282,12 @@ def run(rank, world_size, args): tb_writer = None device = torch.device("cpu") + local_rank = 0 if torch.cuda.is_available(): # Use LOCAL_RANK for GPU device when 
launched via torchrun/SLURM local_rank = int(os.environ.get("LOCAL_RANK", rank % torch.cuda.device_count())) device = torch.device("cuda", local_rank) + torch.cuda.set_device(device) logging.info(f"Device: {device}") sp = spm.SentencePieceProcessor()