From a0db69740f65f826f6981eeaa771a01618ba8228 Mon Sep 17 00:00:00 2001 From: ziad hany Date: Mon, 1 Jun 2026 13:43:47 +0300 Subject: [PATCH] Add API support for PackageCommitPatch Add a test Signed-off-by: ziad hany --- vulnerabilities/api_v3.py | 63 +++++++++++++++- .../templates/advisory_detail.html | 46 +++++++++++- .../advisory_package_commit_details.html | 75 +++++++++++++++++++ vulnerabilities/tests/test_api_v3.py | 68 ++++++++++++++++- vulnerabilities/views.py | 52 +++++++++++++ vulnerablecode/urls.py | 6 ++ 6 files changed, 305 insertions(+), 5 deletions(-) create mode 100644 vulnerabilities/templates/advisory_package_commit_details.html diff --git a/vulnerabilities/api_v3.py b/vulnerabilities/api_v3.py index 12f10ed1c..07712c33e 100644 --- a/vulnerabilities/api_v3.py +++ b/vulnerabilities/api_v3.py @@ -221,18 +221,24 @@ def get_affected_by_vulnerabilities(self, package): """Return a dictionary with advisory as keys and their details, including fixed_by_packages.""" advisories = self.context["advisory_map"].get(package.id, []) impact_map = self.context["impact_map"].get(package.id, {}) + introduced_patch_map = self.context.get("introduced_patch_map", {}) + fixed_patch_map = self.context.get("fixed_patch_map", {}) if advisories: result = [] for adv in advisories: fixed = impact_map.get(adv["avid"]) + introduced_patches = introduced_patch_map.get((package.id, adv["avid"]), []) + fixed_patches = fixed_patch_map.get((package.id, adv["avid"]), []) adv.pop("avid", None) result.append( { **adv, "fixed_by_packages": fixed, + "introduced_by_patch": introduced_patches, + "fixed_by_patch": fixed_patches, } ) @@ -266,7 +272,8 @@ def get_affected_by_vulnerabilities(self, package): impact = impact_by_avid.get(advisory.avid) if not impact: continue - + introduced_patches = introduced_patch_map.get((package.id, advisory.avid), []) + fixed_patches = fixed_patch_map.get((package.id, advisory.avid), []) result.append( { "advisory_id": advisory.advisory_id.split("/")[-1], @@ -276,9 +283,10 @@ def get_affected_by_vulnerabilities(self, package): "exploitability": advisory.exploitability, "risk_score": advisory.risk_score, "fixed_by_packages": [pkg.purl for pkg in impact.fixed_by_packages.all()], + "introduced_by_patch": introduced_patches, + "fixed_by_patch": fixed_patches, } ) - return result if not advisories: @@ -353,6 +361,8 @@ def return_advisories_data(self, package, advisories_qs, advisories): ) impact_by_avid = {impact.advisory.avid: impact for impact in impacts} + introduced_patch_map = self.context.get("introduced_patch_map", {}) + fixed_patch_map = self.context.get("fixed_patch_map", {}) result = [] for advisory in advisories: @@ -361,6 +371,8 @@ def return_advisories_data(self, package, advisories_qs, advisories): if not impact: continue + introduced_patches = introduced_patch_map.get((package.id, advisory.advisory.avid), []) + fixed_patches = fixed_patch_map.get((package.id, advisory.advisory.avid), []) result.append( { "advisory_id": advisory.identifier, @@ -372,6 +384,8 @@ def return_advisories_data(self, package, advisories_qs, advisories): "fixed_by_packages": list( set([pkg.purl for pkg in impact.fixed_by_packages.all()]) ), + "introduced_by_patch": introduced_patches, + "fixed_by_patch": fixed_patches, } ) @@ -456,6 +470,7 @@ def create(self, request, *args, **kwargs): affected_advisory_map = get_affected_advisories_bulk(page) fixing_advisory_map = get_fixing_advisories_bulk(page) impact_map = get_impacts_bulk(page) + introduced_patch_map, fixed_patch_map = get_patches_bulk(page) serializer = self.get_serializer( page, many=True, @@ -464,6 +479,8 @@ def create(self, request, *args, **kwargs): "advisory_map": affected_advisory_map, "impact_map": impact_map, "fixing_advisory_map": fixing_advisory_map, + "introduced_patch_map": introduced_patch_map, + "fixed_patch_map": fixed_patch_map, }, ) return self.get_paginated_response(serializer.data) @@ -673,6 +690,48 @@ def get_impacts_bulk(packages): return impact_map +def get_patches_bulk(packages): + """ + Returns a tuple of two dicts: + introduced_map: (package_id, advisory_avid) -> list of introduced package_commit_patches dicts + fixed_map: (package_id, advisory_avid) -> list of fixed package_commit_patches dicts + Each package_commit_patches dict contains 'commit_hash' and 'vcs_url' + """ + package_ids = [p.id for p in packages] + if not package_ids: + return {}, {} + + impacted_packages_qs = ( + ImpactedPackageAffecting.objects.filter(package_id__in=package_ids) + .select_related("impacted_package__advisory") + .prefetch_related( + "impacted_package__introduced_by_package_commit_patches", + "impacted_package__fixed_by_package_commit_patches", + ) + ) + + introduced_patches = defaultdict(list) + fixed_patches = defaultdict(list) + + for impacted_pkg_qs in impacted_packages_qs: + pkg_id = impacted_pkg_qs.package_id + impact = impacted_pkg_qs.impacted_package + avid = impact.advisory.avid + key = (pkg_id, avid) + + for patch in impact.introduced_by_package_commit_patches.all(): + patch_data = {"commit_hash": patch.commit_hash, "vcs_url": patch.vcs_url} + if patch_data not in introduced_patches[key]: + introduced_patches[key].append(patch_data) + + for patch in impact.fixed_by_package_commit_patches.all(): + patch_data = {"commit_hash": patch.commit_hash, "vcs_url": patch.vcs_url} + if patch_data not in fixed_patches[key]: + fixed_patches[key].append(patch_data) + + return introduced_patches, fixed_patches + + def get_fixing_advisories_bulk(packages): package_ids = [p.id for p in packages] diff --git a/vulnerabilities/templates/advisory_detail.html b/vulnerabilities/templates/advisory_detail.html index 90f1d6d8b..acf3d9379 100644 --- a/vulnerabilities/templates/advisory_detail.html +++ b/vulnerabilities/templates/advisory_detail.html @@ -80,7 +80,16 @@ {% endif %} - + +
  • + + + {% with pcp_length=package_commit_patches|length %} + Patches: ({{ advisory.patches.count|add:pcp_length }}) + {% endwith %} + + +