From f1a69240e65b08dcb3a843fe320e7c80e3329707 Mon Sep 17 00:00:00 2001 From: Jade Wibbels Date: Wed, 27 May 2026 10:21:32 -0600 Subject: [PATCH 1/7] Empty commit to start 2.86.1 release From e41563f998a39c45fdf9f7c9cdf40502e8d827ff Mon Sep 17 00:00:00 2001 From: JadeWibbels Date: Fri, 22 May 2026 11:27:04 -0600 Subject: [PATCH 2/7] Fix stuck DSRs (#8211) Co-authored-by: Claude Opus 4.6 (1M context) --- ...1-fix-stuck-dsrs-orphaned-async-tasks.yaml | 4 + .../privacy_request/request_service.py | 61 ++++- src/fides/api/task/create_request_tasks.py | 16 ++ src/fides/api/task/graph_task.py | 44 +++- .../privacy_request_service.py | 2 + .../ops/task/test_erase_after_dangling_ref.py | 147 +++++++++++ tests/fides/ops/task/test_graph_task.py | 49 +++- .../task/test_requeue_interrupted_tasks.py | 238 ++++++++++++++++++ 8 files changed, 547 insertions(+), 14 deletions(-) create mode 100644 changelog/8211-fix-stuck-dsrs-orphaned-async-tasks.yaml create mode 100644 tests/fides/ops/task/test_erase_after_dangling_ref.py diff --git a/changelog/8211-fix-stuck-dsrs-orphaned-async-tasks.yaml b/changelog/8211-fix-stuck-dsrs-orphaned-async-tasks.yaml new file mode 100644 index 00000000000..ebca057c843 --- /dev/null +++ b/changelog/8211-fix-stuck-dsrs-orphaned-async-tasks.yaml @@ -0,0 +1,4 @@ +type: Fixed +description: Fixed stuck DSRs when async task ConnectionConfig is deleted or disabled +pr: 8211 +labels: [] diff --git a/src/fides/api/service/privacy_request/request_service.py b/src/fides/api/service/privacy_request/request_service.py index 8274aa7ee93..ca3d9d07582 100644 --- a/src/fides/api/service/privacy_request/request_service.py +++ b/src/fides/api/service/privacy_request/request_service.py @@ -15,6 +15,7 @@ from fides.api.common_exceptions import PrivacyRequestError from fides.api.graph.config import ROOT_COLLECTION_ADDRESS, TERMINATOR_ADDRESS from fides.api.models.audit_log import AuditLog +from fides.api.models.connectionconfig import ConnectionConfig from fides.api.models.privacy_request import ( COMPLETED_EXECUTION_LOG_STATUSES, EXITED_EXECUTION_LOG_STATUSES, @@ -499,6 +500,7 @@ def _get_request_task_ids_in_progress( if task.status not in ( ExecutionLogStatus.in_processing, ExecutionLogStatus.pending, + ExecutionLogStatus.awaiting_processing, ): continue awaiting_upstream = False @@ -535,11 +537,42 @@ def _has_async_tasks_awaiting_external_completion( .filter( RequestTask.privacy_request_id == privacy_request_id, RequestTask.async_type.in_([AsyncTaskType.polling, AsyncTaskType.callback]), + RequestTask.status.notin_(EXITED_EXECUTION_LOG_STATUSES), ) .exists() ).scalar() +def _task_is_orphaned(db: Session, request_task_id: str) -> bool: + """Check if a request task's ConnectionConfig has been deleted or disabled. + + A task is orphaned when its ConnectionConfig either no longer exists + (hard-deleted) or has been disabled. In both cases the task can never + complete and should be skipped. + """ + connection_key = ( + db.query(RequestTask.traversal_details["dataset_connection_key"].as_string()) + .filter(RequestTask.id == request_task_id) + .scalar() + ) + if not connection_key: + logger.warning( + f"Request task {request_task_id} has no dataset_connection_key " + f"in traversal_details — possible data integrity issue" + ) + return False + + has_enabled_connection = db.query( + db.query(ConnectionConfig) + .filter( + ConnectionConfig.key == connection_key, + ConnectionConfig.disabled.is_(False), + ) + .exists() + ).scalar() + return not has_enabled_connection + + # pylint: disable=too-many-branches # pylint: disable=too-many-statements @celery_app.task(base=DatabaseTask, bind=True) @@ -757,9 +790,31 @@ def requeue_interrupted_tasks(self: DatabaseTask) -> None: subtask_id not in queued_tasks_ids and not celery_tasks_in_flight([subtask_id]) ): - logger.warning( - f"Request task {request_task_id} is not in the queue or running, requeueing privacy request" - ) + # awaiting_processing tasks have a cached subtask ID + # from their initial execution, but the Celery task + # has finished (the task is waiting for an external + # event). Only requeue if the connection is gone — + # otherwise the task is legitimately waiting. + # + # NOTE: For requires_input / pending_external PRs, + # requeue is safe for DB-backed ManualTask input + # (survives restart). Old manual-webhook input is + # Redis-only and may require re-submission if the + # cache TTL expires before the requeued task runs. + if task_status == ExecutionLogStatus.awaiting_processing: + if not _task_is_orphaned(db, request_task_id): + continue + logger.warning( + f"Request task {request_task_id} " + f"(privacy request {privacy_request.id}) is " + f"awaiting_processing but connection is " + f"deleted or disabled — requeueing" + ) + + else: + logger.warning( + f"Request task {request_task_id} is not in the queue or running, requeueing privacy request" + ) should_requeue = True break diff --git a/src/fides/api/task/create_request_tasks.py b/src/fides/api/task/create_request_tasks.py index d7b06e8daac..7b952732cb8 100644 --- a/src/fides/api/task/create_request_tasks.py +++ b/src/fides/api/task/create_request_tasks.py @@ -180,6 +180,22 @@ def build_erasure_networkx_digraph( networkx_graph.add_nodes_from(traversal_nodes.keys()) networkx_graph.add_nodes_from(ARTIFICIAL_NODES) + # Validate that all erase_after references point to collections that + # exist in the traversal or end_nodes. Dangling references (e.g. from a + # deleted integration) would silently create phantom nodes in the graph + # via networkx.add_edge, leading to a KeyError during task creation. + valid_nodes = set(traversal_nodes.keys()) | set(end_nodes) | set(ARTIFICIAL_NODES) + for node_name, traversal_node in traversal_nodes.items(): + for ref in traversal_node.node.collection.erase_after: + if ref not in valid_nodes: + raise TraversalError( + f"Erasure cannot proceed: collection '{node_name}' has an " + f"'Erase After' dependency on '{ref}', which no longer " + f"exists in the dataset graph. Update the 'Erase After' " + f"setting on this collection in the dataset configuration " + f"to remove the stale reference." + ) + for node_name, traversal_node in traversal_nodes.items(): # Add an edge from the root node to the current node, unless explicit erasure # dependencies are defined. Modifies end_nodes in place diff --git a/src/fides/api/task/graph_task.py b/src/fides/api/task/graph_task.py index 810b0362372..667c3535d6a 100644 --- a/src/fides/api/task/graph_task.py +++ b/src/fides/api/task/graph_task.py @@ -15,6 +15,7 @@ AwaitingAsyncProcessing, AwaitingAsyncTask, CollectionDisabled, + ConnectorNotFoundException, NotSupportedForCollection, PrivacyRequestErasureEmailSendRequired, SkippingConsentPropagation, @@ -254,18 +255,27 @@ def __init__( self.request_task = resources.privacy_request_task self.execution_node = ExecutionNode(resources.privacy_request_task) self.resources = resources - self.connector: BaseConnector = resources.get_connector( - self.execution_node.connection_key # ConnectionConfig.key - ) + try: + self.connector: BaseConnector = resources.get_connector( + self.execution_node.connection_key # ConnectionConfig.key + ) + except ConnectorNotFoundException: + # ConnectionConfig was deleted. Set connector to None so that + # skip_if_disabled() can detect it and raise CollectionDisabled, + # which the @retry decorator handles identically to a disabled + # connection — the task is marked skipped and downstream tasks + # proceed with empty data. + self.connector = None # type: ignore[assignment] self.key: CollectionAddress = self.execution_node.address - saas_config_dict = self.connector.configuration.saas_config - # Snapshot version at construction so all log entries reflect the version - # that was active when the task started, not a potentially later value. - self._saas_version: Optional[str] = ( - saas_config_dict.get("version") if saas_config_dict else None - ) + if self.connector is not None: + saas_config_dict = self.connector.configuration.saas_config + self._saas_version: Optional[str] = ( + saas_config_dict.get("version") if saas_config_dict else None + ) + else: + self._saas_version = None self.execution_log_id = None # a local copy of the execution log record written to. If we write multiple status @@ -277,6 +287,8 @@ def __repr__(self) -> str: def generate_dry_run_query(self) -> Optional[str]: """Type-specific query generated for this traversal_node.""" + if self.connector is None: + return None return self.connector.dry_run_query(self.execution_node) def can_write_data(self) -> bool: @@ -688,7 +700,19 @@ def get_connection_config(self) -> ConnectionConfig: return self.connector.configuration def skip_if_disabled(self) -> None: - """Skip execution for the given collection if it is attached to a disabled ConnectionConfig.""" + """Skip execution if the ConnectionConfig is deleted or disabled. + + A None connector means the ConnectionConfig was hard-deleted from the + database (caught during __init__). This is treated identically to a + disabled connection — the @retry decorator catches CollectionDisabled + and marks the task as skipped. + """ + if self.connector is None: + raise CollectionDisabled( + f"Skipping collection {self.execution_node.address}. " + f"ConnectionConfig was deleted.", + ) + connection_config: ConnectionConfig = self.get_connection_config() if connection_config.disabled: diff --git a/src/fides/service/privacy_request/privacy_request_service.py b/src/fides/service/privacy_request/privacy_request_service.py index 3cfe82e1589..2ce2541f789 100644 --- a/src/fides/service/privacy_request/privacy_request_service.py +++ b/src/fides/service/privacy_request/privacy_request_service.py @@ -1250,6 +1250,8 @@ def _requeue_privacy_request( if privacy_request.status not in [ PrivacyRequestStatus.approved, PrivacyRequestStatus.in_processing, + PrivacyRequestStatus.requires_input, + PrivacyRequestStatus.pending_external, ]: raise PrivacyRequestError( f"Cannot re-queue privacy request {privacy_request.id} with status {privacy_request.status.value}" diff --git a/tests/fides/ops/task/test_erase_after_dangling_ref.py b/tests/fides/ops/task/test_erase_after_dangling_ref.py new file mode 100644 index 00000000000..2c5c973e140 --- /dev/null +++ b/tests/fides/ops/task/test_erase_after_dangling_ref.py @@ -0,0 +1,147 @@ +"""Test validation of erase_after references during erasure graph construction. + +When a collection's erase_after references a collection that doesn't exist in +the DatasetGraph (e.g. the referenced integration was deleted), the graph +builder must reject it upfront with a clear error rather than allowing phantom +nodes to silently enter the graph and corrupt task creation. +""" + +import pytest + +from fides.api.common_exceptions import TraversalError +from fides.api.graph.config import ( + Collection, + CollectionAddress, + GraphDataset, + ScalarField, +) +from fides.api.graph.graph import DatasetGraph +from fides.api.graph.traversal import Traversal, TraversalNode +from fides.api.task.create_request_tasks import ( + build_erasure_networkx_digraph, + collect_tasks_fn, + persist_initial_erasure_request_tasks, +) + + +def _identity_field(name: str) -> ScalarField: + """Create a scalar field marked as an identity seed.""" + f = ScalarField(name=name, primary_key=True) + f.identity = "email" + return f + + +def _build_graph_with_dangling_erase_after(): + """Build a DatasetGraph where one collection has erase_after pointing to + a collection that does not exist in the graph. + + Graph layout: + active_api (connection: active_conn) + └── users (identity: email, erase_after: [deleted_api.users]) + + deleted_api does NOT exist in the DatasetGraph. + """ + active_collection = Collection( + name="users", + fields=[_identity_field("email"), ScalarField(name="name")], + erase_after={CollectionAddress("deleted_api", "users")}, + ) + + active_dataset = GraphDataset( + name="active_api", + collections=[active_collection], + connection_key="active_conn", + ) + + return DatasetGraph(active_dataset) + + +class TestDanglingEraseAfterReference: + """Verify that erase_after referencing a non-existent collection is caught + before it can corrupt erasure task creation.""" + + def test_build_erasure_graph_rejects_dangling_erase_after(self): + """build_erasure_networkx_digraph should validate that all erase_after + references point to collections that exist in the traversal before + building the graph. A clear error upfront prevents partial task + creation and unrecoverable state. + """ + dataset_graph = _build_graph_with_dangling_erase_after() + + identity = {"email": "test@example.com"} + traversal = Traversal(dataset_graph, identity) + + traversal_nodes: dict[CollectionAddress, TraversalNode] = {} + traversal.traverse(traversal_nodes, collect_tasks_fn) + + erasure_end_nodes = list(dataset_graph.nodes.keys()) + + with pytest.raises(TraversalError, match="deleted_api:users"): + build_erasure_networkx_digraph(traversal_nodes, erasure_end_nodes) + + def test_persist_erasure_tasks_rejects_dangling_erase_after( + self, db, privacy_request + ): + """persist_initial_erasure_request_tasks should raise TraversalError + before creating any tasks when a dangling erase_after is detected. + """ + dataset_graph = _build_graph_with_dangling_erase_after() + + identity = {"email": "test@example.com"} + traversal = Traversal(dataset_graph, identity) + + traversal_nodes: dict[CollectionAddress, TraversalNode] = {} + traversal.traverse(traversal_nodes, collect_tasks_fn) + + erasure_end_nodes = list(dataset_graph.nodes.keys()) + + with pytest.raises(TraversalError, match="deleted_api:users"): + persist_initial_erasure_request_tasks( + db, + privacy_request, + traversal_nodes, + erasure_end_nodes, + dataset_graph, + ) + + # No erasure tasks should have been created + assert privacy_request.erasure_tasks.count() == 0 + + def test_valid_erase_after_still_works(self): + """erase_after referencing a collection that exists in the graph + should continue to work normally. + """ + users_collection = Collection( + name="users", + fields=[_identity_field("email"), ScalarField(name="name")], + ) + orders_collection = Collection( + name="orders", + fields=[_identity_field("email"), ScalarField(name="total")], + erase_after={CollectionAddress("test_dataset", "users")}, + ) + + dataset = GraphDataset( + name="test_dataset", + collections=[users_collection, orders_collection], + connection_key="test_conn", + ) + dataset_graph = DatasetGraph(dataset) + + identity = {"email": "test@example.com"} + traversal = Traversal(dataset_graph, identity) + + traversal_nodes: dict[CollectionAddress, TraversalNode] = {} + traversal.traverse(traversal_nodes, collect_tasks_fn) + + erasure_end_nodes = list(dataset_graph.nodes.keys()) + + # Should not raise + erasure_graph = build_erasure_networkx_digraph( + traversal_nodes, erasure_end_nodes + ) + + # orders should depend on users, not ROOT + users_addr = CollectionAddress("test_dataset", "users") + orders_addr = CollectionAddress("test_dataset", "orders") + assert orders_addr in erasure_graph.successors(users_addr) diff --git a/tests/fides/ops/task/test_graph_task.py b/tests/fides/ops/task/test_graph_task.py index 065cf99ffdf..33a7b38d662 100644 --- a/tests/fides/ops/task/test_graph_task.py +++ b/tests/fides/ops/task/test_graph_task.py @@ -7,7 +7,7 @@ from bson import ObjectId from fideslang.models import Dataset -from fides.api.common_exceptions import SkippingConsentPropagation +from fides.api.common_exceptions import CollectionDisabled, SkippingConsentPropagation from fides.api.graph.config import ( ROOT_COLLECTION_ADDRESS, TERMINATOR_ADDRESS, @@ -1390,3 +1390,50 @@ def test_erasure_request_traversal_only_skips_masking( assert "traversal-only" in log.message.lower() else: mock_mask.assert_called_once() + + +class TestDeletedConnectionConfig: + """ENG-3834: GraphTask behavior when ConnectionConfig has been deleted.""" + + @pytest.fixture(scope="function") + def deleted_conn_graph_task(self, privacy_request, policy, db): + """Build a GraphTask whose connection_key does NOT match any + ConnectionConfig in the provided list — simulating a hard-deleted + connection.""" + resources = TaskResources( + privacy_request, + policy, + [], # no connection configs → get_connector raises ConnectorNotFoundException + EMPTY_REQUEST_TASK, + db, + ) + tn = TraversalNode(generate_node("deleted_ds", "deleted_coll", "id")) + rq = tn.to_mock_request_task() + rq.action_type = ActionType.access + rq.status = ExecutionLogStatus.pending + rq.id = str(uuid.uuid4()) + db.add(rq) + db.commit() + + resources.privacy_request_task = rq + return GraphTask(resources) + + def test_connector_is_none_when_connection_deleted(self, deleted_conn_graph_task): + """GraphTask should set connector to None rather than crashing + when the ConnectionConfig has been deleted.""" + assert deleted_conn_graph_task.connector is None + + def test_saas_version_none_when_connection_deleted(self, deleted_conn_graph_task): + """_saas_version should be None when connector is missing.""" + assert deleted_conn_graph_task._saas_version is None + + def test_generate_dry_run_query_returns_none(self, deleted_conn_graph_task): + """generate_dry_run_query should return None instead of crashing + with AttributeError when connector is None.""" + assert deleted_conn_graph_task.generate_dry_run_query() is None + + def test_skip_if_disabled_raises_collection_disabled(self, deleted_conn_graph_task): + """skip_if_disabled should raise CollectionDisabled when connector + is None, just like it does for a disabled connection.""" + with pytest.raises(CollectionDisabled, match="ConnectionConfig was deleted"): + deleted_conn_graph_task.skip_if_disabled() diff --git a/tests/fides/task/test_requeue_interrupted_tasks.py b/tests/fides/task/test_requeue_interrupted_tasks.py index a64f29c8a98..50f6c397f28 100644 --- a/tests/fides/task/test_requeue_interrupted_tasks.py +++ b/tests/fides/task/test_requeue_interrupted_tasks.py @@ -4,6 +4,11 @@ import pytest +from fides.api.models.connectionconfig import ( + AccessLevel, + ConnectionConfig, + ConnectionType, +) from fides.api.models.privacy_request import PrivacyRequest, RequestTask from fides.api.models.privacy_request.request_task import AsyncTaskType from fides.api.models.worker_task import ExecutionLogStatus @@ -93,6 +98,7 @@ def _make( upstream=None, async_type=None, cached_subtask_id=None, + connection_key=None, ): data = { "action_type": ActionType.access, @@ -106,6 +112,13 @@ def _make( } if async_type is not None: data["async_type"] = async_type + if connection_key is not None: + data["traversal_details"] = { + "dataset_connection_key": connection_key, + "incoming_edges": [], + "outgoing_edges": [], + "input_keys": [], + } task = RequestTask.create(db, data=data) if cached_subtask_id: cache_task_tracking_key(task.id, cached_subtask_id) @@ -529,3 +542,228 @@ def test_integration_privacy_request_retry_workflow(self, make_privacy_request): reset_privacy_request_retry_count(pr.id) assert get_privacy_request_retry_count(pr.id) == 0 + + +# --------------------------------------------------------------------------- +# Orphaned task tests (ENG-3834) +# +# These tests document the behavior of the watchdog when async tasks have +# deleted or disabled connections. Tests marked xfail demonstrate the +# current bug — they will pass once the fix is applied. +# --------------------------------------------------------------------------- + + +@pytest.fixture +def make_connection_config(db): + """Factory: create ConnectionConfig instances with automatic teardown.""" + created = [] + + def _make(key=None, disabled=False): + key = key or f"test_conn_{uuid.uuid4().hex[:8]}" + cc = ConnectionConfig.create( + db=db, + data={ + "key": key, + "name": key, + "connection_type": ConnectionType.postgres, + "access": AccessLevel.read, + "disabled": disabled, + }, + ) + created.append(cc) + return cc + + yield _make + for cc in reversed(created): + try: + cc.delete(db) + except Exception: + pass + + +class TestOrphanedAsyncTasks: + """ENG-3834: Watchdog behavior when async tasks have deleted/disabled connections.""" + + # -- Fixed: orphaned callback tasks are now detected and requeued -- + + @pytest.mark.parametrize( + "conn_state", + [ + pytest.param("deleted", id="deleted"), + pytest.param("disabled", id="disabled"), + ], + ) + @mock.patch(_CANCEL) + @mock.patch(_REQUEUE) + @mock.patch(_QUEUE, return_value=[]) + @mock.patch(_IN_FLIGHT, return_value=False) + def test_callback_task_unavailable_connection_should_requeue( + self, + mock_in_flight, + mock_queue, + mock_requeue, + mock_cancel, + make_privacy_request, + make_request_task, + make_connection_config, + conn_state, + ): + """Callback task whose connection is deleted or disabled — the PR + should be requeued so the task can be re-executed and skipped. + Currently the watchdog cannot see awaiting_processing tasks at all.""" + conn_key = f"{conn_state}_conn" + if conn_state == "disabled": + make_connection_config(key=conn_key, disabled=True) + # "deleted" case: no ConnectionConfig created → key doesn't exist + + pr = make_privacy_request() + make_request_task( + pr, + ExecutionLogStatus.awaiting_processing, + async_type=AsyncTaskType.callback, + connection_key=conn_key, + cached_subtask_id="old-celery-id", + ) + requeue_interrupted_tasks.apply().get() + mock_requeue.assert_called_once() + + # -- Current behavior: should remain unchanged -- + + @mock.patch(_CANCEL) + @mock.patch(_REQUEUE) + @mock.patch(_QUEUE, return_value=[]) + @mock.patch(_IN_FLIGHT, return_value=False) + def test_callback_task_valid_connection_not_requeued( + self, + mock_in_flight, + mock_queue, + mock_requeue, + mock_cancel, + make_privacy_request, + make_request_task, + make_connection_config, + ): + """Callback task with a valid, enabled connection — legitimately + waiting for external callback. Watchdog must NOT requeue.""" + cc = make_connection_config(key="live_conn") + pr = make_privacy_request() + make_request_task( + pr, + ExecutionLogStatus.awaiting_processing, + async_type=AsyncTaskType.callback, + connection_key=cc.key, + cached_subtask_id="old-celery-id", + ) + requeue_interrupted_tasks.apply().get() + mock_requeue.assert_not_called() + mock_cancel.assert_not_called() + + # -- Fixed: exited async tasks no longer blind the watchdog -- + + @pytest.mark.parametrize( + "exited_status", + [ + pytest.param(ExecutionLogStatus.complete, id="complete"), + pytest.param(ExecutionLogStatus.error, id="error"), + pytest.param(ExecutionLogStatus.skipped, id="skipped"), + ], + ) + @mock.patch(_CANCEL) + @mock.patch(_REQUEUE) + @mock.patch(_QUEUE, return_value=[]) + @mock.patch(_IN_FLIGHT, return_value=False) + def test_exited_async_task_does_not_blind_watchdog( + self, + mock_in_flight, + mock_queue, + mock_requeue, + mock_cancel, + make_privacy_request, + make_request_task, + exited_status, + ): + """An async task in any exited status (complete, error, skipped) + should not prevent the watchdog from detecting stuck non-async tasks + in the same PR. Currently _has_async_tasks_awaiting_external_completion + has no status filter, so any PR that ever had an async task is + permanently invisible to the watchdog.""" + pr = make_privacy_request() + # Exited async task — should not blind the watchdog + make_request_task( + pr, + exited_status, + collection="async_coll", + async_type=AsyncTaskType.callback, + ) + # Stuck non-async task — no subtask_id, should trigger requeue + make_request_task( + pr, + ExecutionLogStatus.in_processing, + collection="stuck_coll", + ) + requeue_interrupted_tasks.apply().get() + # The stuck non-async task should cause a requeue + mock_requeue.assert_called_once() + + # -- pending_external PR with orphaned async task should requeue, not cancel -- + + @mock.patch(_CANCEL) + @mock.patch(_REQUEUE) + @mock.patch(_QUEUE, return_value=[]) + @mock.patch(_IN_FLIGHT, return_value=False) + def test_pending_external_pr_with_orphaned_task_requeued( + self, + mock_in_flight, + mock_queue, + mock_requeue, + mock_cancel, + make_privacy_request, + make_request_task, + ): + """A PR in pending_external (e.g. waiting for Jira) that also has an + orphaned async callback task should be requeued — not canceled. + + On requeue, the orphaned task will be re-executed and skipped + (CollectionDisabled), while the Jira task re-pauses idempotently.""" + pr = make_privacy_request(status=PrivacyRequestStatus.pending_external) + make_request_task( + pr, + ExecutionLogStatus.awaiting_processing, + async_type=AsyncTaskType.callback, + connection_key="deleted_conn", + cached_subtask_id="old-celery-id", + ) + requeue_interrupted_tasks.apply().get() + mock_requeue.assert_called_once() + mock_cancel.assert_not_called() + + # -- Edge case: missing connection_key in traversal_details -- + + @mock.patch(_CANCEL) + @mock.patch(_REQUEUE) + @mock.patch(_QUEUE, return_value=[]) + @mock.patch(_IN_FLIGHT, return_value=False) + def test_missing_connection_key_not_treated_as_orphaned( + self, + mock_in_flight, + mock_queue, + mock_requeue, + mock_cancel, + make_privacy_request, + make_request_task, + ): + """An awaiting_processing task whose traversal_details lacks + dataset_connection_key should NOT be treated as orphaned. The + conservative default (return False) keeps the task in its current + state rather than incorrectly requeueing.""" + pr = make_privacy_request() + # No connection_key → traversal_details won't have dataset_connection_key + make_request_task( + pr, + ExecutionLogStatus.awaiting_processing, + async_type=AsyncTaskType.callback, + cached_subtask_id="old-celery-id", + ) + requeue_interrupted_tasks.apply().get() + mock_requeue.assert_not_called() + mock_cancel.assert_not_called() From fb12c478b62a41194968440a8e2661e56eb6b523 Mon Sep 17 00:00:00 2001 From: Ray Harnett <99291082+rayharnett@users.noreply.github.com> Date: Fri, 22 May 2026 19:59:15 +0100 Subject: [PATCH 3/7] ENG-3926: Stream diagnostics ZIP directly instead of uploading to storage (#8254) Co-authored-by: Claude Opus 4.6 (1M context) --- .../8254-stream-dsr-diagnostics-zip.yaml | 3 + ...eDownloadPrivacyRequestDiagnostics.test.ts | 56 ++++++++++++++ .../useDownloadPrivacyRequestDiagnostics.ts | 76 +++++++++++-------- .../privacy-requests.slice.ts | 11 --- clients/admin-ui/src/types/api/index.ts | 1 - ...PrivacyRequestDiagnosticsExportResponse.ts | 25 ------ clients/admin-ui/src/types/api/types.ts | 36 --------- .../v1/endpoints/privacy_request_endpoints.py | 42 ++++++---- .../privacy_request/diagnostics/__init__.py | 12 +-- .../privacy_request/diagnostics/exceptions.py | 6 -- .../privacy_request/diagnostics/export.py | 73 ++---------------- .../privacy_request/diagnostics/schemas.py | 11 --- .../test_privacy_request_diagnostics.py | 38 ++-------- 13 files changed, 143 insertions(+), 247 deletions(-) create mode 100644 changelog/8254-stream-dsr-diagnostics-zip.yaml create mode 100644 clients/admin-ui/src/features/privacy-requests/hooks/useDownloadPrivacyRequestDiagnostics.test.ts delete mode 100644 clients/admin-ui/src/types/api/models/PrivacyRequestDiagnosticsExportResponse.ts delete mode 100644 src/fides/service/privacy_request/diagnostics/exceptions.py diff --git a/changelog/8254-stream-dsr-diagnostics-zip.yaml b/changelog/8254-stream-dsr-diagnostics-zip.yaml new file mode 100644 index 00000000000..ceefed4b85a --- /dev/null +++ b/changelog/8254-stream-dsr-diagnostics-zip.yaml @@ -0,0 +1,3 @@ +type: Fixed +description: Fixed "Download troubleshooting data" feature to stream diagnostics ZIP directly instead of uploading to storage, eliminating storage configuration dependency and reliability issues +pr: 8254 diff --git a/clients/admin-ui/src/features/privacy-requests/hooks/useDownloadPrivacyRequestDiagnostics.test.ts b/clients/admin-ui/src/features/privacy-requests/hooks/useDownloadPrivacyRequestDiagnostics.test.ts new file mode 100644 index 00000000000..8aa64401cbe --- /dev/null +++ b/clients/admin-ui/src/features/privacy-requests/hooks/useDownloadPrivacyRequestDiagnostics.test.ts @@ -0,0 +1,56 @@ +import { downloadBlob } from "./useDownloadPrivacyRequestDiagnostics"; + +describe("downloadBlob", () => { + const mockCreateObjectURL = jest.fn().mockReturnValue("blob:mock-url"); + const mockRevokeObjectURL = jest.fn(); + + beforeEach(() => { + URL.createObjectURL = mockCreateObjectURL; + URL.revokeObjectURL = mockRevokeObjectURL; + }); + + afterEach(() => { + jest.restoreAllMocks(); + mockCreateObjectURL.mockClear(); + mockRevokeObjectURL.mockClear(); + }); + + it("creates an object URL from the blob and triggers download", () => { + const clickSpy = jest.fn(); + const removeSpy = jest.fn(); + jest.spyOn(document, "createElement").mockReturnValue({ + href: "", + download: "", + click: clickSpy, + remove: removeSpy, + } as unknown as HTMLAnchorElement); + + const blob = new Blob(["test"], { type: "application/zip" }); + downloadBlob(blob, "diagnostics-abc.zip"); + + expect(mockCreateObjectURL).toHaveBeenCalledWith(blob); + expect(clickSpy).toHaveBeenCalled(); + expect(removeSpy).toHaveBeenCalled(); + expect(mockRevokeObjectURL).toHaveBeenCalledWith("blob:mock-url"); + }); + + it("sets the correct filename on the download link", () => { + let capturedDownload = ""; + jest.spyOn(document, "createElement").mockReturnValue({ + href: "", + set download(val: string) { + capturedDownload = val; + }, + get download() { + return capturedDownload; + }, + click: jest.fn(), + remove: jest.fn(), + } as unknown as HTMLAnchorElement); + + const blob = new Blob(["test"]); + downloadBlob(blob, "diagnostics-abc.zip"); + + expect(capturedDownload).toBe("diagnostics-abc.zip"); + }); +}); diff --git a/clients/admin-ui/src/features/privacy-requests/hooks/useDownloadPrivacyRequestDiagnostics.ts b/clients/admin-ui/src/features/privacy-requests/hooks/useDownloadPrivacyRequestDiagnostics.ts index 4915d8be66f..285fb040232 100644 --- a/clients/admin-ui/src/features/privacy-requests/hooks/useDownloadPrivacyRequestDiagnostics.ts +++ b/clients/admin-ui/src/features/privacy-requests/hooks/useDownloadPrivacyRequestDiagnostics.ts @@ -1,13 +1,23 @@ import { useMessage } from "fidesui"; +import { useState } from "react"; -import { getErrorMessage } from "~/features/common/helpers"; +import { useAppSelector } from "~/app/hooks"; +import { selectToken } from "~/features/auth/auth.slice"; +import { addCommonHeaders } from "~/features/common/CommonHeaders"; import { useHasPermission } from "~/features/common/Restrict"; import { ScopeRegistryEnum } from "~/types/api"; -import { useLazyGetPrivacyRequestDiagnosticsQuery } from "../privacy-requests.slice"; import { PrivacyRequestEntity } from "../types"; -const isLikelyRemoteUrl = (value: string) => /^https?:\/\//i.test(value); +export const downloadBlob = (blob: Blob, filename: string) => { + const url = URL.createObjectURL(blob); + const link = document.createElement("a"); + link.href = url; + link.download = filename; + link.click(); + link.remove(); + URL.revokeObjectURL(url); +}; const useDownloadPrivacyRequestDiagnostics = ({ privacyRequest, @@ -15,46 +25,46 @@ const useDownloadPrivacyRequestDiagnostics = ({ privacyRequest: PrivacyRequestEntity; }) => { const message = useMessage(); + const token = useAppSelector(selectToken); + const [isLoading, setIsLoading] = useState(false); const hasPermissionsToReadPrivacyRequests = useHasPermission([ ScopeRegistryEnum.PRIVACY_REQUEST_READ, ]); - const [fetchDiagnostics, { isFetching }] = - useLazyGetPrivacyRequestDiagnosticsQuery(); - const downloadTroubleshootingData = async () => { - const result = await fetchDiagnostics({ - privacy_request_id: privacyRequest.id, - }); + setIsLoading(true); + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), 60_000); + try { + const headers = new Headers(); + addCommonHeaders(headers, token); - if ("error" in result) { - message.error( - getErrorMessage( - result.error as NonNullable, - "Unable to resolve download URL", - ), + const resp = await fetch( + `${process.env.NEXT_PUBLIC_FIDESCTL_API}/privacy-request/${privacyRequest.id}/diagnostics`, + { headers, signal: controller.signal }, ); - return; - } - const downloadUrl = result.data?.download_url ?? ""; - if (!downloadUrl) { - message.error("Unable to resolve download URL"); - return; - } + if (!resp.ok) { + const body = await resp.json().catch(() => null); + message.error( + body?.detail ?? "Unable to download troubleshooting data", + ); + return; + } - if (!isLikelyRemoteUrl(downloadUrl)) { - message.info("Troubleshooting data stored locally cannot be downloaded"); - return; + const blob = await resp.blob(); + downloadBlob(blob, `diagnostics-${privacyRequest.id}.zip`); + } catch (err) { + if (err instanceof DOMException && err.name === "AbortError") { + message.error("Download timed out. Please try again."); + } else { + message.error("Unable to download troubleshooting data"); + } + } finally { + clearTimeout(timeoutId); + setIsLoading(false); } - - const link = document.createElement("a"); - link.href = downloadUrl; - link.target = "_blank"; - link.rel = "noopener noreferrer"; - link.click(); - link.remove(); }; const showDownloadTroubleshootingData = hasPermissionsToReadPrivacyRequests; @@ -62,7 +72,7 @@ const useDownloadPrivacyRequestDiagnostics = ({ return { showDownloadTroubleshootingData, downloadTroubleshootingData, - isLoading: isFetching, + isLoading, }; }; diff --git a/clients/admin-ui/src/features/privacy-requests/privacy-requests.slice.ts b/clients/admin-ui/src/features/privacy-requests/privacy-requests.slice.ts index fb1a7779e8c..81e14b5f3ab 100644 --- a/clients/admin-ui/src/features/privacy-requests/privacy-requests.slice.ts +++ b/clients/admin-ui/src/features/privacy-requests/privacy-requests.slice.ts @@ -11,7 +11,6 @@ import { Page_Union_PrivacyRequestVerboseResponseExtended__PrivacyRequestResponseExtended__, PrivacyRequestAccessResults, PrivacyRequestCreateExtended as PrivacyRequestCreate, - PrivacyRequestDiagnosticsExportResponse, PrivacyRequestFilter, PrivacyRequestNotificationInfo, PrivacyRequestStatus, @@ -590,15 +589,6 @@ export const privacyRequestApi = baseApi.injectEndpoints({ url: `privacy-request/${privacy_request_id}/access-results`, }), }), - getPrivacyRequestDiagnostics: build.query< - PrivacyRequestDiagnosticsExportResponse, - { privacy_request_id: string } - >({ - query: ({ privacy_request_id }) => ({ - method: "GET", - url: `privacy-request/${privacy_request_id}/diagnostics`, - }), - }), getFilteredResults: build.query< { privacy_request_id: string; @@ -662,7 +652,6 @@ export const { useCreateStorageSecretsMutation, useGetActiveStorageQuery, useGetPrivacyRequestAccessResultsQuery, - useLazyGetPrivacyRequestDiagnosticsQuery, useGetFilteredResultsQuery, useGetTestLogsQuery, usePostPrivacyRequestFinalizeMutation, diff --git a/clients/admin-ui/src/types/api/index.ts b/clients/admin-ui/src/types/api/index.ts index f6c71568ae1..72182bcaa11 100644 --- a/clients/admin-ui/src/types/api/index.ts +++ b/clients/admin-ui/src/types/api/index.ts @@ -711,7 +711,6 @@ export type * from "./models/PrivacyRequestBulkSelection"; export type * from "./models/PrivacyRequestCreateExtended"; export * from "./models/PrivacyRequestDRPStatus"; export type * from "./models/PrivacyRequestDRPStatusResponse"; -export type * from "./models/PrivacyRequestDiagnosticsExportResponse"; export type * from "./models/PrivacyRequestFieldDefinition"; export type * from "./models/PrivacyRequestFieldsResponse"; export type * from "./models/PrivacyRequestFilter"; diff --git a/clients/admin-ui/src/types/api/models/PrivacyRequestDiagnosticsExportResponse.ts b/clients/admin-ui/src/types/api/models/PrivacyRequestDiagnosticsExportResponse.ts deleted file mode 100644 index 007d60011c0..00000000000 --- a/clients/admin-ui/src/types/api/models/PrivacyRequestDiagnosticsExportResponse.ts +++ /dev/null @@ -1,25 +0,0 @@ -// This file is auto-generated by @hey-api/openapi-ts - -/** - * PrivacyRequestDiagnosticsExportResponse - * - * Response payload for a diagnostics export request. - */ -export type PrivacyRequestDiagnosticsExportResponse = { - /** - * Download Url - */ - download_url: string; - /** - * Storage Type - */ - storage_type: string; - /** - * Object Key - */ - object_key: string; - /** - * Created At - */ - created_at: string; -}; diff --git a/clients/admin-ui/src/types/api/types.ts b/clients/admin-ui/src/types/api/types.ts index c955002f15f..86a05297be8 100644 --- a/clients/admin-ui/src/types/api/types.ts +++ b/clients/admin-ui/src/types/api/types.ts @@ -410,7 +410,6 @@ import type { PrivacyPreferenceStats } from "./models/PrivacyPreferenceStats"; import type { PrivacyRequestAccessResults } from "./models/PrivacyRequestAccessResults"; import type { PrivacyRequestBulkSelection } from "./models/PrivacyRequestBulkSelection"; import type { PrivacyRequestCreateExtended } from "./models/PrivacyRequestCreateExtended"; -import type { PrivacyRequestDiagnosticsExportResponse } from "./models/PrivacyRequestDiagnosticsExportResponse"; import type { PrivacyRequestDRPStatusResponse } from "./models/PrivacyRequestDRPStatusResponse"; import type { PrivacyRequestFieldsResponse } from "./models/PrivacyRequestFieldsResponse"; import type { PrivacyRequestFilter } from "./models/PrivacyRequestFilter"; @@ -8350,41 +8349,6 @@ export type getRequestStatusLogsApiV1PrivacyRequestPrivacyRequestIdLogGetRespons export type getRequestStatusLogsApiV1PrivacyRequestPrivacyRequestIdLogGetResponse = getRequestStatusLogsApiV1PrivacyRequestPrivacyRequestIdLogGetResponses[keyof getRequestStatusLogsApiV1PrivacyRequestPrivacyRequestIdLogGetResponses]; -export type getPrivacyRequestDiagnosticsReportApiV1PrivacyRequestPrivacyRequestIdDiagnosticsGetData = - { - body?: never; - path: { - /** - * Privacy Request Id - */ - privacy_request_id: string; - }; - query?: never; - url: "/api/v1/privacy-request/{privacy_request_id}/diagnostics"; - }; - -export type getPrivacyRequestDiagnosticsReportApiV1PrivacyRequestPrivacyRequestIdDiagnosticsGetErrors = - { - /** - * Validation Error - */ - 422: HTTPValidationError; - }; - -export type getPrivacyRequestDiagnosticsReportApiV1PrivacyRequestPrivacyRequestIdDiagnosticsGetError = - getPrivacyRequestDiagnosticsReportApiV1PrivacyRequestPrivacyRequestIdDiagnosticsGetErrors[keyof getPrivacyRequestDiagnosticsReportApiV1PrivacyRequestPrivacyRequestIdDiagnosticsGetErrors]; - -export type getPrivacyRequestDiagnosticsReportApiV1PrivacyRequestPrivacyRequestIdDiagnosticsGetResponses = - { - /** - * Successful Response - */ - 200: PrivacyRequestDiagnosticsExportResponse; - }; - -export type getPrivacyRequestDiagnosticsReportApiV1PrivacyRequestPrivacyRequestIdDiagnosticsGetResponse = - getPrivacyRequestDiagnosticsReportApiV1PrivacyRequestPrivacyRequestIdDiagnosticsGetResponses[keyof getPrivacyRequestDiagnosticsReportApiV1PrivacyRequestPrivacyRequestIdDiagnosticsGetResponses]; - export type getPrivacyRequestNotificationInfoApiV1PrivacyRequestNotificationGetData = { body?: never; diff --git a/src/fides/api/v1/endpoints/privacy_request_endpoints.py b/src/fides/api/v1/endpoints/privacy_request_endpoints.py index e17b20ff38c..cf1cf47c41e 100644 --- a/src/fides/api/v1/endpoints/privacy_request_endpoints.py +++ b/src/fides/api/v1/endpoints/privacy_request_endpoints.py @@ -24,7 +24,7 @@ from pydantic import Field from pydantic import ValidationError as PydanticValidationError from sqlalchemy.orm import Query, Session, selectinload -from starlette.responses import StreamingResponse +from starlette.responses import Response, StreamingResponse from starlette.status import ( HTTP_200_OK, HTTP_400_BAD_REQUEST, @@ -178,9 +178,7 @@ ) from fides.service.messaging.messaging_service import MessagingService from fides.service.privacy_request.diagnostics import ( - DefaultStorageNotConfiguredError, - PrivacyRequestDiagnosticsExportResponse, - export_privacy_request_diagnostics, + build_diagnostics_zip, ) from fides.service.privacy_request.privacy_request_service import ( PrivacyRequestService, @@ -594,32 +592,44 @@ def get_request_status_logs( @router.get( PRIVACY_REQUEST_DIAGNOSTICS, dependencies=[Security(verify_oauth_client, scopes=[PRIVACY_REQUEST_READ])], - response_model=PrivacyRequestDiagnosticsExportResponse, status_code=HTTP_200_OK, + response_class=Response, + responses={ + 200: { + "content": {"application/zip": {}}, + "description": "ZIP file containing diagnostics.json", + } + }, ) def get_privacy_request_diagnostics_report( privacy_request_id: str, *, db: Session = Depends(deps.get_db), -) -> PrivacyRequestDiagnosticsExportResponse: +) -> Response: """ - Export a non-PII diagnostics snapshot for a single privacy request and return a download URL. - - This report intentionally excludes any fields that could contain PII. + Export a non-PII diagnostics snapshot for a single privacy request + as a downloadable ZIP file. """ - try: - return export_privacy_request_diagnostics(privacy_request_id, db) + buf = build_diagnostics_zip(privacy_request_id, db) except PrivacyRequestNotFound: raise HTTPException( status_code=HTTP_404_NOT_FOUND, detail=f"No privacy request found with id '{privacy_request_id}'.", ) - except DefaultStorageNotConfiguredError as exc: - raise HTTPException( - status_code=HTTP_422_UNPROCESSABLE_CONTENT, - detail=exc.detail, - ) + + # Sanitize the ID before embedding in a header to guard against injection + # if the ID format ever changes beyond safe UUID characters. + safe_id = "".join(c for c in privacy_request_id if c.isalnum() or c in "-_") + filename = f"diagnostics-{safe_id}.zip" + content = buf.getvalue() + return Response( + content=content, + media_type="application/zip", + headers={ + "Content-Disposition": f'attachment; filename="{filename}"', + }, + ) @router.get( diff --git a/src/fides/service/privacy_request/diagnostics/__init__.py b/src/fides/service/privacy_request/diagnostics/__init__.py index 0cb1e5a463d..e3be9a3da21 100644 --- a/src/fides/service/privacy_request/diagnostics/__init__.py +++ b/src/fides/service/privacy_request/diagnostics/__init__.py @@ -1,15 +1,7 @@ -from fides.service.privacy_request.diagnostics.exceptions import ( - DefaultStorageNotConfiguredError, -) from fides.service.privacy_request.diagnostics.export import ( - export_privacy_request_diagnostics, -) -from fides.service.privacy_request.diagnostics.schemas import ( - PrivacyRequestDiagnosticsExportResponse, + build_diagnostics_zip, ) __all__ = [ - "DefaultStorageNotConfiguredError", - "PrivacyRequestDiagnosticsExportResponse", - "export_privacy_request_diagnostics", + "build_diagnostics_zip", ] diff --git a/src/fides/service/privacy_request/diagnostics/exceptions.py b/src/fides/service/privacy_request/diagnostics/exceptions.py deleted file mode 100644 index 5d860377087..00000000000 --- a/src/fides/service/privacy_request/diagnostics/exceptions.py +++ /dev/null @@ -1,6 +0,0 @@ -class DefaultStorageNotConfiguredError(Exception): - """Raised when an operation requires a default storage backend but none is configured.""" - - def __init__(self, detail: str): - self.detail = detail - super().__init__(detail) diff --git a/src/fides/service/privacy_request/diagnostics/export.py b/src/fides/service/privacy_request/diagnostics/export.py index a5b1238587c..2e2aaaef7c0 100644 --- a/src/fides/service/privacy_request/diagnostics/export.py +++ b/src/fides/service/privacy_request/diagnostics/export.py @@ -1,42 +1,20 @@ """ -Export a non-PII diagnostics report as a ZIP file to configured storage. +Export a non-PII diagnostics report as a ZIP file. """ import json -import secrets import zipfile -from datetime import datetime, timezone from io import BytesIO from sqlalchemy.orm import Session -from fides.api.models.storage import get_active_default_storage_config -from fides.api.schemas.storage.storage import StorageType -from fides.api.service.storage import StorageProviderFactory -from fides.config import CONFIG -from fides.service.privacy_request.diagnostics.exceptions import ( - DefaultStorageNotConfiguredError, -) from fides.service.privacy_request.diagnostics.gather import ( get_privacy_request_diagnostics, ) from fides.service.privacy_request.diagnostics.schemas import ( PrivacyRequestDiagnostics, - PrivacyRequestDiagnosticsExportResponse, ) -PRIVACY_REQUEST_DIAGNOSTICS_PREFIX = "privacy-request-diagnostics" - - -def _build_diagnostics_object_key(privacy_request_id: str) -> str: - timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") - rand = secrets.token_hex(4) - return ( - f"{PRIVACY_REQUEST_DIAGNOSTICS_PREFIX}/" - f"{privacy_request_id}/" - f"{timestamp}-{rand}.zip" - ) - def _serialize_diagnostics_to_zip(diagnostics: PrivacyRequestDiagnostics) -> BytesIO: """Serialize diagnostics payload into a ZIP file held in memory.""" @@ -50,50 +28,11 @@ def _serialize_diagnostics_to_zip(diagnostics: PrivacyRequestDiagnostics) -> Byt return buf -def export_privacy_request_diagnostics( - privacy_request_id: str, db: Session -) -> PrivacyRequestDiagnosticsExportResponse: - """ - Export a non-PII diagnostics report as a ZIP file to configured storage - and return a downloadable URL (signed/presigned where applicable). +def build_diagnostics_zip(privacy_request_id: str, db: Session) -> BytesIO: """ - storage_config = get_active_default_storage_config(db) - if not storage_config: - raise DefaultStorageNotConfiguredError( - "No default storage backend is configured. " - "Configure a storage destination via the application settings API " - "before exporting diagnostics." - ) - - provider = StorageProviderFactory.create(storage_config) - bucket = StorageProviderFactory.get_bucket_from_config(storage_config) - - storage_type = ( - storage_config.type - if isinstance(storage_config.type, StorageType) - else StorageType(storage_config.type) - ) + Gather diagnostics for a privacy request and return a ZIP file in memory. + Raises PrivacyRequestNotFound if the privacy request does not exist. + """ diagnostics = get_privacy_request_diagnostics(privacy_request_id, db) - buf = _serialize_diagnostics_to_zip(diagnostics) - object_key = _build_diagnostics_object_key(privacy_request_id) - - provider.upload( - bucket=bucket, - key=object_key, - data=buf, - content_type="application/zip", - ) - - download_url = provider.generate_presigned_url( - bucket=bucket, - key=object_key, - ttl_seconds=CONFIG.security.subject_request_download_link_ttl_seconds, - ) - - return PrivacyRequestDiagnosticsExportResponse( - download_url=str(download_url), - storage_type=storage_type.value, - object_key=object_key, - created_at=datetime.now(timezone.utc), - ) + return _serialize_diagnostics_to_zip(diagnostics) diff --git a/src/fides/service/privacy_request/diagnostics/schemas.py b/src/fides/service/privacy_request/diagnostics/schemas.py index e79e2269b74..15433f5f5e3 100644 --- a/src/fides/service/privacy_request/diagnostics/schemas.py +++ b/src/fides/service/privacy_request/diagnostics/schemas.py @@ -478,14 +478,3 @@ class PrivacyRequestDiagnostics(BaseModel): attachment_references: List[AttachmentReferenceSnapshot] comments: List[CommentSnapshot] comment_references: List[CommentReferenceSnapshot] - - -class PrivacyRequestDiagnosticsExportResponse(BaseModel): - """Response payload for a diagnostics export request.""" - - model_config = ConfigDict(extra="forbid") - - download_url: str - storage_type: str - object_key: str - created_at: datetime diff --git a/tests/fides/ops/api/v1/endpoints/privacy_request/test_privacy_request_diagnostics.py b/tests/fides/ops/api/v1/endpoints/privacy_request/test_privacy_request_diagnostics.py index 4eaf5260972..c390419021a 100644 --- a/tests/fides/ops/api/v1/endpoints/privacy_request/test_privacy_request_diagnostics.py +++ b/tests/fides/ops/api/v1/endpoints/privacy_request/test_privacy_request_diagnostics.py @@ -1,6 +1,6 @@ import json +from io import BytesIO from typing import Any, Dict -from unittest.mock import patch from zipfile import ZipFile from fastapi.testclient import TestClient @@ -24,7 +24,7 @@ def test_diagnostics_happy_path_non_pii( generate_auth_header, privacy_request, ) -> None: - """Diagnostics endpoint should return 200 and exclude raw identity values.""" + """Diagnostics endpoint should return a ZIP and exclude raw identity values.""" identity_value = "user@example.com" ProvidedIdentity.create( db, @@ -44,15 +44,13 @@ def test_diagnostics_happy_path_non_pii( resp = api_client.get(url, headers=auth_header) assert resp.status_code == 200 + assert resp.headers["content-type"] == "application/zip" + assert "attachment" in resp.headers["content-disposition"] + assert resp.headers["content-length"] == str(len(resp.content)) - payload: Dict[str, Any] = resp.json() - assert payload["download_url"] - assert payload["object_key"].startswith("privacy-request-diagnostics/") - - # In test/dev mode, local storage is used and the "download_url" is a local file path. - with ZipFile(payload["download_url"]) as zf: + with ZipFile(BytesIO(resp.content)) as zf: diagnostics_json = zf.read("diagnostics.json").decode("utf-8") - diagnostics_payload = json.loads(diagnostics_json) + diagnostics_payload: Dict[str, Any] = json.loads(diagnostics_json) assert diagnostics_payload["privacy_request"]["id"] == privacy_request.id @@ -92,25 +90,3 @@ def test_diagnostics_wrong_scope_403( resp = api_client.get(url, headers=auth_header) assert resp.status_code == 403 - - def test_diagnostics_422_when_no_default_storage( - self, - api_client: TestClient, - generate_auth_header, - privacy_request, - ) -> None: - """Endpoint returns 422 when no default storage backend is configured.""" - auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_READ]) - url = V1_URL_PREFIX + PRIVACY_REQUEST_DIAGNOSTICS.format( - privacy_request_id=privacy_request.id - ) - - with patch( - "fides.service.privacy_request.diagnostics.export.get_active_default_storage_config", - return_value=None, - ): - resp = api_client.get(url, headers=auth_header) - - assert resp.status_code == 422 - payload: Dict[str, Any] = resp.json() - assert "storage" in payload["detail"].lower() From 4d88d2f91c3f4e1427ff09e6c1878870f41eefd0 Mon Sep 17 00:00:00 2001 From: JadeWibbels Date: Tue, 26 May 2026 16:45:56 -0600 Subject: [PATCH 4/7] ENG-3687: Fix requires_input watchdog guard for manual webhook and manual task DSRs (#8264) Co-authored-by: Claude Opus 4.6 (1M context) --- ...264-fix-requires-input-watchdog-guard.yaml | 4 + .../privacy_request/request_service.py | 34 +++-- src/fides/api/util/connection_util.py | 27 +++- .../test_requeue_interrupted_tasks_guards.py | 79 +++++++++++ tests/fides/ops/util/test_connection_util.py | 124 ++++++++++++++++++ 5 files changed, 255 insertions(+), 13 deletions(-) create mode 100644 changelog/8264-fix-requires-input-watchdog-guard.yaml create mode 100644 tests/fides/ops/service/privacy_request/test_requeue_interrupted_tasks_guards.py create mode 100644 tests/fides/ops/util/test_connection_util.py diff --git a/changelog/8264-fix-requires-input-watchdog-guard.yaml b/changelog/8264-fix-requires-input-watchdog-guard.yaml new file mode 100644 index 00000000000..4239bd32c96 --- /dev/null +++ b/changelog/8264-fix-requires-input-watchdog-guard.yaml @@ -0,0 +1,4 @@ +type: Fixed +description: Fixed watchdog incorrectly erroring privacy requests paused for manual webhook or manual task input, and fixed connection config updates incorrectly requeuing manual task DSRs +pr: 8264 +labels: [] diff --git a/src/fides/api/service/privacy_request/request_service.py b/src/fides/api/service/privacy_request/request_service.py index ca3d9d07582..2a7edae17be 100644 --- a/src/fides/api/service/privacy_request/request_service.py +++ b/src/fides/api/service/privacy_request/request_service.py @@ -654,6 +654,18 @@ def requeue_interrupted_tasks(self: DatabaseTask) -> None: # If the task ID is not cached, we can't check if it's running # This means the request is stuck - cancel it if not task_id: + # Paused requests (manual webhook, manual task, Jira) have + # no running Celery task by design — don't cancel them. + if privacy_request.status in ( + PrivacyRequestStatus.requires_input, + PrivacyRequestStatus.pending_external, + ): + logger.debug( + f"Skipping privacy request {privacy_request.id} in " + f"{privacy_request.status.value} status with no cached " + f"task ID - intentionally paused" + ) + continue _cancel_interrupted_tasks_and_error_privacy_request( db, privacy_request, @@ -701,23 +713,23 @@ def requeue_interrupted_tasks(self: DatabaseTask) -> None: break # If the task ID is not cached, we can't check if it's running - # This means the subtask is stuck - but we need to handle this differently - # based on the privacy request status if not subtask_id: + # Paused requests may have request tasks with no + # cached subtask ID — this is expected when the + # Celery worker completed and the DSR is waiting + # for manual input or an external system. Don't + # requeue or cancel these. if privacy_request.status in ( PrivacyRequestStatus.requires_input, PrivacyRequestStatus.pending_external, ): - # For requires_input / pending_external status, don't - # automatically error the request as it's intentionally - # waiting for user input or an external system (e.g. Jira) - logger.warning( - f"No task ID found for request task {request_task_id} " - f"(privacy request {privacy_request.id}) in {privacy_request.status.value} status - " - f"keeping request in current status as it may be waiting for input or an external system" + logger.debug( + f"Request task {request_task_id} " + f"(privacy request {privacy_request.id}) has no " + f"cached subtask ID in {privacy_request.status.value} " + f"status - intentionally paused, skipping" ) - should_requeue = False - break + continue # A pending task awaiting upstream is not stuck — it was # never dispatched because its prerequisites aren't done. diff --git a/src/fides/api/util/connection_util.py b/src/fides/api/util/connection_util.py index 943753182f1..97e23d0b5dd 100644 --- a/src/fides/api/util/connection_util.py +++ b/src/fides/api/util/connection_util.py @@ -18,7 +18,7 @@ from fides.api.models.connectionconfig import ConnectionConfig, ConnectionType from fides.api.models.datasetconfig import DatasetConfig from fides.api.models.manual_webhook import AccessManualWebhook -from fides.api.models.privacy_request import PrivacyRequest +from fides.api.models.privacy_request import PrivacyRequest, RequestTask from fides.api.models.sql_models import Dataset as CtlDataset # type: ignore from fides.api.models.sql_models import System # type: ignore from fides.api.schemas.api import BulkUpdateFailed @@ -58,8 +58,31 @@ def requeue_requires_input_requests(db: Session) -> None: for pr in PrivacyRequest.query_without_large_columns(db).filter( PrivacyRequest.status == PrivacyRequestStatus.requires_input ): + # Only requeue DSRs that were paused by manual webhooks (pre-graph, + # zero RequestTasks). DSRs paused by manual_task connections have + # RequestTasks and should not be requeued here — they are waiting + # for operator input on a manual task, not a manual webhook. + # NOTE: This uses RequestTask existence as a proxy for manual_task + # vs manual_webhook. Today only manual_task connections create + # RequestTasks, but this assumption could become fragile if new + # features pause requests without creating RequestTasks. + has_request_tasks = ( + db.query(RequestTask.id) + .filter(RequestTask.privacy_request_id == pr.id) + .first() + is not None + ) + if has_request_tasks: + logger.debug( + "Skipping requeue of privacy request '{}' — has RequestTasks " + "(paused by a manual_task, not a manual_webhook).", + pr.id, + ) + continue + logger.info( - "Queuing privacy request '{} with '{}' status now that manual inputs are no longer required.", + "Queuing privacy request '{}' with '{}' status now that manual " + "inputs are no longer required.", pr.id, pr.status.value, ) diff --git a/tests/fides/ops/service/privacy_request/test_requeue_interrupted_tasks_guards.py b/tests/fides/ops/service/privacy_request/test_requeue_interrupted_tasks_guards.py new file mode 100644 index 00000000000..7ce913b03af --- /dev/null +++ b/tests/fides/ops/service/privacy_request/test_requeue_interrupted_tasks_guards.py @@ -0,0 +1,79 @@ +"""Tests for requires_input/pending_external guards in requeue_interrupted_tasks. + +The watchdog incorrectly cancels/requeues DSRs that are intentionally +paused for manual webhook data or manual task input. These tests verify +that the early guard in the watchdog loop skips paused DSRs. + +External boundaries (Redis lock, Celery queue, task cache) are mocked because +they require infrastructure. DB state uses real fixtures. +""" + +from unittest import mock + +import pytest + +from fides.api.schemas.privacy_request import PrivacyRequestStatus +from fides.api.service.privacy_request.request_service import ( + requeue_interrupted_tasks, +) + +_REQUEST_SERVICE_MODULE = "fides.api.service.privacy_request.request_service" + +_PAUSED_STATUSES = [ + pytest.param(PrivacyRequestStatus.requires_input, id="requires_input"), + pytest.param(PrivacyRequestStatus.pending_external, id="pending_external"), +] + + +def _run_watchdog(db): + """Run requeue_interrupted_tasks with the given db session.""" + with mock.patch.object( + requeue_interrupted_tasks, "get_new_session" + ) as mock_session: + mock_session.return_value.__enter__.return_value = db + requeue_interrupted_tasks.apply().get() + + +class TestWatchdogSkipsPausedRequests: + """The early guard should skip requires_input/pending_external DSRs + before any cancellation or requeue logic runs.""" + + @pytest.mark.parametrize("status", _PAUSED_STATUSES) + @mock.patch(f"{_REQUEST_SERVICE_MODULE}.redis_lock") + @mock.patch( + f"{_REQUEST_SERVICE_MODULE}._get_task_ids_from_dsr_queue", return_value=[] + ) + def test_paused_dsr_skipped_by_watchdog( + self, _, mock_redis_lock, db, privacy_request, status + ): + """Paused DSRs should remain in their current status after the watchdog runs.""" + privacy_request.status = status + privacy_request.save(db) + mock_redis_lock.return_value.__enter__.return_value = True + + _run_watchdog(db) + + db.refresh(privacy_request) + assert privacy_request.status == status + + +class TestWatchdogStillCancelsActiveRequests: + """Existing behavior: in_processing DSRs should still be canceled/requeued.""" + + @mock.patch(f"{_REQUEST_SERVICE_MODULE}.redis_lock") + @mock.patch( + f"{_REQUEST_SERVICE_MODULE}._get_task_ids_from_dsr_queue", return_value=[] + ) + @mock.patch(f"{_REQUEST_SERVICE_MODULE}.get_cached_task_id", return_value=None) + def test_in_processing_no_task_id_still_canceled( + self, _, __, mock_redis_lock, db, privacy_request + ): + """in_processing DSR with no cached task ID should still be canceled.""" + privacy_request.status = PrivacyRequestStatus.in_processing + privacy_request.save(db) + mock_redis_lock.return_value.__enter__.return_value = True + + _run_watchdog(db) + + db.refresh(privacy_request) + assert privacy_request.status == PrivacyRequestStatus.error diff --git a/tests/fides/ops/util/test_connection_util.py b/tests/fides/ops/util/test_connection_util.py new file mode 100644 index 00000000000..453ec2c4dc7 --- /dev/null +++ b/tests/fides/ops/util/test_connection_util.py @@ -0,0 +1,124 @@ +"""Tests for requeue_requires_input_requests in connection_util. + +The function incorrectly requeues ALL requires_input DSRs when no +AccessManualWebhooks exist. DSRs paused by manual_task connections (which have +RequestTasks) should not be requeued — only DSRs paused by manual_webhook +connections (which have zero RequestTasks) should be affected. +""" + +from unittest import mock + +import pytest + +from fides.api.models.privacy_request import PrivacyRequest, RequestTask +from fides.api.models.worker_task import ExecutionLogStatus +from fides.api.schemas.policy import ActionType +from fides.api.schemas.privacy_request import PrivacyRequestStatus +from fides.api.util.connection_util import requeue_requires_input_requests + + +@pytest.fixture() +def second_privacy_request(db, policy) -> PrivacyRequest: + """A second privacy request for coexistence tests.""" + pr = PrivacyRequest.create( + db=db, + data={ + "requested_at": None, + "policy_id": policy.id, + "status": PrivacyRequestStatus.requires_input, + }, + ) + yield pr + pr.delete(db) + + +class TestRequeueRequiresInputRequests: + """requeue_requires_input_requests should only requeue manual_webhook DSRs.""" + + def test_dsr_with_request_tasks_not_requeued(self, db, privacy_request): + """A requires_input DSR with RequestTasks (manual_task) should not be requeued.""" + privacy_request.status = PrivacyRequestStatus.requires_input + privacy_request.save(db) + + # Create a RequestTask — simulates a manual_task DSR paused in-graph + request_task = RequestTask.create( + db, + data={ + "privacy_request_id": privacy_request.id, + "action_type": ActionType.access, + "status": ExecutionLogStatus.awaiting_processing, + "collection_address": "manual_dataset:manual_collection", + "dataset_name": "manual_dataset", + "collection_name": "manual_collection", + "upstream_tasks": [], + "downstream_tasks": [], + "all_descendant_tasks": [], + }, + ) + + try: + # No AccessManualWebhooks exist — guard passes + requeue_requires_input_requests(db) + + db.refresh(privacy_request) + assert privacy_request.status == PrivacyRequestStatus.requires_input + finally: + request_task.delete(db) + + @mock.patch("fides.api.util.connection_util.queue_privacy_request") + def test_dsr_without_request_tasks_is_requeued( + self, mock_queue, db, privacy_request + ): + """A requires_input DSR with no RequestTasks (manual_webhook) should be requeued.""" + privacy_request.status = PrivacyRequestStatus.requires_input + privacy_request.save(db) + + # No RequestTasks — DSR was paused pre-graph by a manual_webhook + # No AccessManualWebhooks exist — guard passes + requeue_requires_input_requests(db) + + db.refresh(privacy_request) + assert privacy_request.status == PrivacyRequestStatus.in_processing + mock_queue.assert_called_once_with(privacy_request_id=privacy_request.id) + + @mock.patch("fides.api.util.connection_util.queue_privacy_request") + def test_coexistence_only_webhook_dsr_requeued( + self, mock_queue, db, privacy_request, second_privacy_request + ): + """When both webhook and task DSRs are in requires_input, only the webhook DSR is requeued.""" + # DSR 1: manual_webhook (no RequestTasks) + privacy_request.status = PrivacyRequestStatus.requires_input + privacy_request.save(db) + + # DSR 2: manual_task (has RequestTasks) + second_privacy_request.status = PrivacyRequestStatus.requires_input + second_privacy_request.save(db) + request_task = RequestTask.create( + db, + data={ + "privacy_request_id": second_privacy_request.id, + "action_type": ActionType.access, + "status": ExecutionLogStatus.awaiting_processing, + "collection_address": "manual_dataset:manual_collection", + "dataset_name": "manual_dataset", + "collection_name": "manual_collection", + "upstream_tasks": [], + "downstream_tasks": [], + "all_descendant_tasks": [], + }, + ) + + try: + requeue_requires_input_requests(db) + + db.refresh(privacy_request) + db.refresh(second_privacy_request) + + # Webhook DSR requeued + assert privacy_request.status == PrivacyRequestStatus.in_processing + # Manual task DSR untouched + assert second_privacy_request.status == PrivacyRequestStatus.requires_input + # Only one DSR was queued + mock_queue.assert_called_once_with(privacy_request_id=privacy_request.id) + finally: + request_task.delete(db) From 44595f064d72352a5ea82dd01c04e635fe12cb10 Mon Sep 17 00:00:00 2001 From: Catherine Smith Date: Tue, 26 May 2026 21:19:40 +0200 Subject: [PATCH 5/7] ENG-3950: Recreate missing erasure tasks on retry (#8268) Co-authored-by: Claude Opus 4.6 (1M context) --- .../8268-recreate-missing-erasure-tasks.yaml | 4 + .../privacy_request/request_service.py | 23 ++ src/fides/api/task/create_request_tasks.py | 62 +++- src/fides/api/task/graph_runners.py | 2 + .../integration_tests/test_enabled_actions.py | 12 +- .../ops/task/test_create_request_tasks.py | 307 +++++++++++++++++- .../task/test_requeue_interrupted_tasks.py | 73 +++++ 7 files changed, 478 insertions(+), 5 deletions(-) create mode 100644 changelog/8268-recreate-missing-erasure-tasks.yaml diff --git a/changelog/8268-recreate-missing-erasure-tasks.yaml b/changelog/8268-recreate-missing-erasure-tasks.yaml new file mode 100644 index 00000000000..4054ded2bbe --- /dev/null +++ b/changelog/8268-recreate-missing-erasure-tasks.yaml @@ -0,0 +1,4 @@ +type: Fixed +description: Fixed permanently stuck privacy requests when erasure task creation fails silently by recreating missing erasure tasks from the current graph on retry +pr: 8268 +labels: [] diff --git a/src/fides/api/service/privacy_request/request_service.py b/src/fides/api/service/privacy_request/request_service.py index 2a7edae17be..e839fbdb03e 100644 --- a/src/fides/api/service/privacy_request/request_service.py +++ b/src/fides/api/service/privacy_request/request_service.py @@ -688,6 +688,29 @@ def requeue_interrupted_tasks(self: DatabaseTask) -> None: f"The task for privacy request {privacy_request.id} was terminated before it could schedule any request tasks, requeueing privacy request" ) should_requeue = True + elif request_tasks_count > 0: + # Check for missing erasure tasks: if access tasks + # exist and the policy has erasure rules, erasure + # tasks should have been created alongside them. + # Zero erasure tasks means creation failed. + pr_policy = privacy_request.policy + if pr_policy and pr_policy.get_rules_for_action( + action_type=ActionType.erasure + ): + erasure_count = ( + db.query(RequestTask) + .filter( + RequestTask.privacy_request_id + == privacy_request.id, + RequestTask.action_type == ActionType.erasure, + ) + .count() + ) + if erasure_count == 0: + logger.warning( + f"Privacy request {privacy_request.id} has access tasks but zero erasure tasks despite having erasure rules, requeueing" + ) + should_requeue = True request_tasks_in_progress = _get_request_task_ids_in_progress( db, privacy_request.id diff --git a/src/fides/api/task/create_request_tasks.py b/src/fides/api/task/create_request_tasks.py index 7b952732cb8..9a626c986d6 100644 --- a/src/fides/api/task/create_request_tasks.py +++ b/src/fides/api/task/create_request_tasks.py @@ -37,6 +37,7 @@ from fides.api.task.manual.manual_task_utils import ( get_connection_configs_with_manual_tasks, ) +from fides.api.util.lock import redis_lock from fides.api.util.logger_context_utils import log_context @@ -663,13 +664,72 @@ def run_erasure_request( # pylint: disable = too-many-arguments privacy_request: PrivacyRequest, session: Session, privacy_request_proceed: bool = True, + graph: Optional[DatasetGraph] = None, + identity: Optional[Dict[str, Any]] = None, ) -> List[RequestTask]: """ DSR 3.0: Update erasure Request Tasks that were built in the "run_access_request" step with data collected to build masking requests and queue the root task for processing. If we are reprocessing a Privacy Request, instead queue tasks whose upstream nodes are complete. - """ + + If erasure tasks are missing (e.g., task creation failed on a previous run), recreate them + from the current graph before proceeding. + """ + # Ensure erasure tasks exist when they should. Access tasks are created + # alongside erasure tasks in run_access_request, so if access tasks exist + # but erasure tasks are missing or incomplete, task creation failed + # partway through. persist_initial_erasure_request_tasks is idempotent + # (skips nodes that already have tasks), so this is safe for both the + # zero-task and partial-task cases. + # + # Use the privacy request's own policy (not the passed-in policy) to + # check if erasure rules exist, since the access step used that same + # policy to decide whether to create erasure tasks originally. + access_count = privacy_request.access_tasks.count() + erasure_count = privacy_request.erasure_tasks.count() + pr_policy = privacy_request.policy + if ( + access_count > 0 + and erasure_count < access_count + and pr_policy + and pr_policy.get_rules_for_action(action_type=ActionType.erasure) + and graph + and identity + ): + lock_key = f"erasure_task_recreation:{privacy_request.id}" + with redis_lock(lock_key, timeout=60) as lock: + if lock is None: + logger.info( + "Another process is already recreating erasure tasks for " + "privacy request {}, skipping.", + privacy_request.id, + ) + else: + # Re-check inside the lock in case another process already created them + erasure_count = privacy_request.erasure_tasks.count() + if erasure_count < access_count: + logger.warning( + "Privacy request {} has {} access tasks but only {} erasure tasks. " + "Creating missing erasure tasks.", + privacy_request.id, + access_count, + erasure_count, + ) + traversal = Traversal(graph, identity, policy=pr_policy) + traversal_nodes: Dict[CollectionAddress, TraversalNode] = {} + traversal.traverse(traversal_nodes, collect_tasks_fn) + erasure_end_nodes: List[CollectionAddress] = list( + graph.nodes.keys() + ) + persist_initial_erasure_request_tasks( + session, + privacy_request, + traversal_nodes, + erasure_end_nodes, + graph, + ) + update_erasure_tasks_with_access_data(session, privacy_request) ready_tasks: List[RequestTask] = ( get_existing_ready_tasks(session, privacy_request, ActionType.erasure) or [] diff --git a/src/fides/api/task/graph_runners.py b/src/fides/api/task/graph_runners.py index 7cbe69e2506..b23fea11ece 100644 --- a/src/fides/api/task/graph_runners.py +++ b/src/fides/api/task/graph_runners.py @@ -50,6 +50,8 @@ def erasure_runner( """Run an erasure request using task-based execution.""" run_erasure_request( privacy_request=privacy_request, + graph=graph, + identity=identity, session=session, privacy_request_proceed=privacy_request_proceed, ) diff --git a/tests/fides/ops/integration_tests/test_enabled_actions.py b/tests/fides/ops/integration_tests/test_enabled_actions.py index 5f66b5cdcac..dad18c1285c 100644 --- a/tests/fides/ops/integration_tests/test_enabled_actions.py +++ b/tests/fides/ops/integration_tests/test_enabled_actions.py @@ -73,7 +73,7 @@ async def test_erasure_disabled( access_results = access_runner_tester( privacy_request_with_erasure_policy, - policy, + erasure_policy, dataset_graph, [integration_postgres_config], {"email": "customer-1@example.com"}, @@ -96,8 +96,14 @@ async def test_erasure_disabled( db, ) - # the erasure results should be empty - assert erasure_results == {} + # Erasure was disabled for the connection, so no data should have been + # actually erased. Erasure tasks exist (created by access step) but + # enabled_actions filtering prevents actual masking — all counts + # should be 0 or None. + for key, count in erasure_results.items(): + assert count is None or count == 0, ( + f"Expected no erasure for {key} but got count={count}" + ) @pytest.mark.asyncio async def test_access_disabled_for_manual_webhook_integrations( diff --git a/tests/fides/ops/task/test_create_request_tasks.py b/tests/fides/ops/task/test_create_request_tasks.py index 0ff459dc7ad..0b67ffb7beb 100644 --- a/tests/fides/ops/task/test_create_request_tasks.py +++ b/tests/fides/ops/task/test_create_request_tasks.py @@ -1,6 +1,6 @@ from datetime import datetime from unittest import mock -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import networkx import pytest @@ -10,7 +10,10 @@ from fides.api.graph.config import ( ROOT_COLLECTION_ADDRESS, TERMINATOR_ADDRESS, + Collection, CollectionAddress, + GraphDataset, + ScalarField, ) from fides.api.graph.graph import DatasetGraph from fides.api.graph.traversal import Traversal, TraversalNode @@ -20,6 +23,7 @@ ManualTaskConfig, ManualTaskConfigField, ) +from fides.api.models.policy import Rule from fides.api.models.privacy_request import ExecutionLog, RequestTask from fides.api.models.worker_task import ExecutionLogStatus from fides.api.schemas.policy import ActionType @@ -2146,3 +2150,304 @@ def test_run_erasure_request( assert not raw_access_results["mongo_test:flights"][0]["passenger_information"][ "full_name" ] + + +class TestRunErasureRequestRecreatesMissingTasks: + """Tests that run_erasure_request recreates erasure tasks when they are + missing (zero or partial). In production, run_access_request always creates + erasure tasks when the privacy request's policy has erasure rules, so + missing erasure tasks means creation failed. + """ + + @patch("fides.api.task.create_request_tasks.persist_initial_erasure_request_tasks") + @patch("fides.api.task.create_request_tasks.update_erasure_tasks_with_access_data") + @patch("fides.api.task.create_request_tasks.get_existing_ready_tasks") + def test_recreates_when_zero_erasure_tasks( + self, + mock_get_ready, + mock_update_erasure, + mock_persist_erasure, + db, + privacy_request, + request_task, + policy, + ): + """When access tasks exist but zero erasure tasks, and the privacy + request's policy has erasure rules, recreate them.""" + + assert privacy_request.access_tasks.count() > 0 + assert privacy_request.erasure_tasks.count() == 0 + + # Add an erasure rule to the privacy request's own policy + Rule.create( + db=db, + data={ + "action_type": "erasure", + "name": "test_erasure_rule", + "policy_id": policy.id, + "masking_strategy": { + "strategy": "null_rewrite", + "configuration": {}, + }, + }, + ) + + identity_field = ScalarField(name="email", primary_key=True) + identity_field.identity = "email" + collection = Collection(name="users", fields=[identity_field]) + dataset = GraphDataset( + name="test_ds", collections=[collection], connection_key="test_conn" + ) + graph = DatasetGraph(dataset) + identity = {"email": "test@example.com"} + + mock_get_ready.return_value = [] + + run_erasure_request( + privacy_request, + db, + privacy_request_proceed=False, + graph=graph, + identity=identity, + ) + + mock_persist_erasure.assert_called_once() + + @patch("fides.api.task.create_request_tasks.persist_initial_erasure_request_tasks") + @patch("fides.api.task.create_request_tasks.update_erasure_tasks_with_access_data") + @patch("fides.api.task.create_request_tasks.get_existing_ready_tasks") + def test_does_not_recreate_when_zero_and_no_graph( + self, + mock_get_ready, + mock_update_erasure, + mock_persist_erasure, + db, + privacy_request, + request_task, + ): + """When graph/identity are not provided, skip recreation even with + zero erasure tasks (backward compat for callers without graph).""" + + assert privacy_request.access_tasks.count() > 0 + assert privacy_request.erasure_tasks.count() == 0 + + mock_get_ready.return_value = [] + + run_erasure_request( + privacy_request, + db, + privacy_request_proceed=False, + ) + + mock_persist_erasure.assert_not_called() + + @patch("fides.api.task.create_request_tasks.persist_initial_erasure_request_tasks") + @patch("fides.api.task.create_request_tasks.update_erasure_tasks_with_access_data") + @patch("fides.api.task.create_request_tasks.get_existing_ready_tasks") + def test_does_not_recreate_when_tasks_exist( + self, + mock_get_ready, + mock_update_erasure, + mock_persist_erasure, + db, + privacy_request, + request_task, + erasure_request_task, + ): + """When erasure tasks already exist, run_erasure_request should not recreate them.""" + + assert privacy_request.erasure_tasks.count() > 0 + + mock_get_ready.return_value = [] + + run_erasure_request( + privacy_request, + db, + privacy_request_proceed=False, + ) + + mock_persist_erasure.assert_not_called() + + @patch("fides.api.task.create_request_tasks.persist_initial_erasure_request_tasks") + @patch("fides.api.task.create_request_tasks.update_erasure_tasks_with_access_data") + @patch("fides.api.task.create_request_tasks.get_existing_ready_tasks") + def test_does_not_recreate_without_graph( + self, + mock_get_ready, + mock_update_erasure, + mock_persist_erasure, + db, + privacy_request, + request_task, + ): + """When graph/policy/identity are not provided, skip recreation even if tasks are missing.""" + + assert privacy_request.access_tasks.count() > 0 + assert privacy_request.erasure_tasks.count() == 0 + + mock_get_ready.return_value = [] + + run_erasure_request( + privacy_request, + db, + privacy_request_proceed=False, + ) + + mock_persist_erasure.assert_not_called() + + @patch("fides.api.task.create_request_tasks.persist_initial_erasure_request_tasks") + @patch("fides.api.task.create_request_tasks.update_erasure_tasks_with_access_data") + @patch("fides.api.task.create_request_tasks.get_existing_ready_tasks") + def test_recreates_missing_tasks_when_partial( + self, + mock_get_ready, + mock_update_erasure, + mock_persist_erasure, + db, + privacy_request, + request_task, + erasure_request_task, + policy, + ): + """When some erasure tasks exist but fewer than access tasks, + run_erasure_request should create the missing ones.""" + access_count = privacy_request.access_tasks.count() + erasure_count = privacy_request.erasure_tasks.count() + assert access_count > 0 + assert erasure_count > 0 + + # Delete one erasure task to simulate partial creation + first_erasure = privacy_request.erasure_tasks.first() + first_erasure.delete(db) + db.flush() + assert privacy_request.erasure_tasks.count() < access_count + + Rule.create( + db=db, + data={ + "action_type": "erasure", + "name": "test_erasure_rule_partial", + "policy_id": policy.id, + "masking_strategy": { + "strategy": "null_rewrite", + "configuration": {}, + }, + }, + ) + + identity_field = ScalarField(name="email", primary_key=True) + identity_field.identity = "email" + collection = Collection(name="users", fields=[identity_field]) + dataset = GraphDataset( + name="test_ds", collections=[collection], connection_key="test_conn" + ) + graph = DatasetGraph(dataset) + identity = {"email": "test@example.com"} + + mock_get_ready.return_value = [] + + run_erasure_request( + privacy_request, + db, + privacy_request_proceed=False, + graph=graph, + identity=identity, + ) + + mock_persist_erasure.assert_called_once() + + @patch("fides.api.task.create_request_tasks.redis_lock") + @patch("fides.api.task.create_request_tasks.persist_initial_erasure_request_tasks") + @patch("fides.api.task.create_request_tasks.update_erasure_tasks_with_access_data") + @patch("fides.api.task.create_request_tasks.get_existing_ready_tasks") + def test_skips_when_lock_held_by_another_process( + self, + mock_get_ready, + mock_update_erasure, + mock_persist_erasure, + mock_redis_lock, + db, + privacy_request, + request_task, + erasure_request_task, + policy, + ): + """If another process holds the lock, skip recreation.""" + # Delete one erasure task to trigger the guard + first_erasure = privacy_request.erasure_tasks.first() + first_erasure.delete(db) + db.flush() + + Rule.create( + db=db, + data={ + "action_type": "erasure", + "name": "test_erasure_rule_lock", + "policy_id": policy.id, + "masking_strategy": { + "strategy": "null_rewrite", + "configuration": {}, + }, + }, + ) + + # Simulate lock held by another process + mock_redis_lock.return_value.__enter__.return_value = None + + identity_field = ScalarField(name="email", primary_key=True) + identity_field.identity = "email" + collection = Collection(name="users", fields=[identity_field]) + dataset = GraphDataset( + name="test_ds", collections=[collection], connection_key="test_conn" + ) + graph = DatasetGraph(dataset) + + mock_get_ready.return_value = [] + + run_erasure_request( + privacy_request, + db, + privacy_request_proceed=False, + graph=graph, + identity={"email": "test@example.com"}, + ) + + mock_persist_erasure.assert_not_called() + + @patch("fides.api.task.create_request_tasks.persist_initial_erasure_request_tasks") + @patch("fides.api.task.create_request_tasks.update_erasure_tasks_with_access_data") + @patch("fides.api.task.create_request_tasks.get_existing_ready_tasks") + def test_skips_when_policy_has_no_erasure_rules( + self, + mock_get_ready, + mock_update_erasure, + mock_persist_erasure, + db, + privacy_request, + request_task, + erasure_request_task, + ): + """If the privacy request's policy has no erasure rules, don't recreate + even if erasure tasks are fewer than access tasks.""" + # Delete one erasure task to create the count mismatch + first_erasure = privacy_request.erasure_tasks.first() + first_erasure.delete(db) + db.flush() + assert ( + privacy_request.erasure_tasks.count() < privacy_request.access_tasks.count() + ) + + # Policy has no erasure rules (default policy fixture is access-only) + assert not privacy_request.policy.get_rules_for_action( + action_type=ActionType.erasure + ) + + mock_get_ready.return_value = [] + + run_erasure_request( + privacy_request, + db, + privacy_request_proceed=False, + ) + + mock_persist_erasure.assert_not_called() diff --git a/tests/fides/task/test_requeue_interrupted_tasks.py b/tests/fides/task/test_requeue_interrupted_tasks.py index 50f6c397f28..f58e75cd63b 100644 --- a/tests/fides/task/test_requeue_interrupted_tasks.py +++ b/tests/fides/task/test_requeue_interrupted_tasks.py @@ -767,3 +767,76 @@ def test_missing_connection_key_not_treated_as_orphaned( requeue_interrupted_tasks.apply().get() mock_requeue.assert_not_called() mock_cancel.assert_not_called() + + +# --------------------------------------------------------------------------- +# Watchdog blind spot: missing erasure tasks (ENG-3950) +# --------------------------------------------------------------------------- + + +class TestWatchdogDetectsMissingErasureTasks: + """The watchdog should detect when access tasks exist but erasure tasks + are missing, and requeue the privacy request.""" + + @mock.patch(_CANCEL) + @mock.patch(_REQUEUE) + @mock.patch(_QUEUE, return_value=[]) + @mock.patch(_IN_FLIGHT, return_value=False) + def test_requeues_when_access_tasks_but_no_erasure_tasks( + self, + mock_in_flight, + mock_queue, + mock_requeue, + mock_cancel, + db, + make_privacy_request, + make_request_task, + policy, + ): + """If access tasks exist but zero erasure tasks and the policy has + erasure rules, the watchdog should requeue.""" + from fides.api.models.policy import Rule as PolicyRule + + PolicyRule.create( + db=db, + data={ + "action_type": "erasure", + "name": "watchdog_erasure_rule", + "policy_id": policy.id, + "masking_strategy": { + "strategy": "null_rewrite", + "configuration": {}, + }, + }, + ) + + pr = make_privacy_request() + make_request_task(pr, ExecutionLogStatus.complete, collection="users") + requeue_interrupted_tasks.apply().get() + mock_requeue.assert_called_once() + + @mock.patch(_CANCEL) + @mock.patch(_REQUEUE) + @mock.patch(_QUEUE, return_value=[]) + @mock.patch(_IN_FLIGHT, return_value=False) + def test_does_not_requeue_when_policy_has_no_erasure_rules( + self, + mock_in_flight, + mock_queue, + mock_requeue, + mock_cancel, + make_privacy_request, + make_request_task, + ): + """If the policy has no erasure rules, zero erasure tasks is expected. + The watchdog should not requeue.""" + pr = make_privacy_request() + make_request_task(pr, ExecutionLogStatus.complete, collection="users") + requeue_interrupted_tasks.apply().get() + mock_requeue.assert_not_called() + + # Note: a negative test for "erasure tasks exist, no requeue" is omitted + # because the watchdog has multiple requeue paths that fire for + # in_processing requests with complete tasks (task ID not in queue, + # etc.), making it difficult to isolate just the erasure count check + # without mocking the entire watchdog internals. From 90d06b352dfcccf666f7597cad502052f9114a03 Mon Sep 17 00:00:00 2001 From: jpople Date: Wed, 27 May 2026 10:21:01 -0500 Subject: [PATCH 6/7] Fix property form paths not saving and actions during create (#8271) --- clients/admin-ui/cypress/e2e/properties.cy.ts | 87 +++++++++++++++++++ .../cypress/fixtures/properties/property.json | 2 +- .../src/features/properties/PropertyForm.tsx | 45 +++++++--- .../privacy-center-config/ActionsTable.tsx | 20 +++-- .../__tests__/ActionsTable.test.tsx | 30 +++++++ 5 files changed, 163 insertions(+), 21 deletions(-) diff --git a/clients/admin-ui/cypress/e2e/properties.cy.ts b/clients/admin-ui/cypress/e2e/properties.cy.ts index 7f604b85e4e..541ff505a5f 100644 --- a/clients/admin-ui/cypress/e2e/properties.cy.ts +++ b/clients/admin-ui/cypress/e2e/properties.cy.ts @@ -104,6 +104,93 @@ describe("Properties page", () => { expect(body.paths).to.eql([]); }); }); + + it("Should include paths in the create payload", () => { + cy.intercept("POST", "/api/v1/plus/property", { + statusCode: 200, + body: { + id: "FDS-NEW456", + name: "Test Property", + type: "Website", + paths: ["/privacy", "/dsr"], + experiences: [], + }, + }).as("createProperty"); + + cy.visit(ADD_PROPERTY_ROUTE); + cy.getByTestId("input-name").type("Test Property"); + + // Add two paths via the Form.List + cy.contains("button", "Add path").click(); + cy.get("#paths_0").type("/privacy"); + cy.contains("button", "Add path").click(); + cy.get("#paths_1").type("/dsr"); + + cy.getByTestId("save-btn").click(); + + cy.wait("@createProperty").then((interception) => { + const { body } = interception.request; + expect(body.paths).to.eql(["/privacy", "/dsr"]); + }); + }); + + it("Should allow removing a path before saving", () => { + cy.intercept("POST", "/api/v1/plus/property", { + statusCode: 200, + body: { + id: "FDS-NEW789", + name: "Test Property", + type: "Website", + paths: ["/dsr"], + experiences: [], + }, + }).as("createProperty"); + + cy.visit(ADD_PROPERTY_ROUTE); + cy.getByTestId("input-name").type("Test Property"); + + // Add two paths, then remove the first + cy.contains("button", "Add path").click(); + cy.get("#paths_0").type("/privacy"); + cy.contains("button", "Add path").click(); + cy.get("#paths_1").type("/dsr"); + cy.get("button[aria-label='Remove path']").first().click(); + + cy.getByTestId("save-btn").click(); + + cy.wait("@createProperty").then((interception) => { + const { body } = interception.request; + expect(body.paths).to.eql(["/dsr"]); + }); + }); + }); + + describe("Edit", () => { + it("Should load existing paths and include them in the update payload", () => { + cy.intercept("GET", "/api/v1/plus/property/*", { + fixture: "properties/property.json", + }).as("getProperty"); + cy.intercept("PUT", "/api/v1/plus/property/*", { + fixture: "properties/property.json", + }).as("updateProperty"); + + cy.getAntTableRow("FDS-CEA9EV").contains("Property A").click(); + cy.wait("@getProperty"); + + // Verify existing path is loaded + cy.get("#paths_0").should("have.value", "/privacy"); + + // Add another path + cy.contains("button", "Add path").click(); + cy.get("#paths_1").type("/dsr"); + + cy.getByTestId("save-btn").click(); + + cy.wait("@updateProperty").then((interception) => { + const { body } = interception.request; + expect(body.paths).to.eql(["/privacy", "/dsr"]); + }); + }); }); describe("Delete", () => { diff --git a/clients/admin-ui/cypress/fixtures/properties/property.json b/clients/admin-ui/cypress/fixtures/properties/property.json index 74d9f36402c..8991d7e60f1 100644 --- a/clients/admin-ui/cypress/fixtures/properties/property.json +++ b/clients/admin-ui/cypress/fixtures/properties/property.json @@ -2,6 +2,6 @@ "name": "Property A", "type": "Website", "id": "FDS-CEA9EV", - "paths": [], + "paths": ["/privacy"], "experiences": [] } diff --git a/clients/admin-ui/src/features/properties/PropertyForm.tsx b/clients/admin-ui/src/features/properties/PropertyForm.tsx index 2c86c5db5e3..9e5ead75baa 100644 --- a/clients/admin-ui/src/features/properties/PropertyForm.tsx +++ b/clients/admin-ui/src/features/properties/PropertyForm.tsx @@ -1,4 +1,14 @@ -import { Button, Card, Flex, Form, Input, Select, Space, Spin } from "fidesui"; +import { + Button, + Card, + Flex, + Form, + Icons, + Input, + Select, + Space, + Spin, +} from "fidesui"; import { useRouter } from "next/router"; import { useCallback, useEffect, useMemo, useState } from "react"; @@ -15,20 +25,11 @@ import { } from "~/types/api"; import DeletePropertyModal from "./DeletePropertyModal"; -import { PathsEditor } from "./PathsEditor"; import { PrivacyCenterConfigSection, PrivacyCenterConfigValue, } from "./privacy-center-config/PrivacyCenterConfigSection"; -const PathsEditorAdapter = ({ - value, - onChange, -}: { - value?: string[]; - onChange?: (next: string[]) => void; -}) => onChange?.(next)} />; - const PCConfigSectionAdapter = ({ propertyId, value, @@ -197,10 +198,30 @@ export const PropertyForm = ({ - + + {(fields, { add, remove }) => ( + + {fields.map((field) => ( + + + + + + + )} + - - , + ...(propertyId + ? [ + + + , + ] + : []),