-
Notifications
You must be signed in to change notification settings - Fork 30
Add OAuth 2.0 support for BTP ABAP Environment (Steampunk) #157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ebd3cf6
240d39a
112485e
4397110
0472e8d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| """OAuth 2.0 password grant flow with token caching for BTP Steampunk.""" | ||
|
|
||
| import json | ||
| import os | ||
| import time | ||
| from pathlib import Path | ||
|
|
||
| import requests | ||
|
|
||
| TOKEN_CACHE_PATH = Path('~/.sapcli/tokens.json').expanduser() | ||
| REFRESH_MARGIN = 60 | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Token cache | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| def _load_token_cache(): | ||
| try: | ||
| with open(TOKEN_CACHE_PATH, 'r', encoding='utf-8') as f: | ||
| return json.load(f) | ||
| except (OSError, json.JSONDecodeError): | ||
| return {} | ||
|
|
||
|
|
||
| def _save_token_cache(cache): | ||
| TOKEN_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True) | ||
| fd = os.open(TOKEN_CACHE_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) | ||
| with os.fdopen(fd, 'w', encoding='utf-8') as f: | ||
| json.dump(cache, f, indent=2) | ||
|
|
||
|
|
||
| def _cache_key(token_url, client_id): | ||
| return f'{token_url}|{client_id}' | ||
|
|
||
|
|
||
| def get_cached_token(token_url, client_id): | ||
| cache = _load_token_cache() | ||
| entry = cache.get(_cache_key(token_url, client_id)) | ||
| if not entry: | ||
| return None | ||
| if time.time() > entry.get('expires_at', 0) - REFRESH_MARGIN: | ||
| return None | ||
| return entry['access_token'] | ||
|
|
||
|
|
||
| def get_cached_refresh_token(token_url, client_id): | ||
| cache = _load_token_cache() | ||
| entry = cache.get(_cache_key(token_url, client_id)) | ||
| if not entry: | ||
| return None | ||
| return entry.get('refresh_token') | ||
|
|
||
|
|
||
| def save_token_response(token_url, client_id, token_response): | ||
| cache = _load_token_cache() | ||
| expires_in = token_response.get('expires_in', 3600) | ||
| cache[_cache_key(token_url, client_id)] = { | ||
| 'access_token': token_response['access_token'], | ||
| 'refresh_token': token_response.get('refresh_token'), | ||
| 'expires_at': time.time() + expires_in, | ||
| } | ||
| _save_token_cache(cache) | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Token refresh | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| def refresh_access_token(token_url, client_id, client_secret, refresh_token): | ||
| response = requests.post( | ||
| token_url.rstrip('/') + '/oauth/token', | ||
| auth=(client_id, client_secret), | ||
| data={'grant_type': 'refresh_token', 'refresh_token': refresh_token}, | ||
| timeout=30, | ||
| ) | ||
| if not response.ok: | ||
| return None | ||
|
Comment on lines
+70
to
+78
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Refresh failure handling is too silent and can cause confusing fallback prompts. Line 78–79 returns 🤖 Prompt for AI Agents |
||
| token_data = response.json() | ||
| save_token_response(token_url, client_id, token_data) | ||
| return token_data['access_token'] | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Interactive password grant | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| def fetch_token_with_credentials(token_url, client_id, client_secret, user, password): | ||
| """Obtain a Bearer token via OAuth 2.0 password grant using provided credentials.""" | ||
|
|
||
| response = requests.post( | ||
| token_url.rstrip('/') + '/oauth/token', | ||
| auth=(client_id, client_secret), | ||
| data={ | ||
| 'grant_type': 'password', | ||
| 'username': user, | ||
| 'password': password, | ||
| }, | ||
| timeout=30, | ||
| ) | ||
|
|
||
| if not response.ok: | ||
| raise RuntimeError( | ||
| f'OAuth login failed ({response.status_code}): {response.text}' | ||
| ) | ||
|
|
||
| token_data = response.json() | ||
| save_token_response(token_url, client_id, token_data) | ||
| return token_data['access_token'] | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Entry point | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| def get_token(token_url, client_id, client_secret, user=None, password=None): | ||
| """Return a valid Bearer token — from cache, refresh, or credentials grant.""" | ||
|
|
||
| token = get_cached_token(token_url, client_id) | ||
| if token: | ||
| return token | ||
|
|
||
| refresh_token = get_cached_refresh_token(token_url, client_id) | ||
| if refresh_token: | ||
| token = refresh_access_token(token_url, client_id, client_secret, refresh_token) | ||
| if token: | ||
| return token | ||
|
|
||
| return fetch_token_with_credentials(token_url, client_id, client_secret, user, password) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
client_secretis now storable, but secret-exposure checks don’t cover it.After adding
client_secretin Line 26,_has_passwords()still only inspectspassword. World-readable/shared-config warnings can be missed when only OAuth secrets are present.💡 Proposed fix
🤖 Prompt for AI Agents