From 5b5daf89f835aa0c9cbfd68a4a87a8a1cb809002 Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 8 Jun 2026 21:58:37 -0400 Subject: [PATCH 01/13] Stream Overture parcel building stats --- openavmkit/data.py | 93 +++++------ openavmkit/utilities/overture.py | 263 ++++++++++++++++++++++++------- tests/test_overture_streaming.py | 190 ++++++++++++++++++++++ 3 files changed, 442 insertions(+), 104 deletions(-) create mode 100644 tests/test_overture_streaming.py diff --git a/openavmkit/data.py b/openavmkit/data.py index 2ce05810..94515bc4 100644 --- a/openavmkit/data.py +++ b/openavmkit/data.py @@ -181,13 +181,13 @@ def set(self, key: str, value: pd.DataFrame): def limit_sales_to_keys(self, new_sale_keys: list[str]): """ Update the sales DataFrame to only those that match a key in `new_sale_keys` - + Parameters ---------- new_sale_keys : list[str] List of sale keys to filter to """ - + s = self.sales.copy() s = s[s["key_sale"].isin(new_sale_keys)] self.sales = s @@ -2916,57 +2916,48 @@ def _enrich_df_overture( # Get bounding box from data bbox = gdf.to_crs("EPSG:4326").total_bounds - # Fetch building data - buildings = overture_service.get_buildings( - bbox, use_cache=s_overture.get("cache", True), unit=unit, verbose=verbose - ) + # Calculate building footprints + s_footprint = s_overture.get("footprint", {}) + footprint_units = s_footprint.get("units", None) + if footprint_units is None: + warnings.warn( + f"`process.enrich.overture.footprint.units` not specified, defaulting to '{unit}'" + ) + footprint_units = unit + footprint_field = s_footprint.get("field", None) + if footprint_field is None: + warnings.warn( + f"`process.enrich.overture.footprint.field` not specified, defaulting to 'bldg_area_footprint_{footprint_units}'" + ) + footprint_field = f"bldg_area_footprint_{footprint_units}" - if not buildings.empty: - # Calculate building footprints - sq_unit = area_unit(settings) - s_footprint = s_overture.get("footprint", {}) - footprint_units = s_footprint.get("units", None) - if footprint_units is None: - warnings.warn( - f"`process.enrich.overture.footprint.units` not specified, defaulting to '{unit}'" - ) - footprint_units = unit - footprint_field = s_footprint.get("field", None) - if footprint_field is None: - warnings.warn( - f"`process.enrich.overture.footprint.field` not specified, defaulting to 'bldg_area_footprint_{footprint_units}'" - ) - footprint_field = f"bldg_area_footprint_{footprint_units}" - - # Calculate building height - len_unit = get_short_distance_unit(settings) - s_height = s_overture.get("height", {}) - height_units = s_height.get("units", None) - if height_units is None: - warnings.warn( - f"`process.enrich.overture.height.units` not specified, defaulting to {len_unit}'" - ) - height_units = len_unit - height_field = s_height.get("field", None) - if height_field is None: - warnings.warn( - f"`process.enrich.overture.height.field` not specified, defaulting to 'bldg_height_{len_unit}'" - ) - height_field = f"bldg_height_{len_unit}" - - gdf = overture_service.calculate_building_stats( - gdf, - buildings, - footprint_units, - footprint_field, - height_units, - height_field, - verbose=verbose + # Calculate building height + len_unit = get_short_distance_unit(settings) + s_height = s_overture.get("height", {}) + height_units = s_height.get("units", None) + if height_units is None: + warnings.warn( + f"`process.enrich.overture.height.units` not specified, defaulting to {len_unit}'" ) - - - elif verbose: - print("--> No buildings found in the area") + height_units = len_unit + height_field = s_height.get("field", None) + if height_field is None: + warnings.warn( + f"`process.enrich.overture.height.field` not specified, defaulting to 'bldg_height_{len_unit}'" + ) + height_field = f"bldg_height_{len_unit}" + + gdf = overture_service.calculate_building_stats_streaming( + gdf, + bbox, + footprint_units, + footprint_field, + height_units, + height_field, + unit=unit, + use_cache=s_overture.get("cache", True), + verbose=verbose, + ) write_cached_df(gdf_in, gdf, "geom/overture", "key", s_enrich_this) diff --git a/openavmkit/utilities/overture.py b/openavmkit/utilities/overture.py index 21ba2839..99f3fb81 100644 --- a/openavmkit/utilities/overture.py +++ b/openavmkit/utilities/overture.py @@ -9,6 +9,7 @@ """ import os import warnings +import gc import geopandas as gpd import pandas as pd import traceback @@ -124,6 +125,58 @@ def _batch_to_geodataframe(self, batch: pa.RecordBatch) -> gpd.GeoDataFrame: # Create GeoDataFrame with WGS84 CRS (EPSG:4326) return gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326") + def _building_batches(self, bbox, columns: list[str] | None = None, verbose: bool = False): + """Yield Overture building batches as GeoDataFrames for a bbox. + + This is intentionally a generator so callers that only need aggregate + parcel stats can process one Arrow batch at a time instead of + materializing the full building set for dense urban bounding boxes. + """ + xmin, ymin, xmax, ymax = bbox + filter = ( + (pc.field("bbox", "xmin") < xmax) + & (pc.field("bbox", "xmax") > xmin) + & (pc.field("bbox", "ymin") < ymax) + & (pc.field("bbox", "ymax") > ymin) + ) + + proj_cols = columns if columns is not None else self.DEFAULT_COLUMNS.copy() + for req in ("geometry", "bbox"): + if req not in proj_cols: + proj_cols.append(req) + + dataset = self._get_dataset() + if verbose: + print("--> Dataset columns:", dataset.schema.names) + available = set(dataset.schema.names) + missing = [c for c in proj_cols if c not in available] + proj_cols = [c for c in proj_cols if c in available] + if verbose and missing: + print(f"--> Skipping unavailable columns: {missing}") + + batches = dataset.to_batches(filter=filter, columns=proj_cols) + total_batches = None + if verbose: + print("--> Counting batches...") + total_batches = sum(1 for _ in batches) + print(f"--> Found {total_batches} batches") + batches = dataset.to_batches(filter=filter, columns=proj_cols) + + with tqdm( + total=total_batches, + desc="Processing batches", + disable=not verbose, + ) as pbar: + for batch in batches: + try: + if batch.num_rows > 0: + yield self._batch_to_geodataframe(batch) + except Exception as e: + if verbose: + print(f"--> Error processing batch: {str(e)}") + finally: + pbar.update(1) + def get_buildings( self, bbox, @@ -190,62 +243,14 @@ def get_buildings( print("--> Fetching data from Overture...") try: - # Create bounding box filter - xmin, ymin, xmax, ymax = bbox - filter = ( - (pc.field("bbox", "xmin") < xmax) - & (pc.field("bbox", "xmax") > xmin) - & (pc.field("bbox", "ymin") < ymax) - & (pc.field("bbox", "ymax") > ymin) - ) - - # Decide which columns to fetch - proj_cols = columns if columns is not None else self.DEFAULT_COLUMNS.copy() - # Ensure required columns are present - for req in ("geometry", "bbox"): - if req not in proj_cols: - proj_cols.append(req) - - # Get dataset and apply filter+projection - dataset = self._get_dataset() - if verbose: - print("--> Dataset columns:", dataset.schema.names) - available = set(dataset.schema.names) - missing = [c for c in proj_cols if c not in available] - proj_cols = [c for c in proj_cols if c in available] - if verbose and missing: - print(f"--> Skipping unavailable columns: {missing}") - batches = dataset.to_batches(filter=filter, columns=proj_cols) - - # Count total batches for progress bar - if verbose: - print("--> Counting batches...") - total_batches = sum(1 for _ in batches) - print(f"--> Found {total_batches} batches") - batches = dataset.to_batches(filter=filter, columns=proj_cols) # Reset iterator - - # Process batches with progress bar dfs = [] buildings_found = 0 - with tqdm( - total=total_batches if verbose else None, - desc="Processing batches", - disable=not verbose, - ) as pbar: - for batch in batches: - if batch.num_rows > 0: - try: - # Convert batch to GeoDataFrame with proper geometry handling - df = self._batch_to_geodataframe(batch) - if not df.empty: - df = self._derive_height_and_floors(df, typical_floor_height_m) - dfs.append(df) - buildings_found += len(df) - except Exception as e: - if verbose: - print(f"--> Error processing batch: {str(e)}") - pbar.update(1) + for df in self._building_batches(bbox, columns=columns, verbose=verbose): + if not df.empty: + df = self._derive_height_and_floors(df, typical_floor_height_m) + dfs.append(df) + buildings_found += len(df) if verbose: print(f"--> Found {buildings_found} buildings") @@ -344,6 +349,158 @@ def calculate_building_stats( gdf = self.calculate_building_footprints(gdf, buildings, footprint_units, footprint_field, verbose) gdf = self.calculate_building_heights(gdf, buildings, height_units, height_field, verbose) return gdf + + def calculate_building_stats_streaming( + self, + gdf: gpd.GeoDataFrame, + bbox, + footprint_units: str, + footprint_field: str, + height_units: str, + height_field: str, + unit: str = "sqft", + use_cache: bool = True, + verbose: bool = False, + ) -> gpd.GeoDataFrame: + """Calculate Overture parcel stats without materializing all buildings. + + The full Overture bbox is still processed. The memory saving comes from + iterating the source dataset in Arrow batches, aggregating each batch's + parcel intersections into per-key totals/maxima, and dropping the batch + intermediates before reading the next batch. + """ + columns = self.DEFAULT_COLUMNS.copy() + frames = self._building_batches(bbox, columns=columns, verbose=verbose) + return self._calculate_building_stats_from_frames( + gdf, + frames, + footprint_units, + footprint_field, + height_units, + height_field, + use_cache=use_cache, + verbose=verbose, + ) + + def _calculate_building_stats_from_frames( + self, + gdf: gpd.GeoDataFrame, + building_frames, + footprint_units: str, + footprint_field: str, + height_units: str, + height_field: str, + use_cache: bool = True, + verbose: bool = False, + ) -> gpd.GeoDataFrame: + """Streaming implementation shared by the dataset path and tests.""" + if footprint_units == "sqft": + footprint_mult = 10.764 + elif footprint_units == "sqm": + footprint_mult = 1.0 + else: + raise ValueError( + f"Unsupported units: {footprint_units}. Supported units are 'sqft' and 'sqm'." + ) + + if height_units == "ft": + height_mult = 3.2808399 + elif height_units == "m": + height_mult = 1.0 + else: + raise ValueError("Unsupported units: {height_units}. Use 'ft' or 'm'.") + + area_cache_path = self._get_cache_path("intersections_area", gdf.total_bounds) + height_cache_path = self._get_cache_path("intersections_height", gdf.total_bounds) + if use_cache: + area_cached = self._stats_cache_load(area_cache_path, gdf, [footprint_field], verbose) + if area_cached is not None: + height_cached = self._stats_cache_load( + height_cache_path, + area_cached, + [height_field, "bldg_stories"], + verbose, + ) + if height_cached is not None: + return height_cached + + gdf_projected = gdf.to_crs(get_crs(gdf, "equal_area")) + footprint_totals = pd.Series(0.0, index=gdf["key"], dtype="float64") + height_max = pd.Series(pd.NA, index=gdf["key"], dtype="Float64") + floors_max = pd.Series(pd.NA, index=gdf["key"], dtype="Float64") + buildings_found = 0 + + for buildings in building_frames: + if buildings is None or buildings.empty: + continue + buildings_found += len(buildings) + if "height_m_best" not in buildings.columns or "floors_best" not in buildings.columns: + buildings = self._derive_height_and_floors(buildings.copy()) + + buildings_area = buildings.to_crs(gdf_projected.crs) + joined_area = gpd.sjoin( + gdf_projected, buildings_area, how="left", predicate="intersects" + ) + + if not joined_area.empty and not joined_area["index_right"].isna().all(): + def calculate_intersection_area(row): + try: + parcel_geom = gdf_projected.loc[row.name, "geometry"] + building_idx = row["index_right"] + if pd.isna(building_idx): + return 0.0 + building_geom = buildings_area.loc[building_idx, "geometry"] + if parcel_geom.intersects(building_geom): + return parcel_geom.intersection(building_geom).area * footprint_mult + return 0.0 + except Exception as e: + if verbose: + print(f"Warning: Error calculating intersection area: {e}") + return 0.0 + + joined_area[footprint_field] = joined_area.apply( + calculate_intersection_area, axis=1 + ) + area_agg = joined_area.groupby("key")[footprint_field].sum() + footprint_totals = footprint_totals.add(area_agg, fill_value=0) + + buildings_height = buildings.to_crs(gdf.crs) + joined_height = gpd.sjoin( + gdf, buildings_height, how="left", predicate="intersects" + ) + if not joined_height.empty and not joined_height["index_right"].isna().all(): + joined_height["_height_out"] = ( + pd.to_numeric(joined_height["height_m_best"], errors="coerce") * height_mult + ) + if "floors_best" in joined_height.columns: + joined_height["_floors_out"] = pd.to_numeric( + joined_height["floors_best"], errors="coerce" + ) + else: + joined_height["_floors_out"] = pd.NA + height_agg = joined_height.groupby("key")["_height_out"].max(min_count=1) + floors_agg = joined_height.groupby("key")["_floors_out"].max(min_count=1) + height_max = pd.concat([height_max, height_agg], axis=1).max(axis=1) + floors_max = pd.concat([floors_max, floors_agg], axis=1).max(axis=1) + + del buildings, buildings_area, joined_area, buildings_height, joined_height + gc.collect() + + out = gdf.copy() + out[footprint_field] = out["key"].map(footprint_totals).fillna(0) + out[height_field] = out["key"].map(height_max).fillna(0) + out["bldg_stories"] = out["key"].map(floors_max).fillna(0) + + if verbose: + print(f"--> Streamed {buildings_found} buildings") + print( + f"--> Number of parcels with buildings: {(out[footprint_field] > 0).sum():,}" + ) + + if use_cache: + self._stats_cache_save(area_cache_path, out, [footprint_field]) + self._stats_cache_save(height_cache_path, out, [height_field, "bldg_stories"]) + return out def calculate_building_footprints( diff --git a/tests/test_overture_streaming.py b/tests/test_overture_streaming.py new file mode 100644 index 00000000..48b46a0f --- /dev/null +++ b/tests/test_overture_streaming.py @@ -0,0 +1,190 @@ +import geopandas as gpd +import pandas as pd +from pandas.testing import assert_series_equal +from shapely.geometry import Polygon + +from openavmkit.utilities.overture import OvertureService + + +FOOTPRINT = "bldg_area_footprint_sqft" +HEIGHT = "bldg_height_ft" + + +def _svc(tmp_path): + service = OvertureService.__new__(OvertureService) + service.cache_dir = str(tmp_path) + return service + + +def _parcel_gdf(): + return gpd.GeoDataFrame( + { + "key": ["p1", "p2", "p3"], + "address": ["1 Main", "2 Main", "3 Main"], + }, + geometry=[ + Polygon([(0, 0), (0.002, 0), (0.002, 0.002), (0, 0.002)]), + Polygon([(0.003, 0), (0.005, 0), (0.005, 0.002), (0.003, 0.002)]), + Polygon([(0.006, 0), (0.008, 0), (0.008, 0.002), (0.006, 0.002)]), + ], + crs="EPSG:4326", + ) + + +def _building_frame(records): + rows = [ + {key: value for key, value in record.items() if key != "geometry"} + for record in records + ] + return gpd.GeoDataFrame( + rows, + geometry=[record["geometry"] for record in records], + crs="EPSG:4326", + ) + + +def _assert_stat_columns_equal(old, streamed): + assert list(streamed["address"]) == ["1 Main", "2 Main", "3 Main"] + for column in (FOOTPRINT, HEIGHT, "bldg_stories"): + assert column in streamed.columns + assert_series_equal( + old[column].reset_index(drop=True), + streamed[column].reset_index(drop=True), + check_names=False, + check_dtype=False, + rtol=1e-9, + atol=1e-6, + ) + + +def test_streaming_stats_match_all_at_once_for_split_batches(tmp_path): + service = _svc(tmp_path) + parcels = _parcel_gdf() + batch_1 = _building_frame( + [ + { + "id": "b1", + "height": 6.0, + "est_height": None, + "num_floors": 2, + "geometry": Polygon( + [(0.0005, 0.0005), (0.0015, 0.0005), (0.0015, 0.0015), (0.0005, 0.0015)] + ), + }, + { + "id": "b2", + "height": None, + "est_height": 9.0, + "num_floors": None, + "geometry": Polygon( + [(0.0034, 0.0004), (0.0046, 0.0004), (0.0046, 0.0016), (0.0034, 0.0016)] + ), + }, + ] + ) + batch_2 = _building_frame( + [ + { + "id": "b3", + "height": 12.0, + "est_height": None, + "num_floors": 4, + "geometry": Polygon( + [(0.001, 0.001), (0.004, 0.001), (0.004, 0.003), (0.001, 0.003)] + ), + }, + ] + ) + batches = [ + service._derive_height_and_floors(batch_1.copy()), + service._derive_height_and_floors(batch_2.copy()), + ] + all_buildings = pd.concat(batches, ignore_index=True) + + old = service.calculate_building_stats( + parcels.copy(), + all_buildings, + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + ) + streamed = service._calculate_building_stats_from_frames( + parcels.copy(), + [batch.copy() for batch in batches], + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + use_cache=False, + ) + + _assert_stat_columns_equal(old, streamed) + + +def test_streaming_stats_match_all_at_once_for_empty_buildings(tmp_path): + service = _svc(tmp_path) + parcels = _parcel_gdf() + empty_buildings = gpd.GeoDataFrame({"id": []}, geometry=[], crs="EPSG:4326") + + old = service.calculate_building_stats( + parcels.copy(), + empty_buildings, + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + ) + streamed = service._calculate_building_stats_from_frames( + parcels.copy(), + [], + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + use_cache=False, + ) + + _assert_stat_columns_equal(old, streamed) + + +def test_streaming_stats_match_all_at_once_when_heights_are_absent(tmp_path): + service = _svc(tmp_path) + parcels = _parcel_gdf() + raw_buildings = _building_frame( + [ + { + "id": "b1", + "geometry": Polygon( + [(0.0005, 0.0005), (0.0015, 0.0005), (0.0015, 0.0015), (0.0005, 0.0015)] + ), + }, + { + "id": "b2", + "geometry": Polygon( + [(0.0035, 0.0005), (0.0045, 0.0005), (0.0045, 0.0015), (0.0035, 0.0015)] + ), + }, + ] + ) + derived_buildings = service._derive_height_and_floors(raw_buildings.copy()) + + old = service.calculate_building_stats( + parcels.copy(), + derived_buildings, + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + ) + streamed = service._calculate_building_stats_from_frames( + parcels.copy(), + [raw_buildings], + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + use_cache=False, + ) + + _assert_stat_columns_equal(old, streamed) From b5e9ec5960d19a2d4c64b5d3d0a611ce11bafdda Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 22 Jun 2026 17:05:27 -0400 Subject: [PATCH 02/13] Harden streaming Overture stats fallback --- openavmkit/data.py | 31 +++++++++++++-------- openavmkit/utilities/overture.py | 48 ++++++++++++++++++-------------- tests/test_overture_streaming.py | 35 +++++++++++++++++++++++ 3 files changed, 81 insertions(+), 33 deletions(-) diff --git a/openavmkit/data.py b/openavmkit/data.py index 94515bc4..8c662096 100644 --- a/openavmkit/data.py +++ b/openavmkit/data.py @@ -2937,7 +2937,7 @@ def _enrich_df_overture( height_units = s_height.get("units", None) if height_units is None: warnings.warn( - f"`process.enrich.overture.height.units` not specified, defaulting to {len_unit}'" + f"`process.enrich.overture.height.units` not specified, defaulting to '{len_unit}'" ) height_units = len_unit height_field = s_height.get("field", None) @@ -2947,17 +2947,24 @@ def _enrich_df_overture( ) height_field = f"bldg_height_{len_unit}" - gdf = overture_service.calculate_building_stats_streaming( - gdf, - bbox, - footprint_units, - footprint_field, - height_units, - height_field, - unit=unit, - use_cache=s_overture.get("cache", True), - verbose=verbose, - ) + try: + gdf = overture_service.calculate_building_stats_streaming( + gdf, + bbox, + footprint_units, + footprint_field, + height_units, + height_field, + use_cache=s_overture.get("cache", True), + verbose=verbose, + ) + except Exception as e: + if verbose: + print(f"--> Failed to calculate Overture building stats: {str(e)}") + print(f"--> Traceback: {traceback.format_exc()}") + warnings.warn( + f"Failed to calculate Overture building stats: {str(e)}\n{traceback.format_exc()}" + ) write_cached_df(gdf_in, gdf, "geom/overture", "key", s_enrich_this) diff --git a/openavmkit/utilities/overture.py b/openavmkit/utilities/overture.py index 99f3fb81..ca47fa67 100644 --- a/openavmkit/utilities/overture.py +++ b/openavmkit/utilities/overture.py @@ -350,6 +350,22 @@ def calculate_building_stats( gdf = self.calculate_building_heights(gdf, buildings, height_units, height_field, verbose) return gdf + def _footprint_unit_multiplier(self, unit: str) -> float: + if unit == "sqft": + return 10.764 + if unit == "sqm": + return 1.0 + raise ValueError( + f"Unsupported footprint units: {unit}. Supported units are 'sqft' and 'sqm'." + ) + + def _height_unit_multiplier(self, unit: str) -> float: + if unit == "ft": + return 3.2808399 + if unit == "m": + return 1.0 + raise ValueError(f"Unsupported height units: {unit}. Use 'ft' or 'm'.") + def calculate_building_stats_streaming( self, gdf: gpd.GeoDataFrame, @@ -358,7 +374,6 @@ def calculate_building_stats_streaming( footprint_field: str, height_units: str, height_field: str, - unit: str = "sqft", use_cache: bool = True, verbose: bool = False, ) -> gpd.GeoDataFrame: @@ -394,21 +409,8 @@ def _calculate_building_stats_from_frames( verbose: bool = False, ) -> gpd.GeoDataFrame: """Streaming implementation shared by the dataset path and tests.""" - if footprint_units == "sqft": - footprint_mult = 10.764 - elif footprint_units == "sqm": - footprint_mult = 1.0 - else: - raise ValueError( - f"Unsupported units: {footprint_units}. Supported units are 'sqft' and 'sqm'." - ) - - if height_units == "ft": - height_mult = 3.2808399 - elif height_units == "m": - height_mult = 1.0 - else: - raise ValueError("Unsupported units: {height_units}. Use 'ft' or 'm'.") + footprint_mult = self._footprint_unit_multiplier(footprint_units) + height_mult = self._height_unit_multiplier(height_units) area_cache_path = self._get_cache_path("intersections_area", gdf.total_bounds) height_cache_path = self._get_cache_path("intersections_height", gdf.total_bounds) @@ -425,9 +427,10 @@ def _calculate_building_stats_from_frames( return height_cached gdf_projected = gdf.to_crs(get_crs(gdf, "equal_area")) - footprint_totals = pd.Series(0.0, index=gdf["key"], dtype="float64") - height_max = pd.Series(pd.NA, index=gdf["key"], dtype="Float64") - floors_max = pd.Series(pd.NA, index=gdf["key"], dtype="Float64") + keys = pd.Index(gdf["key"].drop_duplicates()) + footprint_totals = pd.Series(0.0, index=keys, dtype="float64") + height_max = pd.Series(pd.NA, index=keys, dtype="Float64") + floors_max = pd.Series(pd.NA, index=keys, dtype="Float64") buildings_found = 0 for buildings in building_frames: @@ -454,8 +457,11 @@ def calculate_intersection_area(row): return parcel_geom.intersection(building_geom).area * footprint_mult return 0.0 except Exception as e: - if verbose: - print(f"Warning: Error calculating intersection area: {e}") + parcel_key = row.get("key", "") + warnings.warn( + "Error calculating Overture building intersection area " + f"for parcel key={parcel_key!r}, building_idx={row.get('index_right')!r}: {e}" + ) return 0.0 joined_area[footprint_field] = joined_area.apply( diff --git a/tests/test_overture_streaming.py b/tests/test_overture_streaming.py index 48b46a0f..e692006c 100644 --- a/tests/test_overture_streaming.py +++ b/tests/test_overture_streaming.py @@ -188,3 +188,38 @@ def test_streaming_stats_match_all_at_once_when_heights_are_absent(tmp_path): ) _assert_stat_columns_equal(old, streamed) + + +def test_streaming_stats_handles_duplicate_parcel_keys(tmp_path): + service = _svc(tmp_path) + parcels = _parcel_gdf() + parcels = pd.concat([parcels, parcels.iloc[[0]].copy()], ignore_index=True) + batch = service._derive_height_and_floors( + _building_frame( + [ + { + "id": "b1", + "height": 6.0, + "num_floors": 2, + "geometry": Polygon( + [(0.0005, 0.0005), (0.0015, 0.0005), (0.0015, 0.0015), (0.0005, 0.0015)] + ), + } + ] + ) + ) + + streamed = service._calculate_building_stats_from_frames( + parcels.copy(), + [batch], + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + use_cache=False, + ) + + assert len(streamed) == len(parcels) + duplicated = streamed[streamed["key"].eq("p1")] + assert duplicated[FOOTPRINT].notna().all() + assert duplicated[HEIGHT].notna().all() From 17164a39b081313f19d396d3932a54bb2ee28232 Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 22 Jun 2026 17:27:22 -0400 Subject: [PATCH 03/13] Harden streaming Overture aggregation --- openavmkit/data.py | 12 ++--- openavmkit/utilities/overture.py | 74 ++++++++++++------------------- tests/test_overture_enrichment.py | 38 ++++++++++++++++ tests/test_overture_streaming.py | 41 ++++++++++++++++- 4 files changed, 112 insertions(+), 53 deletions(-) create mode 100644 tests/test_overture_enrichment.py diff --git a/openavmkit/data.py b/openavmkit/data.py index 8c662096..ae1be93f 100644 --- a/openavmkit/data.py +++ b/openavmkit/data.py @@ -2907,14 +2907,9 @@ def _enrich_df_overture( if verbose: print("Enriching with Overture building data...") - # Initialize Overture service with the correct settings path overture_settings = { "overture": s_overture # Pass the overture settings directly } - overture_service = init_service_overture(overture_settings) - - # Get bounding box from data - bbox = gdf.to_crs("EPSG:4326").total_bounds # Calculate building footprints s_footprint = s_overture.get("footprint", {}) @@ -2947,7 +2942,10 @@ def _enrich_df_overture( ) height_field = f"bldg_height_{len_unit}" + overture_succeeded = False try: + overture_service = init_service_overture(overture_settings) + bbox = gdf.to_crs("EPSG:4326").total_bounds gdf = overture_service.calculate_building_stats_streaming( gdf, bbox, @@ -2958,6 +2956,7 @@ def _enrich_df_overture( use_cache=s_overture.get("cache", True), verbose=verbose, ) + overture_succeeded = True except Exception as e: if verbose: print(f"--> Failed to calculate Overture building stats: {str(e)}") @@ -2966,7 +2965,8 @@ def _enrich_df_overture( f"Failed to calculate Overture building stats: {str(e)}\n{traceback.format_exc()}" ) - write_cached_df(gdf_in, gdf, "geom/overture", "key", s_enrich_this) + if overture_succeeded: + write_cached_df(gdf_in, gdf, "geom/overture", "key", s_enrich_this) return gdf diff --git a/openavmkit/utilities/overture.py b/openavmkit/utilities/overture.py index ca47fa67..dcde37bc 100644 --- a/openavmkit/utilities/overture.py +++ b/openavmkit/utilities/overture.py @@ -412,9 +412,10 @@ def _calculate_building_stats_from_frames( footprint_mult = self._footprint_unit_multiplier(footprint_units) height_mult = self._height_unit_multiplier(height_units) + duplicate_keys = gdf["key"].duplicated(keep=False).any() area_cache_path = self._get_cache_path("intersections_area", gdf.total_bounds) height_cache_path = self._get_cache_path("intersections_height", gdf.total_bounds) - if use_cache: + if use_cache and not duplicate_keys: area_cached = self._stats_cache_load(area_cache_path, gdf, [footprint_field], verbose) if area_cached is not None: height_cached = self._stats_cache_load( @@ -426,11 +427,13 @@ def _calculate_building_stats_from_frames( if height_cached is not None: return height_cached - gdf_projected = gdf.to_crs(get_crs(gdf, "equal_area")) - keys = pd.Index(gdf["key"].drop_duplicates()) - footprint_totals = pd.Series(0.0, index=keys, dtype="float64") - height_max = pd.Series(pd.NA, index=keys, dtype="Float64") - floors_max = pd.Series(pd.NA, index=keys, dtype="Float64") + row_ids = pd.RangeIndex(len(gdf)) + gdf_rows = gdf.reset_index(drop=True) + gdf_projected = gdf_rows.to_crs(get_crs(gdf, "equal_area")) + gdf_projected["_overture_row_id"] = row_ids + footprint_totals = pd.Series(0.0, index=row_ids, dtype="float64") + height_max = pd.Series(pd.NA, index=row_ids, dtype="Float64") + floors_max = pd.Series(pd.NA, index=row_ids, dtype="Float64") buildings_found = 0 for buildings in building_frames: @@ -442,13 +445,14 @@ def _calculate_building_stats_from_frames( buildings_area = buildings.to_crs(gdf_projected.crs) joined_area = gpd.sjoin( - gdf_projected, buildings_area, how="left", predicate="intersects" + gdf_projected, buildings_area, how="inner", predicate="intersects" ) - if not joined_area.empty and not joined_area["index_right"].isna().all(): + if not joined_area.empty: def calculate_intersection_area(row): try: - parcel_geom = gdf_projected.loc[row.name, "geometry"] + row_id = int(row["_overture_row_id"]) + parcel_geom = gdf_projected.loc[row_id, "geometry"] building_idx = row["index_right"] if pd.isna(building_idx): return 0.0 @@ -467,35 +471,30 @@ def calculate_intersection_area(row): joined_area[footprint_field] = joined_area.apply( calculate_intersection_area, axis=1 ) - area_agg = joined_area.groupby("key")[footprint_field].sum() + area_agg = joined_area.groupby("_overture_row_id")[footprint_field].sum() footprint_totals = footprint_totals.add(area_agg, fill_value=0) - buildings_height = buildings.to_crs(gdf.crs) - joined_height = gpd.sjoin( - gdf, buildings_height, how="left", predicate="intersects" - ) - if not joined_height.empty and not joined_height["index_right"].isna().all(): - joined_height["_height_out"] = ( - pd.to_numeric(joined_height["height_m_best"], errors="coerce") * height_mult + joined_area["_height_out"] = ( + pd.to_numeric(joined_area["height_m_best"], errors="coerce") * height_mult ) - if "floors_best" in joined_height.columns: - joined_height["_floors_out"] = pd.to_numeric( - joined_height["floors_best"], errors="coerce" + if "floors_best" in joined_area.columns: + joined_area["_floors_out"] = pd.to_numeric( + joined_area["floors_best"], errors="coerce" ) else: - joined_height["_floors_out"] = pd.NA - height_agg = joined_height.groupby("key")["_height_out"].max(min_count=1) - floors_agg = joined_height.groupby("key")["_floors_out"].max(min_count=1) + joined_area["_floors_out"] = pd.NA + height_agg = joined_area.groupby("_overture_row_id")["_height_out"].max(min_count=1) + floors_agg = joined_area.groupby("_overture_row_id")["_floors_out"].max(min_count=1) height_max = pd.concat([height_max, height_agg], axis=1).max(axis=1) floors_max = pd.concat([floors_max, floors_agg], axis=1).max(axis=1) - del buildings, buildings_area, joined_area, buildings_height, joined_height + del buildings, buildings_area, joined_area gc.collect() out = gdf.copy() - out[footprint_field] = out["key"].map(footprint_totals).fillna(0) - out[height_field] = out["key"].map(height_max).fillna(0) - out["bldg_stories"] = out["key"].map(floors_max).fillna(0) + out[footprint_field] = footprint_totals.reindex(row_ids).fillna(0).to_numpy() + out[height_field] = height_max.reindex(row_ids).fillna(0).to_numpy() + out["bldg_stories"] = floors_max.reindex(row_ids).fillna(0).to_numpy() if verbose: print(f"--> Streamed {buildings_found} buildings") @@ -503,7 +502,7 @@ def calculate_intersection_area(row): f"--> Number of parcels with buildings: {(out[footprint_field] > 0).sum():,}" ) - if use_cache: + if use_cache and not duplicate_keys: self._stats_cache_save(area_cache_path, out, [footprint_field]) self._stats_cache_save(height_cache_path, out, [height_field, "bldg_stories"]) return out @@ -549,16 +548,7 @@ def calculate_building_footprints( gdf[field_name] = 0 return gdf - # Get appropriate unit conversion - unit_mult = 1.0 - if desired_units == "sqft": - unit_mult = 10.764 # Convert m² to sqft - elif desired_units == "sqm": - unit_mult = 1.0 - else: - raise ValueError( - f"Unsupported units: {desired_units}. Supported units are 'sqft' and 'sqm'." - ) + unit_mult = self._footprint_unit_multiplier(desired_units) t.start("crs") # Footprint stats depend only on the buildings (bbox-derived) and parcel geometry, so @@ -700,13 +690,7 @@ def calculate_building_heights( gdf["bldg_stories"] = 0 return gdf - # Units - if desired_units == "ft": - unit_mult = 3.2808399 - elif desired_units == "m": - unit_mult = 1.0 - else: - raise ValueError("Unsupported units: {desired_units}. Use 'ft' or 'm'.") + unit_mult = self._height_unit_multiplier(desired_units) # Height/stories depend only on the buildings (bbox-derived) and parcel geometry, so the # bounding box is the correct cache key. Cache ONLY the computed per-parcel stats and diff --git a/tests/test_overture_enrichment.py b/tests/test_overture_enrichment.py new file mode 100644 index 00000000..dacc0cff --- /dev/null +++ b/tests/test_overture_enrichment.py @@ -0,0 +1,38 @@ +from unittest.mock import patch + +import geopandas as gpd +import pytest +from shapely.geometry import Polygon + +from openavmkit.data import _enrich_df_overture + + +def test_enrich_df_overture_fails_open_on_service_init_error(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = gpd.GeoDataFrame( + {"key": ["p1"]}, + geometry=[Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)])], + crs="EPSG:4326", + ) + settings = {"locality": {"units": "imperial"}} + enrich_settings = { + "overture": { + "enabled": True, + "cache": True, + "footprint": {"units": "sqft", "field": "footprint_sqft"}, + "height": {"units": "ft", "field": "height_ft"}, + } + } + + with patch("openavmkit.data.get_cached_df", return_value=None), patch( + "openavmkit.data.write_cached_df" + ) as write_cached_df, patch( + "openavmkit.data.init_service_overture", + side_effect=RuntimeError("prefix lookup failed"), + ): + with pytest.warns(UserWarning, match="Failed to calculate Overture building stats"): + out = _enrich_df_overture(parcels, enrich_settings, {}, settings) + + assert list(out.columns) == list(parcels.columns) + assert out.geometry.equals(parcels.geometry) + write_cached_df.assert_not_called() diff --git a/tests/test_overture_streaming.py b/tests/test_overture_streaming.py index e692006c..53e16bc7 100644 --- a/tests/test_overture_streaming.py +++ b/tests/test_overture_streaming.py @@ -1,5 +1,6 @@ import geopandas as gpd import pandas as pd +import pytest from pandas.testing import assert_series_equal from shapely.geometry import Polygon @@ -193,6 +194,35 @@ def test_streaming_stats_match_all_at_once_when_heights_are_absent(tmp_path): def test_streaming_stats_handles_duplicate_parcel_keys(tmp_path): service = _svc(tmp_path) parcels = _parcel_gdf() + baseline = service._calculate_building_stats_from_frames( + parcels.copy(), + [ + service._derive_height_and_floors( + _building_frame( + [ + { + "id": "b1", + "height": 6.0, + "num_floors": 2, + "geometry": Polygon( + [ + (0.0005, 0.0005), + (0.0015, 0.0005), + (0.0015, 0.0015), + (0.0005, 0.0015), + ] + ), + } + ] + ) + ) + ], + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + use_cache=False, + ) parcels = pd.concat([parcels, parcels.iloc[[0]].copy()], ignore_index=True) batch = service._derive_height_and_floors( _building_frame( @@ -221,5 +251,12 @@ def test_streaming_stats_handles_duplicate_parcel_keys(tmp_path): assert len(streamed) == len(parcels) duplicated = streamed[streamed["key"].eq("p1")] - assert duplicated[FOOTPRINT].notna().all() - assert duplicated[HEIGHT].notna().all() + expected = baseline.loc[baseline["key"].eq("p1")].iloc[0] + assert duplicated[FOOTPRINT].tolist() == pytest.approx( + [expected[FOOTPRINT], expected[FOOTPRINT]] + ) + assert duplicated[HEIGHT].tolist() == pytest.approx([expected[HEIGHT], expected[HEIGHT]]) + assert duplicated["bldg_stories"].tolist() == [ + expected["bldg_stories"], + expected["bldg_stories"], + ] From 9deed302c52b14c402a09699d6b0079fd9cfee24 Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 22 Jun 2026 17:32:00 -0400 Subject: [PATCH 04/13] Narrow streaming fixes to duplicate keys --- openavmkit/utilities/overture.py | 48 +++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/openavmkit/utilities/overture.py b/openavmkit/utilities/overture.py index dcde37bc..7e337cf2 100644 --- a/openavmkit/utilities/overture.py +++ b/openavmkit/utilities/overture.py @@ -431,6 +431,8 @@ def _calculate_building_stats_from_frames( gdf_rows = gdf.reset_index(drop=True) gdf_projected = gdf_rows.to_crs(get_crs(gdf, "equal_area")) gdf_projected["_overture_row_id"] = row_ids + gdf_for_height = gdf_rows.copy() + gdf_for_height["_overture_row_id"] = row_ids footprint_totals = pd.Series(0.0, index=row_ids, dtype="float64") height_max = pd.Series(pd.NA, index=row_ids, dtype="Float64") floors_max = pd.Series(pd.NA, index=row_ids, dtype="Float64") @@ -445,10 +447,10 @@ def _calculate_building_stats_from_frames( buildings_area = buildings.to_crs(gdf_projected.crs) joined_area = gpd.sjoin( - gdf_projected, buildings_area, how="inner", predicate="intersects" + gdf_projected, buildings_area, how="left", predicate="intersects" ) - if not joined_area.empty: + if not joined_area.empty and not joined_area["index_right"].isna().all(): def calculate_intersection_area(row): try: row_id = int(row["_overture_row_id"]) @@ -474,21 +476,26 @@ def calculate_intersection_area(row): area_agg = joined_area.groupby("_overture_row_id")[footprint_field].sum() footprint_totals = footprint_totals.add(area_agg, fill_value=0) - joined_area["_height_out"] = ( - pd.to_numeric(joined_area["height_m_best"], errors="coerce") * height_mult + buildings_height = buildings.to_crs(gdf_for_height.crs) + joined_height = gpd.sjoin( + gdf_for_height, buildings_height, how="left", predicate="intersects" + ) + if not joined_height.empty and not joined_height["index_right"].isna().all(): + joined_height["_height_out"] = ( + pd.to_numeric(joined_height["height_m_best"], errors="coerce") * height_mult ) - if "floors_best" in joined_area.columns: - joined_area["_floors_out"] = pd.to_numeric( - joined_area["floors_best"], errors="coerce" + if "floors_best" in joined_height.columns: + joined_height["_floors_out"] = pd.to_numeric( + joined_height["floors_best"], errors="coerce" ) else: - joined_area["_floors_out"] = pd.NA - height_agg = joined_area.groupby("_overture_row_id")["_height_out"].max(min_count=1) - floors_agg = joined_area.groupby("_overture_row_id")["_floors_out"].max(min_count=1) + joined_height["_floors_out"] = pd.NA + height_agg = joined_height.groupby("_overture_row_id")["_height_out"].max(min_count=1) + floors_agg = joined_height.groupby("_overture_row_id")["_floors_out"].max(min_count=1) height_max = pd.concat([height_max, height_agg], axis=1).max(axis=1) floors_max = pd.concat([floors_max, floors_agg], axis=1).max(axis=1) - del buildings, buildings_area, joined_area + del buildings, buildings_area, joined_area, buildings_height, joined_height gc.collect() out = gdf.copy() @@ -548,7 +555,16 @@ def calculate_building_footprints( gdf[field_name] = 0 return gdf - unit_mult = self._footprint_unit_multiplier(desired_units) + # Get appropriate unit conversion + unit_mult = 1.0 + if desired_units == "sqft": + unit_mult = 10.764 # Convert m² to sqft + elif desired_units == "sqm": + unit_mult = 1.0 + else: + raise ValueError( + f"Unsupported units: {desired_units}. Supported units are 'sqft' and 'sqm'." + ) t.start("crs") # Footprint stats depend only on the buildings (bbox-derived) and parcel geometry, so @@ -690,7 +706,13 @@ def calculate_building_heights( gdf["bldg_stories"] = 0 return gdf - unit_mult = self._height_unit_multiplier(desired_units) + # Units + if desired_units == "ft": + unit_mult = 3.2808399 + elif desired_units == "m": + unit_mult = 1.0 + else: + raise ValueError("Unsupported units: {desired_units}. Use 'ft' or 'm'.") # Height/stories depend only on the buildings (bbox-derived) and parcel geometry, so the # bounding box is the correct cache key. Cache ONLY the computed per-parcel stats and From 60a809813cf93a0d9400f78d84cb390915456c43 Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 22 Jun 2026 17:45:16 -0400 Subject: [PATCH 05/13] Harden Overture cache edge cases --- openavmkit/data.py | 14 +++++-- tests/test_overture_enrichment.py | 64 ++++++++++++++++++++++++++++++- tests/test_overture_streaming.py | 55 +++++++++----------------- 3 files changed, 93 insertions(+), 40 deletions(-) diff --git a/openavmkit/data.py b/openavmkit/data.py index ae1be93f..12e8e96d 100644 --- a/openavmkit/data.py +++ b/openavmkit/data.py @@ -2943,6 +2943,7 @@ def _enrich_df_overture( height_field = f"bldg_height_{len_unit}" overture_succeeded = False + bbox = None try: overture_service = init_service_overture(overture_settings) bbox = gdf.to_crs("EPSG:4326").total_bounds @@ -2957,15 +2958,22 @@ def _enrich_df_overture( verbose=verbose, ) overture_succeeded = True + except ValueError: + raise except Exception as e: + message = ( + "Failed to calculate Overture building stats " + f"(bbox={bbox}, footprint_field={footprint_field!r}, " + f"height_field={height_field!r}, cache={s_overture.get('cache', True)}): {str(e)}" + ) if verbose: - print(f"--> Failed to calculate Overture building stats: {str(e)}") + print(f"--> {message}") print(f"--> Traceback: {traceback.format_exc()}") warnings.warn( - f"Failed to calculate Overture building stats: {str(e)}\n{traceback.format_exc()}" + f"{message}\n{traceback.format_exc()}" ) - if overture_succeeded: + if overture_succeeded and not gdf_in["key"].duplicated(keep=False).any(): write_cached_df(gdf_in, gdf, "geom/overture", "key", s_enrich_this) return gdf diff --git a/tests/test_overture_enrichment.py b/tests/test_overture_enrichment.py index dacc0cff..134a6081 100644 --- a/tests/test_overture_enrichment.py +++ b/tests/test_overture_enrichment.py @@ -1,4 +1,4 @@ -from unittest.mock import patch +from unittest.mock import MagicMock, patch import geopandas as gpd import pytest @@ -36,3 +36,65 @@ def test_enrich_df_overture_fails_open_on_service_init_error(tmp_path, monkeypat assert list(out.columns) == list(parcels.columns) assert out.geometry.equals(parcels.geometry) write_cached_df.assert_not_called() + + +def test_enrich_df_overture_skips_outer_cache_for_duplicate_keys(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = gpd.GeoDataFrame( + {"key": ["p1", "p1"]}, + geometry=[ + Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)]), + Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)]), + ], + crs="EPSG:4326", + ) + settings = {"locality": {"units": "imperial"}} + enrich_settings = { + "overture": { + "enabled": True, + "cache": True, + "footprint": {"units": "sqft", "field": "footprint_sqft"}, + "height": {"units": "ft", "field": "height_ft"}, + } + } + service = MagicMock() + service.calculate_building_stats_streaming.return_value = parcels.assign( + footprint_sqft=[100.0, 100.0], + height_ft=[10.0, 10.0], + ) + + with patch("openavmkit.data.get_cached_df", return_value=None), patch( + "openavmkit.data.write_cached_df" + ) as write_cached_df, patch("openavmkit.data.init_service_overture", return_value=service): + out = _enrich_df_overture(parcels, enrich_settings, {}, settings) + + assert out["footprint_sqft"].tolist() == [100.0, 100.0] + write_cached_df.assert_not_called() + + +def test_enrich_df_overture_invalid_units_fail_fast(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = gpd.GeoDataFrame( + {"key": ["p1"]}, + geometry=[Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)])], + crs="EPSG:4326", + ) + settings = {"locality": {"units": "imperial"}} + enrich_settings = { + "overture": { + "enabled": True, + "cache": True, + "footprint": {"units": "acres", "field": "footprint_acres"}, + "height": {"units": "ft", "field": "height_ft"}, + } + } + service = MagicMock() + service.calculate_building_stats_streaming.side_effect = ValueError("Unsupported footprint units") + + with patch("openavmkit.data.get_cached_df", return_value=None), patch( + "openavmkit.data.write_cached_df" + ) as write_cached_df, patch("openavmkit.data.init_service_overture", return_value=service): + with pytest.raises(ValueError, match="Unsupported footprint units"): + _enrich_df_overture(parcels, enrich_settings, {}, settings) + + write_cached_df.assert_not_called() diff --git a/tests/test_overture_streaming.py b/tests/test_overture_streaming.py index 53e16bc7..cf10f74f 100644 --- a/tests/test_overture_streaming.py +++ b/tests/test_overture_streaming.py @@ -44,6 +44,23 @@ def _building_frame(records): ) +def _single_building_batch(service): + return service._derive_height_and_floors( + _building_frame( + [ + { + "id": "b1", + "height": 6.0, + "num_floors": 2, + "geometry": Polygon( + [(0.0005, 0.0005), (0.0015, 0.0005), (0.0015, 0.0015), (0.0005, 0.0015)] + ), + } + ] + ) + ) + + def _assert_stat_columns_equal(old, streamed): assert list(streamed["address"]) == ["1 Main", "2 Main", "3 Main"] for column in (FOOTPRINT, HEIGHT, "bldg_stories"): @@ -196,27 +213,7 @@ def test_streaming_stats_handles_duplicate_parcel_keys(tmp_path): parcels = _parcel_gdf() baseline = service._calculate_building_stats_from_frames( parcels.copy(), - [ - service._derive_height_and_floors( - _building_frame( - [ - { - "id": "b1", - "height": 6.0, - "num_floors": 2, - "geometry": Polygon( - [ - (0.0005, 0.0005), - (0.0015, 0.0005), - (0.0015, 0.0015), - (0.0005, 0.0015), - ] - ), - } - ] - ) - ) - ], + [_single_building_batch(service)], "sqft", FOOTPRINT, "ft", @@ -224,20 +221,7 @@ def test_streaming_stats_handles_duplicate_parcel_keys(tmp_path): use_cache=False, ) parcels = pd.concat([parcels, parcels.iloc[[0]].copy()], ignore_index=True) - batch = service._derive_height_and_floors( - _building_frame( - [ - { - "id": "b1", - "height": 6.0, - "num_floors": 2, - "geometry": Polygon( - [(0.0005, 0.0005), (0.0015, 0.0005), (0.0015, 0.0015), (0.0005, 0.0015)] - ), - } - ] - ) - ) + batch = _single_building_batch(service) streamed = service._calculate_building_stats_from_frames( parcels.copy(), @@ -246,7 +230,6 @@ def test_streaming_stats_handles_duplicate_parcel_keys(tmp_path): FOOTPRINT, "ft", HEIGHT, - use_cache=False, ) assert len(streamed) == len(parcels) From 79439ce51a19f872abe5b6bf7aed782243071c9c Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 22 Jun 2026 17:54:06 -0400 Subject: [PATCH 06/13] Bypass Overture cache for duplicate keys --- openavmkit/data.py | 14 ++++--- tests/test_overture_enrichment.py | 3 +- tests/test_overture_streaming.py | 69 ++++++++++++++----------------- 3 files changed, 42 insertions(+), 44 deletions(-) diff --git a/openavmkit/data.py b/openavmkit/data.py index 12e8e96d..20448411 100644 --- a/openavmkit/data.py +++ b/openavmkit/data.py @@ -2892,11 +2892,13 @@ def _enrich_df_overture( unit = area_unit(settings) - gdf_out = get_cached_df(gdf_in, "geom/overture", "key", s_enrich_this) - if gdf_out is not None: - if verbose: - print("--> found cached data...") - return gdf_out + duplicate_keys = gdf_in["key"].duplicated(keep=False).any() + if not duplicate_keys: + gdf_out = get_cached_df(gdf_in, "geom/overture", "key", s_enrich_this) + if gdf_out is not None: + if verbose: + print("--> found cached data...") + return gdf_out gdf = gdf_in.copy() @@ -2973,7 +2975,7 @@ def _enrich_df_overture( f"{message}\n{traceback.format_exc()}" ) - if overture_succeeded and not gdf_in["key"].duplicated(keep=False).any(): + if overture_succeeded and not duplicate_keys: write_cached_df(gdf_in, gdf, "geom/overture", "key", s_enrich_this) return gdf diff --git a/tests/test_overture_enrichment.py b/tests/test_overture_enrichment.py index 134a6081..908dacae 100644 --- a/tests/test_overture_enrichment.py +++ b/tests/test_overture_enrichment.py @@ -63,12 +63,13 @@ def test_enrich_df_overture_skips_outer_cache_for_duplicate_keys(tmp_path, monke height_ft=[10.0, 10.0], ) - with patch("openavmkit.data.get_cached_df", return_value=None), patch( + with patch("openavmkit.data.get_cached_df") as get_cached_df, patch( "openavmkit.data.write_cached_df" ) as write_cached_df, patch("openavmkit.data.init_service_overture", return_value=service): out = _enrich_df_overture(parcels, enrich_settings, {}, settings) assert out["footprint_sqft"].tolist() == [100.0, 100.0] + get_cached_df.assert_not_called() write_cached_df.assert_not_called() diff --git a/tests/test_overture_streaming.py b/tests/test_overture_streaming.py index cf10f74f..f8b39ac8 100644 --- a/tests/test_overture_streaming.py +++ b/tests/test_overture_streaming.py @@ -1,3 +1,5 @@ +from unittest.mock import patch + import geopandas as gpd import pandas as pd import pytest @@ -44,21 +46,22 @@ def _building_frame(records): ) +def _building_one_record(extra=None): + record = { + "id": "b1", + "height": 6.0, + "num_floors": 2, + "geometry": Polygon( + [(0.0005, 0.0005), (0.0015, 0.0005), (0.0015, 0.0015), (0.0005, 0.0015)] + ), + } + if extra: + record.update(extra) + return record + + def _single_building_batch(service): - return service._derive_height_and_floors( - _building_frame( - [ - { - "id": "b1", - "height": 6.0, - "num_floors": 2, - "geometry": Polygon( - [(0.0005, 0.0005), (0.0015, 0.0005), (0.0015, 0.0015), (0.0005, 0.0015)] - ), - } - ] - ) - ) + return service._derive_height_and_floors(_building_frame([_building_one_record()])) def _assert_stat_columns_equal(old, streamed): @@ -80,15 +83,7 @@ def test_streaming_stats_match_all_at_once_for_split_batches(tmp_path): parcels = _parcel_gdf() batch_1 = _building_frame( [ - { - "id": "b1", - "height": 6.0, - "est_height": None, - "num_floors": 2, - "geometry": Polygon( - [(0.0005, 0.0005), (0.0015, 0.0005), (0.0015, 0.0015), (0.0005, 0.0015)] - ), - }, + _building_one_record({"est_height": None}), { "id": "b2", "height": None, @@ -171,12 +166,7 @@ def test_streaming_stats_match_all_at_once_when_heights_are_absent(tmp_path): parcels = _parcel_gdf() raw_buildings = _building_frame( [ - { - "id": "b1", - "geometry": Polygon( - [(0.0005, 0.0005), (0.0015, 0.0005), (0.0015, 0.0015), (0.0005, 0.0015)] - ), - }, + _building_one_record({"height": None, "num_floors": None}), { "id": "b2", "geometry": Polygon( @@ -223,16 +213,21 @@ def test_streaming_stats_handles_duplicate_parcel_keys(tmp_path): parcels = pd.concat([parcels, parcels.iloc[[0]].copy()], ignore_index=True) batch = _single_building_batch(service) - streamed = service._calculate_building_stats_from_frames( - parcels.copy(), - [batch], - "sqft", - FOOTPRINT, - "ft", - HEIGHT, - ) + with patch.object(service, "_stats_cache_load") as stats_cache_load, patch.object( + service, "_stats_cache_save" + ) as stats_cache_save: + streamed = service._calculate_building_stats_from_frames( + parcels.copy(), + [batch], + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + ) assert len(streamed) == len(parcels) + stats_cache_load.assert_not_called() + stats_cache_save.assert_not_called() duplicated = streamed[streamed["key"].eq("p1")] expected = baseline.loc[baseline["key"].eq("p1")].iloc[0] assert duplicated[FOOTPRINT].tolist() == pytest.approx( From 8f7a5b397c6c14d65731b6f1a4fcf3644bb695c1 Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 22 Jun 2026 18:03:27 -0400 Subject: [PATCH 07/13] Tighten Overture enrichment tests --- openavmkit/data.py | 3 +- tests/test_overture_enrichment.py | 74 +++++++++++++------------------ 2 files changed, 33 insertions(+), 44 deletions(-) diff --git a/openavmkit/data.py b/openavmkit/data.py index 20448411..78489a1a 100644 --- a/openavmkit/data.py +++ b/openavmkit/data.py @@ -2945,10 +2945,9 @@ def _enrich_df_overture( height_field = f"bldg_height_{len_unit}" overture_succeeded = False - bbox = None + bbox = gdf.to_crs("EPSG:4326").total_bounds try: overture_service = init_service_overture(overture_settings) - bbox = gdf.to_crs("EPSG:4326").total_bounds gdf = overture_service.calculate_building_stats_streaming( gdf, bbox, diff --git a/tests/test_overture_enrichment.py b/tests/test_overture_enrichment.py index 908dacae..abcb5303 100644 --- a/tests/test_overture_enrichment.py +++ b/tests/test_overture_enrichment.py @@ -7,23 +7,37 @@ from openavmkit.data import _enrich_df_overture -def test_enrich_df_overture_fails_open_on_service_init_error(tmp_path, monkeypatch): - monkeypatch.chdir(tmp_path) - parcels = gpd.GeoDataFrame( - {"key": ["p1"]}, - geometry=[Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)])], +def _parcel_gdf(keys=None): + keys = ["p1"] if keys is None else keys + return gpd.GeoDataFrame( + {"key": keys}, + geometry=[ + Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)]) + for _ in keys + ], crs="EPSG:4326", ) - settings = {"locality": {"units": "imperial"}} - enrich_settings = { + + +def _settings(): + return {"locality": {"units": "imperial"}} + + +def _enrich_settings(footprint_units="sqft", footprint_field="footprint_sqft"): + return { "overture": { "enabled": True, "cache": True, - "footprint": {"units": "sqft", "field": "footprint_sqft"}, + "footprint": {"units": footprint_units, "field": footprint_field}, "height": {"units": "ft", "field": "height_ft"}, } } + +def test_enrich_df_overture_fails_open_on_service_init_error(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = _parcel_gdf() + with patch("openavmkit.data.get_cached_df", return_value=None), patch( "openavmkit.data.write_cached_df" ) as write_cached_df, patch( @@ -31,7 +45,7 @@ def test_enrich_df_overture_fails_open_on_service_init_error(tmp_path, monkeypat side_effect=RuntimeError("prefix lookup failed"), ): with pytest.warns(UserWarning, match="Failed to calculate Overture building stats"): - out = _enrich_df_overture(parcels, enrich_settings, {}, settings) + out = _enrich_df_overture(parcels, _enrich_settings(), {}, _settings()) assert list(out.columns) == list(parcels.columns) assert out.geometry.equals(parcels.geometry) @@ -40,23 +54,7 @@ def test_enrich_df_overture_fails_open_on_service_init_error(tmp_path, monkeypat def test_enrich_df_overture_skips_outer_cache_for_duplicate_keys(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) - parcels = gpd.GeoDataFrame( - {"key": ["p1", "p1"]}, - geometry=[ - Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)]), - Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)]), - ], - crs="EPSG:4326", - ) - settings = {"locality": {"units": "imperial"}} - enrich_settings = { - "overture": { - "enabled": True, - "cache": True, - "footprint": {"units": "sqft", "field": "footprint_sqft"}, - "height": {"units": "ft", "field": "height_ft"}, - } - } + parcels = _parcel_gdf(["p1", "p1"]) service = MagicMock() service.calculate_building_stats_streaming.return_value = parcels.assign( footprint_sqft=[100.0, 100.0], @@ -66,7 +64,7 @@ def test_enrich_df_overture_skips_outer_cache_for_duplicate_keys(tmp_path, monke with patch("openavmkit.data.get_cached_df") as get_cached_df, patch( "openavmkit.data.write_cached_df" ) as write_cached_df, patch("openavmkit.data.init_service_overture", return_value=service): - out = _enrich_df_overture(parcels, enrich_settings, {}, settings) + out = _enrich_df_overture(parcels, _enrich_settings(), {}, _settings()) assert out["footprint_sqft"].tolist() == [100.0, 100.0] get_cached_df.assert_not_called() @@ -75,20 +73,7 @@ def test_enrich_df_overture_skips_outer_cache_for_duplicate_keys(tmp_path, monke def test_enrich_df_overture_invalid_units_fail_fast(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) - parcels = gpd.GeoDataFrame( - {"key": ["p1"]}, - geometry=[Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)])], - crs="EPSG:4326", - ) - settings = {"locality": {"units": "imperial"}} - enrich_settings = { - "overture": { - "enabled": True, - "cache": True, - "footprint": {"units": "acres", "field": "footprint_acres"}, - "height": {"units": "ft", "field": "height_ft"}, - } - } + parcels = _parcel_gdf() service = MagicMock() service.calculate_building_stats_streaming.side_effect = ValueError("Unsupported footprint units") @@ -96,6 +81,11 @@ def test_enrich_df_overture_invalid_units_fail_fast(tmp_path, monkeypatch): "openavmkit.data.write_cached_df" ) as write_cached_df, patch("openavmkit.data.init_service_overture", return_value=service): with pytest.raises(ValueError, match="Unsupported footprint units"): - _enrich_df_overture(parcels, enrich_settings, {}, settings) + _enrich_df_overture( + parcels, + _enrich_settings("acres", "footprint_acres"), + {}, + _settings(), + ) write_cached_df.assert_not_called() From 09b81d1bb99c903d8d0e51e6045af0656a71a96a Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 22 Jun 2026 18:18:04 -0400 Subject: [PATCH 08/13] Extract Overture streaming test helper --- tests/test_overture_streaming.py | 82 ++++++++++++-------------------- 1 file changed, 31 insertions(+), 51 deletions(-) diff --git a/tests/test_overture_streaming.py b/tests/test_overture_streaming.py index f8b39ac8..5ff9b71b 100644 --- a/tests/test_overture_streaming.py +++ b/tests/test_overture_streaming.py @@ -78,6 +78,30 @@ def _assert_stat_columns_equal(old, streamed): ) +def _assert_streaming_stats_match_legacy( + service, parcels, legacy_buildings, streaming_frames +): + old = service.calculate_building_stats( + parcels.copy(), + legacy_buildings, + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + ) + streamed = service._calculate_building_stats_from_frames( + parcels.copy(), + streaming_frames, + "sqft", + FOOTPRINT, + "ft", + HEIGHT, + use_cache=False, + ) + + _assert_stat_columns_equal(old, streamed) + + def test_streaming_stats_match_all_at_once_for_split_batches(tmp_path): service = _svc(tmp_path) parcels = _parcel_gdf() @@ -114,51 +138,20 @@ def test_streaming_stats_match_all_at_once_for_split_batches(tmp_path): ] all_buildings = pd.concat(batches, ignore_index=True) - old = service.calculate_building_stats( - parcels.copy(), + _assert_streaming_stats_match_legacy( + service, + parcels, all_buildings, - "sqft", - FOOTPRINT, - "ft", - HEIGHT, - ) - streamed = service._calculate_building_stats_from_frames( - parcels.copy(), [batch.copy() for batch in batches], - "sqft", - FOOTPRINT, - "ft", - HEIGHT, - use_cache=False, ) - _assert_stat_columns_equal(old, streamed) - def test_streaming_stats_match_all_at_once_for_empty_buildings(tmp_path): service = _svc(tmp_path) parcels = _parcel_gdf() empty_buildings = gpd.GeoDataFrame({"id": []}, geometry=[], crs="EPSG:4326") - old = service.calculate_building_stats( - parcels.copy(), - empty_buildings, - "sqft", - FOOTPRINT, - "ft", - HEIGHT, - ) - streamed = service._calculate_building_stats_from_frames( - parcels.copy(), - [], - "sqft", - FOOTPRINT, - "ft", - HEIGHT, - use_cache=False, - ) - - _assert_stat_columns_equal(old, streamed) + _assert_streaming_stats_match_legacy(service, parcels, empty_buildings, []) def test_streaming_stats_match_all_at_once_when_heights_are_absent(tmp_path): @@ -177,26 +170,13 @@ def test_streaming_stats_match_all_at_once_when_heights_are_absent(tmp_path): ) derived_buildings = service._derive_height_and_floors(raw_buildings.copy()) - old = service.calculate_building_stats( - parcels.copy(), + _assert_streaming_stats_match_legacy( + service, + parcels, derived_buildings, - "sqft", - FOOTPRINT, - "ft", - HEIGHT, - ) - streamed = service._calculate_building_stats_from_frames( - parcels.copy(), [raw_buildings], - "sqft", - FOOTPRINT, - "ft", - HEIGHT, - use_cache=False, ) - _assert_stat_columns_equal(old, streamed) - def test_streaming_stats_handles_duplicate_parcel_keys(tmp_path): service = _svc(tmp_path) From 62a6e7f84895688a213eedc222dc738f9f81d101 Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Thu, 11 Jun 2026 11:01:01 -0400 Subject: [PATCH 09/13] Push Overture buildings bbox predicate down to Parquet row groups (DuckDB) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The streaming fetch still OOM-killed the 4 GiB AVM worker on wide bounding boxes: _building_batches read the global Overture buildings theme through PyArrow's dataset scanner, which does not prune row groups on the nested `bbox` struct subfields. Streaming the *output* did not bound memory because the *scan* still materializes a huge fraction of the global theme regardless of bbox — a ~1 km box pulled ~3 GB RSS over ~84 s, and a single 0.05deg tile alone exceeded 4 GiB, so subprocess tiling could never bound it either. Fetch via DuckDB read_parquet instead, with the bbox-overlap predicate pushed to Parquet row-group statistics. The matched rows are streamed back with fetch_df_chunk (so the full building set is still never materialized at once, preserving the streaming contract the per-parcel stats aggregation relies on). The same ~1 km fetch returns the identical rows at ~0.4 GB; full get_buildings peak ~0.57 GB. Building outputs (geometry WKB, height_m_best, floors_best, footprint area) are bit-for-bit identical to the prior streaming path, verified row-for-row on a built-up multi-parcel bbox, so there is no model-output change and no version bump is required. Pandas (not Arrow) is used because the Overture `geometry` column is a GeoArrow GEOMETRY type whose DuckDB Arrow export resolves its CRS via the `spatial` extension catalog (not loaded) and raises an internal error; the pandas path returns the raw stored WKB verbatim. httpfs is loaded explicitly (INSTALL is an idempotent no-op when the extension is already present, e.g. baked into the image). Adds duckdb to requirements; repoints the _building_batches split test at the new DuckDB stream seam; adds a network-free unit test covering the pushdown SQL/params, the unavailable-column drop, and the GeoDataFrame construction. ENG-3033 --- openavmkit/utilities/overture.py | 129 ++++++++++++++++++------------- requirements.txt | 3 +- tests/test_overture_fetch.py | 102 ++++++++++++++++++++++++ tests/test_overture_streaming.py | 25 ++++++ 4 files changed, 205 insertions(+), 54 deletions(-) create mode 100644 tests/test_overture_fetch.py diff --git a/openavmkit/utilities/overture.py b/openavmkit/utilities/overture.py index 7e337cf2..3dfe80a4 100644 --- a/openavmkit/utilities/overture.py +++ b/openavmkit/utilities/overture.py @@ -10,14 +10,12 @@ import os import warnings import gc +import duckdb import geopandas as gpd import pandas as pd import traceback import pyarrow as pa -import pyarrow.compute as pc -import pyarrow.dataset as ds import pyarrow.fs as fs -from tqdm import tqdm import shapely.wkb from openavmkit.utilities.geometry import get_crs @@ -41,6 +39,7 @@ class OvertureService: "height", "est_height", "num_floors", "num_floors_underground", "subtype", "class", "sources" # sources = per-property confidence ] + DEFAULT_STREAM_BATCH_ROWS = 500 def __init__(self, settings: dict): """Initialize the Overture service with settings. @@ -100,10 +99,65 @@ def _stats_cache_load(self, cache_path: str, gdf: gpd.GeoDataFrame, stat_cols: l out[c] = out[c].fillna(0) return out - def _get_dataset(self): - """Get the PyArrow dataset for buildings.""" - path = f"{self.bucket}/{self.prefix}" - return ds.dataset(path, filesystem=self.fs, format="parquet") + def _stream_building_dfs(self, bbox, proj_cols, verbose=False): + """Yield matching Overture building rows as pandas DataFrame chunks (DuckDB). + + The bbox-overlap predicate is pushed down to the Parquet row-group statistics + on the ``bbox`` struct subfields, so only the row groups that can contain + matching buildings are decoded. This bounds peak memory to the matched rows + instead of scanning a huge fraction of the GLOBAL buildings theme like the + PyArrow dataset scanner does — PyArrow does not prune row groups on nested + struct subfields, so a ~1 km box pulled ~3 GB RSS over ~84 s and OOM-killed + the 4 GiB worker (ENG-3033); the same fetch under DuckDB is ~0.4 GB. + + ``fetch_df_chunk`` streams the (already-bounded) result so the full building + set is never materialized at once, preserving the streaming contract that the + per-parcel stats aggregation relies on. Pandas (not Arrow) is used because the + ``geometry`` column is a GeoArrow GEOMETRY type whose DuckDB Arrow export + resolves its CRS via the ``spatial`` extension catalog (not loaded) and raises + an internal error; the pandas path returns the raw stored WKB bytes verbatim + (bit-identical to the prior PyArrow read). + + Columns in ``proj_cols`` absent from the current release are dropped, mirroring + the prior PyArrow projection behavior. + """ + xmin, ymin, xmax, ymax = bbox + path = f"s3://{self.bucket}/{self.prefix}*" + con = duckdb.connect() + try: + con.execute("INSTALL httpfs; LOAD httpfs;") + con.execute("SET s3_region = 'us-west-2';") + # Public, anonymously-readable bucket: configure an unsigned S3 secret so + # DuckDB does not try (and fail) to sign requests with ambient AWS creds. + con.execute( + "CREATE OR REPLACE SECRET overture_anon " + "(TYPE s3, PROVIDER config, REGION 'us-west-2');" + ) + described = con.execute( + f"DESCRIBE SELECT * FROM read_parquet('{path}') LIMIT 0" + ).fetchall() + available = {row[0] for row in described} + cols = [c for c in proj_cols if c in available] + missing = [c for c in proj_cols if c not in available] + if verbose and missing: + print(f"--> Skipping unavailable columns: {missing}") + if verbose: + print(f"--> Fetching Overture buildings via DuckDB for bbox {bbox}") + col_sql = ", ".join(f'"{c}"' for c in cols) + # Bounding-box overlap test, identical to the prior PyArrow predicate. + sql = ( + f"SELECT {col_sql} FROM read_parquet('{path}') " + "WHERE bbox.xmin < ? AND bbox.xmax > ? " + "AND bbox.ymin < ? AND bbox.ymax > ?" + ) + res = con.execute(sql, [xmax, xmin, ymax, ymin]) + while True: + chunk = res.fetch_df_chunk() + if chunk.empty: + break + yield chunk + finally: + con.close() def _geoarrow_schema_adapter(self, schema: pa.Schema) -> pa.Schema: """Convert a geoarrow-compatible schema to a proper geoarrow schema.""" @@ -114,13 +168,10 @@ def _geoarrow_schema_adapter(self, schema: pa.Schema) -> pa.Schema: ) return schema.set(geometry_field_index, geoarrow_geometry_field) - def _batch_to_geodataframe(self, batch: pa.RecordBatch) -> gpd.GeoDataFrame: - """Convert a PyArrow batch to a GeoDataFrame with proper geometry handling.""" - # Convert to pandas DataFrame first - df = batch.to_pandas() - + def _to_geodataframe(self, df: pd.DataFrame) -> gpd.GeoDataFrame: + """Convert a fetched buildings DataFrame to a GeoDataFrame (WKB -> shapely).""" # Convert WKB geometry to shapely geometry - df["geometry"] = df["geometry"].apply(lambda wkb: shapely.wkb.loads(wkb) if pd.notnull(wkb) else None) + df["geometry"] = df["geometry"].apply(lambda wkb: shapely.wkb.loads(bytes(wkb)) if wkb is not None and pd.notnull(wkb) else None) # Create GeoDataFrame with WGS84 CRS (EPSG:4326) return gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326") @@ -132,50 +183,22 @@ def _building_batches(self, bbox, columns: list[str] | None = None, verbose: boo parcel stats can process one Arrow batch at a time instead of materializing the full building set for dense urban bounding boxes. """ - xmin, ymin, xmax, ymax = bbox - filter = ( - (pc.field("bbox", "xmin") < xmax) - & (pc.field("bbox", "xmax") > xmin) - & (pc.field("bbox", "ymin") < ymax) - & (pc.field("bbox", "ymax") > ymin) - ) - - proj_cols = columns if columns is not None else self.DEFAULT_COLUMNS.copy() + proj_cols = list(columns) if columns is not None else self.DEFAULT_COLUMNS.copy() for req in ("geometry", "bbox"): if req not in proj_cols: proj_cols.append(req) - dataset = self._get_dataset() - if verbose: - print("--> Dataset columns:", dataset.schema.names) - available = set(dataset.schema.names) - missing = [c for c in proj_cols if c not in available] - proj_cols = [c for c in proj_cols if c in available] - if verbose and missing: - print(f"--> Skipping unavailable columns: {missing}") - - batches = dataset.to_batches(filter=filter, columns=proj_cols) - total_batches = None - if verbose: - print("--> Counting batches...") - total_batches = sum(1 for _ in batches) - print(f"--> Found {total_batches} batches") - batches = dataset.to_batches(filter=filter, columns=proj_cols) - - with tqdm( - total=total_batches, - desc="Processing batches", - disable=not verbose, - ) as pbar: - for batch in batches: - try: - if batch.num_rows > 0: - yield self._batch_to_geodataframe(batch) - except Exception as e: - if verbose: - print(f"--> Error processing batch: {str(e)}") - finally: - pbar.update(1) + stream_batch_rows = int( + self.settings.get("stream_batch_rows", self.DEFAULT_STREAM_BATCH_ROWS) + or self.DEFAULT_STREAM_BATCH_ROWS + ) + stream_batch_rows = max(1, stream_batch_rows) + + for chunk in self._stream_building_dfs(bbox, proj_cols, verbose=verbose): + for offset in range(0, len(chunk), stream_batch_rows): + sub = chunk.iloc[offset:offset + stream_batch_rows] + if len(sub) > 0: + yield self._to_geodataframe(sub.copy()) def get_buildings( self, diff --git a/requirements.txt b/requirements.txt index 4d9f43f6..fb8bb994 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ pdfkit==1.0.0 pipreqs==0.5.0 polars==1.38.1 pyarrow==22.0.0 +duckdb~=1.5.3 pytest==9.0.2 python-dotenv==1.2.1 rich==15.0.0 @@ -43,4 +44,4 @@ scipy>=1.11.4,<1.17 scikit-learn~=1.8.0 tqdm~=4.67.3 rasterio~=1.4.0 -seamless-3dep>=0.4.1,<0.6 \ No newline at end of file +seamless-3dep>=0.4.1,<0.6 diff --git a/tests/test_overture_fetch.py b/tests/test_overture_fetch.py new file mode 100644 index 00000000..e403fd78 --- /dev/null +++ b/tests/test_overture_fetch.py @@ -0,0 +1,102 @@ +"""Tests for the DuckDB-backed Overture buildings fetch (ENG-3033). + +`_stream_building_dfs` reads Overture's global buildings theme with the bbox-overlap +predicate pushed down to Parquet row-group statistics, so peak memory is bounded to +the matching rows instead of scanning the whole theme. These tests mock DuckDB so they +are fully network-free; the real S3 fetch and its bit-for-bit equivalence to the prior +PyArrow path are exercised by the avm-python-service scorecard. +""" +import geopandas as gpd +import pandas as pd +import shapely.geometry as sgeom +import shapely.wkb as swkb +from unittest.mock import MagicMock, patch + +from openavmkit.utilities.overture import OvertureService + +# A small built-up parcel near Santa Cruz, CA (real lon/lat so UTM estimation works). +_POLY = sgeom.box(-122.0263, 36.9741, -122.0260, 36.9744) +_BBOX = (-122.0283, 36.9721, -122.0243, 36.9761) + + +def _make_service(): + with patch.object( + OvertureService, "_resolve_latest_buildings_prefix", + return_value="release/2099-01-01.0/theme=buildings/type=building/", + ), patch("openavmkit.utilities.overture.fs.S3FileSystem", MagicMock()): + return OvertureService({"overture": {"enabled": True}}) + + +def _result_df(): + return pd.DataFrame({ + "id": ["bldg-1"], + "geometry": [swkb.dumps(_POLY)], + "bbox": [{"xmin": -122.0263, "xmax": -122.0260, + "ymin": 36.9741, "ymax": 36.9744}], + "height": [6.5], + "num_floors": [2], + "sources": [[{"property": "/properties/height", "confidence": 0.9}]], + }) + + +def _fake_connection(describe_cols, result_df, captured): + con = MagicMock() + + def _execute(sql, params=None): + captured.append((sql, params)) + res = MagicMock() + if sql.strip().upper().startswith("DESCRIBE"): + res.fetchall.return_value = [(c,) for c in describe_cols] + # fetch_df_chunk streams: yield the frame once, then an empty frame. + res.fetch_df_chunk.side_effect = [result_df, result_df.iloc[0:0]] + return res + + con.execute.side_effect = _execute + return con + + +def test_stream_pushes_bbox_predicate_and_drops_unavailable_columns(): + svc = _make_service() + captured = [] + # est_height / num_floors_underground / subtype / class are absent from this + # release and must be dropped from the projection (mirrors prior PyArrow behavior). + describe_cols = ["id", "geometry", "bbox", "height", "num_floors", "sources"] + con = _fake_connection(describe_cols, _result_df(), captured) + with patch("openavmkit.utilities.overture.duckdb.connect", return_value=con): + chunks = list(svc._stream_building_dfs( + _BBOX, OvertureService.DEFAULT_COLUMNS.copy())) + assert len(chunks) == 1 and len(chunks[0]) == 1 + select_calls = [c for c in captured if c[0].strip().upper().startswith("SELECT")] + assert len(select_calls) == 1 + sql, params = select_calls[0] + assert "bbox.xmin < ?" in sql and "bbox.xmax > ?" in sql + assert "bbox.ymin < ?" in sql and "bbox.ymax > ?" in sql + xmin, ymin, xmax, ymax = _BBOX + assert params == [xmax, xmin, ymax, ymin] + assert '"est_height"' not in sql and '"class"' not in sql + assert '"id"' in sql and '"geometry"' in sql + + +def test_get_buildings_builds_geodataframe_with_footprint(): + svc = _make_service() + describe_cols = ["id", "geometry", "bbox", "height", "num_floors", "sources"] + con = _fake_connection(describe_cols, _result_df(), []) + with patch("openavmkit.utilities.overture.duckdb.connect", return_value=con): + gdf = svc.get_buildings(_BBOX, use_cache=False) + assert isinstance(gdf, gpd.GeoDataFrame) + assert len(gdf) == 1 + assert gdf.geometry.iloc[0].equals(_POLY) + assert gdf["bldg_area_footprint_sqft"].iloc[0] > 0 + assert gdf["height_m_best"].iloc[0] == 6.5 + assert gdf["floors_best"].iloc[0] == 2 + + +def test_building_batches_yields_geodataframes_from_stream(): + svc = _make_service() + describe_cols = ["id", "geometry", "bbox", "height", "num_floors", "sources"] + con = _fake_connection(describe_cols, _result_df(), []) + with patch("openavmkit.utilities.overture.duckdb.connect", return_value=con): + frames = list(svc._building_batches(_BBOX)) + assert len(frames) == 1 + assert isinstance(frames[0], gpd.GeoDataFrame) + assert frames[0].geometry.iloc[0].equals(_POLY) diff --git a/tests/test_overture_streaming.py b/tests/test_overture_streaming.py index 5ff9b71b..4cf8f6cc 100644 --- a/tests/test_overture_streaming.py +++ b/tests/test_overture_streaming.py @@ -16,6 +16,7 @@ def _svc(tmp_path): service = OvertureService.__new__(OvertureService) service.cache_dir = str(tmp_path) + service.settings = {} return service @@ -102,6 +103,30 @@ def _assert_streaming_stats_match_legacy( _assert_stat_columns_equal(old, streamed) +def test_building_batches_splits_large_arrow_batches(tmp_path, monkeypatch): + service = _svc(tmp_path) + service.settings = {"stream_batch_rows": 2} + geom = Polygon([(0, 0), (0.001, 0), (0.001, 0.001), (0, 0.001)]) + fetched = pd.DataFrame( + { + "id": [f"b{i}" for i in range(5)], + "geometry": [geom.wkb for _ in range(5)], + } + ) + + # The DuckDB fetch streams pandas chunks; _building_batches must re-slice them + # into stream_batch_rows-sized GeoDataFrames for the per-parcel stats aggregation. + def fake_stream(bbox, proj_cols, verbose=False): + yield fetched + + monkeypatch.setattr(service, "_stream_building_dfs", fake_stream) + + chunks = list(service._building_batches((0, 0, 1, 1))) + + assert [len(chunk) for chunk in chunks] == [2, 2, 1] + assert all(chunk.crs == "EPSG:4326" for chunk in chunks) + + def test_streaming_stats_match_all_at_once_for_split_batches(tmp_path): service = _svc(tmp_path) parcels = _parcel_gdf() From caa297e458fa59667c67f19e955fb90a6006adfe Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 22 Jun 2026 17:08:24 -0400 Subject: [PATCH 10/13] Harden DuckDB Overture fetch path --- openavmkit/utilities/overture.py | 60 +++++++++++++--------- tests/test_overture_fetch.py | 88 +++++++++++++++++++++++++++++--- 2 files changed, 115 insertions(+), 33 deletions(-) diff --git a/openavmkit/utilities/overture.py b/openavmkit/utilities/overture.py index 3dfe80a4..3c8380fe 100644 --- a/openavmkit/utilities/overture.py +++ b/openavmkit/utilities/overture.py @@ -14,7 +14,6 @@ import geopandas as gpd import pandas as pd import traceback -import pyarrow as pa import pyarrow.fs as fs import shapely.wkb @@ -39,7 +38,7 @@ class OvertureService: "height", "est_height", "num_floors", "num_floors_underground", "subtype", "class", "sources" # sources = per-property confidence ] - DEFAULT_STREAM_BATCH_ROWS = 500 + DEFAULT_STREAM_BATCH_ROWS = 10_000 def __init__(self, settings: dict): """Initialize the Overture service with settings. @@ -99,6 +98,30 @@ def _stats_cache_load(self, cache_path: str, gdf: gpd.GeoDataFrame, stat_cols: l out[c] = out[c].fillna(0) return out + def _buildings_parquet_path(self) -> str: + return f"s3://{self.bucket}/{self.prefix}*" + + def _quote_sql_literal(self, value: str) -> str: + return "'" + str(value).replace("'", "''") + "'" + + def _quote_identifier(self, value: str) -> str: + return '"' + str(value).replace('"', '""') + '"' + + def _configure_duckdb_for_path(self, con, path: str) -> None: + if not path.startswith("s3://"): + return + try: + con.execute("LOAD httpfs;") + except Exception: + con.execute("INSTALL httpfs; LOAD httpfs;") + con.execute("SET s3_region = 'us-west-2';") + # Public, anonymously-readable bucket: configure an unsigned S3 secret so + # DuckDB does not try (and fail) to sign requests with ambient AWS creds. + con.execute( + "CREATE OR REPLACE SECRET overture_anon " + "(TYPE s3, PROVIDER config, REGION 'us-west-2');" + ) + def _stream_building_dfs(self, bbox, proj_cols, verbose=False): """Yield matching Overture building rows as pandas DataFrame chunks (DuckDB). @@ -122,19 +145,13 @@ def _stream_building_dfs(self, bbox, proj_cols, verbose=False): the prior PyArrow projection behavior. """ xmin, ymin, xmax, ymax = bbox - path = f"s3://{self.bucket}/{self.prefix}*" + path = self._buildings_parquet_path() + path_sql = self._quote_sql_literal(path) con = duckdb.connect() try: - con.execute("INSTALL httpfs; LOAD httpfs;") - con.execute("SET s3_region = 'us-west-2';") - # Public, anonymously-readable bucket: configure an unsigned S3 secret so - # DuckDB does not try (and fail) to sign requests with ambient AWS creds. - con.execute( - "CREATE OR REPLACE SECRET overture_anon " - "(TYPE s3, PROVIDER config, REGION 'us-west-2');" - ) + self._configure_duckdb_for_path(con, path) described = con.execute( - f"DESCRIBE SELECT * FROM read_parquet('{path}') LIMIT 0" + f"DESCRIBE SELECT * FROM read_parquet({path_sql}) LIMIT 0" ).fetchall() available = {row[0] for row in described} cols = [c for c in proj_cols if c in available] @@ -143,31 +160,24 @@ def _stream_building_dfs(self, bbox, proj_cols, verbose=False): print(f"--> Skipping unavailable columns: {missing}") if verbose: print(f"--> Fetching Overture buildings via DuckDB for bbox {bbox}") - col_sql = ", ".join(f'"{c}"' for c in cols) + if not cols: + raise ValueError("No requested Overture columns are available in the buildings dataset") + col_sql = ", ".join(self._quote_identifier(c) for c in cols) # Bounding-box overlap test, identical to the prior PyArrow predicate. sql = ( - f"SELECT {col_sql} FROM read_parquet('{path}') " + f"SELECT {col_sql} FROM read_parquet({path_sql}) " "WHERE bbox.xmin < ? AND bbox.xmax > ? " "AND bbox.ymin < ? AND bbox.ymax > ?" ) - res = con.execute(sql, [xmax, xmin, ymax, ymin]) + result_cursor = con.execute(sql, [xmax, xmin, ymax, ymin]) while True: - chunk = res.fetch_df_chunk() + chunk = result_cursor.fetch_df_chunk() if chunk.empty: break yield chunk finally: con.close() - def _geoarrow_schema_adapter(self, schema: pa.Schema) -> pa.Schema: - """Convert a geoarrow-compatible schema to a proper geoarrow schema.""" - geometry_field_index = schema.get_field_index("geometry") - geometry_field = schema.field(geometry_field_index) - geoarrow_geometry_field = geometry_field.with_metadata( - {b"ARROW:extension:name": b"geoarrow.wkb"} - ) - return schema.set(geometry_field_index, geoarrow_geometry_field) - def _to_geodataframe(self, df: pd.DataFrame) -> gpd.GeoDataFrame: """Convert a fetched buildings DataFrame to a GeoDataFrame (WKB -> shapely).""" # Convert WKB geometry to shapely geometry diff --git a/tests/test_overture_fetch.py b/tests/test_overture_fetch.py index e403fd78..668b286d 100644 --- a/tests/test_overture_fetch.py +++ b/tests/test_overture_fetch.py @@ -2,16 +2,18 @@ `_stream_building_dfs` reads Overture's global buildings theme with the bbox-overlap predicate pushed down to Parquet row-group statistics, so peak memory is bounded to -the matching rows instead of scanning the whole theme. These tests mock DuckDB so they -are fully network-free; the real S3 fetch and its bit-for-bit equivalence to the prior -PyArrow path are exercised by the avm-python-service scorecard. +the matching rows instead of scanning the whole theme. The tests are network-free: +most mock DuckDB, and one uses a local Parquet file to exercise DuckDB's real struct +predicate handling. """ import geopandas as gpd import pandas as pd +import pytest import shapely.geometry as sgeom import shapely.wkb as swkb from unittest.mock import MagicMock, patch +from openavmkit.data import _enrich_df_overture from openavmkit.utilities.overture import OvertureService # A small built-up parcel near Santa Cruz, CA (real lon/lat so UTM estimation works). @@ -44,12 +46,12 @@ def _fake_connection(describe_cols, result_df, captured): def _execute(sql, params=None): captured.append((sql, params)) - res = MagicMock() + result = MagicMock() if sql.strip().upper().startswith("DESCRIBE"): - res.fetchall.return_value = [(c,) for c in describe_cols] + result.fetchall.return_value = [(c,) for c in describe_cols] # fetch_df_chunk streams: yield the frame once, then an empty frame. - res.fetch_df_chunk.side_effect = [result_df, result_df.iloc[0:0]] - return res + result.fetch_df_chunk.side_effect = [result_df, result_df.iloc[0:0]] + return result con.execute.side_effect = _execute return con @@ -75,6 +77,7 @@ def test_stream_pushes_bbox_predicate_and_drops_unavailable_columns(): assert params == [xmax, xmin, ymax, ymin] assert '"est_height"' not in sql and '"class"' not in sql assert '"id"' in sql and '"geometry"' in sql + assert "read_parquet('s3://overturemaps-us-west-2/" in sql def test_get_buildings_builds_geodataframe_with_footprint(): @@ -86,7 +89,10 @@ def test_get_buildings_builds_geodataframe_with_footprint(): assert isinstance(gdf, gpd.GeoDataFrame) assert len(gdf) == 1 assert gdf.geometry.iloc[0].equals(_POLY) - assert gdf["bldg_area_footprint_sqft"].iloc[0] > 0 + expected_area = gpd.GeoSeries([_POLY], crs="EPSG:4326").to_crs( + gdf.estimate_utm_crs() + ).area.iloc[0] * 10.764 + assert gdf["bldg_area_footprint_sqft"].iloc[0] == pytest.approx(expected_area) assert gdf["height_m_best"].iloc[0] == 6.5 assert gdf["floors_best"].iloc[0] == 2 @@ -100,3 +106,69 @@ def test_building_batches_yields_geodataframes_from_stream(): assert len(frames) == 1 assert isinstance(frames[0], gpd.GeoDataFrame) assert frames[0].geometry.iloc[0].equals(_POLY) + + +def test_stream_uses_real_duckdb_bbox_predicate_on_local_parquet(tmp_path): + path = tmp_path / "buildings.parquet" + matching = _result_df() + outside = pd.DataFrame( + { + "id": ["outside"], + "geometry": [swkb.dumps(sgeom.box(-123, 37, -122.9, 37.1))], + "bbox": [{"xmin": -123.0, "xmax": -122.9, "ymin": 37.0, "ymax": 37.1}], + "height": [4.0], + "num_floors": [1], + "sources": [[]], + } + ) + pd.concat([matching, outside], ignore_index=True).to_parquet(path) + + svc = _make_service() + with patch.object(svc, "_buildings_parquet_path", return_value=str(path)): + chunks = list(svc._stream_building_dfs(_BBOX, OvertureService.DEFAULT_COLUMNS.copy())) + + result = pd.concat(chunks, ignore_index=True) + assert result["id"].tolist() == ["bldg-1"] + + +def test_enrich_df_overture_calls_streaming_stats_and_preserves_input_on_failure(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = gpd.GeoDataFrame( + {"key": ["p1"]}, + geometry=[_POLY], + crs="EPSG:4326", + ) + settings = {"locality": {"units": "imperial"}} + enrich_settings = { + "overture": { + "enabled": True, + "cache": False, + "footprint": {"units": "sqft", "field": "footprint_sqft"}, + "height": {"units": "ft", "field": "height_ft"}, + } + } + service = MagicMock() + service.calculate_building_stats_streaming.return_value = parcels.assign( + footprint_sqft=123.0, + height_ft=6.0, + ) + + with patch("openavmkit.data.get_cached_df", return_value=None), patch( + "openavmkit.data.write_cached_df" + ), patch("openavmkit.data.init_service_overture", return_value=service): + out = _enrich_df_overture(parcels, enrich_settings, {}, settings) + + assert out["footprint_sqft"].tolist() == [123.0] + args, kwargs = service.calculate_building_stats_streaming.call_args + assert args[2:6] == ("sqft", "footprint_sqft", "ft", "height_ft") + assert kwargs == {"use_cache": False, "verbose": False} + + service.calculate_building_stats_streaming.side_effect = RuntimeError("duckdb failed") + with patch("openavmkit.data.get_cached_df", return_value=None), patch( + "openavmkit.data.write_cached_df" + ), patch("openavmkit.data.init_service_overture", return_value=service): + with pytest.warns(UserWarning, match="Failed to calculate Overture building stats"): + failed = _enrich_df_overture(parcels, enrich_settings, {}, settings) + + assert list(failed.columns) == list(parcels.columns) + assert failed.geometry.equals(parcels.geometry) From 0034e0b76ee0b1d32d935ba6fb90a68ac9a25d80 Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 22 Jun 2026 17:45:44 -0400 Subject: [PATCH 11/13] Clarify Overture fetch tests --- tests/test_overture_fetch.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_overture_fetch.py b/tests/test_overture_fetch.py index 668b286d..c6e58a44 100644 --- a/tests/test_overture_fetch.py +++ b/tests/test_overture_fetch.py @@ -46,12 +46,12 @@ def _fake_connection(describe_cols, result_df, captured): def _execute(sql, params=None): captured.append((sql, params)) - result = MagicMock() + cursor = MagicMock() if sql.strip().upper().startswith("DESCRIBE"): - result.fetchall.return_value = [(c,) for c in describe_cols] + cursor.fetchall.return_value = [(c,) for c in describe_cols] # fetch_df_chunk streams: yield the frame once, then an empty frame. - result.fetch_df_chunk.side_effect = [result_df, result_df.iloc[0:0]] - return result + cursor.fetch_df_chunk.side_effect = [result_df, result_df.iloc[0:0]] + return cursor con.execute.side_effect = _execute return con @@ -127,8 +127,8 @@ def test_stream_uses_real_duckdb_bbox_predicate_on_local_parquet(tmp_path): with patch.object(svc, "_buildings_parquet_path", return_value=str(path)): chunks = list(svc._stream_building_dfs(_BBOX, OvertureService.DEFAULT_COLUMNS.copy())) - result = pd.concat(chunks, ignore_index=True) - assert result["id"].tolist() == ["bldg-1"] + fetched_buildings = pd.concat(chunks, ignore_index=True) + assert fetched_buildings["id"].tolist() == ["bldg-1"] def test_enrich_df_overture_calls_streaming_stats_and_preserves_input_on_failure(tmp_path, monkeypatch): From 46498f633eb8311c27b114ac3fc80cdaea010a68 Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 22 Jun 2026 17:54:52 -0400 Subject: [PATCH 12/13] Tighten Overture fetch regressions --- tests/test_overture_fetch.py | 37 +++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/tests/test_overture_fetch.py b/tests/test_overture_fetch.py index c6e58a44..d3bcd642 100644 --- a/tests/test_overture_fetch.py +++ b/tests/test_overture_fetch.py @@ -29,7 +29,7 @@ def _make_service(): return OvertureService({"overture": {"enabled": True}}) -def _result_df(): +def _building_result_df(): return pd.DataFrame({ "id": ["bldg-1"], "geometry": [swkb.dumps(_POLY)], @@ -41,7 +41,7 @@ def _result_df(): }) -def _fake_connection(describe_cols, result_df, captured): +def _fake_connection(describe_cols, building_rows, captured): con = MagicMock() def _execute(sql, params=None): @@ -50,21 +50,27 @@ def _execute(sql, params=None): if sql.strip().upper().startswith("DESCRIBE"): cursor.fetchall.return_value = [(c,) for c in describe_cols] # fetch_df_chunk streams: yield the frame once, then an empty frame. - cursor.fetch_df_chunk.side_effect = [result_df, result_df.iloc[0:0]] + cursor.fetch_df_chunk.side_effect = [building_rows, building_rows.iloc[0:0]] return cursor con.execute.side_effect = _execute return con -def test_stream_pushes_bbox_predicate_and_drops_unavailable_columns(): +def _patched_overture_stream(describe_cols=None, captured=None): svc = _make_service() + captured = [] if captured is None else captured + describe_cols = describe_cols or ["id", "geometry", "bbox", "height", "num_floors", "sources"] + con = _fake_connection(describe_cols, _building_result_df(), captured) + return svc, patch("openavmkit.utilities.overture.duckdb.connect", return_value=con) + + +def test_stream_pushes_bbox_predicate_and_drops_unavailable_columns(): captured = [] # est_height / num_floors_underground / subtype / class are absent from this # release and must be dropped from the projection (mirrors prior PyArrow behavior). - describe_cols = ["id", "geometry", "bbox", "height", "num_floors", "sources"] - con = _fake_connection(describe_cols, _result_df(), captured) - with patch("openavmkit.utilities.overture.duckdb.connect", return_value=con): + svc, duckdb_connect = _patched_overture_stream(captured=captured) + with duckdb_connect: chunks = list(svc._stream_building_dfs( _BBOX, OvertureService.DEFAULT_COLUMNS.copy())) assert len(chunks) == 1 and len(chunks[0]) == 1 @@ -81,10 +87,8 @@ def test_stream_pushes_bbox_predicate_and_drops_unavailable_columns(): def test_get_buildings_builds_geodataframe_with_footprint(): - svc = _make_service() - describe_cols = ["id", "geometry", "bbox", "height", "num_floors", "sources"] - con = _fake_connection(describe_cols, _result_df(), []) - with patch("openavmkit.utilities.overture.duckdb.connect", return_value=con): + svc, duckdb_connect = _patched_overture_stream() + with duckdb_connect: gdf = svc.get_buildings(_BBOX, use_cache=False) assert isinstance(gdf, gpd.GeoDataFrame) assert len(gdf) == 1 @@ -98,10 +102,8 @@ def test_get_buildings_builds_geodataframe_with_footprint(): def test_building_batches_yields_geodataframes_from_stream(): - svc = _make_service() - describe_cols = ["id", "geometry", "bbox", "height", "num_floors", "sources"] - con = _fake_connection(describe_cols, _result_df(), []) - with patch("openavmkit.utilities.overture.duckdb.connect", return_value=con): + svc, duckdb_connect = _patched_overture_stream() + with duckdb_connect: frames = list(svc._building_batches(_BBOX)) assert len(frames) == 1 assert isinstance(frames[0], gpd.GeoDataFrame) @@ -110,7 +112,7 @@ def test_building_batches_yields_geodataframes_from_stream(): def test_stream_uses_real_duckdb_bbox_predicate_on_local_parquet(tmp_path): path = tmp_path / "buildings.parquet" - matching = _result_df() + matching = _building_result_df() outside = pd.DataFrame( { "id": ["outside"], @@ -166,9 +168,10 @@ def test_enrich_df_overture_calls_streaming_stats_and_preserves_input_on_failure service.calculate_building_stats_streaming.side_effect = RuntimeError("duckdb failed") with patch("openavmkit.data.get_cached_df", return_value=None), patch( "openavmkit.data.write_cached_df" - ), patch("openavmkit.data.init_service_overture", return_value=service): + ) as write_cached_df, patch("openavmkit.data.init_service_overture", return_value=service): with pytest.warns(UserWarning, match="Failed to calculate Overture building stats"): failed = _enrich_df_overture(parcels, enrich_settings, {}, settings) assert list(failed.columns) == list(parcels.columns) assert failed.geometry.equals(parcels.geometry) + write_cached_df.assert_not_called() From b0b5cb8582b5b05adaf105a2eaeaa91180438dd6 Mon Sep 17 00:00:00 2001 From: "fabrica-cc-engineering-agent[bot]" Date: Mon, 22 Jun 2026 18:10:48 -0400 Subject: [PATCH 13/13] Separate Overture fetch and enrichment tests --- tests/test_overture_enrichment.py | 46 +++++++++++++++++++++++++++++-- tests/test_overture_fetch.py | 45 ------------------------------ 2 files changed, 44 insertions(+), 47 deletions(-) diff --git a/tests/test_overture_enrichment.py b/tests/test_overture_enrichment.py index abcb5303..adbcd079 100644 --- a/tests/test_overture_enrichment.py +++ b/tests/test_overture_enrichment.py @@ -23,17 +23,42 @@ def _settings(): return {"locality": {"units": "imperial"}} -def _enrich_settings(footprint_units="sqft", footprint_field="footprint_sqft"): +def _enrich_settings(footprint_units="sqft", footprint_field="footprint_sqft", cache=True): return { "overture": { "enabled": True, - "cache": True, + "cache": cache, "footprint": {"units": footprint_units, "field": footprint_field}, "height": {"units": "ft", "field": "height_ft"}, } } +def test_enrich_df_overture_calls_streaming_stats(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = _parcel_gdf() + service = MagicMock() + service.calculate_building_stats_streaming.return_value = parcels.assign( + footprint_sqft=123.0, + height_ft=6.0, + ) + + with patch("openavmkit.data.get_cached_df", return_value=None), patch( + "openavmkit.data.write_cached_df" + ), patch("openavmkit.data.init_service_overture", return_value=service): + out = _enrich_df_overture( + parcels, + _enrich_settings(cache=False), + {}, + _settings(), + ) + + assert out["footprint_sqft"].tolist() == [123.0] + args, kwargs = service.calculate_building_stats_streaming.call_args + assert args[2:6] == ("sqft", "footprint_sqft", "ft", "height_ft") + assert kwargs == {"use_cache": False, "verbose": False} + + def test_enrich_df_overture_fails_open_on_service_init_error(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) parcels = _parcel_gdf() @@ -52,6 +77,23 @@ def test_enrich_df_overture_fails_open_on_service_init_error(tmp_path, monkeypat write_cached_df.assert_not_called() +def test_enrich_df_overture_stats_failure_does_not_write_cache(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + parcels = _parcel_gdf() + service = MagicMock() + service.calculate_building_stats_streaming.side_effect = RuntimeError("duckdb failed") + + with patch("openavmkit.data.get_cached_df", return_value=None), patch( + "openavmkit.data.write_cached_df" + ) as write_cached_df, patch("openavmkit.data.init_service_overture", return_value=service): + with pytest.warns(UserWarning, match="Failed to calculate Overture building stats"): + out = _enrich_df_overture(parcels, _enrich_settings(), {}, _settings()) + + assert list(out.columns) == list(parcels.columns) + assert out.geometry.equals(parcels.geometry) + write_cached_df.assert_not_called() + + def test_enrich_df_overture_skips_outer_cache_for_duplicate_keys(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) parcels = _parcel_gdf(["p1", "p1"]) diff --git a/tests/test_overture_fetch.py b/tests/test_overture_fetch.py index d3bcd642..42a503b3 100644 --- a/tests/test_overture_fetch.py +++ b/tests/test_overture_fetch.py @@ -13,7 +13,6 @@ import shapely.wkb as swkb from unittest.mock import MagicMock, patch -from openavmkit.data import _enrich_df_overture from openavmkit.utilities.overture import OvertureService # A small built-up parcel near Santa Cruz, CA (real lon/lat so UTM estimation works). @@ -131,47 +130,3 @@ def test_stream_uses_real_duckdb_bbox_predicate_on_local_parquet(tmp_path): fetched_buildings = pd.concat(chunks, ignore_index=True) assert fetched_buildings["id"].tolist() == ["bldg-1"] - - -def test_enrich_df_overture_calls_streaming_stats_and_preserves_input_on_failure(tmp_path, monkeypatch): - monkeypatch.chdir(tmp_path) - parcels = gpd.GeoDataFrame( - {"key": ["p1"]}, - geometry=[_POLY], - crs="EPSG:4326", - ) - settings = {"locality": {"units": "imperial"}} - enrich_settings = { - "overture": { - "enabled": True, - "cache": False, - "footprint": {"units": "sqft", "field": "footprint_sqft"}, - "height": {"units": "ft", "field": "height_ft"}, - } - } - service = MagicMock() - service.calculate_building_stats_streaming.return_value = parcels.assign( - footprint_sqft=123.0, - height_ft=6.0, - ) - - with patch("openavmkit.data.get_cached_df", return_value=None), patch( - "openavmkit.data.write_cached_df" - ), patch("openavmkit.data.init_service_overture", return_value=service): - out = _enrich_df_overture(parcels, enrich_settings, {}, settings) - - assert out["footprint_sqft"].tolist() == [123.0] - args, kwargs = service.calculate_building_stats_streaming.call_args - assert args[2:6] == ("sqft", "footprint_sqft", "ft", "height_ft") - assert kwargs == {"use_cache": False, "verbose": False} - - service.calculate_building_stats_streaming.side_effect = RuntimeError("duckdb failed") - with patch("openavmkit.data.get_cached_df", return_value=None), patch( - "openavmkit.data.write_cached_df" - ) as write_cached_df, patch("openavmkit.data.init_service_overture", return_value=service): - with pytest.warns(UserWarning, match="Failed to calculate Overture building stats"): - failed = _enrich_df_overture(parcels, enrich_settings, {}, settings) - - assert list(failed.columns) == list(parcels.columns) - assert failed.geometry.equals(parcels.geometry) - write_cached_df.assert_not_called()