diff --git a/README.md b/README.md index d6a3cdc3..a9a92582 100644 --- a/README.md +++ b/README.md @@ -200,6 +200,40 @@ sapcli config use-context dev The tool asks only for user and password if missing. All other parameters either have own default value or causes fatal error if not provided. +### Authentication strategies + +In addition to HTTP Basic Auth and OAuth 2.0 (for SAP BTP ABAP +Environment / "Steampunk"), sapcli supports **kubectl-style auth +plugins** for cases neither covers: SAML2 single sign-on, Windows +client certificates pulled from the Windows Certificate Store, or any +corporate IdP that needs a browser. The plugin is an external command +that you point sapcli at: + +```yaml +users: + sso-user: + auth_plugin: + command: /path/to/your-plugin + parameters: + channel: msedge +``` + +sapcli runs the plugin once, caches the cookies (or token, or cert +reference) it returns under `~/.local/state/sapcli/` (or +`~/Library/Application Support/sapcli/` on macOS, +`%LOCALAPPDATA%\sapcli\` on Windows), and reuses them across ADT, gCTS, +and OData commands until they expire. +`sapcli --auth-plugin-invalidate-cache` forces a re-authentication. + +The plugin can be implemented in any language and pull in whatever +dependencies it needs (playwright for browser SSO, pywin32 for the +Windows cert store, ...) without polluting sapcli's installation. A +reference plugin that wraps Basic Auth lives at +`plugins/auth/basic-auth-cookies.py`. See the +[Auth plugins section](doc/configuration.md#auth-plugins) of the +configuration documentation for the protocol and writing-your-own +guide. + Find the complete documentation in [doc/configuration.md](doc/configuration.md) ### RFC usage diff --git a/doc/configuration.md b/doc/configuration.md index f38ed001..9c3afaed 100644 --- a/doc/configuration.md +++ b/doc/configuration.md @@ -7,7 +7,7 @@ a configuration file. The priority order from highest to lowest is: 2. **Environment variables** - override config file values 3. **Configuration file** (active context) - overrides defaults 4. **Built-in defaults** - used when nothing else is specified -5. **Interactive prompt** - fallback for mandatory values (user, password) when no SNC config is present and no valid OAuth token is cached +5. **Interactive prompt** - fallback for mandatory values (user, password) when no SNC config is present, no valid OAuth token is cached, and no auth plugin is configured ## Parameters @@ -109,6 +109,33 @@ Overrides the `SAPCLI_CONTEXT` environment variable and `current-context` in the sapcli --context prod program read ZREPORT ``` +### --auth-plugin-invalidate-cache + +Drop any cached auth-plugin response for the active context before +authenticating. The next command will re-run the configured plugin. Has +no effect unless an [auth plugin](#auth-plugins) is configured for the +active user. + +```bash +sapcli --auth-plugin-invalidate-cache abap systeminfo +``` + +### --auth-plugin-disable-cache + +Disable on-disk caching of the auth-plugin response entirely: sapcli +neither reads from nor writes to the cache, and any pre-existing entry +for the active context is deleted before the plugin runs. + +Same behaviour can be achieved by setting the environment variable +`SAPCLI_AUTH_PLUGIN_DISABLE_CACHE` (to any non-false token), or by +adding `disable_cache: true` inside the `auth_plugin` mapping in the +config file. Precedence: CLI flag > env var > config file. The +[Response caching](#response-caching) section discusses the trade-off. + +```bash +sapcli --auth-plugin-disable-cache abap systeminfo +``` + ## Configuration file ### File location @@ -241,6 +268,11 @@ authentication. See [OAuth 2.0 authentication](#oauth-20-authentication) below. |---|---|---|---|---| | `user` | string | yes | - | `SAP_USER` | | `password` | string | no | - | `SAP_PASSWORD` | +| `auth_plugin` | mapping | no | - | (config only) | + +`auth_plugin` is mutually exclusive with `password` and with OAuth fields +on the same logical session. See [Auth plugins](#auth-plugins) for the +plugin contract and configuration shape. #### `contexts.` @@ -275,9 +307,12 @@ The recommended approaches, in order of preference: 1. **Use OAuth 2.0** - if your system supports it (e.g. SAP cloud systems), prefer OAuth over a stored password. See [OAuth 2.0 authentication](#oauth-20-authentication) below. -2. **Omit the password from config** - sapcli will prompt interactively -3. **Use environment variables** - `SAP_PASSWORD` overrides the config file; suitable for CI/CD pipelines -4. **Store in config file** - acceptable for local development if the file has restrictive permissions (`chmod 600`) +2. **Use an auth plugin** - for SAML2 SSO, Windows client certificates, + or any other method that does not fit OAuth or BasicAuth. See + [Auth plugins](#auth-plugins) below. +3. **Omit the password from config** - sapcli will prompt interactively +4. **Use environment variables** - `SAP_PASSWORD` overrides the config file; suitable for CI/CD pipelines +5. **Store in config file** - acceptable for local development if the file has restrictive permissions (`chmod 600`) sapcli will warn if the config file is world-readable and contains passwords. @@ -343,6 +378,220 @@ file: rm ~/.sapcli/tokens.json ``` +### Auth plugins + +Some authentication methods cannot reasonably be implemented inside +sapcli itself — SAML2 SSO requires running a browser, Windows client +certificates live in the Windows Certificate Store and cannot be +exported to a file, and a long tail of corporate IdPs each have their +own quirks. sapcli takes the kubectl approach for these cases: an +external command — the **auth plugin** — performs the authentication and +returns either cookies, an `Authorization` header, or a client +certificate. sapcli applies the result to the HTTP session and proceeds. + +The plugin is invoked as a subprocess. sapcli writes a JSON request to +its stdin and reads a JSON response from its stdout. The plugin can be +implemented in any language and can pull in whatever dependencies it +needs — playwright for browser SSO, pywin32 for the Windows certificate +store, openssl shelled out from a bash script — without any of those +leaking into sapcli's installation. + +The same auth plugin handles ADT, gCTS (REST), and OData commands. ABAP +session cookies are server-wide, so authenticating once primes the +cache for all three. + +#### Enabling an auth plugin + +Configure the plugin on a user definition: + +```yaml +users: + sso-user: + auth_plugin: + command: /absolute/path/to/your/plugin + parameters: + channel: msedge # optional, plugin-specific key/value pairs + disable_cache: false # optional, opt out of on-disk response caching +``` + +| Field | Type | Required | Default | Notes | +|---|---|---|---|---| +| `command` | string | yes | - | Absolute path to the plugin executable. | +| `parameters` | mapping | no | `{}` | Verbatim key/value pairs forwarded to the plugin as `parameters` in its stdin JSON. | +| `disable_cache` | bool | no | `false` | When true, sapcli does not write the plugin response to disk and ignores any existing entry. See [Response caching](#response-caching) for the env var and CLI flag overrides. | + +`auth_plugin` is **mutually exclusive** with `password` and with the +OAuth fields on the same logical session — the plugin is the one source +of truth for credentials. sapcli rejects the configuration if both are +present. + +`auth_plugin` is configured **only** in the config file, not via CLI +flags or environment variables. It is a structured value, not a scalar, +and its presence flips the entire authentication mode. (The +`disable_cache` knob inside it is the one exception — it can also be +set via `--auth-plugin-disable-cache` or `SAPCLI_AUTH_PLUGIN_DISABLE_CACHE`.) + +#### Response caching + +The first call to a plugin can be slow — browser-based SSO routinely +takes 20+ seconds while the user clicks through the IdP. sapcli caches +the response between invocations so subsequent commands run at sapcli's +native speed. + +| Aspect | Value | +|---|---| +| Cache location (Linux) | `~/.local/state/sapcli/auth_plugin_responses/` | +| Cache location (macOS) | `~/Library/Application Support/sapcli/auth_plugin_responses/` | +| Cache location (Windows) | `%LOCALAPPDATA%\sapcli\auth_plugin_responses\` | +| Cache key | `\|\|` triple. Changing any of the three mints a new entry; identical triples share one entry across ADT, gCTS, and OData commands. | +| File permissions | Directory `0700`, file `0600` on POSIX. | +| Expiration | Honoured when the plugin's response includes an `expiration` ISO 8601 timestamp (with a 30 s leeway to avoid racing the server's clock). Plugins that omit it cache indefinitely; the server eventually invalidates the cached cookies and the next command falls back to a fresh plugin run. | + +To force a fresh plugin run, pass `--auth-plugin-invalidate-cache` on +the command line: + +```bash +sapcli --auth-plugin-invalidate-cache abap systeminfo +``` + +You can also delete the cache file directly — useful for scripted +cleanup. + +##### Disabling the cache entirely + +Some deployments cannot tolerate session credentials being written to +disk at all (corporate DLP policies, shared jump hosts, compliance +regimes that forbid at-rest persistence of bearer material). For those +cases, caching can be turned off altogether through any of the +following — in precedence order: + +| Layer | How | +|---|---| +| CLI | `--auth-plugin-disable-cache` | +| Env var | `SAPCLI_AUTH_PLUGIN_DISABLE_CACHE=true` (any non-false token; `no`/`off`/`false`/`n` switch it back off) | +| Config | `auth_plugin.disable_cache: true` on the user definition | + +When in effect, sapcli (1) does not read from the cache, (2) does not +write to it, and (3) deletes any pre-existing entry for the active +context before the plugin runs. + +The trade-off: every sapcli invocation re-runs the plugin from scratch. +For browser-based SSO that is the difference between a sub-second +command and a 20-second one. Within a single sapcli invocation that +hits more than one connection type (ADT + gCTS + OData), the plugin is +re-invoked per connection — there is no in-process fallback today; this +is a known limitation when disabling the cache. + +#### The plugin protocol + +##### Request (stdin) + +```json +{ + "connection": { + "proto": "https", + "ashost": "abap.example.org", + "port": "44300", + "client": "100", + "type": "adt", + "path": "/sap/bc/adt/core/discovery", + "sysnr": null, + "verify": true, + "ssl_server_cert": null + }, + "parameters": { + "channel": "msedge" + } +} +``` + +- `type` is one of `adt`, `rest`, `odata` — set by sapcli based on which + command is running, so a plugin that supports multiple endpoints can + pick the right one. +- `path` is the endpoint that sapcli's built-in flow uses for the same + connection type. Use it unchanged unless your auth flow needs a + different one. +- `verify` and `ssl_server_cert` mirror the `ssl_verify` and + `ssl_server_cert` connection settings. Plugins must honour them when + making HTTPS calls so they do not bypass the user's TLS policy. +- `parameters` is the verbatim `parameters:` map from the configuration + — pass plugin-specific knobs through here. + +##### Response (stdout) + +```json +{ + "message": "Authentication successful", + "expiration": "2026-05-08T23:59:59Z", + "content": { + "type": "cookie", + "cookies": [ + {"name": "SAP_SESSIONID_X01_100", "value": "...", "domain": "abap.example.org", "path": "/", "secure": true} + ] + } +} +``` + +`content.type` selects the authentication mechanism applied to the HTTP +session. Three values are supported: + +| `content.type` | Other fields | Effect on the session | +|---|---|---| +| `cookie` | `cookies: [{name, value, domain?, path?, expires?, secure?}, ...]` | Adds the cookies to `requests.Session.cookies`. | +| `http_authorization_header` | `headers: {: , ...}` | Sets the headers on the session (typically `Authorization`). | +| `certificates` | `certificate: `, `key: `, `issuer_certificate: ?` | Sets `session.cert` and optionally `session.verify` to the supplied file paths. | + +`expiration` is optional. If present and ISO 8601, sapcli stops using +the cached response when the timestamp comes within 30 s of now and +re-runs the plugin. If absent, the response is cached indefinitely (the +server is the source of truth for invalidation). + +##### Failure + +A non-zero exit code from the plugin means authentication failed. +sapcli prints the plugin's `message` field (if it emitted valid JSON on +stdout), along with the captured stdout and stderr, and stops. A plugin +that prints invalid JSON on stdout is treated the same way. + +#### Reference plugin + +sapcli ships a proof-of-concept plugin at +`plugins/auth/basic-auth-cookies.py` that performs HTTP Basic auth and +returns the resulting session cookies. It exists primarily to exercise +the protocol end-to-end against a real ABAP system without needing a +browser-automation plugin set up, and to serve as a template for +writing your own. Credentials are read from `SAP_USER` and +`SAP_PASSWORD` environment variables so they never appear in the config +file. + +```yaml +users: + basic-auth-via-plugin: + auth_plugin: + command: /absolute/path/to/sapcli/plugins/auth/basic-auth-cookies.py +``` + +```bash +SAP_USER=DEVELOPER SAP_PASSWORD=secret \ + sapcli --context my-system abap systeminfo +``` + +#### Writing your own plugin + +A plugin is any executable that: + +1. Reads a single JSON object from stdin (the request shape above). +2. Performs authentication using whatever mechanism it needs. +3. Writes a single JSON object to stdout (the response shape above). +4. Exits 0 on success or non-zero on failure. + +There is no required language, library, or interface to import — the +contract is the JSON envelope on stdin/stdout, full stop. Look at +`plugins/auth/basic-auth-cookies.py` for a minimal Python example. A +browser-based SSO plugin would replace the `requests.get` call with a +`playwright.sync_api` flow that opens a window, waits for the user to +log in, and reads cookies out of the resulting browser context. + ## Config management commands ```bash diff --git a/plugins/auth/basic-auth-cookies.py b/plugins/auth/basic-auth-cookies.py new file mode 100755 index 00000000..7c22fdb4 --- /dev/null +++ b/plugins/auth/basic-auth-cookies.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +"""sapcli auth plugin: HTTP Basic Auth -> session cookies. + +Performs HTTP Basic authentication against the ABAP system and returns the +session cookies set by the server. Functionally equivalent to sapcli's +built-in BasicAuth flow; its purpose is to exercise the auth-plugin +protocol end-to-end with the simplest possible authentication mechanism +so the rest of the plugin pipeline (subprocess invocation, JSON envelope, +content-type dispatch) can be validated against a real ABAP system. + +Credentials come from environment variables, NOT from the request payload +or the plugin's `parameters`. This keeps plaintext secrets out of the +sapcli config file: + + SAP_USER - logon user + SAP_PASSWORD - logon password + +Configure in your sapcli config: + + users: + basic-auth-cookies-user: + auth_plugin: + command: /absolute/path/to/plugins/auth/basic-auth-cookies.py + +Manual invocation (for end-to-end testing without the full CLI wiring): + + echo '{ + "connection": { + "proto": "https", + "ashost": "abap.example.org", + "port": "443", + "client": "100", + "type": "adt", + "path": "/sap/bc/adt/core/systeminformation" + }, + "parameters": {} + }' | SAP_USER=me SAP_PASSWORD=secret \\ + python3 plugins/auth/basic-auth-cookies.py +""" + +import json +import os +import sys + +import requests +from requests.auth import HTTPBasicAuth + + +def _build_url(connection): + return ( + f"{connection['proto']}://{connection['ashost']}:" + f"{connection['port']}{connection['path']}" + ) + + +def _serialize_cookies(jar): + cookies = [] + for cookie in jar: + entry = {'name': cookie.name, 'value': cookie.value} + if cookie.domain: + entry['domain'] = cookie.domain + if cookie.path: + entry['path'] = cookie.path + if cookie.expires is not None: + entry['expires'] = cookie.expires + if cookie.secure: + entry['secure'] = True + cookies.append(entry) + return cookies + + +def _emit(message, content=None): + payload = {'message': message, 'content': content or {}} + sys.stdout.write(json.dumps(payload)) + + +def _fail(message, code=1): + _emit(message) + sys.exit(code) + + +def main(): + try: + request = json.loads(sys.stdin.read()) + except json.JSONDecodeError as ex: + _fail(f'Invalid JSON request on stdin: {ex}') + + connection = request.get('connection') + if not isinstance(connection, dict): + _fail("Request is missing the 'connection' object") + + required = ('proto', 'ashost', 'port', 'path', 'client') + missing = [name for name in required if not connection.get(name)] + if missing: + _fail( + 'Request connection is missing required field(s): ' + + ', '.join(missing) + ) + + user = os.environ.get('SAP_USER') + password = os.environ.get('SAP_PASSWORD') + if not user or not password: + _fail('SAP_USER and SAP_PASSWORD environment variables must be set') + + # requests' verify is either bool or path to a CA bundle. ssl_server_cert + # wins when set, otherwise we fall back to the boolean. + verify = connection.get('ssl_server_cert') or connection.get('verify', True) + if verify is False: + # Suppress the InsecureRequestWarning the user has explicitly + # opted into via ssl_verify: false / SAP_SSL_VERIFY=no. + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + try: + # GET, not HEAD: newer ABAP systems return 400 on HEAD against + # /sap/bc/adt/core/discovery and only set the session cookie on + # GET. Mirrors sap.adt.core.Connection's login_method='GET'. + response = requests.get( + _build_url(connection), + params={'sap-client': connection['client']}, + auth=HTTPBasicAuth(user, password), + headers={'x-csrf-token': 'Fetch'}, + verify=verify, + timeout=30, + ) + except requests.RequestException as ex: + _fail(f'Request failed: {ex}') + + if response.status_code == 401: + _fail(f'Authentication failed for user {user!r}: HTTP 401') + if response.status_code >= 400: + _fail( + f'Login endpoint returned HTTP {response.status_code}: ' + f'{response.text[:200]}' + ) + + cookies = _serialize_cookies(response.cookies) + if not cookies: + _fail('Server did not set any cookies on the response') + + _emit( + 'Authentication successful', + content={'type': 'cookie', 'cookies': cookies}, + ) + + +if __name__ == '__main__': + main() diff --git a/sap/cli/__init__.py b/sap/cli/__init__.py index d8457c11..bbf44a5f 100644 --- a/sap/cli/__init__.py +++ b/sap/cli/__init__.py @@ -12,6 +12,7 @@ from sap import rfc from sap.config import SAPCliConfigError from sap.errors import SAPCliError +from sap.http.auth_plugin_cache import cache_key_for, get_response_store class CommandsCache: @@ -129,7 +130,14 @@ def adt_connection_from_args(args): import sap.adt - session_initializer = _build_session_initializer(args) + # ADT's built-in login is GET on /sap/bc/adt/core/discovery (see + # sap.adt.core.Connection); plugins use the same endpoint so the + # cookies they collect are the ones sapcli would have collected. + session_initializer = _build_session_initializer( + args, + conn_type='adt', + conn_path='/sap/bc/adt/core/discovery', + ) return sap.adt.Connection( args.ashost, args.client, args.user, args.password, @@ -138,14 +146,26 @@ def adt_connection_from_args(args): session_initializer=session_initializer) -def _build_session_initializer(args): - """Build an OAuthHTTPSessionInitializer when args.token_url is set, - otherwise return None so HTTPClient falls back to BasicAuth. +def _build_session_initializer(args, conn_type=None, conn_path=None): + """Pick the HTTPSessionInitializer for the given args. + + Precedence: auth_plugin > OAuth > None (HTTPClient falls back to + BasicAuth). The three are mutually exclusive - the spec for + auth_plugin says so explicitly, and OAuth+plugin would be + nonsensical anyway since both want to own the session's auth. """ - token_url = args.token_url - client_id = args.client_id - client_secret = args.client_secret + if getattr(args, 'auth_plugin', None): + # Mutual exclusivity with user/password and OAuth is enforced at + # config-resolution time - see _resolve_auth_plugin_default. By the + # time we get here, args.password may have been populated from env + # (the plugin's subprocess inherits it), and that is fine. + return _build_plugin_initializer(args, conn_type, conn_path) + + token_url = getattr(args, 'token_url', None) + client_id = getattr(args, 'client_id', None) + client_secret = getattr(args, 'client_secret', None) + if not token_url and not client_id and not client_secret: return None @@ -163,6 +183,68 @@ def _build_session_initializer(args): ) +def _build_plugin_initializer(args, conn_type, conn_path): + """Construct an HTTPExternalSessionInitializer from args.auth_plugin.""" + + from sap.http.auth_plugin import ConnectionInfo + from sap.http.external_session_initializer import ( + HTTPExternalSessionInitializer, + ) + + plugin_config = args.auth_plugin + if not isinstance(plugin_config, dict): + raise SAPCliError( + "auth_plugin must be a mapping with a 'command' field" + ) + + command = plugin_config.get('command') + if not command: + raise SAPCliError("auth_plugin is missing required field 'command'") + + parameters = plugin_config.get('parameters') or {} + + proto = 'https' if args.ssl else 'http' + # ConnectionInfo.port is str (matches the wire format the plugin sees). + # args.port is int from argparse, so we coerce here rather than mutate + # the user-facing args namespace. + connection = ConnectionInfo( + proto=proto, + ashost=args.ashost, + port=str(args.port), + client=args.client, + type=conn_type, + path=conn_path, + sysnr=getattr(args, 'sysnr', None), + verify=bool(args.verify), + ssl_server_cert=getattr(args, 'ssl_server_cert', None), + ) + + cache_key = getattr(args, 'auth_plugin_cache_key', None) + disable_cache = getattr(args, 'auth_plugin_disable_cache', False) + + # --auth-plugin-invalidate-cache drops the entry before the initializer + # runs. The subsequent initialize_session call will then take the + # cache-miss path and store a fresh response. --auth-plugin-disable-cache + # piggy-backs on the same delete: when the user opts out of on-disk + # caching, any pre-existing entry must be scrubbed (defense in depth + # - the on-disk file is the credential they want gone). + if cache_key and (getattr(args, 'auth_plugin_invalidate_cache', False) or disable_cache): + get_response_store().delete(cache_key) + + if disable_cache: + # cache_key=None tells HTTPExternalSessionInitializer to skip both + # reads and writes (see test_no_cache_key_skips_cache_entirely). + cache_key = None + + return HTTPExternalSessionInitializer( + command=command, + parameters=parameters, + connection=connection, + user=args.user, + cache_key=cache_key, + ) + + def rfc_connection_from_args(args): """Returns RFC connection constructed from the passed args (Namespace) """ @@ -187,19 +269,38 @@ def gcts_connection_from_args(args): import sap.rest + # gCTS REST login lives at /sap/bc/cts_abapvcs/system. ABAP session + # cookies are server-wide, so a plugin that authenticates here also + # works for ADT/OData against the same system - cache reuse is the + # whole point of the shared (context, connection, user) key. + session_initializer = _build_session_initializer( + args, + conn_type='rest', + conn_path='/sap/bc/cts_abapvcs/system', + ) + return sap.rest.Connection('sap/bc/cts_abapvcs', 'system', args.ashost, args.client, args.user, args.password, port=args.port, ssl=args.ssl, - verify=args.verify, ssl_server_cert=args.ssl_server_cert) + verify=args.verify, ssl_server_cert=args.ssl_server_cert, + session_initializer=session_initializer) def odata_connection_from_args(service_name, args): - """Returns RFC connection constructed from the passed args (Namespace) + """Returns OData connection constructed from the passed args (Namespace). """ import sap.odata + + session_initializer = _build_session_initializer( + args, + conn_type='odata', + conn_path=f'/sap/opu/odata/{service_name}', + ) + return sap.odata.Connection(service_name, args.ashost, args.port, args.client, args.user, args.password, args.ssl, - args.verify, ssl_server_cert=args.ssl_server_cert) + args.verify, ssl_server_cert=args.ssl_server_cert, + session_initializer=session_initializer) def no_connection(_args): @@ -236,6 +337,10 @@ def build_empty_connection_values(): token_url=None, client_id=None, client_secret=None, + auth_plugin=None, + auth_plugin_cache_key=None, + auth_plugin_disable_cache=None, + auth_plugin_invalidate_cache=False, ) @@ -321,6 +426,8 @@ def resolve_default_connection_values(args): _resolve_oauth_defaults(args, config_values) + _resolve_auth_plugin_default(args, config_values) + if hasattr(args, 'corrnr') and args.corrnr is None: args.corrnr = os.getenv('SAP_CORRNR') @@ -342,6 +449,115 @@ def _resolve_oauth_defaults(args, config_values): args.client_secret = os.getenv('SAP_CLIENT_SECRET') or config_values.get('client_secret') +def _resolve_auth_plugin_default(args, config_values): + """Resolve the auth_plugin definition from the config file and enforce + its mutual exclusivity with password / OAuth at the *config* level. + + The plugin is configured purely in the config file (not via CLI flags + or env vars) - it is a structured value, not a scalar, and its + presence flips the whole authentication mode. Picking it up only from + config keeps the precedence rules simple. + + Mutual exclusivity is checked against config_values rather than args + because the plugin typically needs SAP_USER/SAP_PASSWORD env vars to + be set so its subprocess can read them; those would land on + args.password and trip a runtime check that has nothing to do with + what the user actually configured. + """ + + if getattr(args, 'auth_plugin', None) is not None: + args.auth_plugin_cache_key = _derive_cache_key(args) + _resolve_disable_cache(args, args.auth_plugin) + return + + plugin = config_values.get('auth_plugin') + if not plugin: + args.auth_plugin = None + args.auth_plugin_cache_key = None + _resolve_disable_cache(args, None) + return + + if config_values.get('password'): + raise SAPCliConfigError( + "auth_plugin and 'password' are mutually exclusive in the same " + "user definition. Remove 'password' from the user (set " + "SAP_PASSWORD via env if your plugin reads it)." + ) + + if any(config_values.get(k) for k in ('token_url', 'client_id', 'client_secret')): + raise SAPCliConfigError( + "auth_plugin and OAuth fields (token_url/client_id/client_secret) " + "are mutually exclusive." + ) + + args.auth_plugin = plugin + args.auth_plugin_cache_key = _derive_cache_key(args) + _resolve_disable_cache(args, plugin) + + +def _resolve_disable_cache(args, plugin_config): + """Normalize args.auth_plugin_disable_cache to a strict bool. + + Precedence: CLI flag > SAPCLI_AUTH_PLUGIN_DISABLE_CACHE env var > + auth_plugin.disable_cache in the resolved config > False. + + The config value lives *inside* the auth_plugin mapping rather than + on the user definition - it is an auth-plugin-specific knob, not a + generic user field, and grouping it with command/parameters keeps + the plugin config self-contained. + """ + + cli_value = getattr(args, 'auth_plugin_disable_cache', None) + if cli_value is not None: + args.auth_plugin_disable_cache = _normalize_bool(cli_value) + return + + env_value = os.environ.get('SAPCLI_AUTH_PLUGIN_DISABLE_CACHE') + if env_value is not None: + args.auth_plugin_disable_cache = _normalize_bool(env_value) + return + + if isinstance(plugin_config, dict) and 'disable_cache' in plugin_config: + args.auth_plugin_disable_cache = _normalize_bool(plugin_config['disable_cache']) + return + + args.auth_plugin_disable_cache = False + + +def _derive_cache_key(args): + """Build the (context|connection|user) cache key for the active context. + + Returns None if any piece is missing - we never want to mint a key that + would collide with a different (or anonymous) session. auth_plugin is + config-only, so reaching this code means we came through a context; + the triple is always available in normal usage. + """ + + config_file = getattr(args, 'config_file', None) + if config_file is None: + return None + + context_name = ( + getattr(args, 'context', None) + or os.environ.get('SAPCLI_CONTEXT') + or config_file.current_context + ) + if not context_name: + return None + + try: + ctx = config_file.get_context(context_name) + except SAPCliConfigError: + return None + + connection = ctx.get('connection') + user = ctx.get('user') + if not connection or not user: + return None + + return cache_key_for(context_name, connection, user) + + def _get_config_context_values(args): """Load config file and resolve the active context to a flat dict.""" diff --git a/sap/cli/_entry.py b/sap/cli/_entry.py index 09770efc..19a1a992 100644 --- a/sap/cli/_entry.py +++ b/sap/cli/_entry.py @@ -65,6 +65,16 @@ def parse_command_line(argv): arg_parser.add_argument( '--context', dest='context', type=str, default=None, help='Configuration context to use (overrides current-context in config file)') + arg_parser.add_argument( + '--auth-plugin-invalidate-cache', dest='auth_plugin_invalidate_cache', + default=False, action='store_true', + help='Drop the cached auth-plugin response before authenticating') + arg_parser.add_argument( + '--auth-plugin-disable-cache', dest='auth_plugin_disable_cache', + default=None, action='store_true', + help='Do not read or write the auth-plugin response cache on disk ' + '(also drops any pre-existing entry). ' + 'Env: SAPCLI_AUTH_PLUGIN_DISABLE_CACHE') arg_parser.add_argument( '--ashost', dest='ashost', type=str, default=None, help='Application Server address (DNS or IP)') @@ -155,13 +165,17 @@ def parse_command_line(argv): ))) if not (args.snc_qop or args.snc_myname or args.snc_partnername): - if not args.user: - args.user = input('Login:') + # auth_plugin owns credential acquisition - do not prompt for either + # user or password. The plugin reads what it needs from its own + # source (env vars, browser, cert store, ...). + if not args.auth_plugin: + if not args.user: + args.user = input('Login:') - oauth_needs_password = sap.http.oauth.password_required(args.token_url, args.client_id) + oauth_needs_password = sap.http.oauth.password_required(args.token_url, args.client_id) - if not args.password and oauth_needs_password: - args.password = getpass.getpass() + if not args.password and oauth_needs_password: + args.password = getpass.getpass() return args diff --git a/sap/config.py b/sap/config.py index 5631a53c..286d638c 100644 --- a/sap/config.py +++ b/sap/config.py @@ -27,7 +27,7 @@ class SAPCliConfigError(SAPCliError): ) USER_FIELDS = ( - 'user', 'password', + 'user', 'password', 'auth_plugin', ) CONTEXT_FIELDS = () diff --git a/sap/http/auth_plugin.py b/sap/http/auth_plugin.py index f49504d2..c3a40a6d 100644 --- a/sap/http/auth_plugin.py +++ b/sap/http/auth_plugin.py @@ -13,7 +13,7 @@ import json import subprocess from dataclasses import asdict, dataclass -from datetime import datetime +from datetime import datetime, timezone from typing import Optional from sap.errors import SAPCliError @@ -24,6 +24,7 @@ class AuthPluginError(SAPCliError): @dataclass(frozen=True) +# pylint: disable=too-many-instance-attributes class ConnectionInfo: """Connection details forwarded to the plugin in the request payload.""" @@ -37,6 +38,14 @@ class ConnectionInfo: # plugins (and by ABAP RFC SDK callers in general) since sysnr selects # the application-server instance (gateway port = 33). sysnr: Optional[str] = None + # Whether the plugin should verify the server's TLS certificate. + # Defaults to True (the safe default); sapcli forwards args.verify here + # so a user with ssl_verify: false in config doesn't have to teach + # every plugin about SAP_SSL_VERIFY independently. + verify: bool = True + # Optional path to a custom CA bundle, mirroring sapcli's + # SAP_SSL_SERVER_CERT / ssl_server_cert config knob. + ssl_server_cert: Optional[str] = None def to_dict(self) -> dict: """Return the JSON-serializable form.""" @@ -100,6 +109,41 @@ def from_json(cls, raw: str) -> 'AuthPluginResponse': return cls.from_dict(json.loads(raw)) + def to_dict(self) -> dict: + """Return the JSON-serializable form, omitting absent expiration.""" + + data: dict = {'message': self.message, 'content': self.content} + if self.expiration is not None: + # Naive datetimes are interpreted as UTC; better than crashing + # or silently round-tripping them as local time. + ts = self.expiration + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + data['expiration'] = ts.astimezone(timezone.utc).isoformat() + return data + + def to_json(self) -> str: + """Return the JSON string, suitable for cache storage.""" + + return json.dumps(self.to_dict()) + + def is_expired(self, *, leeway_seconds: int = 30) -> bool: + """Return True when the response is at or past its expiration. + + Mirrors Token.is_expired: missing expiration means "never expires" + - the server is responsible for ultimately invalidating the + session. Plugins that know their token lifetime should set + expiration explicitly. + """ + + if self.expiration is None: + return False + + ts = self.expiration + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + return (ts - datetime.now(timezone.utc)).total_seconds() <= leeway_seconds + def _parse_expiration(value) -> Optional[datetime]: if not value: diff --git a/sap/http/auth_plugin_cache.py b/sap/http/auth_plugin_cache.py new file mode 100644 index 00000000..21cce75b --- /dev/null +++ b/sap/http/auth_plugin_cache.py @@ -0,0 +1,72 @@ +"""File-backed cache for AuthPluginResponse objects. + +Plugins can be slow - browser-based SSO can take minutes - so caching the +response across sapcli invocations is the difference between "interactive +once a day" and "interactive every command". Storage uses the same +JSONFileStore primitive that backs the OAuth token cache: atomic writes, +0o700/0o600 perms on POSIX, corruption-tolerant reads. + +The cache does NOT police expiration. ``AuthPluginResponse.is_expired`` is +checked by the caller (the session initializer); the store just stores. An +expired entry is therefore still readable, which lets the initializer log +'using stale cookies' or refresh as it sees fit. + +Future implementation: OS keyring backend - the response can carry a +session cookie, which is a credential. Today's plaintext-on-disk choice +matches what the OAuth token store already does, and the same migration +path applies (introduce an ABC, swap the factory). +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Optional +import json +import hashlib + +from sap.http.auth_plugin import AuthPluginResponse +from sap.http.json_store import JSONFileStore, _default_cache_dir + + +class AuthPluginResponseFileStore(JSONFileStore[AuthPluginResponse]): + """File-backed cache of plugin responses under ``/auth_plugin_responses/``.""" + + def __init__(self, base_dir: Optional[Path] = None) -> None: + super().__init__(base_dir or _default_cache_dir(), "auth_plugin_responses") + + def _serialize(self, value: AuthPluginResponse) -> str: + return value.to_json() + + def _deserialize(self, raw: str) -> AuthPluginResponse: + return AuthPluginResponse.from_json(raw) + + +def cache_key_for(context: str, connection: str, user: str) -> str: + """Build the cache key for a (context, connection, user) tuple. + + The triple captures the spec's cache-isolation requirement: changing + any of the three must not let a cached response leak across to a + different combination. Uses ``|`` as the separator since it is not a + valid identifier character (no collisions) and JSONFileStore's + filename sanitiser turns it into ``_`` on disk anyway. + """ + + raw = json.dumps([context, connection, user], separators=(',', ':'), ensure_ascii=False) + return hashlib.sha256(raw.encode('utf-8')).hexdigest() + + +_response_store: Optional[AuthPluginResponseFileStore] = None + + +def get_response_store() -> AuthPluginResponseFileStore: + """Return the configured plugin-response cache. + + Today: file-based. + Tomorrow: read an env var or config flag and return a keyring-backed + store instead, the same way ``get_token_store`` is wired. + """ + + global _response_store # pylint: disable=global-statement + if _response_store is None: + _response_store = AuthPluginResponseFileStore() + return _response_store diff --git a/sap/http/external_session_initializer.py b/sap/http/external_session_initializer.py new file mode 100644 index 00000000..74a80fee --- /dev/null +++ b/sap/http/external_session_initializer.py @@ -0,0 +1,181 @@ +"""HTTP session initializer that delegates authentication to an external plugin. + +The plugin protocol itself lives in ``sap.http.auth_plugin``; this module turns +the plugin response into the requests.Session state HTTPClient expects: cookies, +an Authorization header, or a client certificate. Each ``content.type`` from the +plugin response maps to a single applier function, so adding a new content type +is a one-line entry in ``_HANDLERS`` plus the applier itself. +""" + +from __future__ import annotations + +from typing import Optional + +from sap.http.auth_plugin import ( + AuthPluginError, + AuthPluginResponse, + ConnectionInfo, + run_plugin, +) +from sap.http.auth_plugin_cache import get_response_store +from sap.http.errors import UnauthorizedError + + +_COOKIE_TYPE = 'cookie' +_HEADER_TYPE = 'http_authorization_header' +_CERT_TYPE = 'certificates' + + +class HTTPExternalSessionInitializer: + """Run an auth plugin and apply its response to a requests.Session. + + Implements the ``HTTPSessionInitializer`` protocol from + ``sap.http.client``. The plugin command is invoked lazily inside + ``initialize_session`` - constructing the initializer must not perform + subprocess I/O. + """ + + # pylint: disable=too-many-arguments + def __init__( + self, + command: str, + parameters: Optional[dict], + connection: ConnectionInfo, + user: Optional[str] = None, + cache_key: Optional[str] = None, + ): + self._command = command + self._parameters = parameters or {} + self._connection = connection + self._user = user + # When cache_key is None, the cache is bypassed entirely - both + # reads and writes. That keeps cache-less callers (tests, ad-hoc + # invocations) from accidentally writing a response to disk. + self._cache_key = cache_key + + def initialize_session(self, session): + """Invoke the plugin (or reuse a cached response) and apply it to ``session``.""" + + response = self._fetch_response() + _apply_response(session, response) + return session + + def _fetch_response(self) -> AuthPluginResponse: + if self._cache_key: + cached = get_response_store().get(self._cache_key) + if cached is not None and not cached.is_expired(): + return cached + + # Plugin error must propagate without touching the cache - storing + # half-built or failed responses would mask the problem on the + # next run and make 'something is broken' harder to diagnose. + response = run_plugin(self._command, self._parameters, self._connection) + + if self._cache_key: + get_response_store().set(self._cache_key, response) + + return response + + def build_unauthorized_error(self, req, res): + """Build an UnauthorizedError carrying the configured user.""" + + return UnauthorizedError(req, res, self._user) + + +def _apply_response(session, response: AuthPluginResponse) -> None: + content = response.content + if not isinstance(content, dict): + raise AuthPluginError( + f"Plugin response 'content' must be a JSON object, got " + f"{type(content).__name__}" + ) + + content_type = content.get('type') + handler = _HANDLERS.get(content_type) if isinstance(content_type, str) else None + if handler is None: + supported = ', '.join(sorted(_HANDLERS)) + raise AuthPluginError( + f"Unsupported plugin content type: {content_type!r}. " + f"Expected 'type' to be one of: {supported}" + ) + + handler(session, content) + + +def _apply_cookies(session, content: dict) -> None: + cookies = content.get('cookies') + if not isinstance(cookies, list): + raise AuthPluginError( + "Plugin cookie response missing required field 'cookies' (list of " + "cookie objects)" + ) + + for cookie in cookies: + if not isinstance(cookie, dict): + raise AuthPluginError( + f"Plugin cookie entry must be a JSON object, got " + f"{type(cookie).__name__}" + ) + + name = cookie.get('name') + if not name: + raise AuthPluginError( + "Plugin cookie entry missing required field 'name'" + ) + + # Empty string is a valid cookie value per RFC 6265; only reject + # missing keys. + if 'value' not in cookie: + raise AuthPluginError( + "Plugin cookie entry missing required field 'value'" + ) + + kwargs = {} + for src, dst in (('domain', 'domain'), ('path', 'path'), ('expires', 'expires')): + if cookie.get(src) is not None: + kwargs[dst] = cookie[src] + if 'secure' in cookie: + if not isinstance(cookie['secure'], bool): + raise AuthPluginError( + "Plugin cookie entry field 'secure' must be a boolean" + ) + kwargs['secure'] = cookie['secure'] + + session.cookies.set(name, cookie['value'], **kwargs) + + +def _apply_headers(session, content: dict) -> None: + headers = content.get('headers') + if not isinstance(headers, dict) or not headers: + raise AuthPluginError( + "Plugin http_authorization_header response missing required field " + "'headers' (non-empty object of header name/value pairs)" + ) + + session.headers.update(headers) + + +def _apply_certificates(session, content: dict) -> None: + certificate = content.get('certificate') + key = content.get('key') + if not certificate: + raise AuthPluginError( + "Plugin certificates response missing required field 'certificate'" + ) + if not key: + raise AuthPluginError( + "Plugin certificates response missing required field 'key'" + ) + + session.cert = (certificate, key) + + issuer = content.get('issuer_certificate') + if issuer: + session.verify = issuer + + +_HANDLERS = { + _COOKIE_TYPE: _apply_cookies, + _HEADER_TYPE: _apply_headers, + _CERT_TYPE: _apply_certificates, +} diff --git a/test/unit/test_sap_cli.py b/test/unit/test_sap_cli.py index ed627a8a..bfe1e5c5 100755 --- a/test/unit/test_sap_cli.py +++ b/test/unit/test_sap_cli.py @@ -10,6 +10,7 @@ import sap.cli.core from sap.config import ConfigFile, SAPCliConfigError from sap.errors import SAPCliError +import sap.http.auth_plugin_cache from pathlib import Path @@ -816,5 +817,673 @@ def test_partial_oauth_config_missing_secret_raises(self): sap.cli.adt_connection_from_args(args) +class TestBuildEmptyConnectionValuesAuthPlugin(unittest.TestCase): + + def test_empty_values_include_auth_plugin(self): + values = sap.cli.build_empty_connection_values() + + self.assertTrue(hasattr(values, 'auth_plugin')) + self.assertIsNone(values.auth_plugin) + + +class TestResolveDefaultConnectionValuesAuthPlugin(unittest.TestCase): + """auth_plugin is propagated from the resolved config context onto args + so that the connection factory can construct an + HTTPExternalSessionInitializer. + """ + + def _make_args(self, **kwargs): + defaults = dict( + ashost=None, sysnr=None, client=None, port=None, + ssl=None, verify=None, ssl_server_cert=None, + user=None, password=None, + token_url=None, client_id=None, client_secret=None, + auth_plugin=None, + ) + defaults.update(kwargs) + return SimpleNamespace(**defaults) + + def test_auth_plugin_propagated_from_config(self): + config_data = { + 'current-context': 'plugin-ctx', + 'connections': { + 'server': {'ashost': 'h.example.com', 'client': '100'}, + }, + 'users': { + 'plug-user': { + 'auth_plugin': {'command': '/p', 'parameters': {'k': 'v'}}, + }, + }, + 'contexts': { + 'plugin-ctx': {'connection': 'server', 'user': 'plug-user'}, + }, + } + config_file = ConfigFile(config_data, TEST_CONFIG_PATH) + args = self._make_args(config_file=config_file) + + with patch.dict('os.environ', {}, clear=True): + sap.cli.resolve_default_connection_values(args) + + self.assertEqual(args.auth_plugin, { + 'command': '/p', 'parameters': {'k': 'v'}, + }) + + def test_auth_plugin_absent_when_not_in_config(self): + config_data = { + 'current-context': 'basic-ctx', + 'connections': { + 'server': {'ashost': 'h.example.com', 'client': '100'}, + }, + 'users': {'u': {'user': 'USR', 'password': 'pwd'}}, + 'contexts': { + 'basic-ctx': {'connection': 'server', 'user': 'u'}, + }, + } + config_file = ConfigFile(config_data, TEST_CONFIG_PATH) + args = self._make_args(config_file=config_file) + + with patch.dict('os.environ', {}, clear=True): + sap.cli.resolve_default_connection_values(args) + + self.assertIsNone(args.auth_plugin) + + +class TestAuthPluginSessionInitializer(unittest.TestCase): + """adt_connection_from_args must construct an + HTTPExternalSessionInitializer when args.auth_plugin is set. + """ + + def _make_args(self, **overrides): + defaults = dict( + ashost='h.example.com', + client='100', + user=None, + password=None, + port=443, + ssl=True, + verify=True, + ssl_server_cert=None, + token_url=None, + client_id=None, + client_secret=None, + auth_plugin=None, + ) + defaults.update(overrides) + return SimpleNamespace(**defaults) + + def test_external_initializer_when_auth_plugin_set(self): + from sap.http.auth_plugin import ConnectionInfo + from sap.http.external_session_initializer import ( + HTTPExternalSessionInitializer, + ) + + args = self._make_args( + auth_plugin={'command': '/path/to/plugin', 'parameters': {'k': 'v'}}, + user='ELBEZI', + ) + + with patch('sap.adt.Connection') as mock_connection: + sap.cli.adt_connection_from_args(args) + + _, kwargs = mock_connection.call_args + initializer = kwargs.get('session_initializer') + self.assertIsInstance(initializer, HTTPExternalSessionInitializer) + # The initializer must carry the user (for UnauthorizedError messages) + # and the connection details the plugin needs to build its URL. + self.assertEqual(initializer._user, 'ELBEZI') + self.assertEqual(initializer._command, '/path/to/plugin') + self.assertEqual(initializer._parameters, {'k': 'v'}) + self.assertIsInstance(initializer._connection, ConnectionInfo) + self.assertEqual(initializer._connection.proto, 'https') + self.assertEqual(initializer._connection.ashost, 'h.example.com') + self.assertEqual(initializer._connection.port, '443') + self.assertEqual(initializer._connection.client, '100') + self.assertEqual(initializer._connection.type, 'adt') + # Path points to the ADT login endpoint sapcli's built-in flow uses. + self.assertIn('discovery', initializer._connection.path) + + def test_auth_plugin_missing_command_raises(self): + args = self._make_args(auth_plugin={'parameters': {}}) + + with self.assertRaises(SAPCliError) as cm: + sap.cli.adt_connection_from_args(args) + + self.assertIn('command', str(cm.exception).lower()) + + def test_auth_plugin_with_args_password_is_fine(self): + # Mutual exclusivity is enforced at config-resolution time, not on + # args. SAP_PASSWORD env populates args.password so the plugin's + # subprocess can inherit it - this is by design and must not + # trigger an error here. + args = self._make_args( + auth_plugin={'command': '/p'}, + user='ELBEZI', + password='from-env', + ) + + with patch('sap.adt.Connection'): + sap.cli.adt_connection_from_args(args) + + +class TestResolveAuthPluginMutualExclusivity(unittest.TestCase): + """auth_plugin must conflict with password / OAuth fields at the + config level (when they appear together in a resolved context), + not at the args level. This lets users set SAP_PASSWORD as an env + var for the plugin to inherit without tripping the check. + """ + + def _make_args(self, **kwargs): + defaults = dict( + ashost=None, sysnr=None, client=None, port=None, + ssl=None, verify=None, ssl_server_cert=None, + user=None, password=None, + token_url=None, client_id=None, client_secret=None, + auth_plugin=None, + ) + defaults.update(kwargs) + return SimpleNamespace(**defaults) + + def _config_with(self, user_def): + return ConfigFile({ + 'current-context': 'ctx', + 'connections': {'srv': {'ashost': 'h', 'client': '100'}}, + 'users': {'u': user_def}, + 'contexts': {'ctx': {'connection': 'srv', 'user': 'u'}}, + }, TEST_CONFIG_PATH) + + def test_password_with_auth_plugin_in_user_raises(self): + config_file = self._config_with({ + 'password': 'secret', + 'auth_plugin': {'command': '/p'}, + }) + args = self._make_args(config_file=config_file) + + with patch.dict('os.environ', {}, clear=True), \ + self.assertRaises(SAPCliConfigError) as cm: + sap.cli.resolve_default_connection_values(args) + + self.assertIn('mutually exclusive', str(cm.exception).lower()) + + def test_oauth_fields_with_auth_plugin_raises(self): + config_file = self._config_with({ + 'auth_plugin': {'command': '/p'}, + }) + # OAuth fields live on the connection, but they end up in the + # same flat resolved-context dict as auth_plugin - the conflict + # is real even if they live in different config sections. + config_file.connections['srv']['token_url'] = 'https://t' + config_file.connections['srv']['client_id'] = 'cid' + config_file.connections['srv']['client_secret'] = 'csec' + + args = self._make_args(config_file=config_file) + + with patch.dict('os.environ', {}, clear=True), \ + self.assertRaises(SAPCliConfigError) as cm: + sap.cli.resolve_default_connection_values(args) + + self.assertIn('mutually exclusive', str(cm.exception).lower()) + + def test_env_sap_password_does_not_trigger_mutex(self): + # Setting SAP_PASSWORD in the env (e.g. so the plugin's + # subprocess inherits it) must not collide with auth_plugin. + config_file = self._config_with({'auth_plugin': {'command': '/p'}}) + args = self._make_args(config_file=config_file) + + with patch.dict('os.environ', {'SAP_PASSWORD': 'env-pwd'}, clear=True): + sap.cli.resolve_default_connection_values(args) + + self.assertEqual(args.auth_plugin, {'command': '/p'}) + self.assertEqual(args.password, 'env-pwd') + + +class TestAuthPluginCacheKeyDerivation(unittest.TestCase): + """The (context, connection, user) triple is the cache-isolation + contract: changing any of the three must mint a different cache + key so cookies do not leak across logical sessions. + """ + + def _make_args(self, **kwargs): + defaults = dict( + ashost=None, sysnr=None, client=None, port=None, + ssl=None, verify=None, ssl_server_cert=None, + user=None, password=None, + token_url=None, client_id=None, client_secret=None, + auth_plugin=None, + context=None, + ) + defaults.update(kwargs) + return SimpleNamespace(**defaults) + + def _config(self): + return ConfigFile({ + 'current-context': 'dev', + 'connections': { + 'dev-srv': {'ashost': 'h', 'client': '100'}, + 'prod-srv': {'ashost': 'p', 'client': '200'}, + }, + 'users': { + 'plug-user': {'auth_plugin': {'command': '/p'}}, + 'other-user': {'auth_plugin': {'command': '/p'}}, + }, + 'contexts': { + 'dev': {'connection': 'dev-srv', 'user': 'plug-user'}, + 'prod': {'connection': 'prod-srv', 'user': 'plug-user'}, + 'dev-other': {'connection': 'dev-srv', 'user': 'other-user'}, + }, + }, TEST_CONFIG_PATH) + + def test_cache_key_built_from_context_triple(self): + args = self._make_args(config_file=self._config()) + + with patch.dict('os.environ', {}, clear=True): + sap.cli.resolve_default_connection_values(args) + + key = sap.http.auth_plugin_cache.cache_key_for('dev', 'dev-srv', 'plug-user') + self.assertEqual(args.auth_plugin_cache_key, key) + + def test_cache_key_changes_with_connection(self): + args = self._make_args(config_file=self._config(), context='prod') + + with patch.dict('os.environ', {}, clear=True): + sap.cli.resolve_default_connection_values(args) + + key = sap.http.auth_plugin_cache.cache_key_for('prod', 'prod-srv', 'plug-user') + self.assertEqual(args.auth_plugin_cache_key, key) + + def test_cache_key_changes_with_user(self): + args = self._make_args(config_file=self._config(), context='dev-other') + + with patch.dict('os.environ', {}, clear=True): + sap.cli.resolve_default_connection_values(args) + + key = sap.http.auth_plugin_cache.cache_key_for('dev-other', 'dev-srv', 'other-user') + self.assertEqual(args.auth_plugin_cache_key, key) + + def test_cache_key_none_when_no_auth_plugin(self): + # auth_plugin not configured → cache key is None (caching disabled). + config = ConfigFile({ + 'current-context': 'basic', + 'connections': {'srv': {'ashost': 'h', 'client': '100'}}, + 'users': {'u': {'user': 'USR', 'password': 'pwd'}}, + 'contexts': {'basic': {'connection': 'srv', 'user': 'u'}}, + }, TEST_CONFIG_PATH) + args = self._make_args(config_file=config) + + with patch.dict('os.environ', {}, clear=True): + sap.cli.resolve_default_connection_values(args) + + self.assertIsNone(args.auth_plugin_cache_key) + + +class TestGctsConnectionFromArgs(unittest.TestCase): + """gcts_connection_from_args must respect auth_plugin and OAuth the + same way adt_connection_from_args does - all three CLI auth + strategies should reach the REST connection. + """ + + def _make_args(self, **overrides): + defaults = dict( + ashost='h.example.com', client='100', + user='USR', password='pwd', + port=443, ssl=True, verify=True, ssl_server_cert=None, + token_url=None, client_id=None, client_secret=None, + auth_plugin=None, auth_plugin_cache_key=None, + auth_plugin_invalidate_cache=False, + ) + defaults.update(overrides) + return SimpleNamespace(**defaults) + + def test_basic_auth_when_no_strategy_configured(self): + args = self._make_args() + + with patch('sap.rest.Connection') as mock_connection: + sap.cli.gcts_connection_from_args(args) + + self.assertIsNone(mock_connection.call_args.kwargs.get('session_initializer')) + + def test_auth_plugin_initializer_when_plugin_configured(self): + from sap.http.external_session_initializer import ( + HTTPExternalSessionInitializer, + ) + args = self._make_args( + password=None, + auth_plugin={'command': '/p'}, + auth_plugin_cache_key='ctx|conn|user', + ) + + with patch('sap.rest.Connection') as mock_connection: + sap.cli.gcts_connection_from_args(args) + + initializer = mock_connection.call_args.kwargs.get('session_initializer') + self.assertIsInstance(initializer, HTTPExternalSessionInitializer) + # Plugin receives 'rest' as the type so it can pick a REST-specific + # auth endpoint if it cares; cookies are server-wide on ABAP, but + # 'type' is still the documented signal. + self.assertEqual(initializer._connection.type, 'rest') + + def test_oauth_initializer_when_oauth_configured(self): + from sap.http.oauth import OAuthHTTPSessionInitializer + args = self._make_args( + token_url='https://t', client_id='cid', client_secret='csec', + ) + + with patch('sap.rest.Connection') as mock_connection: + sap.cli.gcts_connection_from_args(args) + + initializer = mock_connection.call_args.kwargs.get('session_initializer') + self.assertIsInstance(initializer, OAuthHTTPSessionInitializer) + + +class TestOdataConnectionFromArgs(unittest.TestCase): + """odata_connection_from_args must respect auth_plugin and OAuth the + same way adt_connection_from_args does. + """ + + def _make_args(self, **overrides): + defaults = dict( + ashost='h.example.com', client='100', + user='USR', password='pwd', + port=443, ssl=True, verify=True, ssl_server_cert=None, + token_url=None, client_id=None, client_secret=None, + auth_plugin=None, auth_plugin_cache_key=None, + auth_plugin_invalidate_cache=False, + ) + defaults.update(overrides) + return SimpleNamespace(**defaults) + + def test_basic_auth_when_no_strategy_configured(self): + args = self._make_args() + + with patch('sap.odata.Connection') as mock_connection: + sap.cli.odata_connection_from_args('UI5/SOMESERVICE', args) + + self.assertIsNone(mock_connection.call_args.kwargs.get('session_initializer')) + + def test_auth_plugin_initializer_carries_odata_service_path(self): + from sap.http.external_session_initializer import ( + HTTPExternalSessionInitializer, + ) + args = self._make_args( + password=None, + auth_plugin={'command': '/p'}, + auth_plugin_cache_key='ctx|conn|user', + ) + + with patch('sap.odata.Connection') as mock_connection: + sap.cli.odata_connection_from_args('UI5/SOMESERVICE', args) + + initializer = mock_connection.call_args.kwargs.get('session_initializer') + self.assertIsInstance(initializer, HTTPExternalSessionInitializer) + self.assertEqual(initializer._connection.type, 'odata') + # The OData service path lets a plugin authenticate against the + # service the command will actually call. + self.assertEqual( + initializer._connection.path, + '/sap/opu/odata/UI5/SOMESERVICE', + ) + + def test_oauth_initializer_when_oauth_configured(self): + from sap.http.oauth import OAuthHTTPSessionInitializer + args = self._make_args( + token_url='https://t', client_id='cid', client_secret='csec', + ) + + with patch('sap.odata.Connection') as mock_connection: + sap.cli.odata_connection_from_args('UI5/SOMESERVICE', args) + + initializer = mock_connection.call_args.kwargs.get('session_initializer') + self.assertIsInstance(initializer, OAuthHTTPSessionInitializer) + + +class TestAuthPluginInitializerCacheKey(unittest.TestCase): + """adt_connection_from_args must forward the cache key onto the + constructed HTTPExternalSessionInitializer; --auth-plugin-invalidate-cache + must drop the existing entry before the initializer runs. + """ + + def _make_args(self, **overrides): + defaults = dict( + ashost='h.example.com', client='100', + user=None, password=None, + port=443, ssl=True, verify=True, ssl_server_cert=None, + token_url=None, client_id=None, client_secret=None, + auth_plugin={'command': '/p'}, + auth_plugin_cache_key='ctx|conn|user', + auth_plugin_invalidate_cache=False, + ) + defaults.update(overrides) + return SimpleNamespace(**defaults) + + def test_cache_key_forwarded_to_initializer(self): + args = self._make_args() + + with patch('sap.adt.Connection') as mock_connection: + sap.cli.adt_connection_from_args(args) + + initializer = mock_connection.call_args.kwargs['session_initializer'] + self.assertEqual(initializer._cache_key, 'ctx|conn|user') + + def test_invalidate_cache_drops_entry_before_run(self): + args = self._make_args(auth_plugin_invalidate_cache=True) + + with patch('sap.cli.get_response_store') as mock_store, \ + patch('sap.adt.Connection'): + sap.cli.adt_connection_from_args(args) + + mock_store.return_value.delete.assert_called_once_with('ctx|conn|user') + + def test_invalidate_cache_without_key_is_noop(self): + args = self._make_args( + auth_plugin_cache_key=None, + auth_plugin_invalidate_cache=True, + ) + + with patch('sap.cli.get_response_store') as mock_store, \ + patch('sap.adt.Connection'): + sap.cli.adt_connection_from_args(args) + + mock_store.return_value.delete.assert_not_called() + + def test_no_invalidate_does_not_touch_cache(self): + args = self._make_args(auth_plugin_invalidate_cache=False) + + with patch('sap.cli.get_response_store') as mock_store, \ + patch('sap.adt.Connection'): + sap.cli.adt_connection_from_args(args) + + mock_store.return_value.delete.assert_not_called() + + def test_disable_cache_nulls_cache_key_on_initializer(self): + """When auth_plugin_disable_cache is True, the initializer must + be constructed with cache_key=None - the existing 'no cache' + code path in HTTPExternalSessionInitializer does the rest + (skips reads AND writes).""" + + args = self._make_args(auth_plugin_disable_cache=True) + + with patch('sap.cli.get_response_store'), \ + patch('sap.adt.Connection') as mock_connection: + sap.cli.adt_connection_from_args(args) + + initializer = mock_connection.call_args.kwargs['session_initializer'] + self.assertIsNone(initializer._cache_key) + + def test_disable_cache_deletes_existing_entry(self): + """Disabling the cache also scrubs any pre-existing on-disk + entry - defense in depth, the user just told us they do not + want secrets persisted.""" + + args = self._make_args(auth_plugin_disable_cache=True) + + with patch('sap.cli.get_response_store') as mock_store, \ + patch('sap.adt.Connection'): + sap.cli.adt_connection_from_args(args) + + mock_store.return_value.delete.assert_called_once_with('ctx|conn|user') + + def test_disable_cache_without_cache_key_is_noop(self): + """No derived cache key (e.g. no auth_plugin / no context) plus + disable_cache must not call delete - nothing to clean up.""" + + args = self._make_args( + auth_plugin_cache_key=None, + auth_plugin_disable_cache=True, + ) + + with patch('sap.cli.get_response_store') as mock_store, \ + patch('sap.adt.Connection'): + sap.cli.adt_connection_from_args(args) + + mock_store.return_value.delete.assert_not_called() + + def test_disable_cache_false_leaves_cache_key(self): + """The default (disable_cache=False) preserves the derived cache + key on the initializer.""" + + args = self._make_args(auth_plugin_disable_cache=False) + + with patch('sap.cli.get_response_store'), \ + patch('sap.adt.Connection') as mock_connection: + sap.cli.adt_connection_from_args(args) + + initializer = mock_connection.call_args.kwargs['session_initializer'] + self.assertEqual(initializer._cache_key, 'ctx|conn|user') + + +class TestResolveAuthPluginDisableCache(unittest.TestCase): + """The disable_cache value is resolved with the standard precedence: + CLI flag > SAPCLI_AUTH_PLUGIN_DISABLE_CACHE env var > config file + (under auth_plugin.disable_cache) > built-in default of False. + """ + + def _make_args(self, **kwargs): + defaults = dict( + ashost=None, sysnr=None, client=None, port=None, + ssl=None, verify=None, ssl_server_cert=None, + user=None, password=None, + token_url=None, client_id=None, client_secret=None, + auth_plugin=None, + auth_plugin_disable_cache=None, + ) + defaults.update(kwargs) + return SimpleNamespace(**defaults) + + def _config_with_plugin(self, plugin_dict): + return ConfigFile({ + 'current-context': 'ctx', + 'connections': {'srv': {'ashost': 'h', 'client': '100'}}, + 'users': {'u': {'auth_plugin': plugin_dict}}, + 'contexts': {'ctx': {'connection': 'srv', 'user': 'u'}}, + }, TEST_CONFIG_PATH) + + def test_default_is_false_without_signal(self): + config_file = self._config_with_plugin({'command': '/p'}) + args = self._make_args(config_file=config_file) + + with patch.dict('os.environ', {}, clear=True): + sap.cli.resolve_default_connection_values(args) + + # Strict bool: the resolver must normalize None to False so the + # downstream `getattr(args, 'auth_plugin_disable_cache', False)` + # branch is a real boolean check, not a None-vs-True check. + self.assertIs(args.auth_plugin_disable_cache, False) + + def test_config_disable_cache_true(self): + config_file = self._config_with_plugin({ + 'command': '/p', 'disable_cache': True, + }) + args = self._make_args(config_file=config_file) + + with patch.dict('os.environ', {}, clear=True): + sap.cli.resolve_default_connection_values(args) + + self.assertTrue(args.auth_plugin_disable_cache) + + def test_config_disable_cache_string_true_normalized(self): + """YAML quoted 'true' / 'yes' must be treated as True.""" + + config_file = self._config_with_plugin({ + 'command': '/p', 'disable_cache': 'yes', + }) + args = self._make_args(config_file=config_file) + + with patch.dict('os.environ', {}, clear=True): + sap.cli.resolve_default_connection_values(args) + + self.assertTrue(args.auth_plugin_disable_cache) + + def test_env_overrides_config_false(self): + config_file = self._config_with_plugin({ + 'command': '/p', 'disable_cache': False, + }) + args = self._make_args(config_file=config_file) + + with patch.dict('os.environ', + {'SAPCLI_AUTH_PLUGIN_DISABLE_CACHE': 'true'}, + clear=True): + sap.cli.resolve_default_connection_values(args) + + self.assertTrue(args.auth_plugin_disable_cache) + + def test_env_false_token_keeps_cache(self): + """Env var false-token must NOT silently disable the cache - it + must override a config-level True back to False.""" + + config_file = self._config_with_plugin({ + 'command': '/p', 'disable_cache': True, + }) + args = self._make_args(config_file=config_file) + + with patch.dict('os.environ', + {'SAPCLI_AUTH_PLUGIN_DISABLE_CACHE': 'no'}, + clear=True): + sap.cli.resolve_default_connection_values(args) + + self.assertFalse(args.auth_plugin_disable_cache) + + def test_cli_true_overrides_env_and_config(self): + config_file = self._config_with_plugin({ + 'command': '/p', 'disable_cache': False, + }) + args = self._make_args( + config_file=config_file, + auth_plugin_disable_cache=True, + ) + + with patch.dict('os.environ', + {'SAPCLI_AUTH_PLUGIN_DISABLE_CACHE': 'no'}, + clear=True): + sap.cli.resolve_default_connection_values(args) + + self.assertTrue(args.auth_plugin_disable_cache) + + def test_no_auth_plugin_defaults_false(self): + """Without an auth_plugin in the resolved context, disable_cache + still resolves to a bool (False) - the field is always set so + getattr(...False) in the connection factory is consistent.""" + + config = ConfigFile({ + 'current-context': 'basic', + 'connections': {'srv': {'ashost': 'h', 'client': '100'}}, + 'users': {'u': {'user': 'USR', 'password': 'pwd'}}, + 'contexts': {'basic': {'connection': 'srv', 'user': 'u'}}, + }, TEST_CONFIG_PATH) + args = self._make_args(config_file=config) + + with patch.dict('os.environ', {}, clear=True): + sap.cli.resolve_default_connection_values(args) + + self.assertIs(args.auth_plugin_disable_cache, False) + + +class TestBuildEmptyConnectionValuesDisableCache(unittest.TestCase): + + def test_empty_values_include_auth_plugin_disable_cache(self): + values = sap.cli.build_empty_connection_values() + + self.assertTrue(hasattr(values, 'auth_plugin_disable_cache')) + self.assertIsNone(values.auth_plugin_disable_cache) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/test_sap_cli__entry.py b/test/unit/test_sap_cli__entry.py index 7ad04585..833e32e1 100644 --- a/test/unit/test_sap_cli__entry.py +++ b/test/unit/test_sap_cli__entry.py @@ -188,6 +188,51 @@ def test_args_prompt_password_when_token_required(self): self.assertEqual(args.password, 'prompted-pwd') + def test_args_skip_user_and_password_prompt_when_auth_plugin_set(self): + """When the resolved context configures an auth_plugin, the user + prompt and the password prompt must both be suppressed - the + plugin is responsible for credentials. + """ + + config_data = { + 'current-context': 'plugin-ctx', + 'connections': { + 'server': {'ashost': 'h.example.com', 'client': '100'}, + }, + 'users': { + 'plug-user': { + 'auth_plugin': {'command': '/p'}, + }, + }, + 'contexts': { + 'plugin-ctx': {'connection': 'server', 'user': 'plug-user'}, + }, + } + self._config_patcher.stop() + self._config_patcher = patch( + 'sap.cli._entry.ConfigFile.load', + return_value=ConfigFile(config_data, TEST_CONFIG_PATH), + ) + self._config_patcher.start() + + test_params = get_tested_parameters() + remove_cmd_param_from_list(test_params, '--password') + remove_cmd_param_from_list(test_params, '--user') + remove_cmd_param_from_list(test_params, '--ashost') + remove_cmd_param_from_list(test_params, '--client') + + getpass_mock = Mock(return_value='should-not-be-used') + input_mock = Mock(return_value='should-not-be-used') + + with patch('getpass.getpass', getpass_mock), \ + patch('builtins.input', input_mock): + args = entry.parse_command_line(test_params) + + getpass_mock.assert_not_called() + input_mock.assert_not_called() + self.assertIsNone(args.password) + self.assertEqual(args.auth_plugin, {'command': '/p'}) + def test_args_ask_user_and_password(self): test_params = get_tested_parameters() remove_cmd_param_from_list(test_params, '--password') @@ -303,6 +348,57 @@ def test_args_ssl_server_cert_default_none(self): self.assertIsNone(args.ssl_server_cert) + def test_args_auth_plugin_disable_cache_default_false(self): + """Without the flag and without env, the resolved value is False + - the cache is on by default (the on-disk store is opt-out).""" + + test_params = get_tested_parameters() + + with patch.dict(os.environ, {}, clear=False) as _env: + os.environ.pop('SAPCLI_AUTH_PLUGIN_DISABLE_CACHE', None) + args = entry.parse_command_line(test_params) + + self.assertFalse(args.auth_plugin_disable_cache) + + def test_args_auth_plugin_disable_cache_flag(self): + """--auth-plugin-disable-cache flips the value to True.""" + + test_params = get_tested_parameters() + test_params.insert(1, '--auth-plugin-disable-cache') + + args = entry.parse_command_line(test_params) + + self.assertTrue(args.auth_plugin_disable_cache) + + def test_args_auth_plugin_disable_cache_env_var_true(self): + """SAPCLI_AUTH_PLUGIN_DISABLE_CACHE in the env enables it without + a CLI flag.""" + + test_params = get_tested_parameters() + + os.environ['SAPCLI_AUTH_PLUGIN_DISABLE_CACHE'] = 'true' + try: + args = entry.parse_command_line(test_params) + finally: + del os.environ['SAPCLI_AUTH_PLUGIN_DISABLE_CACHE'] + + self.assertTrue(args.auth_plugin_disable_cache) + + def test_args_auth_plugin_disable_cache_env_var_false_tokens(self): + """False tokens (no/off/false/n) keep the cache enabled. Same + normalization rules as SAP_SSL / SAP_SSL_VERIFY.""" + + test_params = get_tested_parameters() + + for variant in ('n', 'no', 'No', 'NO', 'false', 'FALSE', 'off', 'Off'): + os.environ['SAPCLI_AUTH_PLUGIN_DISABLE_CACHE'] = variant + try: + args = entry.parse_command_line(test_params) + finally: + del os.environ['SAPCLI_AUTH_PLUGIN_DISABLE_CACHE'] + + self.assertFalse(args.auth_plugin_disable_cache, msg=variant) + class TestParseCommandLineNoCommand(unittest.TestCase): diff --git a/test/unit/test_sap_config.py b/test/unit/test_sap_config.py index cc35c5bb..f1f77c81 100644 --- a/test/unit/test_sap_config.py +++ b/test/unit/test_sap_config.py @@ -1591,5 +1591,60 @@ def test_delete_nonexistent_context_raises(self): self.assertIn('not found', str(cm.exception)) +class TestResolveContextAuthPlugin(unittest.TestCase): + """auth_plugin is a dict carried verbatim from the user definition into + the resolved context, so the CLI layer can construct an + HTTPExternalSessionInitializer without further parsing. + """ + + def test_auth_plugin_surfaced_from_user(self): + data = { + 'connections': { + 'server': {'ashost': 'host.example.com', 'client': '100'}, + }, + 'users': { + 'plugin-user': { + 'auth_plugin': { + 'command': '/path/to/plugin', + 'parameters': {'channel': 'msedge'}, + }, + }, + }, + 'contexts': { + 'ctx': {'connection': 'server', 'user': 'plugin-user'}, + }, + } + config = ConfigFile(data, TEST_CONFIG_PATH) + result = config.resolve_context('ctx') + + self.assertEqual(result['auth_plugin'], { + 'command': '/path/to/plugin', + 'parameters': {'channel': 'msedge'}, + }) + + def test_auth_plugin_overridden_at_context_level(self): + data = { + 'connections': { + 'server': {'ashost': 'host.example.com', 'client': '100'}, + }, + 'users': { + 'plugin-user': { + 'auth_plugin': {'command': '/base/plugin'}, + }, + }, + 'contexts': { + 'ctx': { + 'connection': 'server', + 'user': 'plugin-user', + 'auth_plugin': {'command': '/override/plugin'}, + }, + }, + } + config = ConfigFile(data, TEST_CONFIG_PATH) + result = config.resolve_context('ctx') + + self.assertEqual(result['auth_plugin'], {'command': '/override/plugin'}) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/test_sap_http_auth_plugin.py b/test/unit/test_sap_http_auth_plugin.py index b35ea03e..ee13b12f 100644 --- a/test/unit/test_sap_http_auth_plugin.py +++ b/test/unit/test_sap_http_auth_plugin.py @@ -3,7 +3,7 @@ import json import subprocess import unittest -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from unittest.mock import patch from sap.errors import SAPCliError @@ -49,8 +49,17 @@ def test_to_dict_emits_all_fields(self): 'type': 'adt', 'path': '/sap/bc/adt/core/systeminformation', 'sysnr': None, + 'verify': True, + 'ssl_server_cert': None, }) + def test_verify_defaults_to_true(self): + # Safe default. Forwarded explicitly by sapcli when the user has + # ssl_verify: false, so plugins do not need to read SAP_SSL_VERIFY + # from env. + self.assertIs(_connection().verify, True) + self.assertIsNone(_connection().ssl_server_cert) + def test_sysnr_defaults_to_none(self): # sysnr is only meaningful for RFC-based plugins; HTTP plugins can # ignore it. Defaulting to None keeps construction noise-free for @@ -98,6 +107,8 @@ def test_to_dict_nests_connection_and_parameters(self): 'type': 'adt', 'path': '/sap/bc/adt/core/systeminformation', 'sysnr': '00', + 'verify': True, + 'ssl_server_cert': None, }, 'parameters': {'channel': 'msedge'}, }) @@ -214,6 +225,96 @@ def test_invalid_json_raises_value_error(self): AuthPluginResponse.from_json('this is not json') +class TestAuthPluginResponseSerialize(unittest.TestCase): + + def test_to_dict_no_expiration_omits_field(self): + response = AuthPluginResponse(message='ok', content={'type': 'cookie'}) + + self.assertEqual(response.to_dict(), { + 'message': 'ok', + 'content': {'type': 'cookie'}, + }) + + def test_to_dict_with_expiration_emits_iso_utc(self): + response = AuthPluginResponse( + message='ok', + content={'type': 'cookie'}, + expiration=datetime(2026, 5, 8, 12, 30, tzinfo=timezone.utc), + ) + + self.assertEqual(response.to_dict()['expiration'], '2026-05-08T12:30:00+00:00') + + def test_to_dict_normalizes_naive_datetime_to_utc(self): + # Naive datetimes are interpreted as UTC. Better than crashing or + # silently treating them as local time, which would round-trip + # differently in different timezones. + response = AuthPluginResponse( + message='ok', + content={'type': 'cookie'}, + expiration=datetime(2026, 5, 8, 12, 30), + ) + + self.assertTrue(response.to_dict()['expiration'].endswith('+00:00')) + + def test_to_json_round_trips_through_from_json(self): + response = AuthPluginResponse( + message='ok', + content={'type': 'cookie', 'cookies': [{'name': 'X', 'value': 'y'}]}, + expiration=datetime(2026, 5, 8, 12, 30, tzinfo=timezone.utc), + ) + + restored = AuthPluginResponse.from_json(response.to_json()) + + self.assertEqual(restored.message, response.message) + self.assertEqual(restored.content, response.content) + self.assertEqual(restored.expiration, response.expiration) + + def test_to_json_without_expiration_round_trips(self): + response = AuthPluginResponse(message='ok', content={'type': 'cookie'}) + + restored = AuthPluginResponse.from_json(response.to_json()) + + self.assertEqual(restored, response) + + +class TestAuthPluginResponseIsExpired(unittest.TestCase): + + def test_no_expiration_means_never_expired(self): + # Spec lets sapcli choose; we mirror Token.is_expired semantics so + # plugins that omit expiration get cached indefinitely (which is + # what most session-cookie plugins want - the server invalidates). + response = AuthPluginResponse(message='ok', content={}) + + self.assertFalse(response.is_expired()) + + def test_future_expiration_not_expired(self): + future = datetime.now(timezone.utc) + timedelta(hours=1) + response = AuthPluginResponse( + message='ok', content={}, expiration=future, + ) + + self.assertFalse(response.is_expired()) + + def test_past_expiration_is_expired(self): + past = datetime.now(timezone.utc) - timedelta(seconds=1) + response = AuthPluginResponse( + message='ok', content={}, expiration=past, + ) + + self.assertTrue(response.is_expired()) + + def test_within_leeway_is_expired(self): + # Refresh slightly early to avoid races where the token expires + # mid-flight between our check and the server's validation. + soon = datetime.now(timezone.utc) + timedelta(seconds=10) + response = AuthPluginResponse( + message='ok', content={}, expiration=soon, + ) + + self.assertTrue(response.is_expired(leeway_seconds=30)) + self.assertFalse(response.is_expired(leeway_seconds=5)) + + class TestAuthPluginError(unittest.TestCase): def test_inherits_from_sapclierror(self): diff --git a/test/unit/test_sap_http_auth_plugin_cache.py b/test/unit/test_sap_http_auth_plugin_cache.py new file mode 100644 index 00000000..0d9e4e26 --- /dev/null +++ b/test/unit/test_sap_http_auth_plugin_cache.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +import os +import stat +import tempfile +import unittest +from datetime import datetime, timedelta, timezone +from pathlib import Path +from unittest.mock import patch + +from sap.http.auth_plugin import AuthPluginResponse +from sap.http.auth_plugin_cache import ( + AuthPluginResponseFileStore, + cache_key_for, + get_response_store, +) + + +def _response(message='ok', content=None, expiration=None): + return AuthPluginResponse( + message=message, + content=content if content is not None else {'type': 'cookie', 'cookies': []}, + expiration=expiration, + ) + + +class TestCacheKeyFor(unittest.TestCase): + + def test_combines_three_components_with_pipe(self): + # The triple is the cache-isolation contract from the spec: changing + # any of context, connection, or user must not reuse a cached entry + # minted for a different combination. + self.assertEqual( + cache_key_for('dev', 'localhost', 'developer'), + '8bb9ac567da8a43ca1f89ac47fdefc15a61b1bcfc711e9a6018bd50599e4aa11', + ) + + def test_different_combinations_produce_different_keys(self): + keys = { + cache_key_for('dev', 'host1', 'u1'), + cache_key_for('dev', 'host2', 'u1'), + cache_key_for('dev', 'host1', 'u2'), + cache_key_for('prod', 'host1', 'u1'), + } + self.assertEqual(len(keys), 4) + + +class TestAuthPluginResponseFileStoreRoundTrip(unittest.TestCase): + + def setUp(self): + self._tmp = tempfile.TemporaryDirectory() + self.addCleanup(self._tmp.cleanup) + self.store = AuthPluginResponseFileStore(base_dir=Path(self._tmp.name)) + + def test_set_then_get_returns_equivalent_response(self): + original = _response( + content={'type': 'cookie', 'cookies': [{'name': 'X', 'value': 'y'}]}, + expiration=datetime(2026, 5, 8, 12, tzinfo=timezone.utc), + ) + + key = cache_key_for('dev', 'localhost', 'developer') + self.store.set(key, original) + loaded = self.store.get(key) + + self.assertEqual(loaded.message, original.message) + self.assertEqual(loaded.content, original.content) + self.assertEqual(loaded.expiration, original.expiration) + + def test_get_missing_key_returns_none(self): + key = cache_key_for('absent', 'key', 'here') + self.assertIsNone(self.store.get(key)) + + def test_set_overwrites_existing_entry(self): + self.store.set('k', _response(message='first')) + self.store.set('k', _response(message='second')) + + self.assertEqual(self.store.get('k').message, 'second') + + def test_delete_removes_entry(self): + self.store.set('k', _response()) + self.store.delete('k') + + self.assertIsNone(self.store.get('k')) + + def test_delete_missing_key_is_noop(self): + self.store.delete('never-stored') + + def test_response_without_expiration_round_trips(self): + # No expiration field is the common case for cookie plugins; the + # store must not require one. + self.store.set('k', _response(expiration=None)) + loaded = self.store.get('k') + + self.assertIsNone(loaded.expiration) + + def test_two_keys_are_isolated(self): + self.store.set('a', _response(message='for-a')) + self.store.set('b', _response(message='for-b')) + + self.assertEqual(self.store.get('a').message, 'for-a') + self.assertEqual(self.store.get('b').message, 'for-b') + + def test_cache_directory_has_owner_only_permissions(self): + self.store.set('k', _response()) + cache_dir = Path(self._tmp.name) / 'auth_plugin_responses' + mode = stat.S_IMODE(cache_dir.stat().st_mode) + self.assertEqual(mode, 0o700) + + def test_cache_file_has_owner_only_permissions(self): + self.store.set('k', _response()) + cache_dir = Path(self._tmp.name) / 'auth_plugin_responses' + for entry in cache_dir.iterdir(): + mode = stat.S_IMODE(entry.stat().st_mode) + self.assertEqual(mode, 0o600) + + def test_keys_with_pipe_separator_sanitized_in_filename(self): + # Pipe is not a portable filename character; JSONFileStore sanitizes + # it. We only care that the round-trip works - the on-disk filename + # is an implementation detail. + key = cache_key_for('dev', 'host', 'user') + self.store.set(key, _response(message='live')) + self.assertEqual(self.store.get(key).message, 'live') + + +class TestExpiredResponseStillReadable(unittest.TestCase): + """The store does not police expiration - that's the caller's job. An + expired response must still be readable so the caller can decide + whether to refresh. + """ + + def setUp(self): + self._tmp = tempfile.TemporaryDirectory() + self.addCleanup(self._tmp.cleanup) + self.store = AuthPluginResponseFileStore(base_dir=Path(self._tmp.name)) + + def test_expired_response_round_trips(self): + past = datetime.now(timezone.utc) - timedelta(hours=1) + self.store.set('k', _response(expiration=past)) + + loaded = self.store.get('k') + + self.assertIsNotNone(loaded) + self.assertTrue(loaded.is_expired()) + + +class TestGetResponseStoreSingleton(unittest.TestCase): + + def test_returns_same_instance_on_repeat_calls(self): + # Reset the module-level singleton for an isolated test. + with patch('sap.http.auth_plugin_cache._response_store', None): + first = get_response_store() + second = get_response_store() + + self.assertIs(first, second) + + def test_default_instance_uses_default_cache_dir(self): + with patch('sap.http.auth_plugin_cache._response_store', None), \ + patch('sap.http.auth_plugin_cache._default_cache_dir') as mock_dir: + mock_dir.return_value = Path(tempfile.mkdtemp()) + try: + store = get_response_store() + self.assertIsInstance(store, AuthPluginResponseFileStore) + mock_dir.assert_called_once() + finally: + import shutil + shutil.rmtree(mock_dir.return_value, ignore_errors=True) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/test_sap_http_external_session_initializer.py b/test/unit/test_sap_http_external_session_initializer.py new file mode 100644 index 00000000..31e56d34 --- /dev/null +++ b/test/unit/test_sap_http_external_session_initializer.py @@ -0,0 +1,441 @@ +#!/usr/bin/env python3 + +import unittest +from datetime import datetime, timedelta, timezone +from unittest.mock import Mock, patch + +import requests + +from sap.http.auth_plugin import ( + AuthPluginError, + AuthPluginResponse, + ConnectionInfo, +) +from sap.http.client import HTTPClient, HTTPSessionInitializer +from sap.http.errors import UnauthorizedError +from sap.http.external_session_initializer import HTTPExternalSessionInitializer + + +def _connection(): + return ConnectionInfo( + proto='https', + ashost='abap.example.org', + port='44300', + client='100', + type='adt', + path='/sap/bc/adt/core/systeminformation', + ) + + +def _response(content, message='ok', expiration=None): + return AuthPluginResponse( + message=message, + content=content, + expiration=expiration, + ) + + +class TestConstruction(unittest.TestCase): + + def test_satisfies_session_initializer_protocol(self): + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='u' + ) + + self.assertIsInstance(initializer, HTTPSessionInitializer) + + @patch('sap.http.external_session_initializer.run_plugin') + def test_does_not_run_plugin_at_construction(self, mock_run): + HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='u' + ) + + mock_run.assert_not_called() + + @patch('sap.http.external_session_initializer.run_plugin') + def test_initialize_session_returns_same_session(self, mock_run): + mock_run.return_value = _response({'type': 'cookie', 'cookies': []}) + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='u' + ) + session = requests.Session() + + returned = initializer.initialize_session(session) + + self.assertIs(returned, session) + + @patch('sap.http.external_session_initializer.run_plugin') + def test_initialize_session_invokes_plugin_with_configured_args(self, mock_run): + mock_run.return_value = _response({'type': 'cookie', 'cookies': []}) + connection = _connection() + parameters = {'channel': 'msedge'} + initializer = HTTPExternalSessionInitializer( + command='sapcli-windows-cert-auth', + parameters=parameters, + connection=connection, + user='u', + ) + + initializer.initialize_session(requests.Session()) + + mock_run.assert_called_once_with( + 'sapcli-windows-cert-auth', parameters, connection + ) + + @patch('sap.http.external_session_initializer.run_plugin') + def test_initialize_session_propagates_plugin_error(self, mock_run): + mock_run.side_effect = AuthPluginError('plugin crashed') + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='u' + ) + + with self.assertRaisesRegex(AuthPluginError, 'plugin crashed'): + initializer.initialize_session(requests.Session()) + + def test_build_unauthorized_error_carries_user(self): + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='alice' + ) + req = Mock() + res = Mock() + + err = initializer.build_unauthorized_error(req, res) + + self.assertIsInstance(err, UnauthorizedError) + self.assertIs(err.request, req) + self.assertIs(err.response, res) + self.assertEqual(err.user, 'alice') + + +class TestCookieDispatch(unittest.TestCase): + + def _initialize(self, content): + with patch( + 'sap.http.external_session_initializer.run_plugin', + return_value=_response(content), + ): + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='u' + ) + session = requests.Session() + initializer.initialize_session(session) + return session + + def test_minimal_cookie_sets_name_and_value(self): + session = self._initialize({ + 'type': 'cookie', + 'cookies': [{'name': 'SAP_SESSIONID', 'value': 'abc123'}], + }) + + self.assertEqual(session.cookies.get('SAP_SESSIONID'), 'abc123') + + def test_multiple_cookies_all_set(self): + session = self._initialize({ + 'type': 'cookie', + 'cookies': [ + {'name': 'A', 'value': '1'}, + {'name': 'B', 'value': '2'}, + ], + }) + + self.assertEqual(session.cookies.get('A'), '1') + self.assertEqual(session.cookies.get('B'), '2') + + def test_cookie_with_domain_and_path(self): + session = self._initialize({ + 'type': 'cookie', + 'cookies': [{ + 'name': 'SAP_SESSIONID', + 'value': 'abc', + 'domain': 'abap.example.org', + 'path': '/sap', + }], + }) + + # Read back via get with the same domain/path filter we passed in. + self.assertEqual( + session.cookies.get('SAP_SESSIONID', domain='abap.example.org', path='/sap'), + 'abc', + ) + + def test_missing_cookies_field_raises(self): + with self.assertRaisesRegex(AuthPluginError, "cookies"): + self._initialize({'type': 'cookie'}) + + def test_cookies_not_a_list_raises(self): + with self.assertRaisesRegex(AuthPluginError, "cookies"): + self._initialize({'type': 'cookie', 'cookies': 'not-a-list'}) + + def test_cookie_without_name_raises(self): + with self.assertRaisesRegex(AuthPluginError, "name"): + self._initialize({ + 'type': 'cookie', + 'cookies': [{'value': 'abc'}], + }) + + def test_cookie_without_value_raises(self): + with self.assertRaisesRegex(AuthPluginError, "value"): + self._initialize({ + 'type': 'cookie', + 'cookies': [{'name': 'X'}], + }) + + def test_cookie_entry_not_object_raises(self): + with self.assertRaisesRegex(AuthPluginError, "object"): + self._initialize({ + 'type': 'cookie', + 'cookies': ['name=value; Path=/'], + }) + + +class TestHeaderDispatch(unittest.TestCase): + + def _initialize(self, content): + with patch( + 'sap.http.external_session_initializer.run_plugin', + return_value=_response(content), + ): + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='u' + ) + session = requests.Session() + initializer.initialize_session(session) + return session + + def test_authorization_header_set(self): + session = self._initialize({ + 'type': 'http_authorization_header', + 'headers': {'Authorization': 'Basic abc123'}, + }) + + self.assertEqual(session.headers.get('Authorization'), 'Basic abc123') + + def test_multiple_headers_set(self): + session = self._initialize({ + 'type': 'http_authorization_header', + 'headers': { + 'Authorization': 'Bearer xyz', + 'X-Custom': 'value', + }, + }) + + self.assertEqual(session.headers.get('Authorization'), 'Bearer xyz') + self.assertEqual(session.headers.get('X-Custom'), 'value') + + def test_missing_headers_field_raises(self): + with self.assertRaisesRegex(AuthPluginError, "headers"): + self._initialize({'type': 'http_authorization_header'}) + + def test_empty_headers_raises(self): + with self.assertRaisesRegex(AuthPluginError, "headers"): + self._initialize({ + 'type': 'http_authorization_header', + 'headers': {}, + }) + + def test_headers_not_a_dict_raises(self): + with self.assertRaisesRegex(AuthPluginError, "headers"): + self._initialize({ + 'type': 'http_authorization_header', + 'headers': ['Authorization: Basic abc'], + }) + + +class TestCertificatesDispatch(unittest.TestCase): + + def _initialize(self, content): + with patch( + 'sap.http.external_session_initializer.run_plugin', + return_value=_response(content), + ): + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='u' + ) + session = requests.Session() + initializer.initialize_session(session) + return session + + def test_certificate_and_key_set_session_cert(self): + session = self._initialize({ + 'type': 'certificates', + 'certificate': '/etc/ssl/client.pem', + 'key': '/etc/ssl/client.key', + }) + + self.assertEqual(session.cert, ('/etc/ssl/client.pem', '/etc/ssl/client.key')) + + def test_issuer_certificate_sets_session_verify(self): + session = self._initialize({ + 'type': 'certificates', + 'certificate': '/etc/ssl/client.pem', + 'key': '/etc/ssl/client.key', + 'issuer_certificate': '/etc/ssl/ca.pem', + }) + + self.assertEqual(session.verify, '/etc/ssl/ca.pem') + + def test_missing_certificate_raises(self): + with self.assertRaisesRegex(AuthPluginError, "certificate"): + self._initialize({ + 'type': 'certificates', + 'key': '/etc/ssl/client.key', + }) + + def test_missing_key_raises(self): + with self.assertRaisesRegex(AuthPluginError, "key"): + self._initialize({ + 'type': 'certificates', + 'certificate': '/etc/ssl/client.pem', + }) + + +class TestUnknownContentType(unittest.TestCase): + + @patch('sap.http.external_session_initializer.run_plugin') + def test_unknown_type_raises(self, mock_run): + mock_run.return_value = _response({'type': 'oauth_token'}) + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='u' + ) + + with self.assertRaisesRegex(AuthPluginError, 'oauth_token'): + initializer.initialize_session(requests.Session()) + + @patch('sap.http.external_session_initializer.run_plugin') + def test_missing_type_raises(self, mock_run): + mock_run.return_value = _response({'cookies': []}) + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='u' + ) + + with self.assertRaisesRegex(AuthPluginError, 'type'): + initializer.initialize_session(requests.Session()) + + @patch('sap.http.external_session_initializer.run_plugin') + def test_content_not_a_dict_raises(self, mock_run): + mock_run.return_value = _response('not-an-object') + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='u' + ) + + with self.assertRaisesRegex(AuthPluginError, 'content'): + initializer.initialize_session(requests.Session()) + + +class TestHTTPClientWithExternalInitializer(unittest.TestCase): + + def test_external_initializer_is_used_when_provided(self): + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='u' + ) + + client = HTTPClient(host='h', user='u', password=None, session_initializer=initializer) + + self.assertIs(client._session_initializer, initializer) + + def test_build_unauthorized_error_delegates_to_initializer(self): + initializer = HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), user='alice' + ) + client = HTTPClient(host='h', user='u', password=None, session_initializer=initializer) + req = Mock() + res = Mock() + + err = client.build_unauthorized_error(req, res) + + self.assertIsInstance(err, UnauthorizedError) + self.assertEqual(err.user, 'alice') + + +class TestCacheIntegration(unittest.TestCase): + """When cache_key is set, initialize_session must: + 1. consult the cache first; + 2. on miss/expired, run the plugin and store the result; + 3. never read or write the cache when cache_key is None (back-compat + for callers that did not opt in to caching). + """ + + def _make(self, cache_key=None): + return HTTPExternalSessionInitializer( + command='cmd', parameters={}, connection=_connection(), + user='u', cache_key=cache_key, + ) + + @patch('sap.http.external_session_initializer.get_response_store') + @patch('sap.http.external_session_initializer.run_plugin') + def test_no_cache_key_skips_cache_entirely(self, mock_run, mock_store): + mock_run.return_value = _response({'type': 'cookie', 'cookies': []}) + + self._make(cache_key=None).initialize_session(requests.Session()) + + mock_run.assert_called_once() + mock_store.assert_not_called() + + @patch('sap.http.external_session_initializer.get_response_store') + @patch('sap.http.external_session_initializer.run_plugin') + def test_cache_miss_runs_plugin_and_stores(self, mock_run, mock_store): + store = Mock() + store.get.return_value = None + mock_store.return_value = store + plugin_response = _response({'type': 'cookie', 'cookies': []}) + mock_run.return_value = plugin_response + + self._make(cache_key='ctx|conn|u').initialize_session(requests.Session()) + + store.get.assert_called_once_with('ctx|conn|u') + mock_run.assert_called_once() + store.set.assert_called_once_with('ctx|conn|u', plugin_response) + + @patch('sap.http.external_session_initializer.get_response_store') + @patch('sap.http.external_session_initializer.run_plugin') + def test_cache_hit_skips_plugin(self, mock_run, mock_store): + store = Mock() + cached = _response( + {'type': 'cookie', 'cookies': [{'name': 'X', 'value': 'cached'}]}, + ) + store.get.return_value = cached + mock_store.return_value = store + session = requests.Session() + + self._make(cache_key='k').initialize_session(session) + + mock_run.assert_not_called() + store.set.assert_not_called() + # Cookies from the cached response must land on the session. + self.assertEqual(session.cookies.get('X'), 'cached') + + @patch('sap.http.external_session_initializer.get_response_store') + @patch('sap.http.external_session_initializer.run_plugin') + def test_expired_cache_entry_is_refreshed(self, mock_run, mock_store): + store = Mock() + # is_expired() returning True makes the initializer treat it as a miss. + expired = AuthPluginResponse( + message='stale', content={'type': 'cookie', 'cookies': []}, + expiration=datetime.now(timezone.utc) - timedelta(hours=1), + ) + store.get.return_value = expired + mock_store.return_value = store + fresh = _response({'type': 'cookie', 'cookies': []}) + mock_run.return_value = fresh + + self._make(cache_key='k').initialize_session(requests.Session()) + + mock_run.assert_called_once() + store.set.assert_called_once_with('k', fresh) + + @patch('sap.http.external_session_initializer.get_response_store') + @patch('sap.http.external_session_initializer.run_plugin') + def test_plugin_error_does_not_poison_cache(self, mock_run, mock_store): + store = Mock() + store.get.return_value = None + mock_store.return_value = store + mock_run.side_effect = AuthPluginError('plugin crashed') + + with self.assertRaises(AuthPluginError): + self._make(cache_key='k').initialize_session(requests.Session()) + + store.set.assert_not_called() + + +if __name__ == '__main__': + unittest.main()