|
10 | 10 |
|
11 | 11 | import sentry_sdk |
12 | 12 | from celery.utils.log import get_task_logger |
| 13 | +from config.django.base import DJANGO_FINDINGS_BATCH_SIZE |
13 | 14 | from config.env import env |
14 | 15 | from config.settings.celery import CELERY_DEADLOCK_ATTEMPTS |
15 | 16 | from django.db import IntegrityError, OperationalError |
16 | | -from django.db.models import Case, Count, IntegerField, Max, Min, Prefetch, Q, Sum, When |
| 17 | +from django.db.models import ( |
| 18 | + Case, |
| 19 | + Count, |
| 20 | + Exists, |
| 21 | + IntegerField, |
| 22 | + Max, |
| 23 | + Min, |
| 24 | + OuterRef, |
| 25 | + Prefetch, |
| 26 | + Q, |
| 27 | + Sum, |
| 28 | + When, |
| 29 | +) |
17 | 30 | from django.utils import timezone as django_timezone |
18 | 31 | from tasks.jobs.queries import ( |
19 | 32 | COMPLIANCE_UPSERT_PROVIDER_SCORE_SQL, |
20 | 33 | COMPLIANCE_UPSERT_TENANT_SUMMARY_SQL, |
21 | 34 | ) |
22 | | -from tasks.utils import CustomEncoder |
| 35 | +from tasks.utils import CustomEncoder, batched |
23 | 36 |
|
24 | 37 | from api.compliance import PROWLER_COMPLIANCE_OVERVIEW_TEMPLATE |
25 | 38 | from api.constants import SEVERITY_ORDER |
@@ -2069,3 +2082,169 @@ def aggregate_finding_group_summaries(tenant_id: str, scan_id: str): |
2069 | 2082 | "created": created_count, |
2070 | 2083 | "updated": updated_count, |
2071 | 2084 | } |
| 2085 | + |
| 2086 | + |
def reset_ephemeral_resource_findings_count(tenant_id: str, scan_id: str) -> dict:
    """Zero ``failed_findings_count`` on resources a completed full-scope scan never saw.

    Resources that exist in the database for the scan's provider but were not
    touched by this scan are treated as ephemeral. Their historical findings
    are preserved; only the denormalized counter that drives the Resources
    page sort is reset, so stale resources stop ranking at the top.

    Args:
        tenant_id: Tenant that owns the scan; scopes every RLS transaction.
        scan_id: The scan to evaluate.

    Returns:
        dict: ``{"status": "skipped", "reason": ...}`` when a guard fires, or
        ``{"status": "completed", "scan_id", "provider_id", "reset"}`` with the
        number of resources zeroed.

    Guards (each returns a "skipped" result):
        * The scan does not exist or is not in COMPLETED state.
        * The scan ran with any scoping filter in scanner_args (partial scope).
        * A newer completed scan exists for the provider.
        * The scan reports resources but persisted no ResourceScanSummary rows.

    Query design (must scale to 500k+ resources per provider):
        Phase 1 collects ephemeral IDs with one anti-join read. The outer
        filter ``(tenant_id, provider_id, failed_findings_count > 0)`` uses
        ``resources_tenant_provider_idx``; the correlated ``NOT EXISTS``
        subquery hits the implicit unique index
        ``(tenant_id, scan_id, resource_id)`` on ``ResourceScanSummary``.
        ``NOT EXISTS`` (vs ``NOT IN``) is null-safe and lets the planner
        choose a hash anti-join or an indexed nested-loop anti-join, and
        ``.iterator(chunk_size=...)`` bypasses the queryset cache so memory
        stays bounded while streaming UUIDs.

        Phase 2 UPDATEs in fixed-size batches keyed on the primary key. A
        single large UPDATE would hold row-exclusive locks for seconds and
        create a WAL spike; ~1k-row batches keep each lock window around
        50ms, bound WAL chunks, and let other writers proceed in between.
        Repeating ``failed_findings_count__gt=0`` in the UPDATE keeps it
        idempotent under concurrent scans and skips no-op row rewrites.

        All reads deliberately hit the primary DB, not the replica:
        ``ResourceScanSummary`` rows were written by the same scan chain that
        triggered this task, so replica lag could falsely classify scanned
        resources as ephemeral.

    Scope detection (``Scan.is_full_scope()``) derives the set of scoping
    scanner_args from ``prowler.lib.scan.scan.Scan.__init__`` via
    introspection, so the API can never drift from the SDK's filter contract.
    Imported scans are likewise rejected by trigger — they may only cover a
    partial slice of resources.
    """
    with rls_transaction(tenant_id):
        scan = Scan.objects.filter(tenant_id=tenant_id, id=scan_id).first()

        # Guard: unknown scan id.
        if scan is None:
            logger.warning(f"Scan {scan_id} not found")
            return {"status": "skipped", "reason": "scan not found"}

        # Guard: only act on finished scans — anything else has an
        # incomplete ResourceScanSummary set.
        if scan.state != StateChoices.COMPLETED:
            logger.info(f"Scan {scan_id} not completed; skipping ephemeral reset")
            return {"status": "skipped", "reason": "scan not completed"}

        # Guard: a scoped scan only visits a subset of resources, so absence
        # from its summaries proves nothing.
        if not scan.is_full_scope():
            logger.info(
                f"Scan {scan_id} ran with scoping filters; skipping ephemeral reset"
            )
            return {"status": "skipped", "reason": "partial scan scope"}

    # Race protection: if a newer completed scan exists for this provider, our
    # ResourceScanSummary set is stale relative to the resources' current
    # failed_findings_count values (the newer scan already refreshed them).
    # Wiping based on the older scan would zero counts the newer scan just
    # set. Skip and let the newer scan's reset task do the work; if this task
    # sat in the queue, that is the correct outcome.
    # `completed_at__isnull=False` matters: Postgres sorts NULL first under
    # DESC, so a sibling COMPLETED scan with a missing completed_at would
    # otherwise rank as "newest" and wrongly make us skip.
    with rls_transaction(tenant_id):
        newest_completed_scan_id = (
            Scan.objects.filter(
                tenant_id=tenant_id,
                provider_id=scan.provider_id,
                state=StateChoices.COMPLETED,
                completed_at__isnull=False,
            )
            .order_by("-completed_at", "-inserted_at")
            .values_list("id", flat=True)
            .first()
        )
        if scan.id != newest_completed_scan_id:
            logger.info(
                f"Scan {scan_id} is not the latest completed scan for provider "
                f"{scan.provider_id}; skipping ephemeral reset"
            )
            return {"status": "skipped", "reason": "newer scan exists"}

    # Defensive gate: ResourceScanSummary rows come from perform_prowler_scan
    # via best-effort bulk_create. If those writes silently failed while the
    # scan still reported resources, the anti-join below would flag EVERY
    # resource for this provider as ephemeral and zero its count. Bail loudly
    # instead of wiping valid data.
    with rls_transaction(tenant_id):
        summary_rows_exist = ResourceScanSummary.objects.filter(
            tenant_id=tenant_id, scan_id=scan_id
        ).exists()
        if scan.unique_resource_count > 0 and not summary_rows_exist:
            logger.error(
                f"Scan {scan_id} reports {scan.unique_resource_count} unique "
                f"resources but no ResourceScanSummary rows are persisted; "
                f"skipping ephemeral reset to avoid wiping valid counts"
            )
            return {"status": "skipped", "reason": "summaries missing"}

    # Phase 1: stream the ephemeral resource IDs off the primary DB (replica
    # lag could return a partial summary set and misclassify live resources —
    # same rationale as update_provider_compliance_scores in this module).
    # Materializing the ID list, instead of feeding the iterator straight into
    # the batched UPDATEs, is intentional: each UPDATE then runs in its own
    # short rls_transaction rather than one long transaction accumulating row
    # locks. At 500k UUIDs peak memory is ~40 MB — fine for a Celery worker
    # and a better trade-off than a multi-second write-lock window blocking
    # concurrent scans.
    with rls_transaction(tenant_id):
        scanned_this_run = ResourceScanSummary.objects.filter(
            tenant_id=tenant_id,
            scan_id=scan_id,
            resource_id=OuterRef("pk"),
        )
        candidates = Resource.objects.filter(
            tenant_id=tenant_id,
            provider_id=scan.provider_id,
            failed_findings_count__gt=0,
        ).filter(~Exists(scanned_this_run))
        stale_ids = list(
            candidates.values_list("id", flat=True).iterator(
                chunk_size=DJANGO_FINDINGS_BATCH_SIZE
            )
        )

    if not stale_ids:
        logger.info(f"No ephemeral resources for scan {scan_id}")
        return {
            "status": "completed",
            "scan_id": str(scan_id),
            "provider_id": str(scan.provider_id),
            "reset": 0,
        }

    # Phase 2: zero the counters in primary-key batches, one short
    # transaction per batch.
    reset_total = 0
    for chunk, _ in batched(stale_ids, DJANGO_FINDINGS_BATCH_SIZE):
        # batched() always yields a trailing tuple that is empty when the
        # input length divides evenly by the batch size; skip it so we never
        # issue a no-op UPDATE ... WHERE id IN ().
        if not chunk:
            continue
        with rls_transaction(tenant_id):
            reset_total += Resource.objects.filter(
                tenant_id=tenant_id,
                id__in=chunk,
                failed_findings_count__gt=0,
            ).update(failed_findings_count=0)

    logger.info(
        f"Ephemeral resource reset for scan {scan_id}: "
        f"{reset_total} resources zeroed for provider {scan.provider_id}"
    )

    return {
        "status": "completed",
        "scan_id": str(scan_id),
        "provider_id": str(scan.provider_id),
        "reset": reset_total,
    }
0 commit comments