fix(local): add threading.RLock to LocalCollection to prevent concurrent-upsert race (#1193) #1196

Open
aurelienbran wants to merge 1 commit into qdrant:master from aurelienbran:fix/local-collection-threading-lock

Conversation

@aurelienbran

Summary

Fixes #1193.

LocalCollection held no synchronization primitives, so two threads running upsert concurrently could interleave the mutations of self.payload (Python list.append) and self.deleted (numpy np.concatenate), leaving the parallel arrays at mismatched lengths. A subsequent scroll() / search() / retrieve() call then raised:

ValueError: operands could not be broadcast together with shapes (1600,) (1495,)

The reproducer posted in #1193 (8 threads × 20 batches × 10 points) failed 2-3 trials out of 5 on both qdrant-client==1.12.2 and qdrant-client==1.17.1.
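To make the failure mode concrete, here is a minimal sketch that replays one possible interleaving by hand, without threads. It is a toy model and not qdrant-client code: `payload` stands in for the in-place list, and a tuple that is rebuilt and re-assigned stands in for the `np.concatenate` pattern. The exact interleaving shown is an assumption chosen for illustration.

```python
# Toy model of two parallel per-point containers (NOT qdrant-client code).
payload = []   # grows in place, like list.append
deleted = ()   # rebuilt and re-assigned on growth, like arr = np.concatenate(...)

# Writer A and writer B each start adding one point.
payload.append({"writer": "A"})   # A appends its payload
snapshot_a = deleted              # A reads the old flags (length 0)

payload.append({"writer": "B"})   # B appends its payload
snapshot_b = deleted              # B also reads the old flags (length 0)

deleted = snapshot_a + (False,)   # A publishes flags of length 1
deleted = snapshot_b + (False,)   # B overwrites them with another length-1 value

print(len(payload), len(deleted))  # 2 1
```

The in-place append keeps both payloads, while the read-then-reassign step loses one update, so the two containers end up with different lengths. Any later operation that combines them element-wise would then fail in the way the traceback above shows.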

Fix

  • threading.RLock created in LocalCollection.__init__. Reentrant because the public write methods call private mutators reachable from multiple entry points.
  • with self._lock: wraps the full body of every public write entry point that mutates the per-point parallel arrays (payload, deleted, vectors, sparse_vectors, multivectors, ids, ids_inv, deleted_per_vector):
    • upsert
    • update_vectors
    • delete_vectors
    • delete
    • set_payload
    • overwrite_payload
    • delete_payload
    • clear_payload

Read paths (search, scroll, retrieve, ...) are intentionally NOT locked. The bug is that writes corrupted the arrays into a permanently inconsistent state — serializing writes alone removes the corruption. Read calls that take a single len(...) at entry and then slice will observe a consistent snapshot. Leaving reads unlocked keeps the hot path lock-free.
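The write-side locking described above can be sketched roughly as follows. `ToyCollection`, `_add_point` as written here, and `run_writers` are hypothetical stand-ins for illustration; the real `LocalCollection` has more arrays and more entry points. The workload mirrors the 8 threads × 20 batches × 10 points sizing from the issue.

```python
import threading


class ToyCollection:
    """Hypothetical stand-in with two parallel per-point containers."""

    def __init__(self):
        self._lock = threading.RLock()
        self.payload = []
        self.deleted = ()

    def _add_point(self, item):
        # Private mutator: callers are expected to hold self._lock.
        self.payload.append(item)
        self.deleted = self.deleted + (False,)

    def upsert(self, items):
        # The whole public write body runs under the lock.
        with self._lock:
            for item in items:
                self._add_point(item)


def run_writers(collection, threads=8, batches=20, batch_size=10):
    def worker(tid):
        for batch in range(batches):
            collection.upsert([(tid, batch, i) for i in range(batch_size)])

    workers = [threading.Thread(target=worker, args=(t,)) for t in range(threads)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()


collection = ToyCollection()
run_writers(collection)
print(len(collection.payload), len(collection.deleted))  # 1600 1600
```

Because every writer holds the same lock for the full duration of `upsert`, the two containers always grow together from the point of view of other writers. Reads in this sketch take no lock, matching the scope described above.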

Test

Adds test_concurrent_upsert_does_not_corrupt_local_collection in tests/test_in_memory.py, mirroring the issue reproducer (8 threads, 20 batches, 10 points = 1600 expected). Passes with the lock. When I monkey-patched LocalCollection._lock to a no-op in a local script, 3/5 trials reproduced the operands could not be broadcast together error exactly as the issue describes.

Ran locally on master HEAD (cd5eb25, v1.17.1):

$ python -m pytest tests/test_in_memory.py tests/test_local_persistence.py tests/test_common.py -q
.................................                                       [100%]
33 passed in 2.87s

Scope decisions

  • RLock vs Lock: RLock to accommodate private mutators that are also reachable from other public methods (e.g. batch paths). Lock would be faster but risks self-deadlock on internal calls.
  • Read-path locking: skipped to keep the lock-free read hot path. If a race is ever observed on a read (none known), a follow-up can wrap the small number of array-length reads under the same RLock.
  • async_qdrant_local.py / AsyncQdrantLocal: the async wrapper dispatches to the sync LocalCollection, so this fix applies automatically when asyncio.to_thread is used to run writes concurrently. No separate async lock needed.
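The RLock vs Lock tradeoff can be checked without risking an actual deadlock by using a non-blocking acquire from the thread that already holds the lock. This is a small standalone sketch using only the standard library:

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

with plain:
    # A second acquire from the same thread fails; a blocking one would hang.
    got_plain = plain.acquire(blocking=False)

with reentrant:
    # The owning thread may acquire an RLock again.
    got_reentrant = reentrant.acquire(blocking=False)
    if got_reentrant:
        reentrant.release()  # balance the nested acquire

print(got_plain, got_reentrant)  # False True
```

With a plain `Lock`, a locked public method that calls another locked method on the same object would block forever, which is the self-deadlock risk mentioned in the first bullet.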

Happy to split the 8-method lock wrapping into a decorator or reshape the scope if preferred.
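For reference, a decorator version could look roughly like the sketch below. The names `synchronized` and `ToyPayloadStore` are hypothetical and not part of qdrant-client; the only assumption about the target class is that it exposes a reentrant `self._lock`.

```python
import functools
import threading


def synchronized(method):
    """Run the wrapped method while holding self._lock (assumed to be an RLock)."""

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        with self._lock:
            return method(self, *args, **kwargs)

    return wrapper


class ToyPayloadStore:
    """Hypothetical class used only to exercise the decorator."""

    def __init__(self):
        self._lock = threading.RLock()
        self.payload = {}

    @synchronized
    def set_payload(self, point_id, values):
        self.payload.setdefault(point_id, {}).update(values)

    @synchronized
    def clear_payload(self, point_id):
        self.payload[point_id] = {}

    @synchronized
    def overwrite_payload(self, point_id, values):
        # Nested decorated calls re-enter the same RLock.
        self.clear_payload(point_id)
        self.set_payload(point_id, values)


store = ToyPayloadStore()
store.set_payload(1, {"a": 1})
store.overwrite_payload(1, {"b": 2})
print(store.payload)  # {1: {'b': 2}}
```

The decorator keeps each method body unindented and makes the set of locked entry points visible at a glance, at the cost of one extra function call per write.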

…ent-upsert race (qdrant#1193)

`LocalCollection` held no synchronization primitives. Two threads running
`upsert` could interleave the mutations of `self.payload` (list append)
and `self.deleted` (numpy concatenate), leaving them at mismatched lengths.
A subsequent `scroll` / `search` then raised

    ValueError: operands could not be broadcast together with shapes (N,) (M,)

Repro from issue qdrant#1193: 8 threads x 20 batches x 10 points, failed
2-3 trials out of 5 on qdrant-client 1.17.1 (and 1.12.2).

Fix
---
- `threading.RLock` created in `__init__`. Reentrant because the public
  write methods call private mutators that are also reachable from other
  write entry points.
- `with self._lock:` wraps the full body of every public write entry
  point that touches the per-point parallel arrays (`payload`, `deleted`,
  `vectors`, `sparse_vectors`, `multivectors`, `ids`, `ids_inv`,
  `deleted_per_vector`):
  - `upsert`
  - `update_vectors`
  - `delete_vectors`
  - `delete`
  - `set_payload`
  - `overwrite_payload`
  - `delete_payload`
  - `clear_payload`

Read paths (`search`, `scroll`, `retrieve`, ...) are NOT locked. The bug
is that writes corrupted the arrays into a permanently inconsistent
state — serializing writes removes the corruption. Reads that take a
single `len(...)` at entry then slice will see a consistent snapshot.
Leaving reads unlocked keeps the hot path lock-free and preserves the
"in-memory is for testing / prototyping" performance tradeoff.

Test
----
Adds `test_concurrent_upsert_does_not_corrupt_local_collection`
(8 threads x 20 batches x 10 points = 1600 expected). Passes with the
lock. Patched locally to a no-op lock: 3/5 trials reproduce the
`operands could not be broadcast together` error, matching the issue
reproducer output exactly.

Ref: qdrant#1193
@netlify

netlify Bot commented Apr 23, 2026

Deploy Preview for poetic-froyo-8baba7 failed.

Name Link
🔨 Latest commit c342fa3
🔍 Latest deploy log https://app.netlify.com/projects/poetic-froyo-8baba7/deploys/69ea4d10ecfb4a0008ef4739

@coderabbitai

coderabbitai Bot commented Apr 23, 2026

📝 Walkthrough

Walkthrough

A threading.RLock is added to LocalCollection to serialize all internal write operations. The eight write methods (upsert, update_vectors, delete_vectors, delete, set_payload, overwrite_payload, delete_payload, and clear_payload) are wrapped with lock acquisition to prevent concurrent mutations of core arrays (payload, deleted, vectors, ID mappings, and per-vector deletion flags). The upsert batch branch is refactored to uniformly derive vectors from list or dict representations and construct PointStruct entries routed through _upsert_point within the locked section. A regression test is added to verify thread-safety by spawning multiple worker threads that concurrently upsert point batches and validate consistency via scroll.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The PR title directly and specifically describes the main change: adding threading.RLock to LocalCollection to prevent concurrent-upsert race conditions, matching the core fix in the changeset. |
| Description check | ✅ Passed | The PR description is comprehensive and directly related to the changeset, explaining the race condition, the fix involving threading.RLock, affected methods, test coverage, and scope decisions. |
| Linked Issues check | ✅ Passed | The changeset fully addresses issue #1193 by implementing the suggested fix: adding threading.RLock to LocalCollection and wrapping all write methods that mutate per-point arrays, resolving the concurrent-upsert race condition. |
| Out of Scope Changes check | ✅ Passed | All changes are directly in scope: LocalCollection thread-safety locking (8 methods) and a regression test for concurrent upsert; no unrelated modifications. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |


Warning

Review ran into problems

🔥 Problems

Git: Failed to clone repository. Please run the @coderabbitai full review command to re-trigger a full review. If the issue persists, set path_filters to include or exclude specific files.



@coderabbitai coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
qdrant_client/local/local_collection.py (1)

2553-2599: ⚠️ Potential issue | 🟠 Major

Writer-reader race still reproduces the same broadcast error.

The lock only serializes writers, but _add_point mutates self.payload (in-place append on line 2399) before it re-assigns self.deleted / self.deleted_per_vector[name] / self.vectors[name] (lines 2403, 2421-2423, 2431-2434, etc.). Readers such as scroll, search, count, facet, and retrieve go through _payload_and_non_deleted_mask, whose payload_mask & ~self.deleted on line 523 will still raise ValueError: operands could not be broadcast together with shapes (N+1,) (N,) if a read runs while any writer is mid-_add_point. The issue #1193 repro happened to join all writers before reading, so it is fixed, but the general "concurrent upsert + concurrent scroll/search" pattern is not.

Given the PR's stated "read hot path lock-free" design choice this is intentional, but consider either:

  1. Documenting in the class/__init__ docstring that reads are not safe to overlap with writes, or
  2. Reordering _add_point so per-point array growth is committed atomically from the reader's perspective (e.g., build the new deleted / deleted_per_vector[name] / vectors[name] arrays first and only then append to self.payload / self.ids_inv / self.ids — readers then see either the old size or the new size, never a torn state). Option 2 preserves the lock-free read hot path.
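One way to obtain the "old size or new size, never a torn state" property is a copy-on-write snapshot: keep all parallel containers behind a single reference and replace that reference in one assignment. This is a variation on option 2 rather than the exact reordering described, and `Snapshot` and `SnapshotCollection` are hypothetical names. It relies on the assumption that rebinding a single attribute is atomic with respect to other threads, which holds in CPython.

```python
import threading
from typing import NamedTuple


class Snapshot(NamedTuple):
    payload: tuple
    deleted: tuple


class SnapshotCollection:
    """Hypothetical sketch: writers publish a complete new state in one step."""

    def __init__(self):
        self._lock = threading.RLock()
        self._state = Snapshot(payload=(), deleted=())

    def add_point(self, item):
        with self._lock:  # writers are still serialized
            old = self._state
            new = Snapshot(old.payload + (item,), old.deleted + (False,))
            self._state = new  # one assignment publishes both containers

    def live_points(self):
        state = self._state  # read the reference once, then use only `state`
        return [p for p, d in zip(state.payload, state.deleted) if not d]


collection = SnapshotCollection()
before = collection._state
collection.add_point("a")
after = collection._state
print(len(before.payload), len(before.deleted))  # 0 0
print(len(after.payload), len(after.deleted))    # 1 1
```

Readers stay lock-free, but each write copies the containers, so the cost per insert grows with collection size. Whether that tradeoff is acceptable for local mode is a design decision outside this sketch.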
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@qdrant_client/local/local_collection.py` around lines 2553 - 2599, The upsert
path can leave readers in a torn state because _add_point mutates self.payload
in-place before committing related arrays, causing _payload_and_non_deleted_mask
(used by scroll/search/count/retrieve/facet) to sometimes see mismatched shapes;
fix by making per-point growth atomic: in _add_point (and any callers like
upsert) construct new arrays/masks for self.deleted,
self.deleted_per_vector[name], and self.vectors[name] (and ids/ids_inv) first
(or extend copies), then perform the final append/assignment to self.payload and
ids so readers see either the old full state or the new full state; ensure the
lock around writers still surrounds the commit step and update any helper
functions that assume in-place mutation to use the new-then-swap approach
(references: upsert, _add_point, _payload_and_non_deleted_mask, scroll, search,
count, retrieve, facet).
🧹 Nitpick comments (1)
tests/test_in_memory.py (1)

303-369: Nice, targeted regression test — optional: close the client.

Sizing (8×20×10 = 1600) and the post-join scroll(limit=expected_total + 1) precisely exercise the original failure mode. Two minor refinements if you feel like it:

  • The QdrantClient is never closed; for :memory: it's harmless in CI but leaves the sqlite connection dangling until GC. A try/finally or with contextlib.closing(...) (or calling client.close()) keeps the test tidy.
  • Consider asserting len({p.id for p in points}) == expected_total as well, to catch a hypothetical regression where the race silently drops or duplicates IDs without tripping the shape mismatch.
💡 Optional tweak
-    client = QdrantClient(":memory:")
-    client.create_collection(
+    client = QdrantClient(":memory:")
+    try:
+        client.create_collection(
             collection_name="race",
             ...
-    )
+        )
         ...
-    points, _ = client.scroll(collection_name="race", limit=expected_total + 1)
-    assert len(points) == expected_total, (
-        f"expected {expected_total} points after concurrent upserts, got {len(points)}"
-    )
+        points, _ = client.scroll(collection_name="race", limit=expected_total + 1)
+        assert len(points) == expected_total, (
+            f"expected {expected_total} points after concurrent upserts, got {len(points)}"
+        )
+        assert len({p.id for p in points}) == expected_total
+    finally:
+        client.close()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_in_memory.py` around lines 303 - 369, The test leaves the
QdrantClient open and could be tightened to also assert uniqueness of returned
IDs; wrap the QdrantClient usage in a try/finally (or use
contextlib.closing/with) and call client.close() in the finally to ensure the
in-memory sqlite connection is closed, and after the scroll call add an
assertion like len({p.id for p in points}) == expected_total to ensure no
duplicates/drops; update references in this test function
(test_concurrent_upsert_does_not_corrupt_local_collection) and keep the existing
scroll/expected_total logic unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 208aca99-7e6f-4c9d-9e70-2ff29123b1c4

📥 Commits

Reviewing files that changed from the base of the PR and between cd5eb25 and c342fa3.

📒 Files selected for processing (2)
  • qdrant_client/local/local_collection.py
  • tests/test_in_memory.py

@joein
Member

joein commented May 18, 2026

Hey @aurelienbran

Local mode is not designed for concurrent workloads.
It has an async mock (AsyncQdrantLocal) that provides async methods, but they are synchronous under the hood, and that is intentional.

Trying to overcome this with .to_thread does not sound right to me.
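The distinction being drawn here can be illustrated with a toy async facade. `ToyAsyncLocal` is hypothetical and is not the real `AsyncQdrantLocal`; it only shows that coroutines which run synchronous code inline all execute on the event loop thread, while `asyncio.to_thread` (Python 3.9+) moves the same calls onto worker threads.

```python
import asyncio
import threading


class ToyAsyncLocal:
    """Hypothetical async facade whose methods run synchronous code inline."""

    def __init__(self):
        self.seen_threads = []

    def _sync_upsert(self):
        # Record which thread actually performs the write.
        self.seen_threads.append(threading.get_ident())

    async def upsert(self):
        self._sync_upsert()  # no await inside: runs on the event loop thread


async def main():
    local = ToyAsyncLocal()

    # "Concurrent" coroutines still run one after another on the loop thread.
    await asyncio.gather(*(local.upsert() for _ in range(4)))
    inline = set(local.seen_threads)
    local.seen_threads.clear()

    # to_thread hands the same synchronous call to worker threads.
    await asyncio.gather(*(asyncio.to_thread(local._sync_upsert) for _ in range(4)))
    offloaded = set(local.seen_threads)
    return inline, offloaded


inline_threads, offloaded_threads = asyncio.run(main())
loop_thread = threading.get_ident()
print(inline_threads == {loop_thread})   # True
print(loop_thread in offloaded_threads)  # False
```

In the inline case there is only ever one thread touching the collection, so no write race is possible. The race only becomes reachable once callers add real threads on top, which is the usage pattern being questioned.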


Successfully merging this pull request may close these issues.

In-memory LocalCollection.upsert: numpy array race under concurrent threads (ValueError: operands could not be broadcast)

2 participants