From e72403aa6dc21af73d35ded5e43e9642c1885a46 Mon Sep 17 00:00:00 2001 From: willjnz Date: Wed, 29 Apr 2026 11:33:37 +1000 Subject: [PATCH 1/2] Fix buildings dataset overwrite not writing step. --- csdr/cli_dataset_buildings.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/csdr/cli_dataset_buildings.py b/csdr/cli_dataset_buildings.py index 039f257..b91aae4 100644 --- a/csdr/cli_dataset_buildings.py +++ b/csdr/cli_dataset_buildings.py @@ -172,18 +172,24 @@ def index_buildings( logger.info( f"Skipping index: {target_file_name} already exists and overwrite is off." ) - return - logger.info( - "Either file does not exist or overwrite is on, proceeding with indexing." - ) + else: + logger.info( + "Either file does not exist or overwrite is on, proceeding with indexing." + ) - parquet_data = _get_parquet_urls(source_location_s3, source_proxy) - asyncio.run( - _run_index_buildings( - source_proxy, parquet_data, target_store, target_file_name, max_concurrent + parquet_data = _get_parquet_urls(source_location_s3, source_proxy) + asyncio.run( + _run_index_buildings( + source_proxy, + parquet_data, + target_store, + target_file_name, + max_concurrent, + ) ) - ) - logger.info("Index buildings dataset process completed.") + logger.info("Index buildings dataset process completed.") + + # Write step regardless of overwrite. write_step( label="Index building footprint parquet files from Source Coop into a single bounding-box parquet", inputs={"source_location_s3": source_location_s3}, From ed66f0aed168f25fdd421e132e239e3c97fb0a9c Mon Sep 17 00:00:00 2001 From: willjnz Date: Wed, 29 Apr 2026 11:50:26 +1000 Subject: [PATCH 2/2] Add step for GMW extract in cache (because it fans out in workflow). --- csdr/cli_dataset_gmw.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/csdr/cli_dataset_gmw.py b/csdr/cli_dataset_gmw.py index 720dfb8..1e42597 100644 --- a/csdr/cli_dataset_gmw.py +++ b/csdr/cli_dataset_gmw.py @@ -338,35 +338,31 @@ def extract_gmw( source_location, source_zip_name, target_location, overwrite, max_concurrent ) ) - write_step( - label="Extract GMW zip into COGs and STAC items", - inputs={"source_location": source_location, "source_zip_name": source_zip_name}, - outputs={"target_location": target_location}, - ) async def run_index_gmw( source_location: str, target_location: str, overwrite: bool = True -) -> None: +) -> int: + """Returns the number of STAC items found/indexed.""" source_store = get_store_with_prefix_from_url(source_location) target_store = get_store_with_prefix_from_url(target_location) file_name = "gmw.parquet" target_url = f"{target_location}/{file_name}" + # Find all the GMW STAC files (needed for both skip and index paths) + # Searches recursively. It needs to for v3 (and v4) + item_dicts = await get_stac_item_dicts_from_store(source_store) + # Check for existing geoparquet file if exists(target_store, file_name) and not overwrite: logger.info(f"Parquet file already exists at {target_url}, skipping indexing.") - return + return len(item_dicts) else: if overwrite: logger.info("Overwrite is enabled, re-indexing GMW.") else: logger.info("Parquet file does not exist, proceeding with indexing.") - # Find all the the GMW STAC files - # Searches recursively. It needs to for v3 (and v4) - item_dicts = await get_stac_item_dicts_from_store(source_store) - logger.info(f"Writing {len(item_dicts)} STAC items to parquet at {target_url}") with suppress_rust_output(): # TODO: experiment with parquet_compression options for rustac write @@ -375,6 +371,7 @@ async def run_index_gmw( ) # rustac infers that it is writing a parquet format from filename logger.info(f"Parquet write completed, wrote to {target_url}") + return len(item_dicts) # Writes a parquet index of all the GMW STAC items found at the source location. @@ -392,8 +389,15 @@ def index_gmw( overwrite: bool = typer.Option(True, help="Replace existing index file"), ) -> None: logger.info("Starting GMW indexing process...") - asyncio.run(run_index_gmw(source_location, target_location, overwrite)) + item_count = asyncio.run(run_index_gmw(source_location, target_location, overwrite)) logger.info("GMW indexing process completed.") + # Summary step for the fan-out extract work (one pod per zip file/year). + write_step( + label=f"Extract {item_count} STAC items with all COGs from GMW source zips", + inputs={"source_location": source_location}, + outputs={"stac_items_extracted": item_count}, + source_function=extract_gmw, + ) write_step( label="Index GMW STAC items into a single parquet file", inputs={"source_location": source_location},