diff --git a/sig-security-tooling/cve-feed/hack/__pycache__/cve_title_parser.cpython-313.pyc b/sig-security-tooling/cve-feed/hack/__pycache__/cve_title_parser.cpython-313.pyc new file mode 100644 index 0000000..362ec24 Binary files /dev/null and b/sig-security-tooling/cve-feed/hack/__pycache__/cve_title_parser.cpython-313.pyc differ diff --git a/sig-security-tooling/cve-feed/hack/fetch-official-cve-feed.py b/sig-security-tooling/cve-feed/hack/fetch-official-cve-feed.py index d8faece..7d7eb41 100755 --- a/sig-security-tooling/cve-feed/hack/fetch-official-cve-feed.py +++ b/sig-security-tooling/cve-feed/hack/fetch-official-cve-feed.py @@ -18,9 +18,29 @@ import json import requests import sys +import re + from datetime import datetime, timezone from cve_title_parser import parse_cve_title +def extract_osv_from_body(body): + # Extract an embedded OSV JSON object from a CVE issue body. + # New SRC CVE announcements may include the OSV data inline + # as a fenced ```json code block. This helper parses and returns + # that JSON when present. + if not body: + return None + + match = re.search(r"```json\s*(\{.*?\})\s*```", body, re.DOTALL) + if not match: + return None + + try: + return json.loads(match.group(1)) + except json.JSONDecodeError: + return None + + def getCVEStatus(state, state_reason): if state == "open": if state_reason == "reopened": @@ -76,7 +96,11 @@ def getCVEStatus(state, state_reason): cve = {'content_text': None, 'date_published': None, 'external_url': None, 'id': None,'summary': None, 'url': None, '_kubernetes_io': None} # This is a custom extension - item_kubernetes_io = {'google_group_url': None, 'issue_number': None} + item_kubernetes_io = { + 'google_group_url': None, + 'issue_number': None, + 'osv': None, + } cve['_kubernetes_io'] = item_kubernetes_io cve['url'] = item['html_url'] @@ -91,15 +115,67 @@ def getCVEStatus(state, state_reason): first_cve_id = cve_ids[0] cve['id'] = first_cve_id + + # Try extracting OSV from issue body first (SRC new format) + cve['_kubernetes_io']['osv'] = extract_osv_from_body(item.get('body')) + + # Fallback: fetch OSV from cve-feed-osv repository + if cve['_kubernetes_io']['osv'] is None: + osv_url = f'https://raw.githubusercontent.com/kubernetes-sigs/cve-feed-osv/main/vulns/{first_cve_id}.json' + try: + res_osv = requests.get(osv_url, timeout=5) + if res_osv.status_code == 200: + cve['_kubernetes_io']['osv'] = res_osv.json() + except requests.RequestException: + pass + + cve['external_url'] = f'https://www.cve.org/cverecord?id={first_cve_id}' cve['_kubernetes_io']['google_group_url'] = f'https://groups.google.com/g/kubernetes-announce/search?q={first_cve_id}' # Add additional entries for any remaining CVE IDs for additional_cve_id in cve_ids[1:]: + # Make a deep copy of the main CVE to avoid overwriting its data additional_cve = copy.deepcopy(cve) + + # Update the CVE ID for this additional CVE additional_cve['id'] = additional_cve_id + + # Set the external URL for this CVE on CVE.org additional_cve['external_url'] = f'https://www.cve.org/cverecord?id={additional_cve_id}' + + # Set the Google Group URL specific to this CVE additional_cve['_kubernetes_io']['google_group_url'] = f'https://groups.google.com/g/kubernetes-announce/search?q={additional_cve_id}' + + additional_cve['_kubernetes_io']['osv'] = extract_osv_from_body(item.get('body')) + + # Fallback: if no embedded OSV was found in the issue body, attempt to fetch + # the OSV JSON from the official cve-feed-osv repository for this CVE. + if additional_cve['_kubernetes_io']['osv'] is None: + additional_osv_url = f'https://raw.githubusercontent.com/kubernetes-sigs/cve-feed-osv/main/vulns/{additional_cve_id}.json' + try: + res_additional_osv = requests.get(additional_osv_url, timeout=5) + if res_additional_osv.status_code == 200: + additional_cve['_kubernetes_io']['osv'] = res_additional_osv.json() + except requests.RequestException: + pass + + # Construct the URL to fetch the OSV JSON from the official repository + additional_osv_url = f'https://raw.githubusercontent.com/kubernetes-sigs/cve-feed-osv/main/vulns/{additional_cve_id}.json' + + try: + # Attempt to fetch the OSV JSON with a 5-second timeout + res_additional_osv = requests.get(additional_osv_url, timeout=5) + + # If the file exists, parse it as JSON and store it in the 'osv' field + if res_additional_osv.status_code == 200: + additional_cve['_kubernetes_io']['osv'] = res_additional_osv.json() + + except requests.RequestException: + # If any network error occurs (timeout, connection issue, etc.), keep 'osv' as None + additional_cve['_kubernetes_io']['osv'] = None + + cve_list.append(additional_cve) cve_list.append(cve)