Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## [Unreleased]

### Fixed

- Connections no longer show as **Unhealthy** because of a shared destination's health. A destination integration is shared across every connection that routes to it, so an unhealthy destination previously marked all of its connections Unhealthy even when their own provider had no errors (the destination's errors live under the destination, not the connection). A healthy provider with an unhealthy or disabled shared destination is now surfaced as **Needs review** instead, in both `ConnectionRetrieveSerializer.get_status` and `filter_connections_by_status` (the `?status=` filter and the unhealthy-connections email).

### Added

- `recalculate_integration_statuses` management command — runs the same health calculation as the hourly "Calculate Integration Statuses" beat task on demand. Recalculates all integrations by default, or specific ones via `--integration-id` (repeatable); `--async` enqueues the Celery task instead of running inline.
7 changes: 6 additions & 1 deletion cdip_admin/api/v2/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,8 +785,13 @@ def get_status(self, obj):
for destination in obj.destinations.all():
destination_status, _ = IntegrationStatus.objects.get_or_create(integration=destination)
destination_statuses.append(destination_status.status)
# A destination is shared across every connection that routes to it, so its health
# must not mark this connection as unhealthy (that hides which connection is actually
# failing and floods the "unhealthy" list). An unhealthy or disabled destination is
# surfaced as "needs review" instead. Connection-level delivery failures are already
# attributed to the provider, so a genuinely broken connection still shows unhealthy.
if IntegrationStatus.Status.UNHEALTHY.value in destination_statuses:
return ConnectionStatus.UNHEALTHY.value
return ConnectionStatus.NEEDS_REVIEW.value
if IntegrationStatus.Status.DISABLED.value in destination_statuses:
return ConnectionStatus.NEEDS_REVIEW.value
return ConnectionStatus.HEALTHY.value
Expand Down
30 changes: 28 additions & 2 deletions cdip_admin/api/v2/tests/test_connections_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,9 @@ def test_filter_connections_by_status_unhealthy_as_superuser(
filters={
"status": ConnectionStatus.UNHEALTHY.value
},
expected_integrations=[connection_with_unhealthy_provider, connection_with_unhealthy_destination]
# An unhealthy (shared) destination must NOT make the connection unhealthy.
# Only a connection whose provider is unhealthy is reported as unhealthy.
expected_integrations=[connection_with_unhealthy_provider]
)


Expand All @@ -498,5 +500,29 @@ def test_filter_connections_by_status_needs_review_as_superuser(
filters={
"status": ConnectionStatus.NEEDS_REVIEW.value
},
expected_integrations=[connection_with_disabled_destination] # provider OK but destination disabled
# Provider OK but a destination needs attention (disabled or unhealthy)
expected_integrations=[connection_with_unhealthy_destination, connection_with_disabled_destination]
)


def _get_connection_status(api_client, user, connection):
    """Return the ``status`` value the connections list endpoint reports for ``connection``.

    The client is authenticated as ``user``, the ``connections-list`` route is
    requested, and the entry whose ``id`` equals ``str(connection.id)`` is looked
    up in the ``results`` array of the JSON payload.
    """
    api_client.force_authenticate(user)
    list_response = api_client.get(reverse("connections-list"))
    assert list_response.status_code == status.HTTP_200_OK
    # Index the returned entries by id so the connection under test can be picked out.
    entries_by_id = {}
    for entry in list_response.json()["results"]:
        entries_by_id[entry["id"]] = entry
    wanted_entry = entries_by_id[str(connection.id)]
    return wanted_entry["status"]


def test_connection_status_field_reflects_shared_destination_as_needs_review(
    api_client, superuser, organization,
    connection_with_unhealthy_provider, connection_with_unhealthy_destination,
):
    """The API ``status`` field separates provider failures from destination problems."""
    # A provider with its own errors makes the connection unhealthy.
    provider_case_status = _get_connection_status(
        api_client, superuser, connection_with_unhealthy_provider
    )
    assert provider_case_status == ConnectionStatus.UNHEALTHY.value

    # A healthy provider routed to an unhealthy (shared) destination is only
    # flagged as "needs review"; it must not be reported as "unhealthy".
    destination_case_status = _get_connection_status(
        api_client, superuser, connection_with_unhealthy_destination
    )
    assert destination_case_status == ConnectionStatus.NEEDS_REVIEW.value
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from django.core.management.base import BaseCommand

from integrations.models import Integration
from integrations.models.v2 import calculate_integration_status


class Command(BaseCommand):
    """Management command that recalculates integration health statuses on demand.

    Options:
        --integration-id  Integration id to recalculate; may be repeated. When
                          omitted, every integration is recalculated.
        --async           Enqueue the ``calculate_integration_statuses`` Celery task
                          with the selected ids instead of recalculating inline.

    Unknown ids are reported on stderr and skipped; the remaining ids are still
    processed. Errors raised while recalculating a single integration inline are
    reported on stderr and do not stop the run.
    """

    help = (
        "Recalculate the health status of integrations. By default every integration is "
        "recalculated synchronously; pass --integration-id to limit the scope, or --async "
        "to enqueue the Celery task instead of running inline. This is the same calculation "
        "the hourly 'Calculate Integration Statuses' beat task performs."
    )

    def add_arguments(self, parser):
        """Register the ``--integration-id`` (repeatable) and ``--async`` options."""
        parser.add_argument(
            "--integration-id",
            action="append",
            dest="integration_ids",
            default=None,
            help="Integration id to recalculate. Repeat to pass several. Defaults to all integrations.",
        )
        parser.add_argument(
            "--async",
            action="store_true",
            dest="run_async",
            default=False,
            help="Enqueue the Celery task(s) instead of recalculating inline.",
        )

    def handle(self, *args, **options):
        """Resolve the target integration ids, then enqueue or run the recalculation."""
        integration_ids = options["integration_ids"]
        run_async = options["run_async"]

        if integration_ids:
            # Validate the ids so a typo fails loudly instead of silently doing nothing.
            found = {
                str(pk)
                for pk in Integration.objects.filter(
                    id__in=integration_ids
                ).values_list("id", flat=True)
            }
            # dict.fromkeys drops repeated ids while keeping the order they were
            # given in, so the same typo is not reported twice.
            missing = [
                i for i in dict.fromkeys(integration_ids) if str(i) not in found
            ]
            if missing:
                self.stderr.write(f"Unknown integration id(s): {', '.join(missing)}")
            # Sort the validated ids: iterating the set directly would make the
            # processing order (and the list handed to Celery) arbitrary per run.
            integration_ids = sorted(found)
        else:
            integration_ids = [
                str(pk) for pk in Integration.objects.values_list("id", flat=True)
            ]

        if not integration_ids:
            self.stdout.write("No integrations to recalculate.")
            return

        if run_async:
            # Imported lazily so the tasks module is only loaded when --async is used.
            from integrations.tasks import calculate_integration_statuses
            calculate_integration_statuses.delay(integration_ids=integration_ids)
            self.stdout.write(
                f"Enqueued recalculation for {len(integration_ids)} integration(s)."
            )
            return

        self.stdout.write(f"Recalculating status for {len(integration_ids)} integration(s)...")
        for integration_id in integration_ids:
            try:
                new_status = calculate_integration_status(integration_id)
            except Exception as e:
                # Deliberately best-effort: one failing integration must not
                # prevent the remaining ones from being recalculated.
                self.stderr.write(f"Error recalculating {integration_id}: {e}")
                continue
            self.stdout.write(f"{integration_id}: {new_status}")
        self.stdout.write("Done!")
11 changes: 8 additions & 3 deletions cdip_admin/integrations/models/v2/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,15 @@ def filter_connections_by_status(queryset, status):
destinations_unhealthy_q = Q(
routing_rules_by_provider__destinations__status__status=IntegrationStatus.Status.UNHEALTHY.value
)
connection_unhealthy_q = Q(
provider_unhealthy_q | (destinations_unhealthy_q & ~provider_disabled_q)
# A destination is shared across every connection that routes to it, so an unhealthy
# destination must not make this connection unhealthy. Only a connection whose provider
# is unhealthy is reported as unhealthy; a connection with a healthy provider but an
# unhealthy or disabled destination is surfaced as "needs review". Keep this logic in
# sync with ConnectionRetrieveSerializer.get_status.
connection_unhealthy_q = Q(provider_unhealthy_q)
connection_needs_review_q = Q(
provider_healthy_q & (destinations_disabled_q | destinations_unhealthy_q)
)
connection_needs_review_q = Q(provider_healthy_q & destinations_disabled_q)
connection_healthy_q = Q(provider_healthy_q & ~Q(destinations_unhealthy_q | destinations_disabled_q))
if status == ConnectionStatus.UNHEALTHY.value:
return queryset.filter(connection_unhealthy_q)
Expand Down
34 changes: 33 additions & 1 deletion cdip_admin/integrations/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from django.core.management import call_command
from django.core.management.base import CommandError

from integrations.models import IntegrationAction, IntegrationConfiguration
from integrations.models import IntegrationAction, IntegrationConfiguration, IntegrationStatus


pytestmark = pytest.mark.django_db
Expand Down Expand Up @@ -140,6 +140,38 @@ def test_repair_integration_configurations_with_all_flag(
assert integration.configurations.filter(action=er_action_show_permissions).exists()


def test_recalculate_integration_statuses_command_for_single_integration(
    provider_lotek_panthera,
    pull_observations_action_started_activity_log,
    pull_observations_action_failed_activity_log,
    pull_observations_action_failed_activity_log_2,
    pull_observations_action_failed_activity_log_3,
):
    """Running the command for one integration replaces a stale status."""
    # Begin with a stale "healthy" status so the command has to recompute it
    # from the failed activity logs provided by the fixtures.
    status_record = provider_lotek_panthera.status
    status_record.status = IntegrationStatus.Status.HEALTHY
    status_record.save()

    command_args = ["--integration-id", str(provider_lotek_panthera.id)]
    call_command("recalculate_integration_statuses", *command_args)

    status_record.refresh_from_db()
    assert status_record.status == IntegrationStatus.Status.UNHEALTHY


def test_recalculate_integration_statuses_command_async_enqueues_task(mocker, provider_lotek_panthera):
    """With ``--async`` the command enqueues the Celery task with the selected ids."""
    delay_mock = mocker.patch("integrations.tasks.calculate_integration_statuses.delay")
    integration_id = str(provider_lotek_panthera.id)

    call_command(
        "recalculate_integration_statuses",
        "--integration-id",
        integration_id,
        "--async",
    )

    delay_mock.assert_called_once_with(integration_ids=[integration_id])


def test_call_set_action_configs_command_with_integration_type(
er_destination_without_show_permissions_config, er_action_show_permissions
):
Expand Down
57 changes: 57 additions & 0 deletions cdip_admin/integrations/tests/test_connection_status_derivation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import pytest

from api.v2.serializers import ConnectionRetrieveSerializer
from integrations.models import Integration, ConnectionStatus
from integrations.models.v2 import filter_connections_by_status

pytestmark = pytest.mark.django_db


# A destination integration is shared across every connection that routes to it. Its health
# must therefore not drag a connection into the "unhealthy" bucket; an unhealthy or disabled
# destination is surfaced as "needs review" instead. These tests pin that derivation both for
# the queryset filter (filter_connections_by_status) and the live serializer (get_status).


def test_filter_unhealthy_excludes_connections_with_only_an_unhealthy_destination(
    connection_with_unhealthy_provider,
    connection_with_unhealthy_destination,
):
    """The unhealthy filter keeps provider failures and drops destination-only ones."""
    all_providers = Integration.providers.all()
    unhealthy_connections = filter_connections_by_status(
        all_providers, ConnectionStatus.UNHEALTHY.value
    )
    needs_review_connections = filter_connections_by_status(
        all_providers, ConnectionStatus.NEEDS_REVIEW.value
    )

    # A provider with its own errors is reported as unhealthy.
    assert connection_with_unhealthy_provider in unhealthy_connections
    # A healthy provider with an unhealthy shared destination is not unhealthy;
    # it shows up under "needs review" instead.
    assert connection_with_unhealthy_destination not in unhealthy_connections
    assert connection_with_unhealthy_destination in needs_review_connections


def test_filter_needs_review_includes_unhealthy_and_disabled_destinations(
    connection_with_unhealthy_destination,
    connection_with_disabled_destination,
):
    """Both unhealthy and disabled destinations put a connection in "needs review"."""
    needs_review_connections = filter_connections_by_status(
        Integration.providers.all(),
        ConnectionStatus.NEEDS_REVIEW.value,
    )

    assert connection_with_unhealthy_destination in needs_review_connections
    assert connection_with_disabled_destination in needs_review_connections


def test_get_status_unhealthy_destination_resolves_to_needs_review(
    connection_with_unhealthy_provider,
    connection_with_unhealthy_destination,
):
    """``get_status`` reports provider failures as unhealthy and destination ones as needs review."""
    status_serializer = ConnectionRetrieveSerializer()

    provider_case_status = status_serializer.get_status(connection_with_unhealthy_provider)
    assert provider_case_status == ConnectionStatus.UNHEALTHY.value

    destination_case_status = status_serializer.get_status(connection_with_unhealthy_destination)
    assert destination_case_status == ConnectionStatus.NEEDS_REVIEW.value


def test_get_status_disabled_destination_resolves_to_needs_review(
    connection_with_disabled_destination,
):
    """``get_status`` reports a disabled destination as "needs review"."""
    derived_status = ConnectionRetrieveSerializer().get_status(
        connection_with_disabled_destination
    )

    assert derived_status == ConnectionStatus.NEEDS_REVIEW.value