Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sample_wsgidav.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ impersonator:
# or, use WebDAV (HTTP) usernames as is
custom_user_mapping: null

# do not allow impersonating as system users (UID <= 999)
reject_system_users: false


# ----------------------------------------------------------------------------
# CORS
Expand Down
57 changes: 28 additions & 29 deletions wsgidav/mw/impersonator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,48 @@
import os
import pwd
from contextlib import AbstractContextManager
from typing import Tuple

from wsgidav import util
from wsgidav.mw.base_mw import BaseMiddleware

_logger = util.get_module_logger(__name__)
_init_euid = os.geteuid()
_init_egid = os.getegid()
_logger.debug(f"impersonator: initial uid:gid = {_init_euid}:{_init_egid}")
_init_pwd = pwd.getpwuid(os.geteuid())
_logger.debug(f"impersonator: initial uid:gid = {_init_pwd.pw_uid}:{_init_pwd.pw_gid}")


class ImpersonateContext(AbstractContextManager):
def __init__(self, ids: Tuple[int, int] | None) -> None:
if ids is None:
def __init__(self, pwd: pwd.struct_passwd | None) -> None:
if pwd is None:
self._enabled = False
return

self._enabled = True
self._new_euid = ids[0]
self._new_egid = ids[1]
self._old_euid = os.geteuid()
self._old_egid = os.getegid()
self._pwd = pwd

if self._old_euid != _init_euid or self._old_egid != _init_egid:
if os.geteuid() != _init_pwd.pw_uid or os.getegid() != _init_pwd.pw_gid:
raise Exception(
"old ids mismatched with init ids: "
f"{self._old_euid}:{self._old_egid} versus {_init_euid}:{_init_egid}, "
"current ids mismatched with init ids: "
f"{os.geteuid()}:{os.getegid()} versus {_init_pwd.pw_uid}:{_init_pwd.pw_gid}, "
"multithreading MUST be disabled for impersonator to function correctly"
)

def __enter__(self):
if not self._enabled:
return
os.seteuid(self._new_euid)
os.setegid(self._new_egid)
_logger.debug(f"impersonator: set uid:gid = {self._new_euid}:{self._new_egid}")
os.seteuid(self._pwd.pw_uid)
os.setegid(self._pwd.pw_gid)
os.initgroups(self._pwd.pw_name, self._pwd.pw_gid)
_logger.debug(f"impersonator: set uid:gid = {os.geteuid()}:{os.getegid()}")
_logger.debug(f"impersonator: set groups = {os.getgroups()}")

def __exit__(self, exc_type, exc_value, traceback, /):
if not self._enabled:
return
os.seteuid(self._old_euid)
os.setegid(self._old_egid)
_logger.debug(
f"impersonator: reset uid:gid = {self._old_euid}:{self._old_egid}"
)
os.seteuid(_init_pwd.pw_uid)
os.setegid(_init_pwd.pw_gid)
os.initgroups(_init_pwd.pw_name, _init_pwd.pw_gid)
_logger.debug(f"impersonator: reset uid:gid = {os.geteuid()}:{os.getegid()}")
_logger.debug(f"impersonator: reset groups = {os.getgroups()}")


class Impersonator(BaseMiddleware):
Expand All @@ -77,7 +75,7 @@ def __call__(self, environ, start_response):
def is_disabled(self): # type: ignore
return not self.get_config("impersonator.enable", False) # type: ignore

def _map_id(self, username: str) -> Tuple[int, int] | None:
def _map_id(self, username: str) -> pwd.struct_passwd | None:
if self.is_disabled():
return None

Expand All @@ -93,22 +91,23 @@ def _map_id(self, username: str) -> Tuple[int, int] | None:
) # type: ignore

if unix_username is None:
raise RuntimeError(
f"Failed mapping HTTP username '{username}' to Unix username"
)
raise RuntimeError(f"Failed mapping HTTP username '{username}' to Unix username")

_logger.debug(
f"impersonator: HTTP user {username or '(anonymous)'} -> Unix user {unix_username}"
)

try:
passwd = pwd.getpwnam(unix_username)
if self.get_config("impersonator.reject_system_users", False) and passwd.pw_uid <= 999:
raise ValueError
except ValueError:
raise RuntimeError(
f"Unix user '{unix_username}' is a system user, impersonating rejected"
) from None
except Exception:
raise RuntimeError(
f"Unix username '{unix_username}' does not exist"
) from None
else:
_logger.debug(
f"impersonator: Unix user {unix_username} -> uid:gid = {passwd.pw_uid}:{passwd.pw_gid}"
)
return (passwd.pw_uid, passwd.pw_gid)
return passwd
Loading