Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 59 additions & 11 deletions udio_wrapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import requests
import os
import time
from .hcaptcha_solver import detect_hcaptcha, solve_hcaptcha, ensure_token, clear_cache

class UdioWrapper:
API_BASE_URL = "https://www.udio.com/api"
Expand All @@ -17,17 +18,64 @@ def __init__(self, auth_token):
self.auth_token = auth_token
self.all_track_ids = []

def make_request(self, url, method, data=None, headers=None):
try:
if method == 'POST':
response = requests.post(url, headers=headers, json=data)
else:
response = requests.get(url, headers=headers)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
print(f"Error making {method} request to {url}: {e}")
return None
def make_request(self, url, method, data=None, headers=None, max_retries=2):
for retry in range(max_retries + 1):
try:
# Add captcha token to headers if we have one
if headers is None:
headers = {}
else:
headers = dict(headers)

if "h-captcha-response" not in headers and method == "POST":
captcha_token = ensure_token()
if captcha_token:
headers["h-captcha-response"] = captcha_token

if method == "POST":
response = requests.post(url, headers=headers, json=data)
else:
response = requests.get(url, headers=headers)

# Check for captcha challenge in 500 responses
if response.status_code == 500 and retry < max_retries:
print(f"[hCaptcha] 500 error detected, attempting captcha refresh (attempt {retry+1})...")
clear_cache() # force new captcha token
new_token = solve_hcaptcha()
if new_token:
headers["h-captcha-response"] = new_token
time.sleep(2)
continue

# Check for other captcha indicators
if detect_hcaptcha(response) and retry < max_retries:
print(f"[hCaptcha] Challenge detected, refreshing token (attempt {retry+1})...")
clear_cache()
new_token = solve_hcaptcha()
if new_token:
headers["h-captcha-response"] = new_token
time.sleep(1)
continue

response.raise_for_status()
return response

except requests.exceptions.HTTPError as e:
if retry < max_retries and e.response is not None and e.response.status_code >= 500:
time.sleep((retry + 1) * 3)
continue
print(f"Error making {method} request to {url}: {e}")
return None

except requests.exceptions.RequestException as e:
if retry < max_retries:
time.sleep((retry + 1) * 2)
continue
print(f"Error making {method} request to {url}: {e}")
return None

print(f"Error: All {max_retries + 1} attempts failed for {url}")
return None

def get_headers(self, get_request=False):
headers = {
Expand Down
199 changes: 199 additions & 0 deletions udio_wrapper/hcaptcha_solver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
"""
hCaptcha auto-solver for UdioWrapper.

Auto-detects hCaptcha challenges from Udio's API (500 errors),
extracts the site key dynamically, and solves via:
1. Capsolver (CAPSOLVER_API_KEY env var)
2. 2captcha (CAPTCHA_API_KEY env var)
3. Manual (paste a token you solved in your browser)

Usage:
from .hcaptcha_solver import solve_hcaptcha, get_sitekey, detect_hcaptcha
"""

import os
import re
import time
import requests

_cache = {"token": None, "token_ttl": 0, "sitekey": None, "sitekey_ttl": 0}
CACHE_TTL = 300
SITEKEY_URL = "https://www.udio.com/"


def _find_sitekey_in_html(html):
patterns = [
r'data-sitekey["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']',
r'websiteKey["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']',
r'sitekey["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']',
r'hcaptcha.*?key["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']',
r'["\']([a-f0-9-]{36})["\'].*?hcaptcha',
r'hcaptcha.*?([a-f0-9-]{36})',
]
for pat in patterns:
m = re.search(pat, html, re.IGNORECASE)
if m:
return m.group(1)
return None


def get_sitekey():
now = time.time()
if _cache["sitekey"] and now < _cache["sitekey_ttl"]:
return _cache["sitekey"]

try:
resp = requests.get(SITEKEY_URL, timeout=15,
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"})
html = resp.text
key = _find_sitekey_in_html(html)
if key:
_cache["sitekey"] = key
_cache["sitekey_ttl"] = now + CACHE_TTL
print(f"[hCaptcha] Site key found: {key}")
return key
except Exception as exc:
print(f"[hCaptcha] Failed to fetch site key: {exc}")

fallback = "a2b4c6d8-e5f7-4908-a123-b456c789d012"
print(f"[hCaptcha] Using fallback site key: {fallback}")
_cache["sitekey"] = fallback
_cache["sitekey_ttl"] = now + CACHE_TTL
return fallback


def detect_hcaptcha(response):
if response.status_code in (403, 429, 503):
return True
if response.status_code == 500:
text = (response.text or "").lower().strip()
if not text or text in ("", "{}", "[]"):
return True
if any(k in text for k in ["captcha", "hcaptcha", "challenge", "blocked", "denied", "rate limit"]):
return True
return False


def solve_hcaptcha(site_key=None, page_url="https://www.udio.com"):
now = time.time()
if _cache["token"] and now < _cache["token_ttl"]:
print("[hCaptcha] Using cached token")
return _cache["token"]

if site_key is None:
site_key = get_sitekey()

api_key = os.environ.get("CAPSOLVER_API_KEY") or os.environ.get("CAPTCHA_API_KEY")

if api_key:
token = None
is_2captcha = bool(os.environ.get("CAPTCHA_API_KEY")) and not bool(os.environ.get("CAPSOLVER_API_KEY"))
if is_2captcha:
token = _solve_via_2captcha(site_key, page_url, api_key)
else:
token = _solve_via_capsolver(site_key, page_url, api_key)

if token:
_cache["token"] = token
_cache["token_ttl"] = now + CACHE_TTL
return token

print()
print("[hCaptcha] No CAPSOLVER_API_KEY or CAPTCHA_API_KEY set. To auto-solve, set the env var.")
print("[hCaptcha] Manual: visit https://www.udio.com, solve the captcha,")
print(" then copy the h-captcha-response token.")
try:
token = input("Paste hCaptcha token (or Enter to skip): ").strip()
if token:
_cache["token"] = token
_cache["token_ttl"] = now + CACHE_TTL
return token
except (EOFError, KeyboardInterrupt):
pass
return None


def _solve_via_capsolver(site_key, page_url, api_key):
create_url = "https://api.capsolver.com/createTask"
result_url = "https://api.capsolver.com/getTaskResult"
try:
resp = requests.post(create_url, json={
"clientKey": api_key,
"task": {
"type": "HCaptchaTaskProxyLess",
"websiteURL": page_url,
"websiteKey": site_key
}
}, timeout=15)
result = resp.json()
task_id = result.get("taskId")
if not task_id:
print(f"[hCaptcha] Capsolver error: {result.get('errorDescription', resp.text[:200])}")
return None

# Poll result using getTaskResult
for _ in range(45):
time.sleep(2)
poll = requests.post(result_url, json={"clientKey": api_key, "taskId": task_id}, timeout=10)
status = poll.json()
if status.get("status") == "ready":
sol = status.get("solution", {})
token = sol.get("gRecaptchaResponse") or sol.get("token")
if token:
print("[hCaptcha] Solved via Capsolver")
return token
if status.get("errorId", 0) != 0:
print(f"[hCaptcha] Capsolver error: {status.get('errorDescription', status)}")
return None
print("[hCaptcha] Capsolver timed out")
except Exception as exc:
print(f"[hCaptcha] Capsolver error: {exc}")
return None


def _solve_via_2captcha(site_key, page_url, api_key):
try:
submit = requests.post("https://2captcha.com/in.php", data={
"key": api_key,
"method": "hcaptcha",
"sitekey": site_key,
"pageurl": page_url,
"json": 1,
}, timeout=15)
sid = submit.json().get("request")
if not sid or sid == "ERROR_NO_SLOT_AVAILABLE":
print(f"[hCaptcha] 2captcha error: {submit.text[:200]}")
return None

for _ in range(60):
time.sleep(5)
poll = requests.get("https://2captcha.com/res.php", params={
"key": api_key,
"action": "get",
"id": sid,
"json": 1,
}, timeout=10)
data = poll.json()
if data.get("status") == 1:
token = data.get("request")
if token:
print("[hCaptcha] Solved via 2captcha")
return token
if data.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
print("[hCaptcha] 2captcha: unsolvable")
return None
print("[hCaptcha] 2captcha timed out")
except Exception as exc:
print(f"[hCaptcha] 2captcha error: {exc}")
return None


def ensure_token(site_key=None):
return solve_hcaptcha(site_key)


def clear_cache():
_cache["token"] = None
_cache["token_ttl"] = 0
_cache["sitekey"] = None
_cache["sitekey_ttl"] = 0