diff --git a/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/dali_structured_output_iterator_data_loader_wrapper.py b/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/_dali_structured_output_iterator_data_loader_wrapper.py similarity index 100% rename from packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/dali_structured_output_iterator_data_loader_wrapper.py rename to packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/_dali_structured_output_iterator_data_loader_wrapper.py diff --git a/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/_insert_copy_for_passthrough.py b/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/_insert_copy_for_passthrough.py new file mode 100644 index 0000000..7e326e7 --- /dev/null +++ b/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/_insert_copy_for_passthrough.py @@ -0,0 +1,212 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Optional, Sequence, Set, Tuple, Union + +import nvidia.dali.fn as fn + +from .sample_data_group import SampleDataGroup + +PathElement = Union[str, int] +PathTuple = Tuple[PathElement, ...] +PathType = Union[PathElement, Sequence[PathElement]] + + +class _InsertCopyForPassthrough: + '''Insert ``fn.copy`` on selected pipeline outputs. 
+ + Workaround for cases where parallel external-source outputs would otherwise be returned + directly from the pipeline (see DALI dynamic-executor parallel ES notes), which can lead + to data corruption in certain cases. + + This helper is used internally by the pipeline construction code. Fields to copy can be selected by + name, by branch path, or by name within selected branches. If no selectors are configured, + every output data field is copied. + ''' + + def __init__( + self, + data_empty: SampleDataGroup, + field_names: Optional[Sequence[Union[str, int]]] = None, + field_names_scope_paths: Optional[Sequence[PathType]] = None, + branch_paths: Optional[Sequence[PathType]] = None, + ): + ''' + + Args: + data_empty: Final output data format blueprint. + field_names: Names of data fields to copy. By default, every occurrence in the final output + structure is copied. Use ``field_names_scope_paths`` to limit the search to specific + sub-trees. + field_names_scope_paths: Optional paths to sub-trees (data group fields) under which + ``field_names`` are resolved. Each entry is a path to a data group; name lookup is + performed only inside that group and its descendants. Ignored when ``field_names`` + is ``None``. + branch_paths: Paths selecting branches to copy. If a path refers to a data field, that + field is copied. If it refers to a data group field, every data field in that + sub-tree (recursively) is copied. + + Raises: + ValueError: If ``field_names_scope_paths`` is set without ``field_names``, or if a configured + path does not exist or has the wrong node kind (e.g. a scope path that is not a data group + field). 
+ ''' + + self._field_names = tuple(field_names) if field_names is not None else None + self._field_names_scope_paths = ( + tuple(self._normalize_path(path) for path in field_names_scope_paths) + if field_names_scope_paths is not None + else None + ) + self._branch_paths = ( + tuple(self._normalize_path(path) for path in branch_paths) if branch_paths is not None else None + ) + + if ( + self._field_names_scope_paths is not None + and len(self._field_names_scope_paths) > 0 + and (self._field_names is None or len(self._field_names) == 0) + ): + raise ValueError( + "`field_names_scope_paths` can only be used together with non-empty `field_names`." + ) + + self._paths_to_copy = self._sort_paths(self._resolve_paths_to_copy(data_empty)) + + def __call__(self, data: SampleDataGroup) -> SampleDataGroup: + '''Apply ``fn.copy`` to the configured output fields. + + Args: + data: Final pipeline output structure before flattening. + + Returns: + The input structure with selected output fields replaced by copied data nodes. + ''' + + for path in self._paths_to_copy: + copy = fn.copy(data.get_item_in_path(path)) + data.set_item_in_path(path, copy) + return data + + def _resolve_paths_to_copy(self, data: SampleDataGroup) -> Set[PathTuple]: + '''Resolve configured selectors to concrete output data field paths.''' + + paths: Set[PathTuple] = set() + + # Field-name selectors resolve every matching output leaf, optionally constrained by scope paths. + if self._field_names is not None: + paths.update(self._resolve_field_name_paths(data)) + + # Branch selectors resolve explicit paths; group paths are expanded to their contained leaves. + if self._branch_paths is not None: + paths.update(self._resolve_branch_paths(data)) + + # No configured selector means copy every output leaf; explicit empty selectors copy none. 
+ if not self._has_selection(): + paths.update(self._collect_data_field_paths_under_group(data, ())) + + return paths + + def _has_selection(self) -> bool: + '''Check whether any copy selector was explicitly configured.''' + + # Use presence rather than truthiness: an explicit empty selector means "copy nothing", + # while omitting all selectors means "copy every output field". + has_name_selection = self._field_names is not None + has_branch_selection = self._branch_paths is not None + return has_name_selection or has_branch_selection + + def _resolve_field_name_paths(self, data: SampleDataGroup) -> Set[PathTuple]: + '''Resolve field-name selectors to matching output data field paths.''' + + assert self._field_names is not None + paths: Set[PathTuple] = set() + + # If no scope paths are configured, resolve field names directly against the entire data tree. + if self._field_names_scope_paths is None: + for name in self._field_names: + paths.update(tuple(path) for path in data.find_all_occurrences(name)) + return paths + + # Otherwise, resolve field names within each configured scope group. + for scope_path in self._field_names_scope_paths: + self._ensure_path_exists(data, scope_path) + if not data.path_exists_and_is_data_group_field(scope_path): + raise ValueError(f"Field name scope path `{scope_path}` must refer to a data group field.") + subtree = data.get_item_in_path(scope_path) + for name in self._field_names: + paths.update( + scope_path + tuple(relative_path) for relative_path in subtree.find_all_occurrences(name) + ) + return paths + + def _resolve_branch_paths(self, data: SampleDataGroup) -> Set[PathTuple]: + '''Resolve branch selectors to selected output data field paths.''' + + assert self._branch_paths is not None + paths: Set[PathTuple] = set() + + for branch_path in self._branch_paths: + # Reject missing selector paths during setup rather than during graph execution. 
+ self._ensure_path_exists(data, branch_path) + if data.path_exists_and_is_data_group_field(branch_path): + # A branch path to a group selects all data leaves below that group, but a branch path to + # a leaf selects only that exact field. + subtree = data.get_item_in_path(branch_path) + paths.update( + branch_path + relative_path + for relative_path in self._collect_data_field_paths_under_group(subtree, ()) + ) + else: + # A branch path that already points to a data field is copied as-is. + paths.add(branch_path) + return paths + + @classmethod + def _collect_data_field_paths_under_group( + cls, group: SampleDataGroup, prefix: PathTuple + ) -> Tuple[PathTuple, ...]: + '''Collect tuple paths for all data fields under a group.''' + + # Branch selections can point to a data group; expand those groups to concrete leaf data fields so + # ``__call__`` can replace each selected value in-place. + paths: List[PathTuple] = [] + for name in group.contained_top_level_field_names: + current = prefix + (name,) + if group.is_data_group_field(name): + paths.extend(cls._collect_data_field_paths_under_group(group[name], current)) + else: + paths.append(current) + return tuple(paths) + + @staticmethod + def _normalize_path(path: PathType) -> PathTuple: + '''Convert a single name or path sequence into a tuple path.''' + + if SampleDataGroup.path_is_single_name(path): + return (path,) + return tuple(path) + + @staticmethod + def _ensure_path_exists(data: SampleDataGroup, path: PathTuple) -> None: + '''Raise an error if a path does not exist in the data format.''' + + if not data.path_exists(path): + raise ValueError(f"Path `{path}` does not exist in the output data format.") + + @staticmethod + def _sort_paths(paths: Set[PathTuple]) -> Tuple[PathTuple, ...]: + '''Sort paths deterministically across mixed string and integer names.''' + + return tuple(sorted(paths, key=lambda path: tuple(str(part) for part in path))) diff --git 
a/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/dali_structured_output_iterator.py b/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/dali_structured_output_iterator.py index 185567c..4c06d75 100644 --- a/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/dali_structured_output_iterator.py +++ b/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/dali_structured_output_iterator.py @@ -193,7 +193,7 @@ def __len__(self): @classmethod def CreateAsDataLoaderObject(cls, *args, **kwargs): - from .dali_structured_output_iterator_data_loader_wrapper import get_masked_as_type + from ._dali_structured_output_iterator_data_loader_wrapper import get_masked_as_type used_type = get_masked_as_type(cls, DataLoader) obj = used_type(*args, **kwargs) diff --git a/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/pipeline.py b/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/pipeline.py index b8476d4..033b712 100644 --- a/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/pipeline.py +++ b/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/pipeline.py @@ -23,6 +23,10 @@ import nvidia.dali as dali from .sample_data_group import SampleDataGroup +from ._insert_copy_for_passthrough import ( + PathType, + _InsertCopyForPassthrough, +) from ..processing_steps import PipelineStepBase if TYPE_CHECKING: @@ -46,6 +50,10 @@ def __init__( use_parallel_external_source: bool = True, prefetch_queue_depth: int = 2, print_sample_data_group_format: bool = False, + copy_external_source_passthrough_outputs: Optional[bool] = None, + passthrough_copy_field_names: Optional[Sequence[Union[str, int]]] = None, + passthrough_copy_field_names_scope_paths: Optional[Sequence[PathType]] = None, + passthrough_copy_branch_paths: Optional[Sequence[PathType]] = None, ): ''' @@ -59,6 +67,25 @@ def __init__( is True. 
print_sample_data_group_format: Whether to print the sample data group formats after each processing step during the setup of the pipeline (e.g. for debugging purposes). + copy_external_source_passthrough_outputs: Optional control for copying final pipeline outputs before + returning them. When omitted and no pass-through copy selectors are configured, all final output + data fields are copied and a warning about the possible unintended overhead is issued. Set to ``True`` to + explicitly copy outputs according to the configured selectors, or all final output data fields + if no selectors are configured. Set to ``False`` to disable copying. This can be used as a + workaround for DALI pass-through outputs from parallel external sources. See the + :ref:`Important note about DALI pass-through outputs <dali-pipeline-framework-external-source-pass-through-note>` + for details. + passthrough_copy_field_names: Optional data field names to copy in the final output structure. If + omitted together with the other copy selectors while ``copy_external_source_passthrough_outputs`` + is enabled, all final output data fields are copied. + passthrough_copy_field_names_scope_paths: Optional final output group paths under which + ``passthrough_copy_field_names`` are resolved. For each configured scope path, all configured + field names are searched recursively inside that scope. Paths use the same format as in + processing steps: a single field name or a sequence of field names for nested fields, e.g. + ``("camera", "annotations")``. + passthrough_copy_branch_paths: Optional final output paths to copy. Paths use the same format as + in processing steps: a single field name or a sequence of field names for nested fields, e.g. + ``("camera", "image")``. A path to a data group copies all data fields under that group. 
''' self._data_loading_callable_iterable = data_loading_callable_iterable @@ -69,6 +96,38 @@ def __init__( self._prefetch_queue_depth = prefetch_queue_depth self._print_sample_data_group_format = print_sample_data_group_format + has_passthrough_copy_selection = ( + passthrough_copy_field_names is not None + or passthrough_copy_field_names_scope_paths is not None + or passthrough_copy_branch_paths is not None + ) + if copy_external_source_passthrough_outputs is None: + if has_passthrough_copy_selection: + raise ValueError( + "Pass-through output copy selectors require explicitly setting " + "`copy_external_source_passthrough_outputs=True`." + ) + warnings.warn( + "`copy_external_source_passthrough_outputs` was not set. Copying all final pipeline " + "outputs by default to avoid potential pipeline pass-through output corruption. This may add " + "overhead. To reduce overhead, see the `PipelineDefinition` API docs for further details and " + "configure `passthrough_copy_field_names`, `passthrough_copy_field_names_scope_paths`, or " + "`passthrough_copy_branch_paths`; or explicitly set `copy_external_source_passthrough_outputs=False` " + "to disable copying." + ) + copy_external_source_passthrough_outputs = True + if has_passthrough_copy_selection and not copy_external_source_passthrough_outputs: + raise ValueError( + "Pass-through output copy selectors require " + "`copy_external_source_passthrough_outputs=True`." + ) + + self._copy_external_source_passthrough_outputs = copy_external_source_passthrough_outputs + self._passthrough_copy_field_names = passthrough_copy_field_names + self._passthrough_copy_field_names_scope_paths = passthrough_copy_field_names_scope_paths + self._passthrough_copy_branch_paths = passthrough_copy_branch_paths + self._passthrough_output_copy: Optional[_InsertCopyForPassthrough] = None + if self._check_data_format: warnings.warn( "Data format checking is enabled. This may add some overhead. 
" @@ -113,6 +172,9 @@ def check_and_get_output_data_structure(self) -> SampleDataGroup: for pf in self._preprocess_functors: intermediate_setup = pf.check_input_data_format_and_set_output_data_format(intermediate_setup) + if self._copy_external_source_passthrough_outputs: + self._prepare_passthrough_output_copy(intermediate_setup) + return intermediate_setup def get_dali_pipeline(self, *args, **kwargs) -> dali.pipeline.Pipeline: @@ -142,6 +204,8 @@ def get_dali_pipeline(self, *args, **kwargs) -> dali.pipeline.Pipeline: input_data_structure ) print(f"\n{input_data_structure.get_string_no_details()}\n") + if self._copy_external_source_passthrough_outputs: + self._prepare_passthrough_output_copy(input_data_structure) print("///////////////////////////////////////////////////////////////") else: # If no pre-processing steps are provided, we still need to check the compatibility of the data @@ -199,7 +263,32 @@ def _get_dali_pipeline_inner(self) -> dali.pipeline.Pipeline: # Pad each string fiels to the same length in the batch data_structure_used.ensure_uniform_size_in_batch_for_all_strings() + data_structure_used = self._copy_passthrough_outputs_if_enabled(data_structure_used) + # Get the data as a flat sequence. Similar to the external source, we can only output sequences of DataNode elements, no nested data structures. data_out = data_structure_used.get_data() # And return the flat data. 
return data_out + + def _copy_passthrough_outputs_if_enabled(self, data: SampleDataGroup) -> SampleDataGroup: + if not self._copy_external_source_passthrough_outputs: + return data + if self._passthrough_output_copy is None: + self._prepare_passthrough_output_copy(data.get_empty_like_self()) + assert self._passthrough_output_copy is not None + return self._passthrough_output_copy(data) + + def _prepare_passthrough_output_copy(self, data_empty: SampleDataGroup) -> None: + if not self._copy_external_source_passthrough_outputs: + return + try: + self._passthrough_output_copy = _InsertCopyForPassthrough( + data_empty, + field_names=self._passthrough_copy_field_names, + field_names_scope_paths=self._passthrough_copy_field_names_scope_paths, + branch_paths=self._passthrough_copy_branch_paths, + ) + except ValueError as error: + raise ValueError( + "Invalid pass-through output copy configuration for final output format: " f"{error}" + ) from error diff --git a/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/sample_data_group.py b/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/sample_data_group.py index df3d0f1..4e400db 100644 --- a/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/sample_data_group.py +++ b/packages/dali_pipeline_framework/accvlab/dali_pipeline_framework/pipeline/sample_data_group.py @@ -395,7 +395,7 @@ def __setitem__(self, name: Union[str, int], value: Any): f"(fields of type SampleDataGroup), but types do not match." 
) - if self._types[name] == types.DALIDataType.STRING and not isinstance(value, dali.data_node.DataNode): + if self._types[name] == types.DALIDataType.STRING and not self._is_dali_data_node(value): self._values[name] = self._convert_from_string(value) else: self._values[name] = self._apply_mapping_check_and_convert(name, value) @@ -450,7 +450,7 @@ def __getitem__(self, name: Union[str, int]) -> Any: raise KeyError(f"No field with name '{name}'") value = self._values[name] - if self._types[name] == types.DALIDataType.STRING and not isinstance(value, dali.data_node.DataNode): + if self._types[name] == types.DALIDataType.STRING and not self._is_dali_data_node(value): return self._convert_to_string(value) return value @@ -1541,15 +1541,18 @@ def _apply_mapping_if_set(self, name: Union[str, int], data: Any) -> Any: def _check_or_convert_types(self, name: Union[str, int], data: Any) -> Any: # Get the expected type of the data field dali_type = self._types[name] + is_data_node = self._is_dali_data_node(data) + + if dali_type == types.DALIDataType.STRING and is_data_node: + # String fields are represented as uint8 tensors inside the DALI graph. String handling + # outside the DALI pipeline remains restricted to regular string values. + if self._do_check_type: + return check_type(data, np.uint8, name) + return data # Only perform runtime type checking inside the DALI pipeline when explicitly enabled. # Skipping this preserves tensor layout metadata (important for steps like AxesLayoutSetter). if self._do_check_type: - # Support both regular and debug-mode DALI nodes - is_data_node = isinstance(data, getattr(dali.data_node, "DataNode", ())) or isinstance( - data, getattr(getattr(dali, "_debug_mode", object()), "DataNodeDebug", ()) - ) - # If we are inside the DALI pipeline, we need to check that the data type is correct (regardless of # the `_do_convert` flag). 
if is_data_node: @@ -1582,6 +1585,12 @@ def _check_or_convert_types(self, name: Union[str, int], data: Any) -> Any: return data + @staticmethod + def _is_dali_data_node(data: Any) -> bool: + return isinstance(data, dali.data_node.DataNode) or isinstance( + data, getattr(getattr(dali, "_debug_mode", object()), "DataNodeDebug", ()) + ) + def _convert_from_string( self, data: Union[dali.pipeline.DataNode, str, Sequence[str], None] ) -> Union[dali.pipeline.DataNode, np.ndarray, None]: diff --git a/packages/dali_pipeline_framework/docs/design/output.rst b/packages/dali_pipeline_framework/docs/design/output.rst index 4a8c815..460ed21 100644 --- a/packages/dali_pipeline_framework/docs/design/output.rst +++ b/packages/dali_pipeline_framework/docs/design/output.rst @@ -34,3 +34,30 @@ replacement for a PyTorch DataLoader. Apart from the re-assembly of the data, th these cases, the :class:`~accvlab.dali_pipeline_framework.pipeline.DALIStructuredOutputIterator` provides a :meth:`~accvlab.dali_pipeline_framework.pipeline.DALIStructuredOutputIterator.CreateAsDataLoaderObject` method, which creates an iterator object masked as a PyTorch DataLoader object, so that these checks pass. + +.. _dali-pipeline-framework-external-source-pass-through-note: + +.. important:: + + In some DALI versions, pass-through outputs from a parallel + :func:`~nvidia.dali.fn.external_source` can be corrupted when they are returned directly from a pipeline + using the dynamic executor. For example, the `Known Issues section in the DALI 1.53.0 Release Notes + `_ + describes the conditions under which this may occur and recommends adding :func:`~nvidia.dali.fn.copy` to avoid returning + :func:`~nvidia.dali.fn.external_source` outputs directly. + + If a pipeline is affected, enable ``copy_external_source_passthrough_outputs=True`` when constructing + :class:`~accvlab.dali_pipeline_framework.pipeline.PipelineDefinition`. 
This inserts + :func:`~nvidia.dali.fn.copy` internally before the final flattened output is returned. If no copy selectors + are provided, all final output data fields are copied. To reduce overhead, limit copying to final output + fields where the corresponding data passes or may pass through the pipeline unchanged (i.e. without being + modified by processing steps) by using the ``passthrough_copy_field_names``, + ``passthrough_copy_field_names_scope_paths``, or ``passthrough_copy_branch_paths`` constructor arguments. + + This package configures DALI :func:`~nvidia.dali.fn.external_source` differently depending on the input + base class. Inputs derived from :class:`~accvlab.dali_pipeline_framework.inputs.CallableBase` are used in + per-sample mode, while inputs derived from :class:`~accvlab.dali_pipeline_framework.inputs.IterableBase` + are used in per-batch mode. Therefore, for callable inputs the single-contiguous-buffer case relevant to + this workaround is expected only when the pipeline batch size is ``1``. Iterable inputs provide whole + batches, so pass-through outputs from parallel :func:`~nvidia.dali.fn.external_source` can be affected + independently of the pipeline batch size. diff --git a/packages/dali_pipeline_framework/examples/pipeline_setup/object_detection_2d_pipeline.py b/packages/dali_pipeline_framework/examples/pipeline_setup/object_detection_2d_pipeline.py index 11ba7a8..c773510 100644 --- a/packages/dali_pipeline_framework/examples/pipeline_setup/object_detection_2d_pipeline.py +++ b/packages/dali_pipeline_framework/examples/pipeline_setup/object_detection_2d_pipeline.py @@ -270,12 +270,20 @@ def setup_dali_pipeline_2d_object_detection( # @NOTE # Define the pipeline consisting of the 'input_callable' and 'pre_processing_steps'. # - # IMPORTANT: Note how `check_data_format` is set to `False` here. This is done to avoid the overhead of + # IMPORTANT: Note that `check_data_format` is set to `False` here. 
This is done to avoid the overhead of # checking the data format during pipeline execution. During development, it is recommended to set it # to `True` to catch potential issues early, and later set it to `False` to avoid the overhead in # production. pipeline_def = PipelineDefinition( - input_callable, pre_processing_steps, check_data_format=False, print_sample_data_group_format=True + input_callable, + pre_processing_steps, + check_data_format=False, + print_sample_data_group_format=True, + # Disable the DALI external-source pass-through issue copy workaround: all final outputs are processed + # by the configured pipeline steps instead of being returned directly from the input source. See the + # API docs for this constructor argument for details about the underlying issue and when a copy is + # needed. + copy_external_source_passthrough_outputs=False, ) # @NOTE diff --git a/packages/dali_pipeline_framework/examples/pipeline_setup/stream_petr_pipeline.py b/packages/dali_pipeline_framework/examples/pipeline_setup/stream_petr_pipeline.py index c4baac8..341cbf4 100644 --- a/packages/dali_pipeline_framework/examples/pipeline_setup/stream_petr_pipeline.py +++ b/packages/dali_pipeline_framework/examples/pipeline_setup/stream_petr_pipeline.py @@ -546,12 +546,20 @@ def setup_dali_pipeline_stream_petr_train( # @NOTE # Define the pipeline by wiring the input implementation and the pre-processing steps. # - # IMPORTANT: Note how `check_data_format` is set to `False` here. This is done to avoid the overhead of + # IMPORTANT: Note that `check_data_format` is set to `False` here. This is done to avoid the overhead of # checking the data format during pipeline execution. During development, it is recommended to set it # to `True` to catch potential issues early, and later set it to `False` to avoid the overhead in # production. 
pipeline_def = PipelineDefinition( - input_impl, pre_processing_steps, check_data_format=False, print_sample_data_group_format=True + input_impl, + pre_processing_steps, + check_data_format=False, + print_sample_data_group_format=True, + # Enable the DALI external-source pass-through issue copy workaround only for final fields that are or + # may be passed through the pipeline unchanged. See the API docs for this constructor argument for + # details about the underlying issue and when a copy is needed. + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["timestamp", "ego_pose", "ego_pose_inv"], ) # @NOTE diff --git a/packages/dali_pipeline_framework/examples/simple_full_pipeline/simple_pipeline_setup.py b/packages/dali_pipeline_framework/examples/simple_full_pipeline/simple_pipeline_setup.py index af7e00c..59de040 100644 --- a/packages/dali_pipeline_framework/examples/simple_full_pipeline/simple_pipeline_setup.py +++ b/packages/dali_pipeline_framework/examples/simple_full_pipeline/simple_pipeline_setup.py @@ -149,13 +149,16 @@ def setup_simple_pipeline( # `check_data_format` controls type/format checks inside the DALI pipeline. It is useful to enable during # development, when adding custom steps, or when changing the input data structure, as it catches mistakes # such as assigning a field with the wrong DALI type. After the pipeline and data format have been - # validated, it is usually disabled in production to avoid the additional runtime overhead. We keep it - # enabled here because this small tutorial benefits from clearer errors. + # validated, it is usually disabled in production to avoid the related overhead. pipeline_def = PipelineDefinition( input_callable, preprocess_functors=processing_steps, - check_data_format=True, # Enable during development/debugging, disable in production. + check_data_format=False, # Enable during development/debugging, disable in production. 
print_sample_data_group_format=True, # This is useful to inspect the data format after each step (for debugging). + # Disable the DALI external-source pass-through issue copy workaround: callable inputs with + # batch_size > 1 are outside the affected case. See the API docs for this constructor argument for + # details about the underlying issue and when a copy is needed. + copy_external_source_passthrough_outputs=False, ) # @NOTE diff --git a/packages/dali_pipeline_framework/tests/inputs/_test_helpers.py b/packages/dali_pipeline_framework/tests/inputs/_sample_selection_test_helpers.py similarity index 76% rename from packages/dali_pipeline_framework/tests/inputs/_test_helpers.py rename to packages/dali_pipeline_framework/tests/inputs/_sample_selection_test_helpers.py index 9af2680..f385646 100644 --- a/packages/dali_pipeline_framework/tests/inputs/_test_helpers.py +++ b/packages/dali_pipeline_framework/tests/inputs/_sample_selection_test_helpers.py @@ -27,8 +27,14 @@ from typing_extensions import override -def build_pipeline_and_iterator(input_obj, batch_size: int, num_batches: int): - pipeline_def = PipelineDefinition(data_loading_callable_iterable=input_obj, preprocess_functors=[]) +def build_no_processing_pipeline_and_iterator(input_obj, batch_size: int, num_batches: int): + """Build a DALI pipeline that forwards input samples without processing steps.""" + + pipeline_def = PipelineDefinition( + data_loading_callable_iterable=input_obj, + preprocess_functors=[], + copy_external_source_passthrough_outputs=True, + ) pipe = pipeline_def.get_dali_pipeline( enable_conditionals=True, batch_size=batch_size, @@ -42,12 +48,16 @@ def build_pipeline_and_iterator(input_obj, batch_size: int, num_batches: int): return pipeline_def, pipe, iterator -def next_ids(iterator_iter): +def get_next_batch_ids(iterator_iter): + """Advance a structured-output iterator and return the current batch IDs.""" + out = next(iterator_iter) return out["id"].tolist() class 
SimpleDataProvider(DataProvider): + """Data provider used by input tests to produce deterministic sample IDs.""" + def __init__(self): # Blueprint with a single INT32 field 'id' and some additional (constant) fields self._blueprint = SampleDataGroup() @@ -59,6 +69,8 @@ def __init__(self): @override def get_data(self, sample_id: int) -> SampleDataGroup: + """Return one sample with ``id`` equal to ``sample_id``.""" + s = self.sample_data_structure s["id"] = int(sample_id) s["additional"]["some_data"] = [1, 2, 3] @@ -67,10 +79,14 @@ def get_data(self, sample_id: int) -> SampleDataGroup: @override def get_number_of_samples(self) -> int: + """Return a large fixed sample count for sharding and epoch tests.""" + return 10000 @property @override def sample_data_structure(self) -> SampleDataGroup: + """Return an empty sample structure matching this provider's output.""" + # Return an empty-like blueprint each time to avoid sharing state return self._blueprint.get_empty_like_self() diff --git a/packages/dali_pipeline_framework/tests/inputs/sampler_input_test.py b/packages/dali_pipeline_framework/tests/inputs/sampler_input_test.py index 3946d1a..ba0e0ad 100644 --- a/packages/dali_pipeline_framework/tests/inputs/sampler_input_test.py +++ b/packages/dali_pipeline_framework/tests/inputs/sampler_input_test.py @@ -15,13 +15,16 @@ import pytest import numpy as np -from accvlab.dali_pipeline_framework.pipeline import PipelineDefinition, DALIStructuredOutputIterator +from accvlab.dali_pipeline_framework.pipeline import DALIStructuredOutputIterator, PipelineDefinition from accvlab.dali_pipeline_framework.inputs.sampler_base import SamplerBase from accvlab.dali_pipeline_framework.inputs.sampler_input_callable import SamplerInputCallable from accvlab.dali_pipeline_framework.inputs.sampler_input_iterable import SamplerInputIterable -from _test_helpers import build_pipeline_and_iterator, SimpleDataProvider +from _sample_selection_test_helpers import ( + 
build_no_processing_pipeline_and_iterator, + SimpleDataProvider, +) try: from typing import override @@ -127,7 +130,7 @@ def test_sampler_inputs_basic_flow_and_ids_match(input_kind: str): num_shards=1, ) - _, pipe, iterator = build_pipeline_and_iterator(input_obj, batch_size, num_test_batches) + _, pipe, iterator = build_no_processing_pipeline_and_iterator(input_obj, batch_size, num_test_batches) try: it = iter(iterator) @@ -169,7 +172,7 @@ def test_sampler_across_epoch_boundaries(input_kind: str): num_shards=1, ) - pipeline_def, pipe, it = build_pipeline_and_iterator(input_obj, batch_size, total_batches) + pipeline_def, pipe, it = build_no_processing_pipeline_and_iterator(input_obj, batch_size, total_batches) iter_it = iter(it) # Determine the actual starting epoch base from the first batch. Note that this could be > 0 as @@ -246,8 +249,16 @@ def test_sampler_inputs_sharding_splits_total_batch(input_kind: str): ) # Build two pipelines (one per shard) - pd0 = PipelineDefinition(data_loading_callable_iterable=input0, preprocess_functors=[]) - pd1 = PipelineDefinition(data_loading_callable_iterable=input1, preprocess_functors=[]) + pd0 = PipelineDefinition( + data_loading_callable_iterable=input0, + preprocess_functors=[], + copy_external_source_passthrough_outputs=True, + ) + pd1 = PipelineDefinition( + data_loading_callable_iterable=input1, + preprocess_functors=[], + copy_external_source_passthrough_outputs=True, + ) pipe0 = pd0.get_dali_pipeline( enable_conditionals=True, @@ -344,8 +355,16 @@ def test_sampler_inputs_sharding_across_epoch_boundaries(input_kind: str): num_shards=num_shards, ) - pd0 = PipelineDefinition(data_loading_callable_iterable=input0, preprocess_functors=[]) - pd1 = PipelineDefinition(data_loading_callable_iterable=input1, preprocess_functors=[]) + pd0 = PipelineDefinition( + data_loading_callable_iterable=input0, + preprocess_functors=[], + copy_external_source_passthrough_outputs=True, + ) + pd1 = PipelineDefinition( + 
data_loading_callable_iterable=input1, + preprocess_functors=[], + copy_external_source_passthrough_outputs=True, + ) pipe0 = pd0.get_dali_pipeline( enable_conditionals=True, diff --git a/packages/dali_pipeline_framework/tests/inputs/shuffled_sharded_input_callable_test.py b/packages/dali_pipeline_framework/tests/inputs/shuffled_sharded_input_callable_test.py index 62da885..4a1a790 100644 --- a/packages/dali_pipeline_framework/tests/inputs/shuffled_sharded_input_callable_test.py +++ b/packages/dali_pipeline_framework/tests/inputs/shuffled_sharded_input_callable_test.py @@ -18,7 +18,11 @@ ShuffledShardedInputCallable, ) -from _test_helpers import build_pipeline_and_iterator, next_ids, SimpleDataProvider +from _sample_selection_test_helpers import ( + build_no_processing_pipeline_and_iterator, + get_next_batch_ids, + SimpleDataProvider, +) @pytest.mark.parametrize("shuffle", [True, False]) @@ -42,12 +46,12 @@ def test_single_epoch_no_duplicates_and_expected_count(shuffle: bool): seed=123, ) - _, pipe, iterator = build_pipeline_and_iterator(callable_obj, batch_size, full_iterations) + _, pipe, iterator = build_no_processing_pipeline_and_iterator(callable_obj, batch_size, full_iterations) try: it = iter(iterator) ids = [] for _ in range(full_iterations): - ids.extend(next_ids(it)) + ids.extend(get_next_batch_ids(it)) # Correct total unique count (only full batches, discard potential duplicates) assert len(set(ids)) == full_iterations * batch_size @@ -94,8 +98,8 @@ def test_two_shards_no_overlap_and_complete_partition(): seed=999, ) - _, pipe0, it0 = build_pipeline_and_iterator(call0, batch_size, full_iterations) - _, pipe1, it1 = build_pipeline_and_iterator(call1, batch_size, full_iterations) + _, pipe0, it0 = build_no_processing_pipeline_and_iterator(call0, batch_size, full_iterations) + _, pipe1, it1 = build_no_processing_pipeline_and_iterator(call1, batch_size, full_iterations) try: it0_iter = iter(it0) @@ -103,8 +107,8 @@ def 
test_two_shards_no_overlap_and_complete_partition(): acc0 = [] acc1 = [] for _ in range(full_iterations): - acc0.extend(next_ids(it0_iter)) - acc1.extend(next_ids(it1_iter)) + acc0.extend(get_next_batch_ids(it0_iter)) + acc1.extend(get_next_batch_ids(it1_iter)) ids0 = set(acc0) ids1 = set(acc1) @@ -141,20 +145,20 @@ def test_two_epochs_shuffle_changes_order_and_each_epoch_valid(): ) # First epoch - _, pipe, it = build_pipeline_and_iterator(call, batch_size, full_iterations) + _, pipe, it = build_no_processing_pipeline_and_iterator(call, batch_size, full_iterations) it_iter = iter(it) ids_epoch0 = [] for _ in range(full_iterations): - ids_epoch0.extend(next_ids(it_iter)) + ids_epoch0.extend(get_next_batch_ids(it_iter)) with pytest.raises(StopIteration): - next_ids(it_iter) + get_next_batch_ids(it_iter) # Reset iterator (new epoch) it_iter.reset() ids_epoch1 = [] for _ in range(full_iterations): - ids_epoch1.extend(next_ids(it_iter)) + ids_epoch1.extend(get_next_batch_ids(it_iter)) try: # Each epoch internally: no duplicates, no overlaps within epoch, correct counts diff --git a/packages/dali_pipeline_framework/tests/pipeline/internal_output_copy_test.py b/packages/dali_pipeline_framework/tests/pipeline/internal_output_copy_test.py new file mode 100644 index 0000000..fe7b9aa --- /dev/null +++ b/packages/dali_pipeline_framework/tests/pipeline/internal_output_copy_test.py @@ -0,0 +1,404 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import pytest +import torch +from typing import Set, Tuple, Union + +try: + from typing import override +except ImportError: + from typing_extensions import override + +from nvidia.dali.types import DALIDataType + +from accvlab.dali_pipeline_framework.inputs import DataProvider, ShuffledShardedInputCallable +from accvlab.dali_pipeline_framework.pipeline import ( + DALIStructuredOutputIterator, + PipelineDefinition, + SampleDataGroup, +) +from accvlab.dali_pipeline_framework.pipeline import _insert_copy_for_passthrough +from accvlab.dali_pipeline_framework.pipeline._insert_copy_for_passthrough import ( + _InsertCopyForPassthrough, +) + + +def _build_output_structure() -> SampleDataGroup: + # SampleDataGroup structure used for testing the copy helper. + res = SampleDataGroup() + res.add_data_field("image", DALIDataType.UINT8) + + camera = SampleDataGroup() + camera.add_data_field("image", DALIDataType.UINT8) + camera.add_data_field("label", DALIDataType.INT32) + + metadata = SampleDataGroup() + metadata.add_data_field("label", DALIDataType.INT32) + metadata.add_data_field("score", DALIDataType.FLOAT) + camera.add_data_group_field("metadata", metadata) + + lidar = SampleDataGroup() + lidar.add_data_field("points", DALIDataType.FLOAT) + + res.add_data_group_field("camera", camera) + res.add_data_group_field("lidar", lidar) + return res + + +def _build_copy_helper(**kwargs) -> _InsertCopyForPassthrough: + structure = _build_output_structure() + return _InsertCopyForPassthrough(structure, **kwargs) + + +def _paths_as_set(helper: _InsertCopyForPassthrough) -> Set[Tuple[Union[str, int], ...]]: + # Keep resolution assertions readable without exposing the helper's tuple ordering. + return set(helper._paths_to_copy) + + +# Values assigned to each leaf in the test output tree when using string marker values (see tests). 
The path keys are also the +# complete set of leaf paths expected when the copy helper is configured without selectors. +_MARKER_VALUES = { + ("image",): "root-image", + ("camera", "image"): "camera-image", + ("camera", "label"): "camera-label", + ("camera", "metadata", "label"): "metadata-label", + ("camera", "metadata", "score"): "metadata-score", + ("lidar", "points"): "lidar-points", +} + + +def _build_data_with_marker_values() -> SampleDataGroup: + '''Build test data populated with string marker values at every leaf.''' + + data = _build_output_structure() + # Use plain string marker values instead of DALI nodes so tests can assert exact object flow without + # building a graph. Conversion and type checks are disabled because the blueprint declares numeric fields. + data.set_do_convert(False) + data.set_do_check_type(False) + for path, value in _MARKER_VALUES.items(): + data.set_item_in_path(path, value) + return data + + +def _apply_copy_with_fake_fn(data: SampleDataGroup, monkeypatch, **kwargs) -> SampleDataGroup: + '''Apply the copy helper with a fake ``fn.copy`` that marks copied values.''' + + def fake_copy(value): + return f"copied-{value}" + + monkeypatch.setattr(_insert_copy_for_passthrough.fn, "copy", fake_copy) + + helper = _InsertCopyForPassthrough(data.get_empty_like_self(), **kwargs) + return helper(data) + + +def _assert_copied_paths(data: SampleDataGroup, copied_paths: Set[Tuple[Union[str, int], ...]]) -> None: + '''Assert selected paths were copied and unselected paths stayed unchanged.''' + + for path, original_value in _MARKER_VALUES.items(): + expected_value = f"copied-{original_value}" if path in copied_paths else original_value + assert data.get_item_in_path(path) == expected_value + + +def test_internal_output_copy_without_selectors_resolves_all_output_leaves(): + helper = _build_copy_helper() + + assert _paths_as_set(helper) == { + ("image",), + ("camera", "image"), + ("camera", "label"), + ("camera", "metadata", "label"), + ("camera", 
"metadata", "score"), + ("lidar", "points"), + } + + +@pytest.mark.parametrize("kwargs", [{"field_names": []}, {"branch_paths": []}]) +def test_internal_output_copy_empty_selectors_resolve_no_output_leaves(kwargs): + helper = _build_copy_helper(**kwargs) + + assert _paths_as_set(helper) == set() + + +@pytest.mark.parametrize("kwargs", [{"field_names": []}, {"branch_paths": []}]) +def test_internal_output_copy_empty_selectors_do_not_copy_outputs(kwargs, monkeypatch): + data = _build_data_with_marker_values() + + _apply_copy_with_fake_fn(data, monkeypatch, **kwargs) + + _assert_copied_paths(data, set()) + + +def test_internal_output_copy_resolves_field_names_globally(): + helper = _build_copy_helper(field_names=["label"]) + + assert _paths_as_set(helper) == { + ("camera", "label"), + ("camera", "metadata", "label"), + } + + +def test_internal_output_copy_resolves_field_names_under_scope_paths(): + helper = _build_copy_helper( + field_names=["label"], + field_names_scope_paths=[("camera", "metadata")], + ) + + assert _paths_as_set(helper) == {("camera", "metadata", "label")} + + +def test_internal_output_copy_resolves_branch_paths(): + helper = _build_copy_helper(branch_paths=[("camera", "metadata"), ("lidar", "points")]) + + assert _paths_as_set(helper) == { + ("camera", "metadata", "label"), + ("camera", "metadata", "score"), + ("lidar", "points"), + } + + +def test_internal_output_copy_rejects_invalid_paths(): + with pytest.raises(ValueError, match="does not exist"): + _build_copy_helper(branch_paths=["missing"]) + + with pytest.raises(ValueError, match="data group field"): + _build_copy_helper( + field_names=["label"], + field_names_scope_paths=["image"], + ) + + +def test_internal_output_copy_applies_copy_to_field_names_globally(monkeypatch): + data = _build_data_with_marker_values() + + _apply_copy_with_fake_fn(data, monkeypatch, field_names=["label"]) + + _assert_copied_paths( + data, + { + ("camera", "label"), + ("camera", "metadata", "label"), + }, + ) + + 
+def test_internal_output_copy_applies_copy_to_field_names_under_scope_paths(monkeypatch): + data = _build_data_with_marker_values() + + _apply_copy_with_fake_fn( + data, + monkeypatch, + field_names=["label"], + field_names_scope_paths=[("camera", "metadata")], + ) + + _assert_copied_paths(data, {("camera", "metadata", "label")}) + + +def test_internal_output_copy_applies_copy_to_branch_paths(monkeypatch): + data = _build_data_with_marker_values() + + _apply_copy_with_fake_fn( + data, + monkeypatch, + branch_paths=[("camera", "metadata"), ("lidar", "points")], + ) + + _assert_copied_paths( + data, + { + ("camera", "metadata", "label"), + ("camera", "metadata", "score"), + ("lidar", "points"), + }, + ) + + +def test_internal_output_copy_applies_copy_to_all_outputs_when_no_selectors(monkeypatch): + data = _build_data_with_marker_values() + + _apply_copy_with_fake_fn(data, monkeypatch) + + _assert_copied_paths(data, set(_MARKER_VALUES)) + + +class _OutputCopyProvider(DataProvider): + @override + def get_data(self, sample_id: int) -> SampleDataGroup: + res = self.sample_data_structure + res["image"] = np.full((2,), sample_id, dtype=np.uint8) + res["camera"]["label"] = np.array([sample_id + 10], dtype=np.int32) + return res + + @override + def get_number_of_samples(self) -> int: + return 4 + + @property + @override + def sample_data_structure(self) -> SampleDataGroup: + res = SampleDataGroup() + res.add_data_field("image", DALIDataType.UINT8) + + camera = SampleDataGroup() + camera.add_data_field("label", DALIDataType.INT32) + res.add_data_group_field("camera", camera) + return res + + +def test_pipeline_definition_copies_all_passthrough_outputs_by_default(monkeypatch): + batch_size = 1 + input_callable = ShuffledShardedInputCallable( + data_provider=_OutputCopyProvider(), + batch_size=batch_size, + shard_id=0, + num_shards=1, + shuffle=False, + ) + with pytest.warns(UserWarning, match="Copying all final pipeline outputs by default"): + pipeline_def = 
PipelineDefinition( + data_loading_callable_iterable=input_callable, + preprocess_functors=[], + # The fake copy below adds an integer constant, which can promote DALI output dtypes. + # Keep this test focused on whether the copy hook is applied. + check_data_format=False, + ) + + def fake_copy(value): + # Deliberately modify the graph output so the test can observe that the copy hook was applied. + return value + 1 + + monkeypatch.setattr(_insert_copy_for_passthrough.fn, "copy", fake_copy) + + pipeline = pipeline_def.get_dali_pipeline( + enable_conditionals=True, + batch_size=batch_size, + prefetch_queue_depth=1, + num_threads=1, + py_start_method="spawn", + ) + iterator = DALIStructuredOutputIterator( + batch_size, pipeline, pipeline_def.check_and_get_output_data_structure() + ) + + batch = next(iter(iterator)) + + assert torch.equal(batch["image"][0].to(torch.int64), torch.tensor([1, 1], dtype=torch.int64)) + assert torch.equal(batch["camera"]["label"][0].to(torch.int64), torch.tensor([11], dtype=torch.int64)) + + +def test_pipeline_definition_rejects_copy_selectors_when_copying_is_default(): + with pytest.raises(ValueError, match="copy_external_source_passthrough_outputs=True"): + PipelineDefinition( + data_loading_callable_iterable=ShuffledShardedInputCallable( + data_provider=_OutputCopyProvider(), + batch_size=1, + shard_id=0, + num_shards=1, + shuffle=False, + ), + preprocess_functors=[], + passthrough_copy_branch_paths=["image"], + ) + + +def test_pipeline_definition_can_copy_selected_passthrough_outputs(monkeypatch): + batch_size = 2 + input_callable = ShuffledShardedInputCallable( + data_provider=_OutputCopyProvider(), + batch_size=batch_size, + shard_id=0, + num_shards=1, + shuffle=False, + ) + pipeline_def = PipelineDefinition( + data_loading_callable_iterable=input_callable, + preprocess_functors=[], + # The fake copy below adds an integer constant, which can promote DALI output dtypes. + # Batch size > 1 in combination with callable (i.e. 
per-sample) inputs avoids the potential pass-through issue, + # allowing to reliably test for unchanged values. + check_data_format=False, + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["label"], + ) + + def fake_copy(value): + return value + 1 + + monkeypatch.setattr(_insert_copy_for_passthrough.fn, "copy", fake_copy) + + pipeline = pipeline_def.get_dali_pipeline( + enable_conditionals=True, + batch_size=batch_size, + prefetch_queue_depth=1, + num_threads=1, + py_start_method="spawn", + ) + iterator = DALIStructuredOutputIterator( + batch_size, pipeline, pipeline_def.check_and_get_output_data_structure() + ) + + batch = next(iter(iterator)) + + # With ``shuffle=False`` the first batch contains sample ids 0 and 1. ``image`` is not selected for + # copying, so it should remain filled with the original sample id. ``label`` starts at + # ``sample_id + 10`` and is selected, so the fake copy adds one more. ``batch_size=2`` keeps the + # unselected callable-input output away from the single-sample pass-through case documented above. 
+ assert torch.equal(batch["image"].to(torch.int64), torch.tensor([[0, 0], [1, 1]], dtype=torch.int64)) + assert torch.equal( + batch["camera"]["label"].to(torch.int64), torch.tensor([[11], [12]], dtype=torch.int64) + ) + + +def test_pipeline_definition_rejects_copy_selectors_when_copying_is_disabled(): + with pytest.raises(ValueError, match="copy_external_source_passthrough_outputs=True"): + PipelineDefinition( + data_loading_callable_iterable=ShuffledShardedInputCallable( + data_provider=_OutputCopyProvider(), + batch_size=1, + shard_id=0, + num_shards=1, + shuffle=False, + ), + preprocess_functors=[], + copy_external_source_passthrough_outputs=False, + passthrough_copy_branch_paths=["image"], + ) + + +def test_pipeline_definition_reports_invalid_copy_setup_with_context(): + pipeline_def = PipelineDefinition( + data_loading_callable_iterable=ShuffledShardedInputCallable( + data_provider=_OutputCopyProvider(), + batch_size=1, + shard_id=0, + num_shards=1, + shuffle=False, + ), + preprocess_functors=[], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["missing"], + ) + + with pytest.raises( + ValueError, + match="Invalid pass-through output copy configuration for final output format: .*does not exist", + ): + pipeline_def.check_and_get_output_data_structure() + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/packages/dali_pipeline_framework/tests/pipeline/sample_data_group_strings_in_pipeline_test.py b/packages/dali_pipeline_framework/tests/pipeline/sample_data_group_strings_in_pipeline_test.py index fc6b00d..0bdfea0 100644 --- a/packages/dali_pipeline_framework/tests/pipeline/sample_data_group_strings_in_pipeline_test.py +++ b/packages/dali_pipeline_framework/tests/pipeline/sample_data_group_strings_in_pipeline_test.py @@ -76,6 +76,7 @@ def test_strings_roundtrip_through_pipeline(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[], # No processing; 
pass-through + copy_external_source_passthrough_outputs=True, ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/annotation_element_condition_eval_test.py b/packages/dali_pipeline_framework/tests/processing_steps/annotation_element_condition_eval_test.py index 20e4c7a..897b7ed 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/annotation_element_condition_eval_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/annotation_element_condition_eval_test.py @@ -114,6 +114,8 @@ def test_simple_comparison_condition(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -160,6 +162,8 @@ def test_complex_condition_with_and_or(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -206,6 +210,8 @@ def test_not_condition(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -248,6 +254,8 @@ def test_remove_condition_fields(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -297,6 +305,8 @@ def test_multiple_annotation_fields(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, 
preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -354,6 +364,8 @@ def test_comparison_operators(operator, operator_name, value, expected): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -392,6 +404,8 @@ def test_boolean_field_condition(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -431,6 +445,8 @@ def test_other_data_preserved(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -477,6 +493,8 @@ def test_number_in_identifier(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -542,6 +560,8 @@ def test_logical_vs_bitwise_and(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -606,6 +626,8 @@ def test_direct_field_comparison_operators(operator, operator_name, expected): pipeline_def = PipelineDefinition( 
data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -646,6 +668,8 @@ def test_decimal_values_in_condition(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -687,6 +711,8 @@ def test_negative_values_and_unary_minus(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -714,6 +740,8 @@ def test_negative_values_and_unary_minus(): pipeline_def2 = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step2], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline2 = pipeline_def2.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/axes_layout_setter_test.py b/packages/dali_pipeline_framework/tests/processing_steps/axes_layout_setter_test.py index d1a89f2..a9df6e5 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/axes_layout_setter_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/axes_layout_setter_test.py @@ -94,9 +94,16 @@ def test_axes_layout_setter(field_names, layout_to_set): layout_to_set=layout_to_set, ) + selected_field_names = {field_names} if isinstance(field_names, str) else set(field_names) + passthrough_field_names = [ + name for name in ["image", "image2", "feature_map", "other_data"] if name not in selected_field_names + ] + pipeline_def = PipelineDefinition( 
data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=passthrough_field_names, ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/bev_bboxes_transformer_3d_test.py b/packages/dali_pipeline_framework/tests/processing_steps/bev_bboxes_transformer_3d_test.py index 76000e8..0822a90 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/bev_bboxes_transformer_3d_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/bev_bboxes_transformer_3d_test.py @@ -220,6 +220,26 @@ def create_reference_transformation_matrices(rotation_angle_rad, rotation_axis, return rotation_matrix, scaling_matrix, translation_matrix +def apply_transformation_to_ego_to_world(ego_to_world, rotation_matrix, scaling_matrix, translation_matrix): + """Reference for BEVBBoxesTransformer3D ego_to_world updates (rotation, then scaling, then translation).""" + + res = ego_to_world.copy() + res = res @ rotation_matrix.T + res = res @ np.linalg.inv(scaling_matrix) + res = res @ np.linalg.inv(translation_matrix) + return res + + +def apply_transformation_to_world_to_ego(world_to_ego, rotation_matrix, scaling_matrix, translation_matrix): + """Reference for BEVBBoxesTransformer3D world_to_ego updates (rotation, then scaling, then translation).""" + + res = world_to_ego.copy() + res = rotation_matrix @ res + res = scaling_matrix @ res + res = translation_matrix @ res + return res + + def apply_transformation_to_vecs(vecs, rotation_matrix, scaling_matrix, translation_matrix, are_points): """Apply transformations to points using reference implementation.""" @@ -275,13 +295,19 @@ def apply_transformation_to_points(points, rotation_matrix, scaling_matrix, tran def verify_matrix_inverse_relationship(ego_to_world, world_to_ego, tolerance=1e-6): """Verify that world_to_ego is the inverse of ego_to_world.""" + 
expected_world_to_ego = torch.linalg.inv(ego_to_world) + assert torch.allclose(world_to_ego, expected_world_to_ego, atol=tolerance), ( + "world_to_ego does not match inv(ego_to_world). " + f"Max difference: {torch.max(torch.abs(world_to_ego - expected_world_to_ego)).item()}" + ) + # Check that world_to_ego @ ego_to_world = identity identity_check = world_to_ego @ ego_to_world expected_identity = torch.eye(4, dtype=torch.float32) assert torch.allclose( identity_check, expected_identity, atol=tolerance - ), f"world_to_ego is not the inverse of ego_to_world. Max difference: {np.max(np.abs(identity_check - expected_identity))}" + ), f"world_to_ego is not the inverse of ego_to_world. Max difference: {torch.max(torch.abs(identity_check - expected_identity)).item()}" def verify_transformation_consistency( @@ -328,6 +354,7 @@ def verify( orientations_in, proj_matrix_in, ego_to_world_in, + world_to_ego_in, points_out, vels_out, sizes_out, @@ -383,6 +410,27 @@ def verify( abs_diff_orientations, torch.zeros_like(abs_diff_orientations), atol=tolerance ), f"Orientations transformation does not match reference implementation" + ego_to_world_ref = torch.tensor( + apply_transformation_to_ego_to_world( + ego_to_world_in, rotation_matrix, scaling_matrix, translation_matrix + ), + dtype=torch.float32, + ) + world_to_ego_ref = torch.tensor( + apply_transformation_to_world_to_ego( + world_to_ego_in, rotation_matrix, scaling_matrix, translation_matrix + ), + dtype=torch.float32, + ) + assert torch.allclose(ego_to_world_ref, ego_to_world_out, atol=tolerance), ( + "ego_to_world transformation does not match reference implementation. " + f"Max difference: {torch.max(torch.abs(ego_to_world_ref - ego_to_world_out)).item()}" + ) + assert torch.allclose(world_to_ego_ref, world_to_ego_out, atol=tolerance), ( + "world_to_ego transformation does not match reference implementation. 
" + f"Max difference: {torch.max(torch.abs(world_to_ego_ref - world_to_ego_out)).item()}" + ) + verify_transformation_consistency(points_in, proj_matrix_in, points_out, proj_matrix_out, tolerance) verify_transformation_consistency(points_in, ego_to_world_in, points_out, ego_to_world_out, tolerance) verify_matrix_inverse_relationship(ego_to_world_out, world_to_ego_out, tolerance) @@ -427,74 +475,78 @@ def run_transformation_test_with_reference_comparison( sequences.extend([translation_range_repl_x, translation_range_repl_y, translation_range_repl_z]) original_gen, fake_gen = set_dali_uniform_generator_and_get_orig_and_replacement(sequences) - provider = TestProvider() - input_callable = ShuffledShardedInputCallable( - provider, - batch_size=1, - num_shards=1, - shard_id=0, - shuffle=False, - ) - - step = BEVBBoxesTransformer3D( - data_field_names_points="points", - data_field_names_velocities="velocities", - data_field_names_sizes="sizes", - data_field_names_orientation="orientations", - data_field_names_proj_matrices_and_extrinsics="proj_matrix", - data_field_names_ego_to_world="ego_to_world", - data_field_names_world_to_ego="world_to_ego", - rotation_range=rotation_range, - rotation_axis=2, # Z-axis - scaling_range=scaling_range, - translation_max_abs=translation_max_abs, - ) + try: + provider = TestProvider() + input_callable = ShuffledShardedInputCallable( + provider, + batch_size=1, + num_shards=1, + shard_id=0, + shuffle=False, + ) - pipeline_def = PipelineDefinition( - data_loading_callable_iterable=input_callable, - preprocess_functors=[step], - ) + step = BEVBBoxesTransformer3D( + data_field_names_points="points", + data_field_names_velocities="velocities", + data_field_names_sizes="sizes", + data_field_names_orientation="orientations", + data_field_names_proj_matrices_and_extrinsics="proj_matrix", + data_field_names_ego_to_world="ego_to_world", + data_field_names_world_to_ego="world_to_ego", + rotation_range=rotation_range, + rotation_axis=2, # Z-axis + 
scaling_range=scaling_range, + translation_max_abs=translation_max_abs, + ) - pipeline = pipeline_def.get_dali_pipeline( - enable_conditionals=True, - batch_size=1, - prefetch_queue_depth=1, - num_threads=1, - py_start_method="spawn", - exec_dynamic=True, - ) + pipeline_def = PipelineDefinition( + data_loading_callable_iterable=input_callable, + preprocess_functors=[step], + ) - iterator = DALIStructuredOutputIterator(1, pipeline, pipeline_def.check_and_get_output_data_structure()) - iterator_iter = iter(iterator) - result = next(iterator_iter) + pipeline = pipeline_def.get_dali_pipeline( + enable_conditionals=True, + batch_size=1, + prefetch_queue_depth=1, + num_threads=1, + py_start_method="spawn", + exec_dynamic=True, + ) - # Get original data for comparison - original_data = provider.get_data(0) + iterator = DALIStructuredOutputIterator( + 1, pipeline, pipeline_def.check_and_get_output_data_structure() + ) + iterator_iter = iter(iterator) + result = next(iterator_iter) - # Note that `result` has a batch dimension (as it is a DALI iterator output), so we need to index - # with [0] to get the single sample. `original_data` does not have a batch dimension as it is the - # input data for a single sample. - verify( - original_data["points"], - original_data["velocities"], - original_data["sizes"], - original_data["orientations"], - original_data["proj_matrix"], - original_data["ego_to_world"], - result["points"][0], - result["velocities"][0], - result["sizes"][0], - result["orientations"][0], - result["proj_matrix"][0], - result["ego_to_world"][0], - result["world_to_ego"][0], - expected_rotation_angle, - rotation_axis, - expected_scaling_factor, - expected_translation, - ) + # Get original data for comparison + original_data = provider.get_data(0) - restore_generator(original_gen) + # Note that `result` has a batch dimension (as it is a DALI iterator output), so we need to index + # with [0] to get the single sample. 
`original_data` does not have a batch dimension as it is the + # input data for a single sample. + verify( + original_data["points"], + original_data["velocities"], + original_data["sizes"], + original_data["orientations"], + original_data["proj_matrix"], + original_data["ego_to_world"], + original_data["world_to_ego"], + result["points"][0], + result["velocities"][0], + result["sizes"][0], + result["orientations"][0], + result["proj_matrix"][0], + result["ego_to_world"][0], + result["world_to_ego"][0], + expected_rotation_angle, + rotation_axis, + expected_scaling_factor, + expected_translation, + ) + finally: + restore_generator(original_gen) def test_combined_transformations(): @@ -810,6 +862,9 @@ def test_no_transformations_enabled(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + # No copy selectors: all output fields may pass through unchanged when no + # transformations are enabled. 
) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/bounding_box_to_heatmap_converter_test.py b/packages/dali_pipeline_framework/tests/processing_steps/bounding_box_to_heatmap_converter_test.py index 0fd643a..9acd02a 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/bounding_box_to_heatmap_converter_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/bounding_box_to_heatmap_converter_test.py @@ -345,6 +345,9 @@ def test_annotation_to_heatmap_converter(use_other_diagonal, reverse_point_order pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["image_hw"], + passthrough_copy_branch_paths=["annotation"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -483,6 +486,9 @@ def test_annotation_to_heatmap_converter_single_heatmap(use_per_class_thresholds pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["image_hw"], + passthrough_copy_branch_paths=["annotation"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -577,6 +583,9 @@ def test_annotation_to_heatmap_converter_optional_outputs(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["image_hw"], + passthrough_copy_branch_paths=["annotation"], ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/conditional_elements_removal_test.py b/packages/dali_pipeline_framework/tests/processing_steps/conditional_elements_removal_test.py index 76c90d9..61d4abf 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/conditional_elements_removal_test.py 
+++ b/packages/dali_pipeline_framework/tests/processing_steps/conditional_elements_removal_test.py @@ -122,6 +122,8 @@ def test_conditional_element_removal(remove_mask_field): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["other_data", "is_active"], ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/coordinate_cropper_test.py b/packages/dali_pipeline_framework/tests/processing_steps/coordinate_cropper_test.py index c7344de..58193ed 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/coordinate_cropper_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/coordinate_cropper_test.py @@ -114,6 +114,8 @@ def test_coordinate_cropper(points_field_name, minimum_point, maximum_point): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["points_2d", "points_3d", "bboxes", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/data_group_array_in_path_elements_applied_step_test.py b/packages/dali_pipeline_framework/tests/processing_steps/data_group_array_in_path_elements_applied_step_test.py index 66840cd..82b1145 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/data_group_array_in_path_elements_applied_step_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/data_group_array_in_path_elements_applied_step_test.py @@ -173,6 +173,7 @@ def test_data_group_array_in_path_elements_applied_step_independent_processing() pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, ) pipeline = 
pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/data_group_array_with_name_elements_applied_step_test.py b/packages/dali_pipeline_framework/tests/processing_steps/data_group_array_with_name_elements_applied_step_test.py index 93c05fa..2440540 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/data_group_array_with_name_elements_applied_step_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/data_group_array_with_name_elements_applied_step_test.py @@ -190,6 +190,7 @@ def test_data_group_array_with_name_elements_applied_step_independent_processing preprocess_functors=[step], prefetch_queue_depth=2, check_data_format=False, + copy_external_source_passthrough_outputs=True, ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/data_group_in_path_applied_step_test.py b/packages/dali_pipeline_framework/tests/processing_steps/data_group_in_path_applied_step_test.py index 16ed301..d15571d 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/data_group_in_path_applied_step_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/data_group_in_path_applied_step_test.py @@ -167,6 +167,7 @@ def test_data_group_in_path_applied_step_replaces_only_in_subtree(path_to_apply_ preprocess_functors=[step], prefetch_queue_depth=3, use_parallel_external_source=True, + copy_external_source_passthrough_outputs=True, ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/data_groups_with_name_applied_step_test.py b/packages/dali_pipeline_framework/tests/processing_steps/data_groups_with_name_applied_step_test.py index b0e766d..0c339f5 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/data_groups_with_name_applied_step_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/data_groups_with_name_applied_step_test.py @@ -181,6 
+181,7 @@ def test_data_groups_with_name_applied_step_independent_processing(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/image_decoder_test.py b/packages/dali_pipeline_framework/tests/processing_steps/image_decoder_test.py index c69f582..b6dfa6c 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/image_decoder_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/image_decoder_test.py @@ -153,6 +153,8 @@ def test_image_decoder(use_device_mixed, as_bgr): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -237,6 +239,8 @@ def test_image_decoder_no_images_found(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) # This should raise an error during pipeline creation @@ -270,6 +274,8 @@ def test_image_decoder_multiple_batches(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/image_mean_std_dev_normalizer_test.py b/packages/dali_pipeline_framework/tests/processing_steps/image_mean_std_dev_normalizer_test.py index b017374..d453fed 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/image_mean_std_dev_normalizer_test.py +++ 
b/packages/dali_pipeline_framework/tests/processing_steps/image_mean_std_dev_normalizer_test.py @@ -135,6 +135,8 @@ def test_image_mean_std_dev_normalizer_single_values(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -230,6 +232,8 @@ def test_image_mean_std_dev_normalizer_channel_specific(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -296,6 +300,8 @@ def test_image_mean_std_dev_normalizer_edge_cases(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -354,6 +360,8 @@ def test_image_mean_std_dev_normalizer_no_images_found(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) # This should raise an error during pipeline building diff --git a/packages/dali_pipeline_framework/tests/processing_steps/image_range_01_normalizer_test.py b/packages/dali_pipeline_framework/tests/processing_steps/image_range_01_normalizer_test.py index 749b0d7..d23ff18 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/image_range_01_normalizer_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/image_range_01_normalizer_test.py @@ -120,6 +120,8 @@ def test_image_range_01_normalizer(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + 
copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -211,6 +213,8 @@ def test_image_range_01_normalizer_no_images_found(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) # This should raise an error during pipeline building diff --git a/packages/dali_pipeline_framework/tests/processing_steps/image_to_tile_size_padder_test.py b/packages/dali_pipeline_framework/tests/processing_steps/image_to_tile_size_padder_test.py index f021947..85272f3 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/image_to_tile_size_padder_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/image_to_tile_size_padder_test.py @@ -115,6 +115,8 @@ def test_image_to_tile_size_padder(tile_size, expected_sizes): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -174,6 +176,8 @@ def test_image_to_tile_size_padder_no_images_found(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) # Should raise KeyError during pipeline construction when trying to find images @@ -209,6 +213,8 @@ def test_image_to_tile_size_padder_already_padded(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/padding_to_uniform_test.py 
b/packages/dali_pipeline_framework/tests/processing_steps/padding_to_uniform_test.py index ab7de64..020e1db 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/padding_to_uniform_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/padding_to_uniform_test.py @@ -110,9 +110,29 @@ def test_padding_to_uniform_parametrized( step = PaddingToUniform(field_names=field_names, fill_value=fill_value) + passthrough_copy_kwargs = {} + if field_names == "image": + passthrough_copy_kwargs = { + "copy_external_source_passthrough_outputs": True, + "passthrough_copy_branch_paths": ["metadata"], + } + elif field_names == ["image", "values"]: + passthrough_copy_kwargs = { + "copy_external_source_passthrough_outputs": True, + "passthrough_copy_field_names": ["features"], + "passthrough_copy_field_names_scope_paths": ["metadata"], + } + elif field_names == ["image", "features"]: + passthrough_copy_kwargs = { + "copy_external_source_passthrough_outputs": True, + "passthrough_copy_field_names": ["values"], + "passthrough_copy_field_names_scope_paths": ["metadata"], + } + pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + **passthrough_copy_kwargs, ) pipeline = pipeline_def.get_dali_pipeline( @@ -213,6 +233,8 @@ def test_padding_to_uniform_nested_structure(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["image"], ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/photo_metric_distorter_test.py b/packages/dali_pipeline_framework/tests/processing_steps/photo_metric_distorter_test.py index 4e01fb0..4a65b2b 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/photo_metric_distorter_test.py +++ 
b/packages/dali_pipeline_framework/tests/processing_steps/photo_metric_distorter_test.py @@ -353,6 +353,8 @@ def test_photometric_distorter_single_transformation( pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -457,6 +459,8 @@ def test_photometric_distorter_basic(use_uint8): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -606,6 +610,8 @@ def run_once(use_uint8: bool): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -707,6 +713,8 @@ def test_photometric_distorter_bgr(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -795,6 +803,8 @@ def test_photometric_distorter_no_augmentations(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -857,6 +867,8 @@ def test_photometric_distorter_no_images_found(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["metadata"], ) # This should raise an error during pipeline building diff --git 
a/packages/dali_pipeline_framework/tests/processing_steps/points_in_range_check_test.py b/packages/dali_pipeline_framework/tests/processing_steps/points_in_range_check_test.py index 8db451e..7da7aeb 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/points_in_range_check_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/points_in_range_check_test.py @@ -90,6 +90,8 @@ def test_points_in_range_check(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["labels", "other_field", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/unneeded_fields_remover_test.py b/packages/dali_pipeline_framework/tests/processing_steps/unneeded_fields_remover_test.py index 045e036..9badd66 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/unneeded_fields_remover_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/unneeded_fields_remover_test.py @@ -165,6 +165,8 @@ def test_remove_single_field(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data", "metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -247,6 +249,8 @@ def test_remove_multiple_fields(field_names): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -294,6 +298,8 @@ def test_remove_nonexistent_fields(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + 
copy_external_source_passthrough_outputs=True, + passthrough_copy_branch_paths=["annotation", "other_data", "metadata", "temp_field"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -348,6 +354,8 @@ def test_remove_all_fields(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["other_data", "metadata"], ) pipeline = pipeline_def.get_dali_pipeline( @@ -398,6 +406,9 @@ def test_remove_nested_to_remove_fields(): pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["keep_this_field"], + passthrough_copy_branch_paths=["nested"], ) pipeline = pipeline_def.get_dali_pipeline( diff --git a/packages/dali_pipeline_framework/tests/processing_steps/visible_bbox_selector_test.py b/packages/dali_pipeline_framework/tests/processing_steps/visible_bbox_selector_test.py index 1e2ec42..b43dd4d 100644 --- a/packages/dali_pipeline_framework/tests/processing_steps/visible_bbox_selector_test.py +++ b/packages/dali_pipeline_framework/tests/processing_steps/visible_bbox_selector_test.py @@ -139,6 +139,8 @@ def test_visible_bbox_selector_occlusions( pipeline_def = PipelineDefinition( data_loading_callable_iterable=input_callable, preprocess_functors=[step], + copy_external_source_passthrough_outputs=True, + passthrough_copy_field_names=["bboxes", "depths", "image_hw"], ) pipeline = pipeline_def.get_dali_pipeline(