From e22c989dadba4d6457e24cf3cc89e7500b093ea1 Mon Sep 17 00:00:00 2001 From: 0xMRMA Date: Tue, 26 May 2026 01:33:53 +0300 Subject: [PATCH 1/2] Harden URL scraping against local and private targets --- tests/test_scraper.py | 65 ++++++++++- thepipe/__init__.py | 55 ++++------ thepipe/extract.py | 2 + thepipe/scraper.py | 247 ++++++++++++++++++++++++++---------------- 4 files changed, 241 insertions(+), 128 deletions(-) diff --git a/tests/test_scraper.py b/tests/test_scraper.py index e32b289..ed756b4 100644 --- a/tests/test_scraper.py +++ b/tests/test_scraper.py @@ -1,10 +1,13 @@ import json import tempfile +import threading from typing import cast import unittest import os +import socket import sys import zipfile +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from PIL import Image import pandas as pd @@ -32,7 +35,9 @@ def setUp(self): self.files_directory = os.path.join(os.path.dirname(__file__), "files") self.outputs_directory = "outputs" # create a client we can re-use for ai_extraction scenarios - self.client = OpenAI() if OpenAI is not None else None + self.client = ( + OpenAI() if OpenAI is not None and os.getenv("OPENAI_API_KEY") else None + ) def tearDown(self): # clean up outputs @@ -77,6 +82,64 @@ def test_scrape_directory_inclusion_exclusion(self): text = cast(str, chunks[0].text) self.assertIn("Y", text) + def test_scrape_url_rejects_file_scheme(self): + with self.assertRaisesRegex(ValueError, "Only http:// and https:// URLs"): + scraper.scrape_url("file:///tmp/secret.html") + + def test_scrape_url_rejects_localhost_html(self): + with self.assertRaisesRegex( + ValueError, "Local and private-network URLs are blocked by default" + ): + scraper.scrape_url("http://127.0.0.1:8000/internal.html") + + def test_scrape_url_rejects_localhost_download(self): + with self.assertRaisesRegex( + ValueError, "Local and private-network URLs are blocked by default" + ): + scraper.scrape_url("http://127.0.0.1:8000/secret.txt") + + def test_scrape_url_allows_localhost_with_opt_in(self): + canary = "LOCALHOST_DOWNLOAD_ALLOWED" + request_log = [] + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + request_log.append(self.path) + body = canary.encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + return + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + port = cast(int, sock.getsockname()[1]) + + server = ThreadingHTTPServer(("127.0.0.1", port), Handler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + try: + chunks = scraper.scrape_url( + f"http://127.0.0.1:{port}/secret.txt", + allow_local_urls=True, + ) + finally: + server.shutdown() + thread.join(timeout=5) + server.server_close() + + self.assertEqual(request_log, ["/secret.txt"]) + self.assertEqual(len(chunks), 1) + self.assertIn(canary, cast(str, chunks[0].text)) + + def test_scrape_github_rejects_confusable_host(self): + with self.assertRaisesRegex(ValueError, "hostname 'evil.example'"): + scraper.scrape_github("https://github.com@evil.example/owner/repo") + def test_scrape_html(self): filepath = os.path.join(self.files_directory, "example.html") chunks = scraper.scrape_file(filepath, verbose=True) diff --git a/thepipe/__init__.py b/thepipe/__init__.py index 93702da..42c19fb 100644 --- a/thepipe/__init__.py +++ b/thepipe/__init__.py @@ -7,39 +7,28 @@ from openai import OpenAI -from .scraper import scrape_directory, scrape_file, scrape_url from .core import DEFAULT_AI_MODEL, save_outputs +from .scraper import scrape_directory, scrape_file, scrape_url -# Argument parsing -def parse_arguments() -> argparse.Namespace: # noqa: D401 – imperative is fine here - """ - Parse CLI flags. +def parse_arguments() -> argparse.Namespace: # noqa: D401 + """Parse CLI flags.""" - Returns - ------- - argparse.Namespace - Parsed arguments. - """ parser = argparse.ArgumentParser( prog="thepipe", description="Universal document/Web scraper with optional OpenAI extraction.", ) - # Required source (file, directory, or URL) parser.add_argument( "source", help="File path, directory, or URL to scrape.", ) - - # Optional flags parser.add_argument( "-i", "--inclusion-pattern", dest="inclusion_pattern", default=None, - help="Regex pattern – only files whose *full path* matches are scraped " - "(applies to directory/zip scraping).", + help="Regex pattern - only files whose full path matches are scraped (applies to directory/zip scraping).", ) parser.add_argument( "-v", @@ -51,15 +40,19 @@ def parse_arguments() -> argparse.Namespace: # noqa: D401 – imperative is fin "--text-only", dest="text_only", action="store_true", - help="Suppress images – output only extracted text.", + help="Suppress images - output only extracted text.", + ) + parser.add_argument( + "--allow-local-urls", + dest="allow_local_urls", + action="store_true", + help="Allow scraping localhost and private-network HTTP(S) URLs. Disabled by default for security.", ) - - # OpenAI-related flags parser.add_argument( "--openai-api-key", dest="openai_api_key", default=os.getenv("OPENAI_API_KEY"), - help="OpenAI API key. If omitted, env variable OPENAI_API_KEY is used.", + help="OpenAI API key. If omitted, env variable OPENAI_API_KEY is used.", ) parser.add_argument( "--openai-base-url", @@ -73,18 +66,15 @@ def parse_arguments() -> argparse.Namespace: # noqa: D401 – imperative is fin default=DEFAULT_AI_MODEL, help=f"Chat/VLM model to use (default: {DEFAULT_AI_MODEL}).", ) - - # Legacy flag (will be removed in future versions) parser.add_argument( "--ai-extraction", action="store_true", - help=argparse.SUPPRESS, # hidden but still accepted + help=argparse.SUPPRESS, ) return parser.parse_args() -# OpenAI client factory def create_openai_client( *, api_key: Optional[str], @@ -92,42 +82,37 @@ def create_openai_client( enable_vlm: bool, ) -> Optional[OpenAI]: if api_key: - # Normal path – user gave an explicit key return OpenAI(api_key=api_key, base_url=base_url) if enable_vlm: - # Old flag: fall back to env vars warnings.warn( - "--ai-extraction is deprecated; " - "please use --openai-api-key and --openai-model " - "(and optionally --openai-base-url) instead.", + "--ai-extraction is deprecated; please use --openai-api-key and " + "--openai-model (and optionally --openai-base-url) instead.", DeprecationWarning, stacklevel=2, ) return OpenAI(base_url=base_url, api_key=os.getenv("OPENAI_API_KEY")) - # AI extraction disabled return None def main() -> None: - """CLI entry point""" - args = parse_arguments() + """CLI entry point.""" - # Instantiate the OpenAI client if requested + args = parse_arguments() openai_client = create_openai_client( api_key=args.openai_api_key, base_url=args.openai_base_url, enable_vlm=args.ai_extraction, ) - # Delegate scraping based on source type if args.source.startswith(("http://", "https://")): chunks = scrape_url( args.source, verbose=args.verbose, openai_client=openai_client, model=args.openai_model, + allow_local_urls=args.allow_local_urls, ) elif os.path.isdir(args.source): chunks = scrape_directory( @@ -146,7 +131,6 @@ def main() -> None: else: raise ValueError(f"Invalid source: {args.source}") - # Persist results save_outputs( chunks=chunks, verbose=args.verbose, @@ -155,9 +139,8 @@ def main() -> None: ) if args.verbose: - print(f"Scraping complete. Outputs saved to 'thepipe_output/'.") + print("Scraping complete. Outputs saved to 'thepipe_output/'.") -# Entry-point shim if __name__ == "__main__": main() diff --git a/thepipe/extract.py b/thepipe/extract.py index bd22611..dbb7717 100644 --- a/thepipe/extract.py +++ b/thepipe/extract.py @@ -218,6 +218,7 @@ def extract_from_url( verbose: bool = False, chunking_method: Callable[[List[Chunk]], List[Chunk]] = chunk_by_page, openai_client: Optional[OpenAI] = None, + allow_local_urls: bool = False, ) -> Tuple[List[Dict], int]: print( f"[thepipe] Extract functions will be deprecated in future versions. See the README for more information" @@ -227,6 +228,7 @@ def extract_from_url( verbose=verbose, chunking_method=chunking_method, openai_client=openai_client, + allow_local_urls=allow_local_urls, ) extracted_chunks, tokens_used = extract( chunks=chunks, diff --git a/thepipe/scraper.py b/thepipe/scraper.py index 942f8d6..0f2d592 100644 --- a/thepipe/scraper.py +++ b/thepipe/scraper.py @@ -3,12 +3,14 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from collections import OrderedDict from io import BytesIO, StringIO +import ipaddress import math import re import fnmatch import os +import socket import tempfile -from urllib.parse import urlparse +from urllib.parse import urljoin, urlparse import zipfile from PIL import Image import requests @@ -80,14 +82,11 @@ ) MAX_WHISPER_DURATION = int(os.getenv("MAX_WHISPER_DURATION", 600)) # 10 minutes -TWITTER_DOMAINS = { - "https://twitter.com", - "https://www.twitter.com", - "https://x.com", - "https://www.x.com", -} -YOUTUBE_DOMAINS = {"https://www.youtube.com", "https://youtube.com"} -GITHUB_DOMAINS = {"https://github.com", "https://www.github.com"} +TWITTER_HOSTS = {"twitter.com", "www.twitter.com", "x.com", "www.x.com"} +YOUTUBE_HOSTS = {"www.youtube.com", "youtube.com"} +GITHUB_HOSTS = {"github.com", "www.github.com"} +ALLOWED_REMOTE_SCHEMES = {"http", "https"} +HTML_EXTENSIONS = {".html", ".htm", ".php", ".asp", ".aspx"} SCRAPING_PROMPT = os.getenv( "SCRAPING_PROMPT", """A document is given. Please output the entire extracted contents from the document in detailed markdown format. @@ -112,6 +111,110 @@ def _load_whisper(): return whisper +def _is_public_ip_address(ip_text: str) -> bool: + ip = ipaddress.ip_address(ip_text) + return ip.is_global + + +def _hostname_resolves_publicly(hostname: str) -> bool: + try: + addrinfo = socket.getaddrinfo(hostname, None, type=socket.SOCK_STREAM) + except socket.gaierror: + # Let the underlying request/browser surface resolution failures. + return True + + found_ip = False + for _, _, _, _, sockaddr in addrinfo: + ip_text = sockaddr[0] + try: + if not _is_public_ip_address(ip_text): + return False + found_ip = True + except ValueError: + continue + return found_ip + + +def _validate_remote_url( + url: str, + allow_local_urls: bool = False, + allowed_hosts: Optional[set[str]] = None, +) -> str: + parsed = urlparse(url) + scheme = parsed.scheme.lower() + if scheme not in ALLOWED_REMOTE_SCHEMES: + raise ValueError( + f"Unsupported URL scheme for scrape_url: '{scheme or 'missing'}'. " + "Only http:// and https:// URLs are allowed. Use scrape_file() for local files." + ) + + hostname = (parsed.hostname or "").lower() + if not hostname: + raise ValueError(f"URL must include a hostname: {url}") + + if allowed_hosts is not None and hostname not in allowed_hosts: + raise ValueError( + f"URL hostname '{hostname}' is not supported for this scraper." + ) + + if allow_local_urls: + return url + + if hostname == "localhost" or hostname.endswith(".localhost"): + raise ValueError( + f"Local and private-network URLs are blocked by default: {url}" + ) + + try: + hostname_ip = ipaddress.ip_address(hostname) + except ValueError: + hostname_ip = None + + if hostname_ip is not None: + if not hostname_ip.is_global: + raise ValueError( + f"Local and private-network URLs are blocked by default: {url}" + ) + elif not _hostname_resolves_publicly(hostname): + raise ValueError( + f"Local and private-network URLs are blocked by default: {url}" + ) + + return url + + +def _request_remote_url( + url: str, + *, + timeout: int, + allow_local_urls: bool = False, + allowed_hosts: Optional[set[str]] = None, + headers: Optional[Dict[str, str]] = None, +) -> requests.Response: + _validate_remote_url( + url, + allow_local_urls=allow_local_urls, + allowed_hosts=allowed_hosts, + ) + response = requests.get(url, timeout=timeout, headers=headers) + response.raise_for_status() + return response + + +def _safe_image_request( + image_url: str, + *, + allow_local_urls: bool = False, +) -> Image.Image: + response = _request_remote_url( + image_url, + timeout=10, + allow_local_urls=allow_local_urls, + headers={"User-Agent": USER_AGENT_STRING}, + ) + return Image.open(BytesIO(response.content)) + + def detect_source_mimetype(source: str) -> str: # try to detect the file type by its extension _, extension = os.path.splitext(source) @@ -494,7 +597,9 @@ def _process_page(page_num: int) -> Tuple[int, str, Optional[Image.Image]]: return chunks -def get_images_from_markdown(text: str) -> List[Image.Image]: +def get_images_from_markdown( + text: str, allow_local_urls: bool = False +) -> List[Image.Image]: image_urls = re.findall(r"!\[.*?\]\((.*?)\)", text) images = [] for url in image_urls: @@ -504,16 +609,10 @@ def get_images_from_markdown(text: str) -> List[Image.Image]: continue try: - response = requests.get( - url, - timeout=10, - headers={"User-Agent": USER_AGENT_STRING}, - ) - response.raise_for_status() + img = _safe_image_request(url, allow_local_urls=allow_local_urls) except Exception: continue - img = Image.open(BytesIO(response.content)) images.append(img) return images @@ -553,9 +652,11 @@ def parse_webpage_with_vlm( verbose: Optional[bool] = False, openai_client: Optional[OpenAI] = None, include_output_images: bool = True, + allow_local_urls: bool = False, ) -> Chunk: if openai_client is None: raise ValueError("parse_webpage_with_vlm requires an openai_client argument.") + _validate_remote_url(url, allow_local_urls=allow_local_urls) from playwright.sync_api import sync_playwright with sync_playwright() as p: @@ -643,13 +744,17 @@ def parse_webpage_with_vlm( def extract_page_content( - url: str, verbose: bool = False, include_output_images: bool = True + url: str, + verbose: bool = False, + include_output_images: bool = True, + allow_local_urls: bool = False, ) -> Chunk: from bs4 import BeautifulSoup from playwright.sync_api import sync_playwright import base64 import requests + _validate_remote_url(url, allow_local_urls=allow_local_urls) texts: List[str] = [] images: List[Image.Image] = [] @@ -728,83 +833,29 @@ def extract_page_content( ) continue else: + candidate_urls = [urljoin(url, img_path)] try: - # Try direct URL first - response = requests.get( - img_path, - timeout=10, - headers={"User-Agent": USER_AGENT_STRING}, + image = _safe_image_request( + candidate_urls[0], + allow_local_urls=allow_local_urls, ) - response.raise_for_status() - image = Image.open(BytesIO(response.content)) images.append(image) except Exception as e: if verbose: - print(f"[thepipe] Error loading image {img_path}: {e}") - print("[thepipe] Attempting to load path with schema.") - - # Try with schema if path is relative - if not img_path.startswith(("http://", "https://")): - try: - # Remove leading slashes - while img_path.startswith("/"): - img_path = img_path[1:] - - # Try with just the scheme - parsed_url = urlparse(url) - path_with_schema = ( - f"{parsed_url.scheme}://{img_path}" - ) - response = requests.get( - path_with_schema, - timeout=10, - headers={"User-Agent": USER_AGENT_STRING}, - ) - response.raise_for_status() - image = Image.open(BytesIO(response.content)) - images.append(image) - except Exception as e: - if verbose: - print( - f"[thepipe] Error loading image {img_path} with schema: {e}" - ) - print( - "[thepipe] Attempting to load with schema and netloc." - ) - - try: - # Try with scheme and netloc - path_with_schema_and_netloc = f"{parsed_url.scheme}://{parsed_url.netloc}/{img_path}" - response = requests.get( - path_with_schema_and_netloc, - timeout=10, - headers={"User-Agent": USER_AGENT_STRING}, - ) - response.raise_for_status() - image = Image.open(BytesIO(response.content)) - images.append(image) - except Exception as e: - if verbose: - print( - f"[thepipe] Final attempt failed for image {img_path}: {e}" - ) - continue - else: - if verbose: - print( - f"[thepipe] Skipping image {img_path} - all attempts failed" - ) - continue + print(f"[thepipe] Skipping image {img_path}: {e}") + continue except Exception as e: if verbose: print(f"[thepipe] Error scraping {url}: {e}") # Fallback to simple requests try: - response = requests.get( - url, headers={"User-Agent": USER_AGENT_STRING}, timeout=30 + response = _request_remote_url( + url, + headers={"User-Agent": USER_AGENT_STRING}, + timeout=30, + allow_local_urls=allow_local_urls, ) - response.raise_for_status() soup = BeautifulSoup(response.content, "html.parser") # Remove unwanted elements @@ -842,18 +893,21 @@ def scrape_url( model: str = DEFAULT_AI_MODEL, include_input_images: bool = True, include_output_images: bool = True, + allow_local_urls: bool = False, ) -> List[Chunk]: - if any(url.startswith(domain) for domain in TWITTER_DOMAINS): + parsed_url = urlparse(url) + hostname = (parsed_url.hostname or "").lower() + if hostname in TWITTER_HOSTS: extraction = scrape_tweet(url=url, include_output_images=include_output_images) return extraction - elif any(url.startswith(domain) for domain in YOUTUBE_DOMAINS): + elif hostname in YOUTUBE_HOSTS: extraction = scrape_youtube( youtube_url=url, verbose=verbose, include_output_images=include_output_images, ) return extraction - elif any(url.startswith(domain) for domain in GITHUB_DOMAINS): + elif hostname in GITHUB_HOSTS: extraction = scrape_github( github_url=url, verbose=verbose, @@ -863,12 +917,18 @@ def scrape_url( include_output_images=include_output_images, ) return extraction - _, extension = os.path.splitext(urlparse(url).path) - if extension and extension not in {".html", ".htm", ".php", ".asp", ".aspx"}: + _validate_remote_url(url, allow_local_urls=allow_local_urls) + _, extension = os.path.splitext(parsed_url.path) + if extension and extension not in HTML_EXTENSIONS: # if url leads to a file, attempt to download it and scrape it with tempfile.TemporaryDirectory() as temp_dir: - file_path = os.path.join(temp_dir, os.path.basename(url)) - response = requests.get(url) + filename = os.path.basename(parsed_url.path) or "downloaded_file" + file_path = os.path.join(temp_dir, filename) + response = _request_remote_url( + url, + timeout=30, + allow_local_urls=allow_local_urls, + ) # verify the ingress/egress with be within limits, if there are any set response_length = int(response.headers.get("Content-Length", 0)) if FILESIZE_LIMIT_MB and response_length > FILESIZE_LIMIT_MB * 1024 * 1024: @@ -894,10 +954,14 @@ def scrape_url( model=model, openai_client=openai_client, include_output_images=include_output_images, + allow_local_urls=allow_local_urls, ) else: chunk = extract_page_content( - url=url, verbose=verbose, include_output_images=include_output_images + url=url, + verbose=verbose, + include_output_images=include_output_images, + allow_local_urls=allow_local_urls, ) chunks = chunking_method([chunk]) # if no text or images were extracted, return error @@ -1042,6 +1106,7 @@ def scrape_github( include_output_images: bool = True, ) -> List[Chunk]: files_contents: List[Chunk] = [] + _validate_remote_url(github_url, allowed_hosts=GITHUB_HOSTS) if not GITHUB_TOKEN: raise ValueError("GITHUB_TOKEN environment variable is not set.") # make new tempdir for cloned repo From 86e65aff9d7dc5c249da1c70f25e4f4c36b3daab Mon Sep 17 00:00:00 2001 From: 0xMRMA Date: Tue, 26 May 2026 01:45:05 +0300 Subject: [PATCH 2/2] Skip extractor integration test without OpenAI key --- tests/test_extractor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_extractor.py b/tests/test_extractor.py index 30a1b55..1c3a518 100644 --- a/tests/test_extractor.py +++ b/tests/test_extractor.py @@ -60,6 +60,7 @@ def test_extract_json_from_response(self): result = extract_json_from_response(case["input"]) self.assertEqual(result, case["expected"]) + @unittest.skipIf(not os.getenv("OPENAI_API_KEY"), "OpenAI API key required") def test_extract(self): # provide an explicit client so we cover the new parameter client = OpenAI()