diff --git a/.claude/skills/migrate-to-resumable-dataloader/SKILL.md b/.claude/skills/migrate-to-resumable-dataloader/SKILL.md new file mode 100644 index 000000000000..0135f76ea036 --- /dev/null +++ b/.claude/skills/migrate-to-resumable-dataloader/SKILL.md @@ -0,0 +1,166 @@ +--- +name: migrate-to-resumable-dataloader +description: This skill should be used when the user asks to "migrate to the resumable dataloader", "switch to indexed Lhotse", "adopt the indexed + resumable pipeline", "make my training resumable", "set up StatefulDataLoader for NeMo/Lhotse", "use AIStore GetBatch", or "convert this YAML to the resumable path". Walks a NeMo training YAML and optional launcher, data blend, and runtime context through the indexed + resumable Lhotse migration; lints interacting fields; auto-patches safe YAML changes; emits a migration report, pre-flight checklist, and index-build command. Static analysis only; never launches training. +argument-hint: ' [launcher.py] [blend.yaml] [runtime-notes]' +--- + +# Migrate a NeMo training YAML to indexed + resumable Lhotse + +Use this skill to port a NeMo training config from streaming/replay-style Lhotse +loading to indexed access plus `torchdata.StatefulDataLoader` checkpoint/restore. +The migration is fragile because YAML flags, launcher seed policy, index paths, +storage backend, and resume topology all interact. + +## Core concepts + +- Indexed sources need `.idx` sidecars for random access into JSONL, tar, and + supported Shar-style data. Build these once per blend/source set. +- `use_stateful_dataloader: true` lets Lightning checkpoint the dataloader + iterator state, but only if seeds, worker counts, and distributed topology are + stable across chunks. +- Training configs must use `force_map_dataset: false` so indexed sources + partition across data-parallel ranks and workers without map-style sampler + overhead. Treat `force_map_dataset: true` for training as not launch-ready + unless the user explicitly approves a temporary exception; every source in the + training iteration graph must be indexed and partition-compatible before + launch. +- Remote audio on AIStore/S3 generally needs `USE_AIS_GET_BATCH=true` so audio + fetches are deferred to sample time instead of constructing eager tar readers + for every shard. + +## Inputs + +| input | required | source | purpose | +|---|---|---|---| +| Training YAML | yes | argument or `--config=` | Inspect `data.train_ds`, `data.validation_ds`, `trainer`, `exp_manager`, and any model fields that affect resume. | +| Launcher script | no | argument or auto-detect from project conventions | Check per-chunk seed policy, resume topology invariance, Python path setup, AIStore env vars, and optional index staging. | +| Data-blend YAML | no | resolved from `data.train_ds.input_cfg` when possible | Check indexability: compressed paths, non-seekable paths, unsupported `extra_fields`, `slice_length`, and mixed indexed/non-indexed chains. | +| Runtime context | no | argument, config file, or user-provided notes | Detect storage backend, AIStore endpoint availability, container constraints, and index mirror destination. | + +## Outputs + +Every output lands in `migrate-resumable//` in the current repo: + +| output | purpose | +|---|---| +| `migration-report.md` | Findings, rationale, patched fields, and unresolved blockers. | +| `-resumable.yaml` | Patched training config when safe automatic edits are possible. | +| `-resumable.yaml` | Patched blend, only when a blend was inspected and safe changes are possible. | +| `pre-flight-checklist.md` | User-run steps before submitting training. | +| `build-indexes-cmd.sh` | One-shot index-build command using the project wrapper when available, otherwise the generic NeMo/Lhotse index builder. | + +## Workflow + +### 1. Discover and parse inputs + +1. Resolve the training YAML path and read it with OmegaConf or a + comment-preserving YAML parser. +2. Resolve any referenced blend YAMLs from `data.*.input_cfg`. Prefer project + conventions when obvious, but fall back to paths relative to the config. +3. If a launcher path is supplied, read it. Otherwise inspect likely project + launchers (`train.py`, `pretrain.py`, shell wrappers, or raw `torchrun` / + `python` commands) and pick the closest match. +4. If runtime context is supplied, read it for container image, environment + variables, filesystem mounts, worker counts, and AIStore endpoint settings. +5. Detect remote storage from source paths (`s3://`, `ais://`, `http(s)://`) and + local filesystem storage from ordinary absolute or relative paths. + +### 2. Run lint pipeline + +Run every relevant check in: + +- `references/option-reference.md` +- `references/conflict-matrix.md` +- `references/failure-modes.md` +- `references/aistore-vs-non-aistore.md` when remote storage is present + +Each finding should include severity, field/path, current value, recommended +value, and a short rationale. + +Severities: + +- **fatal**: automatic patching is not possible; user must preprocess data or + change the source layout. +- **error**: automatic patching is safe and should be applied. +- **warning**: context-dependent; emit a report item and optional YAML comment. +- **note**: informational; no patch. + +### 3. Emit patched YAML and blend + +Apply safe `error`-severity patches. Preserve comments when possible with +`ruamel.yaml`; otherwise serialize with OmegaConf/YAML and rely on the report for +rationale. For blend edits, never silently drop data: leave an explicit report +entry and comment for every excluded or rewritten source. + +### 4. Generate `migration-report.md` + +Use `templates/migration-report.md`. Include: + +1. Summary of storage workflow, counts by severity, and readiness. +2. Inputs inspected. +3. Findings table. +4. Walkthrough for train data, validation data, trainer/exp manager, launcher, + and storage backend. +5. Data-blend audit. +6. Verification and pre-flight steps. + +### 5. Generate `pre-flight-checklist.md` + +Use `templates/pre-flight-checklist.md` when present. Required steps: + +- Build `.idx` sidecars for every training/validation/test blend involved. +- Verify `indexes_root` points at the same stable mirror used by the runtime, or + that explicit node-local index staging populates it before training starts. +- If AIStore is in play: verify `aistore` SDK availability, `AIS_ENDPOINT`, and + whether `USE_AIS_GET_BATCH` or `USE_AIS_INDIVIDUAL_GETS` is required. +- Verify one invariant seed across resumable chunks. +- Verify `num_workers`, `world_size`, and relevant distributed topology do not + change across resume boundaries. +- Recommend a small smoke ladder: single-node single chunk, single-node resume, + then full topology. + +### 6. Generate `build-indexes-cmd.sh` + +Prefer a project-provided wrapper when one is clearly present. Otherwise emit a +generic command using: + +```bash +python /scripts/dataloading/build_indexes.py \ + --indexes-root \ + --workers \ + .yaml [.yaml ...] +``` + +If running through a managed runtime or container wrapper, include comments for required +container image, mounts, environment variables, worker count, and any CPU/GPU +container-hook workaround the project requires. + +### 7. Print final summary to chat + +Keep the final chat response under 10 lines: output directory, finding counts, +report path, and the next command the user should run. + +## Knowledge base + +- `references/option-reference.md`: field-by-field reference for YAML and + launcher settings. +- `references/failure-modes.md`: known failure signatures, triggers, and fixes. +- `references/conflict-matrix.md`: incompatible option pairs. +- `references/best-practices.md`: priority-ordered checklist. +- `references/aistore-vs-non-aistore.md`: storage workflow selection. +- `templates/migration-report.md`: report template. +- `templates/pre-flight-checklist.md`: checklist template, when present. +- `scripts/analyze.py`: optional static-analysis helper, when present. + +## Constraints + +- Prefer static analysis. Do not launch training, build indexes, prefetch data, or + modify external runtime state unless the user explicitly asks. +- Cross-check recommendations against the actual NeMo/Lhotse code in the user's + checkout when paths are available. Relevant areas are common Lhotse dataloader + config, indexed adapters, `lhotse.indexing`, AIStore batch loading, and NeMo + dataloader construction. +- Treat project wrappers as optional conveniences, not as part of the generic + migration contract. +- When evidence is missing, say so. Do not encode project-specific run history + or local experiment names as general guidance. diff --git a/.claude/skills/migrate-to-resumable-dataloader/references/aistore-vs-non-aistore.md b/.claude/skills/migrate-to-resumable-dataloader/references/aistore-vs-non-aistore.md new file mode 100644 index 000000000000..54314e7364ee --- /dev/null +++ b/.claude/skills/migrate-to-resumable-dataloader/references/aistore-vs-non-aistore.md @@ -0,0 +1,79 @@ +# AIStore vs filesystem workflows + +Indexed + resumable Lhotse can read audio/tar sources from a local filesystem or +from AIStore-compatible URLs. Manifests/cuts may be on disk in either workflow. +Choose the workflow from source path schemes, not from where the process runs. + +## Detection + +| signal | workflow | +|---|---| +| `tarred_audio_filepaths: s3://...`, `ais://...`, or `http(s)://...` | AIStore/remote workflow | +| `tarred_audio_filepaths: /path/...` or relative filesystem path | filesystem workflow | +| mixed local and remote paths | remote workflow, because it has the stricter requirements | + +`AIS_ENDPOINT` in the environment is necessary for AIStore access, but it is not +sufficient evidence that the blend uses AIStore. + +## Remote AIStore workflow + +Required setup: + +- `aistore` SDK installed in the build/training container. +- `AIS_ENDPOINT` exported into the process that reads remote sources. +- `USE_AIS_GET_BATCH=true` when remote tar/audio should be fetched lazily by + minibatch instead of opening every shard eagerly. + +Optional setup: + +- `USE_AIS_INDIVIDUAL_GETS=true` to bypass the batch endpoint and fetch each + object individually. This is slower but useful when the batch endpoint is + unavailable or returns empty content for some objects. + +Index building: + +- The index builder reads remote tar files through AIStore byte-range capable + paths and writes `.idx` sidecars to the configured index mirror. +- A successful index build proves byte-range access worked for the indexed + source paths. It does not prove the batch endpoint will later serve every + object successfully. + +Runtime data access: + +1. Keep manifests/cuts on a local/shared filesystem when random access would be + inefficient from remote storage. +2. Point `data.*.indexes_root` at a persistent index mirror by default. +3. Use node-local index staging only when direct mirror reads are too slow or + metadata-heavy; make the YAML path match the staged destination. +4. Use manifest prefetch only as a fallback for remote manifest paths that + cannot be cached persistently. + +## Filesystem-only workflow + +Required setup: + +- All audio/tar paths resolve through the local filesystem visible in the + container/process. +- AIStore env vars are unset or ignored when no remote paths are present. +- `USE_AIS_GET_BATCH=false` unless a mixed remote source requires it. + +Index building: + +- The index builder reads local files directly. +- Filesystem throughput and metadata behavior determine the best worker count. + +Runtime data access: + +1. Keep manifests/cuts on a local/shared filesystem. +2. Point `data.*.indexes_root` at a persistent index mirror. +3. Stage indexes to node-local SSD only when needed and only with matching YAML + paths. + +## Common gotchas + +- Do not infer workflow from runtime labels alone; inspect the source paths. +- Verify filesystem mounts inside the runtime/container, not only in the host shell. +- Reusing an index mirror requires identical source path strings and unchanged + source contents. +- AIStore individual GETs and batch GETs can exercise different backend paths; + test the exact access mode used by training. diff --git a/.claude/skills/migrate-to-resumable-dataloader/references/best-practices.md b/.claude/skills/migrate-to-resumable-dataloader/references/best-practices.md new file mode 100644 index 000000000000..6c206390080b --- /dev/null +++ b/.claude/skills/migrate-to-resumable-dataloader/references/best-practices.md @@ -0,0 +1,79 @@ +# Best practices - indexed + resumable Lhotse migration + +Prioritized checklist for migrating a NeMo config to indexed access and +checkpointable dataloading. + +## Tier 1 - non-negotiable + +1. **Pin `seed` and `shard_seed` to fixed integers.** The sampler and model RNG + must resume from a stable state. Avoid `"randomized"` for resumable chains. + +2. **Use one seed across every chunk of a resumable chain.** Lightning reseeds + global RNGs at chunk startup. Rotating the seed breaks bit-exact resume even + when dataloader state restores correctly. + +3. **Keep `num_workers` and distributed topology invariant.** Changing worker + count, world size, or rank/worker assignment invalidates stateful dataloader + snapshots and iterable partition state. + +4. **Build `.idx` sidecars once per stable source path set.** Reuse a persistent + index mirror across experiments. Rebuild only when source contents or path + strings change. + +5. **Disable concurrent bucketing for resumable training.** Background producer + threads can advance iterators outside the checkpointed main-thread state. + +## Tier 2 - strongly recommended + +6. **Run a bit-exact dataloader resume check before sweeping.** Take a few + batches, save dataloader state, take a few more as ground truth, restore in a + fresh process, and compare the restored batches. + +7. **Enforce `force_map_dataset: false` for training.** Map-style training has + too much sampler/manifest overhead. Before launch, confirm every training + source is indexed, multiplexer seeds are fixed, and topology is stable; if a + source cannot be indexed, report it as a migration blocker instead of + silently keeping map-style training. + +8. **Use frequent checkpoint triggers.** External termination may not execute a + graceful preemption callback. Step- or time-based saves reduce lost progress. + +9. **Smoke test in stages.** Run single-node single-chunk, then single-node + multi-chunk resume, then the intended full topology. + +10. **Keep `.idx` files on a persistent filesystem by default.** Stage to + node-local SSD only when direct filesystem reads are proven problematic, and + ensure the YAML `indexes_root` matches the staged destination. + +11. **Use AIStore batch fetching deliberately.** For remote tar/audio sources, + `USE_AIS_GET_BATCH=true` avoids eager remote tar-reader construction. If the + batch endpoint fails for a dataset, use `USE_AIS_INDIVIDUAL_GETS=true` as a + slower fallback while investigating storage availability. + +## Tier 3 - operational hygiene + +12. **Tune index-build workers to memory and storage backend.** Many workers can + OOM on large manifests or remote tar headers. Reduce workers or split the + blend when needed. + +13. **Keep optional prefetch steps explicit.** Manifest prefetch, index staging, + and model-cache preambles should be visible in the launcher and documented in + the report. + +14. **Use CPU-safe container settings for CPU-only index builds.** Some container + runtimes expect GPU hooks by default; bypass or disable them when the index + build runs without GPU access. + +## What not to do + +- Do not trust `meta.pt` key presence alone as proof of bit-exact resume. +- Do not combine incompatible Lightning checkpoint triggers. +- Do not point `indexes_root` at a node-local path unless the launcher populates + it before every chunk. +- Do not launch iterable training until every source in the chain has been + audited and made partition-compatible. +- Do not use map-style training to bypass indexing blockers; mark the migration + not launch-ready unless the user explicitly approves a temporary exception + with the blocker and expected overhead. +- Do not set `LHOTSE_USE_WORKER_PARTITION` manually; it is an internal signal set + by the dataloader worker initialization path. diff --git a/.claude/skills/migrate-to-resumable-dataloader/references/conflict-matrix.md b/.claude/skills/migrate-to-resumable-dataloader/references/conflict-matrix.md new file mode 100644 index 000000000000..f117201a0e16 --- /dev/null +++ b/.claude/skills/migrate-to-resumable-dataloader/references/conflict-matrix.md @@ -0,0 +1,31 @@ +# Conflict matrix - indexed + resumable Lhotse + +Table format: `A | B | conflict | severity | resolution`. + +Severities: + +- **fatal**: automatic patching is impossible; data must be preprocessed or the + launcher/storage setup must change. +- **error**: automatic patching is usually safe. +- **warning**: context-dependent; report clearly. +- **note**: informational. + +| A | B | conflict | severity | resolution | +|---|---|---|---|---| +| `data.train_ds.indexed: true` | `extra_fields:` on indexed NeMo entries | Indexed adapters cannot preserve arbitrary runtime field rewrites. | fatal | Preprocess the manifest to materialize fields, then drop `extra_fields`. | +| `data.train_ds.indexed: true` | `slice_length:` on indexed entries | Slicing changes cut/audio access and has no stable sidecar unless preprocessed. | fatal | Re-shard or preprocess offline, then drop `slice_length`. | +| `data.train_ds.indexed: true` | compressed JSONL/Shar cuts or compressed tar paths | Compressed streams do not provide stable seekable offsets for sidecars. | fatal | Re-export uncompressed or materialize seekable sources. | +| `data.train_ds.indexed: true` | `pipe:` paths | Pipes are not seekable. | fatal | Materialize upstream data to files or a seekable backend. | +| `data.train_ds.force_map_dataset: true` | resumable training launch | Map-style training keeps too much sampler/manifest work on the main process. | error | Set `data.train_ds.force_map_dataset: false` after making every training source indexed and partition-compatible. | +| `force_map_dataset: true` | `force_iterable_dataset: true` | Dataset mode selection is contradictory. | error | Keep one mode. For training, use `force_map_dataset: false`; for validation/test, keep map-style unless intentionally testing iterable behavior. | +| `use_stateful_dataloader: true` | per-chunk seed rotation | Model-level RNG diverges across resumed chunks. | error | Pin one seed for the whole chain in YAML and launcher. | +| `use_stateful_dataloader: true` | `num_workers` changes between chunks | Saved dataloader state is incompatible. | error | Keep worker count invariant or restart without dataloader state. | +| `use_stateful_dataloader: true` | `world_size` / rank topology changes | Saved iterator and sampler state are topology-sensitive. | error | Keep topology invariant or restart without dataloader state. | +| `force_map_dataset: false` | any non-indexed source in the chain | Non-indexed sources do not partition and are duplicated across ranks/workers. | fatal | Convert all sources to indexed access or split/remove the non-indexed source. Do not switch to map-style training to bypass this unless the user explicitly approves a temporary exception. | +| `force_map_dataset: false` | multiplexer seed is `"randomized"` | Shards may choose different sources at the same step. | error | Use a fixed integer seed. | +| `force_finite: true` | training dataset | Can cap infinite training mixtures unexpectedly. | error | Use finite mode for validation/test only unless intentionally bounded. | +| Checkpoint cadence absent | external preemption / walltime kill | Chunk progress can be lost without mid-chunk saves. | warning | Add frequent step- or time-based checkpoints. | +| Node-local `indexes_root` | no prefetch/staging before startup | `.idx` files are missing at runtime. | error | Point to a persistent mirror or stage indexes before every chunk. | +| AIStore batch mode | objects unavailable through batch endpoint | Batch loader may return empty content or fail collation. | warning | Verify object availability, replicate data, or set `USE_AIS_INDIVIDUAL_GETS=true`. | +| Container lacks AIStore SDK | AIStore source paths | Remote reads may fall back to the wrong backend or fail. | error | Install a compatible `aistore` SDK in build/training containers. | +| CPU-only index build | GPU container hook requires GPU runtime | Container startup can fail before index build begins. | warning | Use CPU-safe container settings or bypass GPU hooks. | diff --git a/.claude/skills/migrate-to-resumable-dataloader/references/failure-modes.md b/.claude/skills/migrate-to-resumable-dataloader/references/failure-modes.md new file mode 100644 index 000000000000..8927a805cce8 --- /dev/null +++ b/.claude/skills/migrate-to-resumable-dataloader/references/failure-modes.md @@ -0,0 +1,278 @@ +# Failure-mode catalog + +Failure signatures, triggers, and fixes for indexed + resumable Lhotse +migrations. These are generic patterns; verify exact file names and line numbers +against the user's checkout before citing them in a report. + +## §1 - Compressed JSONL, Shar cuts, or tar paths + +**Signature**: index build raises a `ValueError` saying the source requires +uncompressed JSONL or tar data but received a compressed path such as +`*.jsonl.gz` or `*.tar.gz`. + +**Trigger**: an indexed source points at compressed cuts, manifests, or tar +files. Sidecar offsets require stable byte positions in seekable files. + +**Fix**: re-export or materialize the source in an uncompressed seekable format. +For Shar-style data, export cuts as plain JSONL when sidecar indexing is needed. + +## §2 - `extra_fields` or `slice_length` on indexed NeMo entries + +**Signature**: an indexed NeMo iterator raises that `extra_fields` is not +supported, or data order diverges after slicing. + +**Trigger**: the source applies runtime field injection or slicing while also +requesting indexed access. + +**Fix**: preprocess the manifest offline so the indexed source already contains +all required fields and shard/slice layout. Drop `extra_fields` and +`slice_length` from the indexed YAML entry. + +## §3 - Remote object reader is not seekable + +**Signature**: `io.UnsupportedOperation: seek` or `tell` on first read of a +remote URL source. + +**Trigger**: the code path uses a backend reader that does not implement the +seek/tell operations required by indexing. + +**Fix**: ensure the remote-storage SDK is installed and that Lhotse routes the +path through the intended seekable/range-capable backend. For AIStore, verify +`aistore` is installed and `AIS_ENDPOINT` is set. + +## §4 - Stdlib filesystem operations on URLs + +**Signature**: `FileNotFoundError` from `open("s3://...")` or +`os.path.getsize("s3://...")`. + +**Trigger**: a URL path reaches code that assumes local filesystem semantics. + +**Fix**: route URL paths through the storage-aware reader and load index metadata +from the `.idx` file rather than local `os.path` calls. + +## §5 - Too many memory maps for large shard counts + +**Signature**: `OSError: [Errno 12] Cannot allocate memory` or system +`vm.max_map_count` exhaustion during startup. + +**Trigger**: one memory map per `.idx` file across a very large number of shards. + +**Fix**: load sidecars into resident arrays or otherwise reduce mmap count. The +sidecars are usually small enough that resident arrays are acceptable. + +## §6 - Line-delimited JSON with `.json` extension rejected + +**Signature**: index validation rejects a line-delimited JSON manifest with a +`.json` suffix. + +**Trigger**: extension filtering assumes only `.jsonl` is valid, while some NeMo +manifests use `.json` for one-record-per-line JSON. + +**Fix**: accept both `.jsonl` and line-delimited `.json` when the contents are +newline-separated records. + +## §7 - Process pool OOM during index build + +**Signature**: `concurrent.futures.process.BrokenProcessPool` after partial +index-build progress. + +**Trigger**: too many workers parse large manifests or tar headers concurrently, +exceeding available process memory. + +**Fix**: reduce worker count, split the blend/source list across multiple index +runs, or increase available memory. + +## §8 - GPU container hook runs during CPU-only index build + +**Signature**: container startup fails before Python runs, often around a GPU +runtime hook such as `nvidia-container-cli`. + +**Trigger**: a CPU-only index build uses a container/runtime setup that assumes +GPU devices are present. + +**Fix**: use CPU-safe container settings for index builds, or bypass/disable GPU +hooks when the runtime has no GPU access. + +## §9 - AIStore SDK response shape changed + +**Signature**: an AttributeError on fields returned by the AIStore batch API, +often in an error or empty-content path. + +**Trigger**: code assumes one SDK response schema while the installed SDK returns +another. + +**Fix**: normalize SDK response attributes at the boundary and use that helper at +all consumer sites. Avoid raw direct field access in error-handling code. + +## §10 - `shard_seed: "randomized"` with stateful dataloading + +**Signature**: usually silent. Resume is not bit-exact even though the dataloader +snapshot appears to restore. + +**Trigger**: randomized shard/sampler seed is re-derived at chunk startup while +stateful sampler data is loaded from checkpoint. + +**Fix**: pin `shard_seed` to a fixed integer, typically matching the top-level +training seed. + +## §11 - Per-chunk seed rotation in launcher + +**Signature**: silent model-level divergence across chunk boundaries. Data-order +state may restore, but dropout, augmentation, and other model/global RNG draws do +not match a continuous run. + +**Trigger**: the launcher chooses a different seed for each resumable chunk. + +**Fix**: use one invariant seed for the entire resumable chain. If the launcher +computes seeds from run index, override that behavior for indexed + stateful +runs. + +## §12 - No mid-chunk checkpoint trigger + +**Signature**: only epoch-boundary checkpoints exist; progress after the last +boundary is lost when a chunk is preempted or reaches walltime. + +**Trigger**: checkpoint config relies only on long epoch boundaries or sparse +validation events. + +**Fix**: add an appropriate step-based or time-based checkpoint trigger and keep +resume-required checkpoints from being pruned prematurely. + +## §13 - Internal time guard does not catch external termination + +**Signature**: the runtime sends SIGTERM/SIGKILL and no final checkpoint is +written. + +**Trigger**: external cancellation, node failure, preemption, or walltime signal +bypasses the framework's graceful preemption callback. + +**Fix**: leave a walltime buffer for graceful stops and rely on frequent +mid-chunk checkpoints as the primary mitigation. + +## §14 - Worker or world-size mismatch on resume + +**Signature**: `StatefulDataLoader` or indexed iterator state raises a mismatch +error during `load_state_dict`, or restored data order is invalid. + +**Trigger**: chunk restores with different `num_workers`, world size, or +rank/worker topology than the chunk that saved the checkpoint. + +**Fix**: keep topology invariant for a resumable chain. To change topology, +restart from model weights without restoring dataloader state. + +## §15 - AIStore batch endpoint returns empty content + +**Signature**: batch collation receives empty content for one or more requested +objects, often followed by a downstream `NoneType` or collation error. + +**Trigger**: object is not available through the batch endpoint, credentials are +wrong, or batch and individual-object paths exercise different backend state. + +**Fix**: verify object availability through the exact access mode used by +training. As a workaround, set `USE_AIS_INDIVIDUAL_GETS=true` and investigate +backend replication/permission issues separately. + +## §16 - `indexes_root` points at missing node-local storage + +**Signature**: `FileNotFoundError` or `.idx file not found` from an indexed +reader at startup. + +**Trigger**: YAML points at a node-local path such as `/tmp/idx`, but the launcher +does not stage sidecars there before every chunk; or the staging destination does +not match YAML. + +**Fix**: use a persistent shared mirror by default. If staging to node-local SSD, +ensure the preamble runs before training in every chunk and the YAML path matches +that destination exactly. + +## §17 - Concurrent bucketing breaks bit-exact resume + +**Signature**: silent data-order divergence across resume boundaries. + +**Trigger**: a background bucketing producer advances the source iterator outside +the checkpointed main-thread state. + +**Fix**: set `concurrent_bucketing: false` for resumable training so only the +checkpointed path advances the iterator. + +## §18 - Iterable mode partitions when partition signal is missing or wrong + +**Signature**: silent under-sampling or over-partitioning under distributed +environment variables. + +**Trigger**: indexed iterators read rank/world environment directly instead of +using a dataloader-worker partition signal. + +**Fix**: ensure partitioning is activated only by the intended worker init path. +Map-style mode should see the trivial `(0, 1)` partition. + +## §19 - Iterable mode with non-indexed source in the chain + +**Signature**: non-indexed sources appear on every rank/worker while indexed +sources are partitioned. + +**Trigger**: `force_map_dataset: false` with a chain that mixes indexed and +non-indexed iterators. + +**Fix**: convert every source in the iterable chain to indexed access, or split +or remove the non-indexed sources before launching training. Do not switch to +map-style training to bypass this unless the user explicitly approves a +temporary exception with the expected overhead. + +## §20 - Iterable mode with randomized multiplexer seed + +**Signature**: loud `ValueError` from the multiplexer, or silent source-weight +drift if no guard exists. + +**Trigger**: each shard draws a different multiplexer RNG state and chooses a +different source at the same logical step. + +**Fix**: pin multiplexer seed, usually through the top-level `shard_seed`. + +## §21 - Iterable resume topology mismatch + +**Signature**: indexed range or chain state reports `shard_id` / `num_shards` / +`world_size` mismatch on restore. + +**Trigger**: a checkpoint saved under one distributed-worker topology is restored +under another. + +**Fix**: keep `(world_size, num_workers)` invariant. To scale differently, +restart without dataloader state. + +## §22 - Training left in map-style mode + +**Signature**: long startup or step-time overhead from repeated sampler/manifest +work, especially at larger world sizes. + +**Trigger**: migrated training YAML keeps `data.train_ds.force_map_dataset: true` +instead of enforcing iterable partitioning. + +**Fix**: set `data.train_ds.force_map_dataset: false` and make every source in +the training iteration graph indexed and partition-compatible. If a source cannot +yet be indexed, mark the migration not launch-ready unless the user explicitly +approves a temporary map-style exception with the specific blocker and expected +overhead. + +## §23 - Build/prefetch tool imports stock Lhotse/NeMo + +**Signature**: `ModuleNotFoundError`, missing `lhotse.indexing`, or import errors +for indexed/resumable symbols. + +**Trigger**: build-index or prefetch command does not place the modified NeMo and +Lhotse checkouts before stock packages on `PYTHONPATH`. + +**Fix**: set `PYTHONPATH` or install the correct packages so helper scripts and +training use the same indexed/resumable implementation. + +## §23 - Distributed backend errors hide an earlier Python exception + +**Signature**: NCCL/watchdog/collective timeout or launcher-level distributed +failure appears after one rank already logged a Python traceback. + +**Trigger**: one rank fails during data loading or collation; other ranks block +in distributed work until the backend times out. + +**Fix**: inspect logs before the distributed timeout and identify the first +Python exception. Treat later backend chatter as a cascade unless it is the first +error in time. diff --git a/.claude/skills/migrate-to-resumable-dataloader/references/option-reference.md b/.claude/skills/migrate-to-resumable-dataloader/references/option-reference.md new file mode 100644 index 000000000000..aee3903a2b02 --- /dev/null +++ b/.claude/skills/migrate-to-resumable-dataloader/references/option-reference.md @@ -0,0 +1,92 @@ +# Option reference - indexed + resumable Lhotse migration + +Field-by-field reference for YAML and launcher settings that interact with +indexed access, `StatefulDataLoader`, distributed topology, and storage backend. +Line numbers in local code may drift; verify against the checkout in front of +you when producing a report. + +## `data.train_ds` + +| field | required value | purpose | see also | +|---|---|---|---| +| `indexed` | `true` | Routes supported sources through indexed adapters such as `IndexedJsonlReader` and indexed NeMo-tar readers. Without it, streaming/replay behavior remains active. | `nemo.collections.common.data.lhotse.dataloader`, `lhotse.indexing` | +| `use_stateful_dataloader` | `true` | Uses `torchdata.StatefulDataLoader` so dataloader iterator state can be saved in Lightning checkpoints. | NeMo Lhotse dataloader config | +| `force_map_dataset` | `false` for training | Enforces iterable partitioning across data-parallel ranks and workers. Map-style training has too much sampler/manifest overhead; if a source cannot yet be indexed, report the migration as not launch-ready unless the user explicitly approves a temporary exception. | failure-modes §§18-22, conflict-matrix | +| `indexes_root` | stable filesystem mirror, or node-local path populated before startup | Tells indexed readers where to find `.idx` sidecars. Prefer a persistent shared mirror. Use `/tmp/idx` only when the launcher stages indexes there before training. | failure-modes §16 | +| `seed` | fixed integer, invariant across chunks | Lightning reseeds Python/NumPy/Torch at chunk start. Rotating this across resumable chunks breaks model-level bit-exactness even when sampler state restores correctly. | failure-modes §11 | +| `shard_seed` | fixed integer, not `"randomized"` | Controls sampler/multiplexer RNG. Randomized shard seeds can diverge across resume and are invalid for multi-shard iterable partitioning. | conflict-matrix | +| `num_workers` | invariant between save and restore | `StatefulDataLoader` and iterable partition state depend on worker topology. | failure-modes §14, §21 | +| `concurrent_bucketing` | `false` for resumable training | Background bucketing producers can advance source iterators outside the checkpointed main-thread state. | failure-modes §17 | +| `force_iterable_dataset` | unset or compatible with `force_map_dataset: false` | Do not enable mutually exclusive dataset modes. The training target is iterable partitioning through `force_map_dataset: false`. | conflict-matrix | +| `force_finite` | unset/false for training | Training usually needs infinite or epoch-controlled iteration; finite mode is normally for validation. | validation section | +| `extra_fields` on indexed NeMo entries | unset | Indexed NeMo adapters cannot preserve arbitrary runtime field rewrites. Preprocess manifests instead. | failure-modes §2 | +| `slice_length` on indexed entries | unset | Slicing rewrites cut/audio access and has no stable index unless preprocessed. | failure-modes §2 | +| compressed `.jsonl.gz` / `.tar.gz` paths | reject for indexed sidecars | Indexing requires seekable uncompressed JSONL/tar inputs. Re-export or unpack first. | failure-modes §1 | +| `pipe:` paths | reject | Pipe commands are not seekable. Materialize data first. | `lhotse.indexing` | + +## Training iterable partition (`force_map_dataset: false`) + +This is the required training mode for efficient indexed/resumable runs. Do not +ship a migrated training config in map-style mode. If an indexing blocker +prevents iterable partitioning, mark the migration not launch-ready unless the +user explicitly approves a temporary exception. + +| concern | requirement | purpose | +|---|---|---| +| Worker partition signal | Set only by NeMo/Lhotse worker init path | Prevents map-style mode from accidentally partitioning under `torchrun` environment variables. | +| All sources indexed | required | Non-indexed sources do not partition and will be duplicated across ranks/workers. | +| Multiplexer seed | fixed integer | All shards must pick the same source at each multiplexing step to preserve global weighted distribution. | +| Resume topology | invariant `(world_size, num_workers)` | Saved iterator state validates topology on restore. | + +## `data.validation_ds` + +| field | required value | purpose | +|---|---|---| +| `indexed` | `true` when validation sources need indexed access | Uses the same sidecar/index readers as training. | +| `force_map_dataset` | `true` | Validation should be finite and deterministic; map-style access is simpler. | +| `force_finite` | `true` | Prevents infinite validation loops when the training blend is infinite. | +| `use_stateful_dataloader` | usually `false` | Validation is normally run to completion and not resumed mid-loop. | +| `indexes_root` | same mirror as training unless intentionally separate | Validation readers need the same sidecars. | +| `seed` / `shard_seed` | fixed integers | Keeps validation deterministic. | + +## Lightning / trainer settings + +| field | recommendation | purpose | +|---|---|---| +| `resume_if_exists` or equivalent | enabled for resumable chains | Ensures later chunks restore checkpointed model, optimizer, scheduler, and dataloader state. | +| `resume_ignore_no_checkpoint` or equivalent | enabled for first chunk when supported | Allows chunk 1 to start without an existing checkpoint. | +| Checkpoint cadence | frequent step- or time-based saves | External termination may bypass graceful preemption callbacks. Avoid losing an entire chunk. | +| `save_top_k` / pruning policy | do not prune required resume checkpoints | Resume needs recent checkpoints and dataloader metadata. | +| `max_time_per_run` / walltime guard | comfortably below runtime walltime | Internal graceful-stop callbacks need teardown time. | +| `devices`, `num_nodes`, distributed topology | invariant across resume | Dataloader state is topology-sensitive. To scale differently, restart without dataloader state. | +| `max_steps` | stable across chain | Later chunks continue global step accounting. | + +## Launcher contract + +| concern | requirement | purpose | +|---|---|---| +| Per-chunk seed | invariant for all chunks in a resumable chain | Prevents model-level RNG divergence across resumes. | +| Index mirror availability | `.idx` sidecars exist before training starts | Indexed readers fail or fall back to slow behavior when sidecars are missing. | +| Optional index staging | YAML `indexes_root` matches the staged destination | Node-local paths such as `/tmp/idx` must be populated in every chunk. | +| `num_workers`, `world_size` | unchanged between save and restore | Required by stateful dataloading and iterable partitioning. | +| Python path / package selection | loads the NeMo and Lhotse versions with indexed/resumable support | Avoids accidentally using stock packages without the required code. | +| Container/runtime hooks | compatible with available CPU/GPU runtime | CPU-only index builds may need different container settings than GPU training. | + +## AIStore environment + +| env var | required when | purpose | +|---|---|---| +| `AIS_ENDPOINT` | any `s3://` / `ais://` source is read through AIStore | Points Lhotse/AIS clients at the proxy. | +| `USE_AIS_GET_BATCH` | remote tar/audio sources should be fetched lazily by batch | Avoids eager tar-reader construction for every remote shard. | +| `USE_AIS_INDIVIDUAL_GETS` | batch endpoint is unavailable or returns empty content | Falls back to per-object reads. Slower but useful for backend-specific failures. | +| `aistore` SDK | AIStore backend in builder/training container | Required by Lhotse AIStore access paths. | + +## Index building + +| concern | recommendation | purpose | +|---|---|---| +| Source format | uncompressed, seekable JSONL/tar or supported Shar cuts | Sidecar offsets must map to stable byte positions. | +| Workers | tune for memory and storage backend | Large manifests/tars plus many workers can OOM. Reduce workers or split blends. | +| Mirror destination | persistent shared filesystem when available | Reuse sidecars across runs and avoid per-launch rebuilds. | +| Remote sources | verify credentials/backend before building | Indexing remote data exercises storage credentials and byte-range access. | +| Reusability | build once per source path set | Existing sidecars can be reused while source contents and paths are unchanged. | diff --git a/.claude/skills/migrate-to-resumable-dataloader/templates/migration-report.md b/.claude/skills/migrate-to-resumable-dataloader/templates/migration-report.md new file mode 100644 index 000000000000..e92d4e8d6d7a --- /dev/null +++ b/.claude/skills/migrate-to-resumable-dataloader/templates/migration-report.md @@ -0,0 +1,141 @@ +# Migration report - `` + +- **Generated**: +- **Source YAML**: `` +- **Patched YAML**: `` +- **Source blend** (if inspected): `` +- **Patched blend** (if emitted): `` +- **Launcher** (if inspected): `` (or "skipped - no launcher provided") +- **Storage workflow**: + +## Summary + + + +## Findings + +### Fatal (must fix; auto-patching not possible) + +- _none_ - OR - +- **``** (`:`): + - **Current**: `` + - **Recommended**: `` (or "manual rewrite") + - **Why fatal**: + - **References**: + +### Errors (auto-patched; review the diff) + +- **`data.train_ds.indexed`** (`:`): + - **Was**: `false` -> **now**: `true` + - **Why**: + - **References**: + +### Warnings (review manually) + +- **``** (`:`): + - **Current**: `` + - **Recommended**: `` + - **Why**: + - **References**: + +### Notes (informational) + +- **``** (`:`): + +## Dedup Mode + + + +- **Training target**: `force_map_dataset: false`. This enforces iterable + partitioning and avoids map-style sampler/manifest overhead. +- **Validation/test target**: `force_map_dataset: true` unless intentionally + testing iterable behavior; finite deterministic validation is simpler in + map-style mode. +- **Blocker/exception**: if training still uses `force_map_dataset: true`, mark + the migration not launch-ready unless the user explicitly approved an + exception; list the unindexed source or runtime blocker, expected overhead, and + work needed to move back to iterable training. + +For training iterable mode, list: + +- Sources confirmed indexed: +- Multiplexer seeds confirmed integer: +- World-size / num-workers commitment: `` x `` for the full chain + +## Data Blend Audit + + + +| entry | reason | upstream fix | +|---|---|---| +| `` | compressed cuts/manifests | re-export as uncompressed seekable files | +| `` | unsupported `extra_fields` | preprocess fields into the manifest | + +## Launcher Review + + + +- **Per-chunk seed rotation**: +- **Index access wired**: +- **AIStore batch audio fetch**: +- **Topology invariance**: +- **Python path/package selection**: + +## Storage Workflow + + + +## Patched Output Diff + +### `.yaml` -> `-resumable.yaml` + +```diff +- data.train_ds: +- indexed: false +- use_stateful_dataloader: false +- shard_seed: "randomized" ++ data.train_ds: ++ indexed: true ++ use_stateful_dataloader: true ++ force_map_dataset: false ++ indexes_root: /shared/fs/.../indexes_mirror ++ shard_seed: 42 +``` + +_(full diff inline)_ + +### `.yaml` -> `-resumable.yaml` + +```diff +- - type: lhotse_shar +- shar_path: +- cuts: s3://bucket/path/cuts.0.jsonl.gz ++ # Source excluded: compressed Shar cuts cannot be indexed. ++ # Re-export with uncompressed cuts or convert to another seekable format. +``` + +_(full diff inline)_ + +## Pre-flight Checklist + +1. Build indexes via the generated `build-indexes-cmd.sh`. +2. Run a bit-exact dataloader resume check on the migrated config. +3. Confirm storage SDKs and environment variables required by the selected + workflow. +4. Confirm `indexes_root` exists and is populated from every node/container that + will train. +5. Run single-node single-chunk, single-node resume, then full-topology smoke. +6. Submit the real run. + +## References + +- `references/option-reference.md` +- `references/conflict-matrix.md` +- `references/failure-modes.md` +- `references/best-practices.md` +- `references/aistore-vs-non-aistore.md` diff --git a/CLAUDE.md b/CLAUDE.md index 9a355c624155..097b681555f4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -20,6 +20,8 @@ Dev quickstart: `uv sync --extra all --extra cu13` (Python 3.12+, PyTorch 2.7+; - Check: `isort --check && black --check ` or `isort --check . && black --check .` - Fix: `isort && black ` or `isort . && black .` - Jupyter Notebooks are excluded from automatic black reformatting (see `extend-exclude`), but can be still reformatted when passed directly. Do not reformat notebooks outside your changes. +- **Helper placement**: keep public APIs and top-level classes/functions near the top of a file; place private + helpers and utilities at the bottom of the file unless a local module convention requires otherwise. ## Testing diff --git a/docs/source/asr/datasets.rst b/docs/source/asr/datasets.rst index 09ff87ea180c..620194c53727 100644 --- a/docs/source/asr/datasets.rst +++ b/docs/source/asr/datasets.rst @@ -3,6 +3,12 @@ Datasets NeMo ASR models expect data as a set of audio files plus a manifest file describing each utterance. +.. seealso:: + + For Lhotse-based dataloading (the recommended path for new ASR + recipes — dynamic bucketing, multi-source mixing, indexed/resumable + dataloading), see :doc:`/dataloaders`. + .. _section-with-manifest-format-explanation: Manifest Format diff --git a/docs/source/asr/speaker_diarization/intro.rst b/docs/source/asr/speaker_diarization/intro.rst index b72177dbaa42..c65163ffff88 100644 --- a/docs/source/asr/speaker_diarization/intro.rst +++ b/docs/source/asr/speaker_diarization/intro.rst @@ -99,4 +99,3 @@ The full documentation tree is as follows: configs api resources - diff --git a/docs/source/audio/datasets.rst b/docs/source/audio/datasets.rst index 4c023961a29e..781b0a9e99d8 100644 --- a/docs/source/audio/datasets.rst +++ b/docs/source/audio/datasets.rst @@ -3,6 +3,12 @@ Datasets The `audio` collection expect the training, validation and tests datasets in either NeMo format or Lhotse format. +.. seealso:: + + For the Lhotse dataloader's full surface — supported ``input_cfg`` + types, bucketing, indexed manifests + resumable dataloading, and the + ``LhotseDataLoadingConfig`` field reference — see :doc:`/dataloaders`. + NeMo Format ----------- diff --git a/docs/source/dataloaders.rst b/docs/source/dataloaders.rst index 20fd0f2f0b90..4eb5bfc59c1f 100644 --- a/docs/source/dataloaders.rst +++ b/docs/source/dataloaders.rst @@ -24,26 +24,6 @@ NeMo supports using `Lhotse`_, a speech data handling library, as a dataloading constant in time (i.e., stationary); in fact, each mini-batch will have roughly the same ratio of data coming from each source. Since the multiplexing is done dynamically, it is very easy to tune the sampling weights. -Lhotse dataloading supports the following types of inputs: - -* NeMo manifests - Regular NeMo JSON manifests. -* NeMo tarred data - Tarred NeMo JSON manifests + audio tar files; we also support combination of multiple NeMo - tarred data sources (e.g., multiple buckets of NeMo data or multiple datasets) via dynamic multiplexing. - - We support using a subset of Tarred NeMo JSON manifests along with audio tar files without disrupting the alignment between the tarred files and their corresponding manifests. - This feature is essential because large datasets often consist of numerous tar files and multiple versions of Tarred NeMo JSON manifest subsets, which may contain only a portion of the audio files due to filtering for various reasons. - To skip specific entries in the manifests without repeatedly copying and retarring audio files, the entries must include a ``_skipme`` key. This key should be set to ``True``, ``1``, or a reason for skipping (e.g., ``low character-rate``). - -* Lhotse CutSet manifests - Regular Lhotse CutSet manifests (typically gzipped JSONL). - See `Lhotse Cuts documentation`_ to learn more about Lhotse data formats. -* Lhotse Shar data - Lhotse Shar is a data format that also uses tar files for sequential data loading, - but is designed to be modular (i.e., easily extensible with new data sources and with new feature fields). - More details can be found here: |tutorial_shar| - .. caution:: As of now, Lhotse is mainly supported in most ASR model configurations. We aim to gradually extend this support to other speech tasks. .. _Lhotse: https://github.com/lhotse-speech/lhotse @@ -51,6 +31,269 @@ Lhotse dataloading supports the following types of inputs: .. |tutorial_shar| image:: https://colab.research.google.com/assets/colab-badge.svg :target: https://colab.research.google.com/github/lhotse-speech/lhotse/blob/master/examples/04-lhotse-shar.ipynb +Architecture overview +--------------------- + +The Lhotse dataloader is a pipeline of small components. Each YAML option you +set lands in exactly one of them, so it pays to know which is which:: + + input_cfg entry ──► parser_fn ──► Adapter (IteratorNode) + (registered │ + via @data_type_parser) ▼ + CutSet (lazy iterator graph) + │ + SamplingConstraint ──► CutSampler + │ + ▼ + IterableDatasetWrapper + │ + ▼ + user-defined Dataset + │ + ▼ + DataLoader + (or StatefulDataLoader) + +Components, top to bottom: + +* **input_cfg entry** — one YAML dict identified by ``type:`` (e.g. + ``type: nemo_tarred``). Listed below in :ref:`lhotse-format-reference`. +* **parser_fn** — registered with the ``@data_type_parser`` decorator in + ``nemo/collections/common/data/lhotse/cutset.py``. Reads the entry and + returns ``(CutSet, is_tarred)``. Users can add their own (see + :ref:`lhotse-extension-hooks`). +* **Adapter** — a class that knows how to iterate one specific on-disk + format (e.g. ``LazyNeMoTarredIterator``, ``LazyParquetIterator``, + ``NeMoMultimodalConversationJsonlAdapter``). All recent adapters are + Lhotse :class:`~lhotse.lazy.IteratorNode` subclasses and support + ``indexed=True`` for O(1) random access — see + :ref:`indexed-resumable-dataloading`. +* **CutSet** — Lhotse's lazy manifest wrapper. Composing multiple sources + produces a graph of iterator nodes (mux, mix, map, filter, …) underneath. +* **SamplingConstraint** — defines what "length" means for batch packing: + :class:`~lhotse.dataset.sampling.base.TimeConstraint` (audio duration, + default), :class:`~lhotse.dataset.sampling.base.TokenConstraint` (token + count, multimodal), ``MultimodalSamplingConstraint`` / + ``FixedBucketBatchSizeConstraint2D`` (NeMo extensions; see + :ref:`lhotse-sampling-constraints`). +* **CutSampler** — :class:`~lhotse.dataset.sampling.DynamicCutSampler` or + :class:`~lhotse.dataset.sampling.DynamicBucketingSampler`, picked + automatically based on ``use_bucketing``. +* **IterableDatasetWrapper** — Lhotse helper that turns the sampler-produced + ``CutSet`` mini-batches into a stream the PyTorch ``DataLoader`` can + consume. +* **Dataset class** — supplied by the model code; converts a ``CutSet`` + mini-batch into a ``dict[str, Tensor]``. The same dataset class can serve + multiple model architectures because all batching is upstream. + +.. _lhotse-format-reference: + +Supported input formats +----------------------- + +Every entry in ``input_cfg`` is identified by ``type:``. The table below is +the canonical list of every type the dataloader understands today, what it +returns, and the on-disk shape it expects. + +.. list-table:: + :header-rows: 1 + :widths: 18 32 14 8 8 10 10 + + * - ``type:`` + - Purpose + - Yields + - Audio + - Tarred + - Indexable + - Adapter / parser + * - ``nemo`` + - NeMo non-tarred JSON manifest (per-file audio) + - ``Cut`` + - yes + - no + - yes + - ``LazyNeMoIterator`` + * - ``nemo_tarred`` + - NeMo tarred manifest + audio tar shards + - ``Cut`` + - yes + - yes + - yes + - ``LazyNeMoTarredIterator`` + * - ``lhotse`` + - Plain Lhotse cuts JSONL + - ``Cut`` + - yes + - no + - yes + - lhotse ``LazyJsonlIterator`` / ``LazyIndexedManifestIterator`` + * - ``lhotse_shar`` + - Lhotse Shar (sharded archive directory) + - ``Cut`` + - yes + - yes + - yes + - lhotse ``LazySharIterator`` + * - ``parquet`` + - Parquet file with audio bytes column + - ``Cut`` + - yes + - no + - yes (row groups) + - ``LazyParquetIterator`` + * - ``txt`` + - One example per line, raw text + - ``TextExample`` + - no + - n/a + - no + - ``LhotseTextAdapter`` + * - ``txt_jsonl`` + - One JSON object per line; configurable text field + - ``TextExample`` + - no + - n/a + - yes + - ``LhotseTextJsonlAdapter`` + * - ``txt_pair`` + - Source + target text files for translation + - ``SourceTargetTextExample`` + - no + - n/a + - no + - ``LhotseTextPairAdapter`` + * - ``multimodal_conversation`` + - Multi-turn chat with mixed text/audio turns (JSONL) + - ``NeMoMultimodalConversation`` + - optional + - optional + - yes + - ``NeMoMultimodalConversationJsonlAdapter`` + * - ``share_gpt`` + - ShareGPT-format JSONL → conversation + - ``NeMoMultimodalConversation`` + - optional + - optional + - yes + - ``NeMoMultimodalConversationShareGPTJsonlAdapter`` + * - ``share_gpt_webdataset`` + - ShareGPT in WebDataset tar shards + - ``NeMoMultimodalConversation`` + - optional + - yes + - yes + - ``NeMoMultimodalConversationShareGPTWebdatasetAdapter`` + * - ``lhotse_as_conversation`` + - Read ASR data and emit it as ASR conversation + - ``NeMoMultimodalConversation`` + - yes + - inherits + - inherits + - transform on ``read_cutset_from_config`` + * - ``sqa_as_conversation`` + - Spoken-QA → 3-turn conversation (question / audio / answer) + - ``NeMoMultimodalConversation`` + - yes + - inherits + - inherits + - transform + * - ``s2s_as_conversation`` + - Duplex S2S → conversation + - ``NeMoMultimodalConversation`` + - yes + - inherits + - inherits + - transform + * - ``s2s_duplex_overlap_as_s2s_duplex`` + - Overlapping agent/user segments → unified S2S timeline + - ``Cut`` + - yes + - inherits + - inherits + - transform + * - ``s2s_duplex_reverse_role`` + - Swap user and agent in a duplex cut + - ``Cut`` + - yes + - inherits + - inherits + - transform + * - ``lhotse_magpietts_data_as_continuation`` + - MagpieTTS dataset → S2S duplex continuation + - ``Cut`` + - yes + - inherits + - inherits + - transform + * - ``nemo_tarred_to_duplex`` + - Single-supervision NeMo → duplex (user speech + agent silence) + - ``Cut`` + - yes + - yes + - inherits + - transform + * - ``multi_speaker_simulator`` + - Synthetic multi-speaker mixtures from a manifest + - ``Cut`` + - yes + - n/a + - no + - ``MultiSpeakerMixtureGenerator`` + * - ``group`` + - Wrap a list of entries with a shared ``weight`` and ``tags`` + - (nested) + - n/a + - n/a + - n/a + - n/a + +Notes: + +* "Inherits" means the type is a transform that wraps another underlying + source via ``read_cutset_from_config(config)``. Such entries accept the + underlying source's keys (e.g. ``cuts_path`` and ``manifest_filepath``) + *in addition to* their own. +* Tarred NeMo manifests support a ``_skipme`` key to omit specific manifest + rows without repacking tars (set to ``True``, ``1``, or a reason string). +* Lhotse Shar is documented in the upstream tutorial: |tutorial_shar|. + +Conversation / multimodal types — when to use which +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Six types yield ``NeMoMultimodalConversation`` from very different sources. +Pick by the shape of your input data: + +.. list-table:: + :header-rows: 1 + :widths: 35 25 40 + + * - Your data + - ``type:`` + - Notes + * - JSONL of multi-turn chats with mixed text/audio turns + - ``multimodal_conversation`` + - Native chat schema; audio turns reference paths or tar members + * - JSONL in ShareGPT chat schema + - ``share_gpt`` + - Adds ShareGPT-specific role/value parsing + * - ShareGPT data packed in WebDataset tar shards + - ``share_gpt_webdataset`` + - Same parsing as ``share_gpt``, reads tarred shards + * - ASR data in NeMo or Lhotse format + - ``lhotse_as_conversation`` + - Builds a 2-turn (instruction+audio / transcript) conversation per cut + * - Spoken-QA data with ``question`` / ``answer`` fields + - ``sqa_as_conversation`` + - Builds a 3-turn (question / audio / answer) conversation per cut + * - Duplex S2S data with user/agent supervisions + - ``s2s_as_conversation`` + - Maps duplex roles onto chat turns + +The last three (``*_as_conversation``) are *transforms*: they delegate to +``read_cutset_from_config(config)`` for the underlying audio source, so the +nested keys like ``manifest_filepath``, ``cuts_path``, or ``shar_path`` +belong on the same entry. + Enabling Lhotse via configuration ---------------------------------- @@ -128,6 +371,16 @@ Some other Lhotse related arguments we support: When ``batch_duration`` is not set, it acts as a static batch size. * ``seed`` sets a random seed for the shuffle buffer. +* ``indexed`` (default ``False``) opts the dataloader into Lhotse's indexed-manifest + path, giving every adapter O(1) random access and graph-token-based exact restore. + Requires ``.idx`` sidecars next to every JSONL/tar file. See + :ref:`indexed-resumable-dataloading` below. + +* ``use_stateful_dataloader`` (default ``False``) swaps PyTorch's + ``DataLoader`` for ``torchdata.stateful_dataloader.StatefulDataLoader`` so + that per-worker iterator state is captured in checkpoints and restored + exactly on resume. Pair with ``indexed: true`` for full O(1) restore. + The full and always up-to-date list of supported options can be found in ``LhotseDataLoadingConfig`` class. .. _asr-dataset-config-format: @@ -147,6 +400,29 @@ The dataset class which converts these examples to tensors can partition the min different processing to each group. For example, you may want to construct different prompts for the model using metadata in ``tags``. +How ``tags`` is applied +^^^^^^^^^^^^^^^^^^^^^^^ + +Every key/value pair in ``tags`` becomes an attribute on every cut produced +by that entry. The dataloader walks the cuts via ``cuts.map(...)`` and runs:: + + for key, val in tags.items(): + setattr(cut, key, val) + +So in your dataset class you read them back as ordinary attributes:: + + def __getitem__(self, cuts): + for cut in cuts: + lang = cut.lang + task = cut.task + ctx = cut.context + ... + +Tags set on a ``group`` apply to every nested entry; tags set on an inner +entry override the outer ones for that source. Conflicts with built-in cut +fields (``id``, ``duration``, ``supervisions``, …) silently overwrite the +built-in — pick tag names that don't collide. + .. note:: When fine-tuning a model that was trained with ``input_cfg`` option, typically you'd only need to override the following options: ``input_cfg=null`` and ``manifest_filepath=path/to/manifest.json``. @@ -384,6 +660,12 @@ Python dataloader instantiation example:: tokenizer=my_tokenizer, ) +**Indexed mode for text/multimodal sources.** All of the parsers above +(``txt_jsonl``, ``nemo_sft_jsonl``, ``multimodal_conversation``, ``share_gpt``, +``share_gpt_webdataset``) accept ``indexed: true`` and integrate with +``StatefulDataLoader``-based exact resume. ``txt`` and ``txt_pair`` are +intentionally streaming-only. See :ref:`indexed-resumable-dataloading`. + **Dataloading and bucketing of text and multimodal data.** When dataloading text or multimodal data, pay attention to the following config options (we provide example values for convenience): * ``use_multimodal_sampling: true`` tells Lhotse to switch from measuring audio duration to measuring token counts; required for text. @@ -419,6 +701,25 @@ To enable bucketing, set ``batch_size: null`` and use the following options: **Joint dataloading of text/audio/multimodal data.** The key strength of this approach is that we can easily combine audio datasets and text datasets, and benefit from every other technique we described in this doc, such as: dynamic data mixing, data weighting, dynamic bucketing, and so on. +Single-config vs. ``multi_config: true`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +By default the dataloader builds **one** ``CutSet`` and **one** sampler from +the top-level config. Setting ``multi_config: true`` switches to a +**multi-modality** layout where each named sub-block (typically ``audio:`` +and ``text:``) is parsed as its own dataloader config, with its own +sampling/bucketing options, and the per-modality samplers are fused at the +batch level. + +When ``multi_config: true`` is set: + +* Top-level keys (``num_workers``, ``shuffle``, ``seed``, ``sample_rate``, + …) apply globally and are inherited by every sub-block. +* Per-modality overrides — including the ``input_cfg`` itself — go inside + the named sub-block (``audio: ...`` / ``text: ...``). +* The per-modality samplers are combined into one stream by + ``sampler_fusion``. + This approach is described in the `EMMeTT`_ paper. There's also a notebook tutorial called Multimodal Lhotse Dataloading. We construct a separate sampler (with its own batching settings) for each modality, and specify how the samplers should be fused together via the option ``sampler_fusion``: @@ -481,6 +782,223 @@ Example. Combine an ASR (audio-text) dataset with an MT (text-only) dataset so t .. caution:: We strongly recommend to use multiple shards for text files as well so that different nodes and dataloading workers are able to randomize the order of text iteration. Otherwise, multi-GPU training has a high risk of duplication of text examples. +.. _lhotse-sampling-constraints: + +Sampling constraints +-------------------- + +A :class:`~lhotse.dataset.sampling.base.SamplingConstraint` decides what +"length" means when the sampler packs a mini-batch. NeMo uses four: + +* :class:`~lhotse.dataset.sampling.base.TimeConstraint` — default. + Length = audio duration in seconds. Enforces ``max_duration`` / + ``batch_duration`` / ``quadratic_duration``. +* :class:`~lhotse.dataset.sampling.base.TokenConstraint` — activated by + ``use_multimodal_sampling: true`` for text-only flows. Length = token + count after applying the tokenizer (and optionally the prompt format). + Enforces ``max_tokens`` / ``batch_tokens`` / ``quadratic_factor``. +* ``MultimodalSamplingConstraint`` — Lhotse-style mixed-modality + packing. Activated by setting both ``use_multimodal_sampling: true`` + and a ``token_equivalent_duration`` so audio cuts are measured in + equivalent-token units alongside text. Enforces all of the above plus + ``min_tpt``/``max_tpt`` (token-per-token ratio filtering). +* ``FixedBucketBatchSizeConstraint2D`` — activated automatically when + ``bucket_duration_bins`` is given as a list of ``[duration, tokens]`` + pairs **and** ``bucket_batch_size`` is set. Each bucket gets its own + fixed batch size; this is the layout produced by + ``estimate_duration_bins_2d.py`` and the OOMptimizer. + +You usually don't pick a constraint by name — it's inferred from the +combination of YAML options. The names matter when you read NeMo's source, +extend the system with a custom constraint, or interpret error messages. + +.. _indexed-resumable-dataloading: + +Resumable / indexed dataloading +------------------------------- + +Setting ``indexed: true`` (per-source or top-level) plus +``use_stateful_dataloader: true`` (top-level) opts NeMo's Lhotse dataloader +into Lhotse's indexed iterator graph and torchdata's +``StatefulDataLoader``. The combination gives you: + +* O(1) checkpoint/restore of the *whole* dataloading pipeline — sampler RNG, + bucketer state, multiplexer choice RNG, per-source iterator cursors, and + per-worker prefetch queues — without any replay from the start of the epoch. +* Random access (``__getitem__``) over every supported adapter. + +When set at the top level, ``indexed: true`` is propagated by +``read_dataset_config`` through the ``propagate_attrs`` cascade, so a single +top-level flag covers every nested ``input_cfg`` group. You can still override +it per-source if needed. + +Per-adapter support +^^^^^^^^^^^^^^^^^^^ + +The following ``input_cfg`` types accept ``indexed: true`` today and require an +``.idx`` sidecar next to each data file: + +* ``nemo`` / ``nemo_tarred`` — JSONL manifest gets ``manifest.json.idx``; + every audio tar in ``tarred_audio_filepaths`` gets ``shard.tar.idx``. +* ``lhotse`` (plain) — ``cuts.jsonl`` gets ``cuts.jsonl.idx``. +* ``lhotse_shar`` — every uncompressed ``cuts..jsonl`` and field tar + inside the Shar dir. +* ``parquet`` — no sidecar required, but the file must expose row-group + statistics (the default for files written by pyarrow / pandas). +* ``txt_jsonl`` — every file in ``paths``. +* ``multimodal_conversation`` and ``share_gpt`` — JSONL manifest plus optional + audio tars in ``tarred_audio_filepaths``. +* ``share_gpt_webdataset`` — every ``shard-*.tar`` inside ``data_dir``. + +``txt`` and ``txt_pair`` remain streaming-only (no random-access support). + +Two caveats to be aware of: + +* ``indexed: true`` is incompatible with ``extra_fields`` and ``slice_length`` + on ``nemo``/``nemo_tarred``: those features mutate or expand cuts in a way + that has no stable index. Pre-process the manifest offline if you need them + in an indexed pipeline. +* Only **uncompressed** files can be indexed (no ``.jsonl.gz``, + ``.tar.gz``, etc.) and only files on a backend that supports indexed reads + (local FS, S3-compatible object stores, AIStore). + +Building ``.idx`` sidecars +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Two equivalent ways: + +1. Lhotse's CLI per file:: + + lhotse index jsonl path/to/cuts.jsonl + lhotse index tar path/to/shard.tar + lhotse index shar path/to/shar_dir/ + +2. NeMo's batch helper that takes a config and indexes everything it + references in one shot:: + + python scripts/dataloading/build_indexes.py path/to/input_cfg.yaml + + The script walks ``input_cfg`` (including nested ``group`` entries and + per-entry YAML references), dispatches the right tar layout for each + adapter (NeMo one-member-per-sample vs. WebDataset/Shar pair format), and + skips files that already have an up-to-date ``.idx``. Use ``--force`` to + rebuild, ``--workers N`` for parallelism, ``--dry-run`` to preview. + + Pass ``--indexes-root /path/to/mirror`` to write the sidecars to a + separate directory tree that mirrors the data files' layout instead of + placing them next to the data — see :ref:`lhotse-indexes-root` below. + +.. _lhotse-indexes-root: + +Storing ``.idx`` sidecars in a separate directory (``indexes_root``) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +By default, every ``.idx`` lives next to its data file +(``cuts.jsonl`` ↔ ``cuts.jsonl.idx``). If your data sits on shared, slow, +or read-only storage (NFS, S3, AIStore), you may want to keep the indexes +on a fast local disk instead. Set ``indexes_root`` at the top of the +dataloader config: + +.. code-block:: yaml + + data: + train_ds: + indexed: true + use_stateful_dataloader: true + indexes_root: /scratch/idx # mirror lives here + input_cfg: + - type: nemo_tarred + manifest_filepath: /shared/data/asr/manifest__OP_0..127_CL_.jsonl + tarred_audio_filepaths: ais://bucket/asr/audio__OP_0..127_CL_.tar + +Index lookups for each data file ``D`` resolve to +``/.idx``. Examples:: + + /shared/data/asr/manifest_0.jsonl -> /scratch/idx/shared/data/asr/manifest_0.jsonl.idx + ais://bucket/asr/audio_0.tar -> /scratch/idx/bucket/asr/audio_0.tar.idx + +The setting cascades through ``read_dataset_config`` to every nested +``input_cfg`` entry, so a single top-level value covers the whole pipeline. +You can override it per-source on any entry that needs a different mirror. + +Two ways to populate the mirror: + +1. **Build the indexes there to begin with**:: + + python scripts/dataloading/build_indexes.py \ + --indexes-root /scratch/idx path/to/input_cfg.yaml + + The script reads each data file in place, computes the offsets, and + writes the ``.idx`` directly to the mirrored target. + +2. **Prefetch existing remote indexes** when sidecars already live next to + the data on shared/object storage and you just want a local copy:: + + python scripts/dataloading/prefetch_indexes.py \ + --indexes-root /scratch/idx path/to/input_cfg.yaml + + ``prefetch_indexes.py`` walks the same ``input_cfg``, locates every + sidecar at its natural location (via lhotse's ``open_best``, so + ``ais://`` / ``s3://`` / ``http://`` are all supported as sources), + and copies it into the local mirror. Use ``--source-indexes-root`` + when the source sidecars themselves live under another mirror. + +Both scripts accept ``--force``, ``--workers N``, and ``--dry-run``. + +End-to-end YAML example +^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: yaml + + model: + train_ds: + # Top-level switches enable indexed restore for every source below. + indexed: true + use_stateful_dataloader: true + force_finite: true + force_map_dataset: true + + sample_rate: 16000 + num_workers: 4 + seed: 42 + shard_seed: randomized + + # Bucketing and the rest of the dataloader knobs work exactly as before. + use_bucketing: true + num_buckets: 30 + batch_duration: 1100 + quadratic_duration: 30 + + input_cfg: + - type: nemo_tarred + manifest_filepath: /data/asr/manifest__OP_0..127_CL_.jsonl + tarred_audio_filepaths: /data/asr/audio__OP_0..127_CL_.tar + weight: 0.7 + - type: lhotse + cuts_path: /data/extra/cuts.jsonl + weight: 0.3 + +Resume contract +^^^^^^^^^^^^^^^ + +When ``use_stateful_dataloader: true`` is set, Lightning's checkpoint will +contain the full lhotse iterator graph state under the dataloader key. On +resume: + +* iterator positions advance to where they were at save time (no replay from + position 0); +* ``set_epoch`` is a no-op while restored state is pending, so the resumed run + continues the same epoch instead of starting a new one; +* ``num_workers`` and ``world_size`` must match between save and restore (a + hard requirement of ``StatefulDataLoader``). + +Non-indexed pipelines fall back to Lhotse's ``_fast_forward()`` replay (O(N) +in batches consumed before the checkpoint) and require ``num_workers`` only to +be consistent for replay-based restore — not exact restore. + +For the iterator graph contract itself, see Lhotse's +`indexed manifests guide `_. + Pre-computing bucket duration bins ------------------------------------ @@ -594,7 +1112,7 @@ For Canary-1B, we'll also provide the special tokens tokenizer. Example: input_cfg.yaml Pushing GPU utilization to the limits with bucketing and OOMptimizer -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The default approach of specifying a ``batch_duration``, ``bucket_duration_bins`` and ``quadratic_duration`` is quite flexible, but is not maximally efficient. We observed that in practice it often leads to under-utilization @@ -743,3 +1261,302 @@ implements those methods. The wrapper is a no-op when ``device_mesh`` is ``None`` or every named axis present in the mesh has size 1, so the same call site works for single-GPU, DDP-only, and CP/TP runs without a separate code path. + +Train vs. validation / test configs +----------------------------------- + +The training and validation/test sections of a NeMo recipe use the same +underlying dataloader builder but have a different shape and a different +default behavior. + +**Training (``train_ds``).** A single config that produces one infinite +``CutSet``. The dataloader is wrapped to never run out of data, so +``trainer.max_steps`` (and ``limit_train_batches`` for tarred sources) +controls the run length: + +.. code-block:: yaml + + model: + train_ds: + sample_rate: 16000 + num_workers: 4 + shuffle: true + use_bucketing: true + num_buckets: 30 + batch_duration: 1100 + input_cfg: + - type: nemo_tarred + manifest_filepath: /data/asr/manifest__OP_0..127_CL_.json + tarred_audio_filepaths: /data/asr/audio__OP_0..127_CL_.tar + +**Validation / test (``validation_ds`` / ``test_ds``).** A *named* dict of +configs — one per evaluation set — that produces finite iteration: + +.. code-block:: yaml + + model: + validation_ds: + sample_rate: 16000 + batch_size: 16 + # Per-set entries; keys become the metric prefixes in logging. + datasets: + dev_clean: + cuts_path: /data/dev-clean/cuts.jsonl + dev_other: + cuts_path: /data/dev-other/cuts.jsonl + +The most common eval-side overrides: + +* ``shuffle: false`` — deterministic order. +* ``force_finite: true`` — break out of the infinite-mux that's safe for + training but would loop forever in eval. +* ``use_bucketing: false`` — bucketing trades padding for randomness; on a + small eval set the savings are negligible and a fixed batch size makes + results easier to interpret. +* ``num_workers: 0`` (or a small number) — eval is short, the worker + startup cost matters more. + +When the model code expects a single eval set, use the plain ``cuts_path`` / +``manifest_filepath`` form at the same level as ``train_ds`` instead of the +``datasets:`` dict. + +Preparing your data +------------------- + +Three minimal recipes covering the main on-disk formats. + +**NeMo manifest** — one JSON object per line, fields read by ``LazyNeMoIterator``:: + + {"audio_filepath": "/data/utt_0001.wav", "duration": 3.42, "text": "hello world", "lang": "en"} + {"audio_filepath": "/data/utt_0002.wav", "duration": 5.10, "text": "another example", "lang": "en"} + +For tarred NeMo manifests, see +``scripts/speech_recognition/convert_to_tarred_audio_dataset.py`` in the NeMo +repo. + +**Lhotse cuts JSONL** — build a ``CutSet`` from raw recordings + supervisions: + +.. code-block:: python + + from lhotse import CutSet, Recording, SupervisionSegment + + cuts = [] + for path, transcript in pairs: + rec = Recording.from_file(path) + sup = SupervisionSegment( + id=rec.id, recording_id=rec.id, + start=0.0, duration=rec.duration, + text=transcript, language="en", + ) + cut = rec.to_cut() + cut.supervisions = [sup] + cuts.append(cut) + + CutSet.from_cuts(cuts).to_file("cuts.jsonl") # uncompressed! + +For Lhotse Shar (sharded archive), see the upstream tutorial: |tutorial_shar|. + +**Parquet** — write a ``pyarrow`` table with the column names the +``LazyParquetIterator`` reads (``audio``, ``text``, ``duration``, +optional ``lang``): + +.. code-block:: python + + import pyarrow as pa, pyarrow.parquet as pq + + table = pa.table({ + "audio": [open(p, "rb").read() for p in paths], + "text": transcripts, + "duration": durations, + "lang": ["en"] * len(paths), + }) + pq.write_table(table, "shard_000.parquet") # row-group stats kept by default + +Once your manifests are written, build the indexed sidecars in one shot:: + + python scripts/dataloading/build_indexes.py path/to/input_cfg.yaml + +See :ref:`indexed-resumable-dataloading` for the resumable side. + +.. _lhotse-storage-backends: + +Storage backends: local, object store, AIStore +---------------------------------------------- + +Every input path the dataloader reads goes through Lhotse's ``open_best``, +which routes file paths and URIs to the right backend automatically: + +* **Local files** — paths like ``/data/...`` work out of the box, no + configuration needed. +* **Generic object stores via ``smart_open``** — ``s3://``, ``gs://``, + ``http://``, ``https://`` URIs work after ``pip install smart_open``. + Authentication uses the underlying SDK's defaults (e.g. AWS env vars). +* **AIStore** — ``ais://bucket/key`` URIs work after ``pip install aistore`` + and ``export AIS_ENDPOINT=http://...``. Optional tuning env vars + ``AIS_CONNECT_TIMEOUT`` and ``AIS_READ_TIMEOUT`` are honored by the SDK. + +The same routing applies to ``.idx`` sidecars: they are read and written +next to the data file, so the backend must accept writes at that location +or the indexes need to be pre-built locally and uploaded. + +AIStore GetBatch (separate optimization) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +For tarred multimodal-conversation manifests, NeMo also supports AIStore's +batched object-fetch API (``GetBatch``) via ``USE_AIS_GET_BATCH=true``, +which issues one batched fetch per minibatch instead of per-cut tar reads. +This is independent of using AIStore as a generic backend — see +:doc:`speechlm2/datasets` for the speech-LM-specific details, including +how it composes with ``indexed: true``. + +.. _lhotse-extension-hooks: + +Registering a custom format +--------------------------- + +Adding a new ``type:`` to the ``input_cfg`` registry is one decorator and +one function: + +.. code-block:: python + + from nemo.collections.common.data.lhotse.cutset import data_type_parser + from lhotse import CutSet + + @data_type_parser("my_format") + def read_my_format(config) -> tuple[CutSet, bool]: + cuts = CutSet(MyAdapter(path=config.path, ...)) + is_tarred = True # True ⇒ IterableDataset path; False ⇒ map-style + return cuts, is_tarred + +The parser must accept arbitrary keys: ``read_dataset_config`` cascades +options like ``indexed``, ``shard_seed``, ``metadata_only``, +``force_finite``, ``audio_locator_tag`` from the top of the YAML down into +every entry via ``propagate_attrs``. Missing keys should fall back to +sensible defaults via ``config.get(...)``. + +To make ``MyAdapter`` participate in the indexed/resumable path +(:ref:`indexed-resumable-dataloading`), implement Lhotse's +:class:`~lhotse.lazy.IteratorNode` contract — see +`indexed manifests guide `_ +for the requirements. + +Common pitfalls +--------------- + +The most common foot-guns when standing up a NeMo Lhotse recipe: + +1. **Forgetting** ``trainer.use_distributed_sampler=false``. NeMo's Lhotse + integration handles distributed sampling itself; leaving Lightning's + default on causes silent batch duplication across DP ranks. + +2. **No** ``max_steps`` **with tarred / Shar data.** Tarred sources are + infinite by design, so without ``trainer.max_steps`` (and + ``limit_train_batches`` for the periodic validation cadence) training + never completes the first "epoch". Always set both. + +3. **Compressed inputs cannot be indexed.** ``.jsonl.gz`` and ``.tar.gz`` + work for streaming, but ``indexed: true`` requires uncompressed, + seekable files. Re-extract or re-write before building ``.idx``. + +4. **Mismatched** ``num_workers`` / ``world_size`` **on resume.** Exact + per-worker resume with ``StatefulDataLoader`` requires both to match + between save and restore. Replay-based restore with the regular + ``DataLoader`` is more lenient. + +5. ``indexed: true`` **is incompatible with** ``extra_fields`` **and** + ``slice_length`` on ``nemo`` / ``nemo_tarred``. Both expand or rewrite + cuts in a way that has no stable index. Pre-process the manifest + offline if you need them in an indexed pipeline. + +6. ``shard_seed: "trng"`` **deadlocks under TP/PP.** Tensor- and pipeline- + parallel ranks must see the same shard order, but ``"trng"`` draws an + independent seed per worker. Use ``shard_seed: "randomized"`` whenever + you have model parallelism on top of DDP. + +7. **Missing** ``force_finite: true`` **on validation.** Validation configs + that reuse training infrastructure inherit the infinite-mux behavior; + without ``force_finite: true`` the validation loop never terminates. + +.. _lhotse-config-reference: + +``LhotseDataLoadingConfig`` field reference +------------------------------------------- + +The complete option schema lives in ``LhotseDataLoadingConfig`` +(``nemo/collections/common/data/lhotse/dataloader.py``). It carries ~80 +fields; the categorization below mirrors the source order and groups +options by what they control. + +**Inputs.** ``input_cfg``, ``manifest_filepath``, +``tarred_audio_filepaths``, ``cuts_path``, ``shar_path``, +``skip_missing_manifest_entries``. + +**Sampling — basic.** ``batch_size``, ``batch_duration``, +``quadratic_duration``, ``min_duration``, ``max_duration``, ``min_tps``, +``max_tps``. + +**Sampling — bucketing.** ``use_bucketing``, ``num_buckets``, +``bucket_duration_bins``, ``bucket_batch_size``, ``bucket_buffer_size``, +``num_cuts_for_bins_estimate``, ``concurrent_bucketing``. + +**Sampling — multimodal.** ``use_multimodal_sampling``, ``prompt_format``, +``pretokenize``, ``audio_locator_tag``, ``token_equivalent_duration``, +``batch_tokens``, ``quadratic_factor``, ``min_tokens``, ``max_tokens``, +``min_tpt``, ``max_tpt``, ``measure_total_length``. + +**Sampling — fusion (multi-config).** ``multi_config``, ``sampler_fusion``, +``sampler_weights``. + +**Indexed / resumable.** ``indexed``, ``use_stateful_dataloader``, +``indexes_root``. See :ref:`indexed-resumable-dataloading` and +:ref:`lhotse-indexes-root`. + +**Mixing & weighting.** ``reweight_temperature``, ``max_open_streams``. + +**I/O & distributed.** ``num_workers``, ``pin_memory``, ``shard_seed``, +``seed``, ``shuffle``, ``shuffle_buffer_size``, ``drop_last``, +``force_finite``, ``force_map_dataset``, ``force_iterable_dataset``, +``metadata_only``, ``cuda_expandable_segments``. + +**On-the-fly augmentation.** + +* Speed/RIR — ``perturb_speed``, ``rir_enabled``, ``rir_path``, ``rir_prob``. +* Noise — ``noise_path``, ``noise_snr``, ``noise_mix_prob``. +* Lowpass — ``lowpass_enabled``, ``lowpass_frequencies_interval``, + ``lowpass_prob``. +* Compression — ``compression_enabled``, ``compression_prob``, + ``compression_level_interval``, ``compression_codecs``, + ``compression_codec_weights``, ``compression_enable_for_custom_fields``. +* Clipping — ``clipping_enabled``, ``clipping_gain_db``, + ``clipping_normalize``, ``clipping_oversampling``, ``clipping_prob``, + ``clipping_prob_hard``. +* Concatenation — ``concatenate_samples``, ``concatenate_gap_seconds``, + ``concatenate_duration_factor``, ``concatenate_merge_supervisions``, + ``db_norm``. + +**Cut transforms.** ``truncate_duration``, ``truncate_offset_type``, +``cut_into_windows_duration``, ``cut_into_windows_hop``, +``pad_min_duration``, ``pad_direction``, ``cut_text_into_windows_tokens``, +``keep_excessive_supervisions``. + +**Field-name overrides.** ``text_field``, ``lang_field``, +``channel_selector``, ``sample_rate``. + +**Filtering.** ``max_cer``, ``min_context_speaker_similarity``, ``keep``. + +For exact types and defaults, see the dataclass definition in the source +file — it is the single source of truth. + +See also +-------- + +* :doc:`speechlm2/datasets` — speech-LM-specific data classes, AIStore + GetBatch with indexed mode, and the SpeechLM ``DataModule`` resume + contract. +* :doc:`asr/datasets` — ASR-specific data preparation conventions. +* :doc:`audio/datasets` — audio (codec, enhancement) data flows. +* `Lhotse PyTorch Datasets `_ + — upstream sampler API, ``StatefulDataLoader`` integration, custom RNG + state in batch transforms. +* `Lhotse indexed manifests `_ + — the iterator-graph contract that makes O(1) restore work. diff --git a/docs/source/speechlm2/datasets.rst b/docs/source/speechlm2/datasets.rst index 2006fbc59cc6..505d0c7ce5b1 100644 --- a/docs/source/speechlm2/datasets.rst +++ b/docs/source/speechlm2/datasets.rst @@ -4,6 +4,16 @@ Datasets The speechlm2 collection supports datasets that contain both audio and text data for training models that can understand speech and generate appropriate responses. This section describes the dataset format, preparation, and usage with the speechlm2 models. +.. seealso:: + + :doc:`/dataloaders` is the canonical reference for the underlying Lhotse + dataloader: ``input_cfg`` shape, supported formats, sampling/bucketing + options, indexed manifests + resumable dataloading, and + ``LhotseDataLoadingConfig`` field schema. The page below covers what's + speech-LM-specific on top of that — datamodule resume contract, + AIStore GetBatch, conversation type semantics in the SALM/duplex + recipes. + Dataset Format -------------- @@ -228,6 +238,27 @@ When enabled: Leave the env var unset to keep the original tar-iterating loader. +Combining with ``indexed: true`` +"""""""""""""""""""""""""""""""" + +``USE_AIS_GET_BATCH=true`` coexists with ``indexed: true`` on +``LazyNeMoTarredIterator`` (and on the multimodal-conversation adapters). +Indexed mode keeps the JSONL-driven O(1) global indexing and graph-token +checkpointing, while AIStore GetBatch handles the actual audio fetch: + +* The audio-tar ``.idx`` sidecar is **not** required when GetBatch is enabled + — the iterator skips opening tar files entirely and emits URL-backed cuts + whose ``AudioSource`` points at ``{tar_path}/{audio_filename}`` + (``type="url"`` for ``ais://...`` paths, ``type="file"`` otherwise). +* Manifest JSONLs still need their ``.idx`` sidecars; they drive the indexed + iterator graph and the ``state_dict`` / ``load_state_dict`` round-trip. +* Audio bytes are fetched lazily by ``AudioSamples(use_batch_loader=True)`` at + collation time, which issues one batched GetBatch request per minibatch. + +Use this combination when shards live on AIStore and you want both the +network efficiency of GetBatch and the exact-resume guarantees of the +indexed/stateful pipeline. + DuplexSTTDataset **************** @@ -261,9 +292,43 @@ The DataModule class in the speechlm2 collection manages dataset loading, prepar ) The DataModule takes care of: + 1. Setting up proper data parallel ranks for dataloaders 2. Instantiating the dataloaders with configuration from YAML 3. Managing multiple datasets for validation/testing +4. Persisting the train dataloader's iterator state across checkpoints + (when ``use_stateful_dataloader: true``) + +Checkpointed / resumable training +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The DataModule caches the train dataloader on first ``train_dataloader()`` +call and exposes ``state_dict()`` / ``load_state_dict()`` that delegate to the +cached dataloader when it supports them. Lightning's trainer wires those into +every checkpoint automatically, so an experiment configured with:: + + data: + train_ds: + indexed: true + use_stateful_dataloader: true + ... + +resumes O(1) — sampler RNG, bucketer state, multiplexer choice RNG, +per-source iterator cursors, and per-worker prefetch queues are all restored +exactly without replay. + +With a regular ``DataLoader`` (``use_stateful_dataloader`` unset or +``False``) ``state_dict``/``load_state_dict`` become no-ops and resume falls +back to Lhotse's ``_fast_forward()`` replay path. + +Two constraints to keep in mind across save/restore: + +* ``num_workers`` and ``world_size`` must match between save and restore + (a hard requirement of ``StatefulDataLoader``). +* All data files must be **uncompressed** and accompanied by ``.idx`` + sidecars. Build them in one shot with ``scripts/dataloading/build_indexes.py`` + (see :ref:`indexed-resumable-dataloading` in the main Lhotse dataloading + guide). Bucketing for Efficient Training ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/examples/speechlm2/conf/salm_automodel_pee.yaml b/examples/speechlm2/conf/salm_automodel_pee.yaml index faceb4a9ddb1..7de36512c08a 100644 --- a/examples/speechlm2/conf/salm_automodel_pee.yaml +++ b/examples/speechlm2/conf/salm_automodel_pee.yaml @@ -192,7 +192,7 @@ trainer: # ``activation_checkpointing_llm`` is a single switch that covers both the # non-EP FSDP2 path (via FSDP2Config.activation_checkpointing) and the # EP/MoE parallelizer path. - # ``activation_checkpointing_perception`` wraps each layer in ``perception.encoder.layers`` + # ``activation_checkpointing_perception`` wraps each layer in ``perception.encoder.layers`` # with ``checkpoint_wrapper`` before FSDP2 sharding. activation_checkpointing_llm: false activation_checkpointing_perception: false diff --git a/examples/speechlm2/salm_train.py b/examples/speechlm2/salm_train.py index 02a56e1f2f44..eb6da6286384 100644 --- a/examples/speechlm2/salm_train.py +++ b/examples/speechlm2/salm_train.py @@ -19,6 +19,7 @@ from nemo.collections.speechlm2 import SALM, DataModule, SALMDataset from nemo.core.config import hydra_runner +from nemo.utils.callbacks.training_stats import TrainingStatsCallback from nemo.utils.exp_manager import exp_manager from nemo.utils.trainer_utils import resolve_trainer_cfg @@ -35,6 +36,11 @@ def train(cfg): torch.set_float32_matmul_precision("medium") trainer = Trainer(**resolve_trainer_cfg(cfg.trainer)) log_dir = exp_manager(trainer, cfg.get("exp_manager", None)) + # Insert at position 0 so our ``on_train_batch_end`` runs BEFORE the + # StatelessTimer's hook (which can trigger a checkpoint save mid- + # batch-end). Without this, the saved ``state_dict`` would lag the + # accumulators by one batch on every wall-time-induced save. + trainer.callbacks.insert(0, TrainingStatsCallback()) OmegaConf.save(cfg, log_dir / "exp_config.yaml") model_cls = SALM @@ -51,6 +57,9 @@ def train(cfg): trainer.fit(model, datamodule) + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + if __name__ == "__main__": train() diff --git a/nemo/collections/asr/data/audio_to_text_lhotse.py b/nemo/collections/asr/data/audio_to_text_lhotse.py index 46c301be0822..ec9b92045bb5 100644 --- a/nemo/collections/asr/data/audio_to_text_lhotse.py +++ b/nemo/collections/asr/data/audio_to_text_lhotse.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os from typing import Dict, Optional, Tuple @@ -53,20 +54,12 @@ def __init__(self, tokenizer: TokenizerSpec, return_cuts: bool = False): super().__init__() self.tokenizer = TokenizerWrapper(tokenizer) self.use_ais_get_batch = os.environ.get("USE_AIS_GET_BATCH", "False").lower() == "true" + self.ais_force_individual = os.environ.get("USE_AIS_INDIVIDUAL_GETS", "False").lower() == "true" - # Try to use use_batch_loader if available (Lhotse >= 1.32.0) - try: - self.load_audio = AudioSamples(fault_tolerant=True, use_batch_loader=self.use_ais_get_batch) - except TypeError: - # Lhotse < 1.32.0 doesn't support use_batch_loader - if self.use_ais_get_batch: - import logging - - logging.warning( - "AIS batch loading requested but not supported by this Lhotse version. " - "Please upgrade to Lhotse >= 1.32.0" - ) - self.load_audio = AudioSamples(fault_tolerant=True) + self.load_audio = _make_audio_samples( + use_batch_loader=self.use_ais_get_batch, + ais_force_individual=self.ais_force_individual, + ) self.return_cuts = return_cuts @@ -87,3 +80,30 @@ def __getitem__(self, cuts) -> Tuple[torch.Tensor, ...]: if self.return_cuts: return audio, audio_lens, tokens, token_lens, cuts.drop_in_memory_data() return audio, audio_lens, tokens, token_lens + + +def _make_audio_samples(use_batch_loader: bool, ais_force_individual: bool) -> AudioSamples: + kwargs = { + "fault_tolerant": True, + "use_batch_loader": use_batch_loader, + "ais_force_individual": ais_force_individual, + } + try: + return AudioSamples(**kwargs) + except TypeError as exc: + if "ais_force_individual" in str(exc): + kwargs.pop("ais_force_individual") + try: + return AudioSamples(**kwargs) + except TypeError as retry_exc: + exc = retry_exc + + if "use_batch_loader" not in str(exc): + raise + + if use_batch_loader: + logging.warning( + "AIS batch loading requested but not supported by this Lhotse version. " + "Please upgrade to Lhotse >= 1.32.0" + ) + return AudioSamples(fault_tolerant=True) diff --git a/nemo/collections/asr/data/audio_to_text_lhotse_prompted.py b/nemo/collections/asr/data/audio_to_text_lhotse_prompted.py index a3510a78836a..a4689fd177b4 100644 --- a/nemo/collections/asr/data/audio_to_text_lhotse_prompted.py +++ b/nemo/collections/asr/data/audio_to_text_lhotse_prompted.py @@ -18,9 +18,9 @@ import torch.utils.data from lhotse import CutSet from lhotse.cut import MixedCut -from lhotse.dataset import AudioSamples from lhotse.dataset.collation import collate_vectors +from nemo.collections.asr.data.audio_to_text_lhotse import _make_audio_samples from nemo.collections.common.data import apply_prompt_format_fn from nemo.collections.common.prompts import PromptFormatter from nemo.collections.common.tokenizers import TokenizerSpec @@ -83,20 +83,12 @@ def __init__( super().__init__() self.tokenizer = tokenizer self.use_ais_get_batch = os.environ.get("USE_AIS_GET_BATCH", "False").lower() == "true" + self.ais_force_individual = os.environ.get("USE_AIS_INDIVIDUAL_GETS", "False").lower() == "true" - # Try to use use_batch_loader if available (Lhotse >= 1.32.0) - try: - self.load_audio = AudioSamples(fault_tolerant=True, use_batch_loader=self.use_ais_get_batch) - except TypeError: - # Lhotse < 1.32.0 doesn't support use_batch_loader - if self.use_ais_get_batch: - import logging - - logging.warning( - "AIS batch loading requested but not supported by this Lhotse version. " - "Please upgrade to Lhotse >= 1.32.0" - ) - self.load_audio = AudioSamples(fault_tolerant=True) + self.load_audio = _make_audio_samples( + use_batch_loader=self.use_ais_get_batch, + ais_force_individual=self.ais_force_individual, + ) self.padding_value = self.tokenizer.pad_id self.prompt = prompt diff --git a/nemo/collections/common/callbacks/ema.py b/nemo/collections/common/callbacks/ema.py index dee125be54ef..7dbf08267ef5 100644 --- a/nemo/collections/common/callbacks/ema.py +++ b/nemo/collections/common/callbacks/ema.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=C0116 import contextlib import copy import os @@ -135,7 +136,7 @@ def on_load_checkpoint( return ema_path = ckpt_path.replace(ext, f'-EMA{ext}') if os.path.exists(ema_path): - ema_state_dict = torch.load(ema_path, map_location=torch.device('cpu')) + ema_state_dict = torch.load(ema_path, map_location=torch.device('cpu'), weights_only=False) checkpoint['optimizer_states'] = ema_state_dict['optimizer_states'] del ema_state_dict diff --git a/nemo/collections/common/data/lhotse/_compat.py b/nemo/collections/common/data/lhotse/_compat.py new file mode 100644 index 000000000000..12f5c3d78360 --- /dev/null +++ b/nemo/collections/common/data/lhotse/_compat.py @@ -0,0 +1,195 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=unused-import +"""Compatibility shims for optional Lhotse indexed/resumable dataloading APIs. + +This module lets NeMo import with released Lhotse versions that do not expose +those APIs yet, while delegating to the real implementations when a resumable +Lhotse checkout is available. +""" +import os +from collections.abc import Generator, Iterable +from typing import Any + +import torch +from torch import distributed as dist + +__all__ = [ + "GraphOriginDict", + "IteratorNode", + "LazyIndexedManifestIterator", + "PartitionedIndexedIterator", + "attach_graph_origin", + "normalize_graph_token", +] + +try: + from lhotse.dataset import dataloading as _lhotse_dataloading + + PartitionedIndexedIterator = _lhotse_dataloading.PartitionedIndexedIterator +except (ImportError, AttributeError): + LHOTSE_USE_WORKER_PARTITION = "LHOTSE_USE_WORKER_PARTITION" + + def _get_world_size() -> int: + if "WORLD_SIZE" in os.environ: + return int(os.environ["WORLD_SIZE"]) + if dist.is_available() and dist.is_initialized(): + return dist.get_world_size() + return 1 + + def _get_rank() -> int: + if "RANK" in os.environ: + return int(os.environ["RANK"]) + if dist.is_available() and dist.is_initialized(): + return dist.get_rank() + return 0 + + def _get_worker_partition() -> tuple[int, int]: + if os.environ.get(LHOTSE_USE_WORKER_PARTITION) != "1": + return 0, 1 + rank = _get_rank() + world_size = _get_world_size() + worker_info = torch.utils.data.get_worker_info() + if worker_info is None: + worker_id, num_workers = 0, 1 + else: + worker_id = worker_info.id + num_workers = max(worker_info.num_workers, 1) + return rank * num_workers + worker_id, world_size * num_workers + + class PartitionedIndexedIterator: + def __init__(self, shuffle: bool = False, seed: int = 0) -> None: + self._shuffle = shuffle + self._seed = seed + self._position = 0 + self._shard_id: int | None = None + self._num_shards: int | None = None + self._restored = False + self._range = None + self._pending_range_state = None + + @property + def position(self) -> int: + return self._position + + def iterate(self, total_len: int) -> Generator[int, None, None]: + shard_id, num_shards = _get_worker_partition() + + if self._restored: + self._restored = False + if self._num_shards is not None and (self._shard_id != shard_id or self._num_shards != num_shards): + raise ValueError( + f"PartitionedIndexedIterator topology mismatch on resume: " + f"saved (shard_id={self._shard_id}, num_shards={self._num_shards}), " + f"current (shard_id={shard_id}, num_shards={num_shards})." + ) + start = self._position + else: + start = 0 + self._position = 0 + + self._shard_id = shard_id + self._num_shards = num_shards + + if self._shuffle: + from lhotse.indexing import LazyShuffledRange + + self._range = LazyShuffledRange(total_len, seed=self._seed, shard_id=shard_id, num_shards=num_shards) + if self._pending_range_state is not None: + self._range.load_state_dict(self._pending_range_state) + self._pending_range_state = None + shard_len = len(self._range) + else: + self._range = None + shard_len = (total_len - shard_id + num_shards - 1) // num_shards if total_len > shard_id else 0 + + for i in range(start, shard_len): + self._position = i + 1 + yield self._range[i] if self._range is not None else shard_id + i * num_shards + + def state_dict(self) -> dict: + sd = { + "position": self._position, + "shard_id": self._shard_id, + "num_shards": self._num_shards, + } + if self._range is not None: + sd["range"] = self._range.state_dict() + elif self._pending_range_state is not None: + sd["range"] = self._pending_range_state + return sd + + def load_state_dict(self, sd: dict) -> None: + self._position = sd.get("position", 0) + self._shard_id = sd.get("shard_id") + self._num_shards = sd.get("num_shards") + if self._shuffle: + self._pending_range_state = sd.get("range") + self._range = None + self._restored = True + + +try: + from lhotse import lazy as _lhotse_lazy + + GraphOriginDict = _lhotse_lazy.GraphOriginDict + IteratorNode = _lhotse_lazy.IteratorNode + LazyIndexedManifestIterator = _lhotse_lazy.LazyIndexedManifestIterator + attach_graph_origin = _lhotse_lazy.attach_graph_origin + normalize_graph_token = _lhotse_lazy.normalize_graph_token +except (ImportError, AttributeError): + + class IteratorNode(Iterable): + is_checkpointable = False + is_indexed = False + has_constant_time_access = False + + def state_dict(self) -> dict: + raise NotImplementedError(f"{type(self).__name__} is not checkpointable.") + + def load_state_dict(self, sd: dict) -> None: + raise NotImplementedError(f"{type(self).__name__} is not checkpointable.") + + def iter_children(self): + if hasattr(self, "source"): + yield getattr(self, "source") + if hasattr(self, "sources"): + yield from getattr(self, "sources") + + class GraphOriginDict(dict): + __slots__ = ("_graph_origin",) + + def normalize_graph_token(token: Any) -> Any: + if isinstance(token, list): + return tuple(normalize_graph_token(part) for part in token) + if isinstance(token, tuple): + return tuple(normalize_graph_token(part) for part in token) + return token + + def attach_graph_origin(item: Any, token: Any) -> Any: + try: + object.__setattr__(item, "_graph_origin", token) + except Exception: + try: + setattr(item, "_graph_origin", token) + except Exception: + # Immutable extension objects may not accept ad-hoc metadata. + return item + return item + + class LazyIndexedManifestIterator(IteratorNode): + def __init__(self, *args, **kwargs) -> None: + raise ImportError( + "LazyIndexedManifestIterator requires a Lhotse version with indexed/resumable dataloading support." + ) diff --git a/nemo/collections/common/data/lhotse/cutset.py b/nemo/collections/common/data/lhotse/cutset.py index 8e613091b7e1..921dad55ea6e 100644 --- a/nemo/collections/common/data/lhotse/cutset.py +++ b/nemo/collections/common/data/lhotse/cutset.py @@ -51,6 +51,7 @@ NeMoMultimodalConversationShareGPTJsonlAdapter, NeMoMultimodalConversationShareGPTWebdatasetAdapter, NeMoSFTJsonlAdapter, + NemotronTextConversationAdapter, TextTurn, ) from nemo.collections.common.parts.preprocessing.manifest import get_full_path @@ -285,6 +286,8 @@ def read_dataset_config(config) -> tuple[CutSet, bool]: "force_map_dataset": config.get("force_map_dataset", False), "force_iterable_dataset": config.get("force_iterable_dataset", False), "slice_length": config.get("slice_length", None), + "indexed": config.get("indexed", False), + "indexes_root": config.get("indexes_root", None), # Temperature for re-weighting datasets. 1 is a neutral value. Lower temperature over-samples smaller datasets, and vice versa. "reweight_temperature": config.get("reweight_temperature", None), } @@ -348,6 +351,8 @@ def read_txt_jsonl_paths(config: DictConfig) -> tuple[CutSet, bool]: text_field=config.text_field, shuffle_shards=config.shuffle, shard_seed=config.shard_seed, + indexed=config.get("indexed", False), + indexes_root=config.get("indexes_root", None), ) ) if not config.get("force_finite", False): @@ -384,6 +389,25 @@ def read_nemo_sft_jsonl(config: DictConfig) -> tuple[CutSet, bool]: language=config.get("language"), shuffle_shards=config.shuffle, shard_seed=config.shard_seed, + indexed=config.get("indexed", False), + indexes_root=config.get("indexes_root", None), + ) + ) + if not config.get("force_finite", False): + cuts = cuts.repeat(preserve_id=True) + return cuts, True + + +@data_type_parser("nemotron_text_converation") +def read_nemotron_text_converation(config: DictConfig) -> tuple[CutSet, bool]: + """Read Nemotron/Energon text-only conversation JSONL files or tar directories.""" + cuts = CutSet( + NemotronTextConversationAdapter( + paths=config.paths, + shuffle_shards=config.shuffle, + shard_seed=config.shard_seed, + indexed=config.get("indexed", False), + indexes_root=config.get("indexes_root", None), ) ) if not config.get("force_finite", False): @@ -405,6 +429,8 @@ def read_multimodal_conversation_jsonl(config: DictConfig) -> tuple[CutSet, bool system_prompt=config.get("tags", {}).get("system_prompt"), context=config.get("tags", {}).get("context"), slice_length=config.get("slice_length"), + indexed=config.get("indexed", False), + indexes_root=config.get("indexes_root", None), ) ) if not config.get("force_finite", False): @@ -426,6 +452,9 @@ def read_share_gpt_as_conversation(config) -> tuple[CutSet, bool]: shuffle_shards=config.shuffle, shard_seed=config.shard_seed, slice_length=config.get("slice_length"), + indexed=config.get("indexed", False), + indexes_root=config.get("indexes_root", None), + skip_missing_manifest_entries=config.get("skip_missing_manifest_entries", False), ) ) if not config.get("force_finite", False): @@ -444,6 +473,8 @@ def read_share_gpt_webdataset_as_conversation(config) -> tuple[CutSet, bool]: token_equivalent_duration=config.get("token_equivalent_duration"), shuffle_shards=config.shuffle, shard_seed=config.shard_seed, + indexed=config.get("indexed", False), + indexes_root=config.get("indexes_root", None), ) ) # When force_finite is False (default), repeat the dataset infinitely so that @@ -640,6 +671,8 @@ def read_lhotse_manifest(config) -> tuple[CutSet, bool]: shard_seed = config.get("shard_seed", "trng") metadata_only = config.get("metadata_only", False) force_finite = config.get("force_finite", False) + indexed = config.get("indexed", False) + indexes_root = config.get("indexes_root", None) if config.get("cuts_path") is not None: warnings.warn("Note: lhotse.cuts_path will be ignored because lhotse.shar_path was provided.") if isinstance(config.shar_path, (str, Path)): @@ -648,6 +681,8 @@ def read_lhotse_manifest(config) -> tuple[CutSet, bool]: shuffle_shards=True, seed=shard_seed, slice_length=config.get("slice_length", None), + indexed=indexed, + indexes_root=indexes_root, ) if not metadata_only and not force_finite: cuts = cuts.repeat(preserve_id=True) @@ -664,6 +699,8 @@ def read_lhotse_manifest(config) -> tuple[CutSet, bool]: shuffle_shards=True, seed=shard_seed, slice_length=config.get("slice_length", None), + indexed=indexed, + indexes_root=indexes_root, ) weight = len(cs) else: @@ -679,6 +716,8 @@ def read_lhotse_manifest(config) -> tuple[CutSet, bool]: shuffle_shards=True, seed=shard_seed, slice_length=config.get("slice_length", None), + indexed=indexed, + indexes_root=indexes_root, ) cutsets.append(cs) weights.append(weight) @@ -703,6 +742,8 @@ def read_lhotse_manifest(config) -> tuple[CutSet, bool]: shuffle_shards=True, seed=shard_seed, slice_length=config.get("slice_length", None), + indexed=indexed, + indexes_root=indexes_root, ) if not metadata_only and not force_finite: cuts = cuts.repeat(preserve_id=True) @@ -715,7 +756,13 @@ def read_lhotse_manifest(config) -> tuple[CutSet, bool]: else: # Regular Lhotse manifest points to individual audio files (like native NeMo manifest). path = config.cuts_path - cuts = CutSet.from_file(path).map(partial(resolve_relative_paths, manifest_path=path)) + from lhotse.indexing import index_file_path + + indexes_root = config.get("indexes_root", None) + from_file_kwargs = {"indexed": config.get("indexed", None)} + if indexes_root is not None: + from_file_kwargs["index_path"] = index_file_path(path, indexes_root) + cuts = CutSet.from_file(path, **from_file_kwargs).map(partial(resolve_relative_paths, manifest_path=path)) return cuts, is_tarred @@ -749,6 +796,7 @@ def read_parquet_manifest(config: DictConfig) -> tuple[CutSet, bool]: # Extract shuffling options (CRITICAL for distributed training) shuffle_shards = config.get("shuffle", False) shard_seed = config.get("shard_seed", "trng") + indexed = config.get("indexed", False) # 3. Create Iterators for each file iterators = [] @@ -761,6 +809,7 @@ def read_parquet_manifest(config: DictConfig) -> tuple[CutSet, bool]: duration_field=duration_field, lang_field=lang_field, sampling_rate=sampling_rate, + indexed=indexed, ) iterators.append(adapter) @@ -1459,6 +1508,10 @@ def read_nemo_manifest(config) -> tuple[CutSet, bool]: common_kwargs["shuffle_shards"] = config[key] else: common_kwargs[key] = config[key] + indexed = config.get("indexed", False) + indexes_root = config.get("indexes_root", None) + indexed_extra = {"indexes_root": indexes_root} if (indexed and indexes_root is not None) else {} + notar_kwargs_extra = {"indexed": indexed, **indexed_extra} if indexed else {} # The option below is to allow a special case of NeMo manifest iteration as Lhotse CutSet # without performing any I/O. NeMo manifests typically don't have sampling_rate information required by Lhotse, # so lhotse has to look up the headers of audio files to fill it on-the-fly. @@ -1467,7 +1520,11 @@ def read_nemo_manifest(config) -> tuple[CutSet, bool]: # and other data statistics. metadata_only = config.get("metadata_only", False) force_finite = config.get("force_finite", False) - notar_kwargs = {"metadata_only": metadata_only} + notar_kwargs = { + "metadata_only": metadata_only, + "skip_missing_manifest_entries": config.get("skip_missing_manifest_entries", False), + } + tar_kwargs_extra = {"indexed": indexed, **indexed_extra} if indexed else {} is_tarred = config.get("tarred_audio_filepaths") is not None if isinstance(config.manifest_filepath, (str, Path)): if is_tarred and not metadata_only: @@ -1477,13 +1534,16 @@ def read_nemo_manifest(config) -> tuple[CutSet, bool]: tar_paths=config.tarred_audio_filepaths, skip_missing_manifest_entries=config.get("skip_missing_manifest_entries", False), slice_length=config.get("slice_length", None), + **tar_kwargs_extra, **common_kwargs, ) ) if not force_finite: cuts = cuts.repeat(preserve_id=True) else: - cuts = CutSet(LazyNeMoIterator(config.manifest_filepath, **notar_kwargs, **common_kwargs)) + cuts = CutSet( + LazyNeMoIterator(config.manifest_filepath, **notar_kwargs, **notar_kwargs_extra, **common_kwargs) + ) else: # Format option 1: # Assume it's [[path1], [path2], ...] (same for tarred_audio_filepaths). @@ -1517,10 +1577,11 @@ def read_nemo_manifest(config) -> tuple[CutSet, bool]: tar_paths=tar_path, skip_missing_manifest_entries=config.get("skip_missing_manifest_entries", False), slice_length=config.get("slice_length", None), + **tar_kwargs_extra, **common_kwargs, ) else: - nemo_iter = LazyNeMoIterator(manifest_path, **notar_kwargs, **common_kwargs) + nemo_iter = LazyNeMoIterator(manifest_path, **notar_kwargs, **notar_kwargs_extra, **common_kwargs) # Then, determine the weight or use one provided if isinstance(manifest_info, str) or len(manifest_info) == 1: weight = len(nemo_iter) diff --git a/nemo/collections/common/data/lhotse/dataloader.py b/nemo/collections/common/data/lhotse/dataloader.py index 5af5f5d004d7..fc7b82439356 100644 --- a/nemo/collections/common/data/lhotse/dataloader.py +++ b/nemo/collections/common/data/lhotse/dataloader.py @@ -40,7 +40,7 @@ from lhotse.dataset.dataloading import resolve_seed from lhotse.dataset.sampling.base import CutSampler, SamplingConstraint, TimeConstraint from lhotse.lazy import LazyFlattener -from lhotse.utils import fastcopy, fix_random_seed +from lhotse.utils import fix_random_seed from omegaconf import DictConfig, OmegaConf from nemo.collections.common.data.lhotse.cutset import ( @@ -254,6 +254,22 @@ class LhotseDataLoadingConfig: # The first K examples will actually be read and then discarded, incurring the IO cost, due to # our support of object stores and gzipped files that generally don't have indexes of byte offsets per line. slice_length: Optional[int] = None + # Forwarded to ``CutSet.from_file(path, indexed=...)`` for plain JSONL ``cuts_path`` inputs. + # ``None`` = lhotse auto-detect (uses .idx if present, falls back to streaming). + # ``True`` = require indexed reads (errors if .idx is missing). + # ``False`` = streaming reads only. + indexed: Optional[bool] = None + # When set, ``.idx`` sidecars are read from a mirror under this root that + # preserves the data files' directory structure (URL schemes are stripped, + # leading separators dropped). Use this to keep indexes on a fast local + # disk while the data lives on shared / object storage. Cascades through + # ``read_dataset_config`` to every nested ``input_cfg`` entry. + indexes_root: Optional[str] = None + # When True, build the dataloader with ``torchdata.stateful_dataloader.StatefulDataLoader`` + # instead of ``torch.utils.data.DataLoader``. Combined with a checkpointable lhotse sampler + # (DynamicBucketingSampler / DynamicCutSampler), this enables exact resume from the next batch + # within the current epoch via the standard PyTorch state_dict / load_state_dict protocol. + use_stateful_dataloader: bool = False def determine_use_iterable_dataset(use_iterable_dataset: bool, config: DictConfig) -> bool: @@ -265,12 +281,184 @@ def determine_use_iterable_dataset(use_iterable_dataset: bool, config: DictConfi return use_iterable_dataset +def _build_dataloader( + use_stateful_dataloader: bool, + *, + dp_rank: Optional[int] = None, + dp_world_size: Optional[int] = None, + dp_group: Optional[Any] = None, + **kwargs, +) -> torch.utils.data.DataLoader: + """ + Construct a DataLoader, optionally using ``torchdata.stateful_dataloader.StatefulDataLoader`` + so that resume picks up at the exact next batch via ``state_dict()`` / ``load_state_dict()``. + + When ``dp_rank`` / ``dp_world_size`` are provided AND we're building a + stateful loader under multi-rank training, wrap ``StatefulDataLoader`` in + :class:`_PerRankStatefulDataLoader`. The wrapper all-gathers each rank's + local state at save time and scatters back the right entry at load time, + so Lightning's automatic ``FitLoop`` save-and-restore of + ``CombinedLoader._state_dicts()`` doesn't broadcast rank-0's iterator + state to every rank (which would corrupt per-shard partitioning — see + the 2026-05-14 post-mortem). + """ + if use_stateful_dataloader: + from torchdata.stateful_dataloader import StatefulDataLoader + + if dp_world_size is not None and dp_world_size > 1: + return _PerRankStatefulDataLoader( + dp_rank=dp_rank if dp_rank is not None else 0, + dp_world_size=dp_world_size, + dp_group=dp_group, + **kwargs, + ) + return StatefulDataLoader(**kwargs) + return torch.utils.data.DataLoader(**kwargs) + + +class _PerRankStatefulDataLoader: + """``StatefulDataLoader`` whose ``state_dict`` is a per-rank list. + + Why this exists: Lightning's ``FitLoop`` saves dataloader state via + ``CombinedLoader._state_dicts()`` → ``loader.state_dict()`` (collective + across ranks but only rank 0's return value is persisted to meta.pt), + then on resume calls ``loader.load_state_dict(state)`` on EVERY rank with + that single rank-0-only state. Per-shard partitioning (``shard_id = + dp_rank * num_workers + worker_id`` inside lhotse's + ``PartitionedIndexedIterator``) then desynchronises — rank 28 worker 0 + loads rank 0 worker 0's ``shard_id=0`` while its own current shard_id is + 112, the iterator's first ``iterate()`` call raises ValueError, and the + rest of the ranks get SIGTERMed via ``srun --kill-on-bad-exit=1``. (See + ``agent-debug-workspace/0909-en-only-id2-4node-postfix/DIAGNOSIS_ORD_vs_IAD.md``.) + + The fix turns ``state_dict()`` into a per-rank gather and + ``load_state_dict(state)`` into a per-rank scatter. The serialised payload + on disk becomes a list of N tagged state dicts (one per DP rank); on + every rank, the wrapper picks ``per_rank[self._dp_rank]``. This works + whether the call comes from Lightning's automatic FitLoop path OR from + our DataModule.load_state_dict override, because both go through this + one method. + + We delegate to a contained ``StatefulDataLoader`` rather than subclass + it: subclassing would inherit ``_Stateful`` via the runtime-checkable + Protocol AND every attribute Lightning's iterator-management code + introspects (``flattened``, ``persistent_workers``, etc.), which is what + we want; but it would also inherit ``__init__`` whose signature includes + parameters we don't want at this layer. Composition keeps the wrapper's + constructor clean and lets us forward attribute lookups via + ``__getattr__``. + """ + + def __init__( + self, + *, + dp_rank: int, + dp_world_size: int, + dp_group: Optional[Any] = None, + **kwargs, + ) -> None: + from torchdata.stateful_dataloader import StatefulDataLoader + + self._dp_rank = int(dp_rank) + self._dp_world_size = int(dp_world_size) + self._dp_group = dp_group + self._inner = StatefulDataLoader(**kwargs) + + def state_dict(self) -> dict: + local_state = self._inner.state_dict() + tagged = { + "dp_rank": self._dp_rank, + "dp_world_size": self._dp_world_size, + "state": local_state, + } + if self._dp_world_size <= 1 or not (torch.distributed.is_available() and torch.distributed.is_initialized()): + per_rank = [tagged] + else: + per_rank: List[Optional[dict]] = [None] * self._dp_world_size + torch.distributed.all_gather_object(per_rank, tagged, group=self._dp_group) + return {"train_dataloader_per_rank": per_rank} + + def load_state_dict(self, state_dict: dict) -> None: + if not state_dict: + return + # We exclusively support the per-rank wire format produced by our + # own ``state_dict()``. Anything else — a bare inner state, a + # rank-0-only StatefulDataLoader payload (the shape Lightning's + # FitLoop used to broadcast and silently corrupt resume), an old + # DataModule key — must fail loudly so any partial-rollforward or + # checkpoint-format mismatch is caught at load time rather than + # producing wrong data several minutes into training. + if "train_dataloader_per_rank" not in state_dict: + raise RuntimeError( + "PerRankStatefulDataLoader.load_state_dict: state must use " + "the per-rank wire format (top-level key " + "'train_dataloader_per_rank'); got keys " + f"{sorted(state_dict.keys())}. This dataloader only supports " + "states produced by its own state_dict()." + ) + per_rank = state_dict["train_dataloader_per_rank"] + if not isinstance(per_rank, list) or len(per_rank) != self._dp_world_size: + raise RuntimeError( + f"PerRankStatefulDataLoader: state has dp_world_size=" + f"{len(per_rank) if isinstance(per_rank, list) else 'unknown'} " + f"but the current run has dp_world_size={self._dp_world_size}." + ) + entry = per_rank[self._dp_rank] + if ( + not isinstance(entry, dict) + or "state" not in entry + or "dp_rank" not in entry + or "dp_world_size" not in entry + ): + raise RuntimeError( + f"PerRankStatefulDataLoader: malformed per-rank entry at index " + f"{self._dp_rank}: expected keys {{'dp_rank', 'dp_world_size', " + f"'state'}}, got {list(entry.keys()) if isinstance(entry, dict) else type(entry).__name__}." + ) + saved_rank, saved_world = entry["dp_rank"], entry["dp_world_size"] + if saved_rank != self._dp_rank or saved_world != self._dp_world_size: + raise RuntimeError( + f"PerRankStatefulDataLoader: state tagged (dp_rank={saved_rank}, " + f"dp_world_size={saved_world}) loaded on (dp_rank={self._dp_rank}, " + f"dp_world_size={self._dp_world_size})." + ) + self._inner.load_state_dict(entry["state"]) + + # Forward everything else to the inner StatefulDataLoader so Lightning's + # iterator-management, ``flattened``-discovery and friends keep working. + def __getattr__(self, name: str) -> Any: + # ``__getattr__`` only fires when normal attribute lookup fails, so the + # explicit attributes (``_inner``, ``_dp_rank``, ...) are reached + # directly without bouncing through here. + return getattr(self._inner, name) + + def __iter__(self): + return iter(self._inner) + + def __len__(self): + return len(self._inner) + + +def _maybe_init_main_process_for_iterable(num_workers: int, global_rank: int, world_size: int, seed: int) -> None: + """When ``num_workers == 0`` the iterable-path sampler runs in the main training + process; PyTorch's DataLoader never invokes ``worker_init_fn`` in that case. + Call it eagerly so env vars (``RANK``/``WORLD_SIZE``/``LHOTSE_PROCESS_SEED``) and + the per-process random seed are set before any iterator is consumed — required so + ``get_worker_partition`` returns the correct DP-rank shard inside lhotse's lazy + indexed iterators (e.g. ``LazyShuffledRange``).""" + if num_workers == 0: + from lhotse.dataset.dataloading import worker_init_fn + + worker_init_fn(0, rank=global_rank, world_size=world_size, seed=seed) + + def get_lhotse_dataloader_from_config( config: Union[dict, DictConfig], global_rank: int, world_size: int, dataset: torch.utils.data.Dataset, tokenizer=None, + dp_group: Optional[Any] = None, ) -> torch.utils.data.DataLoader: """ Set up a Lhotse training dataloader. @@ -304,10 +492,16 @@ def get_lhotse_dataloader_from_config( world_size=world_size, dataset=dataset, tokenizer=tokenizer, + dp_group=dp_group, ) else: return get_lhotse_dataloader_from_single_config( - config=config, global_rank=global_rank, world_size=world_size, dataset=dataset, tokenizer=tokenizer + config=config, + global_rank=global_rank, + world_size=world_size, + dataset=dataset, + tokenizer=tokenizer, + dp_group=dp_group, ) @@ -317,6 +511,7 @@ def get_lhotse_dataloader_from_single_config( world_size: int, dataset: torch.utils.data.Dataset, tokenizer=None, + dp_group: Optional[Any] = None, ) -> torch.utils.data.DataLoader: """ Set up a Lhotse training dataloader. @@ -359,6 +554,7 @@ def get_lhotse_dataloader_from_single_config( # We use lhotse's own worker_init_fn which leverages information such as rank, world_size, # worker_id, etc. to set a different random seed for each (node, worker) combination. # This together with infinite datasets removes the need to split data across nodes/workers. + _maybe_init_main_process_for_iterable(config.num_workers, global_rank, world_size, config.seed) dloader_kwargs = dict( dataset=IterableDatasetWrapper(dataset=dataset, sampler=sampler), worker_init_fn=make_worker_init_fn(rank=global_rank, world_size=world_size, seed=config.seed), @@ -369,7 +565,11 @@ def get_lhotse_dataloader_from_single_config( # reads only light-weight JSON objects; it samples mini-batches and passes # the meta-data to Dataset, which performs the actual I/O inside its __getitem__ method. dloader_kwargs = dict(dataset=dataset, sampler=sampler) - dloader = torch.utils.data.DataLoader( + dloader = _build_dataloader( + use_stateful_dataloader=config.use_stateful_dataloader, + dp_rank=global_rank, + dp_world_size=world_size, + dp_group=dp_group, **dloader_kwargs, batch_size=None, num_workers=config.num_workers, @@ -385,6 +585,7 @@ def get_lhotse_dataloader_from_multi_config( world_size: int, dataset: torch.utils.data.Dataset, tokenizer=None, + dp_group: Optional[Any] = None, ) -> torch.utils.data.DataLoader: """ Set up a Lhotse training dataloder. @@ -420,6 +621,13 @@ def gather_shared_opts(): "multi_config", "metadata_only", "force_finite", + "use_stateful_dataloader", + # Indexed dataloading flags must propagate too — otherwise a + # top-level ``indexed: true`` / ``indexes_root: /tmp/idx`` on the + # train_ds namespace silently fails to reach sub-configs, and the + # underlying readers fall back to streaming. + "indexed", + "indexes_root", ] defaults = OmegaConf.structured(LhotseDataLoadingConfig) top_level_config["seed"] = resolve_seed(top_level_config["seed"]) @@ -483,6 +691,7 @@ def gather_shared_opts(): # We use lhotse's own worker_init_fn which leverages information such as rank, world_size, # worker_id, etc. to set a different random seed for each (node, worker) combination. # This together with infinite datasets removes the need to split data across nodes/workers. + _maybe_init_main_process_for_iterable(shared_opts.num_workers, global_rank, world_size, shared_opts.seed) dloader_kwargs = dict( dataset=IterableDatasetWrapper(dataset=dataset, sampler=sampler), worker_init_fn=make_worker_init_fn(rank=global_rank, world_size=world_size, seed=shared_opts.seed), @@ -493,7 +702,11 @@ def gather_shared_opts(): # reads only light-weight JSON objects; it samples mini-batches and passes # the meta-data to Dataset, which performs the actual I/O inside its __getitem__ method. dloader_kwargs = dict(dataset=dataset, sampler=sampler) - dloader = torch.utils.data.DataLoader( + dloader = _build_dataloader( + use_stateful_dataloader=shared_opts.use_stateful_dataloader, + dp_rank=global_rank, + dp_world_size=world_size, + dp_group=dp_group, **dloader_kwargs, batch_size=None, num_workers=shared_opts.num_workers, @@ -509,6 +722,38 @@ def get_lhotse_sampler_from_config(config, global_rank, world_size, tokenizer=No cuts, use_iterable_dataset = read_cutset_from_config(config) use_iterable_dataset = determine_use_iterable_dataset(use_iterable_dataset, config) + # Map-style + StatefulDataLoader requires shard_seed to be a fixed integer: + # * On the map path, cross-rank de-duplication is by ``rank/world_size`` + # index slicing (passed below to DynamicBucketingSampler/DynamicCutSampler), + # NOT by per-rank seed differentiation. ``shard_seed="randomized"`` is + # iterable-path machinery that injects worker-PID-derived seeding; + # across resume boundaries the new process has a different PID, so the + # freshly-initialised sampler RNG diverges from the saved snapshot. + # ``StatefulDataLoader.load_state_dict`` overrides that init RNG state + # in practice, but it's a footgun: any RNG draw before the first + # ``__iter__`` (e.g. shuffle of shards in the parent process) is lost. + # If the user sets ``shard_seed="randomized"`` AND ``force_map_dataset=True`` + # AND ``use_stateful_dataloader=True``, warn loudly and auto-overwrite with + # the fixed ``seed`` integer so resume semantics stay clean. + if ( + getattr(config, "force_map_dataset", False) + and getattr(config, "use_stateful_dataloader", False) + and isinstance(config.get("shard_seed"), str) + and str(config.shard_seed).lower() == "randomized" + ): + fixed_seed = int(config.seed) + logging.warning( + "shard_seed=%r is incompatible with force_map_dataset=True + " + "use_stateful_dataloader=True (the map path doesn't need per-rank " + "seed differentiation; cross-rank de-dup is by index slicing). " + "Auto-overriding shard_seed -> %d (the value of `seed`) for " + "deterministic StatefulDataLoader resume. Pin shard_seed to an " + "integer in your YAML to silence this warning.", + config.shard_seed, + fixed_seed, + ) + config.shard_seed = fixed_seed + _auto_detect_bucketing_and_validate_batch_size(config) # Apply channel selector @@ -519,9 +764,6 @@ def get_lhotse_sampler_from_config(config, global_rank, world_size, tokenizer=No # Resample as a safeguard; it's a no-op when SR is already OK cuts = cuts.map(partial(resample, sampling_rate=config.sample_rate), apply_fn=None) - # Expands cuts if multiple translations are provided. - cuts = CutSet(LazyFlattener(cuts.map(_flatten_alt_text, apply_fn=None))) - if config.use_multimodal_sampling: assert tokenizer is not None, ( "You must pass a tokenizer to `get_lhotse_dataloader_from_config` in order to" @@ -938,22 +1180,6 @@ def _merge_supervisions(cuts: CutSet) -> CutSet: return cuts.merge_supervisions() -def _flatten_alt_text(cut) -> list: - ans = [cut] - if not isinstance(cut, Cut) or cut.custom is None or cut.custom.get("alt_text") is None: - return ans - cut = cut.move_to_memory(audio_format="wav") # performs I/O once and holds audio in memory from now on - # Popping to ease eyesight on debug. - paired_text = cut.custom.pop("alt_text") - for data in paired_text.values(): - # Copy to avoid lazy dataloading issues - data = data.copy() - text_instance = cut.map_supervisions(lambda s: fastcopy(s, text=data["text"], language=data["lang"])) - text_instance.custom = {"text": data.pop("text"), "lang": data.pop("lang"), **data} - ans.append(text_instance) - return ans - - def maybe_set_cuda_expandable_segments(enabled: bool): """ Configures PyTorch memory allocator to expand existing allocated segments diff --git a/nemo/collections/common/data/lhotse/indexed_adapters.py b/nemo/collections/common/data/lhotse/indexed_adapters.py index 831edf0b1f54..989db03e0406 100644 --- a/nemo/collections/common/data/lhotse/indexed_adapters.py +++ b/nemo/collections/common/data/lhotse/indexed_adapters.py @@ -13,96 +13,90 @@ # limitations under the License. import json import os -import random +import re import struct import tarfile from pathlib import Path -from typing import NamedTuple +from typing import NamedTuple, Optional import numpy as np -# Knuth's multiplicative hash constant (golden-ratio derived, 32-bit). -_KNUTH_HASH = 2654435761 +# Tar block size + the all-zeros block that marks end-of-archive in tar. +_TAR_BLOCK_SIZE = 512 +_TAR_ZERO_BLOCK = b'\0' * _TAR_BLOCK_SIZE -class LazyShuffledRange: - """ - Generates a permutation of ``range(n)`` lazily using a Feistel cipher, - without materializing the full index list. Each element is computed on - the fly in O(1) time and the object itself uses O(1) memory regardless - of ``n``. - - The technique is known as *cycle-walking* format-preserving encryption: - a Feistel network is a bijection on ``[0, 2^k)``, and repeatedly applying - it until the output falls within ``[0, n)`` restricts it to a bijection - on the desired domain. - - Args: - n: Size of the range to permute. - rng: A ``random.Random`` instance used to derive round keys. - num_rounds: Number of Feistel rounds (more rounds = better uniformity, - 6 is a good default for typical dataset sizes). - """ +# Recognized URL schemes whose authority ("host" component) is part of the +# logical path (e.g. the bucket name). Stripping just the scheme keeps the +# bucket+key in the relative path used to mirror under indexes_root. +_URL_RE = re.compile(r"^[a-zA-Z][a-zA-Z0-9+.\-]*://") - def __init__(self, n: int, rng: random.Random, num_rounds: int = 6): - self.n = n - if n <= 1: - return - bits = (n - 1).bit_length() - if bits < 2: - bits = 2 - if bits % 2: - bits += 1 - self._half = bits // 2 - self._mask = (1 << self._half) - 1 - self._num_rounds = num_rounds - self._keys = [rng.getrandbits(64) for _ in range(num_rounds)] - - def _permute_one(self, x: int) -> int: - left = (x >> self._half) & self._mask - right = x & self._mask - for key in self._keys: - left, right = right, left ^ (((right * _KNUTH_HASH) ^ key) >> 32 & self._mask) - return (left << self._half) | right - def __len__(self) -> int: - return self.n +def _is_remote_path(path) -> bool: + """True if *path* is a URL/URI (s3://, ais://, http(s)://, gs://, …).""" + return bool(_URL_RE.match(str(path))) - def __iter__(self): - n = self.n - if n <= 0: - return - if n == 1: - yield 0 - return - for i in range(n): - x = i - while True: - x = self._permute_one(x) - if x < n: - yield x - break + +def _open_data_path(path: str): + """ + Return a seekable file-like for *path*, suitable for the indexed + tar readers' ``self._fh`` slot. + + Local paths get a regular ``open(path, "rb")``. URL/URI paths return an + :class:`lhotse.ais.AISRangeReader` (imported from lhotse to keep the + seekable-AIS wrapper as a single source of truth shared with + :func:`lhotse.indexing._open_for_indexed_read`). Other URL schemes + (``http://``, ``gs://``, …) currently fall through to ``AISRangeReader`` + as well — the aistore SDK is the only seekable remote backend lhotse + exposes today; if a future backend gains a seekable wrapper, dispatch + here. + """ + if _is_remote_path(path): + from lhotse.ais import AISRangeReader + + return AISRangeReader(str(path)) + return open(path, "rb") -def _load_index(data_path: str, idx_path: str | None = None): +def _load_index(data_path: str, idx_path: Optional[str] = None): """ - Load a memmap'd offset index for *data_path*. + Load an offset index for *data_path*, layering NeMo-specific validation + on top of :func:`lhotse.indexing.read_index`. Returns ``(offsets, num_samples)`` where ``offsets`` always has ``num_samples + 1`` entries — the last one being the data file size - (appended if absent in the on-disk index). + (appended if absent in the on-disk index, for legacy ``.idx`` files + written before the sentinel convention was added). Validates that all sample offsets fall within the data file. + + For remote ``data_path`` URIs (``s3://`` / ``ais://`` / ``http(s)://`` / + ``gs://``) ``os.path.getsize`` is not callable; we trust the size + sentinel that ``create_tar_index`` / ``create_jsonl_index`` recorded as + the last offset in the on-disk index. The same indexes are emitted for + local and remote sources, so the on-disk format is identical — only the + file-size cross-check is skipped. """ + from lhotse.indexing import read_index + if idx_path is None: idx_path = data_path + '.idx' - offsets = np.memmap(idx_path, dtype=np.dtype(' 0: max_offset = int(offsets[:num_samples].max()) if max_offset >= data_size: @@ -123,24 +117,6 @@ def _resolve_idx(idx: int, length: int) -> int: return idx -class IndexedJSONLReader: - def __init__(self, jsonl_path: Path | str, idx_path: Path | str | None = None): - self.data_path = str(jsonl_path) - self.offsets, self._len = _load_index(self.data_path, str(idx_path) if idx_path else None) - - def __len__(self): - return self._len - - def __getitem__(self, idx): - idx = _resolve_idx(idx, self._len) - start = int(self.offsets[idx]) - end = int(self.offsets[idx + 1]) - with open(self.data_path, 'rb') as f: - f.seek(start) - data = f.read(end - start) - return json.loads(data.decode('utf-8')) - - class TarSample(NamedTuple): """A single sample extracted from a WebDataset tar archive.""" @@ -161,8 +137,9 @@ def _split_json_audio_pair(name_a, bytes_a, name_b, bytes_b) -> TarSample: class IndexedTarSampleReader: """ Random access to WebDataset tar samples (``N.json`` + ``N.