Updating package managers#346
Conversation
| # def test_get_package_manager_azure_linux_4_and_rhel10_not_supported(self): | ||
| # """Test that Azure Linux 4 and RHEL 10 log unsupported message""" | ||
| # self.backup_platform_system = platform.system | ||
| # self.backup_linux_distribution = self.envlayer.platform.linux_distribution | ||
| # self.backup_distro_os_release_attr = distro.os_release_attr | ||
| # | ||
| # platform.system = self.mock_platform_system | ||
| # test_input_output_table = [ | ||
| # [self.mock_linux_distribution_to_return_azure_linux_4, self.mock_distro_os_release_attr_return_azure_linux_4, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Microsoft Azure Linux][Version=4.0][Code=]\n"], | ||
| # [self.mock_linux_distribution_to_return_rhel_10, self.mock_distro_os_release_attr_return_rhel_10, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Red Hat][Version=10.0][Code=abc]\n"], | ||
| # ] | ||
| # | ||
| # for row in test_input_output_table: | ||
| # self.envlayer.platform.linux_distribution = row[0] | ||
| # distro.os_release_attr = row[1] | ||
| # | ||
| # captured_output = io.StringIO() | ||
| # sys.stdout = captured_output | ||
| # result = self.envlayer.get_package_manager() | ||
| # sys.stdout = sys.__stdout__ | ||
| # self.assertEqual(row[2], captured_output.getvalue()) | ||
| # self.assertEqual(result, "") | ||
| # | ||
| # # restore | ||
| # self.__restore_mocks() |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #346 +/- ##
==========================================
- Coverage 93.84% 93.59% -0.25%
==========================================
Files 105 105
Lines 18218 18337 +119
==========================================
+ Hits 17096 17162 +66
- Misses 1122 1175 +53
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR introduces a new “update certificates” flow intended to run during patch installation, adds related constants/error codes, and adjusts tests around package manager detection.
Changes:
- Adds certificate-update plumbing to
PackageManager(mokutil detection, cert status retrieval helpers, andupdate_certs()orchestration). - Invokes
package_manager.update_certs()at the start of patch installation. - Adds
CERTIFICATE_UPDATEerror code + certificate-related constants, and comments out an EnvLayer unsupported-distro test.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| src/core/tests/Test_EnvLayer.py | Comments out the Azure Linux 4 / RHEL 10 unsupported-distro test. |
| src/core/src/package_managers/ZypperPackageManager.py | Adds abstract stub cert-update methods that override base behavior. |
| src/core/src/package_managers/YumPackageManager.py | Adds abstract stub cert-update methods that override base behavior. |
| src/core/src/package_managers/TdnfPackageManager.py | Adds abstract stub cert-update methods that override base behavior. |
| src/core/src/package_managers/PackageManager.py | Adds shared cert-update orchestration and mokutil/cert status command constants. |
| src/core/src/package_managers/AptitudePackageManager.py | Adds apt/fwupd-based cert update flow, but also overrides key base methods with abstract stubs. |
| src/core/src/core_logic/PatchInstaller.py | Calls package_manager.update_certs() during installation start. |
| src/core/src/bootstrap/Constants.py | Adds Certificates enum + LATEST_CERTIFICATE_TIMESTAMP + CERTIFICATE_UPDATE error code. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # self.backup_platform_system = platform.system | ||
| # self.backup_linux_distribution = self.envlayer.platform.linux_distribution | ||
| # self.backup_distro_os_release_attr = distro.os_release_attr | ||
| # | ||
| # platform.system = self.mock_platform_system | ||
| # test_input_output_table = [ | ||
| # [self.mock_linux_distribution_to_return_azure_linux_4, self.mock_distro_os_release_attr_return_azure_linux_4, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Microsoft Azure Linux][Version=4.0][Code=]\n"], | ||
| # [self.mock_linux_distribution_to_return_rhel_10, self.mock_distro_os_release_attr_return_rhel_10, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Red Hat][Version=10.0][Code=abc]\n"], | ||
| # ] | ||
| # | ||
| # for row in test_input_output_table: | ||
| # self.envlayer.platform.linux_distribution = row[0] | ||
| # distro.os_release_attr = row[1] | ||
| # | ||
| # captured_output = io.StringIO() | ||
| # sys.stdout = captured_output | ||
| # result = self.envlayer.get_package_manager() | ||
| # sys.stdout = sys.__stdout__ | ||
| # self.assertEqual(row[2], captured_output.getvalue()) | ||
| # self.assertEqual(result, "") | ||
| # | ||
| # # restore | ||
| # self.__restore_mocks() | ||
|
|
| reboot_manager.start_reboot_if_required_and_time_available(maintenance_window.get_remaining_time_in_minutes(None, False)) | ||
|
|
||
| # Update certs if available | ||
| self.package_manager.update_certs() |
| self.get_kek_cert_status_cmd = "mokutil --kek | grep 'CN='" # todo: review if this should be changed to mokutil --kek 2>&1 | grep -oP 'CN=\K[^,]+' | sort -u | tr '\n' '; ' || echo "none" | ||
| self.get_db_cert_status_cmd = "mokutil --db | grep 'CN='" |
| @abstractmethod | ||
| def update_certs(self): | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_install_mokutil(self): | ||
| """ Attempts to install mokutil """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def fetch_current_certs(self, cert_type, get_cert_status_cmd, raise_on_exception=False): | ||
| # """ Fetches the status of the certificates on the machine """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def is_latest_cert_installed(self, status_code, status_output): | ||
| """ Checks if the latest certs are already installed on the machine based on mokutil output """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_update_certs(self): | ||
| """ Attempts to update certificate status """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def are_latest_certs_present(self): | ||
| pass |
| # region Update certificates in factory defaults | ||
| @abstractmethod | ||
| def update_certs(self): | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_install_mokutil(self): | ||
| """ Attempts to install mokutil """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def fetch_current_certs(self, cert_type, get_cert_status_cmd, raise_on_exception=False): | ||
| # """ Fetches the status of the certificates on the machine """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def is_latest_cert_installed(self, status_code, status_output): | ||
| """ Checks if the latest certs are already installed on the machine based on mokutil output """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_update_certs(self): | ||
| """ Attempts to update certificate status """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def are_latest_certs_present(self): | ||
| pass | ||
| # endregion |
| @abstractmethod | ||
| def update_certs(self): | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_install_mokutil(self): | ||
| """ Attempts to install mokutil """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def fetch_current_certs(self, cert_type, get_cert_status_cmd, raise_on_exception=False): | ||
| # """ Fetches the status of the certificates on the machine """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def is_latest_cert_installed(self, status_code, status_output): | ||
| """ Checks if the latest certs are already installed on the machine based on mokutil output """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def try_update_certs(self): | ||
| """ Attempts to update certificate status """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def are_latest_certs_present(self): | ||
| pass |
| # region Update certificates in factory defaults | ||
| @abstractmethod | ||
| def try_install_mokutil(self): | ||
| # type: () -> bool | ||
| """ Attempts to install mokutil """ | ||
| cmd = self.install_mokutil_cmd | ||
| self.composite_logger.log_verbose('[APM] Invoking install mokutil command [Command={0}]'.format(cmd)) | ||
| out, code = self.invoke_package_manager_advanced(cmd, raise_on_exception=False) | ||
| self.composite_logger.log_debug('[APM] Invoked install mokutil exists. [Command={0}][Code={1}][Output={2}]'.format(cmd, str(code), str(out))) | ||
| return code == 0 | ||
|
|
||
| @abstractmethod | ||
| def fetch_current_certs(self, cert_type, get_cert_status_cmd, raise_on_exception=False): | ||
| # """ Fetches the status of the certificates on the machine """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def is_latest_cert_installed(self, status_code, status_output): | ||
| """ Checks if the latest certs are already installed on the machine based on mokutil output """ | ||
| pass | ||
|
|
||
| def try_update_certs(self): | ||
| """ Attempts to update certificate status """ | ||
| self.composite_logger.log("[APM][Certs] Starting cert update flow.") | ||
| if self.are_latest_certs_present(): | ||
| self.composite_logger.log("[APM][Certs] Latest certs already present. Skipping.") | ||
| return True | ||
|
|
||
| success = False | ||
| try: | ||
| # shell/file steps | ||
| self.__run_cert_shell_command(self.add_proposed_repo_cmd, "AddProposedRepo", raise_on_error=True) | ||
| self.__run_cert_shell_command(self.create_proposed_pin_cmd, "CreateAptPin", raise_on_error=True) | ||
|
|
||
| # apt/package-manager steps | ||
| self.__run_cert_apt_command(self.apt_update_cmd, "AptUpdateAfterRepo", raise_on_error=True) | ||
| self.__run_cert_apt_command(self.install_fwupd_from_proposed_cmd, "InstallFwupdFromProposed", raise_on_error=True) | ||
|
|
||
| # shell fwupd steps | ||
| self.__run_cert_shell_command(self.fwupd_refresh_cmd, "FwupdRefresh", raise_on_error=True) | ||
| self.__run_cert_shell_command(self.fwupd_update_cmd, "FwupdUpdate", raise_on_error=True) | ||
|
|
||
| if self.are_latest_certs_present(): | ||
| self.composite_logger.log("[APM][Certs] Cert update completed and verified.") | ||
| success = True | ||
| else: | ||
| msg = "[APM][Certs] Cert update commands completed but latest certs not detected." | ||
| self.composite_logger.log_error(msg) | ||
| self.status_handler.add_error_to_status(msg, Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE) | ||
|
|
||
| except Exception as error: | ||
| self.composite_logger.log_error( | ||
| "[APM][Certs] Exception during cert update flow. [Error={0}]".format(repr(error))) | ||
|
|
||
| finally: | ||
| # best-effort cleanup | ||
| self.__run_cert_shell_command(self.remove_proposed_pin_cmd, "CleanupAptPin", raise_on_error=False) | ||
| self.__run_cert_shell_command(self.remove_proposed_repo_cmd, "CleanupProposedRepo", raise_on_error=False) | ||
| self.__run_cert_apt_command(self.apt_update_cmd, "AptUpdateAfterCleanup", raise_on_error=False) | ||
|
|
||
| if success: | ||
| self.status_handler.set_reboot_pending(self.is_reboot_pending()) | ||
|
|
||
| return success | ||
|
|
||
| @abstractmethod | ||
| def are_latest_certs_present(self): | ||
| pass |
| self.proposed_repo_file_path = "/etc/apt/sources.list.d/proposed.list" | ||
| self.proposed_pin_file_path = "/etc/apt/preferences.d/proposed" | ||
| self.add_proposed_repo_cmd = ( | ||
| "bash -c 'echo \"deb http://archive.ubuntu.com/ubuntu/ " |
| import re | ||
| import shutil | ||
| import sys | ||
| from abc import abstractmethod |
| import os | ||
| import re | ||
| import time | ||
| from abc import abstractmethod |
| import json | ||
| import os | ||
| import re | ||
| from abc import abstractmethod |
| pass | ||
|
|
||
| # region Update certificates in factory defaults | ||
| def update_certs(self): |
There was a problem hiding this comment.
Lets add VM security type condition too. This update should not be done in CVMs in current form.
Updating package managers