diff --git a/sap/http/json_store.py b/sap/http/json_store.py new file mode 100644 index 00000000..59d2e759 --- /dev/null +++ b/sap/http/json_store.py @@ -0,0 +1,146 @@ +"""Generic file-backed JSON cache primitive shared by the auth subsystems. + +`JSONFileStore[T]` stores typed payloads as one JSON file per key, with: +- atomic write (tmp + rename) so a kill mid-write never leaves a corrupt file, +- POSIX permission hardening (0o700 on the directory, 0o600 on each file), +- corruption-tolerant reads (any IO/parse error is treated as a cache miss). + +Subclasses provide `_serialize` / `_deserialize` for their concrete payload +type T. See `sap.http.token_cache.FileTokenStore` for the OAuth specialization +and the auth-plugin response cache (forthcoming) for the second consumer. +""" + +from __future__ import annotations + +import os +import stat +import sys +import uuid +from pathlib import Path +from typing import Generic, Optional, TypeVar + +from platformdirs import PlatformDirs + +APP_NAME = "sapcli" + +T = TypeVar('T') + + +# --------------------------------------------------------------------------- +# Generic file-backed JSON store +# --------------------------------------------------------------------------- + +class JSONFileStore(Generic[T]): + """File-backed JSON store keyed by string. + + Layout: //.json + POSIX: directory chmod 0700, file chmod 0600 + Windows: relies on %LOCALAPPDATA% ACLs being user-scoped + """ + + def __init__(self, base_dir: Path, subdir: str) -> None: + self._dir = base_dir / subdir + self._dir.mkdir(parents=True, exist_ok=True) + _harden_dir(self._dir) + + def _serialize(self, value: T) -> str: + raise NotImplementedError("Subclasses must implement _serialize") + + def _deserialize(self, raw: str) -> T: + raise NotImplementedError("Subclasses must implement _deserialize") + + def get(self, key: str) -> Optional[T]: + """Return the payload stored under `key`, or None if absent or corrupt.""" + + path = self._path_for(key) + if not path.exists(): + return None + try: + return self._deserialize(path.read_text(encoding="utf-8")) + except (OSError, ValueError, KeyError, TypeError): + # Corrupt or unreadable - treat as absent rather than crash. + # TypeError covers schema drift (e.g. cached JSON is now a list + # instead of the expected mapping, so dict-style access blows up). + return None + + def set(self, key: str, value: T) -> None: + """Store `value` under `key`, overwriting any existing entry atomically.""" + + path = self._path_for(key) + # Unique tmp filename per write so concurrent writers (different + # processes or threads sharing the same cache dir) never trample each + # other's in-flight files. Atomic os.replace at the end gives the + # "no half-written file on disk if killed mid-write" guarantee. + tmp = path.parent / f'{path.name}.{uuid.uuid4().hex}.tmp' + try: + tmp.write_text(self._serialize(value), encoding='utf-8') + _harden_file(tmp) + os.replace(tmp, path) + except Exception: + # On serialize/IO failure, drop the unique tmp so we do not + # accumulate stale .tmp garbage in the cache directory. + tmp.unlink(missing_ok=True) + raise + + def delete(self, key: str) -> None: + """Remove the entry for `key`. No-op if it doesn't exist.""" + + try: + self._path_for(key).unlink() + except FileNotFoundError: + # Do not crash on such lame reason + pass + + def _path_for(self, key: str) -> Path: + return self._dir / f"{_sanitize(key)}.json" + + +# --------------------------------------------------------------------------- +# Path + permission helpers +# --------------------------------------------------------------------------- + +def _default_cache_dir() -> Path: + """Return the platform-appropriate per-user cache directory for sapcli.""" + + dirs = PlatformDirs(appname=APP_NAME, roaming=False) + if sys.platform == "linux": + path = Path(dirs.user_state_dir) + elif sys.platform == "darwin": + path = Path(dirs.user_data_dir) + elif sys.platform == "win32": + path = Path(dirs.user_data_dir) + else: + path = Path(dirs.user_state_dir) + path.mkdir(parents=True, exist_ok=True) + _harden_dir(path) + return path + + +def _harden_dir(path: Path) -> None: + if os.name == "posix": + try: + path.chmod(stat.S_IRWXU) # 0o700 + except OSError: + # Best-effort hardening: chmod can fail on read-only or sandboxed + # filesystems (containers, NFS mounts, mandatory-access-control + # setups). The caller already placed the directory inside the + # user's per-user cache root, so the OS-level isolation is the + # real defense; the explicit 0700 is belt-and-braces. + pass + + +def _harden_file(path: Path) -> None: + if os.name == "posix": + try: + path.chmod(stat.S_IRUSR | stat.S_IWUSR) # 0o600 + except OSError: + # Same rationale as _harden_dir: best-effort hardening, the + # surrounding directory's perms are the actual access boundary. + pass + + +def _sanitize(key: str) -> str: + """Make `key` safe for use as a filename component.""" + + safe = "".join(c if c.isalnum() or c in "-._" else "_" for c in key) + return safe or "default" diff --git a/sap/http/token_cache.py b/sap/http/token_cache.py index f61aec31..6e83847b 100644 --- a/sap/http/token_cache.py +++ b/sap/http/token_cache.py @@ -1,6 +1,7 @@ -"""Token storage for thetool with a swappable backend. +"""Token storage for the tool with a swappable backend. -Current implementation: plaintext JSON files in the per-user cache directory. +Current implementation: plaintext JSON files in the per-user cache directory +(see `sap.http.json_store.JSONFileStore`). Future implementation: OS credential manager (Keychain / Credential Manager / Secret Service) — drop in a new TokenStore subclass and change the factory. @@ -16,27 +17,37 @@ by client_id (or provider:client_id) means the same store handles all of them without interface churn. If you only ever have one token, key="default" is fine. - -The atomic write. write_text followed by os.replace is the standard -pattern for "never leave a corrupt file on disk if interrupted." Worth keeping -even though the failure mode is rare. """ from __future__ import annotations import json -import os -import stat -import sys from abc import ABC, abstractmethod from dataclasses import asdict, dataclass from datetime import datetime, timezone from pathlib import Path from typing import Optional -from platformdirs import PlatformDirs - -APP_NAME = "sapcli" +from sap.http.json_store import ( + JSONFileStore, + _default_cache_dir, + _harden_dir, + _harden_file, + _sanitize, +) + +# Re-exported for backward compatibility with code that imported these +# helpers from sap.http.token_cache. +__all__ = [ + 'Token', + 'TokenStore', + 'FileTokenStore', + 'get_token_store', + '_default_cache_dir', + '_harden_dir', + '_harden_file', + '_sanitize', +] # --------------------------------------------------------------------------- @@ -115,52 +126,21 @@ def delete(self, key: str) -> None: # File-based implementation (today) # --------------------------------------------------------------------------- -class FileTokenStore(TokenStore): - """Stores tokens as plaintext JSON files, one per key. +class FileTokenStore(JSONFileStore[Token], TokenStore): + """File-backed Token store under /tokens/. - Layout: /tokens/.json - POSIX: directory chmod 0700, file chmod 0600 - Windows: relies on %LOCALAPPDATA% ACLs being user-scoped + JSONFileStore comes first in the MRO so its concrete get/set/delete + satisfy the abstract methods declared on TokenStore. """ def __init__(self, base_dir: Optional[Path] = None) -> None: - self._base_dir = base_dir or _default_cache_dir() - self._tokens_dir = self._base_dir / "tokens" - self._tokens_dir.mkdir(parents=True, exist_ok=True) - _harden_dir(self._tokens_dir) - - # -- TokenStore ----------------------------------------------------- - - def get(self, key: str) -> Optional[Token]: - path = self._path_for(key) - if not path.exists(): - return None - try: - return Token.from_json(path.read_text(encoding="utf-8")) - except (OSError, ValueError, KeyError): - # Corrupt or unreadable — treat as absent rather than crash. - return None - - def set(self, key: str, token: Token) -> None: - path = self._path_for(key) - # Write to a temp file and rename, so we never leave a half-written - # token behind if the process is killed mid-write. - tmp = path.with_suffix(path.suffix + '.tmp') - tmp.write_text(token.to_json(), encoding='utf-8') - _harden_file(tmp) - os.replace(tmp, path) + super().__init__(base_dir or _default_cache_dir(), "tokens") - def delete(self, key: str) -> None: - try: - self._path_for(key).unlink() - except FileNotFoundError: - # Do not crash on such lame reason - pass - - # -- internals ------------------------------------------------------ + def _serialize(self, value: Token) -> str: + return value.to_json() - def _path_for(self, key: str) -> Path: - return self._tokens_dir / f"{_sanitize(key)}.json" + def _deserialize(self, raw: str) -> Token: + return Token.from_json(raw) # --------------------------------------------------------------------------- @@ -183,44 +163,3 @@ def get_token_store() -> TokenStore: _token_store = FileTokenStore() return _token_store - - -# --------------------------------------------------------------------------- -# Path + permission helpers -# --------------------------------------------------------------------------- - -def _default_cache_dir() -> Path: - dirs = PlatformDirs(appname=APP_NAME, roaming=False) - if sys.platform == "linux": - path = Path(dirs.user_state_dir) - elif sys.platform == "darwin": - path = Path(dirs.user_data_dir) - elif sys.platform == "win32": - path = Path(dirs.user_data_dir) - else: - path = Path(dirs.user_state_dir) - path.mkdir(parents=True, exist_ok=True) - _harden_dir(path) - return path - - -def _harden_dir(path: Path) -> None: - if os.name == "posix": - try: - path.chmod(stat.S_IRWXU) # 0o700 - except OSError: - pass - - -def _harden_file(path: Path) -> None: - if os.name == "posix": - try: - path.chmod(stat.S_IRUSR | stat.S_IWUSR) # 0o600 - except OSError: - pass - - -def _sanitize(key: str) -> str: - """Make `key` safe for use as a filename component.""" - safe = "".join(c if c.isalnum() or c in "-._" else "_" for c in key) - return safe or "default" diff --git a/test/unit/test_sap_http_json_store.py b/test/unit/test_sap_http_json_store.py new file mode 100644 index 00000000..bade4b73 --- /dev/null +++ b/test/unit/test_sap_http_json_store.py @@ -0,0 +1,477 @@ +#!/usr/bin/env python3 + +"""Tests for sap.http.json_store. + +All filesystem and platformdirs interaction is mocked. No real file is opened, +created, chmod'd, or replaced anywhere in this module. +""" + +import json +import stat +import unittest +from dataclasses import dataclass +from pathlib import Path +from unittest.mock import Mock, patch + +import sap.http.json_store as json_store +from sap.http.json_store import ( + JSONFileStore, + _default_cache_dir, + _harden_dir, + _harden_file, + _sanitize, +) + + +# --------------------------------------------------------------------------- +# Disk-write guard (same shape as test_sap_http_token_cache) +# --------------------------------------------------------------------------- + +_module_patchers = [] + + +def setUpModule(): + patcher_mkdir = patch('pathlib.Path.mkdir', return_value=None) + patcher_mkdir.start() + _module_patchers.append(patcher_mkdir) + + patcher_chmod = patch('pathlib.Path.chmod', return_value=None) + patcher_chmod.start() + _module_patchers.append(patcher_chmod) + + patcher_exists = patch('pathlib.Path.exists', return_value=False) + patcher_exists.start() + _module_patchers.append(patcher_exists) + + patcher_unlink = patch('pathlib.Path.unlink', return_value=None) + patcher_unlink.start() + _module_patchers.append(patcher_unlink) + + patcher_read = patch('pathlib.Path.read_text', return_value='') + patcher_read.start() + _module_patchers.append(patcher_read) + + patcher_write = patch('pathlib.Path.write_text', return_value=None) + patcher_write.start() + _module_patchers.append(patcher_write) + + patcher_replace = patch('sap.http.json_store.os.replace', return_value=None) + patcher_replace.start() + _module_patchers.append(patcher_replace) + + +def tearDownModule(): + while _module_patchers: + _module_patchers.pop().stop() + + +# --------------------------------------------------------------------------- +# A tiny payload type used to exercise JSONFileStore generically +# --------------------------------------------------------------------------- + +@dataclass(frozen=True) +class _Sample: + name: str + value: int + + def to_json(self) -> str: + return json.dumps({'name': self.name, 'value': self.value}) + + @classmethod + def from_json(cls, raw: str) -> '_Sample': + data = json.loads(raw) + return cls(name=data['name'], value=data['value']) + + +class _SampleStore(JSONFileStore[_Sample]): + """Concrete JSONFileStore[_Sample] used by the tests below.""" + + def __init__(self, base_dir: Path, subdir: str = 'samples'): + super().__init__(base_dir, subdir) + + def _serialize(self, value: _Sample) -> str: + return value.to_json() + + def _deserialize(self, raw: str) -> _Sample: + return _Sample.from_json(raw) + + +# --------------------------------------------------------------------------- +# JSONFileStore default _serialize / _deserialize +# --------------------------------------------------------------------------- + +class TestJSONFileStoreUnimplementedHooks(unittest.TestCase): + """A bare JSONFileStore is not directly usable - subclasses must override.""" + + @patch.object(Path, 'mkdir') + def test_default_serialize_raises_not_implemented(self, _mock_mkdir): + store = JSONFileStore(Path('/fake/base'), 'subdir') + + with self.assertRaises(NotImplementedError): + store._serialize(object()) + + @patch.object(Path, 'mkdir') + def test_default_deserialize_raises_not_implemented(self, _mock_mkdir): + store = JSONFileStore(Path('/fake/base'), 'subdir') + + with self.assertRaises(NotImplementedError): + store._deserialize('{}') + + +# --------------------------------------------------------------------------- +# JSONFileStore.__init__ +# --------------------------------------------------------------------------- + +class TestJSONFileStoreInit(unittest.TestCase): + + @patch.object(Path, 'mkdir') + def test_creates_subdir_under_base(self, mock_mkdir): + _SampleStore(Path('/fake/base')) + + mock_mkdir.assert_called_once_with(parents=True, exist_ok=True) + + @patch.object(Path, 'chmod') + @patch.object(Path, 'mkdir') + def test_hardens_subdir_on_posix(self, _mock_mkdir, mock_chmod): + with patch.object(json_store, 'os') as mock_os: + mock_os.name = 'posix' + _SampleStore(Path('/fake/base')) + + mock_chmod.assert_called_once_with(stat.S_IRWXU) + + @patch.object(Path, 'chmod') + @patch.object(Path, 'mkdir') + def test_does_not_chmod_on_non_posix(self, _mock_mkdir, mock_chmod): + with patch.object(json_store, 'os') as mock_os: + mock_os.name = 'nt' + _SampleStore(Path('/fake/base')) + + mock_chmod.assert_not_called() + + @patch.object(Path, 'mkdir') + def test_subdir_name_is_used(self, _mock_mkdir): + store = _SampleStore(Path('/fake/base'), subdir='custom-name') + + self.assertEqual(store._dir, Path('/fake/base/custom-name')) + + +# --------------------------------------------------------------------------- +# JSONFileStore.get +# --------------------------------------------------------------------------- + +class TestJSONFileStoreGet(unittest.TestCase): + + def setUp(self): + self.store = _SampleStore(Path('/fake/base')) + + @patch.object(Path, 'read_text') + @patch.object(Path, 'exists', return_value=True) + def test_returns_payload_when_file_exists(self, _mock_exists, mock_read): + original = _Sample(name='n', value=42) + mock_read.return_value = original.to_json() + + result = self.store.get('mykey') + + self.assertEqual(result, original) + + @patch.object(Path, 'exists', return_value=False) + def test_returns_none_when_file_missing(self, _mock_exists): + result = self.store.get('mykey') + + self.assertIsNone(result) + + @patch.object(Path, 'read_text', side_effect=OSError('disk error')) + @patch.object(Path, 'exists', return_value=True) + def test_returns_none_on_os_error(self, _mock_exists, _mock_read): + result = self.store.get('mykey') + + self.assertIsNone(result) + + @patch.object(Path, 'read_text', return_value='{ this is not valid JSON') + @patch.object(Path, 'exists', return_value=True) + def test_returns_none_on_invalid_json(self, _mock_exists, _mock_read): + result = self.store.get('mykey') + + self.assertIsNone(result) + + @patch.object(Path, 'read_text', return_value='{"unexpected": "shape"}') + @patch.object(Path, 'exists', return_value=True) + def test_returns_none_when_deserialize_raises_keyerror(self, _mock_exists, _mock_read): + # _Sample.from_json reads required keys; missing ones raise KeyError + # which JSONFileStore.get must treat as a corrupt-cache miss. + result = self.store.get('mykey') + + self.assertIsNone(result) + + @patch.object(Path, 'read_text', return_value='[1, 2, 3]') + @patch.object(Path, 'exists', return_value=True) + def test_returns_none_when_deserialize_raises_typeerror(self, _mock_exists, _mock_read): + # Cached payload deserializes into a list (schema drift). _Sample + # then indexes it with string keys, which raises TypeError. get() + # must treat that as a corrupt-cache miss too. + result = self.store.get('mykey') + + self.assertIsNone(result) + + +# --------------------------------------------------------------------------- +# JSONFileStore.set +# --------------------------------------------------------------------------- + +class TestJSONFileStoreSet(unittest.TestCase): + + def setUp(self): + self.store = _SampleStore(Path('/fake/base')) + + @patch('sap.http.json_store.os.replace') + @patch.object(Path, 'write_text') + def test_writes_payload_then_renames_atomically(self, mock_write, mock_replace): + sample = _Sample(name='n', value=1) + + self.store.set('mykey', sample) + + mock_write.assert_called_once() + mock_replace.assert_called_once() + + tmp_path, final_path = mock_replace.call_args.args + self.assertTrue(str(tmp_path).endswith('.tmp')) + self.assertFalse(str(final_path).endswith('.tmp')) + + @patch('sap.http.json_store.os.replace') + @patch.object(Path, 'write_text') + def test_writes_serialized_json(self, mock_write, _mock_replace): + sample = _Sample(name='hello', value=99) + + self.store.set('mykey', sample) + + written_payload = mock_write.call_args.args[0] + self.assertIn('hello', written_payload) + self.assertIn('99', written_payload) + + @patch('sap.http.json_store.os.replace') + @patch.object(Path, 'write_text') + def test_uses_utf8_encoding(self, mock_write, _mock_replace): + self.store.set('mykey', _Sample(name='n', value=1)) + + self.assertEqual(mock_write.call_args.kwargs['encoding'], 'utf-8') + + @patch('sap.http.json_store.os.replace') + @patch.object(Path, 'chmod') + @patch.object(Path, 'write_text') + def test_hardens_tmp_file_on_posix(self, _mock_write, mock_chmod, _mock_replace): + with patch.object(json_store, 'os') as mock_os: + mock_os.name = 'posix' + self.store.set('mykey', _Sample(name='n', value=1)) + + mock_chmod.assert_called_once_with(stat.S_IRUSR | stat.S_IWUSR) + + @patch('sap.http.json_store.os.replace') + @patch.object(Path, 'write_text') + def test_uses_unique_tmp_filename_per_write(self, _mock_write, mock_replace): + # Two writes to the same key must use distinct tmp paths so concurrent + # writers cannot collide on a fixed '.json.tmp' name. + self.store.set('mykey', _Sample(name='n', value=1)) + self.store.set('mykey', _Sample(name='n', value=2)) + + self.assertEqual(mock_replace.call_count, 2) + first_tmp = mock_replace.call_args_list[0].args[0] + second_tmp = mock_replace.call_args_list[1].args[0] + self.assertNotEqual(first_tmp, second_tmp) + + # Both still target the same final path. + first_final = mock_replace.call_args_list[0].args[1] + second_final = mock_replace.call_args_list[1].args[1] + self.assertEqual(first_final, second_final) + + @patch('sap.http.json_store.os.replace') + @patch.object(Path, 'unlink') + @patch.object(Path, 'write_text', side_effect=OSError('disk full')) + def test_cleans_up_tmp_on_write_failure(self, _mock_write, mock_unlink, mock_replace): + with self.assertRaises(OSError): + self.store.set('mykey', _Sample(name='n', value=1)) + + # The unique tmp must be removed so it does not pile up as garbage. + mock_unlink.assert_called_once() + # And the failed write must NOT be promoted to the final path. + mock_replace.assert_not_called() + + +# --------------------------------------------------------------------------- +# JSONFileStore.delete +# --------------------------------------------------------------------------- + +class TestJSONFileStoreDelete(unittest.TestCase): + + def setUp(self): + self.store = _SampleStore(Path('/fake/base')) + + @patch.object(Path, 'unlink') + def test_unlinks_existing_file(self, mock_unlink): + self.store.delete('mykey') + + mock_unlink.assert_called_once() + + @patch.object(Path, 'unlink', side_effect=FileNotFoundError()) + def test_silently_ignores_missing_file(self, _mock_unlink): + # Must not raise. + self.store.delete('mykey') + + +# --------------------------------------------------------------------------- +# JSONFileStore._path_for +# --------------------------------------------------------------------------- + +class TestJSONFileStorePathFor(unittest.TestCase): + + def setUp(self): + self.store = _SampleStore(Path('/fake/base')) + + def test_path_lives_under_subdir(self): + path = self.store._path_for('mykey') + + self.assertEqual(path, Path('/fake/base/samples/mykey.json')) + + def test_path_sanitizes_special_chars(self): + path = self.store._path_for('https://x|client-1') + + self.assertEqual(path, Path('/fake/base/samples/https___x_client-1.json')) + + +# --------------------------------------------------------------------------- +# _sanitize +# --------------------------------------------------------------------------- + +class TestSanitize(unittest.TestCase): + + def test_alphanumeric_passes_through(self): + self.assertEqual(_sanitize('abcXYZ123'), 'abcXYZ123') + + def test_dash_dot_underscore_pass_through(self): + self.assertEqual(_sanitize('a-b.c_d'), 'a-b.c_d') + + def test_special_chars_become_underscore(self): + self.assertEqual(_sanitize('a/b\\c|d:e'), 'a_b_c_d_e') + + def test_pipe_replaced_with_underscore(self): + # OAuth helpers key by '|'. + self.assertEqual(_sanitize('https://x.example.com|client-1'), + 'https___x.example.com_client-1') + + def test_empty_input_returns_default(self): + self.assertEqual(_sanitize(''), 'default') + + def test_only_special_chars_does_not_collapse_to_default(self): + # The 'default' fallback only fires for empty input. + self.assertEqual(_sanitize('!!!'), '___') + + +# --------------------------------------------------------------------------- +# _default_cache_dir +# --------------------------------------------------------------------------- + +class TestDefaultCacheDir(unittest.TestCase): + + def _patch_platform(self, platform_name, dirs_attrs): + platform_dirs_mock = Mock(**dirs_attrs) + return ( + patch.object(json_store.sys, 'platform', platform_name), + patch('sap.http.json_store.PlatformDirs', return_value=platform_dirs_mock), + platform_dirs_mock, + ) + + def test_uses_user_state_dir_on_linux(self): + sys_p, pd_p, _pd_mock = self._patch_platform( + 'linux', {'user_state_dir': '/state', 'user_data_dir': '/data'} + ) + with sys_p, pd_p: + self.assertEqual(_default_cache_dir(), Path('/state')) + + def test_uses_user_data_dir_on_darwin(self): + sys_p, pd_p, _pd_mock = self._patch_platform( + 'darwin', {'user_state_dir': '/state', 'user_data_dir': '/data'} + ) + with sys_p, pd_p: + self.assertEqual(_default_cache_dir(), Path('/data')) + + def test_uses_user_data_dir_on_win32(self): + sys_p, pd_p, _pd_mock = self._patch_platform( + 'win32', {'user_state_dir': '/state', 'user_data_dir': '/data'} + ) + with sys_p, pd_p: + self.assertEqual(_default_cache_dir(), Path('/data')) + + def test_falls_back_to_user_state_dir_on_unknown_platform(self): + sys_p, pd_p, _pd_mock = self._patch_platform( + 'something-exotic', {'user_state_dir': '/state', 'user_data_dir': '/data'} + ) + with sys_p, pd_p: + self.assertEqual(_default_cache_dir(), Path('/state')) + + @patch.object(Path, 'mkdir') + def test_creates_directory(self, mock_mkdir): + sys_p, pd_p, _pd_mock = self._patch_platform( + 'linux', {'user_state_dir': '/state', 'user_data_dir': '/data'} + ) + with sys_p, pd_p: + _default_cache_dir() + + mock_mkdir.assert_called_once_with(parents=True, exist_ok=True) + + +# --------------------------------------------------------------------------- +# _harden_dir / _harden_file +# --------------------------------------------------------------------------- + +class TestHardenHelpers(unittest.TestCase): + + def test_harden_dir_chmods_0700_on_posix(self): + path = Mock() + with patch.object(json_store, 'os') as mock_os: + mock_os.name = 'posix' + _harden_dir(path) + + path.chmod.assert_called_once_with(stat.S_IRWXU) + + def test_harden_dir_is_noop_on_non_posix(self): + path = Mock() + with patch.object(json_store, 'os') as mock_os: + mock_os.name = 'nt' + _harden_dir(path) + + path.chmod.assert_not_called() + + def test_harden_dir_swallows_oserror(self): + path = Mock() + path.chmod.side_effect = OSError('permission denied') + with patch.object(json_store, 'os') as mock_os: + mock_os.name = 'posix' + # Must not raise. + _harden_dir(path) + + def test_harden_file_chmods_0600_on_posix(self): + path = Mock() + with patch.object(json_store, 'os') as mock_os: + mock_os.name = 'posix' + _harden_file(path) + + path.chmod.assert_called_once_with(stat.S_IRUSR | stat.S_IWUSR) + + def test_harden_file_is_noop_on_non_posix(self): + path = Mock() + with patch.object(json_store, 'os') as mock_os: + mock_os.name = 'nt' + _harden_file(path) + + path.chmod.assert_not_called() + + def test_harden_file_swallows_oserror(self): + path = Mock() + path.chmod.side_effect = OSError('readonly') + with patch.object(json_store, 'os') as mock_os: + mock_os.name = 'posix' + # Must not raise. + _harden_file(path) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/test_sap_http_token_cache.py b/test/unit/test_sap_http_token_cache.py index ed3193d8..bf4e956b 100644 --- a/test/unit/test_sap_http_token_cache.py +++ b/test/unit/test_sap_http_token_cache.py @@ -4,26 +4,26 @@ All filesystem and platformdirs interaction is mocked. No real file is opened, created, chmod'd, or replaced anywhere in this module — patches on -pathlib.Path methods, sap.http.token_cache.os, and PlatformDirs guarantee that. +pathlib.Path methods, sap.http.json_store.os, and PlatformDirs guarantee that. + +Generic JSONFileStore behaviour (atomic write, permission hardening, key +sanitization, path-for resolution, default cache dir) is exercised in +test_sap_http_json_store.py. This file focuses on the Token-specific specialization: +Token (de)serialization and the FileTokenStore wiring. """ import json -import stat import unittest from dataclasses import FrozenInstanceError from datetime import datetime, timedelta, timezone from pathlib import Path -from unittest.mock import Mock, patch +from unittest.mock import patch import sap.http.token_cache as token_cache from sap.http.token_cache import ( FileTokenStore, Token, TokenStore, - _default_cache_dir, - _harden_dir, - _harden_file, - _sanitize, get_token_store, ) @@ -32,28 +32,42 @@ # Disk-write guard # # Patches every Path method that could touch the filesystem at module scope, so -# any path that forgets a per-test patch still cannot reach disk. Per-test -# decorators override these guards within their scope and the originals are -# restored automatically on teardown. +# any path that forgets a per-test patch still cannot reach disk. os.replace is +# now invoked from sap.http.json_store (where JSONFileStore lives), so the patch +# targets that module. # --------------------------------------------------------------------------- _module_patchers = [] def setUpModule(): - targets = [ - ('pathlib.Path.mkdir', None), - ('pathlib.Path.chmod', None), - ('pathlib.Path.exists', False), - ('pathlib.Path.unlink', None), - ('pathlib.Path.read_text', ''), - ('pathlib.Path.write_text', None), - ('sap.http.token_cache.os.replace', None), - ] - for target, return_value in targets: - patcher = patch(target, return_value=return_value) - patcher.start() - _module_patchers.append(patcher) + patcher_mkdir = patch('pathlib.Path.mkdir', return_value=None) + patcher_mkdir.start() + _module_patchers.append(patcher_mkdir) + + patcher_chmod = patch('pathlib.Path.chmod', return_value=None) + patcher_chmod.start() + _module_patchers.append(patcher_chmod) + + patcher_exists = patch('pathlib.Path.exists', return_value=False) + patcher_exists.start() + _module_patchers.append(patcher_exists) + + patcher_unlink = patch('pathlib.Path.unlink', return_value=None) + patcher_unlink.start() + _module_patchers.append(patcher_unlink) + + patcher_read = patch('pathlib.Path.read_text', return_value='') + patcher_read.start() + _module_patchers.append(patcher_read) + + patcher_write = patch('pathlib.Path.write_text', return_value=None) + patcher_write.start() + _module_patchers.append(patcher_write) + + patcher_replace = patch('sap.http.json_store.os.replace', return_value=None) + patcher_replace.start() + _module_patchers.append(patcher_replace) def tearDownModule(): @@ -210,24 +224,6 @@ def test_uses_default_cache_dir_when_base_dir_is_none(self, _mock_mkdir, mock_de mock_default.assert_called_once() - @patch.object(Path, 'chmod') - @patch.object(Path, 'mkdir') - def test_hardens_tokens_directory_on_posix(self, _mock_mkdir, mock_chmod): - with patch.object(token_cache, 'os') as mock_os: - mock_os.name = 'posix' - FileTokenStore(base_dir=Path('/fake/base')) - - mock_chmod.assert_called_once_with(stat.S_IRWXU) - - @patch.object(Path, 'chmod') - @patch.object(Path, 'mkdir') - def test_does_not_chmod_on_non_posix(self, _mock_mkdir, mock_chmod): - with patch.object(token_cache, 'os') as mock_os: - mock_os.name = 'nt' - FileTokenStore(base_dir=Path('/fake/base')) - - mock_chmod.assert_not_called() - # --------------------------------------------------------------------------- # FileTokenStore.get @@ -279,7 +275,7 @@ class TestFileTokenStoreSet(unittest.TestCase): def setUp(self): self.store = FileTokenStore(base_dir=Path('/fake/base')) - @patch('sap.http.token_cache.os.replace') + @patch('sap.http.json_store.os.replace') @patch.object(Path, 'write_text') def test_writes_token_then_renames_atomically(self, mock_write, mock_replace): token = Token(access_token='abc') @@ -293,7 +289,7 @@ def test_writes_token_then_renames_atomically(self, mock_write, mock_replace): self.assertTrue(str(tmp_path).endswith('.tmp')) self.assertFalse(str(final_path).endswith('.tmp')) - @patch('sap.http.token_cache.os.replace') + @patch('sap.http.json_store.os.replace') @patch.object(Path, 'write_text') def test_writes_serialized_json(self, mock_write, _mock_replace): token = Token(access_token='hello-world', refresh_token='r-1') @@ -304,7 +300,7 @@ def test_writes_serialized_json(self, mock_write, _mock_replace): self.assertIn('hello-world', written_payload) self.assertIn('r-1', written_payload) - @patch('sap.http.token_cache.os.replace') + @patch('sap.http.json_store.os.replace') @patch.object(Path, 'write_text') def test_uses_utf8_encoding(self, mock_write, _mock_replace): token = Token(access_token='abc') @@ -313,19 +309,6 @@ def test_uses_utf8_encoding(self, mock_write, _mock_replace): self.assertEqual(mock_write.call_args.kwargs['encoding'], 'utf-8') - @patch('sap.http.token_cache.os.replace') - @patch.object(Path, 'chmod') - @patch.object(Path, 'write_text') - def test_hardens_tmp_file_on_posix(self, _mock_write, mock_chmod, _mock_replace): - with patch.object(token_cache, 'os') as mock_os: - # set() also calls os.replace — keep it as a mock call we don't care about - mock_os.name = 'posix' - self.store.set('mykey', Token(access_token='abc')) - - # The write_text path's chmod (file 0600). The dir's chmod happened in - # __init__ before this test, where it was patched away separately. - mock_chmod.assert_called_once_with(stat.S_IRUSR | stat.S_IWUSR) - # --------------------------------------------------------------------------- # FileTokenStore.delete @@ -349,33 +332,9 @@ def test_silently_ignores_missing_file(self, _mock_unlink): # --------------------------------------------------------------------------- -# _path_for / _sanitize +# FileTokenStore.path_for # --------------------------------------------------------------------------- -class TestSanitize(unittest.TestCase): - - def test_alphanumeric_passes_through(self): - self.assertEqual(_sanitize('abcXYZ123'), 'abcXYZ123') - - def test_dash_dot_underscore_pass_through(self): - self.assertEqual(_sanitize('a-b.c_d'), 'a-b.c_d') - - def test_special_chars_become_underscore(self): - self.assertEqual(_sanitize('a/b\\c|d:e'), 'a_b_c_d_e') - - def test_pipe_replaced_with_underscore(self): - # OAuth helpers key by '|'. - self.assertEqual(_sanitize('https://x.example.com|client-1'), - 'https___x.example.com_client-1') - - def test_empty_input_returns_default(self): - self.assertEqual(_sanitize(''), 'default') - - def test_only_special_chars_does_not_collapse_to_default(self): - # The 'default' fallback only fires for empty input. - self.assertEqual(_sanitize('!!!'), '___') - - class TestFileTokenStorePathFor(unittest.TestCase): def setUp(self): @@ -383,11 +342,8 @@ def setUp(self): def test_path_lives_under_tokens_subdir(self): path = self.store._path_for('mykey') - self.assertEqual(path, Path('/fake/base/tokens/mykey.json')) - def test_path_sanitizes_special_chars(self): - path = self.store._path_for('https://x|client-1') - self.assertEqual(path, Path('/fake/base/tokens/https___x_client-1.json')) + self.assertEqual(path, Path('/fake/base/tokens/mykey.json')) # --------------------------------------------------------------------------- @@ -426,115 +382,5 @@ def test_constructs_file_token_store_only_once(self, _mock_default, mock_ctor): mock_ctor.assert_called_once() -# --------------------------------------------------------------------------- -# _default_cache_dir -# --------------------------------------------------------------------------- - -class TestDefaultCacheDir(unittest.TestCase): - - def _patch_platform(self, platform_name, dirs_attrs): - """Patch sys.platform and PlatformDirs; return the mock so callers can assert.""" - - platform_dirs_mock = Mock(**dirs_attrs) - return ( - patch.object(token_cache.sys, 'platform', platform_name), - patch('sap.http.token_cache.PlatformDirs', return_value=platform_dirs_mock), - platform_dirs_mock, - ) - - def test_uses_user_state_dir_on_linux(self): - sys_p, pd_p, _pd_mock = self._patch_platform( - 'linux', {'user_state_dir': '/state', 'user_data_dir': '/data'} - ) - with sys_p, pd_p: - self.assertEqual(_default_cache_dir(), Path('/state')) - - def test_uses_user_data_dir_on_darwin(self): - sys_p, pd_p, _pd_mock = self._patch_platform( - 'darwin', {'user_state_dir': '/state', 'user_data_dir': '/data'} - ) - with sys_p, pd_p: - self.assertEqual(_default_cache_dir(), Path('/data')) - - def test_uses_user_data_dir_on_win32(self): - sys_p, pd_p, _pd_mock = self._patch_platform( - 'win32', {'user_state_dir': '/state', 'user_data_dir': '/data'} - ) - with sys_p, pd_p: - self.assertEqual(_default_cache_dir(), Path('/data')) - - def test_falls_back_to_user_state_dir_on_unknown_platform(self): - sys_p, pd_p, _pd_mock = self._patch_platform( - 'something-exotic', {'user_state_dir': '/state', 'user_data_dir': '/data'} - ) - with sys_p, pd_p: - self.assertEqual(_default_cache_dir(), Path('/state')) - - @patch.object(Path, 'mkdir') - def test_creates_directory(self, mock_mkdir): - sys_p, pd_p, _pd_mock = self._patch_platform( - 'linux', {'user_state_dir': '/state', 'user_data_dir': '/data'} - ) - with sys_p, pd_p: - _default_cache_dir() - - mock_mkdir.assert_called_once_with(parents=True, exist_ok=True) - - -# --------------------------------------------------------------------------- -# _harden_dir / _harden_file -# --------------------------------------------------------------------------- - -class TestHardenHelpers(unittest.TestCase): - - def test_harden_dir_chmods_0700_on_posix(self): - path = Mock() - with patch.object(token_cache, 'os') as mock_os: - mock_os.name = 'posix' - _harden_dir(path) - - path.chmod.assert_called_once_with(stat.S_IRWXU) - - def test_harden_dir_is_noop_on_non_posix(self): - path = Mock() - with patch.object(token_cache, 'os') as mock_os: - mock_os.name = 'nt' - _harden_dir(path) - - path.chmod.assert_not_called() - - def test_harden_dir_swallows_oserror(self): - path = Mock() - path.chmod.side_effect = OSError('permission denied') - with patch.object(token_cache, 'os') as mock_os: - mock_os.name = 'posix' - # Must not raise. - _harden_dir(path) - - def test_harden_file_chmods_0600_on_posix(self): - path = Mock() - with patch.object(token_cache, 'os') as mock_os: - mock_os.name = 'posix' - _harden_file(path) - - path.chmod.assert_called_once_with(stat.S_IRUSR | stat.S_IWUSR) - - def test_harden_file_is_noop_on_non_posix(self): - path = Mock() - with patch.object(token_cache, 'os') as mock_os: - mock_os.name = 'nt' - _harden_file(path) - - path.chmod.assert_not_called() - - def test_harden_file_swallows_oserror(self): - path = Mock() - path.chmod.side_effect = OSError('readonly') - with patch.object(token_cache, 'os') as mock_os: - mock_os.name = 'posix' - # Must not raise. - _harden_file(path) - - if __name__ == '__main__': unittest.main()