Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions csdr/cli_dataset_buildings.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,24 @@ def index_buildings(
logger.info(
f"Skipping index: {target_file_name} already exists and overwrite is off."
)
return
logger.info(
"Either file does not exist or overwrite is on, proceeding with indexing."
)
else:
logger.info(
"Either file does not exist or overwrite is on, proceeding with indexing."
)

parquet_data = _get_parquet_urls(source_location_s3, source_proxy)
asyncio.run(
_run_index_buildings(
source_proxy, parquet_data, target_store, target_file_name, max_concurrent
parquet_data = _get_parquet_urls(source_location_s3, source_proxy)
asyncio.run(
_run_index_buildings(
source_proxy,
parquet_data,
target_store,
target_file_name,
max_concurrent,
)
)
)
logger.info("Index buildings dataset process completed.")
logger.info("Index buildings dataset process completed.")

# Write step regardless of overwrite.
write_step(
label="Index building footprint parquet files from Source Coop into a single bounding-box parquet",
inputs={"source_location_s3": source_location_s3},
Expand Down
28 changes: 16 additions & 12 deletions csdr/cli_dataset_gmw.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,35 +338,31 @@ def extract_gmw(
source_location, source_zip_name, target_location, overwrite, max_concurrent
)
)
write_step(
label="Extract GMW zip into COGs and STAC items",
inputs={"source_location": source_location, "source_zip_name": source_zip_name},
outputs={"target_location": target_location},
)


async def run_index_gmw(
source_location: str, target_location: str, overwrite: bool = True
) -> None:
) -> int:
"""Returns the number of STAC items found/indexed."""
source_store = get_store_with_prefix_from_url(source_location)
target_store = get_store_with_prefix_from_url(target_location)
file_name = "gmw.parquet"
target_url = f"{target_location}/{file_name}"

# Find all the GMW STAC files (needed for both skip and index paths)
# Searches recursively. It needs to for v3 (and v4)
item_dicts = await get_stac_item_dicts_from_store(source_store)

# Check for existing geoparquet file
if exists(target_store, file_name) and not overwrite:
logger.info(f"Parquet file already exists at {target_url}, skipping indexing.")
return
return len(item_dicts)
else:
if overwrite:
logger.info("Overwrite is enabled, re-indexing GMW.")
else:
logger.info("Parquet file does not exist, proceeding with indexing.")

# Find all the the GMW STAC files
# Searches recursively. It needs to for v3 (and v4)
item_dicts = await get_stac_item_dicts_from_store(source_store)

logger.info(f"Writing {len(item_dicts)} STAC items to parquet at {target_url}")
with suppress_rust_output():
# TODO: experiment with parquet_compression options for rustac write
Expand All @@ -375,6 +371,7 @@ async def run_index_gmw(
) # rustac infers that it is writing a parquet format from filename

logger.info(f"Parquet write completed, wrote to {target_url}")
return len(item_dicts)


# Writes a parquet index of all the GMW STAC items found at the source location.
Expand All @@ -392,8 +389,15 @@ def index_gmw(
overwrite: bool = typer.Option(True, help="Replace existing index file"),
) -> None:
logger.info("Starting GMW indexing process...")
asyncio.run(run_index_gmw(source_location, target_location, overwrite))
item_count = asyncio.run(run_index_gmw(source_location, target_location, overwrite))
logger.info("GMW indexing process completed.")
# Summary step for the fan-out extract work (one pod per zip file/year).
write_step(
label=f"Extract {item_count} STAC items with all COGs from GMW source zips",
inputs={"source_location": source_location},
outputs={"stac_items_extracted": item_count},
source_function=extract_gmw,
)
write_step(
label="Index GMW STAC items into a single parquet file",
inputs={"source_location": source_location},
Expand Down
Loading