From c6150384c0985d25ffe96f573a5c9df38fd0c5aa Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Mon, 11 May 2026 16:42:54 -0400 Subject: [PATCH 1/8] Skeleton PR for dnf5 linux4 implementation --- .../src/bootstrap/ConfigurationFactory.py | 6 +- src/core/src/bootstrap/Constants.py | 1 + src/core/src/bootstrap/EnvLayer.py | 26 +- .../package_managers/AzL4DnfPackageManager.py | 232 ++++++++++++++++++ 4 files changed, 258 insertions(+), 7 deletions(-) create mode 100644 src/core/src/package_managers/AzL4DnfPackageManager.py diff --git a/src/core/src/bootstrap/ConfigurationFactory.py b/src/core/src/bootstrap/ConfigurationFactory.py index 30bf7db0..db0972b5 100644 --- a/src/core/src/bootstrap/ConfigurationFactory.py +++ b/src/core/src/bootstrap/ConfigurationFactory.py @@ -38,6 +38,7 @@ from core.src.package_managers.AptitudePackageManager import AptitudePackageManager from core.src.package_managers.AzL3TdnfPackageManager import AzL3TdnfPackageManager +from core.src.package_managers.AzL4DnfPackageManager import AzL4DnfPackageManager from core.src.package_managers.YumPackageManager import YumPackageManager from core.src.package_managers.ZypperPackageManager import ZypperPackageManager @@ -70,16 +71,19 @@ def __init__(self, log_file_path, events_folder, telemetry_supported): self.configurations = { 'apt_prod_config': self.new_prod_configuration(Constants.APT, AptitudePackageManager), + 'dnf_prod_config': self.new_prod_configuration(Constants.DNF, AzL4DnfPackageManager), 'tdnf_prod_config': self.new_prod_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_prod_config': self.new_prod_configuration(Constants.YUM, YumPackageManager), 'zypper_prod_config': self.new_prod_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_dev_config': self.new_dev_configuration(Constants.APT, AptitudePackageManager), + 'dnf_dev_config': self.new_dev_configuration(Constants.DNF, AzL4DnfPackageManager), 'tdnf_dev_config': self.new_dev_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_dev_config': self.new_dev_configuration(Constants.YUM, YumPackageManager), 'zypper_dev_config': self.new_dev_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_test_config': self.new_test_configuration(Constants.APT, AptitudePackageManager), + 'dnf_test_config': self.new_test_configuration(Constants.DNF, AzL4DnfPackageManager), 'tdnf_test_config': self.new_test_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_test_config': self.new_test_configuration(Constants.YUM, YumPackageManager), 'zypper_test_config': self.new_test_configuration(Constants.ZYPPER, ZypperPackageManager) @@ -116,7 +120,7 @@ def get_configuration(self, env, package_manager_name): print ("Error: Environment configuration not supported - " + str(env)) return None - if str(package_manager_name) not in [Constants.APT, Constants.TDNF, Constants.YUM, Constants.ZYPPER]: + if str(package_manager_name) not in [Constants.APT, Constants.DNF, Constants.TDNF, Constants.YUM, Constants.ZYPPER]: print ("Error: Package manager configuration not supported - " + str(package_manager_name)) return None diff --git a/src/core/src/bootstrap/Constants.py b/src/core/src/bootstrap/Constants.py index 51da02c5..46fb0fe0 100644 --- a/src/core/src/bootstrap/Constants.py +++ b/src/core/src/bootstrap/Constants.py @@ -201,6 +201,7 @@ class StatusTruncationConfig(EnumBackport): # Package Managers APT = 'apt' + DNF = 'dnf' TDNF = 'tdnf' YUM = 'yum' ZYPPER = 'zypper' diff --git a/src/core/src/bootstrap/EnvLayer.py b/src/core/src/bootstrap/EnvLayer.py index f79f7b2c..f5dc35ff 100644 --- a/src/core/src/bootstrap/EnvLayer.py +++ b/src/core/src/bootstrap/EnvLayer.py @@ -49,7 +49,7 @@ def is_distro_azure_linux(distro_name): def is_distro_azure_linux_3_or_beyond(self): # type: () -> bool - """ Checks if the current distro is Azure Linux 3 """ + """ Checks if the current distro is Azure Linux 3 or greater""" if self.is_distro_azure_linux(self.platform.linux_distribution()): version = distro.os_release_attr('version') major = version.split('.')[0] if version else None @@ -63,12 +63,26 @@ def get_package_manager(self): return Constants.APT if self.is_distro_azure_linux(str(self.platform.linux_distribution())): - code, out = self.run_command_output('which tdnf', False, False) - if code == 0: - return Constants.TDNF + # Determine package manager based on Azure Linux version + version = distro.os_release_attr('version') + major = version.split('.')[0] if version else None + + # Azure Linux 4 uses DNF + if major is not None and int(major) >= 4: + code, out = self.run_command_output('which dnf', False, False) + if code == 0: + return 'dnf' + else: + print("Error: Expected package manager dnf not found on this Azure Linux 4 VM.") + return str() + # Azure Linux 3 uses TDNF else: - print("Error: Expected package manager tdnf not found on this Azure Linux VM.") - return str() + code, out = self.run_command_output('which tdnf', False, False) + if code == 0: + return Constants.TDNF + else: + print("Error: Expected package manager tdnf not found on this Azure Linux VM.") + return str() # choose default package manager package_manager_map = (('apt-get', Constants.APT), diff --git a/src/core/src/package_managers/AzL4DnfPackageManager.py b/src/core/src/package_managers/AzL4DnfPackageManager.py new file mode 100644 index 00000000..c16d1048 --- /dev/null +++ b/src/core/src/package_managers/AzL4DnfPackageManager.py @@ -0,0 +1,232 @@ +# Copyright 2026 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.7+ + +"""AzL4DnfPackageManager for Azure Linux L4""" +from abc import ABCMeta +from core.src.bootstrap.Constants import Constants +from core.src.package_managers.PackageManager import PackageManager + + +class AzL4DnfPackageManager(PackageManager): + """Implementation of Azure Linux L4 DNF package management operations""" + + def __init__(self, env_layer, execution_config, composite_logger, telemetry_writer, status_handler): + super(AzL4DnfPackageManager, self).__init__(env_layer, execution_config, composite_logger, telemetry_writer, status_handler) + # TODO: Add AzL4 DNF specific initialization + self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf') + + __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) + + # ConfigurePatch Method + def refresh_repo(self): + """Refreshes the DNF repository cache and lists available updates by cleaning expired cache entries + Commands: + - sudo dnf clean expire-cache (cleans expired cache entries) + - sudo dnf -q check-update (checks for available updates) + """ + pass + + # AssessPatch method + def invoke_package_manager_advanced(self, command, raise_on_exception=True): + """Invokes the DNF package manager with standardized command execution, logging, and error handling + Parameters: + - command (string): The DNF command to execute + - raise_on_exception (boolean): Whether to raise exception on non-zero exit code + Returns: + - Tuple of (output, return_code) from the command execution + """ + pass + + # AssessPatch method + def get_all_updates(self, cached=False): + """Gets all missing updates available for the system and returns the cached updates list and versions list + Cache Check Logic: + - If cached=True and cache has data, return cached updates and versions immediately (high performance reuse) + - If cache miss or cached=False, execute the DNF command to get fresh updates and populate cache + Command: + - sudo dnf -q check-update (checks for all available updates) + 1. If cached=True and cache has data, return cached results + 2. Execute command, parse output, cache results + 3. Return all_updates_cached and all_update_versions_cached + """ + return [], [] + + # AssessPatch method + def get_security_updates(self): + """Gets all missing security updates available for the system and returns packages and versions list + Command: + - sudo dnf -q check-update --security (checks for available security updates only) + Returns: + - List of security package names + - List of corresponding security package versions + """ + pass + + # AssessPatch method + def get_other_updates(self): + """Gets missing (non-security) updates. Record log and return + """ + return [], [] + + def set_max_patch_publish_date(self, max_patch_publish_date=str()): + pass + + # Install Patch method + def get_composite_package_identifier(self, package_name, package_version): + """Creates a version+architecture-specific package identifier for install commands + Parameters: + - package_name (string): Name of the package (may include architecture) + - package_version (string): Version of the package + Returns: + - String: Composite package identifier (e.g., "package-1.0.0.x86_64") + """ + pass + + def install_updates_fail_safe(self, excluded_packages): + pass + + # AssessPatch method + def get_all_available_versions_of_package(self, package_name): + """Returns a list of all available versions of a package + Parameters: + - package_name (string): Name of the package to get versions for + Commands used: + - sudo dnf list --available (lists all available versions of the package) + Returns: + - List of all available package versions + """ + return [] + + # AssessPatch method + def is_package_version_installed(self, package_name, package_version): + """Checks if a specific package version is installed + Parameters: + - package_name (string): Name of the package + - package_version (string): Version of the package to check + Commands used: + - sudo dnf list installed (checks if specific package version is installed) + Returns: + - Boolean: True if the specific package version is installed, False otherwise + """ + pass + + + def get_dependent_list(self, packages): + """Returns dependent list for the list of packages + Parameters: + - packages (list): List of package names to get dependencies for + Commands used: + - sudo dnf install --assumeno --skip-broken (simulates installation to find dependencies without actually installing) + Returns: List of dependency package names required for the input packages + """ + pass + + def get_product_name(self, package_name): + pass + + def get_package_size(self, output): + """Retrieves package size from installation output string + Parameters: + - output (string): The output string from DNF installation command + Returns: + - String: Package size (e.g., "15 M") or UNKNOWN_PACKAGE_SIZE if not found + """ + pass + + # Install Patch method + def install_security_updates_azgps_coordinated(self): + """Installs security updates in Azure Linux 4 following strict safe deployment practices + Commands used: + - sudo dnf -y upgrade --security --skip-broken (installs security updates only) + Returns: + - Tuple of (return code, output) from the command execution + """ + pass + + def try_meet_azgps_coordinated_requirements(self): + """ + Do we need this for dnf5? + """ + return False + + # ConfigurePatch Method + def get_current_auto_os_patch_state(self): + """ Gets the current auto OS update patch state on the machine """ + pass + + # ConfigurePatch Method + def disable_auto_os_update(self): + """ + Disables auto OS updates on the machine only if they are enabled + Comments from yashna : The current VM with AzLinux4 installed doesnt have dnf automatic/auto OS updates installed. + Will we have this installed in other machines which leads to my question on whether we need this or not ? + """ + pass + + def backup_image_default_patch_configuration_if_not_exists(self): + """ + This method saves the original auto-update configuration so it can be restored later. + """ + pass + + def is_image_default_patch_configuration_backup_valid(self, image_default_patch_configuration_backup): + pass + + def update_os_patch_configuration_sub_setting(self, patch_configuration_sub_setting, value, patch_configuration_sub_setting_pattern_to_match): + pass + + # Post Install method/ Install Patch + def is_reboot_pending(self): + """Checks if there is a pending reboot on the machine + Returns: + - Boolean: True if reboot is pending, False otherwise + """ + pass + + # Post Install method / Install Patch + def do_processes_require_restart(self): + """Checks if processes require a restart due to updates + Commands used: + - sudo dnf -y install dnf-utils (installs dnf-utils if not already present) + - sudo LANG=en_US.UTF8 needs-restarting -r (checks if processes require restart) + Returns: + - Boolean: True if processes require restart, False otherwise + """ + pass + + def add_arch_dependencies(self, package_manager, package, version, packages, package_versions, package_and_dependencies, package_and_dependency_versions): + """ + Unnecessary for DNF because the package manager already handles multi-architecture dependencies automatically + Command Used to confirm above: sudo dnf -y install jq + """ + pass + + def set_security_esm_package_status(self, operation, packages): + """No-op for dnf, tdnf, yum and zypper """ + pass + + def separate_out_esm_packages(self, packages, package_versions): + """No-op for dnf, tdnf, yum and zypper """ + pass + + def get_package_install_expected_avg_time_in_seconds(self): + pass + + # ConfigurePatch method + def revert_auto_os_update_to_system_default(self): + """ Reverts the auto OS update patch state on the machine to its system default value, if one exists in our backup file """ + pass + From f11560ee475791f2bf77d9324f7b401f2286ae9a Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Mon, 11 May 2026 17:05:48 -0400 Subject: [PATCH 2/8] Address Copilot comments --- src/core/src/bootstrap/EnvLayer.py | 2 +- .../package_managers/AzL4DnfPackageManager.py | 54 +++++++++---------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/core/src/bootstrap/EnvLayer.py b/src/core/src/bootstrap/EnvLayer.py index f5dc35ff..8a8abf03 100644 --- a/src/core/src/bootstrap/EnvLayer.py +++ b/src/core/src/bootstrap/EnvLayer.py @@ -71,7 +71,7 @@ def get_package_manager(self): if major is not None and int(major) >= 4: code, out = self.run_command_output('which dnf', False, False) if code == 0: - return 'dnf' + return Constants.DNF else: print("Error: Expected package manager dnf not found on this Azure Linux 4 VM.") return str() diff --git a/src/core/src/package_managers/AzL4DnfPackageManager.py b/src/core/src/package_managers/AzL4DnfPackageManager.py index c16d1048..388a58e5 100644 --- a/src/core/src/package_managers/AzL4DnfPackageManager.py +++ b/src/core/src/package_managers/AzL4DnfPackageManager.py @@ -37,7 +37,7 @@ def refresh_repo(self): - sudo dnf clean expire-cache (cleans expired cache entries) - sudo dnf -q check-update (checks for available updates) """ - pass + raise NotImplementedError("DNF: refresh_repo not implemented yet") # AssessPatch method def invoke_package_manager_advanced(self, command, raise_on_exception=True): @@ -48,7 +48,7 @@ def invoke_package_manager_advanced(self, command, raise_on_exception=True): Returns: - Tuple of (output, return_code) from the command execution """ - pass + raise NotImplementedError("DNF: invoke_package_manager_advanced not implemented yet") # AssessPatch method def get_all_updates(self, cached=False): @@ -62,7 +62,7 @@ def get_all_updates(self, cached=False): 2. Execute command, parse output, cache results 3. Return all_updates_cached and all_update_versions_cached """ - return [], [] + raise NotImplementedError("DNF: get_all_updates not implemented yet") # AssessPatch method def get_security_updates(self): @@ -73,7 +73,7 @@ def get_security_updates(self): - List of security package names - List of corresponding security package versions """ - pass + raise NotImplementedError("DNF: get_security_updates not implemented yet") # AssessPatch method def get_other_updates(self): @@ -82,7 +82,7 @@ def get_other_updates(self): return [], [] def set_max_patch_publish_date(self, max_patch_publish_date=str()): - pass + raise NotImplementedError("DNF: set_max_patch_publish_date not implemented yet") # Install Patch method def get_composite_package_identifier(self, package_name, package_version): @@ -93,10 +93,10 @@ def get_composite_package_identifier(self, package_name, package_version): Returns: - String: Composite package identifier (e.g., "package-1.0.0.x86_64") """ - pass + raise NotImplementedError("DNF: get_composite_package_identifier not implemented yet") def install_updates_fail_safe(self, excluded_packages): - pass + raise NotImplementedError("DNF: install_updates_fail_safe not implemented yet") # AssessPatch method def get_all_available_versions_of_package(self, package_name): @@ -108,7 +108,7 @@ def get_all_available_versions_of_package(self, package_name): Returns: - List of all available package versions """ - return [] + raise NotImplementedError("DNF: get_all_available_versions_of_package not implemented yet") # AssessPatch method def is_package_version_installed(self, package_name, package_version): @@ -121,7 +121,7 @@ def is_package_version_installed(self, package_name, package_version): Returns: - Boolean: True if the specific package version is installed, False otherwise """ - pass + raise NotImplementedError("DNF: is_package_version_installed not implemented yet") def get_dependent_list(self, packages): @@ -132,10 +132,10 @@ def get_dependent_list(self, packages): - sudo dnf install --assumeno --skip-broken (simulates installation to find dependencies without actually installing) Returns: List of dependency package names required for the input packages """ - pass + raise NotImplementedError("DNF: get_dependent_list not implemented yet") def get_product_name(self, package_name): - pass + raise NotImplementedError("DNF: get_product_name not implemented yet") def get_package_size(self, output): """Retrieves package size from installation output string @@ -144,7 +144,7 @@ def get_package_size(self, output): Returns: - String: Package size (e.g., "15 M") or UNKNOWN_PACKAGE_SIZE if not found """ - pass + raise NotImplementedError("DNF: get_package_size not implemented yet") # Install Patch method def install_security_updates_azgps_coordinated(self): @@ -154,18 +154,18 @@ def install_security_updates_azgps_coordinated(self): Returns: - Tuple of (return code, output) from the command execution """ - pass + raise NotImplementedError("DNF: install_security_updates_azgps_coordinated not implemented yet") def try_meet_azgps_coordinated_requirements(self): """ - Do we need this for dnf5? + Do we need this for dnf? """ - return False + raise NotImplementedError("DNF: try_meet_azgps_coordinated_requirements not implemented yet") # ConfigurePatch Method def get_current_auto_os_patch_state(self): """ Gets the current auto OS update patch state on the machine """ - pass + raise NotImplementedError("DNF: get_current_auto_os_patch_state not implemented yet") # ConfigurePatch Method def disable_auto_os_update(self): @@ -174,19 +174,19 @@ def disable_auto_os_update(self): Comments from yashna : The current VM with AzLinux4 installed doesnt have dnf automatic/auto OS updates installed. Will we have this installed in other machines which leads to my question on whether we need this or not ? """ - pass + raise NotImplementedError("DNF: disable_auto_os_update not implemented yet") def backup_image_default_patch_configuration_if_not_exists(self): """ This method saves the original auto-update configuration so it can be restored later. """ - pass + raise NotImplementedError("DNF: backup_image_default_patch_configuration_if_not_exists not implemented yet") def is_image_default_patch_configuration_backup_valid(self, image_default_patch_configuration_backup): - pass + raise NotImplementedError("DNF: is_image_default_patch_configuration_backup_valid not implemented yet") def update_os_patch_configuration_sub_setting(self, patch_configuration_sub_setting, value, patch_configuration_sub_setting_pattern_to_match): - pass + raise NotImplementedError("DNF: update_os_patch_configuration_sub_setting not implemented yet") # Post Install method/ Install Patch def is_reboot_pending(self): @@ -194,7 +194,7 @@ def is_reboot_pending(self): Returns: - Boolean: True if reboot is pending, False otherwise """ - pass + raise NotImplementedError("DNF: is_reboot_pending not implemented yet") # Post Install method / Install Patch def do_processes_require_restart(self): @@ -205,28 +205,28 @@ def do_processes_require_restart(self): Returns: - Boolean: True if processes require restart, False otherwise """ - pass + raise NotImplementedError("DNF: do_processes_require_restart not implemented yet") def add_arch_dependencies(self, package_manager, package, version, packages, package_versions, package_and_dependencies, package_and_dependency_versions): """ Unnecessary for DNF because the package manager already handles multi-architecture dependencies automatically Command Used to confirm above: sudo dnf -y install jq """ - pass + return def set_security_esm_package_status(self, operation, packages): """No-op for dnf, tdnf, yum and zypper """ - pass + return def separate_out_esm_packages(self, packages, package_versions): """No-op for dnf, tdnf, yum and zypper """ - pass + return def get_package_install_expected_avg_time_in_seconds(self): - pass + raise NotImplementedError("DNF: get_package_install_expected_avg_time_in_seconds not implemented yet") # ConfigurePatch method def revert_auto_os_update_to_system_default(self): """ Reverts the auto OS update patch state on the machine to its system default value, if one exists in our backup file """ - pass + raise NotImplementedError("DNF: revert_auto_os_update_to_system_default not implemented yet") From 1b80cf7603a6f4ef61510f1c3d644ac215c37f9c Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Tue, 19 May 2026 13:31:10 -0400 Subject: [PATCH 3/8] Update class name --- src/core/src/bootstrap/ConfigurationFactory.py | 8 ++++---- .../{AzL4DnfPackageManager.py => DnfPackageManager.py} | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) rename src/core/src/package_managers/{AzL4DnfPackageManager.py => DnfPackageManager.py} (96%) diff --git a/src/core/src/bootstrap/ConfigurationFactory.py b/src/core/src/bootstrap/ConfigurationFactory.py index db0972b5..27897574 100644 --- a/src/core/src/bootstrap/ConfigurationFactory.py +++ b/src/core/src/bootstrap/ConfigurationFactory.py @@ -38,7 +38,7 @@ from core.src.package_managers.AptitudePackageManager import AptitudePackageManager from core.src.package_managers.AzL3TdnfPackageManager import AzL3TdnfPackageManager -from core.src.package_managers.AzL4DnfPackageManager import AzL4DnfPackageManager +from core.src.package_managers.DnfPackageManager import DnfPackageManager from core.src.package_managers.YumPackageManager import YumPackageManager from core.src.package_managers.ZypperPackageManager import ZypperPackageManager @@ -71,19 +71,19 @@ def __init__(self, log_file_path, events_folder, telemetry_supported): self.configurations = { 'apt_prod_config': self.new_prod_configuration(Constants.APT, AptitudePackageManager), - 'dnf_prod_config': self.new_prod_configuration(Constants.DNF, AzL4DnfPackageManager), + 'dnf_prod_config': self.new_prod_configuration(Constants.DNF, DnfPackageManager), 'tdnf_prod_config': self.new_prod_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_prod_config': self.new_prod_configuration(Constants.YUM, YumPackageManager), 'zypper_prod_config': self.new_prod_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_dev_config': self.new_dev_configuration(Constants.APT, AptitudePackageManager), - 'dnf_dev_config': self.new_dev_configuration(Constants.DNF, AzL4DnfPackageManager), + 'dnf_dev_config': self.new_dev_configuration(Constants.DNF, DnfPackageManager), 'tdnf_dev_config': self.new_dev_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_dev_config': self.new_dev_configuration(Constants.YUM, YumPackageManager), 'zypper_dev_config': self.new_dev_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_test_config': self.new_test_configuration(Constants.APT, AptitudePackageManager), - 'dnf_test_config': self.new_test_configuration(Constants.DNF, AzL4DnfPackageManager), + 'dnf_test_config': self.new_test_configuration(Constants.DNF, DnfPackageManager), 'tdnf_test_config': self.new_test_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_test_config': self.new_test_configuration(Constants.YUM, YumPackageManager), 'zypper_test_config': self.new_test_configuration(Constants.ZYPPER, ZypperPackageManager) diff --git a/src/core/src/package_managers/AzL4DnfPackageManager.py b/src/core/src/package_managers/DnfPackageManager.py similarity index 96% rename from src/core/src/package_managers/AzL4DnfPackageManager.py rename to src/core/src/package_managers/DnfPackageManager.py index 388a58e5..aa9c70a2 100644 --- a/src/core/src/package_managers/AzL4DnfPackageManager.py +++ b/src/core/src/package_managers/DnfPackageManager.py @@ -14,18 +14,18 @@ # # Requires Python 2.7+ -"""AzL4DnfPackageManager for Azure Linux L4""" +"""DnfPackageManager for Azure Linux L4 and RHEL 10""" from abc import ABCMeta from core.src.bootstrap.Constants import Constants from core.src.package_managers.PackageManager import PackageManager -class AzL4DnfPackageManager(PackageManager): - """Implementation of Azure Linux L4 DNF package management operations""" +class DnfPackageManager(PackageManager): + """Implementation of Azure Linux L4/RHEL 10 DNF package management operations""" def __init__(self, env_layer, execution_config, composite_logger, telemetry_writer, status_handler): - super(AzL4DnfPackageManager, self).__init__(env_layer, execution_config, composite_logger, telemetry_writer, status_handler) - # TODO: Add AzL4 DNF specific initialization + super(DnfPackageManager, self).__init__(env_layer, execution_config, composite_logger, telemetry_writer, status_handler) + # TODO: Add AzL4/Red hat 10 DNF specific initialization self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf') __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) From e42e6013300e3c027b8439d876b05882fba53ccb Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Wed, 20 May 2026 09:05:08 -0400 Subject: [PATCH 4/8] Iternation_1 --- .../src/package_managers/DnfPackageManager.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/core/src/package_managers/DnfPackageManager.py b/src/core/src/package_managers/DnfPackageManager.py index aa9c70a2..a419f5be 100644 --- a/src/core/src/package_managers/DnfPackageManager.py +++ b/src/core/src/package_managers/DnfPackageManager.py @@ -28,6 +28,9 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ # TODO: Add AzL4/Red hat 10 DNF specific initialization self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf') + # commands for DNF Automatic updates service + self.__init_constants_for_dnf_automatic() + __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) # ConfigurePatch Method @@ -230,3 +233,17 @@ def revert_auto_os_update_to_system_default(self): """ Reverts the auto OS update patch state on the machine to its system default value, if one exists in our backup file """ raise NotImplementedError("DNF: revert_auto_os_update_to_system_default not implemented yet") + # region auto OS updates + def __init_constants_for_dnf5_automatic(self): + self.dnf5_automatic_configuration_file_path = None + self.dnf5_automatic_install_check_cmd = 'rpm -qa | grep dnf5-plugin-automatic' + self.dnf5_automatic_enable_on_reboot_check_cmd = 'systemctl is-enabled dnf5-automatic.timer' + self.dnf5_automatic_disable_on_reboot_cmd = 'systemctl disable --now dnf5-automatic.timer' + self.dnf5_automatic_enable_on_reboot_cmd = 'systemctl enable --now dnf5-automatic.timer' + self.dnf5_automatic_config_pattern_match_text = None + # Detect them from ExecStart flags instead of a file: + self.dnf5_automatic_download_updates_identifier_text = '--downloadupdates' + self.dnf5_automatic_apply_updates_identifier_text = '--installupdates' + self.dnf5_automatic_enable_on_reboot_identifier_text = "enable_on_reboot" + self.dnf5_automatic_installation_state_identifier_text = "installation_state" + self.dnf5_auto_os_update_service = "dnf5-automatic" \ No newline at end of file From 9b9fd31d4aa81216eed14e8530840423fa5efcc8 Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Wed, 20 May 2026 14:29:36 -0400 Subject: [PATCH 5/8] First Iteration PR for Enablement/Disablement changes --- .../src/package_managers/DnfPackageManager.py | 141 ++++++++++++++++-- src/core/tests/Test_DnfPackageManager.py | 59 ++++++++ 2 files changed, 186 insertions(+), 14 deletions(-) create mode 100644 src/core/tests/Test_DnfPackageManager.py diff --git a/src/core/src/package_managers/DnfPackageManager.py b/src/core/src/package_managers/DnfPackageManager.py index a419f5be..fcab868b 100644 --- a/src/core/src/package_managers/DnfPackageManager.py +++ b/src/core/src/package_managers/DnfPackageManager.py @@ -28,8 +28,16 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ # TODO: Add AzL4/Red hat 10 DNF specific initialization self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf') + # auto OS updates + self.current_auto_os_update_service = None + self.enable_on_reboot_identifier_text = "" + self.enable_on_reboot_check_cmd = '' + self.enable_on_reboot_cmd = '' + self.installation_state_identifier_text = "" + self.install_check_cmd = "" + # commands for DNF Automatic updates service - self.__init_constants_for_dnf_automatic() + self.__init_constants_for_dnf5_automatic() __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) @@ -168,16 +176,79 @@ def try_meet_azgps_coordinated_requirements(self): # ConfigurePatch Method def get_current_auto_os_patch_state(self): """ Gets the current auto OS update patch state on the machine """ - raise NotImplementedError("DNF: get_current_auto_os_patch_state not implemented yet") + self.composite_logger.log("[DNF] Fetching the current automatic OS patch state on the machine...") + + current_auto_os_patch_state_for_dnf5_automatic = self.__get_current_auto_os_patch_state_for_dnf5_automatic() + + self.composite_logger.log("[DNF] OS patch state per auto OS update service: [dnf5-automatic={0}]".format(str(current_auto_os_patch_state_for_dnf5_automatic))) + + if current_auto_os_patch_state_for_dnf5_automatic == Constants.AutomaticOSPatchStates.ENABLED: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.ENABLED + elif current_auto_os_patch_state_for_dnf5_automatic == Constants.AutomaticOSPatchStates.DISABLED: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.DISABLED + else: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.UNKNOWN + + self.composite_logger.log_debug("[DNF] Overall Auto OS Patch State based on all auto OS update service states [OverallAutoOSPatchState={0}]".format(str(current_auto_os_patch_state))) + return current_auto_os_patch_state + # raise NotImplementedError("DNF: get_current_auto_os_patch_state not implemented yet") + + def __get_current_auto_os_patch_state_for_dnf5_automatic(self): + """ Gets current auto OS update patch state for dnf5-automatic """ + self.composite_logger.log_debug("[DNF] Fetching current automatic OS patch state in dnf5-automatic service. This includes checks on whether the service is installed, current auto patch enable state and whether it is set to enable on reboot") + self.__init_auto_update_for_dnf5_automatic() + + is_service_installed = self.is_auto_update_service_installed(self.install_check_cmd) + enable_on_reboot_value = False + + if is_service_installed: + enable_on_reboot_value = self.is_service_set_to_enable_on_reboot(self.enable_on_reboot_check_cmd) + + if enable_on_reboot_value: + return Constants.AutomaticOSPatchStates.ENABLED + else: + return Constants.AutomaticOSPatchStates.DISABLED # ConfigurePatch Method def disable_auto_os_update(self): - """ - Disables auto OS updates on the machine only if they are enabled - Comments from yashna : The current VM with AzLinux4 installed doesnt have dnf automatic/auto OS updates installed. - Will we have this installed in other machines which leads to my question on whether we need this or not ? - """ - raise NotImplementedError("DNF: disable_auto_os_update not implemented yet") + """ Disables auto OS updates on the machine only if they are enabled and logs the default settings the machine comes with """ + try: + self.composite_logger.log_verbose("[DNF] Disabling auto OS updates in all identified services...") + self.__disable_auto_os_update_for_dnf5_automatic() + self.composite_logger.log_debug("[DNF] Successfully disabled auto OS updates") + + except Exception as error: + self.composite_logger.log_error("[DNF] Could not disable auto OS updates. [Error={0}]".format(repr(error))) + raise + + def __disable_auto_os_update_for_dnf5_automatic(self): + """ Disables auto OS updates, using dnf5-automatic service, and logs the default settings the machine comes with """ + self.composite_logger.log_verbose("[DNF] Disabling auto OS updates using dnf5-automatic") + self.__init_auto_update_for_dnf5_automatic() + + #self.backup_image_default_patch_configuration_if_not_exists() - Will uncomment for later iterations + + if not self.is_auto_update_service_installed(self.dnf5_automatic_install_check_cmd): + self.composite_logger.log_debug("[DNF] Cannot disable as dnf5-automatic is not installed on the machine") + return + + self.composite_logger.log_verbose("[DNF] Preemptively disabling auto OS updates using dnf5-automatic") + self.disable_auto_update_on_reboot(self.dnf5_automatic_disable_on_reboot_cmd) + + self.composite_logger.log_debug("[DNF] Successfully disabled auto OS updates using dnf5-automatic") + + def disable_auto_update_on_reboot(self, command): + """ Disables auto update on reboot by executing systemctl command """ + self.composite_logger.log_verbose("[DNF] Disabling auto update on reboot. [Command={0}] ".format(command)) + code, out = self.env_layer.run_command_output(command, False, False) + + if code != 0: + self.composite_logger.log_error("[DNF][ERROR] Error disabling auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + error_msg = 'Unexpected return code (' + str(code) + ') on command: ' + command + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.OPERATION_FAILED) + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + else: + self.composite_logger.log_debug("[DNF] Disabled auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) def backup_image_default_patch_configuration_if_not_exists(self): """ @@ -235,15 +306,57 @@ def revert_auto_os_update_to_system_default(self): # region auto OS updates def __init_constants_for_dnf5_automatic(self): - self.dnf5_automatic_configuration_file_path = None self.dnf5_automatic_install_check_cmd = 'rpm -qa | grep dnf5-plugin-automatic' self.dnf5_automatic_enable_on_reboot_check_cmd = 'systemctl is-enabled dnf5-automatic.timer' self.dnf5_automatic_disable_on_reboot_cmd = 'systemctl disable --now dnf5-automatic.timer' self.dnf5_automatic_enable_on_reboot_cmd = 'systemctl enable --now dnf5-automatic.timer' - self.dnf5_automatic_config_pattern_match_text = None - # Detect them from ExecStart flags instead of a file: - self.dnf5_automatic_download_updates_identifier_text = '--downloadupdates' - self.dnf5_automatic_apply_updates_identifier_text = '--installupdates' self.dnf5_automatic_enable_on_reboot_identifier_text = "enable_on_reboot" self.dnf5_automatic_installation_state_identifier_text = "installation_state" - self.dnf5_auto_os_update_service = "dnf5-automatic" \ No newline at end of file + self.dnf5_auto_os_update_service = "dnf5-automatic" + + def __init_auto_update_for_dnf5_automatic(self): + """ Initializes all generic auto OS update variables with the config values for dnf5 automatic service """ + self.enable_on_reboot_identifier_text = self.dnf5_automatic_enable_on_reboot_identifier_text + self.installation_state_identifier_text = self.dnf5_automatic_installation_state_identifier_text + self.enable_on_reboot_check_cmd = self.dnf5_automatic_enable_on_reboot_check_cmd + self.enable_on_reboot_cmd = self.dnf5_automatic_enable_on_reboot_cmd + self.install_check_cmd = self.dnf5_automatic_install_check_cmd + self.current_auto_os_update_service = self.dnf5_auto_os_update_service + + def is_auto_update_service_installed(self, install_check_cmd): + """ Checks if the auto update service is installed on the VM """ + code, out = self.env_layer.run_command_output(install_check_cmd, False, False) + self.composite_logger.log_debug("[DNF] Checked if auto update service is installed. [Command={0}][Code={1}][Output={2}]".format(install_check_cmd, str(code), out)) + if len(out.strip()) > 0 and code == 0: + self.composite_logger.log_debug("[DNF] > Auto OS update service is installed on the machine") + return True + else: + self.composite_logger.log_debug("[DNF] > Auto OS update service is NOT installed on the machine") + return False + + def is_service_set_to_enable_on_reboot(self, command): + """ Checking if auto update is set to enable on reboot on the machine. An enable_on_reboot service will be activated (if currently inactive) on machine reboot """ + code, out = self.env_layer.run_command_output(command, False, False) + self.composite_logger.log_debug("[DNF] Checked if auto update service is set to enable on reboot. [Code={0}][Out={1}]".format(str(code), out)) + if len(out.strip()) > 0 and code == 0 and 'enabled' in out: + self.composite_logger.log_debug("[DNF] > Auto OS update service will enable on reboot") + return True + self.composite_logger.log_debug("[DNF] > Auto OS update service will NOT enable on reboot") + return False + + def enable_auto_update_on_reboot(self): + """ Enables machine default auto update on reboot """ + # type () -> None + command = self.enable_on_reboot_cmd + self.composite_logger.log_verbose("[DNF] Enabling auto update on reboot. [Command={0}] ".format(command)) + code, out = self.env_layer.run_command_output(command, False, False) + + if code != 0: + self.composite_logger.log_error("[DNF][ERROR] Error enabling auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + error_msg = 'Unexpected return code (' + str(code) + ') on command: ' + command + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.OPERATION_FAILED) + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + else: + self.composite_logger.log_debug("[DNF] Enabled auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + + # endregion diff --git a/src/core/tests/Test_DnfPackageManager.py b/src/core/tests/Test_DnfPackageManager.py new file mode 100644 index 00000000..e3601213 --- /dev/null +++ b/src/core/tests/Test_DnfPackageManager.py @@ -0,0 +1,59 @@ +# Copyright 2026 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.7+ +import unittest + +from core.src.bootstrap.Constants import Constants +from core.tests.library.ArgumentComposer import ArgumentComposer +from core.tests.library.RuntimeCompositor import RuntimeCompositor + + +class TestDnfPackageManager(unittest.TestCase): + def setUp(self): + self.runtime = RuntimeCompositor(ArgumentComposer().get_composed_arguments(), True, Constants.DNF) + self.container = self.runtime.container + + def tearDown(self): + self.runtime.stop() + + def test_disable_auto_os_updates_with_uninstalled_services(self): + """Test disable_auto_os_update when dnf5-automatic is not installed""" + self.runtime.set_legacy_test_type('SadPath') + package_manager = self.container.get('package_manager') + + # Should complete without error even when service is not installed + package_manager.disable_auto_os_update() + + def test_get_current_auto_os_patch_state_with_uninstalled_services(self): + """Test get_current_auto_os_patch_state when dnf5-automatic is not installed""" + self.runtime.set_legacy_test_type('SadPath') + package_manager = self.container.get('package_manager') + + current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.DISABLED) + + def test_get_current_auto_os_patch_state_with_installed_services_and_state_disabled(self): + """Test get_current_auto_os_patch_state when dnf5-automatic is installed but disabled""" + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + # Restore original implementation so package manager logic (rpm + systemctl checks) runs + package_manager.get_current_auto_os_patch_state = self.runtime.backup_get_current_auto_os_patch_state + + current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.DISABLED) + +if __name__ == '__main__': + unittest.main() + From b950de5e8124e183757a5797255fdaebe37e13b6 Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Thu, 21 May 2026 18:38:56 -0400 Subject: [PATCH 6/8] Iteration_2 : Auto assessment --- .../src/package_managers/DnfPackageManager.py | 167 ++++++++++++++---- src/core/tests/Test_DnfPackageManager.py | 61 +++++++ 2 files changed, 198 insertions(+), 30 deletions(-) diff --git a/src/core/src/package_managers/DnfPackageManager.py b/src/core/src/package_managers/DnfPackageManager.py index fcab868b..0f1f8db6 100644 --- a/src/core/src/package_managers/DnfPackageManager.py +++ b/src/core/src/package_managers/DnfPackageManager.py @@ -16,17 +16,22 @@ """DnfPackageManager for Azure Linux L4 and RHEL 10""" from abc import ABCMeta +import json from core.src.bootstrap.Constants import Constants from core.src.package_managers.PackageManager import PackageManager class DnfPackageManager(PackageManager): - """Implementation of Azure Linux L4/RHEL 10 DNF package management operations""" + """Implementation of Azure Linux L4/RHEL 10 DNF5 package management operations""" def __init__(self, env_layer, execution_config, composite_logger, telemetry_writer, status_handler): super(DnfPackageManager, self).__init__(env_layer, execution_config, composite_logger, telemetry_writer, status_handler) - # TODO: Add AzL4/Red hat 10 DNF specific initialization - self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf') + + self.cmd_clean_cache = "sudo dnf5 -q clean expire-cache" + self.cmd_repo_refresh = "sudo dnf5 -q check-update " + + # TODO: Add AzL4/Red hat 10 DNF5 specific initialization + self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf5') # auto OS updates self.current_auto_os_update_service = None @@ -35,31 +40,42 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ self.enable_on_reboot_cmd = '' self.installation_state_identifier_text = "" self.install_check_cmd = "" + self.apply_updates_enabled = "Enabled" + self.apply_updates_disabled = "Disabled" + self.apply_updates_unknown = "Unknown" # commands for DNF Automatic updates service self.__init_constants_for_dnf5_automatic() __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) - # ConfigurePatch Method def refresh_repo(self): - """Refreshes the DNF repository cache and lists available updates by cleaning expired cache entries - Commands: - - sudo dnf clean expire-cache (cleans expired cache entries) - - sudo dnf -q check-update (checks for available updates) - """ - raise NotImplementedError("DNF: refresh_repo not implemented yet") + self.composite_logger.log("[DNF] Refreshing local repo...") + self.invoke_package_manager(self.cmd_clean_cache) + self.invoke_package_manager(self.cmd_repo_refresh) # AssessPatch method def invoke_package_manager_advanced(self, command, raise_on_exception=True): - """Invokes the DNF package manager with standardized command execution, logging, and error handling - Parameters: - - command (string): The DNF command to execute - - raise_on_exception (boolean): Whether to raise exception on non-zero exit code - Returns: - - Tuple of (output, return_code) from the command execution - """ - raise NotImplementedError("DNF: invoke_package_manager_advanced not implemented yet") + self.composite_logger.log_verbose("[DNF] Invoking package manager. [Command={0}]".format(str(command))) + # env_layer.run_command_output returns (code, output) + code, out = self.env_layer.run_command_output(command, False, False) + + # Treat exit code 0 as success. No updates available. + if code == 0: + self.composite_logger.log_debug('[DNF] Invoked package manager. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) + + elif code == 100: + # Updates available + self.composite_logger.log_debug( + '[DNF] Updates available. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) + else: + self.composite_logger.log_warning('[ERROR] Customer environment error. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) + error_msg = "Customer environment error: Investigate and resolve unexpected return code ({0}) from package manager on command: {1}".format(str(code), command) + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.PACKAGE_MANAGER_FAILURE) + if raise_on_exception: + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + + return out, code # AssessPatch method def get_all_updates(self, cached=False): @@ -173,7 +189,6 @@ def try_meet_azgps_coordinated_requirements(self): """ raise NotImplementedError("DNF: try_meet_azgps_coordinated_requirements not implemented yet") - # ConfigurePatch Method def get_current_auto_os_patch_state(self): """ Gets the current auto OS update patch state on the machine """ self.composite_logger.log("[DNF] Fetching the current automatic OS patch state on the machine...") @@ -191,25 +206,22 @@ def get_current_auto_os_patch_state(self): self.composite_logger.log_debug("[DNF] Overall Auto OS Patch State based on all auto OS update service states [OverallAutoOSPatchState={0}]".format(str(current_auto_os_patch_state))) return current_auto_os_patch_state - # raise NotImplementedError("DNF: get_current_auto_os_patch_state not implemented yet") def __get_current_auto_os_patch_state_for_dnf5_automatic(self): """ Gets current auto OS update patch state for dnf5-automatic """ self.composite_logger.log_debug("[DNF] Fetching current automatic OS patch state in dnf5-automatic service. This includes checks on whether the service is installed, current auto patch enable state and whether it is set to enable on reboot") self.__init_auto_update_for_dnf5_automatic() - is_service_installed = self.is_auto_update_service_installed(self.install_check_cmd) - enable_on_reboot_value = False + is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value = self.__get_current_auto_os_updates_setting_on_machine() - if is_service_installed: - enable_on_reboot_value = self.is_service_set_to_enable_on_reboot(self.enable_on_reboot_check_cmd) + if not is_service_installed: + return Constants.AutomaticOSPatchStates.DISABLED if enable_on_reboot_value: return Constants.AutomaticOSPatchStates.ENABLED - else: - return Constants.AutomaticOSPatchStates.DISABLED - # ConfigurePatch Method + return Constants.AutomaticOSPatchStates.DISABLED + def disable_auto_os_update(self): """ Disables auto OS updates on the machine only if they are enabled and logs the default settings the machine comes with """ try: @@ -226,7 +238,7 @@ def __disable_auto_os_update_for_dnf5_automatic(self): self.composite_logger.log_verbose("[DNF] Disabling auto OS updates using dnf5-automatic") self.__init_auto_update_for_dnf5_automatic() - #self.backup_image_default_patch_configuration_if_not_exists() - Will uncomment for later iterations + self.backup_image_default_patch_configuration_if_not_exists() if not self.is_auto_update_service_installed(self.dnf5_automatic_install_check_cmd): self.composite_logger.log_debug("[DNF] Cannot disable as dnf5-automatic is not installed on the machine") @@ -254,10 +266,58 @@ def backup_image_default_patch_configuration_if_not_exists(self): """ This method saves the original auto-update configuration so it can be restored later. """ - raise NotImplementedError("DNF: backup_image_default_patch_configuration_if_not_exists not implemented yet") + try: + self.composite_logger.log_debug("[DNF] Ensuring there is a backup of the default patch state for [AutoOSUpdateService={0}]".format(str(self.current_auto_os_update_service))) + + # read existing backup since it also contains backup from other update services. We need to preserve any existing data within the backup file + image_default_patch_configuration_backup = {} + if self.image_default_patch_configuration_backup_exists(): + try: + image_default_patch_configuration_backup = json.loads(self.env_layer.file_system.read_with_retry(self.image_default_patch_configuration_backup_path)) + except Exception as error: + self.composite_logger.log_error("[DNF] Unable to read backup for default patch state. Will attempt to re-write. [Exception={0}]".format(repr(error))) + + # verify if existing backup is valid if not, write to backup + is_backup_valid = self.is_image_default_patch_configuration_backup_valid(image_default_patch_configuration_backup) + if is_backup_valid: + self.composite_logger.log_debug("[DNF] Since extension has a valid backup, no need to log the current settings again. [Default Auto OS update settings={0}] [File path={1}]" + .format(str(image_default_patch_configuration_backup), self.image_default_patch_configuration_backup_path)) + else: + self.composite_logger.log_debug("[DNF] Since the backup is invalid, will add a new backup with the current auto OS update settings") + self.composite_logger.log_debug("[DNF] Fetching current auto OS update settings for [AutoOSUpdateService={0}]".format(str(self.current_auto_os_update_service))) + is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value = self.__get_current_auto_os_updates_setting_on_machine() + + backup_image_default_patch_configuration_json_to_add = { + self.current_auto_os_update_service: { + self.enable_on_reboot_identifier_text: enable_on_reboot_value, + self.installation_state_identifier_text: is_service_installed + } + } + + image_default_patch_configuration_backup.update(backup_image_default_patch_configuration_json_to_add) + + self.composite_logger.log_debug("[DNF] Logging default system configuration settings for auto OS updates. [Settings={0}] [Log file path={1}]" + .format(str(image_default_patch_configuration_backup), self.image_default_patch_configuration_backup_path)) + self.env_layer.file_system.write_with_retry(self.image_default_patch_configuration_backup_path, '{0}'.format(json.dumps(image_default_patch_configuration_backup)), mode='w+') + except Exception as error: + error_message = "[DNF] Exception during fetching and logging default auto update settings on the machine. [Exception={0}]".format(repr(error)) + self.composite_logger.log_error(error_message) + self.status_handler.add_error_to_status(error_message, Constants.PatchOperationErrorCodes.DEFAULT_ERROR) + raise def is_image_default_patch_configuration_backup_valid(self, image_default_patch_configuration_backup): - raise NotImplementedError("DNF: is_image_default_patch_configuration_backup_valid not implemented yet") + # Validate backup JSON for dnf5 automatic service + try: + if self.dnf5_auto_os_update_service in image_default_patch_configuration_backup \ + and self.dnf5_automatic_enable_on_reboot_identifier_text in image_default_patch_configuration_backup[self.dnf5_auto_os_update_service] \ + and self.dnf5_automatic_installation_state_identifier_text in image_default_patch_configuration_backup[self.dnf5_auto_os_update_service]: + self.composite_logger.log_debug("[DNF] Extension has a valid backup for default dnf5-automatic configuration settings") + return True + else: + self.composite_logger.log_debug("[DNF] Extension does not have a valid backup for default dnf5-automatic configuration settings") + return False + except Exception: + return False def update_os_patch_configuration_sub_setting(self, patch_configuration_sub_setting, value, patch_configuration_sub_setting_pattern_to_match): raise NotImplementedError("DNF: update_os_patch_configuration_sub_setting not implemented yet") @@ -306,16 +366,24 @@ def revert_auto_os_update_to_system_default(self): # region auto OS updates def __init_constants_for_dnf5_automatic(self): + self.dnf5_automatic_configuration_service = 'systemctl cat dnf5-automatic.service' self.dnf5_automatic_install_check_cmd = 'rpm -qa | grep dnf5-plugin-automatic' self.dnf5_automatic_enable_on_reboot_check_cmd = 'systemctl is-enabled dnf5-automatic.timer' self.dnf5_automatic_disable_on_reboot_cmd = 'systemctl disable --now dnf5-automatic.timer' self.dnf5_automatic_enable_on_reboot_cmd = 'systemctl enable --now dnf5-automatic.timer' + self.dnf5_automatic_config_pattern_match_text = None + # Detect them from ExecStart flags instead of a file: + self.dnf5_automatic_download_updates_identifier_text = '--downloadupdates' + self.dnf5_automatic_apply_updates_identifier_text = '--installupdates' self.dnf5_automatic_enable_on_reboot_identifier_text = "enable_on_reboot" self.dnf5_automatic_installation_state_identifier_text = "installation_state" self.dnf5_auto_os_update_service = "dnf5-automatic" def __init_auto_update_for_dnf5_automatic(self): """ Initializes all generic auto OS update variables with the config values for dnf5 automatic service """ + self.os_patch_configuration_settings_read_cmd = self.dnf5_automatic_configuration_service + self.download_updates_identifier_text = self.dnf5_automatic_download_updates_identifier_text + self.apply_updates_identifier_text = self.dnf5_automatic_apply_updates_identifier_text self.enable_on_reboot_identifier_text = self.dnf5_automatic_enable_on_reboot_identifier_text self.installation_state_identifier_text = self.dnf5_automatic_installation_state_identifier_text self.enable_on_reboot_check_cmd = self.dnf5_automatic_enable_on_reboot_check_cmd @@ -323,6 +391,45 @@ def __init_auto_update_for_dnf5_automatic(self): self.install_check_cmd = self.dnf5_automatic_install_check_cmd self.current_auto_os_update_service = self.dnf5_auto_os_update_service + def __get_current_auto_os_updates_setting_on_machine(self): + """ Gets all the update settings related to auto OS updates via dnf """ + try: + download_updates_value = "" + apply_updates_value = "" + is_service_installed = False + enable_on_reboot_value = False + + # get install state + if not self.is_auto_update_service_installed(self.install_check_cmd): + return is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value + + is_service_installed = True + enable_on_reboot_value = self.is_service_set_to_enable_on_reboot(self.enable_on_reboot_check_cmd) + + self.composite_logger.log_debug( + "[DNF] Checking if auto updates are currently enabled...") + + # Check systemd service unit file for ExecStart flags to determine current settings + # Get the dnf5-automatic.service configuration + code, unit_output = self.env_layer.run_command_output(self.os_patch_configuration_settings_read_cmd, False, False) + + if code == 0: + self.composite_logger.log_debug( + "[DNF] Retrieved dnf5-automatic service unit configuration...") + + # ExecStart line format example: ExecStart=/usr/bin/dnf5 automatic --timer + for line in unit_output.split('\n'): + if line.strip().startswith('ExecStart=') and 'dnf5 automatic' in line: + self.composite_logger.log_debug("[DNF] ExecStart line: {0}".format(line)) + break + + return is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value + + except Exception as error: + raise Exception( + "[DNF] Error occurred in fetching current auto OS update settings from the machine (dnf5). [Exception={0}]".format( + repr(error))) + def is_auto_update_service_installed(self, install_check_cmd): """ Checks if the auto update service is installed on the VM """ code, out = self.env_layer.run_command_output(install_check_cmd, False, False) diff --git a/src/core/tests/Test_DnfPackageManager.py b/src/core/tests/Test_DnfPackageManager.py index e3601213..280fe1ac 100644 --- a/src/core/tests/Test_DnfPackageManager.py +++ b/src/core/tests/Test_DnfPackageManager.py @@ -28,6 +28,13 @@ def setUp(self): def tearDown(self): self.runtime.stop() + + def test_refresh_repo(self): + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_manager.refresh_repo_safely() + def test_disable_auto_os_updates_with_uninstalled_services(self): """Test disable_auto_os_update when dnf5-automatic is not installed""" self.runtime.set_legacy_test_type('SadPath') @@ -44,6 +51,58 @@ def test_get_current_auto_os_patch_state_with_uninstalled_services(self): current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.DISABLED) + def test_get_current_auto_os_patch_state_with_installed_services_and_state_enabled(self): + """Test get_current_auto_os_patch_state for DNF when service is installed and enabled""" + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + package_manager.get_current_auto_os_patch_state = self.runtime.backup_get_current_auto_os_patch_state + + # Mock the systemctl cat output with enabled flags + systemctl_cat_output = '''[Unit] + Description=Run dnf5 automatic updates + After=network-online.target + + [Timer] + OnBootSec=1h + OnUnitActiveSec=24h + AccuracySec=1h + Persistent=true + + [Install] + WantedBy=timers.target + + [Service] + Type=oneshot + ExecStart=/usr/bin/dnf5 automatic --timer + StandardOutput=journal + StandardError=journal + ''' + + # Mock the run_command_output for systemctl cat + backup_run_command_output = self.runtime.env_layer.run_command_output + + def mock_systemctl_cat(cmd, no_output=False, chk_err=False): + if 'rpm -qa | grep dnf5-plugin-automatic' in cmd: + return 0, 'dnf5-plugin-automatic-xyz' + + # Mock timer enabled + elif 'systemctl is-enabled dnf5-automatic.timer' in cmd: + return 0, 'enabled' + + # Mock service file + elif 'systemctl cat dnf5-automatic' in cmd: + return 0, systemctl_cat_output + + return backup_run_command_output(cmd, no_output, chk_err) + + self.runtime.env_layer.run_command_output = mock_systemctl_cat + + try: + current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.ENABLED) + finally: + self.runtime.env_layer.run_command_output = backup_run_command_output + def test_get_current_auto_os_patch_state_with_installed_services_and_state_disabled(self): """Test get_current_auto_os_patch_state when dnf5-automatic is installed but disabled""" self.runtime.set_legacy_test_type('HappyPath') @@ -52,6 +111,8 @@ def test_get_current_auto_os_patch_state_with_installed_services_and_state_disab package_manager.get_current_auto_os_patch_state = self.runtime.backup_get_current_auto_os_patch_state current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + + self.assertFalse(package_manager.image_default_patch_configuration_backup_exists()) self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.DISABLED) if __name__ == '__main__': From 43cf0fa984160a10e3ce5fddab7844ee53e7e177 Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Tue, 26 May 2026 15:44:25 -0400 Subject: [PATCH 7/8] Address Code Review and added new code --- .../src/bootstrap/ConfigurationFactory.py | 8 +- src/core/src/bootstrap/Constants.py | 2 +- .../package_managers/Dnf5PackageManager.py | 746 ++++++++++++++++++ .../src/package_managers/DnfPackageManager.py | 469 ----------- ...eManager.py => Test_Dnf5PackageManager.py} | 99 ++- 5 files changed, 849 insertions(+), 475 deletions(-) create mode 100644 src/core/src/package_managers/Dnf5PackageManager.py delete mode 100644 src/core/src/package_managers/DnfPackageManager.py rename src/core/tests/{Test_DnfPackageManager.py => Test_Dnf5PackageManager.py} (51%) diff --git a/src/core/src/bootstrap/ConfigurationFactory.py b/src/core/src/bootstrap/ConfigurationFactory.py index 27897574..07281a24 100644 --- a/src/core/src/bootstrap/ConfigurationFactory.py +++ b/src/core/src/bootstrap/ConfigurationFactory.py @@ -38,7 +38,7 @@ from core.src.package_managers.AptitudePackageManager import AptitudePackageManager from core.src.package_managers.AzL3TdnfPackageManager import AzL3TdnfPackageManager -from core.src.package_managers.DnfPackageManager import DnfPackageManager +from core.src.package_managers.Dnf5PackageManager import Dnf5PackageManager from core.src.package_managers.YumPackageManager import YumPackageManager from core.src.package_managers.ZypperPackageManager import ZypperPackageManager @@ -71,19 +71,19 @@ def __init__(self, log_file_path, events_folder, telemetry_supported): self.configurations = { 'apt_prod_config': self.new_prod_configuration(Constants.APT, AptitudePackageManager), - 'dnf_prod_config': self.new_prod_configuration(Constants.DNF, DnfPackageManager), + 'dnf5_prod_config': self.new_prod_configuration(Constants.DNF, Dnf5PackageManager), 'tdnf_prod_config': self.new_prod_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_prod_config': self.new_prod_configuration(Constants.YUM, YumPackageManager), 'zypper_prod_config': self.new_prod_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_dev_config': self.new_dev_configuration(Constants.APT, AptitudePackageManager), - 'dnf_dev_config': self.new_dev_configuration(Constants.DNF, DnfPackageManager), + 'dnf5_dev_config': self.new_dev_configuration(Constants.DNF, Dnf5PackageManager), 'tdnf_dev_config': self.new_dev_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_dev_config': self.new_dev_configuration(Constants.YUM, YumPackageManager), 'zypper_dev_config': self.new_dev_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_test_config': self.new_test_configuration(Constants.APT, AptitudePackageManager), - 'dnf_test_config': self.new_test_configuration(Constants.DNF, DnfPackageManager), + 'dnf5_test_config': self.new_test_configuration(Constants.DNF, Dnf5PackageManager), 'tdnf_test_config': self.new_test_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_test_config': self.new_test_configuration(Constants.YUM, YumPackageManager), 'zypper_test_config': self.new_test_configuration(Constants.ZYPPER, ZypperPackageManager) diff --git a/src/core/src/bootstrap/Constants.py b/src/core/src/bootstrap/Constants.py index 204ccd24..ed524026 100644 --- a/src/core/src/bootstrap/Constants.py +++ b/src/core/src/bootstrap/Constants.py @@ -201,7 +201,7 @@ class StatusTruncationConfig(EnumBackport): # Package Managers APT = 'apt' - DNF = 'dnf' + DNF = 'dnf5' TDNF = 'tdnf' YUM = 'yum' ZYPPER = 'zypper' diff --git a/src/core/src/package_managers/Dnf5PackageManager.py b/src/core/src/package_managers/Dnf5PackageManager.py new file mode 100644 index 00000000..31e69bff --- /dev/null +++ b/src/core/src/package_managers/Dnf5PackageManager.py @@ -0,0 +1,746 @@ +# Copyright 2026 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.7+ + +"""DnfPackageManager for Azure Linux and RHEL""" +import json +import re + +from abc import ABCMeta +from core.src.bootstrap.Constants import Constants +from core.src.package_managers.PackageManager import PackageManager + + +class Dnf5PackageManager(PackageManager): + """Implementation of Azure Linux/ RHEL package management operations""" + + def __init__(self, env_layer, execution_config, composite_logger, telemetry_writer, status_handler): + super(Dnf5PackageManager, self).__init__(env_layer, execution_config, composite_logger, telemetry_writer, status_handler) + # Repo refresh + self.cmd_clean_cache = "sudo dnf5 -q clean expire-cache" + self.cmd_repo_refresh = "sudo dnf5 -q check-update " + + # Get updates and dependencies. dnf5 'list available ' returns BOTH installed and available versions. + # This command is used for both version lookup and installed-state checks. + self.single_package_installed_and_available_query = 'sudo dnf5 list available ' + self.single_package_upgrade_simulation_cmd = "sudo dnf5 install --assumeno --skip-broken " + + # Install update + # dnf5 does not support --skip-broken for upgrade; uses full system upgrade + # --allowerasing enables safe dependency resolution instead of skipping package + self.single_package_upgrade_cmd = ' sudo dnf5 -y upgrade --allowerasing' + + # Support to check if reboot is required + # dnf-utils not required (needs-restarting is built into dnf5) + self.needs_restarting_with_flag = 'sudo LANG=en_US.UTF8 dnf5 needs-restarting' + + # DNF5 exit codes + self.dnf_exitcode_ok = 0 + self.dnf_exitcode_updates_available = 100 + # Commands where 100 is expected + self.commands_allowing_100_exitcode = [ + "check-update" + ] + # TODO: Add AzL4/Red hat 10 DNF5 specific initialization + self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, Constants.DNF) + + # Caching for updates + self.all_updates_cached = [] + self.all_update_versions_cached = [] + + # auto OS updates + self.current_auto_os_update_service = None + self.enable_on_reboot_identifier_text = "" + self.enable_on_reboot_check_cmd = '' + self.enable_on_reboot_cmd = '' + self.installation_state_identifier_text = "" + self.install_check_cmd = "" + self.apply_updates_enabled = "Enabled" + self.apply_updates_disabled = "Disabled" + self.apply_updates_unknown = "Unknown" + + # commands for DNF Automatic updates service + self.__init_constants_for_dnf5_automatic() + self.STR_TOTAL_DOWNLOAD_SIZE = "Total download size: " + self.package_install_expected_avg_time_in_seconds = 90 + + __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) + + def refresh_repo(self): + self.composite_logger.log("[DNF5] Refreshing local repo...") + self.invoke_package_manager(self.cmd_clean_cache) + self.invoke_package_manager(self.cmd_repo_refresh) + + # AssessPatch method + def invoke_package_manager_advanced(self, command, raise_on_exception=True): + self.composite_logger.log_verbose("[DNF5] Invoking package manager. [Command={0}]".format(str(command))) + # env_layer.run_command_output returns (code, output) + code, out = self.env_layer.run_command_output(command, False, False) + + if code == self.dnf_exitcode_ok: + self.composite_logger.log_debug('[DNF5] Invoked package manager. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) + + elif code == self.dnf_exitcode_updates_available and any( + allowed_cmd in command for allowed_cmd in self.commands_allowing_100_exitcode): + self.composite_logger.log_debug('[DNF5] Updates available. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) + else: + self.composite_logger.log_warning('[ERROR] Customer environment error. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) + error_msg = "Customer environment error: Investigate and resolve unexpected return code ({0}) from package manager on command: {1}".format(str(code), command) + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.PACKAGE_MANAGER_FAILURE) + if raise_on_exception: + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + + return out, code + + # region Output Parser(s) + def extract_packages_and_versions(self, output): + """Returns packages and versions from given output""" + packages, versions = self.extract_packages_and_versions_including_duplicates(output) + packages, versions = self.dedupe_update_packages_to_get_latest_versions(packages, versions) + return packages, versions + + def extract_packages_and_versions_including_duplicates(self, output): + """Returns packages and versions from given output - DNF format similar to TDNF""" + self.composite_logger.log_verbose("[DNF5] Extracting package and version data...") + packages, versions = [], [] + + lines = output.strip().split('\n') + + for line_index in range(0, len(lines)): + # Do not install Obsoleting Packages. The obsoleting packages list comes towards end in the output. + if lines[line_index].strip().startswith("Obsoleting"): + break + + line = re.split(r'\s+', lines[line_index].strip()) + + # DNF check-update returns 3-column format: package.arch version repo + #Sample output : rubygem-json.x86_64 2.13.2-2.azl4~20260501 azurelinux-base + if len(line) == 3 and self.__is_package(line[0]): + packages.append(self.get_product_name(line[0])) + versions.append(line[1]) + else: + self.composite_logger.log_verbose("[DNF5] > Inapplicable line (" + str(line_index) + "): " + lines[line_index]) + + return packages, versions + + def dedupe_update_packages_to_get_latest_versions(self, packages, package_versions): + """Remove duplicate packages and returns the latest/highest version of each package""" + from core.src.core_logic.VersionComparator import VersionComparator + deduped_packages = [] + deduped_package_versions = [] + version_comparator = VersionComparator() + + for index, package in enumerate(packages): + if package in deduped_packages: + deduped_package_version = deduped_package_versions[deduped_packages.index(package)] + duplicate_package_version = package_versions[index] + # use custom comparator output 0 (equal), -1 (deduped package version is the lower one), +1 (deduped package version is the greater one) + is_deduped_package_latest = version_comparator.compare_versions(deduped_package_version, duplicate_package_version) + if is_deduped_package_latest < 0: + deduped_package_versions[deduped_packages.index(package)] = duplicate_package_version + continue + + deduped_packages.append(package) + deduped_package_versions.append(package_versions[index]) + + return deduped_packages, deduped_package_versions + + @staticmethod + def __is_package(chunk): + """Using a list comprehension to determine if chunk is a package""" + return any(chunk.endswith(ext) for ext in Constants.SUPPORTED_PACKAGE_ARCH) + # endregion + + # region Get Available Updates + def get_all_updates(self, cached=False): + """Get all missing updates""" + self.composite_logger.log_verbose("[DNF5] Discovering all packages...") + if cached and not len(self.all_updates_cached) == 0: + self.composite_logger.log_debug("[DNF5] Get all updates : [Cached={0}][PackagesCount={1}]".format(str(cached), len(self.all_updates_cached))) + return self.all_updates_cached, self.all_update_versions_cached # allows for high performance reuse in areas of the code explicitly aware of the cache + + out = self.invoke_package_manager(self.cmd_repo_refresh) + self.all_updates_cached, self.all_update_versions_cached = self.extract_packages_and_versions(out) + self.composite_logger.log_debug("[DNF5] Get all updates : [Cached={0}][PackagesCount={1}]".format(str(False), len(self.all_updates_cached))) + return self.all_updates_cached, self.all_update_versions_cached + # endregion + + # AssessPatch method + def get_security_updates(self): + """Get missing security updates. NOTE: Classification based categorization of patches is not available in DNF5 as of now""" + self.composite_logger.log_verbose("[DNF5] Discovering all packages as 'security' packages, since DNF5 does not support package classification...") + security_packages, security_package_versions = self.get_all_updates(cached=False) + self.composite_logger.log_debug("[DNF5] Discovered 'security' packages. [Count={0}]".format(len(security_packages))) + return security_packages, security_package_versions + + # AssessPatch method + def get_other_updates(self): + """Get missing other updates.""" + self.composite_logger.log_verbose("[DNF5] Discovering 'other' packages...") + return [], [] + + def set_max_patch_publish_date(self, max_patch_publish_date=str()): + pass + + # Install Patch method + def get_composite_package_identifier(self, package, package_version): + """Creates a version+architecture-specific package identifier for install commands + Parameters: + - package_name (string): Name of the package (may include architecture) + - package_version (string): Version of the package + Returns: + - String: Composite package identifier (e.g., "package-1.0.0.x86_64") + """ + package_without_arch, arch = self.get_product_name_and_arch(package) + package_identifier = package_without_arch + '-' + str(package_version) + if arch is not None: + package_identifier += arch + return package_identifier + + def get_product_name_and_arch(self, package_name): + architectures = Constants.SUPPORTED_PACKAGE_ARCH + for arch in architectures: + if package_name.endswith(arch): + return package_name[:-len(arch)], arch + return package_name, None + + def install_updates_fail_safe(self, excluded_packages): + pass + + # AssessPatch method + def get_all_available_versions_of_package(self, package_name): + """Returns a list of all the available versions of a package""" + # Sample output format + # rubygem-json.x86_64 2.13.2-2.azl4~20260501 azurelinux-base + # rubygem-json.x86_64 2.14.0-1.azl4~20260501 azurelinux-base + cmd = self.single_package_installed_and_available_query.replace('', package_name) + output = self.invoke_package_manager(cmd) + packages, package_versions = self.extract_packages_and_versions_including_duplicates(output) + return package_versions + + # AssessPatch method + def is_package_version_installed(self, package_name, package_version): + """Returns true if the specific package version is installed""" + # Sample output format + # rubygem-json.x86_64 2.13.2-2.azl4~20260501 @System + self.composite_logger.log_verbose("[DNF5] Checking package install status. [PackageName={0}][PackageVersion={1}]".format(str(package_name), str(package_version))) + cmd = self.single_package_installed_and_available_query.replace('', package_name) + output = self.invoke_package_manager(cmd) + packages, package_versions = self.extract_packages_and_versions_including_duplicates(output) + + for index, package in enumerate(packages): + if package == package_name and (package_versions[index] == package_version): + self.composite_logger.log_debug("[DNF5] > Installed version match found. [PackageName={0}][PackageVersion={1}]".format(str(package_name), str(package_version))) + return True + else: + self.composite_logger.log_verbose("[DNF5] > Did not match: " + package + " (" + package_versions[index] + ")") + + # sometimes packages are removed entirely from the system during installation of other packages + # so let's check that the package is still needed before + self.composite_logger.log_debug("[DNF5] > Installed version match NOT found. [PackageName={0}][PackageVersion={1}]".format(str(package_name), str(package_version))) + return False + + + def get_dependent_list(self, packages): + """Returns dependent list for the list of packages + Parameters: + - packages (list): List of package names to get dependencies for + Commands used: + - sudo dnf install --assumeno --skip-broken (simulates installation to find dependencies without actually installing) + Returns: List of dependency package names required for the input packages + + Sample output format: + # Updating and loading repositories: + # Repositories loaded. + # Problem: package perl-Getopt-Long-1:2.58-521.azl4~20260501.noarch from azurelinux-base requires perl(Pod::Usage) >= 1.14, but none of the providers can be installed + # - package perl-Pod-Usage-4:2.05-521.azl4~20260501.noarch from azurelinux-base requires perl(Pod::Text) >= 4, but none of the providers can be installed + # - package git-2.53.0-2.azl4~20260501.x86_64 from azurelinux-base requires perl(Getopt::Long), but none of the providers can be installed + # - package perl-podlators-1:6.0.2-521.azl4~20260501.noarch from azurelinux-base requires perl(Pod::Simple) >= 3.26, but none of the providers can be installed + # - conflicting requests + # - nothing provides perl(Text::Wrap) >= 98.112902 needed by perl-Pod-Simple-1:3.47-4.azl4~20260501.noarch from azurelinux-base + # + # Package Arch Version Repository Size + # Skipping packages with broken dependencies: + # git x86_64 2.53.0-2.azl4~202 azurelinux 56.4 KiB + # perl-Getopt-Long noarch 1:2.58-521.azl4~2 azurelinux 144.5 KiB + # perl-Pod-Simple noarch 1:3.47-4.azl4~202 azurelinux 565.3 KiB + # perl-Pod-Usage noarch 4:2.05-521.azl4~2 azurelinux 86.3 KiB + # perl-podlators noarch 1:6.0.2-521.azl4~ azurelinux 317.5 KiB + # Nothing to do. + """ + package_names = " ".join(packages) + cmd = self.single_package_upgrade_simulation_cmd + package_names + code, output = self.env_layer.run_command_output(cmd, False, False) + self.composite_logger.log_verbose("[DNF5] Dependency simulation. [Command={0}][Code={1}]".format(cmd, str(code))) + if code not in [0, 1]: + self.composite_logger.log_error("[DNF5] Unexpected failure. [Command={0}][Code={1}][Output={2}]".format(cmd, str(code), output) + ) + raise Exception("DNF dependency simulation failed") + + if "Skipping packages with broken dependencies" in output and "Nothing to do." in output: + self.composite_logger.log_error("[DNF5] All packages skipped due to broken dependencies. [Command={0}]".format(cmd)) + raise Exception("Dependency resolution failed: all packages skipped") + + dependencies = self.extract_dependencies(output, packages) + self.composite_logger.log_verbose("[DNF5] Resolved dependencies. [Command={0}][Packages={1}][DependencyCount={2}]".format(str(cmd), str(packages), len(dependencies))) + return dependencies + + def extract_dependencies(self, output, packages): + """Extracts dependent packages from dnf5 output.""" + dependencies = [] + package_arch_to_look_for = ["x86_64", "noarch", "i686", "aarch64"] + + lines = output.strip().splitlines() + in_dependency_section = False + + for line_index in range(0, len(lines)): + line_str = lines[line_index].strip() + + # Detect start of dependency section + if line_str.startswith("Installing dependencies"): + in_dependency_section = True + continue + + # Detect exit of dependency section + if line_str.startswith("Transaction Summary") or \ + line_str.startswith("Installing:") or \ + line_str.startswith("Skipping packages with broken dependencies") or \ + line_str.startswith("Problem:") or \ + line_str.startswith("Total size"): + in_dependency_section = False + continue + + # Only parse dependency section + if not in_dependency_section: + continue + + # Skip empty/header lines + if not line_str or line_str.startswith("Package"): + self.composite_logger.log_verbose("[DNF5] > Skipping header/empty line: " + line_str) + continue + + line = re.split(r'\s+', line_str) + dependent_package_name = "" + + if self.is_valid_update(line, package_arch_to_look_for): + dependent_package_name = self.get_product_name_with_arch(line, package_arch_to_look_for) + else: + self.composite_logger.log_verbose("[DNF5] > Inapplicable line: " + str(line)) + continue + + # Remove input packages (support both pkg and pkg.arch) + base_pkg = dependent_package_name.rsplit('.', 1)[ + 0] if '.' in dependent_package_name else dependent_package_name + + if len(dependent_package_name) != 0 and \ + dependent_package_name not in packages and \ + base_pkg not in packages and \ + dependent_package_name not in dependencies: + self.composite_logger.log_verbose("[DNF5] > Dependency detected: " + dependent_package_name) + dependencies.append(dependent_package_name) + + return dependencies + + def is_valid_update(self, package_details_in_output, package_arch_to_look_for): + # Verifies whether the line under consideration (i.e. package_details_in_output) contains relevant package details. + # package_details_in_output will be of the following format if it is valid + # Sample package details in DNF: + # python3-libs x86_64 3.12.3-5.azl3 azurelinux-official-base 36.05M 10.52M + return len(package_details_in_output) >= 3 and self.is_arch_in_package_details(package_details_in_output[1], package_arch_to_look_for) + + @staticmethod + def is_arch_in_package_details(package_detail, package_arch_to_look_for): + return len([p for p in package_arch_to_look_for if p in package_detail]) == 1 + + def get_product_name(self, package_name): + """Retrieve package name""" + return package_name + + def get_product_name_with_arch(self, package_detail, package_arch_to_look_for): + """ + Returns package name in format: name.arch + Example: + ["oniguruma", "x86_64", ...] -> "oniguruma.x86_64" + """ + if len(package_detail) >= 2 and package_detail[1] in package_arch_to_look_for: + return package_detail[0] + "." + package_detail[1] + return "" + + def get_package_size(self, output): + """Retrieves package size from installation output string + Parameters: + - output (string): The output string from DNF installation command + Returns: + - String: Package size (e.g., "10 M") or UNKNOWN_PACKAGE_SIZE if not found + - Total download size : 10M + """ + if "Nothing to do" not in output: + lines = output.strip().split('\n') + for line in lines: + if line.find(self.STR_TOTAL_DOWNLOAD_SIZE) >= 0: + return line.replace(self.STR_TOTAL_DOWNLOAD_SIZE, "") + + return Constants.UNKNOWN_PACKAGE_SIZE + + # Install Patch method + def install_security_updates_azgps_coordinated(self): + """ + Install updates on Azure Linux 4 using dnf5. + Note: + - DNF5 does not support security classification. + - This installs all available updates instead. + """ + self.composite_logger.log_verbose("[DNF5] Invoking package manager. [Command={0}]".format(self.single_package_upgrade_cmd)) + out, code = self.invoke_package_manager_advanced(self.single_package_upgrade_cmd,raise_on_exception=False) + if code != 0: + self.composite_logger.log_warning("[DNF5] Install failed. [Code={0}][Output={1}]".format(code, out)) + else: + self.composite_logger.log_debug("[DNF5] Install completed successfully") + return code, out + + def try_meet_azgps_coordinated_requirements(self): + """ + Do we need this for dnf? + """ + raise NotImplementedError("DNF: try_meet_azgps_coordinated_requirements not implemented yet") + + def get_current_auto_os_patch_state(self): + """ Gets the current auto OS update patch state on the machine """ + self.composite_logger.log("[DNF5] Fetching the current automatic OS patch state on the machine...") + + current_auto_os_patch_state_for_dnf5_automatic = self.__get_current_auto_os_patch_state_for_dnf5_automatic() + + self.composite_logger.log("[DNF5] OS patch state per auto OS update service: [dnf5-automatic={0}]".format(str(current_auto_os_patch_state_for_dnf5_automatic))) + + if current_auto_os_patch_state_for_dnf5_automatic == Constants.AutomaticOSPatchStates.ENABLED: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.ENABLED + elif current_auto_os_patch_state_for_dnf5_automatic == Constants.AutomaticOSPatchStates.DISABLED: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.DISABLED + else: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.UNKNOWN + + self.composite_logger.log_debug("[DNF5] Overall Auto OS Patch State based on all auto OS update service states [OverallAutoOSPatchState={0}]".format(str(current_auto_os_patch_state))) + return current_auto_os_patch_state + + def __get_current_auto_os_patch_state_for_dnf5_automatic(self): + """ Gets current auto OS update patch state for dnf5-automatic """ + self.composite_logger.log_debug("[DNF5] Fetching current automatic OS patch state in dnf5-automatic service. This includes checks on whether the service is installed, current auto patch enable state and whether it is set to enable on reboot") + self.__init_auto_update_for_dnf5_automatic() + + is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value = self.__get_current_auto_os_updates_setting_on_machine() + + if not is_service_installed: + return Constants.AutomaticOSPatchStates.DISABLED + + if enable_on_reboot_value: + return Constants.AutomaticOSPatchStates.ENABLED + + return Constants.AutomaticOSPatchStates.DISABLED + + def disable_auto_os_update(self): + """ Disables auto OS updates on the machine only if they are enabled and logs the default settings the machine comes with """ + try: + self.composite_logger.log_verbose("[DNF5] Disabling auto OS updates in all identified services...") + self.__disable_auto_os_update_for_dnf5_automatic() + self.composite_logger.log_debug("[DNF5] Successfully disabled auto OS updates") + + except Exception as error: + self.composite_logger.log_error("[DNF5] Could not disable auto OS updates. [Error={0}]".format(repr(error))) + raise + + def __disable_auto_os_update_for_dnf5_automatic(self): + """ Disables auto OS updates, using dnf5-automatic service, and logs the default settings the machine comes with """ + self.composite_logger.log_verbose("[DNF5] Disabling auto OS updates using dnf5-automatic") + self.__init_auto_update_for_dnf5_automatic() + + self.backup_image_default_patch_configuration_if_not_exists() + + if not self.is_auto_update_service_installed(self.dnf5_automatic_install_check_cmd): + self.composite_logger.log_debug("[DNF5] Cannot disable as dnf5-automatic is not installed on the machine") + return + + self.composite_logger.log_verbose("[DNF5] Preemptively disabling auto OS updates using dnf5-automatic") + self.disable_auto_update_on_reboot(self.dnf5_automatic_disable_on_reboot_cmd) + + self.composite_logger.log_debug("[DNF5] Successfully disabled auto OS updates using dnf5-automatic") + + def disable_auto_update_on_reboot(self, command): + """ Disables auto update on reboot by executing systemctl command """ + self.composite_logger.log_verbose("[DNF5] Disabling auto update on reboot. [Command={0}] ".format(command)) + code, out = self.env_layer.run_command_output(command, False, False) + + if code != 0: + self.composite_logger.log_error("[DNF5][ERROR] Error disabling auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + error_msg = 'Unexpected return code (' + str(code) + ') on command: ' + command + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.OPERATION_FAILED) + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + else: + self.composite_logger.log_debug("[DNF5] Disabled auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + + def backup_image_default_patch_configuration_if_not_exists(self): + """ + This method saves the original auto-update configuration so it can be restored later. + """ + try: + self.composite_logger.log_debug("[DNF5] Ensuring there is a backup of the default patch state for [AutoOSUpdateService={0}]".format(str(self.current_auto_os_update_service))) + + # read existing backup since it also contains backup from other update services. We need to preserve any existing data within the backup file + image_default_patch_configuration_backup = {} + if self.image_default_patch_configuration_backup_exists(): + try: + image_default_patch_configuration_backup = json.loads(self.env_layer.file_system.read_with_retry(self.image_default_patch_configuration_backup_path)) + except Exception as error: + self.composite_logger.log_error("[DNF5] Unable to read backup for default patch state. Will attempt to re-write. [Exception={0}]".format(repr(error))) + + # verify if existing backup is valid if not, write to backup + is_backup_valid = self.is_image_default_patch_configuration_backup_valid(image_default_patch_configuration_backup) + if is_backup_valid: + self.composite_logger.log_debug("[DNF5] Since extension has a valid backup, no need to log the current settings again. [Default Auto OS update settings={0}] [File path={1}]" + .format(str(image_default_patch_configuration_backup), self.image_default_patch_configuration_backup_path)) + else: + self.composite_logger.log_debug("[DNF5] Since the backup is invalid, will add a new backup with the current auto OS update settings") + self.composite_logger.log_debug("[DNF5] Fetching current auto OS update settings for [AutoOSUpdateService={0}]".format(str(self.current_auto_os_update_service))) + is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value = self.__get_current_auto_os_updates_setting_on_machine() + + backup_image_default_patch_configuration_json_to_add = { + self.current_auto_os_update_service: { + self.enable_on_reboot_identifier_text: enable_on_reboot_value, + self.installation_state_identifier_text: is_service_installed + } + } + + image_default_patch_configuration_backup.update(backup_image_default_patch_configuration_json_to_add) + + self.composite_logger.log_debug("[DNF5] Logging default system configuration settings for auto OS updates. [Settings={0}] [Log file path={1}]" + .format(str(image_default_patch_configuration_backup), self.image_default_patch_configuration_backup_path)) + self.env_layer.file_system.write_with_retry(self.image_default_patch_configuration_backup_path, '{0}'.format(json.dumps(image_default_patch_configuration_backup)), mode='w+') + except Exception as error: + error_message = "[DNF5] Exception during fetching and logging default auto update settings on the machine. [Exception={0}]".format(repr(error)) + self.composite_logger.log_error(error_message) + self.status_handler.add_error_to_status(error_message, Constants.PatchOperationErrorCodes.DEFAULT_ERROR) + raise + + def is_image_default_patch_configuration_backup_valid(self, image_default_patch_configuration_backup): + # Validate backup JSON for dnf5 automatic service + try: + if self.dnf5_auto_os_update_service in image_default_patch_configuration_backup \ + and self.dnf5_automatic_enable_on_reboot_identifier_text in image_default_patch_configuration_backup[self.dnf5_auto_os_update_service] \ + and self.dnf5_automatic_installation_state_identifier_text in image_default_patch_configuration_backup[self.dnf5_auto_os_update_service]: + self.composite_logger.log_debug("[DNF5] Extension has a valid backup for default dnf5-automatic configuration settings") + return True + else: + self.composite_logger.log_debug("[DNF5] Extension does not have a valid backup for default dnf5-automatic configuration settings") + return False + except Exception: + return False + + def update_os_patch_configuration_sub_setting(self, patch_configuration_sub_setting, value, patch_configuration_sub_setting_pattern_to_match): + # No-op for dnf5 (no config file-based sub-settings) + pass + + # Post Install method/ Install Patch + def is_reboot_pending(self): + """Checks reboot requirement for Azure Linux 4 (dnf5)""" + try: + code, _ = self.env_layer.run_command_output(self.needs_restarting_with_flag, False, False) + reboot_required = (code == 1) + self.composite_logger.log_debug("[DNF5] > Reboot required (needs-restarting) = {0}".format(reboot_required)) + return reboot_required + except Exception as error: + self.composite_logger.log_error("[DNF5] Error checking reboot pending: " + repr(error)) + return True # safe fallback + + # Post Install method / Install Patch + def do_processes_require_restart(self): + """Checks if processes require a restart due to updates + Commands used: + - sudo dnf -y install dnf-utils (installs dnf-utils if not already present) + - sudo LANG=en_US.UTF8 needs-restarting -r (checks if processes require restart) + Returns: + - Boolean: True if processes require restart, False otherwise + """ + raise NotImplementedError("DNF: do_processes_require_restart not implemented yet") + + def add_arch_dependencies(self, package_manager, package, version, packages, package_versions, package_and_dependencies, package_and_dependency_versions): + """ + Unnecessary for DNF because the package manager already handles multi-architecture dependencies automatically + Command Used to confirm above: sudo dnf -y install jq + """ + pass + + def set_security_esm_package_status(self, operation, packages): + """No-op for dnf, tdnf, yum and zypper """ + return + + def separate_out_esm_packages(self, packages, package_versions): + """No-op for dnf, tdnf, yum and zypper """ + return + + def get_package_install_expected_avg_time_in_seconds(self): + return self.package_install_expected_avg_time_in_seconds + + # ConfigurePatch method + def revert_auto_os_update_to_system_default(self): + """ Reverts the auto OS update patch state on the machine to its system default value, if one exists in our backup file """ + # type () -> None + self.composite_logger.log("[DNF5] Reverting the current automatic OS patch state on the machine to its system default value before patchmode was set to 'AutomaticByPlatform'") + self.revert_auto_os_update_to_system_default_for_dnf5_automatic() + self.composite_logger.log_debug("[DNF5] Successfully reverted auto OS updates to system default config") + + def revert_auto_os_update_to_system_default_for_dnf5_automatic(self): + """ Reverts the auto OS update patch state on the machine to its system default value for dnf5-automatic service, if applicable """ + # type () -> None + self.__init_auto_update_for_dnf5_automatic() + self.composite_logger.log("[DNF5] Reverting the current automatic OS patch state on the machine to its system default value for [Service={0}]".format(str(self.current_auto_os_update_service))) + is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value = self.__get_current_auto_os_updates_setting_on_machine() + + if not is_service_installed: + self.composite_logger.log_debug("[DNF5] Machine default auto OS update service is not installed on the VM and hence no config to revert. [Service={0}]".format(str(self.current_auto_os_update_service))) + return + + self.composite_logger.log_debug("[DNF5] Logging current configuration settings for auto OS updates [Service={0}][Is_Service_Installed={1}][Machine_default_update_enable_on_reboot={2}]" + .format(str(self.current_auto_os_update_service), str(is_service_installed), str(enable_on_reboot_value))) + + image_default_patch_configuration_backup = self.__get_image_default_patch_configuration_backup() + self.composite_logger.log_debug("[DNF5] Logging system default configuration settings for auto OS updates. [Settings={0}]".format(str(image_default_patch_configuration_backup))) + is_backup_valid = self.is_image_default_patch_configuration_backup_valid(image_default_patch_configuration_backup) + + if is_backup_valid: + enable_on_reboot_value_from_backup = image_default_patch_configuration_backup[self.current_auto_os_update_service][self.enable_on_reboot_identifier_text] + + # For DNF5, we can only revert the systemctl enable/disable state since ExecStart flags are baked into the unit file. + # If the backup indicates the service should be enabled on reboot, enable it now. + if str(enable_on_reboot_value_from_backup).lower() == 'true': + if not enable_on_reboot_value: + self.composite_logger.log_debug("[DNF5] Enabling service to match system default") + self.enable_auto_update_on_reboot() + else: + self.composite_logger.log_debug("[DNF5] Service already enabled as per system default") + else: + # If backup says it should be disabled on reboot, keep current state or disable if needed + if enable_on_reboot_value: + # Currently enabled but backup says it should be disabled - disable it + self.disable_auto_update_on_reboot(self.dnf5_automatic_disable_on_reboot_cmd) + else: + self.composite_logger.log_debug("[DNF5] Since the backup is invalid or does not exist for current service, we won't be able to revert auto OS patch settings to their system default value. [Service={0}]".format(str(self.current_auto_os_update_service))) + + # region auto OS updates + def __init_constants_for_dnf5_automatic(self): + self.dnf5_automatic_configuration_service = 'systemctl cat dnf5-automatic.service' + self.dnf5_automatic_install_check_cmd = 'rpm -qa | grep dnf5-plugin-automatic' + self.dnf5_automatic_enable_on_reboot_check_cmd = 'systemctl is-enabled dnf5-automatic.timer' + self.dnf5_automatic_disable_on_reboot_cmd = 'systemctl disable --now dnf5-automatic.timer' + self.dnf5_automatic_enable_on_reboot_cmd = 'systemctl enable --now dnf5-automatic.timer' + self.dnf5_automatic_config_pattern_match_text = None + # Detect them from ExecStart flags instead of a file: + self.dnf5_automatic_enable_on_reboot_identifier_text = "enable_on_reboot" + self.dnf5_automatic_installation_state_identifier_text = "installation_state" + self.dnf5_auto_os_update_service = "dnf5-automatic" + + def __init_auto_update_for_dnf5_automatic(self): + """ Initializes all generic auto OS update variables with the config values for dnf5 automatic service """ + self.os_patch_configuration_settings_read_cmd = self.dnf5_automatic_configuration_service + self.enable_on_reboot_identifier_text = self.dnf5_automatic_enable_on_reboot_identifier_text + self.installation_state_identifier_text = self.dnf5_automatic_installation_state_identifier_text + self.enable_on_reboot_check_cmd = self.dnf5_automatic_enable_on_reboot_check_cmd + self.enable_on_reboot_cmd = self.dnf5_automatic_enable_on_reboot_cmd + self.install_check_cmd = self.dnf5_automatic_install_check_cmd + self.current_auto_os_update_service = self.dnf5_auto_os_update_service + + def __get_current_auto_os_updates_setting_on_machine(self): + """ Gets all the update settings related to auto OS updates via dnf """ + try: + download_updates_value = "" + apply_updates_value = "" + is_service_installed = False + enable_on_reboot_value = False + + # get install state + if not self.is_auto_update_service_installed(self.install_check_cmd): + return is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value + + is_service_installed = True + enable_on_reboot_value = self.is_service_set_to_enable_on_reboot(self.enable_on_reboot_check_cmd) + + self.composite_logger.log_debug("[DNF5] Checking if auto updates are currently enabled...") + + # Check systemd service unit file for ExecStart flags to determine current settings + # Get the dnf5-automatic.service configuration + code, unit_output = self.env_layer.run_command_output(self.os_patch_configuration_settings_read_cmd, False, False) + + if code == 0: + self.composite_logger.log_debug("[DNF5] Retrieved dnf5-automatic service unit configuration...") + + # ExecStart line format example: ExecStart=/usr/bin/dnf5 automatic --timer + for line in unit_output.split('\n'): + if line.strip().startswith('ExecStart=') and 'dnf5 automatic' in line: + self.composite_logger.log_debug("[DNF5] ExecStart line: {0}".format(line)) + break + + return is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value + + except Exception as error: + raise Exception("[DNF5] Error occurred in fetching current auto OS update settings from the machine (dnf5). [Exception={0}]".format( + repr(error))) + + def is_auto_update_service_installed(self, install_check_cmd): + """ Checks if the auto update service is installed on the VM """ + code, out = self.env_layer.run_command_output(install_check_cmd, False, False) + self.composite_logger.log_debug("[DNF5] Checked if auto update service is installed. [Command={0}][Code={1}][Output={2}]".format(install_check_cmd, str(code), out)) + if len(out.strip()) > 0 and code == 0: + self.composite_logger.log_debug("[DNF5] > Auto OS update service is installed on the machine") + return True + else: + self.composite_logger.log_debug("[DNF5] > Auto OS update service is NOT installed on the machine") + return False + + def is_service_set_to_enable_on_reboot(self, command): + """ Checking if auto update is set to enable on reboot on the machine. An enable_on_reboot service will be activated (if currently inactive) on machine reboot """ + code, out = self.env_layer.run_command_output(command, False, False) + self.composite_logger.log_debug("[DNF5] Checked if auto update service is set to enable on reboot. [Code={0}][Out={1}]".format(str(code), out)) + if len(out.strip()) > 0 and code == 0 and 'enabled' in out: + self.composite_logger.log_debug("[DNF5] > Auto OS update service will enable on reboot") + return True + self.composite_logger.log_debug("[DNF5] > Auto OS update service will NOT enable on reboot") + return False + + def enable_auto_update_on_reboot(self): + """ Enables machine default auto update on reboot """ + # type () -> None + command = self.enable_on_reboot_cmd + self.composite_logger.log_verbose("[DNF5] Enabling auto update on reboot. [Command={0}] ".format(command)) + code, out = self.env_layer.run_command_output(command, False, False) + + if code != 0: + self.composite_logger.log_error("[DNF5][ERROR] Error enabling auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + error_msg = 'Unexpected return code (' + str(code) + ') on command: ' + command + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.OPERATION_FAILED) + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + else: + self.composite_logger.log_debug("[DNF5] Enabled auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + + def __get_image_default_patch_configuration_backup(self): + """ Get image_default_patch_configuration_backup file""" + image_default_patch_configuration_backup = {} + + # read existing backup since it also contains backup from other update services. We need to preserve any existing data within the backup file + if self.image_default_patch_configuration_backup_exists(): + try: + image_default_patch_configuration_backup = json.loads(self.env_layer.file_system.read_with_retry(self.image_default_patch_configuration_backup_path)) + except Exception as error: + self.composite_logger.log_error("[DNF5] Unable to read backup for default patch state. Will attempt to re-write. [Exception={0}]".format(repr(error))) + return image_default_patch_configuration_backup + + # endregion diff --git a/src/core/src/package_managers/DnfPackageManager.py b/src/core/src/package_managers/DnfPackageManager.py deleted file mode 100644 index 0f1f8db6..00000000 --- a/src/core/src/package_managers/DnfPackageManager.py +++ /dev/null @@ -1,469 +0,0 @@ -# Copyright 2026 Microsoft Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Requires Python 2.7+ - -"""DnfPackageManager for Azure Linux L4 and RHEL 10""" -from abc import ABCMeta -import json -from core.src.bootstrap.Constants import Constants -from core.src.package_managers.PackageManager import PackageManager - - -class DnfPackageManager(PackageManager): - """Implementation of Azure Linux L4/RHEL 10 DNF5 package management operations""" - - def __init__(self, env_layer, execution_config, composite_logger, telemetry_writer, status_handler): - super(DnfPackageManager, self).__init__(env_layer, execution_config, composite_logger, telemetry_writer, status_handler) - - self.cmd_clean_cache = "sudo dnf5 -q clean expire-cache" - self.cmd_repo_refresh = "sudo dnf5 -q check-update " - - # TODO: Add AzL4/Red hat 10 DNF5 specific initialization - self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf5') - - # auto OS updates - self.current_auto_os_update_service = None - self.enable_on_reboot_identifier_text = "" - self.enable_on_reboot_check_cmd = '' - self.enable_on_reboot_cmd = '' - self.installation_state_identifier_text = "" - self.install_check_cmd = "" - self.apply_updates_enabled = "Enabled" - self.apply_updates_disabled = "Disabled" - self.apply_updates_unknown = "Unknown" - - # commands for DNF Automatic updates service - self.__init_constants_for_dnf5_automatic() - - __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) - - def refresh_repo(self): - self.composite_logger.log("[DNF] Refreshing local repo...") - self.invoke_package_manager(self.cmd_clean_cache) - self.invoke_package_manager(self.cmd_repo_refresh) - - # AssessPatch method - def invoke_package_manager_advanced(self, command, raise_on_exception=True): - self.composite_logger.log_verbose("[DNF] Invoking package manager. [Command={0}]".format(str(command))) - # env_layer.run_command_output returns (code, output) - code, out = self.env_layer.run_command_output(command, False, False) - - # Treat exit code 0 as success. No updates available. - if code == 0: - self.composite_logger.log_debug('[DNF] Invoked package manager. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) - - elif code == 100: - # Updates available - self.composite_logger.log_debug( - '[DNF] Updates available. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) - else: - self.composite_logger.log_warning('[ERROR] Customer environment error. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) - error_msg = "Customer environment error: Investigate and resolve unexpected return code ({0}) from package manager on command: {1}".format(str(code), command) - self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.PACKAGE_MANAGER_FAILURE) - if raise_on_exception: - raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) - - return out, code - - # AssessPatch method - def get_all_updates(self, cached=False): - """Gets all missing updates available for the system and returns the cached updates list and versions list - Cache Check Logic: - - If cached=True and cache has data, return cached updates and versions immediately (high performance reuse) - - If cache miss or cached=False, execute the DNF command to get fresh updates and populate cache - Command: - - sudo dnf -q check-update (checks for all available updates) - 1. If cached=True and cache has data, return cached results - 2. Execute command, parse output, cache results - 3. Return all_updates_cached and all_update_versions_cached - """ - raise NotImplementedError("DNF: get_all_updates not implemented yet") - - # AssessPatch method - def get_security_updates(self): - """Gets all missing security updates available for the system and returns packages and versions list - Command: - - sudo dnf -q check-update --security (checks for available security updates only) - Returns: - - List of security package names - - List of corresponding security package versions - """ - raise NotImplementedError("DNF: get_security_updates not implemented yet") - - # AssessPatch method - def get_other_updates(self): - """Gets missing (non-security) updates. Record log and return - """ - return [], [] - - def set_max_patch_publish_date(self, max_patch_publish_date=str()): - raise NotImplementedError("DNF: set_max_patch_publish_date not implemented yet") - - # Install Patch method - def get_composite_package_identifier(self, package_name, package_version): - """Creates a version+architecture-specific package identifier for install commands - Parameters: - - package_name (string): Name of the package (may include architecture) - - package_version (string): Version of the package - Returns: - - String: Composite package identifier (e.g., "package-1.0.0.x86_64") - """ - raise NotImplementedError("DNF: get_composite_package_identifier not implemented yet") - - def install_updates_fail_safe(self, excluded_packages): - raise NotImplementedError("DNF: install_updates_fail_safe not implemented yet") - - # AssessPatch method - def get_all_available_versions_of_package(self, package_name): - """Returns a list of all available versions of a package - Parameters: - - package_name (string): Name of the package to get versions for - Commands used: - - sudo dnf list --available (lists all available versions of the package) - Returns: - - List of all available package versions - """ - raise NotImplementedError("DNF: get_all_available_versions_of_package not implemented yet") - - # AssessPatch method - def is_package_version_installed(self, package_name, package_version): - """Checks if a specific package version is installed - Parameters: - - package_name (string): Name of the package - - package_version (string): Version of the package to check - Commands used: - - sudo dnf list installed (checks if specific package version is installed) - Returns: - - Boolean: True if the specific package version is installed, False otherwise - """ - raise NotImplementedError("DNF: is_package_version_installed not implemented yet") - - - def get_dependent_list(self, packages): - """Returns dependent list for the list of packages - Parameters: - - packages (list): List of package names to get dependencies for - Commands used: - - sudo dnf install --assumeno --skip-broken (simulates installation to find dependencies without actually installing) - Returns: List of dependency package names required for the input packages - """ - raise NotImplementedError("DNF: get_dependent_list not implemented yet") - - def get_product_name(self, package_name): - raise NotImplementedError("DNF: get_product_name not implemented yet") - - def get_package_size(self, output): - """Retrieves package size from installation output string - Parameters: - - output (string): The output string from DNF installation command - Returns: - - String: Package size (e.g., "15 M") or UNKNOWN_PACKAGE_SIZE if not found - """ - raise NotImplementedError("DNF: get_package_size not implemented yet") - - # Install Patch method - def install_security_updates_azgps_coordinated(self): - """Installs security updates in Azure Linux 4 following strict safe deployment practices - Commands used: - - sudo dnf -y upgrade --security --skip-broken (installs security updates only) - Returns: - - Tuple of (return code, output) from the command execution - """ - raise NotImplementedError("DNF: install_security_updates_azgps_coordinated not implemented yet") - - def try_meet_azgps_coordinated_requirements(self): - """ - Do we need this for dnf? - """ - raise NotImplementedError("DNF: try_meet_azgps_coordinated_requirements not implemented yet") - - def get_current_auto_os_patch_state(self): - """ Gets the current auto OS update patch state on the machine """ - self.composite_logger.log("[DNF] Fetching the current automatic OS patch state on the machine...") - - current_auto_os_patch_state_for_dnf5_automatic = self.__get_current_auto_os_patch_state_for_dnf5_automatic() - - self.composite_logger.log("[DNF] OS patch state per auto OS update service: [dnf5-automatic={0}]".format(str(current_auto_os_patch_state_for_dnf5_automatic))) - - if current_auto_os_patch_state_for_dnf5_automatic == Constants.AutomaticOSPatchStates.ENABLED: - current_auto_os_patch_state = Constants.AutomaticOSPatchStates.ENABLED - elif current_auto_os_patch_state_for_dnf5_automatic == Constants.AutomaticOSPatchStates.DISABLED: - current_auto_os_patch_state = Constants.AutomaticOSPatchStates.DISABLED - else: - current_auto_os_patch_state = Constants.AutomaticOSPatchStates.UNKNOWN - - self.composite_logger.log_debug("[DNF] Overall Auto OS Patch State based on all auto OS update service states [OverallAutoOSPatchState={0}]".format(str(current_auto_os_patch_state))) - return current_auto_os_patch_state - - def __get_current_auto_os_patch_state_for_dnf5_automatic(self): - """ Gets current auto OS update patch state for dnf5-automatic """ - self.composite_logger.log_debug("[DNF] Fetching current automatic OS patch state in dnf5-automatic service. This includes checks on whether the service is installed, current auto patch enable state and whether it is set to enable on reboot") - self.__init_auto_update_for_dnf5_automatic() - - is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value = self.__get_current_auto_os_updates_setting_on_machine() - - if not is_service_installed: - return Constants.AutomaticOSPatchStates.DISABLED - - if enable_on_reboot_value: - return Constants.AutomaticOSPatchStates.ENABLED - - return Constants.AutomaticOSPatchStates.DISABLED - - def disable_auto_os_update(self): - """ Disables auto OS updates on the machine only if they are enabled and logs the default settings the machine comes with """ - try: - self.composite_logger.log_verbose("[DNF] Disabling auto OS updates in all identified services...") - self.__disable_auto_os_update_for_dnf5_automatic() - self.composite_logger.log_debug("[DNF] Successfully disabled auto OS updates") - - except Exception as error: - self.composite_logger.log_error("[DNF] Could not disable auto OS updates. [Error={0}]".format(repr(error))) - raise - - def __disable_auto_os_update_for_dnf5_automatic(self): - """ Disables auto OS updates, using dnf5-automatic service, and logs the default settings the machine comes with """ - self.composite_logger.log_verbose("[DNF] Disabling auto OS updates using dnf5-automatic") - self.__init_auto_update_for_dnf5_automatic() - - self.backup_image_default_patch_configuration_if_not_exists() - - if not self.is_auto_update_service_installed(self.dnf5_automatic_install_check_cmd): - self.composite_logger.log_debug("[DNF] Cannot disable as dnf5-automatic is not installed on the machine") - return - - self.composite_logger.log_verbose("[DNF] Preemptively disabling auto OS updates using dnf5-automatic") - self.disable_auto_update_on_reboot(self.dnf5_automatic_disable_on_reboot_cmd) - - self.composite_logger.log_debug("[DNF] Successfully disabled auto OS updates using dnf5-automatic") - - def disable_auto_update_on_reboot(self, command): - """ Disables auto update on reboot by executing systemctl command """ - self.composite_logger.log_verbose("[DNF] Disabling auto update on reboot. [Command={0}] ".format(command)) - code, out = self.env_layer.run_command_output(command, False, False) - - if code != 0: - self.composite_logger.log_error("[DNF][ERROR] Error disabling auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) - error_msg = 'Unexpected return code (' + str(code) + ') on command: ' + command - self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.OPERATION_FAILED) - raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) - else: - self.composite_logger.log_debug("[DNF] Disabled auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) - - def backup_image_default_patch_configuration_if_not_exists(self): - """ - This method saves the original auto-update configuration so it can be restored later. - """ - try: - self.composite_logger.log_debug("[DNF] Ensuring there is a backup of the default patch state for [AutoOSUpdateService={0}]".format(str(self.current_auto_os_update_service))) - - # read existing backup since it also contains backup from other update services. We need to preserve any existing data within the backup file - image_default_patch_configuration_backup = {} - if self.image_default_patch_configuration_backup_exists(): - try: - image_default_patch_configuration_backup = json.loads(self.env_layer.file_system.read_with_retry(self.image_default_patch_configuration_backup_path)) - except Exception as error: - self.composite_logger.log_error("[DNF] Unable to read backup for default patch state. Will attempt to re-write. [Exception={0}]".format(repr(error))) - - # verify if existing backup is valid if not, write to backup - is_backup_valid = self.is_image_default_patch_configuration_backup_valid(image_default_patch_configuration_backup) - if is_backup_valid: - self.composite_logger.log_debug("[DNF] Since extension has a valid backup, no need to log the current settings again. [Default Auto OS update settings={0}] [File path={1}]" - .format(str(image_default_patch_configuration_backup), self.image_default_patch_configuration_backup_path)) - else: - self.composite_logger.log_debug("[DNF] Since the backup is invalid, will add a new backup with the current auto OS update settings") - self.composite_logger.log_debug("[DNF] Fetching current auto OS update settings for [AutoOSUpdateService={0}]".format(str(self.current_auto_os_update_service))) - is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value = self.__get_current_auto_os_updates_setting_on_machine() - - backup_image_default_patch_configuration_json_to_add = { - self.current_auto_os_update_service: { - self.enable_on_reboot_identifier_text: enable_on_reboot_value, - self.installation_state_identifier_text: is_service_installed - } - } - - image_default_patch_configuration_backup.update(backup_image_default_patch_configuration_json_to_add) - - self.composite_logger.log_debug("[DNF] Logging default system configuration settings for auto OS updates. [Settings={0}] [Log file path={1}]" - .format(str(image_default_patch_configuration_backup), self.image_default_patch_configuration_backup_path)) - self.env_layer.file_system.write_with_retry(self.image_default_patch_configuration_backup_path, '{0}'.format(json.dumps(image_default_patch_configuration_backup)), mode='w+') - except Exception as error: - error_message = "[DNF] Exception during fetching and logging default auto update settings on the machine. [Exception={0}]".format(repr(error)) - self.composite_logger.log_error(error_message) - self.status_handler.add_error_to_status(error_message, Constants.PatchOperationErrorCodes.DEFAULT_ERROR) - raise - - def is_image_default_patch_configuration_backup_valid(self, image_default_patch_configuration_backup): - # Validate backup JSON for dnf5 automatic service - try: - if self.dnf5_auto_os_update_service in image_default_patch_configuration_backup \ - and self.dnf5_automatic_enable_on_reboot_identifier_text in image_default_patch_configuration_backup[self.dnf5_auto_os_update_service] \ - and self.dnf5_automatic_installation_state_identifier_text in image_default_patch_configuration_backup[self.dnf5_auto_os_update_service]: - self.composite_logger.log_debug("[DNF] Extension has a valid backup for default dnf5-automatic configuration settings") - return True - else: - self.composite_logger.log_debug("[DNF] Extension does not have a valid backup for default dnf5-automatic configuration settings") - return False - except Exception: - return False - - def update_os_patch_configuration_sub_setting(self, patch_configuration_sub_setting, value, patch_configuration_sub_setting_pattern_to_match): - raise NotImplementedError("DNF: update_os_patch_configuration_sub_setting not implemented yet") - - # Post Install method/ Install Patch - def is_reboot_pending(self): - """Checks if there is a pending reboot on the machine - Returns: - - Boolean: True if reboot is pending, False otherwise - """ - raise NotImplementedError("DNF: is_reboot_pending not implemented yet") - - # Post Install method / Install Patch - def do_processes_require_restart(self): - """Checks if processes require a restart due to updates - Commands used: - - sudo dnf -y install dnf-utils (installs dnf-utils if not already present) - - sudo LANG=en_US.UTF8 needs-restarting -r (checks if processes require restart) - Returns: - - Boolean: True if processes require restart, False otherwise - """ - raise NotImplementedError("DNF: do_processes_require_restart not implemented yet") - - def add_arch_dependencies(self, package_manager, package, version, packages, package_versions, package_and_dependencies, package_and_dependency_versions): - """ - Unnecessary for DNF because the package manager already handles multi-architecture dependencies automatically - Command Used to confirm above: sudo dnf -y install jq - """ - return - - def set_security_esm_package_status(self, operation, packages): - """No-op for dnf, tdnf, yum and zypper """ - return - - def separate_out_esm_packages(self, packages, package_versions): - """No-op for dnf, tdnf, yum and zypper """ - return - - def get_package_install_expected_avg_time_in_seconds(self): - raise NotImplementedError("DNF: get_package_install_expected_avg_time_in_seconds not implemented yet") - - # ConfigurePatch method - def revert_auto_os_update_to_system_default(self): - """ Reverts the auto OS update patch state on the machine to its system default value, if one exists in our backup file """ - raise NotImplementedError("DNF: revert_auto_os_update_to_system_default not implemented yet") - - # region auto OS updates - def __init_constants_for_dnf5_automatic(self): - self.dnf5_automatic_configuration_service = 'systemctl cat dnf5-automatic.service' - self.dnf5_automatic_install_check_cmd = 'rpm -qa | grep dnf5-plugin-automatic' - self.dnf5_automatic_enable_on_reboot_check_cmd = 'systemctl is-enabled dnf5-automatic.timer' - self.dnf5_automatic_disable_on_reboot_cmd = 'systemctl disable --now dnf5-automatic.timer' - self.dnf5_automatic_enable_on_reboot_cmd = 'systemctl enable --now dnf5-automatic.timer' - self.dnf5_automatic_config_pattern_match_text = None - # Detect them from ExecStart flags instead of a file: - self.dnf5_automatic_download_updates_identifier_text = '--downloadupdates' - self.dnf5_automatic_apply_updates_identifier_text = '--installupdates' - self.dnf5_automatic_enable_on_reboot_identifier_text = "enable_on_reboot" - self.dnf5_automatic_installation_state_identifier_text = "installation_state" - self.dnf5_auto_os_update_service = "dnf5-automatic" - - def __init_auto_update_for_dnf5_automatic(self): - """ Initializes all generic auto OS update variables with the config values for dnf5 automatic service """ - self.os_patch_configuration_settings_read_cmd = self.dnf5_automatic_configuration_service - self.download_updates_identifier_text = self.dnf5_automatic_download_updates_identifier_text - self.apply_updates_identifier_text = self.dnf5_automatic_apply_updates_identifier_text - self.enable_on_reboot_identifier_text = self.dnf5_automatic_enable_on_reboot_identifier_text - self.installation_state_identifier_text = self.dnf5_automatic_installation_state_identifier_text - self.enable_on_reboot_check_cmd = self.dnf5_automatic_enable_on_reboot_check_cmd - self.enable_on_reboot_cmd = self.dnf5_automatic_enable_on_reboot_cmd - self.install_check_cmd = self.dnf5_automatic_install_check_cmd - self.current_auto_os_update_service = self.dnf5_auto_os_update_service - - def __get_current_auto_os_updates_setting_on_machine(self): - """ Gets all the update settings related to auto OS updates via dnf """ - try: - download_updates_value = "" - apply_updates_value = "" - is_service_installed = False - enable_on_reboot_value = False - - # get install state - if not self.is_auto_update_service_installed(self.install_check_cmd): - return is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value - - is_service_installed = True - enable_on_reboot_value = self.is_service_set_to_enable_on_reboot(self.enable_on_reboot_check_cmd) - - self.composite_logger.log_debug( - "[DNF] Checking if auto updates are currently enabled...") - - # Check systemd service unit file for ExecStart flags to determine current settings - # Get the dnf5-automatic.service configuration - code, unit_output = self.env_layer.run_command_output(self.os_patch_configuration_settings_read_cmd, False, False) - - if code == 0: - self.composite_logger.log_debug( - "[DNF] Retrieved dnf5-automatic service unit configuration...") - - # ExecStart line format example: ExecStart=/usr/bin/dnf5 automatic --timer - for line in unit_output.split('\n'): - if line.strip().startswith('ExecStart=') and 'dnf5 automatic' in line: - self.composite_logger.log_debug("[DNF] ExecStart line: {0}".format(line)) - break - - return is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value - - except Exception as error: - raise Exception( - "[DNF] Error occurred in fetching current auto OS update settings from the machine (dnf5). [Exception={0}]".format( - repr(error))) - - def is_auto_update_service_installed(self, install_check_cmd): - """ Checks if the auto update service is installed on the VM """ - code, out = self.env_layer.run_command_output(install_check_cmd, False, False) - self.composite_logger.log_debug("[DNF] Checked if auto update service is installed. [Command={0}][Code={1}][Output={2}]".format(install_check_cmd, str(code), out)) - if len(out.strip()) > 0 and code == 0: - self.composite_logger.log_debug("[DNF] > Auto OS update service is installed on the machine") - return True - else: - self.composite_logger.log_debug("[DNF] > Auto OS update service is NOT installed on the machine") - return False - - def is_service_set_to_enable_on_reboot(self, command): - """ Checking if auto update is set to enable on reboot on the machine. An enable_on_reboot service will be activated (if currently inactive) on machine reboot """ - code, out = self.env_layer.run_command_output(command, False, False) - self.composite_logger.log_debug("[DNF] Checked if auto update service is set to enable on reboot. [Code={0}][Out={1}]".format(str(code), out)) - if len(out.strip()) > 0 and code == 0 and 'enabled' in out: - self.composite_logger.log_debug("[DNF] > Auto OS update service will enable on reboot") - return True - self.composite_logger.log_debug("[DNF] > Auto OS update service will NOT enable on reboot") - return False - - def enable_auto_update_on_reboot(self): - """ Enables machine default auto update on reboot """ - # type () -> None - command = self.enable_on_reboot_cmd - self.composite_logger.log_verbose("[DNF] Enabling auto update on reboot. [Command={0}] ".format(command)) - code, out = self.env_layer.run_command_output(command, False, False) - - if code != 0: - self.composite_logger.log_error("[DNF][ERROR] Error enabling auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) - error_msg = 'Unexpected return code (' + str(code) + ') on command: ' + command - self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.OPERATION_FAILED) - raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) - else: - self.composite_logger.log_debug("[DNF] Enabled auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) - - # endregion diff --git a/src/core/tests/Test_DnfPackageManager.py b/src/core/tests/Test_Dnf5PackageManager.py similarity index 51% rename from src/core/tests/Test_DnfPackageManager.py rename to src/core/tests/Test_Dnf5PackageManager.py index 280fe1ac..adf6ef98 100644 --- a/src/core/tests/Test_DnfPackageManager.py +++ b/src/core/tests/Test_Dnf5PackageManager.py @@ -13,10 +13,12 @@ # limitations under the License. # # Requires Python 2.7+ +import json +import os import unittest -from core.src.bootstrap.Constants import Constants from core.tests.library.ArgumentComposer import ArgumentComposer +from core.src.bootstrap.Constants import Constants from core.tests.library.RuntimeCompositor import RuntimeCompositor @@ -115,6 +117,101 @@ def test_get_current_auto_os_patch_state_with_installed_services_and_state_disab self.assertFalse(package_manager.image_default_patch_configuration_backup_exists()) self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.DISABLED) + def test_revert_auto_os_update_to_system_default_with_service_not_installed(self): + """Test revert when dnf5-automatic is not installed - should be no-op""" + self.runtime.set_legacy_test_type('SadPath') + package_manager = self.container.get('package_manager') + + # Create backup with service marked as not installed + package_manager.image_default_patch_configuration_backup_path = os.path.join(self.runtime.execution_config.config_folder, Constants.IMAGE_DEFAULT_PATCH_CONFIGURATION_BACKUP_PATH) + backup_config = { + "dnf5-automatic": { + "enable_on_reboot": False, + "installation_state": False + } + } + self.runtime.write_to_file(package_manager.image_default_patch_configuration_backup_path, json.dumps(backup_config)) + + # Should complete without error even when service is not installed + package_manager.revert_auto_os_update_to_system_default() + self.assertTrue(True) # If no exception, test passes + + def test_revert_auto_os_update_to_system_default_with_enable_on_reboot_true(self): + """Test revert when service should be enabled on reboot""" + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + + # Create backup with service marked as installed and should be enabled on reboot + package_manager.image_default_patch_configuration_backup_path = os.path.join(self.runtime.execution_config.config_folder, Constants.IMAGE_DEFAULT_PATCH_CONFIGURATION_BACKUP_PATH) + backup_config = { + "dnf5-automatic": { + "enable_on_reboot": True, + "installation_state": True + } + } + self.runtime.write_to_file(package_manager.image_default_patch_configuration_backup_path, json.dumps(backup_config)) + + # Mock run_command_output to simulate service installed and systemctl commands working + backup_run_command_output = self.runtime.env_layer.run_command_output + + def mock_commands(cmd, no_output=False, chk_err=False): + if 'rpm -qa | grep dnf5-plugin-automatic' in cmd: + return 0, 'dnf5-plugin-automatic-xyz' + elif 'systemctl is-enabled dnf5-automatic.timer' in cmd: + return 0, 'disabled' # Currently disabled, will be enabled by revert + elif 'systemctl enable --now dnf5-automatic.timer' in cmd: + return 0, '' # Enable succeeds + elif 'systemctl cat dnf5-automatic' in cmd: + return 0, '[Service]\nExecStart=/usr/bin/dnf5 automatic --timer\n' + return backup_run_command_output(cmd, no_output, chk_err) + + self.runtime.env_layer.run_command_output = mock_commands + + try: + package_manager.revert_auto_os_update_to_system_default() + # Verify it completed without error + self.assertTrue(True) + finally: + self.runtime.env_layer.run_command_output = backup_run_command_output + + def test_revert_auto_os_update_to_system_default_with_enable_on_reboot_false(self): + """Test revert when service should be disabled on reboot""" + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + + # Create backup with service marked as installed but should be disabled on reboot + package_manager.image_default_patch_configuration_backup_path = os.path.join(self.runtime.execution_config.config_folder, Constants.IMAGE_DEFAULT_PATCH_CONFIGURATION_BACKUP_PATH) + backup_config = { + "dnf5-automatic": { + "enable_on_reboot": False, + "installation_state": True + } + } + self.runtime.write_to_file(package_manager.image_default_patch_configuration_backup_path, json.dumps(backup_config)) + + # Mock run_command_output to simulate service installed but currently enabled + backup_run_command_output = self.runtime.env_layer.run_command_output + + def mock_commands(cmd, no_output=False, chk_err=False): + if 'rpm -qa | grep dnf5-plugin-automatic' in cmd: + return 0, 'dnf5-plugin-automatic-xyz' + elif 'systemctl is-enabled dnf5-automatic.timer' in cmd: + return 0, 'enabled' # Currently enabled, will be disabled by revert + elif 'systemctl disable --now dnf5-automatic.timer' in cmd: + return 0, '' # Disable succeeds + elif 'systemctl cat dnf5-automatic' in cmd: + return 0, '[Service]\nExecStart=/usr/bin/dnf5 automatic --timer\n' + return backup_run_command_output(cmd, no_output, chk_err) + + self.runtime.env_layer.run_command_output = mock_commands + + try: + package_manager.revert_auto_os_update_to_system_default() + # Verify it completed without error + self.assertTrue(True) + finally: + self.runtime.env_layer.run_command_output = backup_run_command_output + if __name__ == '__main__': unittest.main() From 98add11939979c57097b0586875a3ceadd5abad9 Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Wed, 27 May 2026 11:53:43 -0400 Subject: [PATCH 8/8] New Iteration --- .../package_managers/Dnf5PackageManager.py | 70 ++++++++++--------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/src/core/src/package_managers/Dnf5PackageManager.py b/src/core/src/package_managers/Dnf5PackageManager.py index 31e69bff..ffae8a33 100644 --- a/src/core/src/package_managers/Dnf5PackageManager.py +++ b/src/core/src/package_managers/Dnf5PackageManager.py @@ -14,7 +14,7 @@ # # Requires Python 2.7+ -"""DnfPackageManager for Azure Linux and RHEL""" +"""Dnf5PackageManager for Azure Linux and RHEL""" import json import re @@ -53,7 +53,14 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ self.commands_allowing_100_exitcode = [ "check-update" ] - # TODO: Add AzL4/Red hat 10 DNF5 specific initialization + + # DNF5 valid exit codes for simulation commands + self.dnf5_simulation_valid_exit_codes = [0, 1] + self.dnf5_dependency_failure_text = ["Skipping packages with broken dependencies", "Nothing to do."] + self.dnf5_dependency_success_text = "Installing dependencies:" + self.dnf5_dependency_exit_text = "Transaction Summary" + self.dnf5_dependency_skip_text = "Package" + self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, Constants.DNF) # Caching for updates @@ -261,7 +268,7 @@ def get_dependent_list(self, packages): - sudo dnf install --assumeno --skip-broken (simulates installation to find dependencies without actually installing) Returns: List of dependency package names required for the input packages - Sample output format: + Sample output format: ( Failure case : Dependency Fails and exit code : 0 ) - Raise Exception # Updating and loading repositories: # Repositories loaded. # Problem: package perl-Getopt-Long-1:2.58-521.azl4~20260501.noarch from azurelinux-base requires perl(Pod::Usage) >= 1.14, but none of the providers can be installed @@ -284,21 +291,36 @@ def get_dependent_list(self, packages): cmd = self.single_package_upgrade_simulation_cmd + package_names code, output = self.env_layer.run_command_output(cmd, False, False) self.composite_logger.log_verbose("[DNF5] Dependency simulation. [Command={0}][Code={1}]".format(cmd, str(code))) - if code not in [0, 1]: - self.composite_logger.log_error("[DNF5] Unexpected failure. [Command={0}][Code={1}][Output={2}]".format(cmd, str(code), output) - ) + if code not in self.dnf5_simulation_valid_exit_codes: + self.composite_logger.log_error("[DNF5] Unexpected failure. [Command={0}][Code={1}][Output={2}]".format(cmd, str(code), output)) raise Exception("DNF dependency simulation failed") - if "Skipping packages with broken dependencies" in output and "Nothing to do." in output: + if all(text in output for text in self.dnf5_dependency_failure_text): self.composite_logger.log_error("[DNF5] All packages skipped due to broken dependencies. [Command={0}]".format(cmd)) raise Exception("Dependency resolution failed: all packages skipped") + # Only go here for success case dependencies = self.extract_dependencies(output, packages) self.composite_logger.log_verbose("[DNF5] Resolved dependencies. [Command={0}][Packages={1}][DependencyCount={2}]".format(str(cmd), str(packages), len(dependencies))) return dependencies def extract_dependencies(self, output, packages): - """Extracts dependent packages from dnf5 output.""" + # Sample output format (Success case with dependencies , exit code : 1) + # Command : sudo dnf5 install --assumeno --skip-broken jq + # Updating and loading repositories: + # Repositories loaded. + # Package Arch Version Repository Size + # Installing: + # jq x86_64 1.8.1-3.azl4~20260501 azurelinux-base 457.7 KiB + # Installing dependencies: + # oniguruma x86_64 6.9.10-3.azl4~20260501 azurelinux-base 763.1 KiB + # + # Transaction Summary: + # Installing: 2 packages + # + # Total size of inbound packages is 428 KiB. Need to download 428 KiB. + # After this operation, 1 MiB extra will be used (install 1 MiB, remove 0 B). + # Operation aborted by the user. dependencies = [] package_arch_to_look_for = ["x86_64", "noarch", "i686", "aarch64"] @@ -309,25 +331,19 @@ def extract_dependencies(self, output, packages): line_str = lines[line_index].strip() # Detect start of dependency section - if line_str.startswith("Installing dependencies"): + if line_str.startswith(self.dnf5_dependency_success_text): in_dependency_section = True continue # Detect exit of dependency section - if line_str.startswith("Transaction Summary") or \ - line_str.startswith("Installing:") or \ - line_str.startswith("Skipping packages with broken dependencies") or \ - line_str.startswith("Problem:") or \ - line_str.startswith("Total size"): - in_dependency_section = False - continue + if in_dependency_section and line_str.startswith(self.dnf5_dependency_exit_text): + break - # Only parse dependency section if not in_dependency_section: continue # Skip empty/header lines - if not line_str or line_str.startswith("Package"): + if not line_str or line_str.startswith(self.dnf5_dependency_skip_text): self.composite_logger.log_verbose("[DNF5] > Skipping header/empty line: " + line_str) continue @@ -341,13 +357,9 @@ def extract_dependencies(self, output, packages): continue # Remove input packages (support both pkg and pkg.arch) - base_pkg = dependent_package_name.rsplit('.', 1)[ - 0] if '.' in dependent_package_name else dependent_package_name + base_pkg = dependent_package_name.rsplit('.', 1)[0] if '.' in dependent_package_name else dependent_package_name - if len(dependent_package_name) != 0 and \ - dependent_package_name not in packages and \ - base_pkg not in packages and \ - dependent_package_name not in dependencies: + if len(dependent_package_name) != 0 and dependent_package_name not in packages and base_pkg not in packages and dependent_package_name not in dependencies: self.composite_logger.log_verbose("[DNF5] > Dependency detected: " + dependent_package_name) dependencies.append(dependent_package_name) @@ -402,7 +414,6 @@ def install_security_updates_azgps_coordinated(self): - DNF5 does not support security classification. - This installs all available updates instead. """ - self.composite_logger.log_verbose("[DNF5] Invoking package manager. [Command={0}]".format(self.single_package_upgrade_cmd)) out, code = self.invoke_package_manager_advanced(self.single_package_upgrade_cmd,raise_on_exception=False) if code != 0: self.composite_logger.log_warning("[DNF5] Install failed. [Code={0}][Output={1}]".format(code, out)) @@ -564,14 +575,7 @@ def is_reboot_pending(self): # Post Install method / Install Patch def do_processes_require_restart(self): - """Checks if processes require a restart due to updates - Commands used: - - sudo dnf -y install dnf-utils (installs dnf-utils if not already present) - - sudo LANG=en_US.UTF8 needs-restarting -r (checks if processes require restart) - Returns: - - Boolean: True if processes require restart, False otherwise - """ - raise NotImplementedError("DNF: do_processes_require_restart not implemented yet") + raise NotImplementedError("DNF5 uses `needs-restarting` directly via is_reboot_pending(); separate implementation is not required.") def add_arch_dependencies(self, package_manager, package, version, packages, package_versions, package_and_dependencies, package_and_dependency_versions): """