diff --git a/openavmkit/data.py b/openavmkit/data.py index 2ce0581..78489a1 100644 --- a/openavmkit/data.py +++ b/openavmkit/data.py @@ -181,13 +181,13 @@ def set(self, key: str, value: pd.DataFrame): def limit_sales_to_keys(self, new_sale_keys: list[str]): """ Update the sales DataFrame to only those that match a key in `new_sale_keys` - + Parameters ---------- new_sale_keys : list[str] List of sale keys to filter to """ - + s = self.sales.copy() s = s[s["key_sale"].isin(new_sale_keys)] self.sales = s @@ -2892,11 +2892,13 @@ def _enrich_df_overture( unit = area_unit(settings) - gdf_out = get_cached_df(gdf_in, "geom/overture", "key", s_enrich_this) - if gdf_out is not None: - if verbose: - print("--> found cached data...") - return gdf_out + duplicate_keys = gdf_in["key"].duplicated(keep=False).any() + if not duplicate_keys: + gdf_out = get_cached_df(gdf_in, "geom/overture", "key", s_enrich_this) + if gdf_out is not None: + if verbose: + print("--> found cached data...") + return gdf_out gdf = gdf_in.copy() @@ -2907,68 +2909,73 @@ def _enrich_df_overture( if verbose: print("Enriching with Overture building data...") - # Initialize Overture service with the correct settings path overture_settings = { "overture": s_overture # Pass the overture settings directly } - overture_service = init_service_overture(overture_settings) - # Get bounding box from data - bbox = gdf.to_crs("EPSG:4326").total_bounds + # Calculate building footprints + s_footprint = s_overture.get("footprint", {}) + footprint_units = s_footprint.get("units", None) + if footprint_units is None: + warnings.warn( + f"`process.enrich.overture.footprint.units` not specified, defaulting to '{unit}'" + ) + footprint_units = unit + footprint_field = s_footprint.get("field", None) + if footprint_field is None: + warnings.warn( + f"`process.enrich.overture.footprint.field` not specified, defaulting to 'bldg_area_footprint_{footprint_units}'" + ) + footprint_field = f"bldg_area_footprint_{footprint_units}" - # Fetch building data - buildings = overture_service.get_buildings( - bbox, use_cache=s_overture.get("cache", True), unit=unit, verbose=verbose - ) + # Calculate building height + len_unit = get_short_distance_unit(settings) + s_height = s_overture.get("height", {}) + height_units = s_height.get("units", None) + if height_units is None: + warnings.warn( + f"`process.enrich.overture.height.units` not specified, defaulting to '{len_unit}'" + ) + height_units = len_unit + height_field = s_height.get("field", None) + if height_field is None: + warnings.warn( + f"`process.enrich.overture.height.field` not specified, defaulting to 'bldg_height_{len_unit}'" + ) + height_field = f"bldg_height_{len_unit}" - if not buildings.empty: - # Calculate building footprints - sq_unit = area_unit(settings) - s_footprint = s_overture.get("footprint", {}) - footprint_units = s_footprint.get("units", None) - if footprint_units is None: - warnings.warn( - f"`process.enrich.overture.footprint.units` not specified, defaulting to '{unit}'" - ) - footprint_units = unit - footprint_field = s_footprint.get("field", None) - if footprint_field is None: - warnings.warn( - f"`process.enrich.overture.footprint.field` not specified, defaulting to 'bldg_area_footprint_{footprint_units}'" - ) - footprint_field = f"bldg_area_footprint_{footprint_units}" - - # Calculate building height - len_unit = get_short_distance_unit(settings) - s_height = s_overture.get("height", {}) - height_units = s_height.get("units", None) - if height_units is None: - warnings.warn( - f"`process.enrich.overture.height.units` not specified, defaulting to {len_unit}'" - ) - height_units = len_unit - height_field = s_height.get("field", None) - if height_field is None: - warnings.warn( - f"`process.enrich.overture.height.field` not specified, defaulting to 'bldg_height_{len_unit}'" - ) - height_field = f"bldg_height_{len_unit}" - - gdf = overture_service.calculate_building_stats( - gdf, - buildings, + overture_succeeded = False + bbox = gdf.to_crs("EPSG:4326").total_bounds + try: + overture_service = init_service_overture(overture_settings) + gdf = overture_service.calculate_building_stats_streaming( + gdf, + bbox, footprint_units, footprint_field, height_units, height_field, - verbose=verbose + use_cache=s_overture.get("cache", True), + verbose=verbose, + ) + overture_succeeded = True + except ValueError: + raise + except Exception as e: + message = ( + "Failed to calculate Overture building stats " + f"(bbox={bbox}, footprint_field={footprint_field!r}, " + f"height_field={height_field!r}, cache={s_overture.get('cache', True)}): {str(e)}" + ) + if verbose: + print(f"--> {message}") + print(f"--> Traceback: {traceback.format_exc()}") + warnings.warn( + f"{message}\n{traceback.format_exc()}" ) - - - elif verbose: - print("--> No buildings found in the area") - write_cached_df(gdf_in, gdf, "geom/overture", "key", s_enrich_this) + if overture_succeeded and not duplicate_keys: + write_cached_df(gdf_in, gdf, "geom/overture", "key", s_enrich_this) return gdf diff --git a/openavmkit/utilities/overture.py b/openavmkit/utilities/overture.py index 21ba283..3c8380f 100644 --- a/openavmkit/utilities/overture.py +++ b/openavmkit/utilities/overture.py @@ -9,14 +9,12 @@ """ import os import warnings +import gc +import duckdb import geopandas as gpd import pandas as pd import traceback -import pyarrow as pa -import pyarrow.compute as pc -import pyarrow.dataset as ds import pyarrow.fs as fs -from tqdm import tqdm import shapely.wkb from openavmkit.utilities.geometry import get_crs @@ -40,6 +38,7 @@ class OvertureService: "height", "est_height", "num_floors", "num_floors_underground", "subtype", "class", "sources" # sources = per-property confidence ] + DEFAULT_STREAM_BATCH_ROWS = 10_000 def __init__(self, settings: dict): """Initialize the Overture service with settings. @@ -99,31 +98,118 @@ def _stats_cache_load(self, cache_path: str, gdf: gpd.GeoDataFrame, stat_cols: l out[c] = out[c].fillna(0) return out - def _get_dataset(self): - """Get the PyArrow dataset for buildings.""" - path = f"{self.bucket}/{self.prefix}" - return ds.dataset(path, filesystem=self.fs, format="parquet") - - def _geoarrow_schema_adapter(self, schema: pa.Schema) -> pa.Schema: - """Convert a geoarrow-compatible schema to a proper geoarrow schema.""" - geometry_field_index = schema.get_field_index("geometry") - geometry_field = schema.field(geometry_field_index) - geoarrow_geometry_field = geometry_field.with_metadata( - {b"ARROW:extension:name": b"geoarrow.wkb"} - ) - return schema.set(geometry_field_index, geoarrow_geometry_field) + def _buildings_parquet_path(self) -> str: + return f"s3://{self.bucket}/{self.prefix}*" + + def _quote_sql_literal(self, value: str) -> str: + return "'" + str(value).replace("'", "''") + "'" - def _batch_to_geodataframe(self, batch: pa.RecordBatch) -> gpd.GeoDataFrame: - """Convert a PyArrow batch to a GeoDataFrame with proper geometry handling.""" - # Convert to pandas DataFrame first - df = batch.to_pandas() + def _quote_identifier(self, value: str) -> str: + return '"' + str(value).replace('"', '""') + '"' + + def _configure_duckdb_for_path(self, con, path: str) -> None: + if not path.startswith("s3://"): + return + try: + con.execute("LOAD httpfs;") + except Exception: + con.execute("INSTALL httpfs; LOAD httpfs;") + con.execute("SET s3_region = 'us-west-2';") + # Public, anonymously-readable bucket: configure an unsigned S3 secret so + # DuckDB does not try (and fail) to sign requests with ambient AWS creds. + con.execute( + "CREATE OR REPLACE SECRET overture_anon " + "(TYPE s3, PROVIDER config, REGION 'us-west-2');" + ) + def _stream_building_dfs(self, bbox, proj_cols, verbose=False): + """Yield matching Overture building rows as pandas DataFrame chunks (DuckDB). + + The bbox-overlap predicate is pushed down to the Parquet row-group statistics + on the ``bbox`` struct subfields, so only the row groups that can contain + matching buildings are decoded. This bounds peak memory to the matched rows + instead of scanning a huge fraction of the GLOBAL buildings theme like the + PyArrow dataset scanner does — PyArrow does not prune row groups on nested + struct subfields, so a ~1 km box pulled ~3 GB RSS over ~84 s and OOM-killed + the 4 GiB worker (ENG-3033); the same fetch under DuckDB is ~0.4 GB. + + ``fetch_df_chunk`` streams the (already-bounded) result so the full building + set is never materialized at once, preserving the streaming contract that the + per-parcel stats aggregation relies on. Pandas (not Arrow) is used because the + ``geometry`` column is a GeoArrow GEOMETRY type whose DuckDB Arrow export + resolves its CRS via the ``spatial`` extension catalog (not loaded) and raises + an internal error; the pandas path returns the raw stored WKB bytes verbatim + (bit-identical to the prior PyArrow read). + + Columns in ``proj_cols`` absent from the current release are dropped, mirroring + the prior PyArrow projection behavior. + """ + xmin, ymin, xmax, ymax = bbox + path = self._buildings_parquet_path() + path_sql = self._quote_sql_literal(path) + con = duckdb.connect() + try: + self._configure_duckdb_for_path(con, path) + described = con.execute( + f"DESCRIBE SELECT * FROM read_parquet({path_sql}) LIMIT 0" + ).fetchall() + available = {row[0] for row in described} + cols = [c for c in proj_cols if c in available] + missing = [c for c in proj_cols if c not in available] + if verbose and missing: + print(f"--> Skipping unavailable columns: {missing}") + if verbose: + print(f"--> Fetching Overture buildings via DuckDB for bbox {bbox}") + if not cols: + raise ValueError("No requested Overture columns are available in the buildings dataset") + col_sql = ", ".join(self._quote_identifier(c) for c in cols) + # Bounding-box overlap test, identical to the prior PyArrow predicate. + sql = ( + f"SELECT {col_sql} FROM read_parquet({path_sql}) " + "WHERE bbox.xmin < ? AND bbox.xmax > ? " + "AND bbox.ymin < ? AND bbox.ymax > ?" + ) + result_cursor = con.execute(sql, [xmax, xmin, ymax, ymin]) + while True: + chunk = result_cursor.fetch_df_chunk() + if chunk.empty: + break + yield chunk + finally: + con.close() + + def _to_geodataframe(self, df: pd.DataFrame) -> gpd.GeoDataFrame: + """Convert a fetched buildings DataFrame to a GeoDataFrame (WKB -> shapely).""" # Convert WKB geometry to shapely geometry - df["geometry"] = df["geometry"].apply(lambda wkb: shapely.wkb.loads(wkb) if pd.notnull(wkb) else None) + df["geometry"] = df["geometry"].apply(lambda wkb: shapely.wkb.loads(bytes(wkb)) if wkb is not None and pd.notnull(wkb) else None) # Create GeoDataFrame with WGS84 CRS (EPSG:4326) return gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326") + def _building_batches(self, bbox, columns: list[str] | None = None, verbose: bool = False): + """Yield Overture building batches as GeoDataFrames for a bbox. + + This is intentionally a generator so callers that only need aggregate + parcel stats can process one Arrow batch at a time instead of + materializing the full building set for dense urban bounding boxes. + """ + proj_cols = list(columns) if columns is not None else self.DEFAULT_COLUMNS.copy() + for req in ("geometry", "bbox"): + if req not in proj_cols: + proj_cols.append(req) + + stream_batch_rows = int( + self.settings.get("stream_batch_rows", self.DEFAULT_STREAM_BATCH_ROWS) + or self.DEFAULT_STREAM_BATCH_ROWS + ) + stream_batch_rows = max(1, stream_batch_rows) + + for chunk in self._stream_building_dfs(bbox, proj_cols, verbose=verbose): + for offset in range(0, len(chunk), stream_batch_rows): + sub = chunk.iloc[offset:offset + stream_batch_rows] + if len(sub) > 0: + yield self._to_geodataframe(sub.copy()) + def get_buildings( self, bbox, @@ -190,62 +276,14 @@ def get_buildings( print("--> Fetching data from Overture...") try: - # Create bounding box filter - xmin, ymin, xmax, ymax = bbox - filter = ( - (pc.field("bbox", "xmin") < xmax) - & (pc.field("bbox", "xmax") > xmin) - & (pc.field("bbox", "ymin") < ymax) - & (pc.field("bbox", "ymax") > ymin) - ) - - # Decide which columns to fetch - proj_cols = columns if columns is not None else self.DEFAULT_COLUMNS.copy() - # Ensure required columns are present - for req in ("geometry", "bbox"): - if req not in proj_cols: - proj_cols.append(req) - - # Get dataset and apply filter+projection - dataset = self._get_dataset() - if verbose: - print("--> Dataset columns:", dataset.schema.names) - available = set(dataset.schema.names) - missing = [c for c in proj_cols if c not in available] - proj_cols = [c for c in proj_cols if c in available] - if verbose and missing: - print(f"--> Skipping unavailable columns: {missing}") - batches = dataset.to_batches(filter=filter, columns=proj_cols) - - # Count total batches for progress bar - if verbose: - print("--> Counting batches...") - total_batches = sum(1 for _ in batches) - print(f"--> Found {total_batches} batches") - batches = dataset.to_batches(filter=filter, columns=proj_cols) # Reset iterator - - # Process batches with progress bar dfs = [] buildings_found = 0 - with tqdm( - total=total_batches if verbose else None, - desc="Processing batches", - disable=not verbose, - ) as pbar: - for batch in batches: - if batch.num_rows > 0: - try: - # Convert batch to GeoDataFrame with proper geometry handling - df = self._batch_to_geodataframe(batch) - if not df.empty: - df = self._derive_height_and_floors(df, typical_floor_height_m) - dfs.append(df) - buildings_found += len(df) - except Exception as e: - if verbose: - print(f"--> Error processing batch: {str(e)}") - pbar.update(1) + for df in self._building_batches(bbox, columns=columns, verbose=verbose): + if not df.empty: + df = self._derive_height_and_floors(df, typical_floor_height_m) + dfs.append(df) + buildings_found += len(df) if verbose: print(f"--> Found {buildings_found} buildings") @@ -344,6 +382,170 @@ def calculate_building_stats( gdf = self.calculate_building_footprints(gdf, buildings, footprint_units, footprint_field, verbose) gdf = self.calculate_building_heights(gdf, buildings, height_units, height_field, verbose) return gdf + + def _footprint_unit_multiplier(self, unit: str) -> float: + if unit == "sqft": + return 10.764 + if unit == "sqm": + return 1.0 + raise ValueError( + f"Unsupported footprint units: {unit}. Supported units are 'sqft' and 'sqm'." + ) + + def _height_unit_multiplier(self, unit: str) -> float: + if unit == "ft": + return 3.2808399 + if unit == "m": + return 1.0 + raise ValueError(f"Unsupported height units: {unit}. Use 'ft' or 'm'.") + + def calculate_building_stats_streaming( + self, + gdf: gpd.GeoDataFrame, + bbox, + footprint_units: str, + footprint_field: str, + height_units: str, + height_field: str, + use_cache: bool = True, + verbose: bool = False, + ) -> gpd.GeoDataFrame: + """Calculate Overture parcel stats without materializing all buildings. + + The full Overture bbox is still processed. The memory saving comes from + iterating the source dataset in Arrow batches, aggregating each batch's + parcel intersections into per-key totals/maxima, and dropping the batch + intermediates before reading the next batch. + """ + columns = self.DEFAULT_COLUMNS.copy() + frames = self._building_batches(bbox, columns=columns, verbose=verbose) + return self._calculate_building_stats_from_frames( + gdf, + frames, + footprint_units, + footprint_field, + height_units, + height_field, + use_cache=use_cache, + verbose=verbose, + ) + + def _calculate_building_stats_from_frames( + self, + gdf: gpd.GeoDataFrame, + building_frames, + footprint_units: str, + footprint_field: str, + height_units: str, + height_field: str, + use_cache: bool = True, + verbose: bool = False, + ) -> gpd.GeoDataFrame: + """Streaming implementation shared by the dataset path and tests.""" + footprint_mult = self._footprint_unit_multiplier(footprint_units) + height_mult = self._height_unit_multiplier(height_units) + + duplicate_keys = gdf["key"].duplicated(keep=False).any() + area_cache_path = self._get_cache_path("intersections_area", gdf.total_bounds) + height_cache_path = self._get_cache_path("intersections_height", gdf.total_bounds) + if use_cache and not duplicate_keys: + area_cached = self._stats_cache_load(area_cache_path, gdf, [footprint_field], verbose) + if area_cached is not None: + height_cached = self._stats_cache_load( + height_cache_path, + area_cached, + [height_field, "bldg_stories"], + verbose, + ) + if height_cached is not None: + return height_cached + + row_ids = pd.RangeIndex(len(gdf)) + gdf_rows = gdf.reset_index(drop=True) + gdf_projected = gdf_rows.to_crs(get_crs(gdf, "equal_area")) + gdf_projected["_overture_row_id"] = row_ids + gdf_for_height = gdf_rows.copy() + gdf_for_height["_overture_row_id"] = row_ids + footprint_totals = pd.Series(0.0, index=row_ids, dtype="float64") + height_max = pd.Series(pd.NA, index=row_ids, dtype="Float64") + floors_max = pd.Series(pd.NA, index=row_ids, dtype="Float64") + buildings_found = 0 + + for buildings in building_frames: + if buildings is None or buildings.empty: + continue + buildings_found += len(buildings) + if "height_m_best" not in buildings.columns or "floors_best" not in buildings.columns: + buildings = self._derive_height_and_floors(buildings.copy()) + + buildings_area = buildings.to_crs(gdf_projected.crs) + joined_area = gpd.sjoin( + gdf_projected, buildings_area, how="left", predicate="intersects" + ) + + if not joined_area.empty and not joined_area["index_right"].isna().all(): + def calculate_intersection_area(row): + try: + row_id = int(row["_overture_row_id"]) + parcel_geom = gdf_projected.loc[row_id, "geometry"] + building_idx = row["index_right"] + if pd.isna(building_idx): + return 0.0 + building_geom = buildings_area.loc[building_idx, "geometry"] + if parcel_geom.intersects(building_geom): + return parcel_geom.intersection(building_geom).area * footprint_mult + return 0.0 + except Exception as e: + parcel_key = row.get("key", "") + warnings.warn( + "Error calculating Overture building intersection area " + f"for parcel key={parcel_key!r}, building_idx={row.get('index_right')!r}: {e}" + ) + return 0.0 + + joined_area[footprint_field] = joined_area.apply( + calculate_intersection_area, axis=1 + ) + area_agg = joined_area.groupby("_overture_row_id")[footprint_field].sum() + footprint_totals = footprint_totals.add(area_agg, fill_value=0) + + buildings_height = buildings.to_crs(gdf_for_height.crs) + joined_height = gpd.sjoin( + gdf_for_height, buildings_height, how="left", predicate="intersects" + ) + if not joined_height.empty and not joined_height["index_right"].isna().all(): + joined_height["_height_out"] = ( + pd.to_numeric(joined_height["height_m_best"], errors="coerce") * height_mult + ) + if "floors_best" in joined_height.columns: + joined_height["_floors_out"] = pd.to_numeric( + joined_height["floors_best"], errors="coerce" + ) + else: + joined_height["_floors_out"] = pd.NA + height_agg = joined_height.groupby("_overture_row_id")["_height_out"].max(min_count=1) + floors_agg = joined_height.groupby("_overture_row_id")["_floors_out"].max(min_count=1) + height_max = pd.concat([height_max, height_agg], axis=1).max(axis=1) + floors_max = pd.concat([floors_max, floors_agg], axis=1).max(axis=1) + + del buildings, buildings_area, joined_area, buildings_height, joined_height + gc.collect() + + out = gdf.copy() + out[footprint_field] = footprint_totals.reindex(row_ids).fillna(0).to_numpy() + out[height_field] = height_max.reindex(row_ids).fillna(0).to_numpy() + out["bldg_stories"] = floors_max.reindex(row_ids).fillna(0).to_numpy() + + if verbose: + print(f"--> Streamed {buildings_found} buildings") + print( + f"--> Number of parcels with buildings: {(out[footprint_field] > 0).sum():,}" + ) + + if use_cache and not duplicate_keys: + self._stats_cache_save(area_cache_path, out, [footprint_field]) + self._stats_cache_save(height_cache_path, out, [height_field, "bldg_stories"]) + return out def calculate_building_footprints( diff --git a/requirements.txt b/requirements.txt index 4d9f43f..fb8bb99 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ pdfkit==1.0.0 pipreqs==0.5.0 polars==1.38.1 pyarrow==22.0.0 +duckdb~=1.5.3 pytest==9.0.2 python-dotenv==1.2.1 rich==15.0.0 @@ -43,4 +44,4 @@ scipy>=1.11.4,<1.17 scikit-learn~=1.8.0 tqdm~=4.67.3 rasterio~=1.4.0 -seamless-3dep>=0.4.1,<0.6 \ No newline at end of file +seamless-3dep>=0.4.1,<0.6 diff --git a/tests/test_overture_enrichment.py b/tests/test_overture_enrichment.py new file mode 100644 index 0000000..adbcd07 --- /dev/null +++ b/tests/test_overture_enrichment.py @@ -0,0 +1,133 @@ +from unittest.mock import MagicMock, patch + +import geopandas as gpd +import pytest +from shapely.geometry import Polygon + +from openavmkit.data import _enrich_df_overture + + +def _parcel_gdf(keys=None): + keys = ["p1"] if keys is None else keys + return gpd.GeoDataFrame( + {"key": keys}, + geometry=[ + Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)]) + for _ in keys + ], + crs="EPSG:4326", + ) + + +def _settings(): + return {"locality": {"units": "imperial"}} + + +def _enrich_settings(footprint_units="sqft", footprint_field="footprint_sqft", cache=True): + return { + "overture": { + "enabled": True, + "cache": cache, + "footprint": {"units": footprint_units, "field": footprint_field}, + "height": {"units": "ft", "field": "height_ft"}, + } + } + + +def test_enrich_df_overture_calls_streaming_stats(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = _parcel_gdf() + service = MagicMock() + service.calculate_building_stats_streaming.return_value = parcels.assign( + footprint_sqft=123.0, + height_ft=6.0, + ) + + with patch("openavmkit.data.get_cached_df", return_value=None), patch( + "openavmkit.data.write_cached_df" + ), patch("openavmkit.data.init_service_overture", return_value=service): + out = _enrich_df_overture( + parcels, + _enrich_settings(cache=False), + {}, + _settings(), + ) + + assert out["footprint_sqft"].tolist() == [123.0] + args, kwargs = service.calculate_building_stats_streaming.call_args + assert args[2:6] == ("sqft", "footprint_sqft", "ft", "height_ft") + assert kwargs == {"use_cache": False, "verbose": False} + + +def test_enrich_df_overture_fails_open_on_service_init_error(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = _parcel_gdf() + + with patch("openavmkit.data.get_cached_df", return_value=None), patch( + "openavmkit.data.write_cached_df" + ) as write_cached_df, patch( + "openavmkit.data.init_service_overture", + side_effect=RuntimeError("prefix lookup failed"), + ): + with pytest.warns(UserWarning, match="Failed to calculate Overture building stats"): + out = _enrich_df_overture(parcels, _enrich_settings(), {}, _settings()) + + assert list(out.columns) == list(parcels.columns) + assert out.geometry.equals(parcels.geometry) + write_cached_df.assert_not_called() + + +def test_enrich_df_overture_stats_failure_does_not_write_cache(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = _parcel_gdf() + service = MagicMock() + service.calculate_building_stats_streaming.side_effect = RuntimeError("duckdb failed") + + with patch("openavmkit.data.get_cached_df", return_value=None), patch( + "openavmkit.data.write_cached_df" + ) as write_cached_df, patch("openavmkit.data.init_service_overture", return_value=service): + with pytest.warns(UserWarning, match="Failed to calculate Overture building stats"): + out = _enrich_df_overture(parcels, _enrich_settings(), {}, _settings()) + + assert list(out.columns) == list(parcels.columns) + assert out.geometry.equals(parcels.geometry) + write_cached_df.assert_not_called() + + +def test_enrich_df_overture_skips_outer_cache_for_duplicate_keys(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = _parcel_gdf(["p1", "p1"]) + service = MagicMock() + service.calculate_building_stats_streaming.return_value = parcels.assign( + footprint_sqft=[100.0, 100.0], + height_ft=[10.0, 10.0], + ) + + with patch("openavmkit.data.get_cached_df") as get_cached_df, patch( + "openavmkit.data.write_cached_df" + ) as write_cached_df, patch("openavmkit.data.init_service_overture", return_value=service): + out = _enrich_df_overture(parcels, _enrich_settings(), {}, _settings()) + + assert out["footprint_sqft"].tolist() == [100.0, 100.0] + get_cached_df.assert_not_called() + write_cached_df.assert_not_called() + + +def test_enrich_df_overture_invalid_units_fail_fast(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = _parcel_gdf() + service = MagicMock() + service.calculate_building_stats_streaming.side_effect = ValueError("Unsupported footprint units") + + with patch("openavmkit.data.get_cached_df", return_value=None), patch( + "openavmkit.data.write_cached_df" + ) as write_cached_df, patch("openavmkit.data.init_service_overture", return_value=service): + with pytest.raises(ValueError, match="Unsupported footprint units"): + _enrich_df_overture( + parcels, + _enrich_settings("acres", "footprint_acres"), + {}, + _settings(), + ) + + write_cached_df.assert_not_called() diff --git a/tests/test_overture_fetch.py b/tests/test_overture_fetch.py new file mode 100644 index 0000000..42a503b --- /dev/null +++ b/tests/test_overture_fetch.py @@ -0,0 +1,132 @@ +"""Tests for the DuckDB-backed Overture buildings fetch (ENG-3033). + +`_stream_building_dfs` reads Overture's global buildings theme with the bbox-overlap +predicate pushed down to Parquet row-group statistics, so peak memory is bounded to +the matching rows instead of scanning the whole theme. The tests are network-free: +most mock DuckDB, and one uses a local Parquet file to exercise DuckDB's real struct +predicate handling. +""" +import geopandas as gpd +import pandas as pd +import pytest +import shapely.geometry as sgeom +import shapely.wkb as swkb +from unittest.mock import MagicMock, patch + +from openavmkit.utilities.overture import OvertureService + +# A small built-up parcel near Santa Cruz, CA (real lon/lat so UTM estimation works). +_POLY = sgeom.box(-122.0263, 36.9741, -122.0260, 36.9744) +_BBOX = (-122.0283, 36.9721, -122.0243, 36.9761) + + +def _make_service(): + with patch.object( + OvertureService, "_resolve_latest_buildings_prefix", + return_value="release/2099-01-01.0/theme=buildings/type=building/", + ), patch("openavmkit.utilities.overture.fs.S3FileSystem", MagicMock()): + return OvertureService({"overture": {"enabled": True}}) + + +def _building_result_df(): + return pd.DataFrame({ + "id": ["bldg-1"], + "geometry": [swkb.dumps(_POLY)], + "bbox": [{"xmin": -122.0263, "xmax": -122.0260, + "ymin": 36.9741, "ymax": 36.9744}], + "height": [6.5], + "num_floors": [2], + "sources": [[{"property": "/properties/height", "confidence": 0.9}]], + }) + + +def _fake_connection(describe_cols, building_rows, captured): + con = MagicMock() + + def _execute(sql, params=None): + captured.append((sql, params)) + cursor = MagicMock() + if sql.strip().upper().startswith("DESCRIBE"): + cursor.fetchall.return_value = [(c,) for c in describe_cols] + # fetch_df_chunk streams: yield the frame once, then an empty frame. + cursor.fetch_df_chunk.side_effect = [building_rows, building_rows.iloc[0:0]] + return cursor + + con.execute.side_effect = _execute + return con + + +def _patched_overture_stream(describe_cols=None, captured=None): + svc = _make_service() + captured = [] if captured is None else captured + describe_cols = describe_cols or ["id", "geometry", "bbox", "height", "num_floors", "sources"] + con = _fake_connection(describe_cols, _building_result_df(), captured) + return svc, patch("openavmkit.utilities.overture.duckdb.connect", return_value=con) + + +def test_stream_pushes_bbox_predicate_and_drops_unavailable_columns(): + captured = [] + # est_height / num_floors_underground / subtype / class are absent from this + # release and must be dropped from the projection (mirrors prior PyArrow behavior). + svc, duckdb_connect = _patched_overture_stream(captured=captured) + with duckdb_connect: + chunks = list(svc._stream_building_dfs( + _BBOX, OvertureService.DEFAULT_COLUMNS.copy())) + assert len(chunks) == 1 and len(chunks[0]) == 1 + select_calls = [c for c in captured if c[0].strip().upper().startswith("SELECT")] + assert len(select_calls) == 1 + sql, params = select_calls[0] + assert "bbox.xmin < ?" in sql and "bbox.xmax > ?" in sql + assert "bbox.ymin < ?" in sql and "bbox.ymax > ?" in sql + xmin, ymin, xmax, ymax = _BBOX + assert params == [xmax, xmin, ymax, ymin] + assert '"est_height"' not in sql and '"class"' not in sql + assert '"id"' in sql and '"geometry"' in sql + assert "read_parquet('s3://overturemaps-us-west-2/" in sql + + +def test_get_buildings_builds_geodataframe_with_footprint(): + svc, duckdb_connect = _patched_overture_stream() + with duckdb_connect: + gdf = svc.get_buildings(_BBOX, use_cache=False) + assert isinstance(gdf, gpd.GeoDataFrame) + assert len(gdf) == 1 + assert gdf.geometry.iloc[0].equals(_POLY) + expected_area = gpd.GeoSeries([_POLY], crs="EPSG:4326").to_crs( + gdf.estimate_utm_crs() + ).area.iloc[0] * 10.764 + assert gdf["bldg_area_footprint_sqft"].iloc[0] == pytest.approx(expected_area) + assert gdf["height_m_best"].iloc[0] == 6.5 + assert gdf["floors_best"].iloc[0] == 2 + + +def test_building_batches_yields_geodataframes_from_stream(): + svc, duckdb_connect = _patched_overture_stream() + with duckdb_connect: + frames = list(svc._building_batches(_BBOX)) + assert len(frames) == 1 + assert isinstance(frames[0], gpd.GeoDataFrame) + assert frames[0].geometry.iloc[0].equals(_POLY) + + +def test_stream_uses_real_duckdb_bbox_predicate_on_local_parquet(tmp_path): + path = tmp_path / "buildings.parquet" + matching = _building_result_df() + outside = pd.DataFrame( + { + "id": ["outside"], + "geometry": [swkb.dumps(sgeom.box(-123, 37, -122.9, 37.1))], + "bbox": [{"xmin": -123.0, "xmax": -122.9, "ymin": 37.0, "ymax": 37.1}], + "height": [4.0], + "num_floors": [1], + "sources": [[]], + } + ) + pd.concat([matching, outside], ignore_index=True).to_parquet(path) + + svc = _make_service() + with patch.object(svc, "_buildings_parquet_path", return_value=str(path)): + chunks = list(svc._stream_building_dfs(_BBOX, OvertureService.DEFAULT_COLUMNS.copy())) + + fetched_buildings = pd.concat(chunks, ignore_index=True) + assert fetched_buildings["id"].tolist() == ["bldg-1"] diff --git a/tests/test_overture_streaming.py b/tests/test_overture_streaming.py new file mode 100644 index 0000000..4cf8f6c --- /dev/null +++ b/tests/test_overture_streaming.py @@ -0,0 +1,245 @@ +from unittest.mock import patch + +import geopandas as gpd +import pandas as pd +import pytest +from pandas.testing import assert_series_equal +from shapely.geometry import Polygon + +from openavmkit.utilities.overture import OvertureService + + +FOOTPRINT = "bldg_area_footprint_sqft" +HEIGHT = "bldg_height_ft" + + +def _svc(tmp_path): + service = OvertureService.__new__(OvertureService) + service.cache_dir = str(tmp_path) + service.settings = {} + return service + + +def _parcel_gdf(): + return gpd.GeoDataFrame( + { + "key": ["p1", "p2", "p3"], + "address": ["1 Main", "2 Main", "3 Main"], + }, + geometry=[ + Polygon([(0, 0), (0.002, 0), (0.002, 0.002), (0, 0.002)]), + Polygon([(0.003, 0), (0.005, 0), (0.005, 0.002), (0.003, 0.002)]), + Polygon([(0.006, 0), (0.008, 0), (0.008, 0.002), (0.006, 0.002)]), + ], + crs="EPSG:4326", + ) + + +def _building_frame(records): + rows = [ + {key: value for key, value in record.items() if key != "geometry"} + for record in records + ] + return gpd.GeoDataFrame( + rows, + geometry=[record["geometry"] for record in records], + crs="EPSG:4326", + ) + + +def _building_one_record(extra=None): + record = { + "id": "b1", + "height": 6.0, + "num_floors": 2, + "geometry": Polygon( + [(0.0005, 0.0005), (0.0015, 0.0005), (0.0015, 0.0015), (0.0005, 0.0015)] + ), + } + if extra: + record.update(extra) + return record + + +def _single_building_batch(service): + return service._derive_height_and_floors(_building_frame([_building_one_record()])) + + +def _assert_stat_columns_equal(old, streamed): + assert list(streamed["address"]) == ["1 Main", "2 Main", "3 Main"] + for column in (FOOTPRINT, HEIGHT, "bldg_stories"): + assert column in streamed.columns + assert_series_equal( + old[column].reset_index(drop=True), + streamed[column].reset_index(drop=True), + check_names=False, + check_dtype=False, + rtol=1e-9, + atol=1e-6, + ) + + +def _assert_streaming_stats_match_legacy( + service, parcels, legacy_buildings, streaming_frames +): + old = service.calculate_building_stats( + parcels.copy(), + legacy_buildings, + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + ) + streamed = service._calculate_building_stats_from_frames( + parcels.copy(), + streaming_frames, + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + use_cache=False, + ) + + _assert_stat_columns_equal(old, streamed) + + +def test_building_batches_splits_large_arrow_batches(tmp_path, monkeypatch): + service = _svc(tmp_path) + service.settings = {"stream_batch_rows": 2} + geom = Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)]) + fetched = pd.DataFrame( + { + "id": [f"b{i}" for i in range(5)], + "geometry": [geom.wkb for _ in range(5)], + } + ) + + # The DuckDB fetch streams pandas chunks; _building_batches must re-slice them + # into stream_batch_rows-sized GeoDataFrames for the per-parcel stats aggregation. + def fake_stream(bbox, proj_cols, verbose=False): + yield fetched + + monkeypatch.setattr(service, "_stream_building_dfs", fake_stream) + + chunks = list(service._building_batches((0, 0, 1, 1))) + + assert [len(chunk) for chunk in chunks] == [2, 2, 1] + assert all(chunk.crs == "EPSG:4326" for chunk in chunks) + + +def test_streaming_stats_match_all_at_once_for_split_batches(tmp_path): + service = _svc(tmp_path) + parcels = _parcel_gdf() + batch_1 = _building_frame( + [ + _building_one_record({"est_height": None}), + { + "id": "b2", + "height": None, + "est_height": 9.0, + "num_floors": None, + "geometry": Polygon( + [(0.0034, 0.0004), (0.0046, 0.0004), (0.0046, 0.0016), (0.0034, 0.0016)] + ), + }, + ] + ) + batch_2 = _building_frame( + [ + { + "id": "b3", + "height": 12.0, + "est_height": None, + "num_floors": 4, + "geometry": Polygon( + [(0.001, 0.001), (0.004, 0.001), (0.004, 0.003), (0.001, 0.003)] + ), + }, + ] + ) + batches = [ + service._derive_height_and_floors(batch_1.copy()), + service._derive_height_and_floors(batch_2.copy()), + ] + all_buildings = pd.concat(batches, ignore_index=True) + + _assert_streaming_stats_match_legacy( + service, + parcels, + all_buildings, + [batch.copy() for batch in batches], + ) + + +def test_streaming_stats_match_all_at_once_for_empty_buildings(tmp_path): + service = _svc(tmp_path) + parcels = _parcel_gdf() + empty_buildings = gpd.GeoDataFrame({"id": []}, geometry=[], crs="EPSG:4326") + + _assert_streaming_stats_match_legacy(service, parcels, empty_buildings, []) + + +def test_streaming_stats_match_all_at_once_when_heights_are_absent(tmp_path): + service = _svc(tmp_path) + parcels = _parcel_gdf() + raw_buildings = _building_frame( + [ + _building_one_record({"height": None, "num_floors": None}), + { + "id": "b2", + "geometry": Polygon( + [(0.0035, 0.0005), (0.0045, 0.0005), (0.0045, 0.0015), (0.0035, 0.0015)] + ), + }, + ] + ) + derived_buildings = service._derive_height_and_floors(raw_buildings.copy()) + + _assert_streaming_stats_match_legacy( + service, + parcels, + derived_buildings, + [raw_buildings], + ) + + +def test_streaming_stats_handles_duplicate_parcel_keys(tmp_path): + service = _svc(tmp_path) + parcels = _parcel_gdf() + baseline = service._calculate_building_stats_from_frames( + parcels.copy(), + [_single_building_batch(service)], + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + use_cache=False, + ) + parcels = pd.concat([parcels, parcels.iloc[[0]].copy()], ignore_index=True) + batch = _single_building_batch(service) + + with patch.object(service, "_stats_cache_load") as stats_cache_load, patch.object( + service, "_stats_cache_save" + ) as stats_cache_save: + streamed = service._calculate_building_stats_from_frames( + parcels.copy(), + [batch], + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + ) + + assert len(streamed) == len(parcels) + stats_cache_load.assert_not_called() + stats_cache_save.assert_not_called() + duplicated = streamed[streamed["key"].eq("p1")] + expected = baseline.loc[baseline["key"].eq("p1")].iloc[0] + assert duplicated[FOOTPRINT].tolist() == pytest.approx( + [expected[FOOTPRINT], expected[FOOTPRINT]] + ) + assert duplicated[HEIGHT].tolist() == pytest.approx([expected[HEIGHT], expected[HEIGHT]]) + assert duplicated["bldg_stories"].tolist() == [ + expected["bldg_stories"], + expected["bldg_stories"], + ]