From cb37749f5a5526945f145cd2704390c64a1c0e54 Mon Sep 17 00:00:00 2001 From: Sachin Yadav Date: Fri, 22 May 2026 00:20:32 +0530 Subject: [PATCH 01/10] style: apply ruff formatting fixes --- ...t_multi_query_embedding_retriever_async.py | 138 ++++++++++++++++++ .../test_multi_query_text_retriever_async.py | 114 +++++++++++++++ .../test_text_embedding_retriever_async.py | 93 ++++++++++++ 3 files changed, 345 insertions(+) create mode 100644 test/components/retrievers/test_multi_query_embedding_retriever_async.py create mode 100644 test/components/retrievers/test_multi_query_text_retriever_async.py create mode 100644 test/components/retrievers/test_text_embedding_retriever_async.py diff --git a/test/components/retrievers/test_multi_query_embedding_retriever_async.py b/test/components/retrievers/test_multi_query_embedding_retriever_async.py new file mode 100644 index 0000000000..926a41b4bd --- /dev/null +++ b/test/components/retrievers/test_multi_query_embedding_retriever_async.py @@ -0,0 +1,138 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any + +import numpy as np +import pytest + +from haystack import AsyncPipeline, Document, component +from haystack.components.retrievers import InMemoryEmbeddingRetriever, MultiQueryEmbeddingRetriever +from haystack.document_stores.in_memory import InMemoryDocumentStore + + +@component +class MockQueryEmbedder: + @component.output_types(embedding=list[float]) + def run(self, text: str) -> dict[str, list[float]]: + return {"embedding": np.ones(384).tolist()} + + @component.output_types(embedding=list[float]) + async def run_async(self, text: str) -> dict[str, list[float]]: + return {"embedding": np.ones(384).tolist()} + + +class TestMultiQueryEmbeddingRetrieverAsync: + @pytest.mark.asyncio + async def test_run_async_with_empty_queries(self): + multi_retriever = MultiQueryEmbeddingRetriever( + 
retriever=InMemoryEmbeddingRetriever(document_store=InMemoryDocumentStore()), + query_embedder=MockQueryEmbedder(), + ) + result = await multi_retriever.run_async(queries=[]) + assert "documents" in result + assert result["documents"] == [] + + @pytest.mark.asyncio + async def test_run_async_returns_documents_sorted_by_score(self): + doc_high = Document(content="Solar energy", id="doc1", score=0.9) + doc_low = Document(content="Fossil fuels", id="doc2", score=0.3) + doc_mid = Document(content="Wind energy", id="doc3", score=0.6) + + @component + class MockRetriever: + @component.output_types(documents=list[Document]) + def run( + self, + query_embedding: list[float], + filters: dict[str, Any] | None = None, + top_k: int | None = None, + **kwargs: Any, + ) -> dict[str, list[Document]]: + return {"documents": [doc_low, doc_high, doc_mid]} + + @component.output_types(documents=list[Document]) + async def run_async( + self, + query_embedding: list[float], + filters: dict[str, Any] | None = None, + top_k: int | None = None, + **kwargs: Any, + ) -> dict[str, list[Document]]: + return {"documents": [doc_low, doc_high, doc_mid]} + + multi_retriever = MultiQueryEmbeddingRetriever(retriever=MockRetriever(), query_embedder=MockQueryEmbedder()) + result = await multi_retriever.run_async(queries=["query1", "query2"]) + + scores = [doc.score for doc in result["documents"]] + assert scores == sorted(scores, reverse=True) + + @pytest.mark.asyncio + async def test_run_async_deduplication(self): + doc2 = Document(content="Wind energy is clean", id="doc2", score=0.8) + # doc3 shares the same id as doc1 — simulates the same doc retrieved by different queries + doc3 = Document(content="Solar energy is renewable", id="doc1", score=0.7) + + @component + class MockRetriever: + @component.output_types(documents=list[Document]) + def run( + self, + query_embedding: list[float], + filters: dict[str, Any] | None = None, + top_k: int | None = None, + **kwargs: Any, + ) -> dict[str, 
list[Document]]: + return {"documents": [doc3, doc2]} + + @component.output_types(documents=list[Document]) + async def run_async( + self, + query_embedding: list[float], + filters: dict[str, Any] | None = None, + top_k: int | None = None, + **kwargs: Any, + ) -> dict[str, list[Document]]: + return {"documents": [doc3, doc2]} + + multi_retriever = MultiQueryEmbeddingRetriever(retriever=MockRetriever(), query_embedder=MockQueryEmbedder()) + result = await multi_retriever.run_async(queries=["query1", "query2"]) + + assert "documents" in result + assert len(result["documents"]) == 2 + contents = [doc.content for doc in result["documents"]] + assert contents.count("Solar energy is renewable") == 1 + assert contents.count("Wind energy is clean") == 1 + + @pytest.mark.asyncio + async def test_run_async_falls_back_to_sync_when_no_run_async(self): + @component + class SyncOnlyEmbedder: + @component.output_types(embedding=list[float]) + def run(self, text: str) -> dict[str, list[float]]: + return {"embedding": np.ones(384).tolist()} + + multi_retriever = MultiQueryEmbeddingRetriever( + retriever=InMemoryEmbeddingRetriever(document_store=InMemoryDocumentStore()), + query_embedder=SyncOnlyEmbedder(), + ) + result = await multi_retriever.run_async(queries=["query"]) + assert "documents" in result + assert result["documents"] == [] + + @pytest.mark.asyncio + @pytest.mark.integration + async def test_run_async_with_pipeline(self): + multi_retriever = MultiQueryEmbeddingRetriever( + retriever=InMemoryEmbeddingRetriever(document_store=InMemoryDocumentStore()), + query_embedder=MockQueryEmbedder(), + ) + pipeline = AsyncPipeline() + pipeline.add_component("retriever", multi_retriever) + result = await pipeline.run_async(data={"retriever": {"queries": ["green energy", "solar power"]}}) + + assert result + assert "retriever" in result + assert "documents" in result["retriever"] + assert result["retriever"]["documents"] == [] diff --git 
a/test/components/retrievers/test_multi_query_text_retriever_async.py b/test/components/retrievers/test_multi_query_text_retriever_async.py new file mode 100644 index 0000000000..d883b6ae65 --- /dev/null +++ b/test/components/retrievers/test_multi_query_text_retriever_async.py @@ -0,0 +1,114 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any + +import pytest + +from haystack import AsyncPipeline, Document, component +from haystack.components.retrievers import InMemoryBM25Retriever, MultiQueryTextRetriever +from haystack.components.writers import DocumentWriter +from haystack.document_stores.in_memory import InMemoryDocumentStore +from haystack.document_stores.types import DuplicatePolicy + + +class TestMultiQueryTextRetrieverAsync: + @pytest.fixture + def sample_documents(self): + return [ + Document(content="Renewable energy is energy that is collected from renewable resources."), + Document(content="Solar energy is a type of green energy that is harnessed from the sun."), + Document(content="Wind energy is another type of green energy that is generated by wind turbines."), + Document(content="Geothermal energy is heat that comes from the sub-surface of the earth."), + ] + + @pytest.fixture + def document_store_with_docs(self, sample_documents): + document_store = InMemoryDocumentStore() + doc_writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP) + doc_writer.run(documents=sample_documents) + return document_store + + @pytest.mark.asyncio + async def test_run_async_with_empty_queries(self, document_store_with_docs): + multi_retriever = MultiQueryTextRetriever( + retriever=InMemoryBM25Retriever(document_store=document_store_with_docs) + ) + result = await multi_retriever.run_async(queries=[]) + assert "documents" in result + assert result["documents"] == [] + + @pytest.mark.asyncio + async def test_run_async_with_multiple_queries(self, document_store_with_docs): + 
multi_retriever = MultiQueryTextRetriever( + retriever=InMemoryBM25Retriever(document_store=document_store_with_docs) + ) + result = await multi_retriever.run_async(queries=["renewable energy", "solar power"]) + + assert "documents" in result + assert len(result["documents"]) > 0 + assert all(isinstance(doc, Document) for doc in result["documents"]) + scores = [doc.score for doc in result["documents"] if doc.score is not None] + assert scores == sorted(scores, reverse=True) + + @pytest.mark.asyncio + async def test_run_async_deduplication(self): + doc2 = Document(content="Wind energy is clean", id="doc2", score=0.8) + # doc3 shares the same id as doc1 — simulates the same doc retrieved by different queries + doc3 = Document(content="Solar energy is renewable", id="doc1", score=0.7) + + @component + class MockRetriever: + @component.output_types(documents=list[Document]) + def run( + self, query: str, filters: dict[str, Any] | None = None, top_k: int | None = None, **kwargs: Any + ) -> dict[str, list[Document]]: + return {"documents": [doc3, doc2]} + + @component.output_types(documents=list[Document]) + async def run_async( + self, query: str, filters: dict[str, Any] | None = None, top_k: int | None = None, **kwargs: Any + ) -> dict[str, list[Document]]: + return {"documents": [doc3, doc2]} + + multi_retriever = MultiQueryTextRetriever(retriever=MockRetriever()) + result = await multi_retriever.run_async(queries=["query1", "query2"]) + + assert "documents" in result + assert len(result["documents"]) == 2 + contents = [doc.content for doc in result["documents"]] + assert contents.count("Solar energy is renewable") == 1 + assert contents.count("Wind energy is clean") == 1 + + @pytest.mark.asyncio + async def test_run_async_falls_back_to_sync_when_no_run_async(self, document_store_with_docs): + @component + class SyncOnlyRetriever: + @component.output_types(documents=list[Document]) + def run( + self, query: str, filters: dict[str, Any] | None = None, top_k: int | None 
= None + ) -> dict[str, list[Document]]: + return {"documents": []} + + multi_retriever = MultiQueryTextRetriever(retriever=SyncOnlyRetriever()) + result = await multi_retriever.run_async(queries=["query"]) + assert "documents" in result + assert result["documents"] == [] + + @pytest.mark.asyncio + @pytest.mark.integration + async def test_run_async_with_pipeline(self, document_store_with_docs): + multi_retriever = MultiQueryTextRetriever( + retriever=InMemoryBM25Retriever(document_store=document_store_with_docs) + ) + pipeline = AsyncPipeline() + pipeline.add_component("retriever", multi_retriever) + result = await pipeline.run_async(data={"retriever": {"queries": ["renewable energy", "solar power"]}}) + + assert result + assert "retriever" in result + assert "documents" in result["retriever"] + assert len(result["retriever"]["documents"]) > 0 + scores = [doc.score for doc in result["retriever"]["documents"] if doc.score is not None] + assert scores == sorted(scores, reverse=True) diff --git a/test/components/retrievers/test_text_embedding_retriever_async.py b/test/components/retrievers/test_text_embedding_retriever_async.py new file mode 100644 index 0000000000..8f34fed9e4 --- /dev/null +++ b/test/components/retrievers/test_text_embedding_retriever_async.py @@ -0,0 +1,93 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any + +import numpy as np +import pytest + +from haystack import AsyncPipeline, Document, component +from haystack.components.retrievers import InMemoryEmbeddingRetriever, TextEmbeddingRetriever +from haystack.document_stores.in_memory import InMemoryDocumentStore + + +@component +class MockTextEmbedder: + @component.output_types(embedding=list[float]) + def run(self, text: str) -> dict[str, list[float]]: + return {"embedding": np.ones(384).tolist()} + + @component.output_types(embedding=list[float]) + async def run_async(self, text: str) -> dict[str, list[float]]: + return 
{"embedding": np.ones(384).tolist()} + + +class TestTextEmbeddingRetrieverAsync: + @pytest.mark.asyncio + async def test_run_async_with_empty_document_store(self): + retriever = TextEmbeddingRetriever( + retriever=InMemoryEmbeddingRetriever(document_store=InMemoryDocumentStore()), + text_embedder=MockTextEmbedder(), + ) + result = await retriever.run_async(query="green energy") + assert "documents" in result + assert result["documents"] == [] + + @pytest.mark.asyncio + async def test_run_async_returns_documents_sorted_by_score(self): + doc_high = Document(content="Solar energy", id="doc1", score=0.9) + doc_low = Document(content="Fossil fuels", id="doc2", score=0.3) + doc_mid = Document(content="Wind energy", id="doc3", score=0.6) + + @component + class MockRetriever: + @component.output_types(documents=list[Document]) + def run( + self, query_embedding: list[float], filters: dict[str, Any] | None = None, top_k: int | None = None + ) -> dict[str, list[Document]]: + return {"documents": [doc_low, doc_high, doc_mid]} + + @component.output_types(documents=list[Document]) + async def run_async( + self, query_embedding: list[float], filters: dict[str, Any] | None = None, top_k: int | None = None + ) -> dict[str, list[Document]]: + return {"documents": [doc_low, doc_high, doc_mid]} + + retriever = TextEmbeddingRetriever(retriever=MockRetriever(), text_embedder=MockTextEmbedder()) + result = await retriever.run_async(query="energy") + + scores = [doc.score for doc in result["documents"]] + assert scores == sorted(scores, reverse=True) + + @pytest.mark.asyncio + async def test_run_async_falls_back_to_sync_when_no_run_async(self): + @component + class SyncOnlyEmbedder: + @component.output_types(embedding=list[float]) + def run(self, text: str) -> dict[str, list[float]]: + return {"embedding": np.ones(384).tolist()} + + retriever = TextEmbeddingRetriever( + retriever=InMemoryEmbeddingRetriever(document_store=InMemoryDocumentStore()), + text_embedder=SyncOnlyEmbedder(), + ) + 
result = await retriever.run_async(query="green energy") + assert "documents" in result + assert result["documents"] == [] + + @pytest.mark.asyncio + @pytest.mark.integration + async def test_run_async_with_pipeline(self): + retriever = TextEmbeddingRetriever( + retriever=InMemoryEmbeddingRetriever(document_store=InMemoryDocumentStore()), + text_embedder=MockTextEmbedder(), + ) + pipeline = AsyncPipeline() + pipeline.add_component("retriever", retriever) + result = await pipeline.run_async(data={"retriever": {"query": "green energy"}}) + + assert result + assert "retriever" in result + assert "documents" in result["retriever"] + assert result["retriever"]["documents"] == [] From 2271fdd5f32c861cb2deacc29bd3a2c6ec834888 Mon Sep 17 00:00:00 2001 From: Sachin Yadav Date: Fri, 22 May 2026 00:30:31 +0530 Subject: [PATCH 02/10] feat: add run_async to TextEmbeddingRetriever, MultiQueryEmbeddingRetriever, and MultiQueryTextRetriever --- .../multi_query_embedding_retriever.py | 57 +++++++++++++++++++ .../retrievers/multi_query_text_retriever.py | 48 ++++++++++++++++ .../retrievers/text_embedding_retriever.py | 42 ++++++++++++++ ...-async-to-retrievers-a265779e909abc2c.yaml | 7 +++ 4 files changed, 154 insertions(+) create mode 100644 releasenotes/notes/add-run-async-to-retrievers-a265779e909abc2c.yaml diff --git a/haystack/components/retrievers/multi_query_embedding_retriever.py b/haystack/components/retrievers/multi_query_embedding_retriever.py index ca9f297368..945c965319 100644 --- a/haystack/components/retrievers/multi_query_embedding_retriever.py +++ b/haystack/components/retrievers/multi_query_embedding_retriever.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 +import asyncio from concurrent.futures import ThreadPoolExecutor from typing import Any @@ -125,6 +126,33 @@ def run(self, queries: list[str], retriever_kwargs: dict[str, Any] | None = None docs.sort(key=lambda x: x.score or 0.0, reverse=True) return {"documents": docs} + 
@component.output_types(documents=list[Document]) + async def run_async( + self, queries: list[str], retriever_kwargs: dict[str, Any] | None = None + ) -> dict[str, list[Document]]: + """ + Retrieve documents using multiple queries concurrently. + + Uses each component's `run_async` method if available, otherwise falls back to running `run` + in a thread executor. Queries are processed concurrently using asyncio.gather. + + :param queries: List of text queries to process. + :param retriever_kwargs: Optional dictionary of arguments to pass to the retriever's run method. + :returns: + A dictionary containing: + - `documents`: List of retrieved documents sorted by relevance score. + """ + retriever_kwargs = retriever_kwargs or {} + + if not self._is_warmed_up: + self.warm_up() + + results = await asyncio.gather(*[self._run_one_async(q, retriever_kwargs) for q in queries]) + docs: list[Document] = [doc for result in results if result for doc in result] + docs = _deduplicate_documents(docs) + docs.sort(key=lambda x: x.score or 0.0, reverse=True) + return {"documents": docs} + def _run_on_thread(self, query: str, retriever_kwargs: dict[str, Any] | None = None) -> list[Document] | None: """ Process a single query on a separate thread. @@ -140,6 +168,35 @@ def _run_on_thread(self, query: str, retriever_kwargs: dict[str, Any] | None = N return result["documents"] return None + async def _run_one_async(self, query: str, retriever_kwargs: dict[str, Any]) -> list[Document] | None: + """ + Process a single query asynchronously. + + :param query: The text query to process. + :param retriever_kwargs: Arguments to pass to the retriever's run method. + :returns: + List of retrieved documents or None if no results. 
+ """ + loop = asyncio.get_running_loop() + + if hasattr(self.query_embedder, "run_async") and callable(self.query_embedder.run_async): + embedding_result = await self.query_embedder.run_async(text=query) + else: + embedding_result = await loop.run_in_executor(None, lambda: self.query_embedder.run(text=query)) + + query_embedding = embedding_result["embedding"] + + if hasattr(self.retriever, "run_async") and callable(self.retriever.run_async): + result = await self.retriever.run_async(query_embedding=query_embedding, **retriever_kwargs) + else: + result = await loop.run_in_executor( + None, lambda: self.retriever.run(query_embedding=query_embedding, **retriever_kwargs) + ) + + if result and "documents" in result: + return result["documents"] + return None + def to_dict(self) -> dict[str, Any]: """ Serializes the component to a dictionary. diff --git a/haystack/components/retrievers/multi_query_text_retriever.py b/haystack/components/retrievers/multi_query_text_retriever.py index 2e6fbf6fd6..b265c5ec05 100644 --- a/haystack/components/retrievers/multi_query_text_retriever.py +++ b/haystack/components/retrievers/multi_query_text_retriever.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 +import asyncio from concurrent.futures import ThreadPoolExecutor from typing import Any @@ -105,6 +106,33 @@ def run(self, queries: list[str], retriever_kwargs: dict[str, Any] | None = None docs.sort(key=lambda x: x.score or 0.0, reverse=True) return {"documents": docs} + @component.output_types(documents=list[Document]) + async def run_async( + self, queries: list[str], retriever_kwargs: dict[str, Any] | None = None + ) -> dict[str, list[Document]]: + """ + Retrieve documents using multiple queries concurrently. + + Uses the retriever's `run_async` method if available, otherwise falls back to running `run` + in a thread executor. Queries are processed concurrently using asyncio.gather. + + :param queries: List of text queries to process. 
+ :param retriever_kwargs: Optional dictionary of arguments to pass to the retriever's run method. + :returns: + A dictionary containing: + `documents`: List of retrieved documents sorted by relevance score. + """ + retriever_kwargs = retriever_kwargs or {} + + if not self._is_warmed_up: + self.warm_up() + + results = await asyncio.gather(*[self._run_one_async(q, retriever_kwargs) for q in queries]) + docs: list[Document] = [doc for result in results if result for doc in result] + docs = _deduplicate_documents(docs) + docs.sort(key=lambda x: x.score or 0.0, reverse=True) + return {"documents": docs} + def _run_on_thread(self, query: str, retriever_kwargs: dict[str, Any] | None = None) -> list[Document] | None: """ Process a single query on a separate thread. @@ -119,6 +147,26 @@ def _run_on_thread(self, query: str, retriever_kwargs: dict[str, Any] | None = N return result["documents"] return None + async def _run_one_async(self, query: str, retriever_kwargs: dict[str, Any]) -> list[Document] | None: + """ + Process a single query asynchronously. + + :param query: The text query to process. + :param retriever_kwargs: Arguments to pass to the retriever's run method. + :returns: + List of retrieved documents or None if no results. + """ + loop = asyncio.get_running_loop() + + if hasattr(self.retriever, "run_async") and callable(self.retriever.run_async): + result = await self.retriever.run_async(query=query, **retriever_kwargs) + else: + result = await loop.run_in_executor(None, lambda: self.retriever.run(query=query, **retriever_kwargs)) + + if result and "documents" in result: + return result["documents"] + return None + def to_dict(self) -> dict[str, Any]: """ Serializes the component to a dictionary. 
diff --git a/haystack/components/retrievers/text_embedding_retriever.py b/haystack/components/retrievers/text_embedding_retriever.py index 8b13a49a38..f8cc210026 100644 --- a/haystack/components/retrievers/text_embedding_retriever.py +++ b/haystack/components/retrievers/text_embedding_retriever.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 +import asyncio from typing import Any from haystack import Document, component, default_from_dict, default_to_dict @@ -104,6 +105,47 @@ def run( docs.sort(key=lambda x: x.score or 0.0, reverse=True) return {"documents": docs} + @component.output_types(documents=list[Document]) + async def run_async( + self, query: str, filters: dict[str, Any] | None = None, top_k: int | None = None + ) -> dict[str, list[Document]]: + """ + Retrieve documents using a single query asynchronously. + + Uses `run_async` on the text embedder and retriever if available, otherwise falls back to + running `run` in a thread executor. + + :param query: The query to retrieve documents for. + :param filters: A dictionary of filters to apply when retrieving documents. + :param top_k: The maximum number of documents to return. + :returns: + A dictionary containing: + - `documents`: List of retrieved documents sorted by relevance score. 
+ """ + if not self._is_warmed_up: + self.warm_up() + + loop = asyncio.get_running_loop() + + if hasattr(self.text_embedder, "run_async") and callable(self.text_embedder.run_async): + embedding_result = await self.text_embedder.run_async(text=query) + else: + embedding_result = await loop.run_in_executor(None, lambda: self.text_embedder.run(text=query)) + + if hasattr(self.retriever, "run_async") and callable(self.retriever.run_async): + result = await self.retriever.run_async( + query_embedding=embedding_result["embedding"], filters=filters, top_k=top_k + ) + else: + result = await loop.run_in_executor( + None, + lambda: self.retriever.run(query_embedding=embedding_result["embedding"], filters=filters, top_k=top_k), + ) + + docs: list[Document] = result["documents"] + docs.sort(key=lambda x: x.score or 0.0, reverse=True) + return {"documents": docs} + def to_dict(self) -> dict[str, Any]: """ Serializes the component to a dictionary. diff --git a/releasenotes/notes/add-run-async-to-retrievers-a265779e909abc2c.yaml b/releasenotes/notes/add-run-async-to-retrievers-a265779e909abc2c.yaml new file mode 100644 index 0000000000..552172be87 --- /dev/null +++ b/releasenotes/notes/add-run-async-to-retrievers-a265779e909abc2c.yaml @@ -0,0 +1,7 @@ +--- +enhancements: + - | + Added ``run_async`` to ``TextEmbeddingRetriever``, ``MultiQueryEmbeddingRetriever``, and + ``MultiQueryTextRetriever``. These components now execute natively as coroutines in + ``AsyncPipeline``, delegating to each wrapped component's ``run_async`` when available and + falling back to a thread executor otherwise. 
From df49c1b56e26176515e6209761a065a4cce4709e Mon Sep 17 00:00:00 2001 From: Sachin Yadav Date: Fri, 22 May 2026 00:40:20 +0530 Subject: [PATCH 03/10] style: fix stale comment in deduplication tests --- .../retrievers/test_multi_query_embedding_retriever_async.py | 2 +- .../retrievers/test_multi_query_text_retriever_async.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/components/retrievers/test_multi_query_embedding_retriever_async.py b/test/components/retrievers/test_multi_query_embedding_retriever_async.py index 926a41b4bd..74bb0dd8ce 100644 --- a/test/components/retrievers/test_multi_query_embedding_retriever_async.py +++ b/test/components/retrievers/test_multi_query_embedding_retriever_async.py @@ -71,7 +71,7 @@ async def run_async( @pytest.mark.asyncio async def test_run_async_deduplication(self): doc2 = Document(content="Wind energy is clean", id="doc2", score=0.8) - # doc3 shares the same id as doc1 — simulates the same doc retrieved by different queries + # doc3 intentionally uses the duplicate id "doc1" to simulate deduplication across multiple queries doc3 = Document(content="Solar energy is renewable", id="doc1", score=0.7) @component diff --git a/test/components/retrievers/test_multi_query_text_retriever_async.py b/test/components/retrievers/test_multi_query_text_retriever_async.py index d883b6ae65..f5b37fcbd6 100644 --- a/test/components/retrievers/test_multi_query_text_retriever_async.py +++ b/test/components/retrievers/test_multi_query_text_retriever_async.py @@ -55,7 +55,7 @@ async def test_run_async_with_multiple_queries(self, document_store_with_docs): @pytest.mark.asyncio async def test_run_async_deduplication(self): doc2 = Document(content="Wind energy is clean", id="doc2", score=0.8) - # doc3 shares the same id as doc1 — simulates the same doc retrieved by different queries + # doc3 intentionally uses the duplicate id "doc1" to simulate deduplication across multiple queries doc3 = Document(content="Solar energy is 
renewable", id="doc1", score=0.7) @component From bc80d83a80d7fedf25f9a9453583a0e103baa92f Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Wed, 27 May 2026 13:34:39 +0200 Subject: [PATCH 04/10] adding test_run_async_with_filters --- .../test_multi_query_text_retriever_async.py | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/test/components/retrievers/test_multi_query_text_retriever_async.py b/test/components/retrievers/test_multi_query_text_retriever_async.py index f5b37fcbd6..7450a09ad9 100644 --- a/test/components/retrievers/test_multi_query_text_retriever_async.py +++ b/test/components/retrievers/test_multi_query_text_retriever_async.py @@ -17,10 +17,27 @@ class TestMultiQueryTextRetrieverAsync: @pytest.fixture def sample_documents(self): return [ - Document(content="Renewable energy is energy that is collected from renewable resources."), - Document(content="Solar energy is a type of green energy that is harnessed from the sun."), - Document(content="Wind energy is another type of green energy that is generated by wind turbines."), - Document(content="Geothermal energy is heat that comes from the sub-surface of the earth."), + Document( + content="Solar energy is a type of green energy that is harnessed from the sun.", + meta={"category": "solar"}, + ), + Document(content="Solar panels convert sunlight directly into electricity.", meta={"category": "solar"}), + Document(content="Photovoltaic cells are the building blocks of solar panels.", meta={"category": "solar"}), + Document( + content="Wind energy is another type of green energy that is generated by wind turbines.", + meta={"category": "wind"}, + ), + Document( + content="Geothermal energy is heat that comes from the sub-surface of the earth.", + meta={"category": "geo"}, + ), + Document( + content="Renewable energy is energy that is collected from renewable resources.", + meta={"category": "renewable"}, + ), + Document( + content="Hydropower is a form of 
renewable energy using the flow of water.", meta={"category": "hydro"} + ), ] @pytest.fixture @@ -96,6 +113,19 @@ def run( assert "documents" in result assert result["documents"] == [] + @pytest.mark.asyncio + @pytest.mark.integration + async def test_run_async_with_filters(self, document_store_with_docs): + in_memory_retriever = InMemoryBM25Retriever(document_store=document_store_with_docs) + filters = {"field": "category", "operator": "==", "value": "solar"} + multi_retriever = MultiQueryTextRetriever(retriever=in_memory_retriever) + result = await multi_retriever.run_async( + queries=["energy", "sunlight", "photovoltaic"], retriever_kwargs={"filters": filters} + ) + assert "documents" in result + assert len(result["documents"]) > 0 + assert all(doc.meta.get("category") == "solar" for doc in result["documents"]) + @pytest.mark.asyncio @pytest.mark.integration async def test_run_async_with_pipeline(self, document_store_with_docs): From 8e79e51918b79029dc7eb2bba727eb41582a4f47 Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Wed, 27 May 2026 15:36:15 +0200 Subject: [PATCH 05/10] adding missing docstring --- .../components/retrievers/multi_query_embedding_retriever.py | 1 + 1 file changed, 1 insertion(+) diff --git a/haystack/components/retrievers/multi_query_embedding_retriever.py b/haystack/components/retrievers/multi_query_embedding_retriever.py index 945c965319..a066ae7319 100644 --- a/haystack/components/retrievers/multi_query_embedding_retriever.py +++ b/haystack/components/retrievers/multi_query_embedding_retriever.py @@ -158,6 +158,7 @@ def _run_on_thread(self, query: str, retriever_kwargs: dict[str, Any] | None = N Process a single query on a separate thread. :param query: The text query to process. + :param retriever_kwargs: Arguments to pass to the retriever's run method. :returns: List of retrieved documents or None if no results. """ From ea27090d2c2a7419bd64ee22e56691e15de283fc Mon Sep 17 00:00:00 2001 From: "David S. 
Batista" Date: Wed, 27 May 2026 15:57:34 +0200 Subject: [PATCH 06/10] Adding test for async run with filters on the multi_query_embedding_retriever --- ...t_multi_query_embedding_retriever_async.py | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/test/components/retrievers/test_multi_query_embedding_retriever_async.py b/test/components/retrievers/test_multi_query_embedding_retriever_async.py index 74bb0dd8ce..4115fec22b 100644 --- a/test/components/retrievers/test_multi_query_embedding_retriever_async.py +++ b/test/components/retrievers/test_multi_query_embedding_retriever_async.py @@ -121,6 +121,64 @@ def run(self, text: str) -> dict[str, list[float]]: assert "documents" in result assert result["documents"] == [] + @pytest.fixture + def document_store_with_categorized_docs(self): + documents = [ + Document( + content="Solar energy is harnessed from the sun.", + embedding=np.ones(384).tolist(), + meta={"category": "solar"}, + ), + Document( + content="Solar panels convert sunlight into electricity.", + embedding=np.ones(384).tolist(), + meta={"category": "solar"}, + ), + Document( + content="Photovoltaic cells are the building blocks of solar panels.", + embedding=np.ones(384).tolist(), + meta={"category": "solar"}, + ), + Document( + content="Wind energy is generated by wind turbines.", + embedding=np.ones(384).tolist(), + meta={"category": "wind"}, + ), + Document( + content="Geothermal energy comes from the sub-surface of the earth.", + embedding=np.ones(384).tolist(), + meta={"category": "geo"}, + ), + Document( + content="Renewable energy is collected from renewable resources.", + embedding=np.ones(384).tolist(), + meta={"category": "renewable"}, + ), + Document( + content="Hydropower uses the flow of water to generate electricity.", + embedding=np.ones(384).tolist(), + meta={"category": "hydro"}, + ), + ] + document_store = InMemoryDocumentStore() + document_store.write_documents(documents) + return document_store + + 
@pytest.mark.asyncio + @pytest.mark.integration + async def test_run_async_with_filters(self, document_store_with_categorized_docs): + in_memory_retriever = InMemoryEmbeddingRetriever(document_store=document_store_with_categorized_docs) + filters = {"field": "category", "operator": "==", "value": "solar"} + multi_retriever = MultiQueryEmbeddingRetriever( + retriever=in_memory_retriever, query_embedder=MockQueryEmbedder() + ) + result = await multi_retriever.run_async( + queries=["energy", "sunlight", "photovoltaic"], retriever_kwargs={"filters": filters} + ) + assert "documents" in result + assert len(result["documents"]) > 0 + assert all(doc.meta.get("category") == "solar" for doc in result["documents"]) + @pytest.mark.asyncio @pytest.mark.integration async def test_run_async_with_pipeline(self): From 5660af6c76c276bb71f76e664d0c59167fec660d Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Wed, 27 May 2026 15:59:49 +0200 Subject: [PATCH 07/10] adding test when retriever doesn't have run_async --- ...st_multi_query_embedding_retriever_async.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/components/retrievers/test_multi_query_embedding_retriever_async.py b/test/components/retrievers/test_multi_query_embedding_retriever_async.py index 4115fec22b..9e8cccbc60 100644 --- a/test/components/retrievers/test_multi_query_embedding_retriever_async.py +++ b/test/components/retrievers/test_multi_query_embedding_retriever_async.py @@ -121,6 +121,24 @@ def run(self, text: str) -> dict[str, list[float]]: assert "documents" in result assert result["documents"] == [] + @pytest.mark.asyncio + async def test_run_async_falls_back_to_sync_retriever_when_no_run_async(self): + @component + class SyncOnlyRetriever: + @component.output_types(documents=list[Document]) + def run( + self, query_embedding: list[float], filters: dict[str, Any] | None = None, top_k: int | None = None + ) -> dict[str, list[Document]]: + return {"documents": 
[Document(content="Solar energy", id="doc1", score=0.9)]} + + multi_retriever = MultiQueryEmbeddingRetriever( + retriever=SyncOnlyRetriever(), query_embedder=MockQueryEmbedder() + ) + result = await multi_retriever.run_async(queries=["query1", "query2"]) + assert "documents" in result + assert len(result["documents"]) == 1 + assert result["documents"][0].content == "Solar energy" + @pytest.fixture def document_store_with_categorized_docs(self): documents = [ From a6850d00854320a4f9de52a7923d85c1ffd68808 Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Wed, 27 May 2026 16:11:20 +0200 Subject: [PATCH 08/10] adding test for async run with filters on the text_embedding_retriever --- .../test_text_embedding_retriever_async.py | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/test/components/retrievers/test_text_embedding_retriever_async.py b/test/components/retrievers/test_text_embedding_retriever_async.py index 8f34fed9e4..739ec28e7c 100644 --- a/test/components/retrievers/test_text_embedding_retriever_async.py +++ b/test/components/retrievers/test_text_embedding_retriever_async.py @@ -76,6 +76,52 @@ def run(self, text: str) -> dict[str, list[float]]: assert "documents" in result assert result["documents"] == [] + @pytest.fixture + def document_store_with_categorized_docs(self): + documents = [ + Document( + content="Solar energy is harnessed from the sun.", + embedding=np.ones(384).tolist(), + meta={"category": "solar"}, + ), + Document( + content="Solar panels convert sunlight into electricity.", + embedding=np.ones(384).tolist(), + meta={"category": "solar"}, + ), + Document( + content="Wind energy is generated by wind turbines.", + embedding=np.ones(384).tolist(), + meta={"category": "wind"}, + ), + Document( + content="Geothermal energy comes from the sub-surface of the earth.", + embedding=np.ones(384).tolist(), + meta={"category": "geo"}, + ), + Document( + content="Renewable energy is collected from renewable resources.", +
embedding=np.ones(384).tolist(), + meta={"category": "renewable"}, + ), + ] + document_store = InMemoryDocumentStore() + document_store.write_documents(documents) + return document_store + + @pytest.mark.asyncio + @pytest.mark.integration + async def test_run_async_with_filters(self, document_store_with_categorized_docs): + retriever = TextEmbeddingRetriever( + retriever=InMemoryEmbeddingRetriever(document_store=document_store_with_categorized_docs), + text_embedder=MockTextEmbedder(), + ) + filters = {"field": "category", "operator": "==", "value": "solar"} + result = await retriever.run_async(query="energy", filters=filters) + assert "documents" in result + assert len(result["documents"]) > 0 + assert all(doc.meta.get("category") == "solar" for doc in result["documents"]) + @pytest.mark.asyncio @pytest.mark.integration async def test_run_async_with_pipeline(self): From d0f6c17485911a7ae7bb28ba22120981148e59a4 Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Wed, 27 May 2026 17:35:57 +0200 Subject: [PATCH 09/10] improving tests --- .../test_multi_query_embedding_retriever_async.py | 6 +++--- .../retrievers/test_multi_query_text_retriever_async.py | 7 ++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/test/components/retrievers/test_multi_query_embedding_retriever_async.py b/test/components/retrievers/test_multi_query_embedding_retriever_async.py index 9e8cccbc60..120635380f 100644 --- a/test/components/retrievers/test_multi_query_embedding_retriever_async.py +++ b/test/components/retrievers/test_multi_query_embedding_retriever_async.py @@ -106,7 +106,7 @@ async def run_async( assert contents.count("Wind energy is clean") == 1 @pytest.mark.asyncio - async def test_run_async_falls_back_to_sync_when_no_run_async(self): + async def test_run_async_falls_back_to_sync_when_no_run_async(self, document_store_with_categorized_docs): @component class SyncOnlyEmbedder: @component.output_types(embedding=list[float]) @@ -114,12 +114,12 @@ def run(self, 
text: str) -> dict[str, list[float]]: return {"embedding": np.ones(384).tolist()} multi_retriever = MultiQueryEmbeddingRetriever( - retriever=InMemoryEmbeddingRetriever(document_store=InMemoryDocumentStore()), + retriever=InMemoryEmbeddingRetriever(document_store=document_store_with_categorized_docs), query_embedder=SyncOnlyEmbedder(), ) result = await multi_retriever.run_async(queries=["query"]) assert "documents" in result - assert result["documents"] == [] + assert len(result["documents"]) > 0 @pytest.mark.asyncio async def test_run_async_falls_back_to_sync_retriever_when_no_run_async(self): diff --git a/test/components/retrievers/test_multi_query_text_retriever_async.py b/test/components/retrievers/test_multi_query_text_retriever_async.py index 7450a09ad9..af8c7014b4 100644 --- a/test/components/retrievers/test_multi_query_text_retriever_async.py +++ b/test/components/retrievers/test_multi_query_text_retriever_async.py @@ -99,19 +99,20 @@ async def run_async( assert contents.count("Wind energy is clean") == 1 @pytest.mark.asyncio - async def test_run_async_falls_back_to_sync_when_no_run_async(self, document_store_with_docs): + async def test_run_async_falls_back_to_sync_when_no_run_async(self): @component class SyncOnlyRetriever: @component.output_types(documents=list[Document]) def run( self, query: str, filters: dict[str, Any] | None = None, top_k: int | None = None ) -> dict[str, list[Document]]: - return {"documents": []} + return {"documents": [Document(content="Renewable energy", id="doc1", score=0.9)]} multi_retriever = MultiQueryTextRetriever(retriever=SyncOnlyRetriever()) result = await multi_retriever.run_async(queries=["query"]) assert "documents" in result - assert result["documents"] == [] + assert len(result["documents"]) == 1 + assert result["documents"][0].content == "Renewable energy" @pytest.mark.asyncio @pytest.mark.integration From 36a5e5dbc86001d03d537d4b66c9fc8b47beea1e Mon Sep 17 00:00:00 2001 From: "David S. 
Batista" Date: Fri, 29 May 2026 12:28:55 +0200 Subject: [PATCH 10/10] improving tests --- .../retrievers/test_text_embedding_retriever_async.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/components/retrievers/test_text_embedding_retriever_async.py b/test/components/retrievers/test_text_embedding_retriever_async.py index 739ec28e7c..9b659a3262 100644 --- a/test/components/retrievers/test_text_embedding_retriever_async.py +++ b/test/components/retrievers/test_text_embedding_retriever_async.py @@ -61,7 +61,7 @@ async def run_async( assert scores == sorted(scores, reverse=True) @pytest.mark.asyncio - async def test_run_async_falls_back_to_sync_when_no_run_async(self): + async def test_run_async_falls_back_to_sync_when_no_run_async(self, document_store_with_categorized_docs): @component class SyncOnlyEmbedder: @component.output_types(embedding=list[float]) @@ -69,12 +69,12 @@ def run(self, text: str) -> dict[str, list[float]]: return {"embedding": np.ones(384).tolist()} retriever = TextEmbeddingRetriever( - retriever=InMemoryEmbeddingRetriever(document_store=InMemoryDocumentStore()), + retriever=InMemoryEmbeddingRetriever(document_store=document_store_with_categorized_docs), text_embedder=SyncOnlyEmbedder(), ) result = await retriever.run_async(query="green energy") assert "documents" in result - assert result["documents"] == [] + assert len(result["documents"]) > 0 @pytest.fixture def document_store_with_categorized_docs(self):