diff --git a/Makefile b/Makefile index 1bf9c39..898ec21 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ cache-gmw-v4-local: --source-locations=https://files.auspatious.com/gmw-v4/raw/gmw_mng_2020_v4019_gtiff.zip \ --target-location=./cache/datasets/gmw-v4/0-0-1/raw \ --out-file=/tmp/cached_files.json \ - --overwrite + --no-overwrite cache-gmw-v4-s3: csdr gmw cache \ @@ -28,7 +28,7 @@ extract-gmw-v4-local: --source-location=./cache/datasets/gmw-v4/0-0-1/raw \ --source-zip-name=gmw_mng_2020_v4019_gtiff.zip \ --target-location=$(PWD)/cache/datasets/gmw-v4/0-0-1/data \ - --overwrite + --no-overwrite extract-gmw-v4-s3: csdr gmw extract \ @@ -42,18 +42,18 @@ index-gmw-v4-local: csdr gmw index \ --source-location=$(PWD)/cache/datasets/gmw-v4/0-0-1/data \ --target-location=$(PWD)/cache/datasets/gmw-v4/0-0-1 \ - --overwrite + --no-overwrite index-gmw-v4-s3: csdr gmw index \ --source-location=s3://csdr-public-dev/datasets/gmw-v4/0-0-1/data \ --target-location=s3://csdr-public-dev/datasets/gmw-v4/0-0-1 \ - --overwrite + --no-overwrite # Make a Dataset in the app and use the ID here provenance-gmw-v4-local-db: csdr provenance dataset \ - --id=5714917f-3549-4a95-9fc4-ff96efbdf311 \ + --id=forest-cover \ --dataset-url=./cache/datasets/gmw-v4/0-0-1/gmw.parquet \ --source-url="https://zenodo.org/records/12756047" \ --source-metadata-url="https://zenodo.org/records/12756047" \ @@ -237,8 +237,8 @@ dataset-aca-index-local: csdr aca index \ --source-location=./cache/datasets/aca/0-0-1/data \ --target-location=./cache/datasets/aca/0-0-1 \ - --write-pmtiles \ - --overwrite + --no-write-pmtiles \ + --no-overwrite dataset-aca-index-s3: csdr aca index \ @@ -258,6 +258,7 @@ dataset-aca-provenance-local-db: --post-to-database \ --overwrite + # Dataset MS Buildings # Index is done in-place in Source Coop. dataset-buildings-index-local: @@ -283,7 +284,7 @@ dataset-buildings-provenance-local-db: geometry-eez-cache-local: csdr eez cache \ --target-location=./cache/geometries/eez-v4/0-0-1/raw \ - --overwrite + --no-overwrite geometry-eez-cache-s3: csdr eez cache \ @@ -297,7 +298,8 @@ geometry-eez-convert-local: --source-zip-location ./cache/geometries/eez-v4/0-0-1/raw/EEZ_land_union_v4_202410.zip \ --source-internal-path-name=EEZ_land_union_v4_202410/EEZ_land_union_v4_202410.shp \ --target-location=./cache/geometries/eez-v4/0-0-1/runs/test-run-id \ - --create-pmtiles + --create-pmtiles \ + --no-overwrite geometry-eez-convert-s3: csdr convert zip-to-parquet \ @@ -308,20 +310,20 @@ geometry-eez-convert-s3: --create-pmtiles ### EEZ provenance -geometry-eez-provenance-local: - csdr provenance geometry \ - --id=6231cc07-5723-4c95-8e64-39322a9be2ed \ - --run-id=test-run-id \ - --geometry-url=./cache/geometries/eez-v4/0-0-1/runs/test-run-id/EEZ_land_union_v4_202410.parquet \ - --pmtiles-url=./cache/geometries/eez-v4/0-0-1/runs/test-run-id/EEZ_land_union_v4_202410.pmtiles \ - --source-url="https://www.marineregions.org/downloads.php" \ - --source-metadata-url="https://www.marineregions.org/downloads.php" \ - --geometry-type=geoparquet \ - --overwrite +# geometry-eez-provenance-local: +# csdr provenance geometry \ +# --id=australia-geometries \ +# --run-id=test-run-id \ +# --geometry-url=./cache/geometries/eez-v4/0-0-1/runs/test-run-id/EEZ_land_union_v4_202410.parquet \ +# --pmtiles-url=./cache/geometries/eez-v4/0-0-1/runs/test-run-id/EEZ_land_union_v4_202410.pmtiles \ +# --source-url="https://www.marineregions.org/downloads.php" \ +# --source-metadata-url="https://www.marineregions.org/downloads.php" \ +# --geometry-type=geoparquet \ +# --overwrite geometry-eez-provenance-local-db: csdr provenance geometry \ - --id=65427160-c63c-4c24-a4ac-7013940fae9e \ + --id=australia-geometries \ --run-id=755206f2-dc2f-5b11-8355-2a86b34f7984 \ --geometry-url=./cache/geometries/eez-v4/0-0-1/runs/755206f2-dc2f-5b11-8355-2a86b34f7984/EEZ_land_union_v4_202410.parquet \ --pmtiles-url=./cache/geometries/eez-v4/0-0-1/runs/755206f2-dc2f-5b11-8355-2a86b34f7984/EEZ_land_union_v4_202410.pmtiles \ @@ -471,7 +473,7 @@ product-gmw-v4-eez-process-geometry-local: --target-location=./cache/products/gmw-v4-eez/0-0-1/runs/test-product-gmw-v4-eez-run-id \ --indicators-to-extract='{"sum-mangrove-area": {"indicator-name": "mangrove", "indicator-value": 1.0}}' \ --datetime=2020 \ - --load-kwargs='{"resolution": 10, "crs": "epsg:6933"}' \ + --load-kwargs='{"resolution": 500, "crs": "epsg:6933"}' \ --geometry-id=b4c4c411-4daa-57d2-b3f7-fb14ec95d6f2 \ --overwrite @@ -499,6 +501,7 @@ product-gmw-v4-eez-consolidate-local: --geometry-provenance-url=./cache/geometries/eez-v4/0-0-1/runs/755206f2-dc2f-5b11-8355-2a86b34f7984/EEZ_land_union_v4_202410.parquet.provenance.json \ --dataset-provenance-url=./cache/datasets/gmw-v4/0-0-1/gmw.parquet.provenance.json \ --indicator-name=mangrove \ + --no-overwrite product-gmw-v4-eez-consolidate-s3: csdr products consolidate \ @@ -507,18 +510,19 @@ product-gmw-v4-eez-consolidate-s3: --geometry-provenance-url=s3://csdr-public-dev/geometries/eez-v4/0-0-1/runs/test-product-gmw-v4-eez-run-id/EEZ_land_union_v4_202410.parquet.provenance.json \ --dataset-provenance-url=s3://csdr-public-dev/datasets/gmw-v4/0-0-1/gmw.parquet.provenance.json \ --indicator-name=mangrove \ + --overwrite # You need to make a Product in the app before running provenance. Use that product ID here. # You also need to make a indicator. It must have the ID 'sum-mangrove-area'. product-gmw-v4-eez-provenance-local-db: csdr provenance product \ - --product-id=935e9c13-7e2e-40c5-a4f8-f5f62ea54381 \ + --product-id=forest-cover-product \ --product-url=./cache/products/gmw-v4-eez/0-0-1/runs/test-product-gmw-v4-eez-run-id/mangrove/935e9c13-7e2e-40c5-a4f8-f5f62ea54381.parquet \ - --run-id=test-product-gmw-v4-eez-run-id \ - --dataset-run-id=9d2cf140-1d6f-405a-93af-ba1a1dcd7029 \ + --run-id=test-product-gmw-v4-eez-run-id5 \ + --dataset-run-id=cded8fbc-faf2-49fa-afef-145b7870231d \ --geometries-run-id=755206f2-dc2f-5b11-8355-2a86b34f7984 \ --post-to-database \ - --overwrite + --no-overwrite product-gmw-v4-eez-provenance-s3-db: csdr provenance product \ @@ -842,14 +846,6 @@ product-dep-mangrove-pacific-eez-provenance-local-db: ### OTHER ### -# Test GeoJSON -geometry-geojson-convert: - csdr convert geo-to-parquet \ - --source-location=tests/data/single_geometry.geojson \ - --target-location=tests/data \ - --name-field=name \ - --overwrite - geometry-geojson-provenance: csdr provenance geometry \ --geometry-url=tests/data/single_geometry.parquet \ diff --git a/csdr/cli.py b/csdr/cli.py index 733c690..612d02b 100644 --- a/csdr/cli.py +++ b/csdr/cli.py @@ -10,7 +10,6 @@ from csdr.cli_dataset_buildings import buildings_app from csdr.cli_dataset_gmw import gmw_app from csdr.cli_dataset_seagrass import seagrass_app -from csdr.cli_geometries import geometry_app from csdr.cli_geometry_acsc2 import acsc2_app from csdr.cli_geometry_aus_states import aus_states_app from csdr.cli_geometry_cwa import cwa_app @@ -32,9 +31,6 @@ logging.getLogger("csdr").setLevel(logging.INFO) # Our logging level. # Add the subcommands -app.add_typer( - geometry_app, name="geometries", help="Commands for processing geometries." -) ## Datasets # GMW diff --git a/csdr/cli_conversion.py b/csdr/cli_conversion.py index 49d2712..229e589 100644 --- a/csdr/cli_conversion.py +++ b/csdr/cli_conversion.py @@ -12,10 +12,10 @@ from csdr.io import ( exists, get_store_with_prefix_from_url, - read_geospatial_file, split_path_and_file_name_from_url, write_gdf_to_parquet, ) +from csdr.provenance import write_step from csdr.utils import CSDRException conversion_app = typer.Typer() @@ -95,148 +95,90 @@ def convert_zipfile_to_parquet( logger.info( f"Target parquet file already exists at {target_url} and overwrite is off. Use --overwrite to replace. Exiting successfully." ) - raise typer.Exit(code=0) # Exit successfully, nothing to do - logger.info( - "Target parquet file does not exist or overwrite is on, proceeding with extraction." - ) - - # Pull the whole zip into memory - zip_bytes = BytesIO(source_store.get(source_zip_name).bytes()) - zip_bytes.seek(0) # Ensure pointer is at start - - with ZipMemoryFile(zip_bytes) as z: - files_in_zip = z.listdir() - logger.info(f"Files in zip: {files_in_zip}") - logger.info(f"Requested internal path: {source_internal_path_name}") - if source_internal_path_name not in files_in_zip: - raise CSDRException( - f"Internal path {source_internal_path_name} not found in zip file." - ) - # Open the shapefile within the ZIP - with z.open(source_internal_path_name) as src: - gdf = gpd.GeoDataFrame.from_features(src, crs=src.crs) - - logger.info(f"Loaded {len(gdf)} records from the shapefile.") - - # Add ID and Name fields - gdf = add_geometry_id_name( - gdf, - name_field=name_field, - geometry_id=_get_geometry_id(geometry_id, source_internal_path_name), - ) - - write_gdf_to_parquet(gdf, target_store, target_filename) - - # !tippecanoe --force -z 10 --no-simplification-of-shared-nodes - # --simplification 10 --drop-densest-as-needed - # -l "data" -o ../geometries/acsc-ga-2015/out/acsc-primary-compartments.pmtiles ../geometries/acsc-ga-2015/out/acsc-primary-compartments.geojson - - if create_pmtiles: - logger.info("Creating PMTiles file alongside the parquet...") - # Create a PMTiles files with tippecanoe - pmtiles_file = target_filename.replace(".parquet", ".pmtiles") - - # Do the work in a local temp directory - with TemporaryDirectory() as tmpdirname: - local_geojson = os.path.join(tmpdirname, "data.geojson") - local_pmtiles = os.path.join(tmpdirname, "data.pmtiles") - - # Keep only the id and name fields, plus geometry - gdf = gdf[["csdr-id", "csdr-name", "geometry"]] - gdf.to_file(local_geojson, driver="GeoJSON") - - # Create PMTiles file with tippecanoe - subprocess.run( - [ - "tippecanoe", - "--force", - "-z", - "10", - "--no-simplification-of-shared-nodes", - "--simplification", - "10", - "--drop-densest-as-needed", - "--layer", - "data", - "--output", - local_pmtiles, - local_geojson, - ], - check=True, - ) - - # Upload the PMTiles file to the target store - target_store.put(pmtiles_file, local_pmtiles) - logger.info(f"Created PMTiles file at {pmtiles_file}") - else: - logger.info("Skipping PMTiles creation because flag is set to false.") - - logger.info(f"Parquet extraction process completed. Wrote file to {target_url}") - - -@conversion_app.command("geo-to-parquet") -def convert_geospatial_file_to_parquet( - source_location: str = typer.Option( - help="Local or remote path (local or s3://) to the geospatial file.", - default="./tests/data/single_geometry.geojson", - ), - target_location: str | None = typer.Option( - help="Local or remote path (local or s3://) to store the converted file.", - default=None, - ), - name_field: str = typer.Option( - "name", help="The field in the data to use for the 'Name' attribute." - ), - overwrite: bool = typer.Option( - True, help="Replace existing parquet file if it exists." - ), -) -> None: - logger.info("Starting geospatial to parquet conversion process...") - - source_path, source_name = split_path_and_file_name_from_url(source_location) - source_store = get_store_with_prefix_from_url(source_path) - - if not exists(source_store, source_name): - raise CSDRException( - f"Source geospatial file does not exist at {source_location}. Cannot convert." - ) else: logger.info( - f"Source geospatial file found at {source_location}, proceeding with conversion." + "Target parquet file does not exist or overwrite is on, proceeding with extraction." ) - if target_location is None: - target_location = source_location - - target_location = target_location.rstrip("/") - - # Set up the target store - target_store = get_store_with_prefix_from_url(target_location) - target_filename = source_name.rsplit(".", 1)[0] + ".parquet" - target_url = f"{target_location}/{target_filename}" - - # Check if target file already exists - if exists(target_store, target_filename) and not overwrite: - logger.warning( - f"Target parquet file already exists at {target_url}. Use --overwrite to replace." + # Pull the whole zip into memory + zip_bytes = BytesIO(source_store.get(source_zip_name).bytes()) + zip_bytes.seek(0) # Ensure pointer is at start + + with ZipMemoryFile(zip_bytes) as z: + files_in_zip = z.listdir() + logger.info(f"Files in zip: {files_in_zip}") + logger.info(f"Requested internal path: {source_internal_path_name}") + if source_internal_path_name not in files_in_zip: + raise CSDRException( + f"Internal path {source_internal_path_name} not found in zip file." + ) + # Open the shapefile within the ZIP + with z.open(source_internal_path_name) as src: + gdf = gpd.GeoDataFrame.from_features(src, crs=src.crs) + + logger.info(f"Loaded {len(gdf)} records from the shapefile.") + + # Add ID and Name fields + gdf = add_geometry_id_name( + gdf, + name_field=name_field, + geometry_id=_get_geometry_id(geometry_id, source_internal_path_name), ) - raise typer.Exit(code=0) # Exit successfully, nothing to do - - # Read the geospatial file into a GeoDataFrame - gdf = read_geospatial_file(source_location) - gdf = add_geometry_id_name( - gdf, name_field=name_field, geometry_id=_get_geometry_id(None, source_name) + write_gdf_to_parquet(gdf, target_store, target_filename) + + # !tippecanoe --force -z 10 --no-simplification-of-shared-nodes + # --simplification 10 --drop-densest-as-needed + # -l "data" -o ../geometries/acsc-ga-2015/out/acsc-primary-compartments.pmtiles ../geometries/acsc-ga-2015/out/acsc-primary-compartments.geojson + + if create_pmtiles: + logger.info("Creating PMTiles file alongside the parquet...") + # Create a PMTiles files with tippecanoe + pmtiles_file = target_filename.replace(".parquet", ".pmtiles") + + # Do the work in a local temp directory + with TemporaryDirectory() as tmpdirname: + local_geojson = os.path.join(tmpdirname, "data.geojson") + local_pmtiles = os.path.join(tmpdirname, "data.pmtiles") + + # Keep only the id and name fields, plus geometry + gdf = gdf[["csdr-id", "csdr-name", "geometry"]] + gdf.to_file(local_geojson, driver="GeoJSON") + + # Create PMTiles file with tippecanoe + subprocess.run( + [ + "tippecanoe", + "--force", + "-z", + "10", + "--no-simplification-of-shared-nodes", + "--simplification", + "10", + "--drop-densest-as-needed", + "--layer", + "data", + "--output", + local_pmtiles, + local_geojson, + ], + check=True, + ) + + # Upload the PMTiles file to the target store + target_store.put(pmtiles_file, local_pmtiles) + logger.info(f"Created PMTiles file at {pmtiles_file}") + else: + logger.info("Skipping PMTiles creation because flag is set to false.") + + logger.info(f"Parquet extraction process completed. Wrote file to {target_url}") + + # Write step regardless of whether we skipped or did the work. + write_step( + label=f"Convert zipped shapefile to GeoParquet{' and PMTiles' if create_pmtiles else ''}", + inputs={ + "source_zip_location": source_zip_location, + "source_internal_path_name": source_internal_path_name, + }, + outputs={"target_url": target_url}, ) - - logger.info(f"Opened file with {len(gdf)} features") - - with BytesIO() as parquet_buffer: - gdf.to_parquet(parquet_buffer, engine="pyarrow") - parquet_buffer.seek(0) - - # Write the parquet bytes to the target store using obstore - target_store.put(target_filename, parquet_buffer.getvalue()) - - logger.info(f"Loaded {len(gdf)} records from the geospatial file.") - logger.info(f"Parquet conversion process completed. Wrote file to {target_url}") diff --git a/csdr/cli_dataset_aca.py b/csdr/cli_dataset_aca.py index 5f97d22..a136246 100644 --- a/csdr/cli_dataset_aca.py +++ b/csdr/cli_dataset_aca.py @@ -18,6 +18,7 @@ get_store_with_prefix_from_url, write_gdf_to_parquet, ) +from csdr.provenance import write_step from csdr.utils import CSDRException aca_app = typer.Typer() @@ -129,6 +130,11 @@ def extract_aca( _run_extract_aca(source_location, target_location, overwrite, max_concurrent) ) logger.info("ACA extraction process completed.") + write_step( + label="Extract ACA reef extent zip files into individual region folders", + inputs={"source_location": source_location}, + outputs={"target_location": target_location}, + ) # _partition_parquet partitions a large GeoParquet file into smaller Parquet files based on a global grid. @@ -310,6 +316,11 @@ def index_aca( ) ) logger.info("ACA merge/index process completed.") + write_step( + label="Merge per-region Reef Extent geopackages into a combined parquet and pmtiles file", + inputs={"source_location": source_location}, + outputs={"target_file": f"{target_location}/reefextent.parquet"}, + ) if __name__ == "__main__": diff --git a/csdr/cli_dataset_ace.py b/csdr/cli_dataset_ace.py index 9f42b27..6a2e788 100644 --- a/csdr/cli_dataset_ace.py +++ b/csdr/cli_dataset_ace.py @@ -10,6 +10,7 @@ exists, get_store_with_prefix_from_url, ) +from csdr.provenance import write_step from csdr.utils import suppress_rust_output ace_app = typer.Typer() @@ -96,3 +97,8 @@ def index_aus_coastal_ecosystems( ) ) logger.info("ACE indexing process completed.") + write_step( + label="Index Australian Coastal Ecosystems STAC collection into a single parquet file", + inputs={"source_stac_url": source_stac_url, "stac_collection": stac_collection}, + outputs={"target_file": f"{target_location}/{target_filename}.parquet"}, + ) diff --git a/csdr/cli_dataset_buildings.py b/csdr/cli_dataset_buildings.py index 99651ef..039f257 100644 --- a/csdr/cli_dataset_buildings.py +++ b/csdr/cli_dataset_buildings.py @@ -17,6 +17,7 @@ get_store_with_prefix_from_url, write_gdf_to_parquet, ) +from csdr.provenance import write_step from csdr.utils import CSDRException buildings_app = typer.Typer() @@ -183,6 +184,11 @@ def index_buildings( ) ) logger.info("Index buildings dataset process completed.") + write_step( + label="Index building footprint parquet files from Source Coop into a single bounding-box parquet", + inputs={"source_location_s3": source_location_s3}, + outputs={"target_file": f"{target_location}/{target_file_name}"}, + ) if __name__ == "__main__": diff --git a/csdr/cli_dataset_gmw.py b/csdr/cli_dataset_gmw.py index 6f662da..720dfb8 100644 --- a/csdr/cli_dataset_gmw.py +++ b/csdr/cli_dataset_gmw.py @@ -22,6 +22,7 @@ get_store_with_prefix_from_url, split_path_and_file_name_from_url, ) +from csdr.provenance import write_step from csdr.utils import CSDRException, suppress_rust_output gmw_app = typer.Typer() @@ -157,6 +158,14 @@ def cache_gmw( logger.info( f"GMW caching process completed. Cached to {target_location.rstrip('/')}" ) + write_step( + label="Cache Global Mangrove Watch source zip files", + inputs={ + "source_file_count": len(source_locations_list), + "source_locations_sample": source_locations_list[:3], + }, + outputs={"target_location": target_location}, + ) async def process_single_file( @@ -329,6 +338,11 @@ def extract_gmw( source_location, source_zip_name, target_location, overwrite, max_concurrent ) ) + write_step( + label="Extract GMW zip into COGs and STAC items", + inputs={"source_location": source_location, "source_zip_name": source_zip_name}, + outputs={"target_location": target_location}, + ) async def run_index_gmw( @@ -380,3 +394,8 @@ def index_gmw( logger.info("Starting GMW indexing process...") asyncio.run(run_index_gmw(source_location, target_location, overwrite)) logger.info("GMW indexing process completed.") + write_step( + label="Index GMW STAC items into a single parquet file", + inputs={"source_location": source_location}, + outputs={"target_file": f"{target_location}/gmw.parquet"}, + ) diff --git a/csdr/cli_dataset_seagrass.py b/csdr/cli_dataset_seagrass.py index ee7947e..690bbb7 100644 --- a/csdr/cli_dataset_seagrass.py +++ b/csdr/cli_dataset_seagrass.py @@ -9,6 +9,7 @@ exists, get_store_with_prefix_from_url, ) +from csdr.provenance import write_step from csdr.utils import suppress_rust_output seagrass_app = typer.Typer() @@ -68,3 +69,8 @@ def index_dep_seagrass( logger.info("Starting DEP Seagrass indexing process...") asyncio.run(run_index_dep_seagrass(stac_api_url, target_location, overwrite)) logger.info("DEP Seagrass indexing process completed.") + write_step( + label="Index DEP Seagrass STAC collection into a single parquet file", + inputs={"stac_api_url": stac_api_url}, + outputs={"target_file": f"{target_location}/dep_s2_seagrass.parquet"}, + ) diff --git a/csdr/cli_geometries.py b/csdr/cli_geometries.py deleted file mode 100644 index 7b05530..0000000 --- a/csdr/cli_geometries.py +++ /dev/null @@ -1,144 +0,0 @@ -import glob -import logging -import os - -import geopandas as gpd -import typer - -from csdr.utils import CSDRException - -geometry_app = typer.Typer() -logger = logging.getLogger(__name__) - - -@geometry_app.command("convert-vector") -def convert_vector( - input_dir: str = typer.Option( - ..., "--input-dir", help="Directory containing the input vector file(s)." - ), - output_path: str = typer.Option( - ..., "--output-path", "-o", help="Output path for the GeoParquet file." - ), - target_crs: str = typer.Option( - ..., - "--target-crs", - help="Target CRS for the output GeoParquet file (e.g., EPSG:4326).", - ), - input_glob: str = typer.Option( - "*.shp", - "--input-glob", - help="Glob pattern to find the input vector file(s) within the input directory.", - ), - name_property: str = typer.Option( - None, - "--name-property", - help="Name of the property to use as the name of the feature.", - ), - source_crs_option: str = typer.Option( - None, - "--source-crs", - help=( - "Optional: Specify source CRS (e.g., 'EPSG:7844'). " - "Overrides CRS detection from file." - ), - ), -) -> None: - """ - Converts first found vector file matching glob to GeoParquet, applying CRS. - - Reads from --input-dir, finds file matching --input-glob, converts to - --output-path with --target-crs. - """ - if not input_dir or not output_path or not target_crs: - raise CSDRException( - "--input-dir, --output-path, and --target-crs are required." - ) - - try: - # Find input vector file using glob relative to input_dir - # Search recursively within the input directory - search_path = os.path.join(input_dir, "**", input_glob) - logger.info(f"Searching for input vector file(s) matching: {search_path}") - found_files = glob.glob(search_path, recursive=True) - - if not found_files: - raise CSDRException( - f"No files matching '{input_glob}' found within {input_dir}" - ) - - vector_file_path = found_files[0] # Use the first found file - if len(found_files) > 1: - logger.warning( - f"Multiple files found matching '{input_glob}'. Using the first one: {vector_file_path}" - ) - - # Ensure output directory exists - output_dir = os.path.dirname(output_path) - if output_dir: - os.makedirs(output_dir, exist_ok=True) - - # Read and process vector file - logger.info(f"Reading {vector_file_path}") - # TODO: Use io.read_geospatial_file - gdf = gpd.read_file(vector_file_path) - - # Determine source CRS - source_crs = source_crs_option if source_crs_option else gdf.crs - if not source_crs: - raise CSDRException( - "Could not determine source CRS from file and --source-crs not provided." - ) - logger.info(f"Using source CRS: {source_crs}") - - # Reproject - logger.info(f"Projecting from {source_crs} to {target_crs}") - gdf = gdf.to_crs(target_crs) - - if name_property: - gdf = gdf.rename(columns={name_property: "name"}) - - logger.info("Applying schema/normalization (placeholder)...") - - # Write out geoparquet - logger.info(f"Writing to {output_path}") - gdf.to_parquet(output_path) - logger.info("Vector conversion complete.") - - except Exception as e: - raise CSDRException(f"An error occurred during vector conversion: {e}") - - -@geometry_app.command("validate") -def validate( - input_file: str = typer.Option( - ..., "--input-file", help="Path to the GeoParquet file to validate." - ), - schema_path: str = typer.Option( - None, "--schema", help="Path to the GeoParquet schema file to validate against." - ), -) -> None: - """ - Validate the GeoParquet file against the provided schema. - """ - if not input_file: - raise CSDRException("Input file is required.") - - try: - # Read the GeoParquet file - # TODO: Use io.read_geospatial_file - gdf = gpd.read_parquet(input_file) - - # Fail if no geometry column - if "geometry" not in gdf.columns: - raise CSDRException("No geometry column found in the GeoParquet file.") - - # Validate the GeoParquet file - # validate_geoparquet(gdf, schema_path) - logger.info("Validation complete.") - - except Exception as e: - raise CSDRException(f"An error occurred during validation: {e}") - - -if __name__ == "__main__": - geometry_app() diff --git a/csdr/cli_geometry_acsc2.py b/csdr/cli_geometry_acsc2.py index b009067..d103f2f 100644 --- a/csdr/cli_geometry_acsc2.py +++ b/csdr/cli_geometry_acsc2.py @@ -12,6 +12,7 @@ exists, get_store_with_prefix_from_url, ) +from csdr.provenance import write_step acsc2_app = typer.Typer() logger = logging.getLogger(__name__) @@ -41,7 +42,7 @@ async def run_cache_acsc2( logger.info( "File already exists at target location and overwrite is off, skipping download." ) - raise typer.Exit(code=0) # Exit successfully, nothing to do + return target_location # Return successfully, nothing to do logger.info("File doesn't exist or overwrite is on. Re-downloading.") @@ -81,3 +82,8 @@ def cache_acsc2( logger.info( f"'{geometry_name}' caching process completed. Cached to '{result_path}'" ) + write_step( + label="Cache ACSC2 zipped shapefile from source", + inputs={"source_url": source_url}, + outputs={"target_location": target_location}, + ) diff --git a/csdr/cli_geometry_aus_states.py b/csdr/cli_geometry_aus_states.py index 7451568..3e3b471 100644 --- a/csdr/cli_geometry_aus_states.py +++ b/csdr/cli_geometry_aus_states.py @@ -10,6 +10,7 @@ get_store_with_prefix_from_url, split_path_and_file_name_from_url, ) +from csdr.provenance import write_step aus_states_app = typer.Typer() logger = logging.getLogger(__name__) @@ -56,13 +57,18 @@ def cache_aus_states( logger.info( "File already exists at target location and overwrite is disabled. Skipping download." ) - exit(0) # Exit successfully, nothing to do - logger.info( - "File either doesn't exist at target location or overwrite is enabled. Re-downloading." - ) + else: + logger.info( + "File either doesn't exist at target location or overwrite is enabled. Re-downloading." + ) - asyncio.run(_run_cache_aus_states(source_url, source_file_name, target_store)) + asyncio.run(_run_cache_aus_states(source_url, source_file_name, target_store)) - logger.info( - f"Australian States caching process completed. Downloaded to {target_url}" + logger.info( + f"Australian States caching process completed. Downloaded to {target_url}" + ) + write_step( + label="Cache Australian States zipped shapefile from ABS", + inputs={"source_url": source_url}, + outputs={"target_url": target_url}, ) diff --git a/csdr/cli_geometry_cwa.py b/csdr/cli_geometry_cwa.py index 922b994..098b2cd 100644 --- a/csdr/cli_geometry_cwa.py +++ b/csdr/cli_geometry_cwa.py @@ -9,6 +9,7 @@ exists, get_store_with_prefix_from_url, ) +from csdr.provenance import write_step cwa_app = typer.Typer() logger = logging.getLogger(__name__) @@ -38,7 +39,7 @@ async def run_cache_cwa( logger.info( "File already exists at target location and overwrite is off, skipping download." ) - raise typer.Exit(code=0) # Exit successfully, nothing to do + return target_location # Return successfully, nothing to do logger.info("File doesn't exist or overwrite is on. Re-downloading.") @@ -78,3 +79,8 @@ def cache_cwa( logger.info( f"'{geometry_name}' caching process completed. Cached to '{result_path}'" ) + write_step( + label="Cache Coastal Waters Areas zipped shapefile from source", + inputs={"source_url": source_url}, + outputs={"target_location": target_location}, + ) diff --git a/csdr/cli_geometry_eez.py b/csdr/cli_geometry_eez.py index 3a5d641..241c3c9 100644 --- a/csdr/cli_geometry_eez.py +++ b/csdr/cli_geometry_eez.py @@ -9,6 +9,7 @@ get_store_with_prefix_from_url, split_path_and_file_name_from_url, ) +from csdr.provenance import write_step eez_app = typer.Typer() logger = logging.getLogger(__name__) @@ -34,14 +35,14 @@ async def run_cache_eez( if exists(target_store, target_file_name): if not overwrite: logger.info("File already exists at target location, skipping download.") - raise typer.Exit(code=0) # Exit successfully, nothing to do + return target_location # Return successfully, nothing to do else: dest_meta = target_store.head(target_file_name) if size is not None and "size" in dest_meta and dest_meta["size"] == size: logger.info( f"Overwrite is on but file already exists at target location with matching size of {size}. Skipping download." ) - raise typer.Exit(code=0) # Exit successfully, nothing to do + return target_location # Return successfully, nothing to do else: logger.info( f"Overwrite is on. File already exists at target location but size does not match (local: {size}, remote: {dest_meta['size']}). Re-downloading." @@ -73,3 +74,8 @@ def cache_eez( result_path = asyncio.run(run_cache_eez(source_url, target_location, overwrite)) logger.info(f"EEZ caching process completed. Cached to {result_path}") + write_step( + label="Cache EEZ zip file from source", + inputs={"source_url": source_url}, + outputs={"target_location": target_location}, + ) diff --git a/csdr/cli_helpers.py b/csdr/cli_helpers.py index 3c5a2fb..deab849 100644 --- a/csdr/cli_helpers.py +++ b/csdr/cli_helpers.py @@ -10,6 +10,7 @@ split_path_and_file_name_from_url, write_gdf_to_parquet, ) +from csdr.provenance import write_step from csdr.utils import CSDRException, make_uuid helpers_app = typer.Typer() @@ -109,3 +110,8 @@ def filter_geometries_by_name( target_store = get_store_with_prefix_from_url(target_path) write_gdf_to_parquet(filtered_gdf, target_store, target_filename) logger.info(f"Filtered geometries written to {target_url}") + write_step( + label="Filter geometries by name from source parquet", + inputs={"source_url": source_url, "geometry_names": geometry_names}, + outputs={"target_url": target_url}, + ) diff --git a/csdr/cli_products.py b/csdr/cli_products.py index 95e71b0..cfa94c5 100644 --- a/csdr/cli_products.py +++ b/csdr/cli_products.py @@ -1,5 +1,6 @@ import json import logging +import os import sys from typing import Any @@ -21,7 +22,7 @@ write_json, ) from csdr.products import process_indicators_for_geometry -from csdr.provenance import read_provenance +from csdr.provenance import read_provenance, write_step from csdr.utils import CSDRException, make_uuid products_app = typer.Typer() @@ -125,6 +126,7 @@ def _create_product_output( "metadata": { "geometryProvenanceUrl": geometry_provenance_url, "datasetProvenanceUrl": dataset_provenance_url, + "command": " ".join([os.path.basename(sys.argv[0]), *sys.argv[1:]]), }, } @@ -299,7 +301,7 @@ def process_geometry( target_url = f"{target_location}/{target_path}" if exists(target_store, target_path) and not overwrite: logger.info(f"Product already exists at {target_url}, skipping processing.") - raise typer.Exit(code=0) # Exit successfully, nothing to do + return None # Exit successfully, nothing to do logger.info("JSON doesn't exist or overwrite is True, processing geometry.") # Load geometry data using Sedona so filtering is done before loading into memory @@ -358,6 +360,9 @@ def process_geometry( write_json(target_store, target_path, product_output) logger.info(f"Wrote results to {target_url}") + # No write_step here — process-geometry fans out to many pods. + # A single summary step is written by consolidate instead. + finally: # Release resources if client is not None: @@ -382,18 +387,18 @@ def consolidate_product( indicator_name: str = typer.Option( "asset", help="Name of the indicator to use for calculations (if applicable)" ), - datetime: str | None = typer.Option( - None, - help="Parseable datetime to use as the timePoint for the product output (e.g. '2024-01-01T00:00:00Z' or '2024-01')", + overwrite: bool = typer.Option( + True, help="Replace existing parquet file if it exists." ), ) -> None: logger.info(f"Consolidating product {product_id} from {location}") location = location.rstrip("/") store = get_store_with_prefix_from_url(location) - path = get_product_path(product_id, indicator_name, datetime=datetime) + path = get_product_path(product_id, indicator_name) logger.info(f"path {path}") url = f"{location}/{path}" - logger.info(f"Looking for product files in {url}") + output_file = f"{path}/{product_id}.parquet" + target_url = f"{location}/{output_file}" # TODO: Use io.find_matching_files for this step # Get a list of all the json files in the product directory @@ -442,9 +447,61 @@ def consolidate_product( df = pd.DataFrame(all_data) logger.info(f"Consolidated product data from {url}: {df.shape[0]} rows") - # Write the consolidated DataFrame to a new parquet - output_file = f"{path}/{product_id}.parquet" - write_gdf_to_parquet(df, store, output_file) + if exists(store, output_file) and not overwrite: + logger.info( + f"Consolidated product already exists at {target_url}, skipping writing consolidated data." + ) + else: + logger.info(f"Looking for product files in {url}") - target_url = f"{location}/{output_file}" - logger.info(f"Wrote consolidated product data to {target_url}") + # Write the consolidated DataFrame to a new parquet + write_gdf_to_parquet(df, store, output_file) + + logger.info(f"Wrote consolidated product data to {target_url}") + + # Extract the unique years from the processed geometry data + years = sorted( + set(item.get("timePoint", "")[:4] for item in all_data if item.get("timePoint")) + ) + + # Read the example process-geometry command from the first product's metadata. + # Any single example is sufficient since they all share the same structure. + # Replace the geometry-specific values with placeholders to show the template. + first_product = all_data[0] + process_geometry_command = first_product.get("metadata", {}).get( + "command", "csdr products process-geometry" + ) + first_geometry_id = first_product.get("geometryOutputId", "") + first_datetime = first_product.get("timePoint", "")[:4] # e.g. "2020" + if first_geometry_id: + process_geometry_command = process_geometry_command.replace( + first_geometry_id, "{geometry_id}" + ) + if first_datetime: + process_geometry_command = process_geometry_command.replace( + first_datetime, "{datetime}" + ) + + # Write a summary step for the fan-out process-geometry work. + # Each geometry is processed in its own pod, so we record a single + # summary step here where the count is known. + write_step( + label=f"Process {len(all_data)} geometr{'ies' if len(all_data) != 1 else 'y'} for {len(years)} year{'s' if len(years) != 1 else ''} to compute indicators", + inputs={ + "geometry_provenance_url": geometry_provenance_url, + "dataset_provenance_url": dataset_provenance_url, + "indicator_name": indicator_name, + "years": years, + }, + outputs={"geometries_processed": len(all_data)}, + source_function=process_geometry, + command=process_geometry_command, + ) + write_step( + label="Consolidate per-geometry-and-year results into a single product parquet", + inputs={ + "location": location, + "indicator_name": indicator_name, + }, + outputs={"target_url": target_url}, + ) diff --git a/csdr/cli_provenance.py b/csdr/cli_provenance.py index 67a0c74..f2a9f2b 100644 --- a/csdr/cli_provenance.py +++ b/csdr/cli_provenance.py @@ -1,5 +1,5 @@ import logging -from json import dumps +from json import dumps, loads from typing import Literal import typer @@ -23,7 +23,7 @@ write_json, ) from csdr.products import parse_outputs -from csdr.provenance import get_provenance +from csdr.provenance import clear_steps, get_provenance, read_steps from csdr.utils import CSDRException provenance_app = Typer() @@ -41,6 +41,7 @@ def _meta_provenance( post_to_database: bool, source_url: str | None = None, source_metadata_url: str | None = None, + workflow_dag: list | None = None, # extra_info_dict likely includes geometriesRunId for geometries and productRunId for products extra_info_dict: dict | None = None, ) -> str | None: @@ -57,6 +58,7 @@ def _meta_provenance( post_to_database (bool): If true, post the provenance to the database. source_url (str | None): URL of the original source data. source_metadata_url (str): URL of the source metadata. + workflow_dag (list | None): List of workflow step objects for provenance. extra_info_dict (dict | None): Additional information to include in the provenance, such as geometriesRunId for geometries and productRunId for products. Dataset run IDs are all made by the DB. Returns: @@ -80,6 +82,7 @@ def _meta_provenance( data_type=data_type, source_url=source_url, source_metadata_url=source_metadata_url, + workflow_dag=workflow_dag, extra_info_dict=extra_info_dict, ) @@ -116,7 +119,7 @@ def _meta_provenance( @provenance_app.command("dataset") -def write_dataset_provenance( +def _write_dataset_provenance( id: str = typer.Option(..., help="ID of the dataset"), dataset_url: str = typer.Option(..., help="URL that points to the dataset"), dataset_type: str = typer.Option( @@ -140,9 +143,15 @@ def write_dataset_provenance( post_to_database: bool = typer.Option( False, help="If true, post the provenance to the database" ), + workflow_dag: str = typer.Option( + None, + help="Workflow DAG as a JSON array of step objects. If not provided, reads from local provenance step files.", + ), ) -> None: logger.info(f"Getting provenance for dataset: {dataset_url}") + workflow_dag_parsed = loads(workflow_dag) if workflow_dag else read_steps() + extra_info_dict = {} if ( pmtiles_url is not None @@ -159,14 +168,16 @@ def write_dataset_provenance( source_metadata_url=source_metadata_url, overwrite=overwrite, post_to_database=post_to_database, + workflow_dag=workflow_dag_parsed, extra_info_dict=extra_info_dict, # extra_info_dict can contain dataPmtilesUrl (needed for ACA Reef dataset) ) + clear_steps() logger.info(f"dataset_run_id: {dataset_run_id}") logger.info(f"Wrote provenance for dataset: {dataset_url}") @provenance_app.command("geometry") -def write_geometry_provenance( +def _write_geometry_provenance( id: str = typer.Option(..., help="ID of the geometry"), # run_id is always passed from the workflow. # It is however optional because when running this CLI command seperately from the workflow, @@ -197,6 +208,10 @@ def write_geometry_provenance( post_to_database: bool = typer.Option( False, help="If true, post the provenance to the database" ), + workflow_dag: str = typer.Option( + None, + help="Workflow DAG as a JSON array of step objects. If not provided, reads from local provenance step files.", + ), post_geometry_outputs: bool = typer.Option( False, help="If true, post the geometry outputs to the database" ), @@ -209,6 +224,8 @@ def write_geometry_provenance( ) -> None: logger.info(f"Getting provenance for geometry: {geometry_url}") + workflow_dag_parsed = loads(workflow_dag) if workflow_dag else read_steps() + if run_id is not None: logger.info(f"Run ID '{run_id}' was provided.") else: @@ -233,6 +250,7 @@ def write_geometry_provenance( overwrite=overwrite, post_to_database=post_to_database, extra_info_dict=extra_info_dict, + workflow_dag=workflow_dag_parsed, ) logger.info(f"Wrote provenance for geometry: {geometry_url}") consolidated_run_id = run_id if run_id is not None else run_id_created @@ -246,10 +264,11 @@ def write_geometry_provenance( else: logger.info("Posting geometry outputs to database one at a time...") post_geometry_outputs_to_database(geometry_url, run_id=consolidated_run_id) + clear_steps() @provenance_app.command("product") -def write_product_provenance( +def _write_product_provenance( product_url: str = typer.Option(..., help="URL that points to the product parquet"), product_id: str = typer.Option(..., help="Product ID"), run_id: str | None = typer.Option( @@ -264,6 +283,10 @@ def write_product_provenance( ..., help="Geometries run ID", ), + workflow_dag: str = typer.Option( + None, + help="Workflow DAG as a JSON array of step objects. If not provided, reads from local provenance step files.", + ), post_to_database: bool = typer.Option( False, help="If true, post the provenance to the database" ), @@ -275,6 +298,8 @@ def write_product_provenance( df = read_geospatial_file(product_url) parsed_outputs = parse_outputs(df) + workflow_dag_parsed = loads(workflow_dag) if workflow_dag else read_steps() + if run_id is not None: logger.info(f"Run ID '{run_id}' was provided.") else: @@ -295,6 +320,7 @@ def write_product_provenance( data_type="parquet", overwrite=overwrite, post_to_database=post_to_database, + workflow_dag=workflow_dag_parsed, extra_info_dict=extra_info_dict, ) logger.info(f"Wrote provenance for product: {product_url}") @@ -328,3 +354,4 @@ def write_product_provenance( logger.info( f"Posted product output for indicator {indicator} timePoint {timePoint}: {response.status_code}" ) + clear_steps() diff --git a/csdr/provenance.py b/csdr/provenance.py index ba07c14..dc7a09f 100644 --- a/csdr/provenance.py +++ b/csdr/provenance.py @@ -1,8 +1,13 @@ +import inspect +import json import logging import os +import sys +import types from datetime import UTC, datetime from io import BytesIO from json import load +from pathlib import Path from obstore.store import ObjectStore @@ -17,6 +22,117 @@ logger = logging.getLogger(__name__) SUPPORTED_DATA_FORMATS = ["stac-geoparquet", "geoparquet", "parquet"] +PROVENANCE_STEPS_DIR = Path("/tmp/provenance-steps") + + +def _get_github_url(file: str, line: int) -> str | None: + """Build a GitHub permalink to the caller's source line using the IMAGE_REPO env var.""" + image_repo = os.getenv("IMAGE_REPO", None) + if image_repo is None or image_repo == "unknown": + return None + # IMAGE_REPO is e.g. https://github.com/.../tree// + # We need /blob//path#Lline + base_url = image_repo.rstrip("/").replace("/tree/", "/blob/") + # Find the relative path within the repo (everything after last csdr/) + marker = "csdr/" + idx = file.rfind(marker) + if idx == -1: + return None + relative_path = file[idx:] + return f"{base_url}/{relative_path}#L{line}" + + +def write_step( + label: str, + inputs: dict | None = None, + outputs: dict | None = None, + source_function: object | None = None, + command: str | None = None, +) -> None: + """Write a provenance step JSON file for the current CLI command. + + Call this at the end of any ``csdr`` command that should appear in the + workflow provenance. The caller's file and line are captured + automatically via :func:`inspect.stack`. + + If *source_function* is provided (a callable), the source metadata will + point to that function instead of the direct caller. This is useful when + a step summarises work done by a different command (e.g. consolidate + writing a summary step on behalf of process-geometry). + """ + if source_function is not None: + if isinstance(source_function, types.FunctionType): + src_file = inspect.getfile(source_function) + src_line = source_function.__code__.co_firstlineno + src_name = source_function.__name__ + else: + raise CSDRException( + f"source_function must be a function, got {type(source_function)}" + ) + else: + caller = inspect.stack()[1] + src_file = caller.filename + src_line = caller.frame.f_code.co_firstlineno + src_name = caller.function + + source: dict[str, str | int] = { + "file": src_file, + "line": src_line, + "function": src_name, + } + + github_url = _get_github_url(src_file, src_line) + if github_url is not None: + source["github"] = github_url + else: + raise CSDRException( + "GitHub URL for provenance step is not available. Make sure the COMMIT environment variable is set to a valid commit hash, and that the caller file is within the csdr/ directory of the repository." + ) + + step = { + "label": label, + "command": command or " ".join([os.path.basename(sys.argv[0]), *sys.argv[1:]]), + "inputs": inputs or {}, + "outputs": outputs or {}, + "completed_at": datetime.now(UTC).isoformat() + "Z", + "source": source, + } + + step_dir = PROVENANCE_STEPS_DIR + step_dir.mkdir(parents=True, exist_ok=True) + + # Name files by count so ordering is preserved + existing_count = len(list(step_dir.glob("step-*.json"))) + step["order"] = existing_count + step_file = step_dir / f"step-{existing_count:04d}.json" + + step_file.write_text(json.dumps(step, indent=2)) + logger.info(f"Wrote provenance step to {step_file}") + + +# Read steps and clear steps are needed for local work. In Argo, steps are in individual pods which get cleaned up so they aren't needed. +def read_steps() -> list[dict]: + """Read all provenance steps from the steps directory, in order.""" + if not PROVENANCE_STEPS_DIR.exists(): + return [] + files = sorted(PROVENANCE_STEPS_DIR.glob("step-*.json")) + steps = [] + for f in files: + try: + steps.append(json.loads(f.read_text())) + except (json.JSONDecodeError, OSError): + logger.warning(f"Could not read provenance step file {f}") + return steps + + +def clear_steps() -> None: + """Remove all provenance step files. Call after reading steps into provenance.""" + if not PROVENANCE_STEPS_DIR.exists(): + return + for f in PROVENANCE_STEPS_DIR.glob("step-*.json"): + f.unlink() + logger.info("Cleared provenance step files") + def get_image_state() -> dict[str, str]: """Get the image state from environment variables""" @@ -46,6 +162,7 @@ def get_provenance( description: str = "", source_url: str | None = None, source_metadata_url: str | None = None, + workflow_dag: list | None = None, # Dataset can pass an extra_info_dict with dataPmtilesUrl, geometry does (including PMTiles url, and geometry run ID). Product probably does (incl. product run ID). extra_info_dict: dict[str, str | int] | None = None, ) -> dict[str, str | int]: @@ -78,6 +195,7 @@ def get_provenance( "provenanceUrl": f"{get_url_from_store(store)}/{file_name}.provenance.json", # These three get removed from the dict if posting to database "provenanceUpdated": datetime.now(UTC).isoformat() + "Z", + "workflowDag": workflow_dag, # Extra stuff! e.g. geometriesRunId and productRunId **extra_info_dict, }