feat(sql): support USING zonemap with distributed and consolidated build paths#513
Draft
LuciferYang wants to merge 2 commits into
Draft
feat(sql): support USING zonemap with distributed and consolidated build paths#513LuciferYang wants to merge 2 commits into
LuciferYang wants to merge 2 commits into
Conversation
7 tasks
LuciferYang
added a commit
to LuciferYang/lance-spark
that referenced
this pull request
May 8, 2026
The previous parser change (recognizing "zonemap" as a CREATE INDEX method) made the SQL form syntactically valid, but a runtime call like ALTER TABLE foo CREATE INDEX idx USING zonemap (col) still failed with: Invalid user input: Unsupported index type (patched): ZoneMap, .../dataset.rs Root cause: AddIndexExec calls dataset.mergeIndexMetadata(...) unconditionally after the per-fragment build phase, but lance-core's merge_index_metadata (rust/lance/src/dataset.rs) only matches Inverted / BTree / Bitmap / Vector arms — ZoneMap falls into the catch-all "Unsupported index type" arm and throws. Zonemap reads do not need a merged structure: lance-core's getZonemapStats opens each per-fragment zonemap.lance directly via load_indices_by_name and concatenates the per-zone batches. The merge step is required only for index types (BTree, Inverted, Bitmap) that produce a single consolidated artifact for point lookups. Skipping the merge call for IndexType.ZONEMAP makes the per-fragment build the final state, which is the correct end state for zonemap reads. Adds five integration tests covering the surface PR lance-format#513 introduces: - testCreateZonemapIndex: USING zonemap succeeds end-to-end and zonemap stats are populated on the indexed column - testZonemapStatsEmptyForBtreeOnly: locks down the lance-core contract that getZonemapStats(col) skips BTree-typed indexes (BTree's plugin reports index_type "BTree", and the JNI accessor filters on index_type().to_lowercase().contains("zonemap")) - testCreateBtreeThenZonemapSameNameReplaces: same-name CREATE INDEX is last-write-wins via withRemovedIndices in AddIndexOperation - testCreateZonemapThenBtreeSameNameReplaces: reverse direction; zonemap stats correctly drop to empty after btree replaces zonemap on the same name - testCreateBtreeAndZonemapDifferentNamesCoexist: distinct names produce two coexisting indexes; the zonemap-typed entry serves getZonemapStats
This was referenced May 8, 2026
Contributor
Author
|
It's still a draft, please don't review it for now. |
650cf90 to
c9dcf64
Compare
5689bd2 to
d0a11e8
Compare
Contributor
|
ACTION NEEDED The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error please inspect the "PR Title Check" action. |
LuciferYang
commented
May 14, 2026
|
|
||
| if (indexType == IndexType.ZONEMAP) { | ||
| if (consolidateZonemap) { | ||
| return runZonemapConsolidated(dataset, lanceDataset, readOptions, fragmentIds) |
Contributor
Author
There was a problem hiding this comment.
The consolidated path uses two new Lance APIs that are in review upstream:
- feat(zonemap): public API + driver helper for build-time consolidation lance#6779 — Rust compute_zonemap_batch + write_consolidated_zonemap_segment
- feat(jni): add Java bindings for compute_zonemap_batch + write_consolidated_zonemap_segment lance#6780 — Java JNI bindings, stacked on #6779
Adds `ALTER TABLE ... CREATE INDEX ... USING zonemap (col)` end-to-end with a properly distributed multi-segment build. Two production-side changes: 1. IndexUtils now recognises "zonemap" as a method name in both directions (buildIndexType → IndexType.ZONEMAP, buildScalarIndexParamType → "zonemap") with case-insensitive lookup. 2. AddIndexExec routes ZONEMAP through a new runZonemapDistributed path. Unlike the FragmentBasedIndexJob shared-UUID pattern (which BTree uses correctly because per-partition writes use distinct file names — part_<id>_page_data.lance, part_<id>_page_lookup.lance — and a merge step consolidates them), ZoneMap writes a fixed `zonemap.lance` filename and has no merge step. Sharing one UUID across N executor tasks would race on the same object-store path; only one fragment's data would survive. Each Spark task now calls dataset.createIndex with withFragmentIds=[id] and NO withIndexUUID. The lance-core JNI takes the execute_uncommitted path (skip_commit = fragment_ids.is_some()) and generates a fresh per-task UUID, so each task writes to its own indices_dir/<uuid>/zonemap.lance directory. The driver collects all N per-task ZonemapFragmentResult values (uuid, fragmentId, indexDetails, indexVersion, createdAt) and commits them as a single AddIndexOperation transaction with N IndexMetadata entries sharing the index name. lance-core's existing read-side infrastructure — describe_indices chunking by name, getZonemapStats iterating load_indices_by_name and reading each segment's zonemap.lance — handles the multi-segment shape transparently. mergeIndexMetadata is intentionally skipped for ZONEMAP: there is no per-fragment file consolidation needed, and lance-core's merge_index_metadata has no ZoneMap arm anyway (would throw "Unsupported index type"). Supporting refactors: - Extract resolveFieldIdsOrThrow(dataset, columns) — shared by the parallel path's post-build commit and the new distributed ZoneMap path. Single source of truth for the "Cannot find index column in Lance schema" error. - Extract extractNamespaceInfo(lanceDataset, readOptions) — shared between createIndexJob and runZonemapDistributed. - ZonemapFragmentTask wraps execute() in try/catch that re-throws with fragment-id context, preserving the specific exception subclass (IllegalArgumentException, IllegalStateException, RuntimeException) so callers matching on type still match. - Driver-side decoding of per-task results wraps decode failures with task-index context so deploy-skew / serialization issues are diagnosable. - runZonemapBuild validates that createdIndex.indexDetails() is non-empty, mirroring the parallel path's extractIndexBuildResult guard. Tests: IndexUtilsTest (6 cases) — symbol-mapping unit tests for buildIndexType / buildScalarIndexParamType: forward and reverse mappings, case-insensitive lookup, unknown-method rejection with message substring assertions. BaseAddIndexTest additions (5 zonemap cases): - testCreateZonemapIndex — end-to-end USING zonemap + strict per-fragment coverage assertion (every indexed fragment must contribute ≥1 zone). - testCreateZonemapOnNonExistentColumn — fail-fast IllegalArgumentException with "Cannot find index column" message substring. - testCreateZonemapOnStringColumn — strict assertion that every zone's min/max is a non-null String (catches string-codec regressions). - testCreateZonemapWithZoneSize — smoke test for `with (zone_size=N)` parameter forwarding through IndexUtils.toJson. - testZonemapDistributedCommitShape — locks the multi-segment commit invariants: exactly one IndexMetadata per fragment, every segment has a distinct UUID, every segment's fragment-bitmap is a singleton, the union of segment fragment-bitmaps equals the indexed fragment set, all segments share one field-id list. A regression to shared-UUID single-segment commits — the original race — would fail every assertion in this test. Verified end-to-end against lance-core 6.0.0-rc.2 (current upstream/main). Closes lance-format#512. Closes lance-format#514.
d0a11e8 to
2959c55
Compare
Wire lance-core's computeZonemapBatch + writeZonemapIndexFromBatches
APIs into AddIndexExec. When spark.lance.zonemap.consolidate.enabled=true,
the consumer routes through runZonemapConsolidated:
- executors call dataset.computeZonemapBatch on their fragment and
return per-zone min/max stats as Arrow-IPC-encoded bytes
- driver decodes every batch into VectorSchemaRoots and calls
dataset.writeZonemapIndexFromBatches once, producing a single
<uuid>/zonemap.lance file covering the union of all fragments
- driver commits exactly one IndexMetadata entry via the same
AddIndexOperation path used by runZonemapDistributed
Default off: preserves the multi-segment distributed shape the read
path has served for the entire history of this code.
sf=100 store_sales A/B (ss_sold_date_sk, local[*], Spark 4.0):
| metric | distributed | consolidated |
|---------------------|-------------|--------------|
| wall-clock | 15.0 s | 28.1 s |
| index segments | 234 | 1 |
| manifest-referenced | 1,099,920 B | 137,835 B |
The 8x footprint shrink comes from amortising Lance file overhead
(header + footer + schema metadata) across one consolidated file
instead of paying it 234 times. Wall-clock regression is the expected
trade-off: parallel per-fragment writes become a single driver-side
write. At larger scales and on object stores with high per-PUT
latency, manifest- and listing-cost wins on the read side should
pay this back.
Depends on the new lance-core APIs landing upstream (see
lance-format/lance#6779 and #6780).
2959c55 to
75d0595
Compare
Contributor
Author
|
cc @hamersaw this PR depends on
to build successfully. |
5 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #512. Closes #514.
Adds
ALTER TABLE ... CREATE INDEX ... USING zonemap (col)with two build paths:Distributed (default): one Lance segment per fragment, built in parallel. Each task uses a fresh per-task UUID so the per-task
<uuid>/zonemap.lancefiles don't collide; the driver commits N IndexMetadata entries under the shared name in one transaction. Works against current upstream lance-core, no new APIs needed.Consolidated (opt-in via
spark.lance.zonemap.consolidate.enabled=true): workers compute per-fragment zone batches viadataset.computeZonemapBatchand return them to the driver; the driver merges them viadataset.writeZonemapIndexFromBatchesand commits a single IndexMetadata entry covering every fragment.Creation time and footprint (sf=100 store_sales,
ss_sold_date_sk)Same dataset (234 fragments, 287,997,024 rows), same column, single zonemap index:
_indices/bytesThe consolidated path is ~2× slower in wall-clock (driver-side write replaces parallel executor writes) but produces ~8× less on-disk index data because Lance's per-file header overhead is paid once instead of 234 times. Manifest grows by 1 IndexMetadata entry instead of 234.
Bulk-build of all 15 fact-table zonemaps on the four clustered tables (~903M rows across the four facts) takes 56 s end-to-end via the consolidated path (default per-zone is
rows_per_zone=8192).Dependency on lance-core
The consolidated path uses two new lance-core APIs that are in review upstream:
compute_zonemap_batch+write_consolidated_zonemap_segmentNot mergeable until both land and a lance-core release with those APIs ships. The distributed path alone works against current
<lance.version>(no pom change in this PR).What changed
AddIndexExec.scala: SQLzonemapmethod,runZonemapDistributed,runZonemapConsolidated, two per-fragment task classes, and a pre-commit check that the consolidated file'sfragment_bitmapmatches the dispatcher's fragment set.LanceSparkReadOptions.java:spark.lance.zonemap.consolidate.enabled(lenient parse: case-insensitive"true"after trim, anything else falls through to distributed).docs/src/operations/ddl/create-index.md: zonemap method,rows_per_zoneoption, consolidated SparkConf flag, single-column constraint.integration-tests/test_lance_spark.py: two pytest cases (default path + consolidated mode).Tests
AddIndexTest: 38 cases, green on Spark 3.5/2.12 and 4.0/2.13.rows_per_zoneforwarding, N=1 degenerate case, idempotent re-create, two coexisting indexes on different columns."false"routes to distributed,"TRUE"with whitespace), type coverage (int / long / string / nullable-string),rows_per_zonemin/max correctness, end-to-end SELECT after CREATE INDEX.IndexUtilsTest: 14 cases for the SQL →IndexTypemapping andtoJsonwire-shape edge cases.integration-tests/test_lance_spark.py:test_create_zonemap_index(default distributed path withrows_per_zone=16) andtest_create_zonemap_index_consolidated(asserts exactly one IndexMetadata segment under the consolidated mode).mergeIndexMetadatais skipped on both paths: nothing to merge for distributed (Lance's read path serves multi-segment natively), already merged in-memory for consolidated.