From 314033a2f7a6340a9a653b7776d9eecbddaa516a Mon Sep 17 00:00:00 2001 From: Andrey Mazurchuk Date: Tue, 23 Jun 2026 21:30:32 +0200 Subject: [PATCH 1/2] Add .gitattributes: normalize all text files to LF --- .gitattributes | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 .gitattributes diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 00000000..0c3c7ed7 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,11 @@ +# Normalize line endings to LF in the repository and on checkout +* text=auto eol=lf + +# Binary files — never touch line endings +*.png binary +*.jpg binary +*.jpeg binary +*.gif binary +*.ico binary +*.pdf binary +*.db binary From 0627d27e2de9ede79e38778cba4adfe276a0a543 Mon Sep 17 00:00:00 2001 From: Andrey Mazurchuk Date: Tue, 23 Jun 2026 21:31:02 +0200 Subject: [PATCH 2/2] Normalize line endings to LF; bug fixes from development MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Line endings: all Python source files converted from CRLF to LF. Bug fixes (detail.py, chrome.py, launcher.py, prompt.py): - enrich: catch Playwright cleanup crash on Ubuntu 26.04 when Chrome launch fails mid-batch; unprocessed jobs marked browser_unavailable - enrich: sanitize LLM-returned application_url — treat "None"/"null" strings as NULL, not as a valid URL - apply/chrome: derive stable per-instance CDP base port from MD5 hash of instance name so concurrent apply runs don't clobber each other - apply/launcher: use 127.0.0.1 instead of localhost for CDP endpoint (Ubuntu 26.04 resolves localhost to IPv6 ::1, Chrome binds IPv4 only) - apply/launcher: guard application_url against "None" string value - apply/prompt: look up site-specific password from profile.json personal.site_passwords before falling back to the default password Co-Authored-By: Claude Sonnet 4.6 --- src/applypilot/apply/chrome.py | 20 ++- src/applypilot/apply/dashboard.py | 3 + src/applypilot/apply/launcher.py | 179 +++++++++++++---------- src/applypilot/apply/prompt.py | 4 +- src/applypilot/cli.py | 16 +- src/applypilot/config.py | 10 +- src/applypilot/database.py | 101 ++++++++++++- src/applypilot/discovery/jobspy.py | 34 +++-- src/applypilot/discovery/smartextract.py | 10 +- src/applypilot/enrichment/detail.py | 39 ++++- src/applypilot/llm.py | 76 ++++++++-- src/applypilot/pipeline.py | 106 +++++--------- src/applypilot/scoring/cover_letter.py | 2 +- src/applypilot/scoring/pdf.py | 16 +- src/applypilot/scoring/scorer.py | 4 +- src/applypilot/scoring/tailor.py | 2 +- 16 files changed, 427 insertions(+), 195 deletions(-) diff --git a/src/applypilot/apply/chrome.py b/src/applypilot/apply/chrome.py index 690211a8..91c79116 100644 --- a/src/applypilot/apply/chrome.py +++ b/src/applypilot/apply/chrome.py @@ -4,8 +4,10 @@ worker profile setup/cloning, and cross-platform process cleanup. """ +import hashlib import json import logging +import os import platform import shutil import subprocess @@ -17,8 +19,22 @@ logger = logging.getLogger(__name__) -# CDP port base — each worker uses BASE_CDP_PORT + worker_id -BASE_CDP_PORT = 9222 + +def _instance_base_port() -> int: + """Derive a stable CDP base port from the instance name so concurrent + apply runs on different instances don't clobber each other's Chrome. + Maps each instance into a 10-port window in 9200-9399. + """ + instance_dir = os.environ.get("APPLYPILOT_DIR", "") + name = Path(instance_dir).name if instance_dir else "default" + offset = int(hashlib.md5(name.encode()).hexdigest()[:2], 16) % 20 * 10 + return 9200 + offset + + +# CDP port base — each worker uses BASE_CDP_PORT + worker_id. +# Derived from the instance name so concurrent apply runs on different +# instances don't collide on the same port. +BASE_CDP_PORT = _instance_base_port() # Track Chrome processes per worker for cleanup _chrome_procs: dict[int, subprocess.Popen] = {} diff --git a/src/applypilot/apply/dashboard.py b/src/applypilot/apply/dashboard.py index c2860091..07e005e0 100644 --- a/src/applypilot/apply/dashboard.py +++ b/src/applypilot/apply/dashboard.py @@ -5,6 +5,7 @@ """ import logging +import re import threading import time from dataclasses import dataclass, field @@ -81,6 +82,8 @@ def add_event(msg: str) -> None: Args: msg: Rich markup string describing the event. """ + plain = re.sub(r'\[/?[^\]]*\]', '', msg) + logger.info("%s", plain) ts = datetime.now().strftime("%H:%M:%S") with _lock: _events.append(f"[dim]{ts}[/dim] {msg}") diff --git a/src/applypilot/apply/launcher.py b/src/applypilot/apply/launcher.py index 341a11a3..eab2c574 100644 --- a/src/applypilot/apply/launcher.py +++ b/src/applypilot/apply/launcher.py @@ -20,11 +20,8 @@ from datetime import datetime, timezone from pathlib import Path -from rich.console import Console -from rich.live import Live - from applypilot import config -from applypilot.database import get_connection +from applypilot.database import get_connection, is_already_applied, record_applied, init_applied_db from applypilot.apply import chrome, dashboard, prompt as prompt_mod from applypilot.apply.chrome import ( launch_chrome, cleanup_worker, kill_all_chrome, @@ -71,7 +68,7 @@ def _make_mcp_config(cdp_port: int) -> dict: "command": "npx", "args": [ "@playwright/mcp@latest", - f"--cdp-endpoint=http://localhost:{cdp_port}", + f"--cdp-endpoint=http://127.0.0.1:{cdp_port}", f"--viewport-size={config.DEFAULTS['viewport']}", ], }, @@ -147,7 +144,8 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, # Skip manual ATS sites (unsolvable CAPTCHAs) from applypilot.config import is_manual_ats - apply_url = row["application_url"] or row["url"] + _raw_app_url = row["application_url"] + apply_url = (_raw_app_url if _raw_app_url and _raw_app_url != "None" else None) or row["url"] if is_manual_ats(apply_url): conn.execute( "UPDATE jobs SET apply_status = 'manual', apply_error = 'manual ATS' WHERE url = ?", @@ -157,6 +155,17 @@ def acquire_job(target_url: str | None = None, min_score: int = 7, logger.info("Skipping manual ATS: %s", row["url"][:80]) return None + # Cross-instance dedup — skip if already applied via another instance + if is_already_applied(row["url"], row["application_url"]): + conn.execute( + "UPDATE jobs SET apply_status = 'skip_dedup', " + "apply_error = 'applied via another instance' WHERE url = ?", + (row["url"],), + ) + conn.commit() + logger.info("Skipping (cross-instance dedup): %s", row["url"][:80]) + return None + now = datetime.now(timezone.utc).isoformat() conn.execute(""" UPDATE jobs SET apply_status = 'in_progress', @@ -403,6 +412,10 @@ def run_job(job: dict, port: int, worker_id: int = 0, if bt == "text": text_parts.append(block["text"]) lf.write(block["text"] + "\n") + # Emit agent text to main log (truncated to avoid flooding) + for tline in block["text"].splitlines(): + if tline.strip(): + logger.info("[W%d] %s", worker_id, tline[:200]) elif bt == "tool_use": name = ( block.get("name", "") @@ -422,6 +435,7 @@ def run_job(job: dict, port: int, worker_id: int = 0, desc = name lf.write(f" >> {desc}\n") + logger.info("[W%d] >> %s", worker_id, desc) ws = get_state(worker_id) cur_actions = ws.actions if ws else 0 update_state(worker_id, @@ -610,6 +624,8 @@ def worker_loop(worker_id: int = 0, limit: int = 1, continue elif result == "applied": mark_result(job["url"], "applied", duration_ms=duration_ms) + instance_name = os.path.basename(os.environ.get("APPLYPILOT_DIR", "default")) + record_applied(job, instance=instance_name) applied += 1 update_state(worker_id, jobs_applied=applied, jobs_done=applied + failed) @@ -672,7 +688,7 @@ def main(limit: int = 1, target_url: str | None = None, _stop_event.clear() config.ensure_dirs() - console = Console() + init_applied_db() if continuous: effective_limit = 0 @@ -686,8 +702,8 @@ def main(limit: int = 1, target_url: str | None = None, init_worker(i) worker_label = f"{workers} worker{'s' if workers > 1 else ''}" - console.print(f"Launching apply pipeline ({mode_label}, {worker_label}, poll every {POLL_INTERVAL}s)...") - console.print("[dim]Ctrl+C = skip current job(s) | Ctrl+C x2 = stop[/dim]") + logger.info("Launching apply pipeline (%s, %s, poll every %ds)...", + mode_label, worker_label, POLL_INTERVAL) # Double Ctrl+C handler _ctrl_c_count = 0 @@ -696,14 +712,13 @@ def _sigint_handler(sig, frame): nonlocal _ctrl_c_count _ctrl_c_count += 1 if _ctrl_c_count == 1: - console.print("\n[yellow]Skipping current job(s)... (Ctrl+C again to STOP)[/yellow]") - # Kill all active Claude processes to skip current jobs + logger.warning("Skipping current job(s)... (Ctrl+C again to STOP)") with _claude_lock: for wid, cproc in list(_claude_procs.items()): if cproc.poll() is None: _kill_process_tree(cproc.pid) else: - console.print("\n[red bold]STOPPING[/red bold]") + logger.warning("STOPPING") _stop_event.set() with _claude_lock: for wid, cproc in list(_claude_procs.items()): @@ -714,78 +729,88 @@ def _sigint_handler(sig, frame): signal.signal(signal.SIGINT, _sigint_handler) + def _run_workers() -> tuple[int, int]: + if workers == 1: + return worker_loop( + worker_id=0, + limit=effective_limit, + target_url=target_url, + min_score=min_score, + headless=headless, + model=model, + dry_run=dry_run, + ) + + if effective_limit: + base = effective_limit // workers + extra = effective_limit % workers + limits = [base + (1 if i < extra else 0) for i in range(workers)] + else: + limits = [0] * workers + + with ThreadPoolExecutor(max_workers=workers, + thread_name_prefix="apply-worker") as executor: + futures = { + executor.submit( + worker_loop, + worker_id=i, + limit=limits[i], + target_url=target_url, + min_score=min_score, + headless=headless, + model=model, + dry_run=dry_run, + ): i + for i in range(workers) + } + results: list[tuple[int, int]] = [] + for future in as_completed(futures): + wid = futures[future] + try: + results.append(future.result()) + except Exception: + logger.exception("Worker %d crashed", wid) + results.append((0, 0)) + + return sum(r[0] for r in results), sum(r[1] for r in results) + + is_tty = sys.stdout.isatty() + try: - with Live(render_full(), console=console, refresh_per_second=2) as live: - # Daemon thread for display refresh only (no business logic) + if is_tty: + from rich.console import Console as _Console + _console = _Console() + _console.print("[dim]Ctrl+C = skip current job(s) | Ctrl+C x2 = stop[/dim]") + + from rich.live import Live _dashboard_running = True - def _refresh(): + def _refresh(live): while _dashboard_running: live.update(render_full()) time.sleep(0.5) - refresh_thread = threading.Thread(target=_refresh, daemon=True) - refresh_thread.start() - - if workers == 1: - # Single worker — run directly in main thread - total_applied, total_failed = worker_loop( - worker_id=0, - limit=effective_limit, - target_url=target_url, - min_score=min_score, - headless=headless, - model=model, - dry_run=dry_run, - ) - else: - # Multi-worker — distribute limit across workers - if effective_limit: - base = effective_limit // workers - extra = effective_limit % workers - limits = [base + (1 if i < extra else 0) - for i in range(workers)] - else: - limits = [0] * workers # continuous mode - - with ThreadPoolExecutor(max_workers=workers, - thread_name_prefix="apply-worker") as executor: - futures = { - executor.submit( - worker_loop, - worker_id=i, - limit=limits[i], - target_url=target_url, - min_score=min_score, - headless=headless, - model=model, - dry_run=dry_run, - ): i - for i in range(workers) - } - - results: list[tuple[int, int]] = [] - for future in as_completed(futures): - wid = futures[future] - try: - results.append(future.result()) - except Exception: - logger.exception("Worker %d crashed", wid) - results.append((0, 0)) - - total_applied = sum(r[0] for r in results) - total_failed = sum(r[1] for r in results) - - _dashboard_running = False - refresh_thread.join(timeout=2) - live.update(render_full()) - - totals = get_totals() - console.print( - f"\n[bold]Done: {total_applied} applied, {total_failed} failed " - f"(${totals['cost']:.3f})[/bold]" - ) - console.print(f"Logs: {config.LOG_DIR}") + with Live(render_full(), console=_console, refresh_per_second=2) as live: + refresh_thread = threading.Thread( + target=_refresh, args=(live,), daemon=True) + refresh_thread.start() + total_applied, total_failed = _run_workers() + _dashboard_running = False + refresh_thread.join(timeout=2) + live.update(render_full()) + + totals = get_totals() + _console.print( + f"\n[bold]Done: {total_applied} applied, {total_failed} failed " + f"(${totals['cost']:.3f})[/bold]" + ) + _console.print(f"Logs: {config.LOG_DIR}") + else: + total_applied, total_failed = _run_workers() + totals = get_totals() + logger.info("Done: %d applied, %d failed ($%.3f)", + total_applied, total_failed, totals['cost']) + logger.info("Logs: %s", config.LOG_DIR) except KeyboardInterrupt: pass diff --git a/src/applypilot/apply/prompt.py b/src/applypilot/apply/prompt.py index 37c3790a..04ea2c0d 100644 --- a/src/applypilot/apply/prompt.py +++ b/src/applypilot/apply/prompt.py @@ -516,7 +516,7 @@ def build_prompt(job: dict, tailored_resume: str, prompt = f"""You are an autonomous job application agent. Your ONE mission: get this candidate an interview. You have all the information and tools. Think strategically. Act decisively. Submit the application. == JOB == -URL: {job.get('application_url') or job['url']} +URL: {job.get('application_url') if job.get('application_url') and job.get('application_url') != 'None' else job['url']} Title: {job['title']} Company: {job.get('site', 'Unknown')} Fit Score: {job.get('fit_score', 'N/A')}/10 @@ -568,7 +568,7 @@ def build_prompt(job: dict, tailored_resume: str, 5. Login wall? 5a. FIRST: check the URL. If you landed on {', '.join(blocked_sso)}, or any SSO/OAuth page -> STOP. Output RESULT:FAILED:sso_required. Do NOT try to sign in to Google/Microsoft/SSO. 5b. Check for popups. Run browser_tabs action "list". If a new tab/window appeared (login popup), switch to it with browser_tabs action "select". Check the URL there too -- if it's SSO -> RESULT:FAILED:sso_required. - 5c. Regular login form (employer's own site)? Try sign in: {personal['email']} / {personal.get('password', '')} + 5c. Regular login form (employer's own site)? Try sign in: {personal['email']} / {personal.get('site_passwords', {}).get(job.get('site', ''), personal.get('password', ''))} 5d. After clicking Login/Sign-in: run CAPTCHA DETECT. Login pages frequently have invisible CAPTCHAs that silently block form submissions. If found, solve it then retry login. 5e. Sign in failed? Try sign up with same email and password. 5f. Need email verification? Use search_emails + read_email to get the code. diff --git a/src/applypilot/cli.py b/src/applypilot/cli.py index 6c8be912..d5e59c62 100644 --- a/src/applypilot/cli.py +++ b/src/applypilot/cli.py @@ -11,12 +11,26 @@ from applypilot import __version__ +import os as _os, sys as _sys, pathlib as _pathlib + +_log_handlers: list[logging.Handler] = [logging.StreamHandler(_sys.stderr)] +_log_file_path = _os.environ.get("APPLYPILOT_LOG_FILE") +if _log_file_path: + _pathlib.Path(_log_file_path).parent.mkdir(parents=True, exist_ok=True) + _log_handlers.append(logging.FileHandler(_log_file_path, mode="a", encoding="utf-8")) + logging.basicConfig( level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", + format="%(asctime)s %(levelname)s %(message)s", datefmt="%H:%M:%S", + handlers=_log_handlers, ) +# Third-party HTTP libraries log every request/response at INFO by default. +# Our own WARNING already covers every 429 retry, so suppress their chatter. +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("httpcore").setLevel(logging.WARNING) + app = typer.Typer( name="applypilot", help="AI-powered end-to-end job application pipeline.", diff --git a/src/applypilot/config.py b/src/applypilot/config.py index 8c397807..839ed60b 100644 --- a/src/applypilot/config.py +++ b/src/applypilot/config.py @@ -8,13 +8,19 @@ # User data directory — all user-specific files live here APP_DIR = Path(os.environ.get("APPLYPILOT_DIR", Path.home() / ".applypilot")) +# Shared config dir — profile.json and .env resolve from here when set. +# Allows multiple instances to share credentials without duplication. +# Defaults to APP_DIR for backwards compatibility. +SHARED_DIR = Path(os.environ.get("APPLYPILOT_SHARED_DIR", APP_DIR)) + # Core paths DB_PATH = APP_DIR / "applypilot.db" -PROFILE_PATH = APP_DIR / "profile.json" +APPLIED_DB_PATH = SHARED_DIR / "applied.db" # shared cross-instance registry +PROFILE_PATH = Path(os.environ.get("APPLYPILOT_PROFILE", SHARED_DIR / "profile.json")) RESUME_PATH = APP_DIR / "resume.txt" RESUME_PDF_PATH = APP_DIR / "resume.pdf" SEARCH_CONFIG_PATH = APP_DIR / "searches.yaml" -ENV_PATH = APP_DIR / ".env" +ENV_PATH = Path(os.environ.get("APPLYPILOT_ENV_FILE", SHARED_DIR / ".env")) # Generated output TAILORED_DIR = APP_DIR / "tailored_resumes" diff --git a/src/applypilot/database.py b/src/applypilot/database.py index a1779c02..0cf731df 100644 --- a/src/applypilot/database.py +++ b/src/applypilot/database.py @@ -5,12 +5,14 @@ without migration ordering issues. """ +import re import sqlite3 import threading from datetime import datetime, timezone from pathlib import Path +from urllib.parse import urlparse, urlencode, parse_qsl -from applypilot.config import DB_PATH +from applypilot.config import DB_PATH, APPLIED_DB_PATH # Thread-local connection storage — each thread gets its own connection # (required for SQLite thread safety with parallel workers) @@ -422,3 +424,100 @@ def get_jobs_by_stage(conn: sqlite3.Connection | None = None, columns = rows[0].keys() return [dict(zip(columns, row)) for row in rows] return [] + + +# --------------------------------------------------------------------------- +# Cross-instance deduplication via shared applied.db +# --------------------------------------------------------------------------- + +_STRIP_PARAMS: frozenset[str] = frozenset({ + "utm_source", "utm_medium", "utm_campaign", "utm_content", "utm_term", + "trk", "trackingId", "refId", "src", "ref", "source", "mcid", + "lipi", "li_fat_id", "fbclid", "gclid", "msclkid", +}) + + +def canonical_url(url: str) -> str: + """Normalize a URL for deduplication — strip tracking params, fragment, trailing slash.""" + try: + p = urlparse(url.strip()) + qs = [(k, v) for k, v in parse_qsl(p.query) if k not in _STRIP_PARAMS] + clean = p._replace(query=urlencode(qs), fragment="") + return clean.geturl().rstrip("/").lower() + except Exception: + return url.strip().lower() + + +def get_applied_db() -> sqlite3.Connection: + """Thread-local connection to the shared applied.db in SHARED_DIR.""" + path = str(APPLIED_DB_PATH) + if not hasattr(_local, "connections"): + _local.connections = {} + conn = _local.connections.get(path) + if conn is not None: + try: + conn.execute("SELECT 1") + return conn + except sqlite3.ProgrammingError: + pass + conn = sqlite3.connect(path, timeout=10) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=10000") + conn.row_factory = sqlite3.Row + _local.connections[path] = conn + return conn + + +def init_applied_db() -> None: + """Create applied_urls table in shared applied.db if it doesn't exist.""" + Path(APPLIED_DB_PATH).parent.mkdir(parents=True, exist_ok=True) + conn = get_applied_db() + conn.execute(""" + CREATE TABLE IF NOT EXISTS applied_urls ( + canonical_url TEXT PRIMARY KEY, + raw_url TEXT, + instance TEXT NOT NULL, + title TEXT, + site TEXT, + applied_at TEXT NOT NULL, + status TEXT NOT NULL + ) + """) + conn.commit() + + +def is_already_applied(url: str, application_url: str | None = None) -> bool: + """Return True if either URL is already in the shared applied registry.""" + try: + conn = get_applied_db() + urls = list({canonical_url(url)} | ({canonical_url(application_url)} if application_url else set())) + placeholders = ",".join("?" * len(urls)) + row = conn.execute( + f"SELECT 1 FROM applied_urls WHERE canonical_url IN ({placeholders}) LIMIT 1", + urls, + ).fetchone() + return row is not None + except Exception: + return False # never block application on registry errors + + +def record_applied(job: dict, instance: str) -> None: + """Write a successful application to the shared cross-instance registry.""" + try: + conn = get_applied_db() + now = datetime.now(timezone.utc).isoformat() + raw_urls = {job["url"]} + if job.get("application_url"): + raw_urls.add(job["application_url"]) + for raw in raw_urls: + conn.execute( + """INSERT OR IGNORE INTO applied_urls + (canonical_url, raw_url, instance, title, site, applied_at, status) + VALUES (?,?,?,?,?,?,'applied')""", + (canonical_url(raw), raw, instance, + job.get("title"), job.get("site"), now), + ) + conn.commit() + except Exception as e: + import logging + logging.getLogger(__name__).warning("record_applied failed: %s", e) diff --git a/src/applypilot/discovery/jobspy.py b/src/applypilot/discovery/jobspy.py index b5e54ff4..4d48e07f 100644 --- a/src/applypilot/discovery/jobspy.py +++ b/src/applypilot/discovery/jobspy.py @@ -81,37 +81,41 @@ def _load_location_config(search_cfg: dict) -> tuple[list[str], list[str]]: Falls back to sensible defaults if not defined in the YAML. """ - accept = search_cfg.get("location_accept", []) - reject = search_cfg.get("location_reject_non_remote", []) + loc_block = search_cfg.get("location", {}) + accept = loc_block.get("accept_patterns", []) + reject = loc_block.get("reject_patterns", []) return accept, reject def _location_ok(location: str | None, accept: list[str], reject: list[str]) -> bool: """Check if a job location passes the user's location filter. - Remote jobs are always accepted. Non-remote jobs must match an accept - pattern and not match a reject pattern. + Evaluation order (first match wins): + 1. Reject patterns — always block, even for remote jobs (e.g. Russia remote) + 2. Remote keywords — any non-rejected remote job passes + 3. Accept patterns — non-remote jobs must match an explicit accept entry + 4. Default-reject — unknown non-remote locations are dropped """ if not location: - return True # unknown location -- keep it, let scorer decide + return True # unknown location — keep it, let scorer decide loc = location.lower() - # Remote jobs always OK - if any(r in loc for r in ("remote", "anywhere", "work from home", "wfh", "distributed")): - return True - - # Reject non-remote matches + # 1. Hard reject — checked before remote so e.g. "Remote (Russia)" is still blocked for r in reject: if r.lower() in loc: return False - # Accept matches + # 2. Remote jobs pass for any non-rejected country + if any(r in loc for r in ("remote", "anywhere", "work from home", "wfh", "distributed")): + return True + + # 3. On-site / hybrid jobs must match an explicit accept pattern for a in accept: if a.lower() in loc: return True - # No match -- reject unknown + # 4. No match — reject return False @@ -301,7 +305,7 @@ def search_jobs( ) -> dict: """Run a single job search via JobSpy and store results in DB.""" if sites is None: - sites = ["indeed", "linkedin", "zip_recruiter"] + sites = ["indeed", "linkedin", "glassdoor"] proxy_config = parse_proxy(proxy) if proxy else None @@ -369,7 +373,7 @@ def _full_crawl( ) -> dict: """Run all search queries from search config across all locations.""" if sites is None: - sites = ["indeed", "linkedin", "zip_recruiter"] + sites = ["indeed", "linkedin", "glassdoor"] # Build search combinations from config queries = search_cfg.get("queries", []) @@ -461,7 +465,7 @@ def run_discovery(cfg: dict | None = None) -> dict: return {"new": 0, "existing": 0, "errors": 0, "db_total": 0, "queries": 0} proxy = cfg.get("proxy") - sites = cfg.get("sites") + sites = cfg.get("boards") or cfg.get("sites") results_per_site = cfg.get("defaults", {}).get("results_per_site", 100) hours_old = cfg.get("defaults", {}).get("hours_old", 72) tiers = cfg.get("tiers") diff --git a/src/applypilot/discovery/smartextract.py b/src/applypilot/discovery/smartextract.py index cf49a9a2..d936dcd1 100644 --- a/src/applypilot/discovery/smartextract.py +++ b/src/applypilot/discovery/smartextract.py @@ -162,7 +162,15 @@ def on_response(response): pass with sync_playwright() as p: - browser = p.chromium.launch(headless=headless) + try: + browser = p.chromium.launch(headless=headless) + except Exception: + from applypilot.config import get_chrome_path + browser = p.chromium.launch( + executable_path=get_chrome_path(), + headless=headless, + args=["--no-sandbox", "--disable-dev-shm-usage"], + ) page = browser.new_page(user_agent=UA) page.on("response", on_response) diff --git a/src/applypilot/enrichment/detail.py b/src/applypilot/enrichment/detail.py index 11b79260..a1da56fe 100644 --- a/src/applypilot/enrichment/detail.py +++ b/src/applypilot/enrichment/detail.py @@ -19,6 +19,8 @@ from datetime import datetime, timezone from urllib.parse import urljoin +_NULL_STRINGS = frozenset({"None", "none", "null", "NULL", "N/A", "n/a", ""}) + from bs4 import BeautifulSoup from playwright.sync_api import sync_playwright @@ -144,7 +146,15 @@ def capture_algolia(response): pass with sync_playwright() as p: - browser = p.chromium.launch(headless=True) + try: + browser = p.chromium.launch(headless=True) + except Exception: + from applypilot.config import get_chrome_path + browser = p.chromium.launch( + executable_path=get_chrome_path(), + headless=True, + args=["--no-sandbox", "--disable-dev-shm-usage"], + ) page = browser.new_page(user_agent=UA) page.on("response", capture_algolia) page.goto( @@ -473,13 +483,15 @@ def extract_with_llm(page, url: str) -> dict: result = extract_json(raw) desc = result.get("full_description") apply_url = result.get("application_url") + if str(apply_url).strip() in _NULL_STRINGS: + apply_url = None if desc: desc = clean_description(desc) return {"full_description": desc, "application_url": apply_url} except Exception as e: - log.error("LLM ERROR: %s", e) + log.error("LLM ERROR: %s", str(e).splitlines()[0]) return {"full_description": None, "application_url": None} @@ -636,7 +648,14 @@ def scrape_site_batch( launch_opts: dict = {"headless": True} if _PROXY_CONFIG: launch_opts["proxy"] = _PROXY_CONFIG["playwright"] - browser = p.chromium.launch(**launch_opts) + try: + browser = p.chromium.launch(**launch_opts) + except Exception: + from applypilot.config import get_chrome_path + launch_opts["executable_path"] = get_chrome_path() + launch_opts.setdefault("args", []) + launch_opts["args"] += ["--no-sandbox", "--disable-dev-shm-usage"] + browser = p.chromium.launch(**launch_opts) context = browser.new_context(user_agent=UA) page = context.new_page() @@ -681,6 +700,20 @@ def scrape_site_batch( time.sleep(delay) browser.close() + except Exception as e: + # Playwright launch or cleanup failure (e.g. bundled browser unavailable on this OS). + # Mark remaining jobs as browser_unavailable so they aren't retried next run. + log.warning("Browser unavailable for %s batch (%d jobs): %s", site, len(jobs), e) + unprocessed = len(jobs) - stats["processed"] + if unprocessed > 0: + stats["error"] += unprocessed + if conn: + for url, _ in jobs[stats["processed"]:]: + conn.execute( + "UPDATE jobs SET detail_error = ?, detail_scraped_at = ? WHERE url = ?", + ("browser_unavailable", now, url), + ) + conn.commit() finally: if own_conn: conn.close() diff --git a/src/applypilot/llm.py b/src/applypilot/llm.py index 1fb7be64..85d71fbb 100644 --- a/src/applypilot/llm.py +++ b/src/applypilot/llm.py @@ -11,6 +11,8 @@ import logging import os +import random +import threading import time import httpx @@ -66,9 +68,54 @@ def _detect_provider() -> tuple[str, str, str]: _MAX_RETRIES = 5 _TIMEOUT = 120 # seconds -# Base wait on first 429/503 (doubles each retry, caps at 60s). -# Gemini free tier is 15 RPM = 4s minimum between requests; 10s gives headroom. -_RATE_LIMIT_BASE_WAIT = 10 +# On a 429/503: wait = min(BASE * 2^attempt, MAX) + random jitter. +# Jitter desynchronises two instances that share a key so they don't retry +# simultaneously and immediately re-collide. +_RATE_LIMIT_BASE_WAIT = 20 # seconds (was 10 — more conservative first pause) +_RATE_LIMIT_MAX_WAIT = 120 # seconds cap (was 60) +_RATE_LIMIT_JITTER = 10 # seconds of uniform random noise added to each wait + +# --------------------------------------------------------------------------- +# Proactive inter-request throttle +# --------------------------------------------------------------------------- +# Enforces a minimum gap between outgoing requests within this process, +# keeping us below the provider's RPM limit before we ever see a 429. +# +# Gemini free = 15 RPM → 4s theoretical → we use 5s (≈12 RPM, −20% safety margin) +# OpenAI = 500 RPM → 0.12s → we use 0.25s (≈240 RPM, well under limit) +# Local/custom = no API limit → no throttle + +_MIN_INTERVAL: dict[str, float] = { + "gemini": 5.0, + "openai": 0.25, + "default": 0.0, +} + +_last_request_time: float = 0.0 +_rate_gate = threading.Lock() + + +def _provider_key(base_url: str) -> str: + if "generativelanguage.googleapis.com" in base_url: + return "gemini" + if "openai.com" in base_url: + return "openai" + return "default" + + +def _proactive_throttle(base_url: str) -> None: + """Sleep if needed so we don't exceed the provider's RPM ceiling proactively.""" + global _last_request_time + interval = _MIN_INTERVAL.get(_provider_key(base_url), 0.0) + if interval <= 0: + return + with _rate_gate: + elapsed = time.time() - _last_request_time + if elapsed < interval: + gap = interval - elapsed + log.debug("Rate gate: holding %.1fs before next request", gap) + time.sleep(gap) + _last_request_time = time.time() _GEMINI_COMPAT_BASE = "https://generativelanguage.googleapis.com/v1beta/openai" @@ -201,6 +248,8 @@ def chat( for attempt in range(_MAX_RETRIES): try: + _proactive_throttle(self.base_url) + # Route to native Gemini if we've already confirmed it's needed if self._use_native_gemini: return self._chat_native_gemini(messages, temperature, max_tokens) @@ -229,23 +278,25 @@ def chat( except httpx.HTTPStatusError as exc: resp = exc.response if resp.status_code in (429, 503) and attempt < _MAX_RETRIES - 1: - # Respect Retry-After header if provided (Gemini sends this). + # Honour Retry-After if the server sends one, otherwise + # use exponential backoff with a random jitter so two + # processes that share a key don't retry in lockstep. retry_after = ( resp.headers.get("Retry-After") or resp.headers.get("X-RateLimit-Reset-Requests") ) if retry_after: try: - wait = float(retry_after) + wait = float(retry_after) + random.uniform(0, _RATE_LIMIT_JITTER) except (ValueError, TypeError): - wait = _RATE_LIMIT_BASE_WAIT * (2 ** attempt) + wait = min(_RATE_LIMIT_BASE_WAIT * (2 ** attempt), _RATE_LIMIT_MAX_WAIT) + wait += random.uniform(0, _RATE_LIMIT_JITTER) else: - wait = min(_RATE_LIMIT_BASE_WAIT * (2 ** attempt), 60) + wait = min(_RATE_LIMIT_BASE_WAIT * (2 ** attempt), _RATE_LIMIT_MAX_WAIT) + wait += random.uniform(0, _RATE_LIMIT_JITTER) log.warning( - "LLM rate limited (HTTP %s). Waiting %ds before retry %d/%d. " - "Tip: Gemini free tier = 15 RPM. Consider a paid account " - "or switching to a local model.", + "LLM rate limited (HTTP %s). Waiting %.0fs before retry %d/%d.", resp.status_code, wait, attempt + 1, _MAX_RETRIES, ) time.sleep(wait) @@ -254,9 +305,10 @@ def chat( except httpx.TimeoutException: if attempt < _MAX_RETRIES - 1: - wait = min(_RATE_LIMIT_BASE_WAIT * (2 ** attempt), 60) + wait = min(_RATE_LIMIT_BASE_WAIT * (2 ** attempt), _RATE_LIMIT_MAX_WAIT) + wait += random.uniform(0, _RATE_LIMIT_JITTER) log.warning( - "LLM request timed out, retrying in %ds (attempt %d/%d)", + "LLM request timed out, retrying in %.0fs (attempt %d/%d)", wait, attempt + 1, _MAX_RETRIES, ) time.sleep(wait) diff --git a/src/applypilot/pipeline.py b/src/applypilot/pipeline.py index 29881c5f..9828b5a8 100644 --- a/src/applypilot/pipeline.py +++ b/src/applypilot/pipeline.py @@ -17,15 +17,10 @@ import time from datetime import datetime -from rich.console import Console -from rich.panel import Panel -from rich.table import Table - from applypilot.config import load_env, ensure_dirs from applypilot.database import init_db, get_connection, get_stats log = logging.getLogger(__name__) -console = Console() # --------------------------------------------------------------------------- @@ -63,37 +58,31 @@ def _run_discover(workers: int = 1) -> dict: """Stage: Job discovery — JobSpy, Workday, and smart-extract scrapers.""" stats: dict = {"jobspy": None, "workday": None, "smartextract": None} - # JobSpy - console.print(" [cyan]JobSpy full crawl...[/cyan]") + log.info("JobSpy full crawl...") try: from applypilot.discovery.jobspy import run_discovery run_discovery() stats["jobspy"] = "ok" except Exception as e: log.error("JobSpy crawl failed: %s", e) - console.print(f" [red]JobSpy error:[/red] {e}") stats["jobspy"] = f"error: {e}" - # Workday corporate scraper - console.print(" [cyan]Workday corporate scraper...[/cyan]") + log.info("Workday corporate scraper...") try: from applypilot.discovery.workday import run_workday_discovery run_workday_discovery(workers=workers) stats["workday"] = "ok" except Exception as e: log.error("Workday scraper failed: %s", e) - console.print(f" [red]Workday error:[/red] {e}") stats["workday"] = f"error: {e}" - # Smart extract - console.print(" [cyan]Smart extract (AI-powered scraping)...[/cyan]") + log.info("Smart extract (AI-powered scraping)...") try: from applypilot.discovery.smartextract import run_smart_extract run_smart_extract(workers=workers) stats["smartextract"] = "ok" except Exception as e: log.error("Smart extract failed: %s", e) - console.print(f" [red]Smart extract error:[/red] {e}") stats["smartextract"] = f"error: {e}" return stats @@ -177,10 +166,7 @@ def _resolve_stages(stage_names: list[str]) -> list[str]: resolved = [] for name in stage_names: if name not in STAGE_META: - console.print( - f"[red]Unknown stage:[/red] '{name}'. " - f"Available: {', '.join(STAGE_ORDER)}, all" - ) + log.error("Unknown stage '%s'. Available: %s, all", name, ', '.join(STAGE_ORDER)) raise SystemExit(1) if name not in resolved: resolved.append(name) @@ -332,10 +318,8 @@ def _run_sequential(ordered: list[str], min_score: int, workers: int = 1, for name in ordered: meta = STAGE_META[name] - console.print(f"\n{'=' * 70}") - console.print(f" [bold]STAGE: {name}[/bold] — {meta['desc']}") - console.print(f" Started: {datetime.now().strftime('%H:%M:%S')}") - console.print(f"{'=' * 70}") + log.info("-" * 60) + log.info("STAGE: %s — %s", name, meta['desc']) t0 = time.time() runner = _STAGE_RUNNERS[name] @@ -365,13 +349,12 @@ def _run_sequential(ordered: list[str], min_score: int, workers: int = 1, elapsed = time.time() - t0 status = f"error: {e}" log.exception("Stage '%s' crashed", name) - console.print(f"\n [red]STAGE FAILED:[/red] {e}") results.append({"stage": name, "status": status, "elapsed": elapsed}) if status not in ("ok", "partial"): errors[name] = status - console.print(f"\n Stage '{name}' completed in {elapsed:.1f}s — {status}") + log.info("Stage '%s' done in %.1fs — %s", name, elapsed, status) total_elapsed = time.time() - pipeline_start return {"stages": results, "errors": errors, "elapsed": total_elapsed} @@ -384,8 +367,8 @@ def _run_streaming(ordered: list[str], min_score: int, workers: int = 1, stop_event = threading.Event() pipeline_start = time.time() - console.print(f"\n [bold cyan]STREAMING MODE[/bold cyan] — stages run concurrently") - console.print(f" Poll interval: {_STREAM_POLL_INTERVAL}s\n") + log.info("STREAMING MODE — stages run concurrently") + log.info("Poll interval: %ds", _STREAM_POLL_INTERVAL) # Mark stages NOT in `ordered` as done so downstream doesn't wait for them for stage in STAGE_ORDER: @@ -406,18 +389,16 @@ def _run_streaming(ordered: list[str], min_score: int, workers: int = 1, ) threads[name] = t t.start() - console.print(f" [dim]Started thread:[/dim] {name}") + log.info("Started thread: %s", name) # Wait for all threads to finish try: for name in ordered: threads[name].join() elapsed = time.time() - start_times[name] - console.print( - f" [green]Completed:[/green] {name} ({elapsed:.1f}s)" - ) + log.info("Completed: %s (%.1fs)", name, elapsed) except KeyboardInterrupt: - console.print("\n[yellow]Interrupted — stopping stages...[/yellow]") + log.warning("Interrupted — stopping stages...") stop_event.set() for t in threads.values(): t.join(timeout=10) @@ -473,26 +454,20 @@ def run_pipeline( # Banner mode = "streaming" if stream else "sequential" - console.print() - console.print(Panel.fit( - f"[bold]ApplyPilot Pipeline[/bold] ({mode})", - border_style="blue", - )) - console.print(f" Min score: {min_score}") - console.print(f" Workers: {workers}") - console.print(f" Validation: {validation_mode}") - console.print(f" Stages: {' -> '.join(ordered)}") + log.info("=== ApplyPilot Pipeline (%s) ===", mode) + log.info("min_score=%d workers=%d validation=%s", min_score, workers, validation_mode) + log.info("stages: %s", " -> ".join(ordered)) # Pre-run stats pre_stats = get_stats() - console.print(f" DB: {pre_stats['total']} jobs, {pre_stats['pending_detail']} pending enrichment") + log.info("DB: %d jobs, %d pending enrichment", pre_stats['total'], pre_stats['pending_detail']) if dry_run: - console.print(f"\n [yellow]DRY RUN[/yellow] — would execute ({mode}):") + log.info("DRY RUN — would execute (%s):", mode) for name in ordered: meta = STAGE_META[name] - console.print(f" {name:<12s} {meta['desc']}") - console.print(f"\n No changes made.") + log.info(" %-12s %s", name, meta['desc']) + log.info("No changes made.") return {"stages": [], "errors": {}, "elapsed": 0.0} # Execute @@ -503,38 +478,23 @@ def run_pipeline( result = _run_sequential(ordered, min_score, workers=workers, validation_mode=validation_mode) - # Summary table - console.print(f"\n{'=' * 70}") - summary = Table(title="Pipeline Summary", show_header=True, header_style="bold") - summary.add_column("Stage", style="bold") - summary.add_column("Status") - summary.add_column("Time", justify="right") - + # Summary + log.info("=" * 60) + log.info("Pipeline Summary") for r in result["stages"]: - elapsed_str = f"{r['elapsed']:.1f}s" - status_display = r["status"][:30] - if r["status"] == "ok": - style = "green" - elif r["status"] in ("partial", "skipped"): - style = "yellow" - else: - style = "red" - summary.add_row(r["stage"], f"[{style}]{status_display}[/{style}]", elapsed_str) - - summary.add_row("", "", "") - summary.add_row("[bold]Total[/bold]", "", f"[bold]{result['elapsed']:.1f}s[/bold]") - console.print(summary) + log.info(" %-12s %-20s %.1fs", r["stage"], r["status"][:20], r["elapsed"]) + log.info(" %-12s %-20s %.1fs", "TOTAL", "", result["elapsed"]) + log.info("=" * 60) # Final DB stats final = get_stats() - console.print(f"\n [bold]DB Final State:[/bold]") - console.print(f" Total jobs: {final['total']}") - console.print(f" With desc: {final['with_description']}") - console.print(f" Scored: {final['scored']}") - console.print(f" Tailored: {final['tailored']}") - console.print(f" Cover letters: {final['with_cover_letter']}") - console.print(f" Ready to apply: {final['ready_to_apply']}") - console.print(f" Applied: {final['applied']}") - console.print(f"{'=' * 70}\n") + log.info("DB Final State:") + log.info(" total jobs: %d", final['total']) + log.info(" with desc: %d", final['with_description']) + log.info(" scored: %d", final['scored']) + log.info(" tailored: %d", final['tailored']) + log.info(" cover letters: %d", final['with_cover_letter']) + log.info(" ready to apply: %d", final['ready_to_apply']) + log.info(" applied: %d", final['applied']) return result diff --git a/src/applypilot/scoring/cover_letter.py b/src/applypilot/scoring/cover_letter.py index c16cdd5f..ba71c60b 100644 --- a/src/applypilot/scoring/cover_letter.py +++ b/src/applypilot/scoring/cover_letter.py @@ -275,7 +275,7 @@ def run_cover_letters(min_score: int = 7, limit: int = 20, } error_count += 1 results.append(result) - log.error("%d/%d [ERROR] %s -- %s", completed, len(jobs), job["title"][:40], e) + log.error("%d/%d [ERROR] %s -- %s", completed, len(jobs), job["title"][:40], str(e).splitlines()[0]) # Persist to DB: increment attempt counter for ALL, save path only for successes now = datetime.now(timezone.utc).isoformat() diff --git a/src/applypilot/scoring/pdf.py b/src/applypilot/scoring/pdf.py index 2b87b673..7ff7ff16 100644 --- a/src/applypilot/scoring/pdf.py +++ b/src/applypilot/scoring/pdf.py @@ -334,16 +334,28 @@ def build_html(resume: dict) -> str: # ── PDF Renderer ───────────────────────────────────────────────────────── def render_pdf(html: str, output_path: str) -> None: - """Render HTML to PDF using Playwright's headless Chromium. + """Render HTML to PDF using Playwright with headless Chrome. + + Prefers Playwright's bundled Chromium; falls back to the system Chrome + (needed on OS versions not yet supported by Playwright's install script). Args: html: Complete HTML string. output_path: Path to write the PDF file. """ from playwright.sync_api import sync_playwright + from applypilot.config import get_chrome_path with sync_playwright() as p: - browser = p.chromium.launch() + try: + browser = p.chromium.launch() + except Exception: + system_chrome = get_chrome_path() + log.debug("Playwright bundled browser unavailable; using system Chrome at %s", system_chrome) + browser = p.chromium.launch( + executable_path=system_chrome, + args=["--no-sandbox", "--disable-dev-shm-usage"], + ) page = browser.new_page() page.set_content(html, wait_until="networkidle") page.pdf( diff --git a/src/applypilot/scoring/scorer.py b/src/applypilot/scoring/scorer.py index 97692d5f..332e6ee6 100644 --- a/src/applypilot/scoring/scorer.py +++ b/src/applypilot/scoring/scorer.py @@ -97,8 +97,8 @@ def score_job(resume_text: str, job: dict) -> dict: response = client.chat(messages, max_tokens=512, temperature=0.2) return _parse_score_response(response) except Exception as e: - log.error("LLM error scoring job '%s': %s", job.get("title", "?"), e) - return {"score": 0, "keywords": "", "reasoning": f"LLM error: {e}"} + log.error("LLM error scoring job '%s': %s", job.get("title", "?"), str(e).splitlines()[0]) + return {"score": 0, "keywords": "", "reasoning": f"LLM error: {str(e).splitlines()[0]}"} def run_scoring(limit: int = 0, rescore: bool = False) -> dict: diff --git a/src/applypilot/scoring/tailor.py b/src/applypilot/scoring/tailor.py index 352fb5ff..09f6931b 100644 --- a/src/applypilot/scoring/tailor.py +++ b/src/applypilot/scoring/tailor.py @@ -539,7 +539,7 @@ def run_tailoring(min_score: int = 7, limit: int = 20, "url": job["url"], "title": job["title"], "site": job["site"], "status": "error", "attempts": 0, "path": None, "pdf_path": None, } - log.error("%d/%d [ERROR] %s -- %s", completed, len(jobs), job["title"][:40], e) + log.error("%d/%d [ERROR] %s -- %s", completed, len(jobs), job["title"][:40], str(e).splitlines()[0]) results.append(result) stats[result.get("status", "error")] = stats.get(result.get("status", "error"), 0) + 1