diff --git a/sample_wsgidav.yaml b/sample_wsgidav.yaml index 31ffdfa9..e80e9e62 100644 --- a/sample_wsgidav.yaml +++ b/sample_wsgidav.yaml @@ -234,6 +234,9 @@ impersonator: # or, use WebDAV (HTTP) usernames as is custom_user_mapping: null + # do not allow impersonating as system users (UID <= 999) + reject_system_users: false + # ---------------------------------------------------------------------------- # CORS diff --git a/wsgidav/mw/impersonator.py b/wsgidav/mw/impersonator.py index 0b6254d5..7d449798 100644 --- a/wsgidav/mw/impersonator.py +++ b/wsgidav/mw/impersonator.py @@ -18,50 +18,48 @@ import os import pwd from contextlib import AbstractContextManager -from typing import Tuple from wsgidav import util from wsgidav.mw.base_mw import BaseMiddleware _logger = util.get_module_logger(__name__) -_init_euid = os.geteuid() -_init_egid = os.getegid() -_logger.debug(f"impersonator: initial uid:gid = {_init_euid}:{_init_egid}") +_init_pwd = pwd.getpwuid(os.geteuid()) +_logger.debug(f"impersonator: initial uid:gid = {_init_pwd.pw_uid}:{_init_pwd.pw_gid}") class ImpersonateContext(AbstractContextManager): - def __init__(self, ids: Tuple[int, int] | None) -> None: - if ids is None: + def __init__(self, pwd: pwd.struct_passwd | None) -> None: + if pwd is None: self._enabled = False return + self._enabled = True - self._new_euid = ids[0] - self._new_egid = ids[1] - self._old_euid = os.geteuid() - self._old_egid = os.getegid() + self._pwd = pwd - if self._old_euid != _init_euid or self._old_egid != _init_egid: + if os.geteuid() != _init_pwd.pw_uid or os.getegid() != _init_pwd.pw_gid: raise Exception( - "old ids mismatched with init ids: " - f"{self._old_euid}:{self._old_egid} versus {_init_euid}:{_init_egid}, " + "current ids mismatched with init ids: " + f"{os.geteuid()}:{os.getegid()} versus {_init_pwd.pw_uid}:{_init_pwd.pw_gid}, " "multithreading MUST be disabled for impersonator to function correctly" ) def __enter__(self): if not self._enabled: return - os.seteuid(self._new_euid) - os.setegid(self._new_egid) - _logger.debug(f"impersonator: set uid:gid = {self._new_euid}:{self._new_egid}") + os.seteuid(self._pwd.pw_uid) + os.setegid(self._pwd.pw_gid) + os.initgroups(self._pwd.pw_name, self._pwd.pw_gid) + _logger.debug(f"impersonator: set uid:gid = {os.geteuid()}:{os.getegid()}") + _logger.debug(f"impersonator: set groups = {os.getgroups()}") def __exit__(self, exc_type, exc_value, traceback, /): if not self._enabled: return - os.seteuid(self._old_euid) - os.setegid(self._old_egid) - _logger.debug( - f"impersonator: reset uid:gid = {self._old_euid}:{self._old_egid}" - ) + os.seteuid(_init_pwd.pw_uid) + os.setegid(_init_pwd.pw_gid) + os.initgroups(_init_pwd.pw_name, _init_pwd.pw_gid) + _logger.debug(f"impersonator: reset uid:gid = {os.geteuid()}:{os.getegid()}") + _logger.debug(f"impersonator: reset groups = {os.getgroups()}") class Impersonator(BaseMiddleware): @@ -77,7 +75,7 @@ def __call__(self, environ, start_response): def is_disabled(self): # type: ignore return not self.get_config("impersonator.enable", False) # type: ignore - def _map_id(self, username: str) -> Tuple[int, int] | None: + def _map_id(self, username: str) -> pwd.struct_passwd | None: if self.is_disabled(): return None @@ -93,9 +91,7 @@ def _map_id(self, username: str) -> Tuple[int, int] | None: ) # type: ignore if unix_username is None: - raise RuntimeError( - f"Failed mapping HTTP username '{username}' to Unix username" - ) + raise RuntimeError(f"Failed mapping HTTP username '{username}' to Unix username") _logger.debug( f"impersonator: HTTP user {username or '(anonymous)'} -> Unix user {unix_username}" @@ -103,12 +99,15 @@ def _map_id(self, username: str) -> Tuple[int, int] | None: try: passwd = pwd.getpwnam(unix_username) + if self.get_config("impersonator.reject_system_users", False) and passwd.pw_uid <= 999: + raise ValueError + except ValueError: + raise RuntimeError( + f"Unix user '{unix_username}' is a system user, impersonating rejected" + ) from None except Exception: raise RuntimeError( f"Unix username '{unix_username}' does not exist" ) from None else: - _logger.debug( - f"impersonator: Unix user {unix_username} -> uid:gid = {passwd.pw_uid}:{passwd.pw_gid}" - ) - return (passwd.pw_uid, passwd.pw_gid) + return passwd