Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 72 additions & 22 deletions udio_wrapper/__init__.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,86 @@
"""
Udio Wrapper
Author: Flowese
Version: 0.0.3
Date: 2024-04-15
Author: Flowese (modified by beth)
Version: 0.0.4
Description: Generates songs using the Udio API using textual prompts.
Fixes: #7 - Auto-solve hCaptcha on 500 Server Error
"""

import requests
import os
import time
from .hcaptcha_solver import detect_hcaptcha, solve_hcaptcha, ensure_token, clear_cache

class UdioWrapper:
API_BASE_URL = "https://www.udio.com/api"

def __init__(self, auth_token):
self.auth_token = auth_token
self.all_track_ids = []
self._captcha_token = None

def make_request(self, url, method, data=None, headers=None):
try:
if method == 'POST':
response = requests.post(url, headers=headers, json=data)
else:
response = requests.get(url, headers=headers)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
print(f"Error making {method} request to {url}: {e}")
return None
def _get_captcha_header(self):
"""Get hCaptcha token, solving if needed. Returns dict or empty dict."""
token = ensure_token()
if token:
return {"h-captcha-response": token}
return {}

def make_request(self, url, method, data=None, headers=None, max_retries=2):
for retry in range(max_retries + 1):
try:
# Add captcha token to headers if we have one
if headers is None:
headers = {}
if "h-captcha-response" not in headers and method == "POST":
captcha_token = ensure_token()
if captcha_token:
headers["h-captcha-response"] = captcha_token

if method == "POST":
response = requests.post(url, headers=headers, json=data)
else:
response = requests.get(url, headers=headers)

# Check for captcha challenge in 500 responses
if response.status_code == 500 and retry < max_retries:
print(f"[hCaptcha] 500 error detected, attempting captcha refresh (attempt {retry+1})...")
clear_cache() # force new captcha token
new_token = solve_hcaptcha()
if new_token:
headers["h-captcha-response"] = new_token
time.sleep(2)
continue

# Check for other captcha indicators
if detect_hcaptcha(response) and retry < max_retries:
print(f"[hCaptcha] Challenge detected, refreshing token (attempt {retry+1})...")
clear_cache()
new_token = solve_hcaptcha()
if new_token:
headers["h-captcha-response"] = new_token
time.sleep(1)
continue

response.raise_for_status()
return response

except requests.exceptions.HTTPError as e:
if retry < max_retries and e.response is not None and e.response.status_code >= 500:
time.sleep((retry + 1) * 3)
continue
print(f"Error making {method} request to {url}: {e}")
return None

except requests.exceptions.RequestException as e:
if retry < max_retries:
time.sleep((retry + 1) * 2)
continue
print(f"Error making {method} request to {url}: {e}")
return None

print(f"Error: All {max_retries + 1} attempts failed for {url}")
return None

def get_headers(self, get_request=False):
headers = {
Expand All @@ -39,14 +92,14 @@ def get_headers(self, get_request=False):
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty"
"Sec-Fetch-Dest": "empty",
}
if not get_request:
headers.update({
"sec-ch-ua": '"Google Chrome";v="123", "Not:A-Brand";v="8", "Chromium";v="123"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"macOS"',
"sec-fetch-dest": "empty"
"sec-fetch-dest": "empty",
})
return headers

Expand All @@ -69,13 +122,12 @@ def create_complete_song(self, short_prompt, extend_prompts, outro_prompt, seed=
prompt = extend_prompts[i]
lyrics = custom_lyrics_extend[i] if custom_lyrics_extend and i < len(custom_lyrics_extend) else None
else:
prompt = extend_prompts[-1] # Reuse the last prompt if not enough are provided
prompt = extend_prompts[-1]
lyrics = custom_lyrics_extend[-1] if custom_lyrics_extend else None

print(f"Generating extend song {i + 1}...")
extend_song_result = self.extend(
prompt,
seed,
prompt, seed,
audio_conditioning_path=last_song_result[0]['song_path'],
audio_conditioning_song_id=last_song_result[0]['id'],
custom_lyrics=lyrics
Expand All @@ -90,8 +142,7 @@ def create_complete_song(self, short_prompt, extend_prompts, outro_prompt, seed=
# Generate the outro
print("Generating the outro...")
outro_song_result = self.add_outro(
outro_prompt,
seed,
outro_prompt, seed,
audio_conditioning_path=last_song_result[0]['song_path'],
audio_conditioning_song_id=last_song_result[0]['id'],
custom_lyrics=custom_lyrics_outro
Expand Down Expand Up @@ -219,4 +270,3 @@ def download_song(self, song_url, song_title, folder="downloaded_songs"):
print(f"Downloaded {song_title} with url {song_url} to {file_path}")
except requests.exceptions.RequestException as e:
print(f"Failed to download the song. Error: {e}")

187 changes: 187 additions & 0 deletions udio_wrapper/hcaptcha_solver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""
hCaptcha auto-solver for UdioWrapper.

Auto-detects hCaptcha challenges from Udio's API (500 errors),
extracts the site key dynamically, and solves via:
1. Capsolver (CAPSOLVER_API_KEY env var)
2. 2captcha (CAPTCHA_API_KEY env var)
3. Manual (paste a token you solved in your browser)

Usage:
from .hcaptcha_solver import solve_hcaptcha, get_sitekey, detect_hcaptcha
"""

import os
import re
import time
import requests as req

_cache = {"token": None, "token_ttl": 0, "sitekey": None, "sitekey_ttl": 0}
CACHE_TTL = 300
SITEKEY_URL = "https://www.udio.com/"


def _find_sitekey_in_html(html):
patterns = [
r'data-sitekey["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']',
r'websiteKey["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']',
r'sitekey["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']',
r'hcaptcha.*?key["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']',
r'["\']([a-f0-9-]{36})["\'].*?hcaptcha',
r'hcaptcha.*?([a-f0-9-]{36})',
]
for pat in patterns:
m = re.search(pat, html, re.IGNORECASE)
if m:
return m.group(1)
return None


def get_sitekey():
now = time.time()
if _cache["sitekey"] and now < _cache["sitekey_ttl"]:
return _cache["sitekey"]

try:
resp = req.get(SITEKEY_URL, timeout=15,
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36"})
html = resp.text
key = _find_sitekey_in_html(html)
if key:
_cache["sitekey"] = key
_cache["sitekey_ttl"] = now + CACHE_TTL
print(f"[hCaptcha] Site key: {key}")
return key
except Exception as exc:
print(f"[hCaptcha] Failed to fetch site key: {exc}")

fallback = "a2b4c6d8-e5f7-4908-a123-b456c789d012"
print(f"[hCaptcha] Using fallback site key: {fallback}")
_cache["sitekey"] = fallback
_cache["sitekey_ttl"] = now + CACHE_TTL
return fallback


def detect_hcaptcha(response):
if response.status_code in (403, 429, 503):
return True
if response.status_code == 500:
text = (response.text or "").lower().strip()
if not text or text in ("", "{}", "[]"):
return True
if any(k in text for k in ["captcha", "hcaptcha", "challenge",
"blocked", "denied", "rate limit"]):
return True
return False


def solve_hcaptcha(site_key=None, page_url="https://www.udio.com"):
now = time.time()
if _cache["token"] and now < _cache["token_ttl"]:
print("[hCaptcha] Using cached token")
return _cache["token"]

if site_key is None:
site_key = get_sitekey()

api_key = os.environ.get("CAPSOLVER_API_KEY") or os.environ.get("CAPTCHA_API_KEY")

if api_key:
token = _solve_via_capsolver(site_key, page_url, api_key)
if token:
_cache["token"] = token
_cache["token_ttl"] = now + CACHE_TTL
return token

print()
print("[hCaptcha] No CAPSOLVER_API_KEY set. To auto-solve, set the env var.")
print("[hCaptcha] Manual: visit https://www.udio.com, solve the captcha,")
print(" then copy the h-captcha-response token.")
try:
token = input("Paste hCaptcha token (or Enter to skip): ").strip()
if token:
_cache["token"] = token
_cache["token_ttl"] = now + CACHE_TTL
return token
except (EOFError, KeyboardInterrupt):
pass
return None


def _solve_via_capsolver(site_key, page_url, api_key):
is_2captcha = bool(os.environ.get("CAPTCHA_API_KEY")) and not bool(os.environ.get("CAPSOLVER_API_KEY"))
if is_2captcha:
return _solve_via_2captcha(site_key, page_url, api_key)

task_url = "https://api.capsolver.com/createTask"
try:
resp = req.post(task_url, json={
"clientKey": api_key,
"task": {"type": "HCaptchaTaskProxyLess", "websiteURL": page_url, "websiteKey": site_key}
}, timeout=15)
result = resp.json()
task_id = result.get("taskId")
if not task_id:
print(f"[hCaptcha] Capsolver error: {result.get('errorDescription', resp.text[:200])}")
return None

for _ in range(45):
time.sleep(2)
poll = req.post(task_url, json={"clientKey": api_key, "taskId": task_id}, timeout=10)
status = poll.json()
if status.get("status") == "ready":
sol = status.get("solution", {})
token = sol.get("gRecaptchaResponse") or sol.get("token")
if token:
print("[hCaptcha] Solved via Capsolver")
return token
if status.get("errorId", 0) != 0:
print(f"[hCaptcha] Capsolver error: {status.get('errorDescription', status)}")
return None
print("[hCaptcha] Capsolver timed out")
except Exception as exc:
print(f"[hCaptcha] Capsolver error: {exc}")
return None


def _solve_via_2captcha(site_key, page_url, api_key):
try:
submit = req.post("https://2captcha.com/in.php", data={
"key": api_key, "method": "hcaptcha",
"sitekey": site_key, "pageurl": page_url, "json": 1,
}, timeout=15)
sid = submit.json().get("request")
if not sid or sid == "ERROR_NO_SLOT_AVAILABLE":
print(f"[hCaptcha] 2captcha error: {submit.text[:200]}")
return None

for _ in range(60):
time.sleep(5)
poll = req.get("https://2captcha.com/res.php", params={
"key": api_key, "action": "get", "id": sid, "json": 1,
}, timeout=10)
data = poll.json()
if data.get("status") == 1:
token = data.get("request")
if token:
print("[hCaptcha] Solved via 2captcha")
return token
if data.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
print("[hCaptcha] 2captcha: unsolvable")
return None
print("[hCaptcha] 2captcha timed out")
except Exception as exc:
print(f"[hCaptcha] 2captcha error: {exc}")
return None


def ensure_token(site_key=None):
return solve_hcaptcha(site_key)


def clear_cache():
_cache["token"] = None
_cache["token_ttl"] = 0
_cache["sitekey"] = None
_cache["sitekey_ttl"] = 0