feat: add distributed zonemap index build with configurable segments #516
beinan wants to merge 1 commit
hamersaw left a comment
This looks pretty good. Thanks for the PR! A few things that we should tighten up IMO.
| Option[Map[String, String]]) = catalog match { | ||
| column: String, | ||
| segments: Seq[Index]): Unit = { | ||
| val dataset = Utils.openDatasetBuilder(readOptions).build() |
Is there a specific reason we are opening two datasets within this function?
The two opens are needed because commitExistingIndexSegments creates a new dataset version, so we need a fresh dataset handle to read the updated index state for the cleanup transaction. I've added a comment in the code explaining this. The first dataset is used for the segment commit, and the second reads the post-commit state to remove old segments.
I am pretty concerned that this can leave us in a poor state if we commit the new index segments and there is a failure before removing the old ones. I think results should still be correct, but there will be quite a bit of overhead until the indexing process is rerun.
It doesn't look like there is a solution for this in the Lance core SDK when committing segmented indices. Maybe we need to devise a solution for transactionally replacing segmented indices?
Great catch. After investigating Lance core's commit_existing_index_segments (index.rs:1065-1164), it turns out the core API already handles atomic replacement — it finds existing segments whose fragments overlap with incoming ones and removes them in the same CreateIndex transaction. The Spark-side manual cleanup (second dataset open + removal transaction) was redundant and is what introduced the race.
Fixed in the latest push by simplifying commitIndexSegments to just call dataset.commitExistingIndexSegments() and let Lance core handle the atomic add+remove. The method went from ~50 lines to ~10.
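To make the replacement semantics described above concrete, here is a minimal in-memory model in Scala. It is a sketch only: `Segment`, `SegmentReplaceModel`, and `replaceOverlapping` are hypothetical names used for illustration and are not part of the Lance API. It shows that existing segments sharing any fragment with an incoming segment are dropped in the same step that adds the incoming ones.

```scala
// Illustrative model only. Segment and replaceOverlapping are hypothetical
// names and do not mirror Lance's actual types or signatures.
final case class Segment(uuid: String, fragmentIds: Set[Int])

object SegmentReplaceModel {
  // Returns the index state after a single commit: every existing segment
  // that shares at least one fragment with an incoming segment is removed,
  // and the incoming segments are appended, all in one step.
  def replaceOverlapping(existing: Seq[Segment], incoming: Seq[Segment]): Seq[Segment] = {
    val incomingFragments: Set[Int] = incoming.flatMap(_.fragmentIds).toSet
    val survivors = existing.filterNot(_.fragmentIds.exists(incomingFragments.contains))
    survivors ++ incoming
  }
}
```

Because removal and addition are computed together, there is no intermediate state in which both old and new segments cover the same fragments, which is the window the separate cleanup transaction left open.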
Force-pushed from 416231b to fbe05fb.
Thanks for the thorough review! All feedback has been addressed in the latest push (force-pushed as a single clean commit on latest main):
Scan-side changes removed entirely:
Index creation fixes:
Force-pushed from 29ef8a5 to 78bb5ea.
@hamersaw All review feedback has been addressed; the key change since your last review is simplifying
@LuciferYang do you mind making a pass here? Specifically, I'm interested in how this compares to your proposal (#513) to support building distributed ZoneMap indexes.
Will give feedback later today.
@hamersaw @beinan Had a closer look. After the May 12 force-push on #516, the two PRs are adjacent rather than overlapping.
The main difference between the two distributed paths is that
For reference, sf=100
What I'd want to inherit from #516:
Suggested path:
One nit on #516, will leave inline:
@LuciferYang Thanks for the thorough comparison — the side-by-side table is really helpful. You're right about
Agreed on the suggested path: happy to land #516 as the distributed foundation, then have #513 rebase its distributed path onto
LuciferYang left a comment
Does the PR description also need to be updated?
| val validatedNumSegments: Option[Int] = numSegmentsOpt.map { arg => | ||
| val value = | ||
| try { | ||
| arg.value.asInstanceOf[Number].intValue() |
Scala's null.asInstanceOf[Number] returns null instead of throwing ClassCastException, so a WITH (num_segments = null) argument bypasses the friendly error path and dies with an opaque NullPointerException at .intValue().
On the other hand, if the parser delivers a java.lang.Long (e.g., for an out-of-range literal), .intValue() silently truncates to the low 32 bits rather than rejecting. Negative Longs below Int.MinValue can therefore truncate to positive Ints and slip past the value <= 0 check on line 111. Validate the Long bounds before narrowing.
It can be revised as follows:

    val value = arg.value match {
      case null =>
        throw new IllegalArgumentException("num_segments must be a positive integer, got: null")
      case n: Number =>
        val asLong = n.longValue()
        if (asLong < 1L || asLong > Int.MaxValue) throw new IllegalArgumentException(
          s"num_segments must be a positive integer that fits in Int, got: $asLong")
        asLong.toInt
      case other =>
        throw new IllegalArgumentException(
          s"num_segments must be a positive integer, got: $other")
    }

With this in place the redundant if (value <= 0) throw … block on line 111 can be removed.
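The two failure modes named in this comment can be reproduced in isolation. The sketch below wraps the suggested pattern match in a hypothetical helper, `NumSegmentsDemo.validate` (a name not present in the PR), and prints what the unchecked cast and the narrowing conversion do on their own.

```scala
object NumSegmentsDemo {
  // Hypothetical helper wrapping the pattern match suggested in the review.
  def validate(raw: Any): Int = raw match {
    case null =>
      throw new IllegalArgumentException("num_segments must be a positive integer, got: null")
    case n: Number =>
      val asLong = n.longValue()
      if (asLong < 1L || asLong > Int.MaxValue) throw new IllegalArgumentException(
        s"num_segments must be a positive integer that fits in Int, got: $asLong")
      asLong.toInt
    case other =>
      throw new IllegalArgumentException(
        s"num_segments must be a positive integer, got: $other")
  }

  def main(args: Array[String]): Unit = {
    // Casting null succeeds and yields null, so no ClassCastException is raised;
    // a later .intValue() call on it would fail with a NullPointerException.
    val casted = (null: Any).asInstanceOf[Number]
    println(casted == null) // true

    // Narrowing keeps only the low 32 bits: -4294967295L is -(2^32) + 1,
    // so intValue() yields 1 and would pass a naive "value <= 0" check.
    println(java.lang.Long.valueOf(-4294967295L).intValue()) // 1

    // The bounds-checked version rejects the same input.
    println(scala.util.Try(validate(-4294967295L)).isFailure) // true
  }
}
```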
Fixed — switched to pattern match handling null, Long bounds, and non-Number types explicitly. Removed the redundant <= 0 check.
| Option[Map[String, String]]) = catalog match { | ||
| column: String, | ||
| segments: Seq[Index]): Unit = { | ||
| val dataset = Utils.openDatasetBuilder(readOptions).build() |
The driver opens once on line 72 to enumerate fragmentIds, closes it, then commitIndexSegments opens a fresh one on line 214 to call commitExistingIndexSegments. Both go through the same Utils.openDatasetBuilder(readOptions). Consider consolidating to a single driver-side open scoped to the entire zonemap branch — derive fragmentIds from it and pass that same handle into commitIndexSegments.
Caveat worth verifying first: if commitExistingIndexSegments requires a handle at the latest dataset version (not the version captured at line 72), reusing the older handle could fail the commit on a version mismatch. If the Lance core contract requires a fresh handle, leave commitIndexSegments as-is and only optimize the line 72 enumeration (e.g., enumerate via lanceDataset if it already exposes fragment IDs).
Good point. Left as-is for now since commitExistingIndexSegments may require a handle at the latest version. Worth consolidating in a follow-up if we confirm the version contract.
| math.min(n, addIndexExec.session.sparkContext.defaultParallelism)) | ||
| } | ||
| (0 until k).map { i => | ||
| fragmentIds.slice(i * n / k, (i + 1) * n / k) |
i * n (both Int) overflows once the product exceeds Int.MaxValue ≈ 2.1×10^9. Triggering requires a deliberately large num_segments and a fragment count where i*n crosses the boundary — e.g., 200k fragments with num_segments near n puts i*n near 4×10^10. Not a hot-path concern, but cheap to make overflow-safe. Promote one operand to Long:

    fragmentIds.slice((i.toLong * n / k).toInt, ((i.toLong + 1) * n / k).toInt)

Equivalent and overflow-safe.
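For reference, the slicing scheme can be exercised on its own. The sketch below is a simplified stand-in for `batchFragments`: the object name `FragmentBatching`, the `batch` signature, and the use of plain `Seq[Int]` are assumptions made for illustration, and it assumes `k >= 1`.

```scala
object FragmentBatching {
  // Hypothetical stand-in for batchFragments: splits ids into k contiguous
  // slices whose sizes differ by at most one. Assumes k >= 1.
  def batch(ids: Seq[Int], k: Int): Seq[Seq[Int]] = {
    val n = ids.length
    (0 until k).map { i =>
      // Promote to Long before multiplying so i * n cannot wrap around.
      val from = (i.toLong * n / k).toInt
      val until = ((i.toLong + 1) * n / k).toInt
      ids.slice(from, until)
    }
  }

  def main(args: Array[String]): Unit = {
    // Ten fragment ids split into three batches.
    println(batch(0 until 10, 3).map(_.size)) // Vector(3, 3, 4)

    // With Int arithmetic the product wraps; with Long it does not.
    val i = 199999
    val n = 200000
    println(i * n == i.toLong * n) // false
  }
}
```

Since each boundary is at most `n` after the division, the final `.toInt` narrowing is safe even when the intermediate product exceeds `Int.MaxValue`.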
Fixed — promoted to i.toLong * n / k to avoid overflow.
| private def createIndexJob( | ||
| dataset: Dataset, | ||
| lanceDataset: LanceDataset, | ||
| // Lance core's commitExistingIndexSegments handles atomic replacement: |
nit: Drop a one-line comment at the call site (line 132) — e.g., // atomic add+remove via Lance core; see commitIndexSegments — so the replacement semantics are visible without jumping definitions.
| } | ||
|
|
||
| // Zonemap uses logical segment commit path | ||
| if (useLogicalSegmentCommit) { |
nit: Cosmetic micro-allocation, but it signals incorrectly that the UUID is needed by every branch. Move val uuid = UUID.randomUUID() below the if (useLogicalSegmentCommit) { … return … } block so it's only generated for the merge-metadata branch.
Done — moved val uuid below the zonemap early return.
|
|
||
| private def batchFragments( | ||
| fragmentIds: List[Integer], | ||
| numSegments: Option[Int] = None): Seq[List[Integer]] = { |
batchFragments is private and called from a single site that always passes the argument. Drop = None to avoid signaling an extension point that doesn't exist.
Dropped the = None default.
| case e: Exception => | ||
| throw new RuntimeException( | ||
| "Zonemap segment build failed. Uncommitted segments (if any) " + | ||
| "will be cleaned up by Lance's garbage collection.", |
Will it really be cleaned up automatically?
Good question — updated the message. Uncommitted segments are not visible to readers and do not affect query correctness. They are orphaned artifacts that occupy storage but have no semantic impact.
| } | ||
|
|
||
| @Test | ||
| public void testCreateZonemapIndex() { |
nit: no negative test cases, such as
- negative test for multi-column zonemap
- negative test for `num_segments` on `btree`/`fts`
- test for `num_segments = 0` / negative values
Added negative tests for: multi-column zonemap, num_segments on btree, and zero/negative num_segments.
| } | ||
|
|
||
| @Test | ||
| public void testRepeatedCreateZonemapIndexReplacesExistingSegments() { |
The test runs the SQL twice and asserts segment count stays at expectedSegmentCount. That catches the duplication failure mode (second run adds instead of replacing) but not the no-op failure mode (second run silently does nothing). Capture the segment UUIDs (or createdAt) after the first run and assert they differ after the second. This assumes Lance's createIndex mints fresh UUIDs per call — if UUIDs are content-addressed or otherwise stable across rebuilds, fall back to comparing createdAt.
Fixed — now captures segment UUIDs after the first run and asserts they differ after the second.
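The check discussed in this thread boils down to two conditions on the UUID sets read after each run. A minimal sketch follows, assuming (as the review comment notes) that each build mints fresh UUIDs; `RebuildCheck` and `replacedAllSegments` are hypothetical names, and the actual test in the PR is a Java JUnit test that may be structured differently.

```scala
object RebuildCheck {
  // before / after: segment UUIDs read from the index after the first and
  // second CREATE INDEX runs. Hypothetical helper, for illustration only.
  def replacedAllSegments(
      before: Set[String],
      after: Set[String],
      expectedCount: Int): Boolean =
    before.size == expectedCount &&
      after.size == expectedCount &&   // rules out duplication (add instead of replace)
      before.intersect(after).isEmpty  // rules out a silent no-op (same segments kept)
}
```

Checking only the count would accept the no-op case, and checking only disjointness would accept the duplication case once old segments are filtered out, so both conditions are needed.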
|
|
||
| | Option | Type | Description | | ||
| |-----------------|------|----------------------------------------------| | ||
| | `rows_per_zone` | Long | The approximate number of rows per zonemap zone. | |
If both are passed through IndexUtils.toJson the same way, label them the same way. zone_size in the btree section below should match too for cross-section consistency.
Fixed table alignment for consistency.
LuciferYang left a comment
The "What Changed" section still references LanceScanBuilder.java, LanceScan.java, LanceInputPartition.java, LanceFragmentScanner.java, and LanceCountStarPartitionReader.java, plus the bullet "Add segmented zonemap scan support with Spark-side post-scan filtering fallback" in Summary. None of this is in the current diff.
others LGTM
Add zonemap as a new index type in CREATE INDEX DDL with distributed build support. Each segment is built in parallel on Spark executors and committed as a logical index on the driver.

Co-Authored-By: Beinan Wang <beinanwang@microsoft.com>
Force-pushed from 804c1b9 to 5c18049.
Sorry for the delay, just updated. Can we merge this PR? @LuciferYang @hamersaw
Summary
- `CREATE INDEX` DDL with distributed build support
- `num_segments` option (defaults to `spark.default.parallelism`)

What Changed
- `AddIndexExec.scala`: Zonemap-specific path with `ZonemapIndexJob`/`ZonemapIndexTask` and `commitIndexSegments`
- `create-index.md`: Document zonemap index type, options, and usage

Notes
- `main`
- `7.0.0-beta.10` or newer which includes zonemap segment support

Test plan
- `num_segments`

🤖 Generated with Claude Code