diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 000000000..e44a44fc7 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,15 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). + +## [Unreleased] + +### Fixed + +- Connections no longer show as **Unhealthy** because of a shared destination's health. A destination integration is shared across every connection that routes to it, so an unhealthy destination previously marked all of its connections Unhealthy even when their own provider had no errors (the destination's errors live under the destination, not the connection). A healthy provider with an unhealthy or disabled shared destination is now surfaced as **Needs review** instead, in both `ConnectionRetrieveSerializer.get_status` and `filter_connections_by_status` (the `?status=` filter and the unhealthy-connections email). + +### Added + +- `recalculate_integration_statuses` management command — runs the same health calculation as the hourly "Calculate Integration Statuses" beat task on demand. Recalculates all integrations by default, or specific ones via `--integration-id` (repeatable); `--async` enqueues the Celery task instead of running inline. diff --git a/cdip_admin/api/v2/serializers.py b/cdip_admin/api/v2/serializers.py index f3cb0f632..2c48c8ffe 100644 --- a/cdip_admin/api/v2/serializers.py +++ b/cdip_admin/api/v2/serializers.py @@ -785,8 +785,13 @@ def get_status(self, obj): for destination in obj.destinations.all(): destination_status, _ = IntegrationStatus.objects.get_or_create(integration=destination) destination_statuses.append(destination_status.status) + # A destination is shared across every connection that routes to it, so its health + # must not mark this connection as unhealthy (that hides which connection is actually + # failing and floods the "unhealthy" list). An unhealthy or disabled destination is + # surfaced as "needs review" instead. Connection-level delivery failures are already + # attributed to the provider, so a genuinely broken connection still shows unhealthy. if IntegrationStatus.Status.UNHEALTHY.value in destination_statuses: - return ConnectionStatus.UNHEALTHY.value + return ConnectionStatus.NEEDS_REVIEW.value if IntegrationStatus.Status.DISABLED.value in destination_statuses: return ConnectionStatus.NEEDS_REVIEW.value return ConnectionStatus.HEALTHY.value diff --git a/cdip_admin/api/v2/tests/test_connections_api.py b/cdip_admin/api/v2/tests/test_connections_api.py index 4a9ae6048..8c8130b04 100644 --- a/cdip_admin/api/v2/tests/test_connections_api.py +++ b/cdip_admin/api/v2/tests/test_connections_api.py @@ -483,7 +483,9 @@ def test_filter_connections_by_status_unhealthy_as_superuser( filters={ "status": ConnectionStatus.UNHEALTHY.value }, - expected_integrations=[connection_with_unhealthy_provider, connection_with_unhealthy_destination] + # An unhealthy (shared) destination must NOT make the connection unhealthy. + # Only a connection whose provider is unhealthy is reported as unhealthy. + expected_integrations=[connection_with_unhealthy_provider] ) @@ -498,5 +500,29 @@ def test_filter_connections_by_status_needs_review_as_superuser( filters={ "status": ConnectionStatus.NEEDS_REVIEW.value }, - expected_integrations=[connection_with_disabled_destination] # provider OK but destination disabled + # Provider OK but a destination needs attention (disabled or unhealthy) + expected_integrations=[connection_with_unhealthy_destination, connection_with_disabled_destination] ) + + +def _get_connection_status(api_client, user, connection): + api_client.force_authenticate(user) + response = api_client.get(reverse("connections-list")) + assert response.status_code == status.HTTP_200_OK + connections = {c["id"]: c for c in response.json()["results"]} + return connections[str(connection.id)]["status"] + + +def test_connection_status_field_reflects_shared_destination_as_needs_review( + api_client, superuser, organization, + connection_with_unhealthy_provider, connection_with_unhealthy_destination, +): + # A connection whose provider has errors is unhealthy + assert _get_connection_status( + api_client, superuser, connection_with_unhealthy_provider + ) == ConnectionStatus.UNHEALTHY.value + # A connection with a healthy provider but an unhealthy (shared) destination + # must surface as "needs review", not "unhealthy". + assert _get_connection_status( + api_client, superuser, connection_with_unhealthy_destination + ) == ConnectionStatus.NEEDS_REVIEW.value diff --git a/cdip_admin/integrations/management/commands/recalculate_integration_statuses.py b/cdip_admin/integrations/management/commands/recalculate_integration_statuses.py new file mode 100644 index 000000000..f3dd4ff8c --- /dev/null +++ b/cdip_admin/integrations/management/commands/recalculate_integration_statuses.py @@ -0,0 +1,71 @@ +from django.core.management.base import BaseCommand + +from integrations.models import Integration +from integrations.models.v2 import calculate_integration_status + + +class Command(BaseCommand): + help = ( + "Recalculate the health status of integrations. By default every integration is " + "recalculated synchronously; pass --integration-id to limit the scope, or --async " + "to enqueue the Celery task instead of running inline. This is the same calculation " + "the hourly 'Calculate Integration Statuses' beat task performs." + ) + + def add_arguments(self, parser): + parser.add_argument( + "--integration-id", + action="append", + dest="integration_ids", + default=None, + help="Integration id to recalculate. Repeat to pass several. Defaults to all integrations.", + ) + parser.add_argument( + "--async", + action="store_true", + dest="run_async", + default=False, + help="Enqueue the Celery task(s) instead of recalculating inline.", + ) + + def handle(self, *args, **options): + integration_ids = options["integration_ids"] + run_async = options["run_async"] + + if integration_ids: + # Validate the ids so a typo fails loudly instead of silently doing nothing. + found = set( + str(i) for i in Integration.objects.filter( + id__in=integration_ids + ).values_list("id", flat=True) + ) + missing = [i for i in integration_ids if str(i) not in found] + if missing: + self.stderr.write(f"Unknown integration id(s): {', '.join(missing)}") + integration_ids = list(found) + else: + integration_ids = [ + str(i) for i in Integration.objects.values_list("id", flat=True) + ] + + if not integration_ids: + self.stdout.write("No integrations to recalculate.") + return + + if run_async: + from integrations.tasks import calculate_integration_statuses + calculate_integration_statuses.delay(integration_ids=integration_ids) + self.stdout.write( + f"Enqueued recalculation for {len(integration_ids)} integration(s)." + ) + return + + self.stdout.write(f"Recalculating status for {len(integration_ids)} integration(s)...") + for integration_id in integration_ids: + try: + status = calculate_integration_status(integration_id) + except Exception as e: + self.stderr.write(f"Error recalculating {integration_id}: {e}") + continue + self.stdout.write(f"{integration_id}: {status}") + self.stdout.write("Done!") diff --git a/cdip_admin/integrations/models/v2/services.py b/cdip_admin/integrations/models/v2/services.py index da1e60ac5..8e18dbe65 100644 --- a/cdip_admin/integrations/models/v2/services.py +++ b/cdip_admin/integrations/models/v2/services.py @@ -128,10 +128,15 @@ def filter_connections_by_status(queryset, status): destinations_unhealthy_q = Q( routing_rules_by_provider__destinations__status__status=IntegrationStatus.Status.UNHEALTHY.value ) - connection_unhealthy_q = Q( - provider_unhealthy_q | (destinations_unhealthy_q & ~provider_disabled_q) + # A destination is shared across every connection that routes to it, so an unhealthy + # destination must not make this connection unhealthy. Only a connection whose provider + # is unhealthy is reported as unhealthy; a connection with a healthy provider but an + # unhealthy or disabled destination is surfaced as "needs review". Keep this logic in + # sync with ConnectionRetrieveSerializer.get_status. + connection_unhealthy_q = Q(provider_unhealthy_q) + connection_needs_review_q = Q( + provider_healthy_q & (destinations_disabled_q | destinations_unhealthy_q) ) - connection_needs_review_q = Q(provider_healthy_q & destinations_disabled_q) connection_healthy_q = Q(provider_healthy_q & ~Q(destinations_unhealthy_q | destinations_disabled_q)) if status == ConnectionStatus.UNHEALTHY.value: return queryset.filter(connection_unhealthy_q) diff --git a/cdip_admin/integrations/tests/test_commands.py b/cdip_admin/integrations/tests/test_commands.py index e72be410d..6e3f78341 100644 --- a/cdip_admin/integrations/tests/test_commands.py +++ b/cdip_admin/integrations/tests/test_commands.py @@ -3,7 +3,7 @@ from django.core.management import call_command from django.core.management.base import CommandError -from integrations.models import IntegrationAction, IntegrationConfiguration +from integrations.models import IntegrationAction, IntegrationConfiguration, IntegrationStatus pytestmark = pytest.mark.django_db @@ -140,6 +140,38 @@ def test_repair_integration_configurations_with_all_flag( assert integration.configurations.filter(action=er_action_show_permissions).exists() +def test_recalculate_integration_statuses_command_for_single_integration( + provider_lotek_panthera, + pull_observations_action_started_activity_log, + pull_observations_action_failed_activity_log, + pull_observations_action_failed_activity_log_2, + pull_observations_action_failed_activity_log_3, +): + # Start from a stale healthy status; the command must recompute it from the error logs. + provider_lotek_panthera.status.status = IntegrationStatus.Status.HEALTHY + provider_lotek_panthera.status.save() + + call_command( + "recalculate_integration_statuses", + "--integration-id", str(provider_lotek_panthera.id), + ) + + provider_lotek_panthera.status.refresh_from_db() + assert provider_lotek_panthera.status.status == IntegrationStatus.Status.UNHEALTHY + + +def test_recalculate_integration_statuses_command_async_enqueues_task(mocker, provider_lotek_panthera): + mocked_task = mocker.patch("integrations.tasks.calculate_integration_statuses.delay") + + call_command( + "recalculate_integration_statuses", + "--integration-id", str(provider_lotek_panthera.id), + "--async", + ) + + mocked_task.assert_called_once_with(integration_ids=[str(provider_lotek_panthera.id)]) + + def test_call_set_action_configs_command_with_integration_type( er_destination_without_show_permissions_config, er_action_show_permissions ): diff --git a/cdip_admin/integrations/tests/test_connection_status_derivation.py b/cdip_admin/integrations/tests/test_connection_status_derivation.py new file mode 100644 index 000000000..0eba05722 --- /dev/null +++ b/cdip_admin/integrations/tests/test_connection_status_derivation.py @@ -0,0 +1,57 @@ +import pytest + +from api.v2.serializers import ConnectionRetrieveSerializer +from integrations.models import Integration, ConnectionStatus +from integrations.models.v2 import filter_connections_by_status + +pytestmark = pytest.mark.django_db + + +# A destination integration is shared across every connection that routes to it. Its health +# must therefore not drag a connection into the "unhealthy" bucket; an unhealthy or disabled +# destination is surfaced as "needs review" instead. These tests pin that derivation both for +# the queryset filter (filter_connections_by_status) and the live serializer (get_status). + + +def test_filter_unhealthy_excludes_connections_with_only_an_unhealthy_destination( + connection_with_unhealthy_provider, + connection_with_unhealthy_destination, +): + providers = Integration.providers.all() + unhealthy = filter_connections_by_status(providers, ConnectionStatus.UNHEALTHY.value) + review = filter_connections_by_status(providers, ConnectionStatus.NEEDS_REVIEW.value) + + # Provider with its own errors -> unhealthy + assert connection_with_unhealthy_provider in unhealthy + # Healthy provider but unhealthy shared destination -> NOT unhealthy, needs review + assert connection_with_unhealthy_destination not in unhealthy + assert connection_with_unhealthy_destination in review + + +def test_filter_needs_review_includes_unhealthy_and_disabled_destinations( + connection_with_unhealthy_destination, + connection_with_disabled_destination, +): + providers = Integration.providers.all() + review = filter_connections_by_status(providers, ConnectionStatus.NEEDS_REVIEW.value) + + assert connection_with_unhealthy_destination in review + assert connection_with_disabled_destination in review + + +def test_get_status_unhealthy_destination_resolves_to_needs_review( + connection_with_unhealthy_provider, + connection_with_unhealthy_destination, +): + serializer = ConnectionRetrieveSerializer() + + assert serializer.get_status(connection_with_unhealthy_provider) == ConnectionStatus.UNHEALTHY.value + assert serializer.get_status(connection_with_unhealthy_destination) == ConnectionStatus.NEEDS_REVIEW.value + + +def test_get_status_disabled_destination_resolves_to_needs_review( + connection_with_disabled_destination, +): + serializer = ConnectionRetrieveSerializer() + + assert serializer.get_status(connection_with_disabled_destination) == ConnectionStatus.NEEDS_REVIEW.value