Skip to content

Commit 85e7c0d

Browse files
authored
fix: chromadb offload async batch upsert/insert to worker thread (#7711)
1 parent 7cd69c1 commit 85e7c0d

2 files changed

Lines changed: 64 additions & 2 deletions

File tree

libs/agno/agno/vectordb/chroma/chromadb.py

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -465,7 +465,11 @@ async def async_insert(
465465
if self._collection is None:
466466
logger.warning("Collection does not exist")
467467
else:
468-
self._batch_operation(
468+
# Run the synchronous ChromaDB batch on a worker thread so the
469+
# asyncio event loop stays responsive while the (potentially large)
470+
# write completes.
471+
await asyncio.to_thread(
472+
self._batch_operation,
469473
ids=ids,
470474
embeddings=docs_embeddings,
471475
documents=docs,
@@ -656,7 +660,11 @@ async def _async_upsert(
656660
if self._collection is None:
657661
logger.warning("Collection does not exist")
658662
else:
659-
self._batch_operation(
663+
# Run the synchronous ChromaDB batch on a worker thread so the
664+
# asyncio event loop stays responsive while the (potentially large)
665+
# write completes.
666+
await asyncio.to_thread(
667+
self._batch_operation,
660668
ids=ids,
661669
embeddings=docs_embeddings,
662670
documents=docs,

libs/agno/tests/unit/vectordb/test_chromadb.py

Lines changed: 54 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1481,3 +1481,57 @@ def test_no_batch_size_no_batching_logs(chroma_db, caplog):
14811481
assert not batch_log_found, "Expected no batch log messages, but found batch related logs"
14821482

14831483
assert chroma_db.get_count() == num_documents
1484+
1485+
1486+
@pytest.mark.asyncio
1487+
async def test_async_insert_runs_batch_on_worker_thread(chroma_db, sample_documents):
1488+
"""Ensure async_insert offloads the synchronous ChromaDB batch to a worker thread."""
1489+
from threading import get_ident
1490+
1491+
for doc in sample_documents:
1492+
doc.embedding = chroma_db.embedder.get_embedding(doc.content)
1493+
1494+
main_thread_id = get_ident()
1495+
batch_thread_ids: List[int] = []
1496+
original_batch = ChromaDb._batch_operation
1497+
1498+
def _record_batch(self, *args, **kwargs):
1499+
batch_thread_ids.append(get_ident())
1500+
return original_batch(self, *args, **kwargs)
1501+
1502+
with patch.object(ChromaDb, "_batch_operation", _record_batch):
1503+
await chroma_db.async_insert(content_hash="test_hash", documents=sample_documents)
1504+
1505+
assert batch_thread_ids, "expected _batch_operation to be invoked"
1506+
for thread_id in batch_thread_ids:
1507+
assert thread_id != main_thread_id, (
1508+
"_batch_operation ran on the asyncio main thread; the synchronous ChromaDB "
1509+
"batch must be offloaded so the event loop stays responsive during async_insert"
1510+
)
1511+
1512+
1513+
@pytest.mark.asyncio
1514+
async def test_async_upsert_runs_batch_on_worker_thread(chroma_db, sample_documents):
1515+
"""Ensure async_upsert offloads the synchronous ChromaDB batch to a worker thread."""
1516+
from threading import get_ident
1517+
1518+
for doc in sample_documents:
1519+
doc.embedding = chroma_db.embedder.get_embedding(doc.content)
1520+
1521+
main_thread_id = get_ident()
1522+
batch_thread_ids: List[int] = []
1523+
original_batch = ChromaDb._batch_operation
1524+
1525+
def _record_batch(self, *args, **kwargs):
1526+
batch_thread_ids.append(get_ident())
1527+
return original_batch(self, *args, **kwargs)
1528+
1529+
with patch.object(ChromaDb, "_batch_operation", _record_batch):
1530+
await chroma_db.async_upsert(content_hash="test_hash", documents=sample_documents)
1531+
1532+
assert batch_thread_ids, "expected _batch_operation to be invoked"
1533+
for thread_id in batch_thread_ids:
1534+
assert thread_id != main_thread_id, (
1535+
"_batch_operation ran on the asyncio main thread; the synchronous ChromaDB "
1536+
"batch must be offloaded so the event loop stays responsive during async_upsert"
1537+
)

0 commit comments

Comments
 (0)