In-memory LocalCollection.upsert: numpy array race under concurrent threads (ValueError: operands could not be broadcast) #1193

@aurelienbran

Summary

When multiple threads call QdrantClient(":memory:").upsert(...) on the same collection concurrently, the internal numpy arrays (payload, deleted) in qdrant_client.local.LocalCollection can end up with mismatched shapes. A subsequent scroll() / search() then raises:

ValueError: operands could not be broadcast together with shapes (1600,) (1438,)

Reproducer

import uuid, threading, random, sys
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, PointStruct, SparseVector, SparseVectorParams, VectorParams,
)
from importlib.metadata import version

def worker(client, collection, batch_size, runs, seed):
    rng = random.Random(seed)
    for _ in range(runs):
        points = []
        for i in range(batch_size):
            points.append(PointStruct(
                id=str(uuid.uuid4()),
                vector={
                    "dense": [rng.random() for _ in range(8)],
                    "bm25": SparseVector(indices=[0, 1, 2], values=[0.5, 0.3, 0.1]),
                },
                payload={"thread": seed, "run_idx": i},
            ))
        client.upsert(collection_name=collection, points=points)

print(f"qdrant-client version: {version('qdrant-client')}")
failures = 0
for trial in range(5):
    client = QdrantClient(":memory:")
    client.create_collection(
        collection_name="race",
        vectors_config={"dense": VectorParams(size=8, distance=Distance.COSINE)},
        sparse_vectors_config={"bm25": SparseVectorParams()},
    )
    threads = [threading.Thread(target=worker, args=(client, "race", 10, 20, s)) for s in range(8)]
    for t in threads: t.start()
    for t in threads: t.join()
    try:
        res, _ = client.scroll(collection_name="race", limit=5000)
        assert len(res) == 1600
        print(f"  trial {trial}: OK ({len(res)} points)")
    except Exception as e:
        failures += 1
        print(f"  trial {trial}: RACE — {type(e).__name__}: {str(e)[:100]}")
sys.exit(1 if failures else 0)

Observed

Tested on two versions separated by 9 months:

Version    Trials failing
1.12.2     3 / 5
1.17.1     2 / 5

Sample output from 1.17.1:

qdrant-client version: 1.17.1
  trial 0: OK (1600 points)
  trial 1: OK (1600 points)
  trial 2: RACE — ValueError: operands could not be broadcast together with shapes (1600,) (1598,)
  trial 3: OK (1600 points)
  trial 4: RACE — ValueError: operands could not be broadcast together with shapes (1600,) (1585,)

Root cause

qdrant_client/local/local_collection.py contains no locking primitive (grep -E "threading|Lock\(|_lock\b" qdrant_client/local/local_collection.py returns 0 matches in both 1.12.2 and 1.17.1).

upsert grows self.payload and self.deleted numpy arrays independently via np.concatenate / slice assignments. Two concurrent upsert threads can interleave so that payload is extended by N points while deleted is only extended by N-k, leaving the two arrays at mismatched lengths. The first broadcast-style operation in a subsequent read (scroll, search, filter) then fails.
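One interleaving of this kind can be replayed deterministically, without threads. The sketch below is a toy model and not the actual LocalCollection code: ToyCollection and extended are hypothetical names, and it assumes only that each array is grown in its own read-then-store step, so a writer can be preempted between building a new array and storing it.

```python
import numpy as np


class ToyCollection:
    """Hypothetical stand-in holding two parallel arrays, as described above."""

    def __init__(self):
        self.payload = np.empty(0, dtype=object)
        self.deleted = np.empty(0, dtype=bool)


def extended(arr, n, fill):
    """Return a copy of arr with n extra slots (the read-then-build step)."""
    return np.concatenate([arr, np.full(n, fill, dtype=arr.dtype)])


col = ToyCollection()

# Writer A (3 points): stores the grown payload, then builds its new
# `deleted` array from the current one but is preempted before storing it.
col.payload = extended(col.payload, 3, None)
pending_deleted_a = extended(col.deleted, 3, False)

# Writer B (2 points): runs to completion in the gap.
col.payload = extended(col.payload, 2, None)
col.deleted = extended(col.deleted, 2, False)

# Writer A resumes and stores its stale result, discarding B's extension.
col.deleted = pending_deleted_a

# payload now has 5 entries, deleted has 3.
error = None
try:
    # A read path that combines a per-point mask with `deleted`.
    visible = np.ones(len(col.payload), dtype=bool) & ~col.deleted
except ValueError as exc:
    error = str(exc)

print(len(col.payload), len(col.deleted), error)
```

The lost update on one array but not the other is what leaves the lengths permanently out of sync, so every later read fails until the collection is recreated.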

Impact

  • In-memory backend (:memory:) is documented as "test / prototyping only", but projects using pytest-asyncio with asyncio.to_thread(client.upsert, ...) or multi-thread fixtures will hit this race intermittently — flaky CI.
  • Server-backed QdrantClient connections do not go through LocalCollection, so deployments that talk to a Qdrant server are unaffected. This is strictly a local-mode bug.
  • We currently have to permanently deselect a concurrency test from CI because of this race.
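Until the client serializes writes itself, a test suite can work around the race by funnelling every write through one lock. The following is a minimal sketch under stated assumptions: SerializedWrites is a hypothetical helper (not part of qdrant-client), and FakeClient is a stand-in with a deliberately racy read-modify-write so the block runs without qdrant-client installed.

```python
import threading
import time


class SerializedWrites:
    """Hypothetical proxy: forwards everything to `target`, but runs the
    listed methods under a single lock so they never overlap."""

    def __init__(self, target, methods=("upsert", "delete", "set_payload")):
        self._target = target
        self._methods = frozenset(methods)
        self._lock = threading.Lock()

    def __getattr__(self, name):
        # Only called for names not found on the proxy itself.
        attr = getattr(self._target, name)
        if name in self._methods and callable(attr):
            def locked(*args, **kwargs):
                with self._lock:
                    return attr(*args, **kwargs)
            return locked
        return attr


class FakeClient:
    """Stand-in for a client whose upsert is not thread-safe."""

    def __init__(self):
        self.count = 0

    def upsert(self, points):
        current = self.count          # read
        time.sleep(0.001)             # widen the race window
        self.count = current + len(points)  # write back


fake = FakeClient()
safe = SerializedWrites(fake)


def worker():
    for _ in range(10):
        safe.upsert(points=[0, 1, 2, 3, 4])


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(safe.count)  # 4 threads * 10 runs * 5 points
```

In a real fixture the wrapped object would be the QdrantClient(":memory:") instance, and the same proxy method can be handed to asyncio.to_thread. Reads that may overlap with writes (scroll, search) would need to be added to the methods tuple as well.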

Suggested fix

Add a threading.Lock in LocalCollection.__init__ and acquire it around the payload / deleted / vectors mutations inside upsert, delete, set_payload, and any other write path. The in-memory backend is not perf-critical (it's explicitly for testing), so a single coarse-grained lock per collection is acceptable. If any of the write paths call one another internally, a reentrant threading.RLock would be needed instead to avoid self-deadlock.
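As an illustration of the coarse-grained approach, here is a toy collection that grows both arrays inside one critical section. It is a sketch of the idea only: LockedToyCollection and its methods are hypothetical and do not mirror the real LocalCollection internals.

```python
import threading

import numpy as np


class LockedToyCollection:
    """Hypothetical collection: one lock guards both parallel arrays."""

    def __init__(self):
        self._lock = threading.Lock()
        self.payload = np.empty(0, dtype=object)
        self.deleted = np.empty(0, dtype=bool)

    def upsert(self, payloads):
        n = len(payloads)
        # Build the new chunk outside the lock; it is private to this call.
        chunk = np.empty(n, dtype=object)
        for i, item in enumerate(payloads):
            chunk[i] = item
        # Both arrays are extended in one critical section, so no other
        # writer can observe or overwrite a half-applied update.
        with self._lock:
            self.payload = np.concatenate([self.payload, chunk])
            self.deleted = np.concatenate(
                [self.deleted, np.zeros(n, dtype=bool)]
            )

    def live_count(self):
        # Reads take the same lock so they never see mismatched lengths.
        with self._lock:
            mask = np.ones(len(self.payload), dtype=bool) & ~self.deleted
            return int(np.count_nonzero(mask))


collection = LockedToyCollection()


def worker(seed, runs=20, batch_size=10):
    for _ in range(runs):
        batch = [{"thread": seed, "run_idx": i} for i in range(batch_size)]
        collection.upsert(batch)


threads = [threading.Thread(target=worker, args=(s,)) for s in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(collection.payload), len(collection.deleted), collection.live_count())
```

The thread counts mirror the reproducer (8 threads, 20 runs, 10 points each), so both arrays should end at the same length of 1600.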

Happy to open a PR if the maintainers would welcome it — just confirm the preferred scope (per-collection vs per-operation, RLock vs Lock, async compatibility via asyncio.Lock passthrough) and I'll send one.

Environment

  • Python 3.10
  • numpy >= 1.24
  • qdrant-client 1.17.1 (and 1.12.2)
  • Linux (Ubuntu 22.04)
