
Data-Loader OOM Issues #466

@russfellows


Dataset Sharding Analysis: s3dlio + dlio_benchmark

Date: June 17, 2026
Scope: Investigation only; no code changes are made here (the changes below are proposals)


Issues identified by @wolfgang-desalvador

@wolfgang-desalvador @keithpij and @zhenghh04 please review.

Problem Statement

For workloads with tens of millions of files or objects, Python OOM occurs because every rank
currently holds the entire file list in RAM. The file list is built at startup before any
training begins, and every rank builds the same global list.

Specific RAM cost (as a concrete anchor):

| Dataset size | Path strings (~60 bytes avg) | Total per rank (incl. ~56 bytes/item Python object overhead) |
|---|---|---|
| 1 M files | ~60 MB | ~116 MB |
| 10 M files | ~600 MB | ~1.16 GB |
| 64 M files | ~3.84 GB | ~7.4 GB |

With 8 ranks per node, 64 M files → ~60 GB (8 × ~7.4 GB) of RAM consumed just for file lists
on a single node, before a single byte of training data is read.
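The per-item cost in the table can be sanity-checked on a live interpreter. A minimal sketch, assuming a 64-bit CPython build and 60-character synthetic URIs (the naming scheme and function name are hypothetical); exact byte counts vary slightly across CPython versions:

```python
import sys


def file_list_bytes(paths: list[str]) -> int:
    """Approximate RAM held by a Python list of path strings.

    sys.getsizeof() on the list counts only its header and pointer array,
    so each string object's own size is added separately.
    """
    return sys.getsizeof(paths) + sum(sys.getsizeof(p) for p in paths)


if __name__ == "__main__":
    n = 100_000
    # Hypothetical 60-character URIs: 18-char prefix + 34 digits + ".parquet".
    paths = [f"s3://bucket/train/{i:034d}.parquet" for i in range(n)]
    per_item = file_list_bytes(paths) / n
    print(f"{per_item:.1f} bytes/item")
    for files in (1_000_000, 10_000_000, 64_000_000):
        print(f"{files:>11,} files -> {per_item * files / 1e9:.2f} GB per rank")
```

On recent CPython versions the per-item figure lands in the same ballpark as the ~116 bytes/item (60 bytes of characters plus ~56 bytes of object and pointer overhead) used in the table.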


Where the Memory Lives — Both Codebases

dlio_benchmark: the problem is in main.py + config.py

Root cause: main.py lines 213-263 — every rank calls storage.walk_node() which
calls list_objects() and accumulates the full global list in fullpaths. This is then
stored in ConfigArguments.file_list_train (a ClassVar[List[str]]) — a process-global
singleton that all reader threads inherit.

main.py:walk_node() → storage.list_objects() → fullpaths (full global list, all ranks)
    ↓
ConfigArguments.file_list_train = fullpaths   ← ENTIRE list, same on every rank
    ↓
config.py:build_sample_map_iter()  ← slices the list by rank AFTER loading it all
config.py:get_global_map_index()   ← builds VirtualIndexMap from full list
    ↓
reader_handler.py:self._file_list = args.file_list_train  ← each reader gets full list

The rank-level sharding only happens after the full list is in RAM. For 10 M files,
every rank first allocates ~1.16 GB for the list and then uses only 1/8 of it (with 8 ranks),
while the full list stays resident.

s3dlio ParquetRowGroupDataset: same pattern at a smaller scale

In parquet_rg.rs::new() (lines 180-188):

// Abridged; assumes run_on_global_rt returns a Result.
let file_uris: Vec<String> = run_on_global_rt(async move {
    store.list(&uri_prefix_owned, true).await
})?
.into_iter()
.filter(|u| u.ends_with(".parquet"))
.collect();

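The full list is collected into a Vec<String> before any construction proceeds. For
10 M Parquet files that is ~600 MB of URI bytes alone in Rust; each String also carries a
24-byte header on 64-bit targets, so the real footprint is at least ~840 MB. Python inherits
this through file_uris_arc().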


Existing Sharding Mechanisms (What Already Works)

dlio_benchmark — MPI-level sharding

config.py:build_sample_map_iter() shards at the sample level across MPI ranks after
the full list is loaded:

samples_per_proc = int(math.ceil(total_samples / self.comm_size))
start_sample_index = samples_per_proc * self.my_rank
end_sample_index = samples_per_proc * (self.my_rank + 1) - 1

This correctly distributes work but does not reduce RAM — every rank still holds the full
list.
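A standalone version of the quoted arithmetic makes the per-rank ranges easy to check. A minimal sketch with a hypothetical function name; the clamp on the last rank's end index is an addition of this sketch, needed when total_samples is not divisible by comm_size:

```python
import math


def rank_sample_range(total_samples: int, comm_size: int, my_rank: int) -> tuple[int, int]:
    """Inclusive (start, end) sample indices for one rank, ceil-based split.

    The end index is clamped to total_samples - 1. The range can be empty
    (start > end) when comm_size is large relative to total_samples.
    """
    samples_per_proc = int(math.ceil(total_samples / comm_size))
    start = samples_per_proc * my_rank
    end = min(samples_per_proc * (my_rank + 1) - 1, total_samples - 1)
    return start, end
```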

dlio_benchmark — PyTorch worker sharding (TorchIterableDataset)

torch_data_loader.py:TorchIterableDataset.__iter__() already shards files across
PyTorch DataLoader workers:

all_files = all_files[worker_info.id::worker_info.num_workers]

This is correct intra-rank sharding but again operates on a list that was already fully
loaded.
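Rank-level and worker-level sharding compose into a single strided slice. A minimal sketch (hypothetical function name) that is equivalent to slicing by rank and then by worker; like the code above, it still takes the full list as input, so it fixes work distribution but not RAM:

```python
from typing import Sequence, TypeVar

T = TypeVar("T")


def shard_for(items: Sequence[T], rank: int, world_size: int,
              worker_id: int, num_workers: int) -> Sequence[T]:
    """This (rank, worker) pair's share of items as one strided slice.

    items[rank::world_size][worker_id::num_workers] picks indices
    rank + world_size * (worker_id + num_workers * j), i.e. a slice starting
    at rank + world_size * worker_id with stride world_size * num_workers.
    """
    start = rank + world_size * worker_id
    return items[start::world_size * num_workers]
```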

s3dlio — PyDataset.from_uris()

python_aiml_api.rs already exposes:

ds = s3dlio.PyDataset.from_uris(uris)

This accepts a pre-built URI list, so a caller can compute its shard and pass only its
portion. This is the right hook — but the caller must first enumerate the full list to slice
it, which defeats the purpose.


Recommended Approaches (Ranked)

Option A — Prefix-Based Deterministic Sharding (Recommended First Step)

How it works: Assign each rank a slice of the key space by deterministic key prefix (for
hash-named keys, a range of leading characters). Storage listing APIs filter on any prefix,
so each rank does only ~1/N of the listing work and holds ~1/N of the list in RAM:

Rank 0: list("s3://bucket/train/000*/") + "001*/" + ... up to 1/N of prefix space
Rank 1: list("s3://bucket/train/032*/") + ...

For uniformly distributed keys (UUIDs, hash-named files), this splits roughly evenly. For
sequential names (sample_000000.parquet, sample_000001.parquet), you can shard by prefix
range:

Rank 0: list prefix "sample_0" through "sample_1"
Rank k: list prefix covering the k-th bucket of the key range
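For hash-named keys whose leading characters are hex digits, the prefix space can be split into contiguous blocks per rank. A minimal sketch, assuming lowercase hex leading characters; the function name and the two-character width are illustrative choices, and each returned prefix would become one list() call under the dataset root:

```python
def rank_hex_prefixes(rank: int, world_size: int, width: int = 2) -> list[str]:
    """Contiguous block of fixed-width hex prefixes assigned to one rank.

    With width=2 there are 256 prefixes ("00".."ff"), so 8 ranks get 32 each.
    Keys that do not start with lowercase hex characters are never listed.
    """
    total = 16 ** width
    start = total * rank // world_size
    end = total * (rank + 1) // world_size
    return [format(i, f"0{width}x") for i in range(start, end)]
```

Rank 0 of 8 gets "00" through "1f" and rank 1 starts at "20", the hex analogue of the 000*/032* split sketched above.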

What needs to change:

  1. In s3dlio list() (Rust): add start_after: Option<String> and end_before: Option<String> parameters. AWS S3 list_objects_v2 supports StartAfter natively but has no end-bound parameter, so end_before would be enforced client-side by stopping once keys pass it. Azure Blob and GCS have equivalent parameters. This is a small, safe, additive change.
  2. In dlio_benchmark main.py: compute the rank's prefix range, list only that range, never call walk_node() for the full prefix.
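When key ranges are handed out to ranks, there is a boundary subtlety: if rank r stops before boundary b (end_before) and rank r+1 starts after b (start_after), both skip the key b itself. The minimal sketch below (hypothetical names, operating on boto3-style result pages) enforces the upper bound client-side, which works on any backend, and pairs an exclusive start_after with an inclusive end_at so no key is dropped:

```python
from typing import Iterable, Iterator, Optional


def keys_in_range(pages: Iterable[dict],
                  start_after: Optional[str] = None,
                  end_at: Optional[str] = None) -> Iterator[str]:
    """Yield keys k with start_after < k <= end_at from ListObjectsV2-style pages.

    Each page is a dict whose optional "Contents" list holds {"Key": ...}
    entries, the shape boto3's list_objects_v2 paginator yields. Keys arrive
    in ascending order, so iteration stops at the first key past end_at
    instead of paging through the rest of the prefix. start_after is also
    checked client-side in case the backend ignores it.
    """
    for page in pages:
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if start_after is not None and key <= start_after:
                continue
            if end_at is not None and key > end_at:
                return
            yield key


# Hypothetical usage with boto3 for one rank's range (omit StartAfter when lo is None):
# pages = boto3.client("s3").get_paginator("list_objects_v2").paginate(
#     Bucket="bucket", Prefix="train/", StartAfter=lo)
# my_keys = list(keys_in_range(pages, start_after=lo, end_at=hi))
```

Python compares strings by code point, which matches the UTF-8 binary order S3 general-purpose buckets use for listing, so the early stop is safe there.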

Tradeoffs:

  • ✅ RAM: O(N/ranks) instead of O(N) — 8 ranks → 8× less RAM
  • ✅ Listing latency: 8 parallel listings of 1/8 each → wall-clock time no worse than today's full listing per rank (lower if the backend is not the bottleneck)
  • ✅ No MPI barrier needed for listing — ranks list in parallel
  • ⚠ Requires approximately uniform key distribution. Skewed datasets need a sampling step.
  • ❌ Does not support global shuffle across all N files between epochs (see Option D).
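The sampling step for skewed key distributions can be as simple as cutting the key space at quantiles of a key sample. A minimal sketch with a hypothetical function name; how the sample is gathered (for example from an earlier full listing or from data-generation metadata) is left open, but the first few pages of a fresh listing would be a biased sample because they cover only the lowest keys. The bounds use an exclusive lower bound, matching StartAfter, and an inclusive upper bound, so adjacent ranks neither overlap nor skip the boundary key:

```python
from typing import Optional, Sequence


def rank_key_ranges(sample_keys: Sequence[str], world_size: int
                    ) -> list[tuple[Optional[str], Optional[str]]]:
    """Per-rank (start_after, end_at) key ranges cut at quantiles of a key sample.

    Rank r should list keys with start_after < key <= end_at; None means
    unbounded. Each boundary is the exclusive start of one range and the
    inclusive end of the previous one, so the ranges tile the key space.
    """
    keys = sorted(set(sample_keys))
    if len(keys) < world_size:
        raise ValueError("need at least world_size distinct sample keys")
    bounds = [keys[len(keys) * i // world_size] for i in range(1, world_size)]
    return list(zip([None] + bounds, bounds + [None]))
```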

Option B — Manifest File / Pre-Built Shard Files

Generate one manifest per rank at data-generation time (or as a preprocessing step):

s3://bucket/manifests/train_rank0.txt   # 1/8 of file list, one URI per line
s3://bucket/manifests/train_rank1.txt
...

Each rank reads only its own manifest (a small text file, ~60 MB per 1 M entries). This is
similar to how large-scale training pipelines such as WebDataset and NVIDIA DALI handle the
1 B+ file scale: they work from pre-built shard or file lists rather than listing storage
at startup.

What needs to change:

  1. dlio_benchmark: add a manifest_file config option; replace walk_node() with
    manifest read when set.
  2. Optionally: add a dlio_benchmark generate_manifest sub-command.
  3. s3dlio: no changes needed — PyDataset.from_uris() already accepts pre-built lists.
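On the dlio_benchmark side, reading the manifest takes only a few lines. A minimal sketch, assuming the manifests are reachable as local or mounted files and named as in the layout above (train_rank{rank}.txt); the function name and the comment-line convention are hypothetical:

```python
from pathlib import Path
from typing import Iterator


def read_rank_manifest(manifest_dir: str, rank: int,
                       pattern: str = "train_rank{rank}.txt") -> Iterator[str]:
    """Yield the URIs in this rank's manifest, one per line.

    The file is streamed line by line, so the caller decides whether to
    materialize a list (O(N/ranks) RAM) or consume URIs lazily.
    Blank lines and lines starting with '#' are skipped.
    """
    path = Path(manifest_dir) / pattern.format(rank=rank)
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            uri = line.strip()
            if uri and not uri.startswith("#"):
                yield uri
```

The resulting list (or a slice of it) can go straight to the existing s3dlio.PyDataset.from_uris().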

Tradeoffs:

  • ✅ Zero listing overhead at training time
  • ✅ Arbitrary shard sizes — manifests can be unequal for heterogeneous clusters
  • ✅ Manifests can be pre-shuffled per epoch
  • ⚠ Requires a manifest generation step before training (one-time cost or re-run for new shuffles)
  • ❌ Re-shuffling between epochs requires generating new manifests or re-sorting at epoch start

Option C — Lazy Streaming Listing (Avoids RAM, Not Sharding)

Instead of list() → Vec<String>, stream listing results through an iterator that yields
URIs on demand. Each worker pulls the next batch of URIs as it finishes previous work. This
eliminates the RAM spike but does not reduce total listing volume:

s3dlio: list_objects_stream() already exists in s3_utils.rs (line 760). It is a
Tokio async stream that pages through listing results incrementally. The Python API does
not currently expose a streaming listing interface.

s3dlio ParquetRowGroupDataset: the build_extents() call in parquet_rg.rs already
streams concurrent footer fetches, but it first collects the full URI list.

What needs to change:

  1. s3dlio: expose list_stream(uri_prefix) → Iterator[str] in the Python API. This is a
    wrapper around the existing list_objects_stream() Tokio stream, bridged to Python via
    blocking_recv().
  2. ParquetRowGroupDataset::new(): accept either uri_prefix: &str (current) or
    file_uris_iter: impl Iterator<Item=String> so callers can provide a streaming source.
  3. dlio_benchmark: replace walk_node() with s3dlio.list_stream().
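Per-rank filtering of a streamed listing needs no buffering at all. A minimal sketch with a hypothetical function name; s3dlio.list_stream() is the API proposed above and does not exist yet, so any iterator of URIs stands in for it here. All ranks must see the listing in the same order (S3 general-purpose buckets return keys in UTF-8 binary order) for the shares to be disjoint:

```python
from itertools import islice
from typing import Iterable, Iterator


def rank_share(uris: Iterable[str], rank: int, world_size: int) -> Iterator[str]:
    """Keep every world_size-th URI of a listing stream, starting at rank.

    Nothing is buffered, so memory stays O(1) regardless of listing size;
    every rank still reads the whole stream.
    """
    return islice(uris, rank, None, world_size)


# Hypothetical usage once list_stream() exists:
# for uri in rank_share(s3dlio.list_stream("s3://bucket/train/"), rank, world_size):
#     ...
```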

Tradeoffs:

  • ✅ O(1) RAM for listing — URIs never all in RAM simultaneously
  • ✅ Works with any key distribution, no pre-processing needed
  • ✅ Supports approximate shuffle via a sliding window over the stream (the existing full-list shuffle does not apply)
  • ⚠ Inter-rank coordination: all ranks stream the same prefix, doing 8× the listing work
  • ⚠ Shuffling a streaming list requires reservoir sampling or a partial buffer
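The partial-buffer approach is the usual streaming technique: it is what tf.data.Dataset.shuffle(buffer_size) does, and it differs from reservoir sampling, which selects a fixed-size subset rather than reordering a stream. A minimal sketch (hypothetical function name) that bounds memory at buffer_size URIs:

```python
import random
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def buffered_shuffle(items: Iterable[T], buffer_size: int, seed: int) -> Iterator[T]:
    """Approximately shuffle a stream using a fixed-size buffer.

    Fill the buffer; then, for each new item, emit a random buffered element
    and put the new item in its slot; finally drain the buffer in random
    order. No item is emitted more than buffer_size - 1 positions ahead of
    where it arrived, so this is a local shuffle, not a uniform permutation.
    """
    if buffer_size < 1:
        raise ValueError("buffer_size must be >= 1")
    rng = random.Random(seed)
    buf: list[T] = []
    for item in items:
        if len(buf) < buffer_size:
            buf.append(item)
            continue
        idx = rng.randrange(buffer_size)
        yield buf[idx]
        buf[idx] = item
    rng.shuffle(buf)
    yield from buf
```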

Option D — Coordinated Distributed Listing (Scalable + Shuffle-Safe)

One rank (rank 0) lists the full dataset, then distributes a shuffled permutation array
using only indices (not strings), with each rank receiving only its own share (an MPI
scatter). Each rank then resolves its shard of indices to file names.

This is how dlio_benchmark already works for sample-level shuffle
(config.py:get_global_map_index()), but VirtualIndexMap still requires the full
string list to exist somewhere.

Improved version:

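  1. Only rank 0 lists, once, and it stores key hash digests (8 bytes each, not 60+ byte strings).
  2. Rank 0 scatters a shuffled permutation of indices, sending each rank only its share.
  3. Each rank reconstructs only its shard of string URIs from storage. Because a shuffled shard's indices are spread across the whole sorted key space, this in practice means streaming the listing and keeping only the keys at its indices, rather than fetching a few contiguous ranges.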
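The 8-byte digests in step 1 can be produced and packed without per-object overhead. A minimal sketch using BLAKE2b truncated to 8 bytes (an arbitrary choice of this sketch; the proposed list_keys_hashed API does not specify a hash, and the function names are hypothetical). Two caveats: at 64 M keys the chance of at least one 64-bit collision is roughly 1 in 10,000, and a digest cannot be turned back into a key name, so resolving names still requires the listing:

```python
import hashlib
from array import array
from typing import Iterable


def key_digest(key: str) -> int:
    """64-bit digest of an object key (BLAKE2b with an 8-byte digest)."""
    raw = hashlib.blake2b(key.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(raw, "little")


def digest_array(keys: Iterable[str]) -> array:
    """Pack digests into an unsigned 64-bit array: 8 bytes per key, no per-object overhead."""
    return array("Q", (key_digest(k) for k in keys))
```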

At 64 M files: index array = 512 MB (int64) on rank 0 vs ~4 GB of raw string data. After a
scatter (a broadcast would give every rank the full 512 MB), each of 8 ranks holds only 64 MB
of indices, then resolves its ~8 M file names.

What needs to change:

  1. s3dlio: expose a list_keys_hashed(prefix) → List[u64] or list_count(prefix) → int
    plus list_range(prefix, start_after, limit) → List[str] API.
  2. dlio_benchmark: replace walk_node() with MPI-coordinated version above.
  3. Requires an MPI barrier at listing time (rank 0 lists first, then scatters the indices).
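The two halves of Option D can be simulated in one process. A minimal sketch with hypothetical function names: rank 0 shuffles an int64 index array and cuts it into per-rank chunks (with mpi4py, chunk r could be delivered to rank r via comm.scatter(chunks, root=0)), and each rank resolves its chunk by walking the listing once. It assumes every rank sees exactly the same sorted listing that rank 0 indexed, with no writes in between:

```python
import random
from array import array
from typing import Iterable


def permutation_shards(n_files: int, world_size: int, seed: int) -> list[array]:
    """Rank 0: shuffle indices 0..n_files-1 and cut them into one chunk per rank.

    An int64 array keeps the 8-bytes-per-index footprint from the estimate
    above; a plain Python list of ints would cost several times more.
    """
    perm = array("q", range(n_files))
    random.Random(seed).shuffle(perm)
    return [perm[r::world_size] for r in range(world_size)]


def resolve_indices(sorted_keys: Iterable[str], wanted: Iterable[int]) -> list[str]:
    """Rank r: walk the listing once and keep only the keys at this rank's indices.

    The result follows this rank's (shuffled) index order.
    """
    order = list(wanted)
    wanted_set = set(order)
    found = {i: key for i, key in enumerate(sorted_keys) if i in wanted_set}
    return [found[i] for i in order]
```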

Tradeoffs:

  • ✅ Full global shuffle preserved
  • ✅ Scales to 100 M+ files
  • ⚠ More complex; needs MPI coordination
  • ⚠ Rank 0 still holds the full index array (just 8 bytes/file vs 60+ bytes/file)

Shuffle Between Epochs — Analysis

This is the hardest constraint. Current dlio_benchmark shuffle (config.py:reconfigure()):

np.random.shuffle(self.file_list_train)   # in-place shuffle of full list

This requires the full list in RAM on every rank. Options:

| Approach | Epoch shuffle support | RAM after listing |
|---|---|---|
| A (prefix shard) | ❌ No global shuffle (local shard shuffle only) | O(N/ranks) |
| B (manifest) | ✅ Pre-generate shuffled manifests per epoch | O(N/ranks) |
| C (streaming) | ⚠ Approximate shuffle in a sliding window (reservoir or buffer) | O(window_size) |
| D (distributed) | ✅ Full global shuffle via a distributed index permutation | O(N × 8 B / ranks) |

Practical recommendation for shuffle: For true global shuffle at scale, pre-generate
per-epoch manifests (Option B). The file order for epoch k is the epoch k-1 order
reshuffled deterministically (seeded by epoch number), then split into per-rank manifests.
A manifest of 10 M entries is ~600 MB of uncompressed text (far less compressed), which is
trivial to store in object storage.
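A variant of that scheme, sketched below with hypothetical names: rather than chaining from epoch k-1, shuffle the base list with a seed derived from (base seed, epoch). Every epoch stays reproducible, and any single epoch's manifests can be regenerated without its predecessors. Python's random module is deterministic for a given seed within a Python version; a cross-version or cross-language guarantee would need a pinned PRNG:

```python
import random


def epoch_order(uris: list[str], base_seed: int, epoch: int) -> list[str]:
    """Global file order for one epoch, reproducible from (base_seed, epoch) alone."""
    order = list(uris)  # copy; the base list is left untouched
    random.Random(f"{base_seed}:{epoch}").shuffle(order)
    return order


def rank_slice(order: list[str], rank: int, world_size: int) -> list[str]:
    """Contiguous block of the epoch order that goes into this rank's manifest."""
    n = len(order)
    return order[n * rank // world_size : n * (rank + 1) // world_size]
```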


Concrete Next Steps (Prioritized)

Immediate (dlio_benchmark only, no s3dlio changes needed)

  1. Add manifest_file config option to main.py — when set, skip walk_node() and
    read the rank's manifest file instead. This requires ~20 lines of code change in
    main.py and config.py. Zero risk to existing functionality (opt-in config).

  2. Add dlio generate_manifest sub-command — lists the full dataset once (can be run
    from rank 0 only), shards into N files, optionally shuffles, writes to storage.
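The sharding half of such a sub-command might look like the following minimal sketch. It assumes the full URI list is already in memory on the single process that runs it, writes to a local or mounted directory (uploading to object storage is left out), and reuses the train_rank{rank}.txt naming from Option B; the function name is hypothetical:

```python
from pathlib import Path
from typing import Sequence


def write_rank_manifests(uris: Sequence[str], world_size: int, out_dir: str,
                         pattern: str = "train_rank{rank}.txt") -> list[Path]:
    """Write one manifest per rank, each holding a contiguous 1/world_size block of uris.

    uris should already be in the desired order (e.g. after a seeded shuffle).
    """
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    n = len(uris)
    paths = []
    for rank in range(world_size):
        lo, hi = n * rank // world_size, n * (rank + 1) // world_size
        path = out / pattern.format(rank=rank)
        with path.open("w", encoding="utf-8") as f:
            for uri in uris[lo:hi]:
                f.write(uri + "\n")
        paths.append(path)
    return paths
```

Contiguous blocks keep each rank's keys lexicographically clustered when the input is the raw sorted listing; shuffle the list first if shuffled manifests are wanted.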

Short term (s3dlio changes, small)

  1. Expose streaming listing in Python API — wrap existing list_objects_stream()
    as s3dlio.list_stream(uri, opts) → iterator. This is ~50 lines of Rust + PyO3 bridging
    the existing async stream to a Python iterator via blocking_recv().

  2. Add start_after / max_keys to store.list() — the AWS SDK StartAfter field
    is already threaded through the list_objects_v2 builder in s3_utils.rs. Exposing it
    in the ObjectStore trait and Python API enables prefix-range sharding.

Medium term

  1. Let ParquetRowGroupDataset accept a pre-built URI list by adding a second constructor:
    ParquetRowGroupDataset::from_uris(uris: Vec<String>, ...). This allows the caller to
    shard however it wants (manifest, prefix range, MPI shard) and pass only the shard's
    URIs to the DataLoader. The DataLoader then never sees the full list.

  2. s3dlio.PyDataset.from_uris() + streaming listing in dlio — use the already-existing
    from_uris() API with a streaming listing call to drip URIs in batches, capping peak RAM
    at batch_size × avg_uri_length.
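The batching half of that idea is small. A minimal sketch with a hypothetical function name; s3dlio.list_stream() is the proposed (not yet existing) streaming call, and each batch could be handed to the existing PyDataset.from_uris():

```python
from itertools import islice
from typing import Iterable, Iterator


def uri_batches(uris: Iterable[str], batch_size: int) -> Iterator[list[str]]:
    """Group a (possibly streaming) URI iterator into lists of at most batch_size.

    The generator itself holds only the current batch, so URI memory is
    bounded by batch_size entries as long as the consumer releases each batch.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    it = iter(uris)
    while batch := list(islice(it, batch_size)):
        yield batch


# Hypothetical usage once list_stream() exists:
# for batch in uri_batches(s3dlio.list_stream("s3://bucket/train/"), 10_000):
#     ds = s3dlio.PyDataset.from_uris(batch)
```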


Summary Table

| Option | dlio change complexity | s3dlio change complexity | Per-rank listing RAM | Shuffle support | Recommended? |
|---|---|---|---|---|---|
| A: Prefix shard | Medium | Small | O(N/ranks) | Local only | ✅ Good starting point |
| B: Manifest files | Small | None | O(N/ranks) | Full (pre-computed) | Best for production |
| C: Streaming list | Medium | Small | O(1) | Reservoir only | ✅ Good for listing RAM |
| D: Distributed index | Large | Medium | O(N×8B/ranks) | Full | For 100 M+ at scale |

Recommended path: Implement Option B (manifests) first; it solves the OOM problem
immediately with minimal code risk and works with every existing reader and data loader. Then
implement Option C (streaming listing) in s3dlio to eliminate listing RAM entirely for new
code paths. Option A can follow once the s3dlio range-listing parameters exist, though it
shuffles only within each rank's shard; Option D is best saved for when global shuffle at the
100 M+ file scale becomes a requirement.
