Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions nemo_curator/stages/audio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@
SpeakerSeparationStage,
VADSegmentationStage,
)
from nemo_curator.stages.audio.text_filtering import (
FastTextLIDStage,
FinalizeFieldsStage,
InitializeFieldsStage,
RegexSubstitutionStage,
WhisperHallucinationStage,
)

__all__ = [
"ALMDataBuilderStage",
Expand All @@ -60,4 +67,9 @@
"TimestampMapperStage",
"UTMOSFilterStage",
"VADSegmentationStage",
"FastTextLIDStage",
"FinalizeFieldsStage",
"InitializeFieldsStage",
"RegexSubstitutionStage",
"WhisperHallucinationStage",
]
29 changes: 29 additions & 0 deletions nemo_curator/stages/audio/text_filtering/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Text filtering stages for ASR postprocessing."""

from nemo_curator.stages.audio.text_filtering.fasttext_lid import FastTextLIDStage
from nemo_curator.stages.audio.text_filtering.finalize_fields import FinalizeFieldsStage
from nemo_curator.stages.audio.text_filtering.initialize_fields import InitializeFieldsStage
from nemo_curator.stages.audio.text_filtering.regex_substitution import RegexSubstitutionStage
from nemo_curator.stages.audio.text_filtering.whisper_hallucination import WhisperHallucinationStage

__all__ = [
"FastTextLIDStage",
"FinalizeFieldsStage",
"InitializeFieldsStage",
"RegexSubstitutionStage",
"WhisperHallucinationStage",
]
113 changes: 113 additions & 0 deletions nemo_curator/stages/audio/text_filtering/fasttext_lid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import urllib.request
from dataclasses import dataclass, field
from typing import Any

from loguru import logger

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.resources import Resources
from nemo_curator.tasks import AudioTask

_FASTTEXT_MODEL_URLS: dict[str, str] = {
"lid.176.bin": "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin",
"lid.176.ftz": "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.ftz",
}
_DEFAULT_CACHE_DIR = os.path.expanduser("~/.cache/nemo_curator/fasttext")


@dataclass
class FastTextLIDStage(ProcessingStage[AudioTask, AudioTask]):
"""Language identification using FastText; flags non-target-language entries with skip_me=1.

Wraps the existing ``FastTextLangId`` filter for model loading and scoring,
adding AudioTask field access and optional model download by name.

``model_path`` can be:
- An absolute path to a local ``.bin`` or ``.ftz`` file.
- A known model name (``lid.176.bin`` or ``lid.176.ftz``), which is
downloaded to ``~/.cache/nemo_curator/fasttext/`` on first use.
"""

model_path: str = ""
target_lang: str = "en"
min_lang_prob: float = 0.3
text_key: str = "cleaned_text"
skip_me_key: str = "skip_me"
name: str = "FastTextLID"
resources: Resources = field(default_factory=lambda: Resources(cpus=1.0))

_lid: Any = field(default=None, init=False, repr=False)

def __post_init__(self) -> None:
if not self.model_path:
msg = "model_path is required for FastTextLIDStage"
raise ValueError(msg)

def _resolve_model_path(self) -> str:
if os.path.isfile(self.model_path):
return self.model_path
if self.model_path in _FASTTEXT_MODEL_URLS:
cache_path = os.path.join(_DEFAULT_CACHE_DIR, self.model_path)
if os.path.isfile(cache_path):
return cache_path
os.makedirs(_DEFAULT_CACHE_DIR, exist_ok=True)
url = _FASTTEXT_MODEL_URLS[self.model_path]
logger.info(f"FastTextLIDStage: downloading {self.model_path} from {url}")
urllib.request.urlretrieve(url, cache_path) # noqa: S310
return cache_path
msg = (
f"model_path '{self.model_path}' is not a valid file path and not a known model name. "
f"Known names: {list(_FASTTEXT_MODEL_URLS)}"
)
raise ValueError(msg)

def setup(self, worker_metadata: Any = None) -> None:
from nemo_curator.stages.text.filters.fasttext.fasttext_filters import FastTextLangId

resolved = self._resolve_model_path()
self._lid = FastTextLangId(model_path=resolved, min_langid_score=0.0)
self._lid.load_model()
logger.info(f"FastTextLIDStage: loaded model from {resolved}")

def inputs(self) -> tuple[list[str], list[str]]:
return [], [self.text_key, self.skip_me_key]

def outputs(self) -> tuple[list[str], list[str]]:
return [], [self.skip_me_key]

def process(self, task: AudioTask) -> AudioTask:
if self._lid is None:
logger.warning(
f"FastTextLIDStage ({self.name}): setup() was not called before process(). "
"Calling setup() now — check that your executor invokes setup() on each worker."
)
self.setup()
text = task.data[self.text_key]
if not isinstance(text, str):
return task
text = text.strip().replace("\n", " ")
if not text:
task.data[self.skip_me_key] = 1
return task
result_str = self._lid.score_document(text)
score_list = eval(result_str) # noqa: S307 — output of our own FastText model
prob = float(score_list[0])
lang = str(score_list[1]).lower()
if lang != self.target_lang.lower() or prob < self.min_lang_prob:
task.data[self.skip_me_key] = 1
return task
51 changes: 51 additions & 0 deletions nemo_curator/stages/audio/text_filtering/finalize_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass, field

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.resources import Resources
from nemo_curator.tasks import AudioTask


@dataclass
class FinalizeFieldsStage(ProcessingStage[AudioTask, AudioTask]):
"""Rename and drop fields to produce the final manifest schema.

- Renames ``source_text_key`` (``text``) → ``v1_text_key`` (``v1_text``).
- Renames ``cleaned_text_key`` (``cleaned_text``) → ``source_text_key`` (``text``).
- Drops all keys listed in ``drop_keys`` (silently ignores missing keys).
"""

source_text_key: str = "text"
v1_text_key: str = "v1_text"
cleaned_text_key: str = "cleaned_text"
drop_keys: list[str] = field(default_factory=lambda: ["pnc", "itn", "timestamp"])
name: str = "FinalizeFields"
resources: Resources = field(default_factory=lambda: Resources(cpus=1.0))

def inputs(self) -> tuple[list[str], list[str]]:
return [], [self.source_text_key, self.cleaned_text_key]

def outputs(self) -> tuple[list[str], list[str]]:
return [], [self.v1_text_key, self.source_text_key]

def process(self, task: AudioTask) -> AudioTask:
if self.source_text_key in task.data:
task.data[self.v1_text_key] = task.data.pop(self.source_text_key)
if self.cleaned_text_key in task.data:
task.data[self.source_text_key] = task.data.pop(self.cleaned_text_key)
for key in self.drop_keys:
task.data.pop(key, None)
return task
45 changes: 45 additions & 0 deletions nemo_curator/stages/audio/text_filtering/initialize_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass, field

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.resources import Resources
from nemo_curator.tasks import AudioTask


@dataclass
class InitializeFieldsStage(ProcessingStage[AudioTask, AudioTask]):
"""Copy pred_text into cleaned_text and initialize skip_me=0.

This stage sets up the two fields that all downstream text-filtering
stages depend on, leaving the original pred_text field intact.
"""

pred_text_key: str = "pred_text"
cleaned_text_key: str = "cleaned_text"
skip_me_key: str = "skip_me"
name: str = "InitializeFields"
resources: Resources = field(default_factory=lambda: Resources(cpus=1.0))

def inputs(self) -> tuple[list[str], list[str]]:
return [], [self.pred_text_key]

def outputs(self) -> tuple[list[str], list[str]]:
return [], [self.cleaned_text_key, self.skip_me_key]

def process(self, task: AudioTask) -> AudioTask:
task.data[self.cleaned_text_key] = task.data[self.pred_text_key]
task.data[self.skip_me_key] = 0
return task
80 changes: 80 additions & 0 deletions nemo_curator/stages/audio/text_filtering/regex_substitution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from dataclasses import dataclass, field
from typing import Any

import yaml
from loguru import logger

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.resources import Resources
from nemo_curator.tasks import AudioTask


@dataclass
class RegexSubstitutionStage(ProcessingStage[AudioTask, AudioTask]):
"""Apply a sequence of regex substitutions to a text field in each AudioTask.

Rules are loaded from a YAML file containing a list of dicts with
``pattern`` and ``repl`` keys (and an optional ``count`` key).
After all substitutions, if the result is empty the entry is flagged
with ``skip_me=1``.
"""

regex_params_yaml: str = ""
text_key: str = "cleaned_text"
skip_me_key: str = "skip_me"
name: str = "RegexSubstitution"
resources: Resources = field(default_factory=lambda: Resources(cpus=1.0))

_rules: list[dict[str, Any]] = field(default_factory=list, init=False, repr=False)
_setup_called: bool = field(default=False, init=False, repr=False)

def __post_init__(self) -> None:
if not self.regex_params_yaml:
msg = "regex_params_yaml is required for RegexSubstitutionStage"
raise ValueError(msg)

def setup(self, worker_metadata: Any = None) -> None:
with open(self.regex_params_yaml, encoding="utf-8") as f:
self._rules = yaml.safe_load(f)
self._setup_called = True
logger.info(f"RegexSubstitutionStage: loaded {len(self._rules)} rules from {self.regex_params_yaml}")

def inputs(self) -> tuple[list[str], list[str]]:
return [], [self.text_key]

def outputs(self) -> tuple[list[str], list[str]]:
return [], [self.text_key, self.skip_me_key]

def process(self, task: AudioTask) -> AudioTask:
if not self._setup_called:
logger.warning(
f"RegexSubstitutionStage ({self.name}): setup() was not called before process(). "
"Calling setup() now — check that your executor invokes setup() on each worker."
)
self.setup()
text = task.data[self.text_key]
if not isinstance(text, str):
return task
text = " " + text + " "
for rule in self._rules:
text = re.sub(rule["pattern"], rule["repl"], text, count=rule.get("count", 0))
text = re.sub(r"\s+", " ", text).strip()
task.data[self.text_key] = text
if not text:
task.data[self.skip_me_key] = 1
return task
Loading