From d6a6fec2145476686277777d52691589945c6335 Mon Sep 17 00:00:00 2001 From: Filip Jodoin Date: Mon, 4 May 2026 07:55:10 -0400 Subject: [PATCH 1/2] Adding PRT spraying for CAP gaps --- findmeaccess.py | 619 ++++++++++++++++++++++++++++++++++++++++++++++- requirements.txt | 4 +- 2 files changed, 616 insertions(+), 7 deletions(-) diff --git a/findmeaccess.py b/findmeaccess.py index 1a8a588..c09d573 100644 --- a/findmeaccess.py +++ b/findmeaccess.py @@ -11,6 +11,9 @@ from lxml import etree import base64 import uuid +import os +import time +from urllib.parse import urlparse, parse_qs # endpoint resources @@ -160,6 +163,82 @@ def get_tenant_id(domain, proxy): print(f"[!] Error retrieving tenant ID - HTTP Status Code {response.status_code}") return +def base64url_encode(data): + if isinstance(data, str): + data = data.encode('utf-8') + return base64.urlsafe_b64encode(data).rstrip(b'=').decode('utf-8') + +def base64url_decode(data): + pad = 4 - len(data) % 4 + if pad != 4: + data += '=' * pad + return base64.urlsafe_b64decode(data) + +def get_prt_nonce(proxy): + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + url = "https://login.microsoftonline.com/common/oauth2/token" + data = {"grant_type": "srv_challenge"} + response = requests.post(url, data=data, proxies=proxy, verify=False) + if response.status_code == 200: + return response.json().get("Nonce") + print(colored("[!] Failed to get PRT nonce", "red", attrs=['bold'])) + return None + +def calculate_derived_key(session_key_bytes, context=None): + """KDF using KBKDFHMAC - matches roadtx implementation""" + from cryptography.hazmat.primitives import hashes as crypto_hashes + from cryptography.hazmat.primitives.kdf.kbkdf import CounterLocation, KBKDFHMAC, Mode + from cryptography.hazmat.backends import default_backend + + label = b"AzureAD-SecureConversation" + if not context: + context = os.urandom(24) + kdf = KBKDFHMAC( + algorithm=crypto_hashes.SHA256(), + mode=Mode.CounterMode, + length=32, + rlen=4, + llen=4, + location=CounterLocation.BeforeFixed, + label=label, + context=context, + fixed=None, + backend=default_backend() + ) + derived_key = kdf.derive(session_key_bytes) + return context, derived_key + +def create_prt_cookie_kdf_v2(prt, session_key_bytes, nonce): + """Create a KDF v2 PRT cookie - matches roadtx create_prt_cookie_kdf_ver_2""" + import jwt as pyjwt + from cryptography.hazmat.primitives import hashes as crypto_hashes + + context = os.urandom(24) + headers = { + 'ctx': base64.b64encode(context).decode('utf-8'), + 'kdf_ver': 2 + } + payload = { + "refresh_token": prt, + "is_primary": "true", + "request_nonce": nonce + } + + # Sign with random key to get JWT body in correct encoding + temp_jwt = pyjwt.encode(payload, os.urandom(32), algorithm='HS256', headers=headers) + jbody = temp_jwt.split('.')[1] + jwt_body = base64.b64decode(jbody + ('=' * (len(jbody) % 4))) + + # Calculate derived key v2: SHA256(context + jwt_body) -> KBKDFHMAC + digest = crypto_hashes.Hash(crypto_hashes.SHA256()) + digest.update(context) + digest.update(jwt_body) + kdf_context = digest.finalize() + _, derived_key = calculate_derived_key(session_key_bytes, kdf_context) + + cookie = pyjwt.encode(payload, derived_key, algorithm='HS256', headers=headers) + return cookie + def refresh_authenticate(client_id, user_agent, proxy, tenant_id, refresh_token, scope): urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -610,7 +689,414 @@ def get_azure_token_via_adfs(username, password, scope, custom_user_agent, clien def handle_combination(combination): username, password, resource, client_id, user_agent, proxy = combination return authenticate(username, password, resource, client_id, user_agent, proxy) + +# refresh token audit authentication - tests CAP policies post-MFA +def refresh_audit_authenticate(scope, client_id, user_agent, proxy, tenant_id, refresh_token): + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + url = f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token" + + parameters = { + 'refresh_token': refresh_token, + 'client_id': client_id[1], + 'grant_type': 'refresh_token', + 'scope': scope[1] + } + + headers = { + 'User-Agent': user_agent[1], + 'Accept': 'application/json', + 'Content-Type': 'application/x-www-form-urlencoded' + } + + response = requests.post(url, data=parameters, headers=headers, proxies=proxy, verify=False) + + if response.status_code == 200: + success_string = colored("Success! Token Obtained","green", attrs=['bold']) + json_text = json.loads(response.text) + token_scope = json_text.get('scope', 'None') + scope_string = colored(f"Token Scope: {token_scope}", attrs=['bold']) + print(f"[+] {scope[0]} - {client_id[0]} - {user_agent[0]} - {success_string} - {scope_string}") + return scope, client_id, user_agent + + else: + # Microsoft MFA + if "AADSTS50076" in response.text: + message_string = colored("MFA Required or blocked by conditional access","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + # Must enroll in MFA + elif "AADSTS50079" in response.text: + message_string = colored("MFA enrollment required but not configured!","green", attrs=['bold']) + print(f"[+] {scope[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + # Conditional Access + elif "AADSTS53003" in response.text: + message_string = colored("Blocked by conditional access policy","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + # Conditional Access + elif "AADSTS50105" in response.text: + message_string = colored("Application blocked by conditional access policy","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + # Third party MFA + elif "AADSTS50158" in response.text: + message_string = colored("Third-party MFA required","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + # Compliant Device + elif "AADSTS53000" in response.text: + message_string = colored("Requires compliant/managed device","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + # Consent + elif "AADSTS65001" in response.text: + message_string = colored("User or administrator has not consented to use the application","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + # Disabled application + elif "AADSTS7000112:" in response.text: + message_string = colored("Application disabled","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {message_string}") + + # Clientid isn't valid for scope + elif "AADSTS65002" in response.text: + message_string = colored("Client_id not authorized for scope","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {message_string}") + + # Assertion or secret required + elif "AADSTS7000218" in response.text: + message_string = colored("client_assertion or client_secret required","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {message_string}") + + # User blocked + elif "AADSTS53011" in response.text: + message_string = colored("User blocked due to risk on home tenant","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {message_string}") + + # Suspicious activity + elif "AADSTS53004" in response.text: + message_string = colored("Suspicious activity","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {message_string}") + + # Refresh token not valid for this client_id (not in FOCI family) + elif "AADSTS70000" in response.text: + message_string = colored("Refresh token not valid for this client_id","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {message_string}") + + # Refresh token expired (SPA fixed lifetime) + elif "AADSTS700084" in response.text: + raise ValueError(colored(f"[!] Refresh token is expired","red", attrs=['bold'])) + + # Invalid clientid + elif "AADSTS700016" in response.text: + message_string = colored("Client_id is invalid","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {message_string}") + + # Invalid scope + elif "AADSTS70011" in response.text or "AADSTS500011" in response.text: + message_string = colored("Invalid scope","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {message_string}") + + # Disabled account + elif "AADSTS50057" in response.text: + raise ValueError(colored(f"[!] The account appears to be disabled.","red", attrs=['bold'])) + + # Locked account + elif "AADSTS50053" in response.text: + raise ValueError(colored(f"[!] The account appears to be locked.","red", attrs=['bold'])) + + # default unknown + else: + response_data = json.loads(response.text) + error_description = response_data.get('error_description') + message_string = colored(f"Error: {error_description}","yellow", attrs=['bold']) + print(f"[-] {scope[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + return + +def handle_refresh_combination(combination): + scope, client_id, user_agent, proxy, tenant_id, refresh_token = combination + return refresh_audit_authenticate(scope, client_id, user_agent, proxy, tenant_id, refresh_token) + +# mass check scopes via refresh token exchange +def check_scopes_refresh(tenant_id, refresh_token, all_user_agents, threads, custom_user_agent, custom_scope, proxy, custom_client=None): + print("[*] Starting refresh token audit checks") + results = [] + scopes_to_check = {} + + if custom_scope is not None: + if custom_scope in scopes: + scopes_to_check[custom_scope] = scopes[custom_scope][0] + else: + print(f"[-] Unknown scope '{custom_scope}'. List with --list_scopes") + sys.exit() + else: + scopes_to_check = {k: v[0] for k, v in scopes.items()} + + # Filter client_ids + if custom_client is not None: + if custom_client in client_ids: + client_ids_to_use = {custom_client: client_ids[custom_client]} + else: + client_ids_to_use = {"Custom": custom_client} + else: + client_ids_to_use = client_ids + + # generate final results dict + for scope_name in scopes_to_check: + final_results[scope_name] = {'Accessible': False, 'Accessible Client IDs': 0} + + if all_user_agents: + combinations = [(scope, client_id, user_agent, proxy, tenant_id, refresh_token) + for scope in scopes_to_check.items() + for client_id in client_ids_to_use.items() + for user_agent in user_agents.items()] + else: + if custom_user_agent is not None: + user_agent = ("Custom", custom_user_agent) + else: + ua_key = "Windows 10 Chrome" + user_agent = (ua_key, user_agents[ua_key]) + combinations = [(scope, client_id, user_agent, proxy, tenant_id, refresh_token) + for scope in scopes_to_check.items() + for client_id in client_ids_to_use.items()] + + try: + error_raised = False + with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: + try: + for result in executor.map(handle_refresh_combination, combinations): + results.append(result) + except ValueError as e: + if not error_raised: + error_raised = True + print(e) + sys.exit() + + return results + + except KeyboardInterrupt: + print(colored("[!] Ctrl+C detected, exiting...", "yellow")) + sys.exit() + +# PRT audit - uses roadtools library directly (same as roadtx prtauth) +def prt_audit_authenticate(resource, client_id, user_agent, proxy, tenant_id, prt, session_key_bytes): + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + from roadtools.roadlib.auth import Authentication, AuthenticationException + from roadtools.roadlib.deviceauth import DeviceAuthentication + + try: + from roadtools.roadtx.utils import find_redirurl_for_client + except ImportError: + find_redirurl_for_client = None + + auth = Authentication() + auth.tenant = tenant_id + auth.verify = False + auth.proxies = proxy + auth.set_client_id(client_id[1]) + auth.set_resource_uri(resource[1]) + auth.set_user_agent(user_agent[1]) + + deviceauth = DeviceAuthentication(auth) + deviceauth.proxies = proxy + deviceauth.verify = False + deviceauth.prt = prt + deviceauth.session_key = session_key_bytes + + # Find correct redirect URI (same logic as roadtx prtauth) + if find_redirurl_for_client: + redirect_uri = find_redirurl_for_client(auth.client_id, interactive=False, broker=True) + else: + redirect_uri = 'https://login.microsoftonline.com/common/oauth2/nativeclient' + + try: + tokendata = deviceauth.aad_brokerplugin_prt_auth(client_id[1], resource[1], redirect_uri=redirect_uri) + if tokendata and 'access_token' in tokendata: + success_string = colored("Success! Token Obtained", "green", attrs=['bold']) + print(f"[+] {resource[0]} - {client_id[0]} - {user_agent[0]} - {success_string}") + return resource, client_id, user_agent + elif tokendata and 'error_description' in tokendata: + error_desc = tokendata['error_description'] + message_string = colored(f"Error: {str(error_desc)[:200]}", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + return None + else: + success_string = colored("Success! Token Obtained (no access_token key)", "green", attrs=['bold']) + print(f"[+] {resource[0]} - {client_id[0]} - {user_agent[0]} - {success_string}") + return resource, client_id, user_agent + + except AuthenticationException as e: + error_text = str(e) + + if "AADSTS50076" in error_text: + message_string = colored("MFA Required or blocked by conditional access", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + elif "AADSTS50079" in error_text: + message_string = colored("MFA enrollment required but not configured!", "green", attrs=['bold']) + print(f"[+] {resource[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + elif "AADSTS53003" in error_text: + message_string = colored("Blocked by conditional access policy", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + elif "AADSTS50105" in error_text: + message_string = colored("Application blocked by conditional access policy", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + elif "AADSTS50158" in error_text: + message_string = colored("Third-party MFA required", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + elif "AADSTS53000" in error_text: + message_string = colored("Requires compliant/managed device", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + elif "AADSTS65001" in error_text: + message_string = colored("User or administrator has not consented", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + elif "AADSTS7000112" in error_text: + message_string = colored("Application disabled", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {message_string}") + + elif "AADSTS65002" in error_text: + message_string = colored("Client_id not authorized for resource", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {message_string}") + + elif "AADSTS7000218" in error_text: + message_string = colored("client_assertion or client_secret required", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {message_string}") + + elif "AADSTS50053" in error_text: + raise ValueError(colored("[!] The account appears to be locked.", "red", attrs=['bold'])) + + elif "AADSTS50057" in error_text: + raise ValueError(colored("[!] The account appears to be disabled.", "red", attrs=['bold'])) + + elif "AADSTS500011" in error_text: + message_string = colored("Invalid resource", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {message_string}") + + elif "AADSTS700016" in error_text: + message_string = colored("Client_id is invalid", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {message_string}") + + elif "AADSTS50173" in error_text: + raise ValueError(colored("[!] PRT is invalid or expired", "red", attrs=['bold'])) + + elif "AADSTS70000" in error_text: + message_string = colored("Invalid grant/assertion", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {message_string}") + + else: + try: + err_data = json.loads(error_text) + desc = err_data.get('error_description', error_text[:200]) + except (json.JSONDecodeError, TypeError): + desc = error_text[:200] + message_string = colored(f"Error: {desc}", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + + return None + + except Exception as e: + message_string = colored(f"Error: {str(e)[:200]}", "yellow", attrs=['bold']) + print(f"[-] {resource[0]} - {client_id[0]} - {user_agent[0]} - {message_string}") + return None + +def handle_prt_combination(combination): + resource, client_id, user_agent, proxy, tenant_id, prt, session_key_bytes = combination + return prt_audit_authenticate(resource, client_id, user_agent, proxy, tenant_id, prt, session_key_bytes) + +# Load PRT from roadtx .prt file +def load_prt_file(filepath): + """Load PRT and session key from a roadtx .prt file using roadtools""" + from roadtools.roadlib.auth import Authentication + from roadtools.roadlib.deviceauth import DeviceAuthentication + + auth = Authentication() + deviceauth = DeviceAuthentication(auth) + if not deviceauth.loadprt(filepath): + print(colored(f"[!] Failed to load PRT file: {filepath}", "red", attrs=['bold'])) + sys.exit() + + # Return PRT and session key in the format check_resources_prt expects + print(colored("[+] Loaded PRT from file", "green", attrs=['bold'])) + return deviceauth.prt, deviceauth.session_key + +def check_resources_prt(tenant_id, prt, session_key, all_user_agents, threads, custom_user_agent, custom_resource, proxy, custom_client=None): + print("[*] Starting PRT audit checks") + + # session_key is already bytes if loaded via roadtools, or hex string if from CLI + if isinstance(session_key, str): + from roadtools.roadlib.auth import Authentication + session_key_bytes = Authentication.ensure_binary_sessionkey(session_key) + else: + session_key_bytes = session_key + + results = [] + resources_to_check = {} + + if custom_resource is not None: + if custom_resource in resources: + resources_to_check[custom_resource] = resources[custom_resource] + elif custom_resource in resources.values(): + for key, value in resources.items(): + if value == custom_resource: + resources_to_check[key] = custom_resource + else: + resources_to_check["Custom"] = custom_resource + else: + resources_to_check = resources + + if custom_client is not None: + if custom_client in client_ids: + client_ids_to_use = {custom_client: client_ids[custom_client]} + else: + client_ids_to_use = {"Custom": custom_client} + else: + client_ids_to_use = client_ids + + for resource in resources_to_check: + final_results[resource] = {'Accessible': False, 'Accessible Client IDs': 0} + + if all_user_agents: + combinations = [(resource, client_id, user_agent, proxy, tenant_id, prt, session_key_bytes) + for resource in resources_to_check.items() + for client_id in client_ids_to_use.items() + for user_agent in user_agents.items()] + else: + if custom_user_agent is not None: + user_agent = ("Custom", custom_user_agent) + else: + ua_key = "Windows 10 Chrome" + user_agent = (ua_key, user_agents[ua_key]) + combinations = [(resource, client_id, user_agent, proxy, tenant_id, prt, session_key_bytes) + for resource in resources_to_check.items() + for client_id in client_ids_to_use.items()] + + try: + error_raised = False + with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: + try: + for result in executor.map(handle_prt_combination, combinations): + results.append(result) + except ValueError as e: + if not error_raised: + error_raised = True + print(e) + sys.exit() + + return results + + except KeyboardInterrupt: + print(colored("[!] Ctrl+C detected, exiting...", "yellow")) + sys.exit() + # mass check resources, client ids, and user agents def check_resources(username, password, all_user_agents, threads, custom_user_agent, custom_resource, proxy, custom_client=None): print("[*] Starting checks") @@ -697,7 +1183,7 @@ def write_results(username, results): print(f"\n[+] Results written to {filename}\n") # print out final table -def print_table(results): +def print_table(results, accessible_header="Accessible w/o MFA"): #filter out None results filtered_results = [x for x in results if x is not None] @@ -716,7 +1202,7 @@ def print_table(results): accessible = colored(accessible, 'red',attrs=['bold']) table_data.append([resource, accessible, e['Accessible Client IDs']]) - print("\n\n"+tabulate(table_data, headers=[colored("Resource", attrs=['bold']), colored("Accessible w/o MFA",attrs=['bold']), colored("Accessible Client IDs",attrs=['bold'])], tablefmt="grid")) + print("\n\n"+tabulate(table_data, headers=[colored("Resource", attrs=['bold']), colored(accessible_header,attrs=['bold']), colored("Accessible Client IDs",attrs=['bold'])], tablefmt="grid")) def add_shared_arguments(parser): parser.add_argument('--proxy', metavar="proxy", help="HTTP proxy to use - ie http://127.0.0.1:8080", type=str) @@ -728,7 +1214,7 @@ def add_shared_arguments(parser): parser.add_argument('-p', metavar="password", help="Password for account", type=str) def main(): - banner = "\nFindMeAccess v3.1\n" + banner = "\nFindMeAccess v3.3\n" print(banner) parser = argparse.ArgumentParser(description='') @@ -739,7 +1225,14 @@ def main(): audit_parser.add_argument('--list_resources', help="List all resources", action='store_true') audit_parser.add_argument('--list_clients', help="List all client ids", action='store_true') audit_parser.add_argument('--list_ua', help="List all user agents", action='store_true') - audit_parser.add_argument('--ua_all', help="Check all users agents (Default: False)", action='store_true', default=False) + audit_parser.add_argument('--list_scopes', help="List all token scopes", action='store_true') + audit_parser.add_argument('--ua_all', help="Check all users agents (Default: False)", action='store_true', default=False) + audit_parser.add_argument('--refresh_token', help="Refresh token for post-MFA CAP audit (FOCI-bound)", type=str) + audit_parser.add_argument('--prt', help="Primary Refresh Token (raw value)", type=str) + audit_parser.add_argument('--prt_sessionkey', help="PRT session key (hex, from roadtx)", type=str) + audit_parser.add_argument('--prt_file', help="Path to roadtx .prt file (e.g. roadtx.prt)", type=str) + audit_parser.add_argument('-d', help="Tenant domain (required with --refresh_token/--prt/--prt_file)", type=str) + audit_parser.add_argument('-s', help="Token scope filter - show with --list_scopes", type=str) token_parser = subparsers.add_parser("token", help="Used for getting tokens") add_shared_arguments(token_parser) @@ -756,6 +1249,11 @@ def main(): adfs_parser.add_argument('--get_all', help="Get tokens for every scope", action='store_true') adfs_parser.add_argument('--url', help="ADFS endpoint ex - https://adfs.domain.com", type=str) adfs_parser.add_argument('--ua_all', help="Check all users agents (Default: False)", action='store_true', default=False) + adfs_parser.add_argument('--refresh_token', help="Refresh token for post-MFA CAP audit (FOCI-bound)", type=str) + adfs_parser.add_argument('--prt', help="Primary Refresh Token (raw value)", type=str) + adfs_parser.add_argument('--prt_sessionkey', help="PRT session key (hex, from roadtx)", type=str) + adfs_parser.add_argument('--prt_file', help="Path to roadtx .prt file (e.g. roadtx.prt)", type=str) + adfs_parser.add_argument('-d', help="Tenant domain (required with --refresh_token/--prt/--prt_file)", type=str) @@ -782,6 +1280,62 @@ def main(): elif args.list_ua: print_aligned(user_agents) + elif args.list_scopes: + print_aligned(scopes) + + elif args.refresh_token: + # Refresh token audit mode - test CAP policies post-MFA + if not args.d: + print("[-] No domain specified with '-d' option (required for refresh token audit)") + sys.exit() + + tenant_id = get_tenant_id(args.d, proxies) + if tenant_id is None: + print("[-] Exiting due to tenant ID failure - check domain name") + sys.exit() + + try: + results = check_scopes_refresh(tenant_id, args.refresh_token, args.ua_all, args.threads, args.user_agent, args.s, proxies, args.c) + if not args.ua_all: + print_table(results, "Accessible (post-MFA)") + write_results("refresh-audit", results) + + except Exception as e: + print(e) + print("[!] Exception caught, exiting...") + sys.exit() + + elif args.prt or args.prt_file: + # PRT audit mode - exchange PRT for tokens across all resource/client permutations + if not args.d: + print("[-] No domain specified with '-d' option (required for PRT audit)") + sys.exit() + + if args.prt_file: + prt, session_key_hex = load_prt_file(args.prt_file) + elif args.prt: + if not args.prt_sessionkey: + print("[-] No PRT session key specified with '--prt_sessionkey' option") + sys.exit() + prt = args.prt + session_key_hex = args.prt_sessionkey + + tenant_id = get_tenant_id(args.d, proxies) + if tenant_id is None: + print("[-] Exiting due to tenant ID failure - check domain name") + sys.exit() + + try: + results = check_resources_prt(tenant_id, prt, session_key_hex, args.ua_all, args.threads, args.user_agent, args.r, proxies, args.c) + if not args.ua_all: + print_table(results, "Accessible (via PRT)") + write_results("prt-audit", results) + + except Exception as e: + print(e) + print("[!] Exception caught, exiting...") + sys.exit() + else: if not args.u: @@ -856,8 +1410,61 @@ def main(): print_aligned(scopes) return - if not args.url: - print("[!] ADFS URL required via --url") + if args.refresh_token: + # Refresh token audit mode - ADFS not needed, goes straight to Azure AD + if not args.d: + print("[-] No domain specified with '-d' option (required for refresh token audit)") + sys.exit() + + tenant_id = get_tenant_id(args.d, proxies) + if tenant_id is None: + print("[-] Exiting due to tenant ID failure - check domain name") + sys.exit() + + try: + results = check_scopes_refresh(tenant_id, args.refresh_token, args.ua_all, args.threads, args.user_agent, args.s, proxies, args.c) + if not args.ua_all: + print_table(results, "Accessible (post-MFA)") + write_results("refresh-audit", results) + + except Exception as e: + print(e) + print("[!] Exception caught, exiting...") + sys.exit() + + elif args.prt or args.prt_file: + # PRT audit mode + if not args.d: + print("[-] No domain specified with '-d' option (required for PRT audit)") + sys.exit() + + if args.prt_file: + prt, session_key_hex = load_prt_file(args.prt_file) + elif args.prt: + if not args.prt_sessionkey: + print("[-] No PRT session key specified with '--prt_sessionkey' option") + sys.exit() + prt = args.prt + session_key_hex = args.prt_sessionkey + + tenant_id = get_tenant_id(args.d, proxies) + if tenant_id is None: + print("[-] Exiting due to tenant ID failure - check domain name") + sys.exit() + + try: + results = check_resources_prt(tenant_id, prt, session_key_hex, args.ua_all, args.threads, args.user_agent, args.r, proxies, args.c) + if not args.ua_all: + print_table(results, "Accessible (via PRT)") + write_results("prt-audit", results) + + except Exception as e: + print(e) + print("[!] Exception caught, exiting...") + sys.exit() + + elif not args.url: + print("[!] ADFS URL required via --url (or use --refresh_token/--prt/--prt_file)") return else: diff --git a/requirements.txt b/requirements.txt index 25c5d55..844780e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ tabulate termcolor requests -lxml \ No newline at end of file +lxml +cryptography +PyJWT \ No newline at end of file From 7bc7e01d6f2e197834b6370362f24eb933f3e011 Mon Sep 17 00:00:00 2001 From: Filip Jodoin Date: Mon, 4 May 2026 07:58:25 -0400 Subject: [PATCH 2/2] Updating README with PRT capabilities --- README.md | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/README.md b/README.md index 4712432..0027d4c 100644 --- a/README.md +++ b/README.md @@ -245,6 +245,46 @@ Of if you want to quickly check your refresh tokens against all of the scopes an python findmeaccess.py token -d domain.com --get_all --refresh_token ``` +## PRT Auditing + +When you have a Primary Refresh Token (PRT), you can audit all resource/client ID combinations using the PRT instead of a password. This leverages the `roadtools` library (roadtx) under the hood to perform PRT-based authentication via the AAD broker plugin. + +### Using a roadtx .prt file + +If you have a `.prt` file from roadtx (e.g. `roadtx.prt`), you can pass it directly: + +``` +python findmeaccess.py audit --prt_file roadtx.prt -d domain.com +``` + +### Using raw PRT and session key + +You can also provide the PRT and session key values directly: + +``` +python findmeaccess.py audit --prt --prt_sessionkey -d domain.com +``` + +### Filtering by resource, client ID, or user agent + +The same flags from the standard audit apply — you can target a specific resource with `-r`, a specific client ID with `-c`, or test all user agents with `--ua_all`: + +``` +python findmeaccess.py audit --prt_file roadtx.prt -d domain.com -r "Microsoft Graph API" +python findmeaccess.py audit --prt_file roadtx.prt -d domain.com -c "Microsoft Office" --ua_all +``` + +### PRT via ADFS subcommand + +PRT auditing is also available under the `adfs` subcommand with the same flags: + +``` +python findmeaccess.py adfs --prt_file roadtx.prt -d domain.com +python findmeaccess.py adfs --prt --prt_sessionkey -d domain.com +``` + +Results are written to `prt-audit-accessible.txt` and a summary table is displayed with the header "Accessible (via PRT)". + ## Federated Auditing with ADFS **NOTE: This feature has only been tested with limited environments and may not function fully with all setups.**