From c24e0d903cb7a16c8da277c4dbab74c6f611049e Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sat, 9 May 2026 17:50:41 +0800 Subject: [PATCH 1/6] add --recursive to fast validation --- stac_validator/fast_validator.py | 236 +++++++++++++++++++++++++++++++ stac_validator/stac_validator.py | 13 +- 2 files changed, 247 insertions(+), 2 deletions(-) diff --git a/stac_validator/fast_validator.py b/stac_validator/fast_validator.py index 016c11d..9518b40 100644 --- a/stac_validator/fast_validator.py +++ b/stac_validator/fast_validator.py @@ -357,3 +357,239 @@ def run(self): ] click.echo("\n") + + def run_recursive(self): + """Recursively validate a STAC catalog/collection and all its children.""" + import json + + # Load the root STAC object + try: + if self.stac_file.startswith("http"): + req = urllib.request.Request( + self.stac_file, headers={"User-Agent": "stac-fast-cli/5.0"} + ) + with urllib.request.urlopen(req) as response: + root_data = json.loads(response.read().decode("utf-8")) + root_path = self.stac_file + else: + with open(self.stac_file, "r") as f: + root_data = json.load(f) + root_path = os.path.abspath(self.stac_file) + except Exception as e: + click.secho(f"āŒ Error reading {self.stac_file}: {e}", fg="red", bold=True) + self.valid = False + return + + # Recursively validate the root and all children + results = [] + self._validate_recursive(root_data, root_path, results) + + # Display results + click.echo("\n" + "=" * 55) + click.secho("šŸ“Š RECURSIVE VALIDATION SUMMARY", bold=True, fg="blue") + click.echo("=" * 55) + + valid_count = sum(1 for r in results if r["valid_stac"]) + invalid_count = len(results) - valid_count + + click.echo(f"Total Objects Validated: {len(results)}") + click.echo(f"Valid Objects: {valid_count}") + click.echo(f"Invalid Objects: {invalid_count}") + + if invalid_count > 0: + click.echo("\n" + "=" * 55) + click.secho("🚨 INVALID OBJECTS", bold=True, fg="red") + click.echo("=" * 55) + + # Group errors by message + error_groups = {} + for result in results: + if not result["valid_stac"]: + error_msg = result.get("error_message", "Unknown error") + if error_msg not in error_groups: + error_groups[error_msg] = [] + # Store both path and ID for better identification + object_id = result.get("id", "unknown") + error_groups[error_msg].append( + {"path": result["path"], "id": object_id} + ) + + # Display grouped errors + for error_msg, items in error_groups.items(): + click.echo(f"\nāŒ {error_msg}") + click.echo(f" Affected Objects: {len(items)}") + # Show first 5 examples + for item in items[:5]: + item_id = item["id"] if item["id"] != "unknown" else "" + if item_id: + click.echo(f" - {item['path']} (ID: {item_id})") + else: + click.echo(f" - {item['path']}") + if len(items) > 5: + click.echo(f" ... and {len(items) - 5} more") + + # Set overall validity + self.valid = invalid_count == 0 + self.message = results + + def _validate_recursive( + self, data: Dict[str, Any], file_path: str, results: List[Dict] + ): + """Recursively validate a STAC object and its children.""" + import json + + # Determine STAC type - could be "Catalog", "Collection", or "Feature" (Item) + raw_type = data.get("type", "unknown") + if raw_type == "Feature": + stac_type = "item" + elif raw_type == "Collection": + stac_type = "collection" + elif raw_type == "Catalog": + stac_type = "catalog" + else: + stac_type = raw_type.lower() if raw_type else "unknown" + + stac_version = data.get("stac_version", "unknown") + + # Validate current object using get_validator (same as run() does) + # Skip validation for STAC API responses (they have conformsTo instead of stac_extensions) + is_stac_api = "conformsTo" in data + + if is_stac_api: + # STAC API catalogs don't validate against STAC schemas, just mark as valid + is_valid = True + error_msg = None + else: + try: + extensions = data.get("stac_extensions", []) + validator, _ = get_validator(stac_type, stac_version, extensions) + validator(data) + is_valid = True + error_msg = None + except fastjsonschema.JsonSchemaValueException as e: + is_valid = False + error_msg = f"{e.name} {e.message.replace(e.name, '').strip()}" + except Exception as e: + is_valid = False + error_msg = str(e) + + # Create result for this object + # Extract ID if available + object_id = data.get("id", "unknown") + + result = { + "path": file_path, + "id": object_id, + "valid_stac": is_valid, + "stac_type": stac_type, + "stac_version": stac_version, + } + if error_msg: + result["error_message"] = error_msg + + results.append(result) + + # Process child links + base_dir = ( + os.path.dirname(file_path) + if not file_path.startswith("http") + else file_path.rsplit("/", 1)[0] + ) + links = data.get("links", []) + + # For remote URLs, look for "data" links (collections endpoint) or "child"/"item" links + # For local files, look for "child" and "item" links + is_remote = file_path.startswith("http") + + for link in links: + rel = link.get("rel", "") + href = link.get("href", "") + + # Determine if we should follow this link + should_follow = False + if is_remote: + # For remote catalogs, follow "data" (collections), "child", "item", and "items" links + if rel in ["data", "child", "item", "items"] and href: + should_follow = True + else: + # For local catalogs, follow "child" and "item" links + if rel in ["child", "item"] and href: + should_follow = True + + if should_follow: + # Resolve relative path + if href.startswith("http"): + child_path = href + else: + child_path = os.path.normpath(os.path.join(base_dir, href)) + + # Load and validate child + try: + if child_path.startswith("http"): + req = urllib.request.Request( + child_path, headers={"User-Agent": "stac-fast-cli/5.0"} + ) + with urllib.request.urlopen(req) as response: + child_data = json.loads(response.read().decode("utf-8")) + else: + with open(child_path, "r") as f: + child_data = json.load(f) + + # If this is a collections list endpoint, extract individual collections + if rel == "data" and is_remote and isinstance(child_data, dict): + collections = child_data.get("collections", []) + if collections: + # This is a collections list - process each collection + for collection in collections: + collection_id = collection.get("id") + if collection_id: + # Build URL to individual collection + collection_url = ( + f"{child_path.rstrip('/')}/{collection_id}" + ) + try: + req = urllib.request.Request( + collection_url, + headers={"User-Agent": "stac-fast-cli/5.0"}, + ) + with urllib.request.urlopen(req) as response: + collection_data = json.loads( + response.read().decode("utf-8") + ) + # Recursively validate the full collection + self._validate_recursive( + collection_data, collection_url, results + ) + except Exception as e: + results.append( + { + "path": collection_url, + "valid_stac": False, + "error_message": f"Failed to load: {str(e)}", + } + ) + else: + # Not a collections list, validate as normal + self._validate_recursive(child_data, child_path, results) + # If this is an items endpoint (GeoJSON FeatureCollection), extract individual items + elif rel == "items" and is_remote and isinstance(child_data, dict): + features = child_data.get("features", []) + if features: + # This is an items list - process each item + for feature in features: + # Recursively validate each item + self._validate_recursive(feature, child_path, results) + else: + # Not an items list, validate as normal + self._validate_recursive(child_data, child_path, results) + else: + # Recursively validate child + self._validate_recursive(child_data, child_path, results) + except Exception as e: + results.append( + { + "path": child_path, + "valid_stac": False, + "error_message": f"Failed to load: {str(e)}", + } + ) diff --git a/stac_validator/stac_validator.py b/stac_validator/stac_validator.py index 76b5862..36b66a3 100644 --- a/stac_validator/stac_validator.py +++ b/stac_validator/stac_validator.py @@ -539,11 +539,20 @@ def batch( is_flag=True, help="Show full validation logs for all items. By default, a limited sample of item logs is shown.", ) -def fast(stac_file: str, quiet: bool, verbose: bool): +@click.option( + "--recursive", + "-r", + is_flag=True, + help="Recursively validate all child catalogs, collections, and items.", +) +def fast(stac_file: str, quiet: bool, verbose: bool, recursive: bool): """High-speed validation using fastjsonschema and local caching.""" try: fv = FastValidator(stac_file, quiet=quiet, verbose=verbose) - fv.run() + if recursive: + fv.run_recursive() + else: + fv.run() sys.exit(0 if fv.valid else 1) except RuntimeError as e: click.secho(f"\n🚨 FATAL ERROR: {e}", fg="red", bold=True) From 2ba780c3a6daab562e0c4bb87eef3a82f38d318a Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sat, 9 May 2026 18:14:01 +0800 Subject: [PATCH 2/6] update --- stac_validator/fast_validator.py | 41 +++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/stac_validator/fast_validator.py b/stac_validator/fast_validator.py index 9518b40..e084765 100644 --- a/stac_validator/fast_validator.py +++ b/stac_validator/fast_validator.py @@ -1,5 +1,6 @@ import json import os +import sys import time import urllib.error import urllib.request @@ -362,6 +363,8 @@ def run_recursive(self): """Recursively validate a STAC catalog/collection and all its children.""" import json + sys.setrecursionlimit(10000) + # Load the root STAC object try: if self.stac_file.startswith("http"): @@ -382,7 +385,9 @@ def run_recursive(self): # Recursively validate the root and all children results = [] - self._validate_recursive(root_data, root_path, results) + visited = set() + visited.add(root_path) + self._validate_recursive(root_data, root_path, results, visited) # Display results click.echo("\n" + "=" * 55) @@ -433,7 +438,11 @@ def run_recursive(self): self.message = results def _validate_recursive( - self, data: Dict[str, Any], file_path: str, results: List[Dict] + self, + data: Dict[str, Any], + file_path: str, + results: List[Dict], + visited: Set[str], ): """Recursively validate a STAC object and its children.""" import json @@ -523,6 +532,10 @@ def _validate_recursive( else: child_path = os.path.normpath(os.path.join(base_dir, href)) + if child_path in visited: + continue + visited.add(child_path) + # Load and validate child try: if child_path.startswith("http"): @@ -558,7 +571,10 @@ def _validate_recursive( ) # Recursively validate the full collection self._validate_recursive( - collection_data, collection_url, results + collection_data, + collection_url, + results, + visited, ) except Exception as e: results.append( @@ -570,21 +586,30 @@ def _validate_recursive( ) else: # Not a collections list, validate as normal - self._validate_recursive(child_data, child_path, results) + self._validate_recursive( + child_data, child_path, results, visited + ) # If this is an items endpoint (GeoJSON FeatureCollection), extract individual items elif rel == "items" and is_remote and isinstance(child_data, dict): features = child_data.get("features", []) if features: # This is an items list - process each item for feature in features: - # Recursively validate each item - self._validate_recursive(feature, child_path, results) + item_id = feature.get("id", "unknown") + item_path = f"{child_path}#{item_id}" + self._validate_recursive( + feature, item_path, results, visited + ) else: # Not an items list, validate as normal - self._validate_recursive(child_data, child_path, results) + self._validate_recursive( + child_data, child_path, results, visited + ) else: # Recursively validate child - self._validate_recursive(child_data, child_path, results) + self._validate_recursive( + child_data, child_path, results, visited + ) except Exception as e: results.append( { From 75e9efae5970f0f9b947e4fc252a34389f22e3c1 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Mon, 11 May 2026 15:08:26 +0800 Subject: [PATCH 3/6] add --api option to fast --- stac_validator/fast_validator.py | 170 +++++++++++++++++++++++++++---- stac_validator/stac_validator.py | 12 ++- 2 files changed, 161 insertions(+), 21 deletions(-) diff --git a/stac_validator/fast_validator.py b/stac_validator/fast_validator.py index e084765..d041ab3 100644 --- a/stac_validator/fast_validator.py +++ b/stac_validator/fast_validator.py @@ -1,9 +1,11 @@ +import io import json import os import sys import time import urllib.error import urllib.request +from contextlib import redirect_stderr, redirect_stdout from typing import Any, Dict, List, Set import click @@ -360,7 +362,7 @@ def run(self): click.echo("\n") def run_recursive(self): - """Recursively validate a STAC catalog/collection and all its children.""" + """Recursively validate a local STAC catalog/collection and all its children.""" import json sys.setrecursionlimit(10000) @@ -387,7 +389,7 @@ def run_recursive(self): results = [] visited = set() visited.add(root_path) - self._validate_recursive(root_data, root_path, results, visited) + self._validate_recursive(root_data, root_path, results, visited, is_api=False) # Display results click.echo("\n" + "=" * 55) @@ -437,16 +439,128 @@ def run_recursive(self): self.valid = invalid_count == 0 self.message = results + def run_api(self): + """Recursively validate a STAC API catalog and all its collections/items.""" + import json + + sys.setrecursionlimit(10000) + + if not self.quiet: + click.secho("šŸš€ Starting STAC API validation...", fg="blue", bold=True) + + # Load the root STAC API object + try: + if self.stac_file.startswith("http"): + req = urllib.request.Request( + self.stac_file, headers={"User-Agent": "stac-fast-cli/5.0"} + ) + with urllib.request.urlopen(req) as response: + root_data = json.loads(response.read().decode("utf-8")) + root_path = self.stac_file + else: + with open(self.stac_file, "r") as f: + root_data = json.load(f) + root_path = os.path.abspath(self.stac_file) + except Exception as e: + click.secho(f"āŒ Error reading {self.stac_file}: {e}", fg="red", bold=True) + self.valid = False + return + + # Recursively validate the root and all children (API mode) + results = [] + visited = set() + visited.add(root_path) + # Add a counter for progress tracking + self._progress_count = 0 + self._validate_recursive(root_data, root_path, results, visited, is_api=True) + + # Display results + click.echo("\n" + "=" * 55) + click.secho("šŸ“Š STAC API VALIDATION SUMMARY", bold=True, fg="blue") + click.echo("=" * 55) + + valid_count = sum(1 for r in results if r["valid_stac"]) + invalid_count = len(results) - valid_count + + click.echo(f"Total Objects Validated: {len(results)}") + click.echo(f"Valid Objects: {valid_count}") + click.echo(f"Invalid Objects: {invalid_count}") + + if invalid_count > 0: + click.echo("\n" + "=" * 55) + click.secho("🚨 INVALID OBJECTS", bold=True, fg="red") + click.echo("=" * 55) + + # Group errors by message + error_groups = {} + for result in results: + if not result["valid_stac"]: + error_msg = result.get("error_message", "Unknown error") + if error_msg not in error_groups: + error_groups[error_msg] = [] + # Store both path and ID for better identification + object_id = result.get("id", "unknown") + error_groups[error_msg].append( + {"path": result["path"], "id": object_id} + ) + + # Display grouped errors + for error_msg, items in error_groups.items(): + click.echo(f"\nāŒ {error_msg}") + click.echo(f" Affected Objects: {len(items)}") + # Show first 5 examples + for item in items[:5]: + item_id = item["id"] if item["id"] != "unknown" else "" + if item_id: + click.echo(f" - {item['path']} (ID: {item_id})") + else: + click.echo(f" - {item['path']}") + if len(items) > 5: + click.echo(f" ... and {len(items) - 5} more") + + # Set overall validity + self.valid = invalid_count == 0 + self.message = results + def _validate_recursive( self, data: Dict[str, Any], file_path: str, results: List[Dict], visited: Set[str], + is_api: bool = False, + collection_id: str = None, ): - """Recursively validate a STAC object and its children.""" + """Recursively validate a STAC object and its children. + + Args: + data: The STAC object to validate + file_path: Path or URL to the object + results: List to accumulate validation results + visited: Set of already-visited paths to prevent circular references + is_api: If True, follow API-specific links (data, items, next); if False, follow catalog links (child, item) + collection_id: Optional collection ID for items from FeatureCollections + """ import json + # Log progress in API mode + if is_api and not self.quiet: + self._progress_count += 1 + object_id = data.get("id", "unknown") + object_type = data.get("type", "unknown") + if collection_id and object_type == "Feature": + click.secho( + f" [{self._progress_count}] Validating {object_type}: {object_id} (Collection: {collection_id})", + fg="cyan", + dim=True, + ) + else: + click.secho( + f" [{self._progress_count}] Validating {object_type}: {object_id}", + fg="cyan", + dim=True, + ) + # Determine STAC type - could be "Catalog", "Collection", or "Feature" (Item) raw_type = data.get("type", "unknown") if raw_type == "Feature": @@ -471,8 +585,12 @@ def _validate_recursive( else: try: extensions = data.get("stac_extensions", []) - validator, _ = get_validator(stac_type, stac_version, extensions) - validator(data) + + # Mute noisy "[Fallback]" and "[Network]" prints from validator setup + with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()): + validator, _ = get_validator(stac_type, stac_version, extensions) + validator(data) + is_valid = True error_msg = None except fastjsonschema.JsonSchemaValueException as e: @@ -506,22 +624,18 @@ def _validate_recursive( ) links = data.get("links", []) - # For remote URLs, look for "data" links (collections endpoint) or "child"/"item" links - # For local files, look for "child" and "item" links - is_remote = file_path.startswith("http") - for link in links: rel = link.get("rel", "") href = link.get("href", "") - # Determine if we should follow this link + # Determine if we should follow this link based on mode should_follow = False - if is_remote: - # For remote catalogs, follow "data" (collections), "child", "item", and "items" links + if is_api: + # API mode: follow "data" (collections), "child", "item", and "items" links if rel in ["data", "child", "item", "items"] and href: should_follow = True else: - # For local catalogs, follow "child" and "item" links + # Local mode: follow "child" and "item" links only if rel in ["child", "item"] and href: should_follow = True @@ -549,7 +663,7 @@ def _validate_recursive( child_data = json.load(f) # If this is a collections list endpoint, extract individual collections - if rel == "data" and is_remote and isinstance(child_data, dict): + if rel == "data" and is_api and isinstance(child_data, dict): collections = child_data.get("collections", []) if collections: # This is a collections list - process each collection @@ -575,6 +689,7 @@ def _validate_recursive( collection_url, results, visited, + is_api, ) except Exception as e: results.append( @@ -587,28 +702,45 @@ def _validate_recursive( else: # Not a collections list, validate as normal self._validate_recursive( - child_data, child_path, results, visited + child_data, child_path, results, visited, is_api ) # If this is an items endpoint (GeoJSON FeatureCollection), extract individual items - elif rel == "items" and is_remote and isinstance(child_data, dict): + elif rel == "items" and is_api and isinstance(child_data, dict): features = child_data.get("features", []) if features: + # Extract collection ID from URL (e.g., /collections/{id}/items) + collection_id = None + if "/collections/" in child_path: + parts = child_path.split("/collections/") + if len(parts) > 1: + collection_parts = parts[1].split("/items") + collection_id = ( + collection_parts[0] + if collection_parts + else None + ) + # This is an items list - process each item for feature in features: item_id = feature.get("id", "unknown") item_path = f"{child_path}#{item_id}" self._validate_recursive( - feature, item_path, results, visited + feature, + item_path, + results, + visited, + is_api, + collection_id, ) else: # Not an items list, validate as normal self._validate_recursive( - child_data, child_path, results, visited + child_data, child_path, results, visited, is_api ) else: # Recursively validate child self._validate_recursive( - child_data, child_path, results, visited + child_data, child_path, results, visited, is_api ) except Exception as e: results.append( diff --git a/stac_validator/stac_validator.py b/stac_validator/stac_validator.py index 36b66a3..bc6b564 100644 --- a/stac_validator/stac_validator.py +++ b/stac_validator/stac_validator.py @@ -545,11 +545,19 @@ def batch( is_flag=True, help="Recursively validate all child catalogs, collections, and items.", ) -def fast(stac_file: str, quiet: bool, verbose: bool, recursive: bool): +@click.option( + "--api", + "-a", + is_flag=True, + help="Validate a STAC API catalog recursively (follows data, child, item, and items links).", +) +def fast(stac_file: str, quiet: bool, verbose: bool, recursive: bool, api: bool): """High-speed validation using fastjsonschema and local caching.""" try: fv = FastValidator(stac_file, quiet=quiet, verbose=verbose) - if recursive: + if api: + fv.run_api() + elif recursive: fv.run_recursive() else: fv.run() From 10cb6ea5ee0b3c7ffdb1a1a9ac1deefd4f0ed4e4 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Mon, 11 May 2026 23:42:00 +0800 Subject: [PATCH 4/6] add --limit param --- CHANGELOG.md | 9 + README.md | 23 +- pyproject.toml | 6 +- stac_validator/fast_validator.py | 451 ++++++++++++++++++------ stac_validator/stac_validator.py | 25 +- tests/test_fast_validator.py | 587 +++++++++++++++++++++++++++++++ tests/test_sys_exit.py | 16 + 7 files changed, 999 insertions(+), 118 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 08c036c..26785fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ The format is (loosely) based on [Keep a Changelog](http://keepachangelog.com/) ### Added +- support for --limit option for `fast` command to cap the number of STAC objects validated + ### Changed ### Fixed @@ -16,6 +18,13 @@ The format is (loosely) based on [Keep a Changelog](http://keepachangelog.com/) ### Updated +## [v4.4.0] - 2026-05-11 + +### Added + +- support for --recursive option for `fast` command to validate static STAC catalogs +- support for --api option for `fast` command to validate STAC API endpoints + ## [v4.3.0] - 2026-05-08 ### Added diff --git a/README.md b/README.md index 1aff4ca..bf16614 100644 --- a/README.md +++ b/README.md @@ -311,6 +311,12 @@ Options: -q, --quiet Suppress individual item logs. -v, --verbose Show full validation logs for all items. By default, only invalid items are shown. + -r, --recursive Recursively validate all child catalogs, collections, + and items. + -a, --api Validate a STAC API catalog recursively (follows data, + child, item, and items links). + --limit INTEGER RANGE Limit number of STAC objects to validate. + [x>=1] --help Show this message and exit. ``` @@ -513,6 +519,8 @@ The `fast` command provides ultra-high-speed validation using `fastjsonschema` w - **Multi-tier caching:** RAM → Disk → Network with automatic fallback - **Local schema storage:** Schemas cached locally under `local_schemas/.schemas` directory for instant reuse - **Automatic detection:** Detects STAC type (Item, Collection, Catalog, FeatureCollection) automatically +- **Recursive traversal:** Supports `--recursive` for local catalog/collection graphs +- **STAC API traversal:** Supports `--api` to follow STAC API data, child, item, and items links - **Detailed metrics:** Shows setup time, execution time, and cache hit status for each item - **Error grouping:** Groups validation errors by type and shows affected items @@ -541,8 +549,17 @@ $ stac-validator fast item.json --quiet # Show detailed output for all items (default shows first 5) $ stac-validator fast collection.json --verbose +# Validate only first 25 objects in a large FeatureCollection +$ stac-validator fast collection.json --limit 25 + +# Recursively validate a local catalog graph +$ stac-validator fast catalog.json --recursive + +# Recursively validate a STAC API root endpoint +$ stac-validator fast https://api.example.com --api + # Combine options -$ stac-validator fast collection.json --verbose --quiet # Quiet takes precedence +$ stac-validator fast collection.json --verbose --limit 50 ``` **Example Output** @@ -868,6 +885,10 @@ import json fv = FastValidator("large_collection.json", quiet=True) fv.run() +# Optionally cap validation to the first N objects +fv_limited = FastValidator("large_collection.json", quiet=True, limit=100) +fv_limited.run() + # Access validation results via the message attribute print(json.dumps(fv.message, indent=2)) diff --git a/pyproject.toml b/pyproject.toml index 7aac5b2..128a49f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,11 +4,11 @@ build-backend = "setuptools.build_meta" [project] name = "stac_valid" -version = "4.3.0" +version = "4.4.0" description = "A package to validate STAC files" authors = [ - {name = "James Banting"}, - {name = "Jonathan Healy", email = "jon@healy-hyperspatial.dev"} + {name = "Jonathan Healy", email = "jon@healy-hyperspatial.dev"}, + {name = "James Banting"} ] maintainers = [ {name = "Jonathan Healy", email = "jon@healy-hyperspatial.dev"}, diff --git a/stac_validator/fast_validator.py b/stac_validator/fast_validator.py index d041ab3..52a86c9 100644 --- a/stac_validator/fast_validator.py +++ b/stac_validator/fast_validator.py @@ -3,13 +3,17 @@ import os import sys import time -import urllib.error -import urllib.request +from concurrent.futures import Future, ThreadPoolExecutor from contextlib import redirect_stderr, redirect_stdout -from typing import Any, Dict, List, Set +from typing import Any, Dict, List, Optional, Set, Tuple import click import fastjsonschema # type: ignore +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +from .utilities import validate_with_ref_resolver # --- Caches & Config --- SCHEMA_CACHE: Dict[str, Any] = {} @@ -22,6 +26,22 @@ ".schemas", ) +# Shared HTTP session with keep-alive connection pooling and retries for crawler workloads. +HTTP_SESSION = requests.Session() +_http_retries = Retry( + total=3, + backoff_factor=0.2, + status_forcelist=[500, 502, 503, 504], +) +_http_adapter = HTTPAdapter( + pool_connections=100, + pool_maxsize=100, + max_retries=_http_retries, +) +HTTP_SESSION.mount("http://", _http_adapter) +HTTP_SESSION.mount("https://", _http_adapter) +HTTP_SESSION.headers.update({"User-Agent": "stac-fast-cli/5.0"}) + def get_local_path_for_uri(uri: str) -> str: """Creates a safe local filepath for a cached schema URL.""" @@ -51,11 +71,11 @@ def fetch_schema(uri: str) -> Dict[str, Any]: # 3. Network Fetch if not QUIET_MODE: click.secho(f" [Network] Fetching: {uri}", fg="yellow", dim=True) - req = urllib.request.Request(uri, headers={"User-Agent": "stac-fast-cli/5.0"}) try: - with urllib.request.urlopen(req) as response: - schema_dict = json.loads(response.read().decode("utf-8")) - except urllib.error.URLError as e: + response = HTTP_SESSION.get(uri, timeout=10) + response.raise_for_status() + schema_dict = response.json() + except requests.RequestException as e: raise RuntimeError(f"Could not resolve schema: {uri}. Reason: {e}") # 4. Save to Disk Cache @@ -129,30 +149,153 @@ def fallback_validator(data: Dict[str, Any]) -> None: class FastValidator: - def __init__(self, stac_file: str, quiet: bool = False, verbose: bool = False): + def __init__( + self, + stac_file: str, + quiet: bool = False, + verbose: bool = False, + limit: Optional[int] = None, + ): global QUIET_MODE self.stac_file = stac_file self.quiet = quiet self.valid = True self.verbose = verbose + self.limit = limit self.message: List[Dict[str, Any]] = [] QUIET_MODE = quiet + def _limit_reached(self, results: List[Dict]) -> bool: + return self.limit is not None and len(results) >= self.limit + + def _get_base_schema_uri(self, stac_type: str, stac_version: str) -> str: + stac_type_lower = stac_type.lower() + if stac_type_lower in ["item", "feature"]: + return f"https://schemas.stacspec.org/v{stac_version}/item-spec/json-schema/item.json" + if stac_type_lower == "collection": + return f"https://schemas.stacspec.org/v{stac_version}/collection-spec/json-schema/collection.json" + if stac_type_lower == "catalog": + return f"https://schemas.stacspec.org/v{stac_version}/catalog-spec/json-schema/catalog.json" + raise ValueError(f"Unknown STAC type for validation: {stac_type}") + + def _is_ref_resolution_error(self, err: Exception) -> bool: + err_text = str(err) + err_type = err.__class__.__name__ + return ( + "Unresolvable JSON pointer" in err_text + or "RefResolutionError" in err_type + or "Unresolvable" in err_type + ) + + def _validate_with_jsonschema_fallback( + self, + item: Dict[str, Any], + stac_type: str, + stac_version: str, + extensions: List[str], + ) -> None: + """Fallback validation path using the main jsonschema resolver utility.""" + base_schema = self._get_base_schema_uri(stac_type, stac_version) + validate_with_ref_resolver(base_schema, item) + for ext_schema in extensions: + validate_with_ref_resolver(ext_schema, item) + + def _load_json_resource(self, resource_path: str) -> Dict[str, Any]: + if resource_path.startswith("http"): + response = HTTP_SESSION.get(resource_path, timeout=15) + response.raise_for_status() + return response.json() + + with open(resource_path, "r") as f: + return json.load(f) + + def _get_parallel_fetch_workers(self, item_count: int) -> int: + return max(1, min(8, item_count)) + + def _load_collection_documents( + self, collection_urls: List[str] + ) -> List[tuple[str, Optional[Dict[str, Any]], Optional[Exception]]]: + if len(collection_urls) <= 1: + results = [] + for collection_url in collection_urls: + try: + results.append( + (collection_url, self._load_json_resource(collection_url), None) + ) + except Exception as exc: + results.append((collection_url, None, exc)) + return results + + with ThreadPoolExecutor( + max_workers=self._get_parallel_fetch_workers(len(collection_urls)) + ) as executor: + futures: List[Future[Dict[str, Any]]] = [ + executor.submit(self._load_json_resource, collection_url) + for collection_url in collection_urls + ] + + results = [] + for collection_url, future in zip(collection_urls, futures): + try: + results.append((collection_url, future.result(), None)) + except Exception as exc: + results.append((collection_url, None, exc)) + + return results + + def _prefetch_api_collection_resources( + self, collection_url: str + ) -> Tuple[str, Optional[Dict[str, Dict[str, Any]]], Optional[Exception]]: + try: + collection_data = self._load_json_resource(collection_url) + except Exception as exc: + return collection_url, None, exc + + resources = {collection_url: collection_data} + base_dir = collection_url.rsplit("/", 1)[0] + + for link in collection_data.get("links", []): + if link.get("rel") != "items": + continue + + href = link.get("href", "") + if not href: + continue + + if href.startswith("http"): + items_path = href + else: + items_path = os.path.normpath(os.path.join(base_dir, href)) + + try: + resources[items_path] = self._load_json_resource(items_path) + except Exception: + pass + + return collection_url, resources, None + + def _prefetch_api_collection_resources_batch( + self, collection_urls: List[str] + ) -> List[Tuple[str, Optional[Dict[str, Dict[str, Any]]], Optional[Exception]]]: + if len(collection_urls) <= 1: + return [self._prefetch_api_collection_resources(collection_url) for collection_url in collection_urls] + + with ThreadPoolExecutor( + max_workers=self._get_parallel_fetch_workers(len(collection_urls)) + ) as executor: + futures: List[Future[Tuple[str, Optional[Dict[str, Dict[str, Any]]], Optional[Exception]]]] = [ + executor.submit(self._prefetch_api_collection_resources, collection_url) + for collection_url in collection_urls + ] + return [future.result() for future in futures] + def run(self): """Universal high-speed STAC Validator (Items, Collections, Catalogs, FeatureCollections)""" if not self.quiet: click.secho(f"\nšŸ“‚ Loading: {self.stac_file}", fg="blue", bold=True) try: - if self.stac_file.startswith("http"): - req = urllib.request.Request( - self.stac_file, headers={"User-Agent": "stac-fast-cli/5.0"} - ) - with urllib.request.urlopen(req) as response: - data = json.loads(response.read().decode("utf-8")) - else: - with open(self.stac_file, "r") as f: - data = json.load(f) + data = self._load_json_resource(self.stac_file) except Exception as e: click.secho(f"āŒ Error reading {self.stac_file}: {e}", fg="red", bold=True) self.valid = False @@ -198,6 +341,15 @@ def run(self): return # --- Metrics --- + available_objects = len(items_to_validate) + if self.limit is not None: + items_to_validate = items_to_validate[: self.limit] + if not self.quiet and available_objects > self.limit: + click.secho( + f"šŸ”¢ Limiting validation to first {self.limit} objects (out of {available_objects}).", + fg="yellow", + ) + total_setup_ms = 0.0 total_exec_ms = 0.0 valid_count = 0 @@ -221,14 +373,9 @@ def run(self): ) # Build schema URI for this object type - stac_type_lower = actual_type.lower() - if stac_type_lower in ["item", "feature"]: - base_schema = f"https://schemas.stacspec.org/v{stac_version}/item-spec/json-schema/item.json" - elif stac_type_lower == "collection": - base_schema = f"https://schemas.stacspec.org/v{stac_version}/collection-spec/json-schema/collection.json" - elif stac_type_lower == "catalog": - base_schema = f"https://schemas.stacspec.org/v{stac_version}/catalog-spec/json-schema/catalog.json" - else: + try: + base_schema = self._get_base_schema_uri(actual_type, stac_version) + except ValueError: base_schema = "" if base_schema: @@ -287,6 +434,38 @@ def run(self): error_registry[error_msg].append(item_id) status_text = click.style("āŒ INVALID", fg="red") + except Exception as e: + t3 = time.perf_counter() + exec_time = (t3 - t2) * 1000 + total_exec_ms += exec_time + + if self._is_ref_resolution_error(e): + try: + self._validate_with_jsonschema_fallback( + item, + actual_type, + stac_version, + extensions, + ) + valid_count += 1 + status_text = click.style("āœ… VALID", fg="green") + except Exception as fallback_err: + invalid_count += 1 + self.valid = False + error_msg = str(fallback_err) + if error_msg not in error_registry: + error_registry[error_msg] = [] + error_registry[error_msg].append(item_id) + status_text = click.style("āŒ INVALID", fg="red") + else: + invalid_count += 1 + self.valid = False + error_msg = str(e) + if error_msg not in error_registry: + error_registry[error_msg] = [] + error_registry[error_msg].append(item_id) + status_text = click.style("āŒ INVALID", fg="red") + if not self.quiet: if self.verbose or index < 5 or (len(items_to_validate) < 20): cache_icon = "⚔" if is_cached else "🐌" @@ -363,23 +542,17 @@ def run(self): def run_recursive(self): """Recursively validate a local STAC catalog/collection and all its children.""" - import json - sys.setrecursionlimit(10000) + start_time = time.perf_counter() # Load the root STAC object try: - if self.stac_file.startswith("http"): - req = urllib.request.Request( - self.stac_file, headers={"User-Agent": "stac-fast-cli/5.0"} - ) - with urllib.request.urlopen(req) as response: - root_data = json.loads(response.read().decode("utf-8")) - root_path = self.stac_file - else: - with open(self.stac_file, "r") as f: - root_data = json.load(f) - root_path = os.path.abspath(self.stac_file) + root_data = self._load_json_resource(self.stac_file) + root_path = ( + self.stac_file + if self.stac_file.startswith("http") + else os.path.abspath(self.stac_file) + ) except Exception as e: click.secho(f"āŒ Error reading {self.stac_file}: {e}", fg="red", bold=True) self.valid = False @@ -391,6 +564,12 @@ def run_recursive(self): visited.add(root_path) self._validate_recursive(root_data, root_path, results, visited, is_api=False) + if self.limit is not None and not self.quiet and len(results) >= self.limit: + click.secho( + f"šŸ”¢ Validation limit reached ({self.limit} objects).", + fg="yellow", + ) + # Display results click.echo("\n" + "=" * 55) click.secho("šŸ“Š RECURSIVE VALIDATION SUMMARY", bold=True, fg="blue") @@ -398,10 +577,12 @@ def run_recursive(self): valid_count = sum(1 for r in results if r["valid_stac"]) invalid_count = len(results) - valid_count + elapsed_ms = (time.perf_counter() - start_time) * 1000 click.echo(f"Total Objects Validated: {len(results)}") click.echo(f"Valid Objects: {valid_count}") click.echo(f"Invalid Objects: {invalid_count}") + click.echo(f"Execution Time: {elapsed_ms:.2f} ms") if invalid_count > 0: click.echo("\n" + "=" * 55) @@ -436,31 +617,26 @@ def run_recursive(self): click.echo(f" ... and {len(items) - 5} more") # Set overall validity - self.valid = invalid_count == 0 + self.valid = all(r.get("valid_stac", False) for r in results) self.message = results def run_api(self): """Recursively validate a STAC API catalog and all its collections/items.""" - import json - sys.setrecursionlimit(10000) + start_time = time.perf_counter() if not self.quiet: click.secho("šŸš€ Starting STAC API validation...", fg="blue", bold=True) + click.secho("ā³ Fetching API root and discovery links...", fg="cyan", dim=True) # Load the root STAC API object try: - if self.stac_file.startswith("http"): - req = urllib.request.Request( - self.stac_file, headers={"User-Agent": "stac-fast-cli/5.0"} - ) - with urllib.request.urlopen(req) as response: - root_data = json.loads(response.read().decode("utf-8")) - root_path = self.stac_file - else: - with open(self.stac_file, "r") as f: - root_data = json.load(f) - root_path = os.path.abspath(self.stac_file) + root_data = self._load_json_resource(self.stac_file) + root_path = ( + self.stac_file + if self.stac_file.startswith("http") + else os.path.abspath(self.stac_file) + ) except Exception as e: click.secho(f"āŒ Error reading {self.stac_file}: {e}", fg="red", bold=True) self.valid = False @@ -470,10 +646,23 @@ def run_api(self): results = [] visited = set() visited.add(root_path) - # Add a counter for progress tracking self._progress_count = 0 + + if not self.quiet: + click.secho( + "🧠 Compiling/warming schemas (first objects may be slower)...", + fg="cyan", + dim=True, + ) + self._validate_recursive(root_data, root_path, results, visited, is_api=True) + if self.limit is not None and not self.quiet and len(results) >= self.limit: + click.secho( + f"šŸ”¢ Validation limit reached ({self.limit} objects).", + fg="yellow", + ) + # Display results click.echo("\n" + "=" * 55) click.secho("šŸ“Š STAC API VALIDATION SUMMARY", bold=True, fg="blue") @@ -481,10 +670,12 @@ def run_api(self): valid_count = sum(1 for r in results if r["valid_stac"]) invalid_count = len(results) - valid_count + elapsed_ms = (time.perf_counter() - start_time) * 1000 click.echo(f"Total Objects Validated: {len(results)}") click.echo(f"Valid Objects: {valid_count}") click.echo(f"Invalid Objects: {invalid_count}") + click.echo(f"Execution Time: {elapsed_ms:.2f} ms") if invalid_count > 0: click.echo("\n" + "=" * 55) @@ -519,7 +710,7 @@ def run_api(self): click.echo(f" ... and {len(items) - 5} more") # Set overall validity - self.valid = invalid_count == 0 + self.valid = all(r.get("valid_stac", False) for r in results) self.message = results def _validate_recursive( @@ -530,6 +721,7 @@ def _validate_recursive( visited: Set[str], is_api: bool = False, collection_id: str = None, + prefetched_resources: Optional[Dict[str, Dict[str, Any]]] = None, ): """Recursively validate a STAC object and its children. @@ -543,6 +735,9 @@ def _validate_recursive( """ import json + if self._limit_reached(results): + return + # Log progress in API mode if is_api and not self.quiet: self._progress_count += 1 @@ -586,7 +781,7 @@ def _validate_recursive( try: extensions = data.get("stac_extensions", []) - # Mute noisy "[Fallback]" and "[Network]" prints from validator setup + # Mute noisy "[Fallback]" and "[Network]" prints from validation execution path with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()): validator, _ = get_validator(stac_type, stac_version, extensions) validator(data) @@ -597,8 +792,22 @@ def _validate_recursive( is_valid = False error_msg = f"{e.name} {e.message.replace(e.name, '').strip()}" except Exception as e: - is_valid = False - error_msg = str(e) + if self._is_ref_resolution_error(e): + try: + self._validate_with_jsonschema_fallback( + data, + stac_type, + stac_version, + extensions, + ) + is_valid = True + error_msg = None + except Exception as fallback_err: + is_valid = False + error_msg = str(fallback_err) + else: + is_valid = False + error_msg = str(e) # Create result for this object # Extract ID if available @@ -616,6 +825,9 @@ def _validate_recursive( results.append(result) + if self._limit_reached(results): + return + # Process child links base_dir = ( os.path.dirname(file_path) @@ -625,6 +837,9 @@ def _validate_recursive( links = data.get("links", []) for link in links: + if self._limit_reached(results): + break + rel = link.get("rel", "") href = link.get("href", "") @@ -652,76 +867,90 @@ def _validate_recursive( # Load and validate child try: - if child_path.startswith("http"): - req = urllib.request.Request( - child_path, headers={"User-Agent": "stac-fast-cli/5.0"} - ) - with urllib.request.urlopen(req) as response: - child_data = json.loads(response.read().decode("utf-8")) + if prefetched_resources and child_path in prefetched_resources: + child_data = prefetched_resources[child_path] else: - with open(child_path, "r") as f: - child_data = json.load(f) + if is_api and not self.quiet and rel in ["data", "items"]: + label = "collections" if rel == "data" else "items" + click.secho( + f" Discovering {label}: {child_path}", + fg="cyan", + dim=True, + ) + child_data = self._load_json_resource(child_path) # If this is a collections list endpoint, extract individual collections if rel == "data" and is_api and isinstance(child_data, dict): collections = child_data.get("collections", []) if collections: # This is a collections list - process each collection + collection_urls = [] for collection in collections: collection_id = collection.get("id") if collection_id: - # Build URL to individual collection - collection_url = ( + collection_urls.append( f"{child_path.rstrip('/')}/{collection_id}" ) - try: - req = urllib.request.Request( - collection_url, - headers={"User-Agent": "stac-fast-cli/5.0"}, - ) - with urllib.request.urlopen(req) as response: - collection_data = json.loads( - response.read().decode("utf-8") - ) - # Recursively validate the full collection - self._validate_recursive( - collection_data, - collection_url, - results, - visited, - is_api, - ) - except Exception as e: - results.append( - { - "path": collection_url, - "valid_stac": False, - "error_message": f"Failed to load: {str(e)}", - } - ) + + # Avoid prefetching beyond remaining validation capacity. + if self.limit is not None: + remaining = max(1, self.limit - len(results)) + collection_urls = collection_urls[:remaining] + + for collection_url, prefetched_collection_resources, load_error in self._prefetch_api_collection_resources_batch( + collection_urls + ): + if self._limit_reached(results): + break + + if load_error is not None: + results.append( + { + "path": collection_url, + "valid_stac": False, + "error_message": f"Failed to load: {str(load_error)}", + } + ) + continue + + visited.add(collection_url) + collection_data = prefetched_collection_resources[ + collection_url + ] + + self._validate_recursive( + collection_data, + collection_url, + results, + visited, + is_api, + prefetched_resources=prefetched_collection_resources, + ) else: # Not a collections list, validate as normal self._validate_recursive( child_data, child_path, results, visited, is_api ) - # If this is an items endpoint (GeoJSON FeatureCollection), extract individual items + # If this is an items endpoint (GeoJSON FeatureCollection), validate only Features elif rel == "items" and is_api and isinstance(child_data, dict): - features = child_data.get("features", []) - if features: - # Extract collection ID from URL (e.g., /collections/{id}/items) - collection_id = None - if "/collections/" in child_path: - parts = child_path.split("/collections/") - if len(parts) > 1: - collection_parts = parts[1].split("/items") - collection_id = ( - collection_parts[0] - if collection_parts - else None - ) + features = child_data.get("features") + + # Extract collection ID from URL (e.g., /collections/{id}/items) + collection_id = None + if "/collections/" in child_path: + parts = child_path.split("/collections/") + if len(parts) > 1: + collection_parts = parts[1].split("/items") + collection_id = ( + collection_parts[0] if collection_parts else None + ) - # This is an items list - process each item + # Validate each feature item from the items page, not the FeatureCollection container. + if isinstance(features, list): for feature in features: + if self._limit_reached(results): + break + item_id = feature.get("id", "unknown") item_path = f"{child_path}#{item_id}" self._validate_recursive( @@ -732,17 +961,15 @@ def _validate_recursive( is_api, collection_id, ) - else: - # Not an items list, validate as normal - self._validate_recursive( - child_data, child_path, results, visited, is_api - ) else: # Recursively validate child self._validate_recursive( child_data, child_path, results, visited, is_api ) except Exception as e: + if self._limit_reached(results): + break + results.append( { "path": child_path, diff --git a/stac_validator/stac_validator.py b/stac_validator/stac_validator.py index bc6b564..7457ccd 100644 --- a/stac_validator/stac_validator.py +++ b/stac_validator/stac_validator.py @@ -551,10 +551,31 @@ def batch( is_flag=True, help="Validate a STAC API catalog recursively (follows data, child, item, and items links).", ) -def fast(stac_file: str, quiet: bool, verbose: bool, recursive: bool, api: bool): +@click.option( + "--limit", + type=click.IntRange(min=1), + default=None, + help="Limit number of STAC objects to validate.", +) +def fast( + stac_file: str, + quiet: bool, + verbose: bool, + recursive: bool, + api: bool, + limit: Optional[int], +): """High-speed validation using fastjsonschema and local caching.""" + if api and not stac_file.startswith(("http://", "https://")): + click.secho( + "āŒ Invalid STAC API URL. Include 'http://' or 'https://' (example: https://example.com/stac).", + fg="red", + bold=True, + ) + sys.exit(1) + try: - fv = FastValidator(stac_file, quiet=quiet, verbose=verbose) + fv = FastValidator(stac_file, quiet=quiet, verbose=verbose, limit=limit) if api: fv.run_api() elif recursive: diff --git a/tests/test_fast_validator.py b/tests/test_fast_validator.py index 919e8d1..92aa1c1 100644 --- a/tests/test_fast_validator.py +++ b/tests/test_fast_validator.py @@ -210,6 +210,593 @@ def test_non_verbose_mode(self, tmp_path, capsys): assert "[1]" in captured.out assert "silencing output" in captured.out + def test_limit_reduces_validated_objects(self, tmp_path): + """Test limit option validates only the first N objects.""" + fc_path = tmp_path / "limited_fc.json" + fc_data = { + "type": "FeatureCollection", + "features": [ + { + "stac_version": "1.0.0", + "type": "Feature", + "id": f"item-{i}", + "geometry": None, + "properties": {"datetime": "2023-01-01T00:00:00Z"}, + "links": [{"rel": "self", "href": "http://example.com"}], + "assets": {}, + } + for i in range(10) + ], + } + fc_path.write_text(json.dumps(fc_data)) + + fv = FastValidator(str(fc_path), quiet=True, limit=3) + fv.run() + + msg = fv.message[0] + assert msg["total_objects"] == 3 + assert msg["valid_objects"] == 3 + assert msg["invalid_objects"] == 0 + + def test_limit_above_total_does_not_change_count(self, valid_feature_collection): + """Test limit larger than object count validates all objects.""" + fv = FastValidator(valid_feature_collection, quiet=True, limit=20) + fv.run() + + msg = fv.message[0] + assert msg["total_objects"] == 5 + assert msg["valid_objects"] == 5 + + +class TestFastValidatorRecursiveAndApi: + """Test recursive and API traversal behavior.""" + + def test_load_collection_documents_keeps_order_and_errors(self, monkeypatch): + """Test parallel collection loading preserves URL order and captures per-URL errors.""" + + def _fake_load(self, resource_path): + if resource_path.endswith("two"): + raise RuntimeError("boom") + return {"id": resource_path.rsplit("/", 1)[-1]} + + monkeypatch.setattr(FastValidator, "_load_json_resource", _fake_load) + + fv = FastValidator("https://api.example.com", quiet=True) + loaded = fv._load_collection_documents( + [ + "https://api.example.com/one", + "https://api.example.com/two", + "https://api.example.com/three", + ] + ) + + assert [entry[0] for entry in loaded] == [ + "https://api.example.com/one", + "https://api.example.com/two", + "https://api.example.com/three", + ] + assert loaded[0][1] == {"id": "one"} + assert loaded[0][2] is None + assert loaded[1][1] is None + assert str(loaded[1][2]) == "boom" + assert loaded[2][1] == {"id": "three"} + assert loaded[2][2] is None + + def test_prefetch_api_collection_resources_batch_prefetches_items(self, monkeypatch): + """Test API collection prefetch preserves order and includes items pages.""" + + payloads = { + "https://api.example.com/one": { + "id": "one", + "links": [ + { + "rel": "items", + "href": "https://api.example.com/one/items", + } + ], + }, + "https://api.example.com/one/items": { + "type": "FeatureCollection", + "features": [], + }, + "https://api.example.com/two": {"id": "two", "links": []}, + } + + def _fake_load(self, resource_path): + return payloads[resource_path] + + monkeypatch.setattr(FastValidator, "_load_json_resource", _fake_load) + + fv = FastValidator("https://api.example.com", quiet=True) + loaded = fv._prefetch_api_collection_resources_batch( + [ + "https://api.example.com/one", + "https://api.example.com/two", + ] + ) + + assert [entry[0] for entry in loaded] == [ + "https://api.example.com/one", + "https://api.example.com/two", + ] + assert loaded[0][1]["https://api.example.com/one"]["id"] == "one" + assert ( + loaded[0][1]["https://api.example.com/one/items"]["type"] + == "FeatureCollection" + ) + assert loaded[1][1]["https://api.example.com/two"]["id"] == "two" + + def test_api_prefetch_truncates_to_remaining_limit(self, monkeypatch): + """Test API data-link prefetch list is trimmed to remaining validation capacity.""" + + def _ok_validator(data): + return None + + monkeypatch.setattr( + "stac_validator.fast_validator.get_validator", + lambda *args, **kwargs: (_ok_validator, True), + ) + + payloads = { + "https://api.example.com": { + "conformsTo": ["https://api.stacspec.org/v1.0.0/core"], + "id": "api-root", + "type": "Catalog", + "description": "api root", + "links": [{"rel": "data", "href": "https://api.example.com/collections"}], + }, + "https://api.example.com/collections": { + "collections": [ + {"id": "c1"}, + {"id": "c2"}, + {"id": "c3"}, + ] + }, + } + + def _fake_load(self, resource_path): + return payloads[resource_path] + + captured = [] + + def _fake_prefetch(self, collection_urls): + captured.extend(collection_urls) + return [] + + monkeypatch.setattr(FastValidator, "_load_json_resource", _fake_load) + monkeypatch.setattr( + FastValidator, + "_prefetch_api_collection_resources_batch", + _fake_prefetch, + ) + + fv = FastValidator("https://api.example.com", quiet=True, limit=2) + fv.run_api() + + # One slot is consumed by the root catalog, so only one collection should be prefetched. + assert captured == ["https://api.example.com/collections/c1"] + + def test_recursive_mode_respects_limit(self, tmp_path, monkeypatch): + """Test recursive validation follows links and stops at limit.""" + + def _ok_validator(data): + return None + + monkeypatch.setattr( + "stac_validator.fast_validator.get_validator", + lambda *args, **kwargs: (_ok_validator, True), + ) + + root = { + "stac_version": "1.0.0", + "type": "Catalog", + "id": "root", + "description": "root catalog", + "links": [{"rel": "child", "href": "child.json"}], + } + child = { + "stac_version": "1.0.0", + "type": "Catalog", + "id": "child", + "description": "child catalog", + "links": [ + {"rel": "item", "href": "item-1.json"}, + {"rel": "item", "href": "item-2.json"}, + ], + } + item_1 = { + "stac_version": "1.0.0", + "type": "Feature", + "id": "item-1", + "geometry": None, + "properties": {"datetime": "2023-01-01T00:00:00Z"}, + "links": [{"rel": "self", "href": "http://example.com/item-1"}], + "assets": {}, + } + item_2 = { + "stac_version": "1.0.0", + "type": "Feature", + "id": "item-2", + "geometry": None, + "properties": {"datetime": "2023-01-01T00:00:00Z"}, + "links": [{"rel": "self", "href": "http://example.com/item-2"}], + "assets": {}, + } + + root_path = tmp_path / "catalog.json" + (tmp_path / "child.json").write_text(json.dumps(child)) + (tmp_path / "item-1.json").write_text(json.dumps(item_1)) + (tmp_path / "item-2.json").write_text(json.dumps(item_2)) + root_path.write_text(json.dumps(root)) + + fv = FastValidator(str(root_path), quiet=True, limit=2) + fv.run_recursive() + + assert fv.valid is True + assert len(fv.message) == 2 + assert fv.message[0]["id"] == "root" + assert fv.message[1]["id"] == "child" + + def test_recursive_mode_summary_includes_execution_time( + self, tmp_path, monkeypatch, capsys + ): + """Test recursive mode keeps recursive summary format and includes execution time.""" + + def _ok_validator(data): + return None + + monkeypatch.setattr( + "stac_validator.fast_validator.get_validator", + lambda *args, **kwargs: (_ok_validator, True), + ) + + root = { + "stac_version": "1.0.0", + "type": "Catalog", + "id": "root", + "description": "root catalog", + "links": [{"rel": "child", "href": "child.json"}], + } + child = { + "stac_version": "1.0.0", + "type": "Catalog", + "id": "child", + "description": "child catalog", + "links": [], + } + + root_path = tmp_path / "catalog.json" + (tmp_path / "child.json").write_text(json.dumps(child)) + root_path.write_text(json.dumps(root)) + + fv = FastValidator(str(root_path), quiet=False, verbose=True) + fv.run_recursive() + + captured = capsys.readouterr() + assert "RECURSIVE VALIDATION SUMMARY" in captured.out + assert "Execution Time" in captured.out + + def test_api_mode_respects_limit(self, monkeypatch): + """Test API validation follows API links and stops at limit.""" + + def _ok_validator(data): + return None + + monkeypatch.setattr( + "stac_validator.fast_validator.get_validator", + lambda *args, **kwargs: (_ok_validator, True), + ) + + payloads = { + "https://api.example.com": { + "conformsTo": ["https://api.stacspec.org/v1.0.0/core"], + "id": "api-root", + "type": "Catalog", + "description": "api root", + "links": [{"rel": "data", "href": "https://api.example.com/collections"}], + }, + "https://api.example.com/collections": { + "collections": [{"id": "demo-collection"}], + }, + "https://api.example.com/collections/demo-collection": { + "stac_version": "1.0.0", + "type": "Collection", + "id": "demo-collection", + "description": "demo", + "license": "MIT", + "extent": { + "spatial": {"bbox": [[-180, -90, 180, 90]]}, + "temporal": {"interval": [["2023-01-01T00:00:00Z", None]]}, + }, + "links": [ + { + "rel": "items", + "href": "https://api.example.com/collections/demo-collection/items", + } + ], + }, + "https://api.example.com/collections/demo-collection/items": { + "type": "FeatureCollection", + "features": [ + { + "stac_version": "1.0.0", + "type": "Feature", + "id": "item-1", + "geometry": None, + "properties": {"datetime": "2023-01-01T00:00:00Z"}, + "links": [{"rel": "self", "href": "https://api.example.com/items/item-1"}], + "assets": {}, + } + ], + }, + } + + class _Response: + def __init__(self, data): + self._data = data + + def raise_for_status(self): + return None + + def json(self): + return self._data + + def _fake_get(url, timeout=15): + if url not in payloads: + raise RuntimeError(f"Unexpected URL: {url}") + return _Response(payloads[url]) + + monkeypatch.setattr("stac_validator.fast_validator.HTTP_SESSION.get", _fake_get) + + fv = FastValidator("https://api.example.com", quiet=True, limit=2) + fv.run_api() + + assert fv.valid is True + assert len(fv.message) == 2 + assert fv.message[0]["id"] == "api-root" + assert fv.message[1]["id"] == "demo-collection" + + def test_api_mode_summary_includes_execution_time(self, monkeypatch, capsys): + """Test API mode keeps API summary format and includes execution time.""" + + def _ok_validator(data): + return None + + monkeypatch.setattr( + "stac_validator.fast_validator.get_validator", + lambda *args, **kwargs: (_ok_validator, True), + ) + + payloads = { + "https://api.example.com": { + "conformsTo": ["https://api.stacspec.org/v1.0.0/core"], + "id": "api-root", + "type": "Catalog", + "description": "api root", + "links": [{"rel": "data", "href": "https://api.example.com/collections"}], + }, + "https://api.example.com/collections": { + "collections": [{"id": "demo-collection"}], + }, + "https://api.example.com/collections/demo-collection": { + "stac_version": "1.0.0", + "type": "Collection", + "id": "demo-collection", + "description": "demo", + "license": "MIT", + "extent": { + "spatial": {"bbox": [[-180, -90, 180, 90]]}, + "temporal": {"interval": [["2023-01-01T00:00:00Z", None]]}, + }, + "links": [], + }, + } + + class _Response: + def __init__(self, data): + self._data = data + + def raise_for_status(self): + return None + + def json(self): + return self._data + + def _fake_get(url, timeout=15): + if url not in payloads: + raise RuntimeError(f"Unexpected URL: {url}") + return _Response(payloads[url]) + + monkeypatch.setattr("stac_validator.fast_validator.HTTP_SESSION.get", _fake_get) + + fv = FastValidator("https://api.example.com", quiet=False, verbose=True) + fv.run_api() + + captured = capsys.readouterr() + assert "[1] Validating Catalog: api-root" in captured.out + assert "STAC API VALIDATION SUMMARY" in captured.out + assert "Execution Time" in captured.out + + def test_api_mode_does_not_validate_items_featurecollection(self, monkeypatch): + """Test API mode validates item features, not the /items FeatureCollection object.""" + + def _ok_validator(data): + return None + + monkeypatch.setattr( + "stac_validator.fast_validator.get_validator", + lambda *args, **kwargs: (_ok_validator, True), + ) + + payloads = { + "https://api.example.com": { + "conformsTo": ["https://api.stacspec.org/v1.0.0/core"], + "id": "api-root", + "type": "Catalog", + "description": "api root", + "links": [{"rel": "data", "href": "https://api.example.com/collections"}], + }, + "https://api.example.com/collections": { + "collections": [{"id": "demo-collection"}], + }, + "https://api.example.com/collections/demo-collection": { + "stac_version": "1.0.0", + "type": "Collection", + "id": "demo-collection", + "description": "demo", + "license": "MIT", + "extent": { + "spatial": {"bbox": [[-180, -90, 180, 90]]}, + "temporal": {"interval": [["2023-01-01T00:00:00Z", None]]}, + }, + "links": [ + { + "rel": "items", + "href": "https://api.example.com/collections/demo-collection/items", + } + ], + }, + "https://api.example.com/collections/demo-collection/items": { + "type": "FeatureCollection", + "features": [], + "links": [], + }, + } + + class _Response: + def __init__(self, data): + self._data = data + + def raise_for_status(self): + return None + + def json(self): + return self._data + + def _fake_get(url, timeout=15): + if url not in payloads: + raise RuntimeError(f"Unexpected URL: {url}") + return _Response(payloads[url]) + + monkeypatch.setattr("stac_validator.fast_validator.HTTP_SESSION.get", _fake_get) + + fv = FastValidator("https://api.example.com", quiet=True) + fv.run_api() + + assert fv.valid is True + assert len(fv.message) == 2 + paths = {entry["path"] for entry in fv.message} + assert "https://api.example.com/collections/demo-collection/items" not in paths + + +class TestFastValidatorRefResolutionFallback: + """Test fallback behavior when fast path hits ref-resolution errors.""" + + def test_run_falls_back_to_jsonschema_on_ref_error(self, valid_item, monkeypatch): + """run() should retry via jsonschema resolver on ref-resolution errors.""" + + class FakeRefError(Exception): + pass + + def _raise_ref_error(_data): + raise FakeRefError("Unresolvable JSON pointer: 'definitions/link'") + + monkeypatch.setattr( + "stac_validator.fast_validator.get_validator", + lambda *args, **kwargs: (_raise_ref_error, True), + ) + + fallback_calls = [] + + def _fallback(schema_path, content): + fallback_calls.append(schema_path) + + monkeypatch.setattr( + "stac_validator.fast_validator.validate_with_ref_resolver", + _fallback, + ) + + fv = FastValidator(valid_item, quiet=True) + fv.run() + + assert fv.valid is True + assert len(fallback_calls) == 1 + assert "item-spec/json-schema/item.json" in fallback_calls[0] + assert fv.message[0]["invalid_objects"] == 0 + + def test_run_api_falls_back_to_jsonschema_on_ref_error(self, monkeypatch): + """run_api() should retry via jsonschema resolver on ref-resolution errors.""" + + class FakeRefError(Exception): + pass + + def _raise_ref_error(_data): + raise FakeRefError("Unresolvable JSON pointer: 'definitions/asset'") + + monkeypatch.setattr( + "stac_validator.fast_validator.get_validator", + lambda *args, **kwargs: (_raise_ref_error, True), + ) + + fallback_calls = [] + + def _fallback(schema_path, content): + fallback_calls.append(schema_path) + + monkeypatch.setattr( + "stac_validator.fast_validator.validate_with_ref_resolver", + _fallback, + ) + + payloads = { + "https://api.example.com": { + "conformsTo": ["https://api.stacspec.org/v1.0.0/core"], + "id": "api-root", + "type": "Catalog", + "description": "api root", + "links": [{"rel": "data", "href": "https://api.example.com/collections"}], + }, + "https://api.example.com/collections": { + "collections": [{"id": "demo-collection"}], + }, + "https://api.example.com/collections/demo-collection": { + "stac_version": "1.0.0", + "type": "Collection", + "id": "demo-collection", + "description": "demo", + "license": "MIT", + "extent": { + "spatial": {"bbox": [[-180, -90, 180, 90]]}, + "temporal": {"interval": [["2023-01-01T00:00:00Z", None]]}, + }, + "links": [], + }, + } + + class _Response: + def __init__(self, data): + self._data = data + + def raise_for_status(self): + return None + + def json(self): + return self._data + + def _fake_get(url, timeout=15): + if url not in payloads: + raise RuntimeError(f"Unexpected URL: {url}") + return _Response(payloads[url]) + + monkeypatch.setattr("stac_validator.fast_validator.HTTP_SESSION.get", _fake_get) + + fv = FastValidator("https://api.example.com", quiet=True) + fv.run_api() + + assert fv.valid is True + assert len(fallback_calls) == 1 + assert "collection-spec/json-schema/collection.json" in fallback_calls[0] + class TestFastValidatorDetection: """Test STAC type detection.""" diff --git a/tests/test_sys_exit.py b/tests/test_sys_exit.py index 9f1e03d..145a2c4 100644 --- a/tests/test_sys_exit.py +++ b/tests/test_sys_exit.py @@ -52,3 +52,19 @@ def test_cli_schema_cache_size_option(): ], check=True, ) + + +def test_fast_api_requires_url_scheme(): + result = subprocess.run( + [ + "stac-validator", + "fast", + "--api", + "stac.opensearch.dataspace.copernicus.eu/v1", + ], + capture_output=True, + text=True, + ) + + assert result.returncode == 1 + assert "Invalid STAC API URL" in (result.stdout + result.stderr) From c58a023e425face322c4b266c8fe02f67bc74317 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Tue, 12 May 2026 00:10:38 +0800 Subject: [PATCH 5/6] use typing --- stac_validator/fast_validator.py | 55 +++++++++++++++++++++----------- tests/test_fast_validator.py | 31 ++++++++++++++---- 2 files changed, 60 insertions(+), 26 deletions(-) diff --git a/stac_validator/fast_validator.py b/stac_validator/fast_validator.py index 52a86c9..fe82dbc 100644 --- a/stac_validator/fast_validator.py +++ b/stac_validator/fast_validator.py @@ -214,17 +214,19 @@ def _get_parallel_fetch_workers(self, item_count: int) -> int: def _load_collection_documents( self, collection_urls: List[str] - ) -> List[tuple[str, Optional[Dict[str, Any]], Optional[Exception]]]: + ) -> List[Tuple[str, Optional[Dict[str, Any]], Optional[Exception]]]: if len(collection_urls) <= 1: - results = [] + results_single: List[ + Tuple[str, Optional[Dict[str, Any]], Optional[Exception]] + ] = [] for collection_url in collection_urls: try: - results.append( + results_single.append( (collection_url, self._load_json_resource(collection_url), None) ) except Exception as exc: - results.append((collection_url, None, exc)) - return results + results_single.append((collection_url, None, exc)) + return results_single with ThreadPoolExecutor( max_workers=self._get_parallel_fetch_workers(len(collection_urls)) @@ -234,14 +236,16 @@ def _load_collection_documents( for collection_url in collection_urls ] - results = [] + results_parallel: List[ + Tuple[str, Optional[Dict[str, Any]], Optional[Exception]] + ] = [] for collection_url, future in zip(collection_urls, futures): try: - results.append((collection_url, future.result(), None)) + results_parallel.append((collection_url, future.result(), None)) except Exception as exc: - results.append((collection_url, None, exc)) + results_parallel.append((collection_url, None, exc)) - return results + return results_parallel def _prefetch_api_collection_resources( self, collection_url: str @@ -278,12 +282,19 @@ def _prefetch_api_collection_resources_batch( self, collection_urls: List[str] ) -> List[Tuple[str, Optional[Dict[str, Dict[str, Any]]], Optional[Exception]]]: if len(collection_urls) <= 1: - return [self._prefetch_api_collection_resources(collection_url) for collection_url in collection_urls] + return [ + self._prefetch_api_collection_resources(collection_url) + for collection_url in collection_urls + ] with ThreadPoolExecutor( max_workers=self._get_parallel_fetch_workers(len(collection_urls)) ) as executor: - futures: List[Future[Tuple[str, Optional[Dict[str, Dict[str, Any]]], Optional[Exception]]]] = [ + futures: List[ + Future[ + Tuple[str, Optional[Dict[str, Dict[str, Any]]], Optional[Exception]] + ] + ] = [ executor.submit(self._prefetch_api_collection_resources, collection_url) for collection_url in collection_urls ] @@ -627,7 +638,9 @@ def run_api(self): if not self.quiet: click.secho("šŸš€ Starting STAC API validation...", fg="blue", bold=True) - click.secho("ā³ Fetching API root and discovery links...", fg="cyan", dim=True) + click.secho( + "ā³ Fetching API root and discovery links...", fg="cyan", dim=True + ) # Load the root STAC API object try: @@ -720,7 +733,7 @@ def _validate_recursive( results: List[Dict], visited: Set[str], is_api: bool = False, - collection_id: str = None, + collection_id: Optional[str] = None, prefetched_resources: Optional[Dict[str, Dict[str, Any]]] = None, ): """Recursively validate a STAC object and its children. @@ -733,8 +746,6 @@ def _validate_recursive( is_api: If True, follow API-specific links (data, items, next); if False, follow catalog links (child, item) collection_id: Optional collection ID for items from FeatureCollections """ - import json - if self._limit_reached(results): return @@ -897,7 +908,11 @@ def _validate_recursive( remaining = max(1, self.limit - len(results)) collection_urls = collection_urls[:remaining] - for collection_url, prefetched_collection_resources, load_error in self._prefetch_api_collection_resources_batch( + for ( + collection_url, + prefetched_collection_resources, + load_error, + ) in self._prefetch_api_collection_resources_batch( collection_urls ): if self._limit_reached(results): @@ -914,6 +929,8 @@ def _validate_recursive( continue visited.add(collection_url) + if prefetched_collection_resources is None: + continue collection_data = prefetched_collection_resources[ collection_url ] @@ -936,12 +953,12 @@ def _validate_recursive( features = child_data.get("features") # Extract collection ID from URL (e.g., /collections/{id}/items) - collection_id = None + collection_id_from_items: Optional[str] = None if "/collections/" in child_path: parts = child_path.split("/collections/") if len(parts) > 1: collection_parts = parts[1].split("/items") - collection_id = ( + collection_id_from_items = ( collection_parts[0] if collection_parts else None ) @@ -959,7 +976,7 @@ def _validate_recursive( results, visited, is_api, - collection_id, + collection_id_from_items, ) else: # Recursively validate child diff --git a/tests/test_fast_validator.py b/tests/test_fast_validator.py index 92aa1c1..b81206a 100644 --- a/tests/test_fast_validator.py +++ b/tests/test_fast_validator.py @@ -282,7 +282,9 @@ def _fake_load(self, resource_path): assert loaded[2][1] == {"id": "three"} assert loaded[2][2] is None - def test_prefetch_api_collection_resources_batch_prefetches_items(self, monkeypatch): + def test_prefetch_api_collection_resources_batch_prefetches_items( + self, monkeypatch + ): """Test API collection prefetch preserves order and includes items pages.""" payloads = { @@ -343,7 +345,9 @@ def _ok_validator(data): "id": "api-root", "type": "Catalog", "description": "api root", - "links": [{"rel": "data", "href": "https://api.example.com/collections"}], + "links": [ + {"rel": "data", "href": "https://api.example.com/collections"} + ], }, "https://api.example.com/collections": { "collections": [ @@ -493,7 +497,9 @@ def _ok_validator(data): "id": "api-root", "type": "Catalog", "description": "api root", - "links": [{"rel": "data", "href": "https://api.example.com/collections"}], + "links": [ + {"rel": "data", "href": "https://api.example.com/collections"} + ], }, "https://api.example.com/collections": { "collections": [{"id": "demo-collection"}], @@ -524,7 +530,12 @@ def _ok_validator(data): "id": "item-1", "geometry": None, "properties": {"datetime": "2023-01-01T00:00:00Z"}, - "links": [{"rel": "self", "href": "https://api.example.com/items/item-1"}], + "links": [ + { + "rel": "self", + "href": "https://api.example.com/items/item-1", + } + ], "assets": {}, } ], @@ -573,7 +584,9 @@ def _ok_validator(data): "id": "api-root", "type": "Catalog", "description": "api root", - "links": [{"rel": "data", "href": "https://api.example.com/collections"}], + "links": [ + {"rel": "data", "href": "https://api.example.com/collections"} + ], }, "https://api.example.com/collections": { "collections": [{"id": "demo-collection"}], @@ -634,7 +647,9 @@ def _ok_validator(data): "id": "api-root", "type": "Catalog", "description": "api root", - "links": [{"rel": "data", "href": "https://api.example.com/collections"}], + "links": [ + {"rel": "data", "href": "https://api.example.com/collections"} + ], }, "https://api.example.com/collections": { "collections": [{"id": "demo-collection"}], @@ -754,7 +769,9 @@ def _fallback(schema_path, content): "id": "api-root", "type": "Catalog", "description": "api root", - "links": [{"rel": "data", "href": "https://api.example.com/collections"}], + "links": [ + {"rel": "data", "href": "https://api.example.com/collections"} + ], }, "https://api.example.com/collections": { "collections": [{"id": "demo-collection"}], From ee5c0aca188c43fd769a185be6b006bb96365196 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Tue, 12 May 2026 00:44:14 +0800 Subject: [PATCH 6/6] add run dict to fast --- CHANGELOG.md | 9 +- stac_validator/fast_validator.py | 162 +++++++++++++++++++++++++++++++ tests/test_fast_validator.py | 68 +++++++++++++ 3 files changed, 235 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 26785fe..d7be189 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,8 +8,6 @@ The format is (loosely) based on [Keep a Changelog](http://keepachangelog.com/) ### Added -- support for --limit option for `fast` command to cap the number of STAC objects validated - ### Changed ### Fixed @@ -22,8 +20,11 @@ The format is (loosely) based on [Keep a Changelog](http://keepachangelog.com/) ### Added -- support for --recursive option for `fast` command to validate static STAC catalogs -- support for --api option for `fast` command to validate STAC API endpoints +- support for --recursive option for `fast` command to validate static STAC catalogs. [#294](https://github.com/stac-utils/stac-validator/pull/294) +- support for --api option for `fast` command to validate STAC API endpoints. [#294](https://github.com/stac-utils/stac-validator/pull/294) +- support for --limit option for `fast` command to cap the number of STAC objects validated. [#294](https://github.com/stac-utils/stac-validator/pull/294) +- Added `run_dict` method to `FastValidator` for direct in-memory dictionary validation without file/network loading. [#294](https://github.com/stac-utils/stac-validator/pull/294) + ## [v4.3.0] - 2026-05-08 diff --git a/stac_validator/fast_validator.py b/stac_validator/fast_validator.py index fe82dbc..f4f3a16 100644 --- a/stac_validator/fast_validator.py +++ b/stac_validator/fast_validator.py @@ -551,6 +551,168 @@ def run(self): click.echo("\n") + def run_dict(self, stac_dict: Dict[str, Any], source_name: str = "in-memory"): + """Validate a native Python dictionary directly without file/network loading.""" + if not isinstance(stac_dict, dict): + self.valid = False + self.message = [ + { + "path": source_name, + "valid_stac": False, + "error_message": "Input to run_dict must be a dictionary.", + } + ] + return + + self.stac_file = source_name + + data = dict(stac_dict) + obj_type = data.get("type", "") + items_to_validate: List[Dict[str, Any]] = [] + + if obj_type == "FeatureCollection": + features = data.get("features", []) + items_to_validate = features if isinstance(features, list) else [] + elif obj_type in ["Feature", "Collection"]: + items_to_validate = [data] + elif obj_type == "Catalog" or ("id" in data and "description" in data): + data["type"] = "Catalog" + items_to_validate = [data] + else: + self.valid = False + if "type" in data: + error_msg = ( + f"Unknown JSON type. Unsupported 'type' value: {obj_type!r}." + ) + else: + error_msg = "Unknown JSON type. Missing 'type' field." + + self.message = [ + { + "path": source_name, + "valid_stac": False, + "error_message": error_msg, + } + ] + return + + available_objects = len(items_to_validate) + if self.limit is not None: + items_to_validate = items_to_validate[: self.limit] + + total_setup_ms = 0.0 + total_exec_ms = 0.0 + valid_count = 0 + invalid_count = 0 + error_registry: Dict[str, List[str]] = {} + stac_versions_found: Set[str] = set() + schemas_checked: Set[str] = set() + + self.valid = True + + for index, item in enumerate(items_to_validate): + item_id = item.get("id", f"unknown-{index}") + stac_version = item.get("stac_version", "1.0.0") + extensions = item.get("stac_extensions", []) + + stac_versions_found.add(stac_version) + + actual_type = ( + "Item" if item.get("type") == "Feature" else item.get("type", "Catalog") + ) + + try: + base_schema = self._get_base_schema_uri(actual_type, stac_version) + except ValueError: + base_schema = "" + + if base_schema: + schemas_checked.add(base_schema) + + for ext in extensions: + schemas_checked.add(ext) + + t0 = time.perf_counter() + try: + validator, _ = get_validator(actual_type, stac_version, extensions) + except Exception as e: + invalid_count += 1 + self.valid = False + error_msg = str(e) + if error_msg not in error_registry: + error_registry[error_msg] = [] + error_registry[error_msg].append(item_id) + continue + t1 = time.perf_counter() + total_setup_ms += (t1 - t0) * 1000 + + t2 = time.perf_counter() + try: + validator(item) + t3 = time.perf_counter() + total_exec_ms += (t3 - t2) * 1000 + valid_count += 1 + except fastjsonschema.JsonSchemaValueException as e: + t3 = time.perf_counter() + total_exec_ms += (t3 - t2) * 1000 + invalid_count += 1 + self.valid = False + error_msg = f"{e.name} {e.message.replace(e.name, '').strip()}" + if "disallowed definition" in error_msg and "collection" in error_msg: + error_msg = "STAC Spec Violation: Missing {'rel': 'collection'} in links array." + if error_msg not in error_registry: + error_registry[error_msg] = [] + error_registry[error_msg].append(item_id) + except Exception as e: + t3 = time.perf_counter() + total_exec_ms += (t3 - t2) * 1000 + if self._is_ref_resolution_error(e): + try: + self._validate_with_jsonschema_fallback( + item, + actual_type, + stac_version, + extensions, + ) + valid_count += 1 + except Exception as fallback_err: + invalid_count += 1 + self.valid = False + error_msg = str(fallback_err) + if error_msg not in error_registry: + error_registry[error_msg] = [] + error_registry[error_msg].append(item_id) + else: + invalid_count += 1 + self.valid = False + error_msg = str(e) + if error_msg not in error_registry: + error_registry[error_msg] = [] + error_registry[error_msg].append(item_id) + + self.message = [ + { + "path": source_name, + "valid_stac": self.valid, + "stac_versions": sorted(list(stac_versions_found)), + "schemas_checked": sorted(list(schemas_checked)), + "total_objects": len(items_to_validate), + "valid_objects": valid_count, + "invalid_objects": invalid_count, + "setup_time_ms": total_setup_ms, + "execution_time_ms": total_exec_ms, + "input_objects": available_objects, + "errors": [ + { + "error_message": err_msg, + "affected_items": affected_ids, + "count": len(affected_ids), + } + for err_msg, affected_ids in error_registry.items() + ], + } + ] + def run_recursive(self): """Recursively validate a local STAC catalog/collection and all its children.""" sys.setrecursionlimit(10000) diff --git a/tests/test_fast_validator.py b/tests/test_fast_validator.py index b81206a..89236fa 100644 --- a/tests/test_fast_validator.py +++ b/tests/test_fast_validator.py @@ -248,6 +248,74 @@ def test_limit_above_total_does_not_change_count(self, valid_feature_collection) assert msg["valid_objects"] == 5 +class TestFastValidatorRunDict: + """Test in-memory dictionary validation entrypoint.""" + + def test_run_dict_valid_item(self): + payload = { + "stac_version": "1.0.0", + "type": "Feature", + "id": "test-item", + "geometry": None, + "properties": {"datetime": "2023-01-01T00:00:00Z"}, + "links": [{"rel": "self", "href": "http://example.com"}], + "assets": {}, + } + + fv = FastValidator("", quiet=True) + fv.run_dict(payload) + + assert fv.valid is True + assert fv.message[0]["path"] == "in-memory" + assert fv.message[0]["total_objects"] == 1 + assert fv.message[0]["valid_objects"] == 1 + assert fv.message[0]["invalid_objects"] == 0 + + def test_run_dict_invalid_item(self): + payload = { + "stac_version": "1.0.0", + "type": "Feature", + "geometry": None, + "properties": {"datetime": "2023-01-01T00:00:00Z"}, + "links": [], + "assets": {}, + } + + fv = FastValidator("", quiet=True) + fv.run_dict(payload) + + assert fv.valid is False + assert fv.message[0]["total_objects"] == 1 + assert fv.message[0]["valid_objects"] == 0 + assert fv.message[0]["invalid_objects"] == 1 + assert len(fv.message[0]["errors"]) > 0 + + def test_run_dict_feature_collection_limit(self): + payload = { + "type": "FeatureCollection", + "features": [ + { + "stac_version": "1.0.0", + "type": "Feature", + "id": f"item-{i}", + "geometry": None, + "properties": {"datetime": "2023-01-01T00:00:00Z"}, + "links": [{"rel": "self", "href": "http://example.com"}], + "assets": {}, + } + for i in range(5) + ], + } + + fv = FastValidator("", quiet=True, limit=2) + fv.run_dict(payload) + + assert fv.valid is True + assert fv.message[0]["input_objects"] == 5 + assert fv.message[0]["total_objects"] == 2 + assert fv.message[0]["valid_objects"] == 2 + + class TestFastValidatorRecursiveAndApi: """Test recursive and API traversal behavior."""