fix(valkey): post-filter metadata outside FT.SEARCH (#5794) #5797
devin-ai-integration[bot] wants to merge 2 commits into
Conversation
Commits in this branch:

- feat(valkey): shared cache config + ValkeyCache for A2A and file uploads
  Extract duplicated Redis URL parsing into a shared cache_config utility. Introduce ValkeyCache as a lightweight async key/value cache using valkey-glide. Wire it into A2A task handling, agent card caching, and file upload caching. Part 1/4 of Valkey storage implementation.
- fix: handle non-numeric database path in cache URL parsing
  Extract _parse_db_from_path() helper that catches ValueError for paths like /mydb and defaults to 0 with a warning, instead of crashing.
- fix: async-safe embeddings and resilient drain_writes
  Add bytes→float validators on MemoryRecord and ItemState to handle Valkey returning embeddings as raw bytes. Make embed_texts() safe when called from an async context by using a thread pool. Improve drain_writes() with per-save timeouts and error logging instead of raising on failure. Part 3/4 of Valkey storage implementation.
- fix: catch concurrent.futures.TimeoutError for Python 3.10 compat
  In Python <3.11, concurrent.futures.TimeoutError is distinct from the builtin TimeoutError. Catch both so the timeout warning path works on all supported Python versions.
- feat(valkey): ValkeyStorage vector memory backend
  Add ValkeyStorage, a distributed StorageBackend implementation using Valkey-GLIDE with Valkey Search for vector similarity. Wire it into Memory as the 'valkey' storage option. Pin scrapegraph-py<2 to fix unrelated upstream breakage. Part 4/4 of Valkey storage implementation.
- fix: use datetime.utcnow() for last_accessed consistency
  MemoryRecord defaults use utcnow() for created_at and last_accessed. Match that in ValkeyStorage.update_record() to avoid timezone inconsistency in recency scoring.
The memory_index FT schema only materializes embedding, scope,
categories, created_at, and importance. ValkeyStorage._vector_search
was emitting metadata_filter clauses as @{key}:{value} against that
index, but metadata keys are user-defined and not part of the schema,
so the FT.SEARCH server would either error out or silently return
results that the metadata predicate failed to narrow.
Move metadata_filter to a Python post-filter that runs after FT.SEARCH
parses results. Overfetch from KNN (limit * 10, capped at 1000) when
metadata_filter is supplied so the post-filter still returns the
caller-requested number of hits in the common case, then truncate to
limit. Scope/categories predicates remain pushed down into FT.SEARCH
because they are valid index fields.
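The overfetch arithmetic described above can be sketched as follows. The constants mirror the limit * 10 / cap-at-1000 rule stated in this description; the function name is illustrative, not the one in the patch:

```python
# Sketch of the KNN overfetch described above: when a metadata post-filter
# will run, request extra candidates (limit * 10, capped at 1000) so the
# filtered result can still fill the caller-requested limit.
OVERFETCH_FACTOR = 10
MAX_FETCH = 1000

def knn_fetch_limit(limit: int, has_metadata_filter: bool) -> int:
    if not has_metadata_filter:
        return limit
    return min(max(limit * OVERFETCH_FACTOR, limit), MAX_FETCH)

print(knn_fetch_limit(5, False))   # 5 - no post-filter, fetch exactly limit
print(knn_fetch_limit(5, True))    # 50 - overfetch, truncate to 5 after filtering
print(knn_fetch_limit(500, True))  # 1000 - capped
```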
Adds TestValkeyStorageMetadataPostFilter and
TestValkeyStorageMatchesMetadataFilter test classes; updates three
pre-existing tests that asserted the broken contract.
Closes #5794
Co-Authored-By: João <joao@crewai.com>
📝 Walkthrough
This PR introduces comprehensive Valkey support as an alternative to Redis/aiocache for caching and memory storage, including vector search with metadata post-filtering, while maintaining backward compatibility with existing aiocache and in-memory backends.

Changes: Valkey Cache and Storage Backend Integration

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks: 4 passed, 1 failed (warning)
Inline comment context (the same test snippet repeated across several test files):

    mock_ft_search.return_value = create_mock_ft_search_response([(record1, 0.9)])

    query_embedding = [0.1, 0.2, 0.3, 0.4]
    results = await valkey_storage.asearch(query_embedding, limit=10)
    if _cache_configured:
        return
    caches.set_config(get_aiocache_config())
    _cache_configured = True

    caches.set_config(get_aiocache_config())
    _task_cache = None
    _lazy_init_complete = True
    class CacheBackend(Protocol):
        """Protocol for cache backends used by UploadCache."""

        async def get(self, key: str) -> CachedUpload | None: ...
        async def set(self, key: str, value: CachedUpload, ttl: int) -> None: ...
Actionable comments posted: 9
🧹 Nitpick comments (1)

lib/crewai/src/crewai/memory/types.py (1)

Lines 343-346: ⚡ Quick win — Module-level ThreadPoolExecutor is never shut down.
The module-level _EMBED_POOL ThreadPoolExecutor is created but never explicitly shut down, which can leave threads running after the process should clean up. While this is mitigated by the pool being created once and Python's cleanup on exit, consider adding an atexit handler for proper resource management.

♻️ Proposed fix to add cleanup handler

    +import atexit
    +
     # Reusable thread pool for running embedder calls from sync context
     # when an async event loop is already running. Uses max_workers=2 so
     # a single slow/timed-out call doesn't block subsequent embeds.
     _EMBED_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=2)
    +
    +def _cleanup_embed_pool() -> None:
    +    """Shut down the embedding thread pool on exit."""
    +    _EMBED_POOL.shutdown(wait=False)
    +
    +atexit.register(_cleanup_embed_pool)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/types.py` around lines 343 - 346, The module-level ThreadPoolExecutor _EMBED_POOL is never shut down; add an explicit cleanup by importing atexit and registering a handler that calls _EMBED_POOL.shutdown(wait=True) (or wait=False if non-blocking shutdown is preferred) to ensure threads are cleaned up on process exit; place the atexit.register call near the _EMBED_POOL definition so the executor is registered for shutdown when the module is imported.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai-files/src/crewai_files/cache/upload_cache.py`:
- Around line 165-168: ValkeyCacheBackend.delete currently always returns True
causing callers like aremove to think a deletion succeeded even when the key
didn't exist; modify delete in class ValkeyCacheBackend to verify existence
before removing (e.g., call self._cache.exists(key) or get a delete count from
self._cache.delete if it returns one) and return True only if the key existed
and was actually removed, otherwise return False; update any logic that expects
a boolean accordingly so aremove reflects real removals.
In `@lib/crewai/src/crewai/a2a/utils/agent_card.py`:
- Around line 48-57: Concurrent calls to _ensure_cache_configured() can race on
the module-level _cache_configured flag; add a module-level lock (e.g.,
_cache_config_lock) and wrap the check-and-set sequence in
_ensure_cache_configured() with the lock so only one caller calls
caches.set_config(get_aiocache_config()) and sets _cache_configured = True;
locate and modify _cache_configured and _ensure_cache_configured() (and any
callers like afetch_agent_card()) to use the lock and re-check the flag inside
the locked section before configuring.
In `@lib/crewai/src/crewai/a2a/utils/task.py`:
- Around line 223-229: The cleanup deletion in the finally block (using
_task_cache or cache = caches.get("default") and await
cache.delete(f"cancel:{task_id}")) must be wrapped in a try/except so any errors
during delete do not propagate and mask the task's real result/exception; catch
and log the exception via the existing logger (or processLogger) and do not
re-raise — ensure both branches (when _task_cache is not None and when using
caches.get("default")) use the same safe try/except logging behavior around
await cache.delete(f"cancel:{task_id}").
In `@lib/crewai/src/crewai/memory/storage/valkey_cache.py`:
- Around line 57-109: The lazy initialization in _get_client is racy: multiple
coroutines can observe self._client is None and concurrently create clients; add
an asyncio-compatible lock on the instance (e.g. self._client_lock initialized
once) and perform a double-checked locking pattern inside _get_client — first
check self._client, await self._client_lock.acquire()/use async with, check
self._client again, then perform the GlideClient.create(...) logic and set
self._client; release the lock and return the single shared client; keep
existing timeout/error handling unchanged inside the locked block.
In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py`:
- Around line 1960-1967: The _areset implementation forwards scope_prefix=None
into adelete which triggers adelete's "no-op when all selectors unset" behavior,
so calling reset() does nothing; change _areset so that when scope_prefix is
None it calls adelete with an explicit selector that means "delete everything"
(e.g. pass an empty-string scope_prefix or the delete-all flag expected by
adelete) instead of None, leaving the call as await self.adelete(...) when
scope_prefix is provided; update the docstring accordingly and reference _areset
and adelete when making the change.
- Around line 1334-1432: The KNN overfetch only considers metadata_filter, so
strict scope post-filtering in _search (scope_prefix logic in records filtering)
can drop sibling hits and return fewer than limit; update the fetch_limit
calculation (the block that sets fetch_limit before building query) to also
overfetch when scope_prefix is provided and not "/" (i.e., when strict scope
post-filtering will run), using the same overfetch formula
(self._METADATA_POSTFILTER_OVERFETCH and self._METADATA_POSTFILTER_MAX_FETCH) so
the subsequent KNN call (ft.search) retrieves extra candidates for the
client-side scope filter to trim.
- Around line 389-390: The code is storing created_at as ISO8601 text
(record.created_at.isoformat()) but the schema defines created_at as a
NumericField; replace the ISO string with a numeric timestamp (e.g.,
record.created_at.timestamp() or float seconds) when serializing (where
record.created_at.isoformat() is used) and, on retrieval (where the ISO string
is parsed back), convert the numeric value back to a datetime using
datetime.fromtimestamp(...) (preserving timezone handling used elsewhere);
ensure the schema’s NumericField for created_at remains consistent with these
changes so numeric range queries work.
In `@lib/crewai/src/crewai/memory/unified_memory.py`:
- Around line 336-389: drain_writes() treats timed-out saves as abandoned but
close() still calls _save_pool.shutdown(wait=True) which can block shutdown;
change close() to avoid blocking by calling self._save_pool.shutdown(wait=False)
(or shutdown(..., cancel_futures=True) if you require cancellation and runtime
supports it), then iterate over the executor's tracked futures in
self._pending_saves to cancel any still-pending futures (future.cancel()) and
log cancellations; update references in close() and use the existing
_pending_saves/_pending_lock and drain_writes() behavior to ensure shutdown does
not block on long-running save tasks.
In `@lib/crewai/src/crewai/utilities/cache_config.py`:
- Around line 14-32: Summary: parse_cache_url() extracts a use_tls flag but
ValkeyCache and ValkeyCacheBackend ignore it, so TLS config is dropped. Fix: add
an optional use_tls: bool = False parameter to ValkeyCache.__init__ and
ValkeyCacheBackend.__init__, accept the value produced by parse_cache_url(), and
propagate it into the underlying ValkeyStorage/connection creation (or pass
through to whichever constructor currently accepts TLS) so that the TLS behavior
from parse_cache_url() is honored; keep default False to preserve backward
compatibility and update any call sites that construct
ValkeyCache/ValkeyCacheBackend to forward parse_cache_url()["use_tls"] when
available.
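The TLS prompt above ships no patch. A minimal sketch of the suggested wiring, assuming parse_cache_url() yields a dict containing a use_tls key and that the connection layer can accept a TLS flag (both assumptions here; the real GlideClientConfiguration parameter name may differ):

```python
# Hypothetical sketch only: accept use_tls (default False for backward
# compatibility) and forward it toward client creation.
class ValkeyCache:
    def __init__(self, host: str = "localhost", port: int = 6379,
                 db: int = 0, use_tls: bool = False) -> None:
        self._host = host
        self._port = port
        self._db = db
        self._use_tls = use_tls  # forwarded when the client is created

    @classmethod
    def from_url(cls, parsed: dict) -> "ValkeyCache":
        # Forward the flag parse_cache_url() already extracts.
        return cls(
            host=parsed["host"],
            port=parsed["port"],
            db=parsed.get("db", 0),
            use_tls=parsed.get("use_tls", False),
        )

cache = ValkeyCache.from_url(
    {"host": "example.internal", "port": 6380, "use_tls": True}
)
```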
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 2c3143e4-58d4-423a-a07a-0a0380b805df
⛔ Files ignored due to path filters (1)
uv.lock is excluded by !**/*.lock
📒 Files selected for processing (18)
lib/crewai-files/src/crewai_files/cache/upload_cache.py
lib/crewai/pyproject.toml
lib/crewai/src/crewai/a2a/utils/agent_card.py
lib/crewai/src/crewai/a2a/utils/task.py
lib/crewai/src/crewai/memory/encoding_flow.py
lib/crewai/src/crewai/memory/storage/valkey_cache.py
lib/crewai/src/crewai/memory/storage/valkey_storage.py
lib/crewai/src/crewai/memory/types.py
lib/crewai/src/crewai/memory/unified_memory.py
lib/crewai/src/crewai/utilities/cache_config.py
lib/crewai/tests/memory/storage/test_valkey_cache.py
lib/crewai/tests/memory/storage/test_valkey_storage.py
lib/crewai/tests/memory/storage/test_valkey_storage_errors.py
lib/crewai/tests/memory/storage/test_valkey_storage_scope.py
lib/crewai/tests/memory/storage/test_valkey_storage_search.py
lib/crewai/tests/memory/test_embedding_safety.py
lib/crewai/tests/utilities/test_cache_config.py
pyproject.toml
    async def delete(self, key: str) -> bool:
        await self._cache.delete(key)
        return True  # ValkeyCache.delete is void
ValkeyCacheBackend.delete() always reports success.
Returning True unconditionally makes aremove() report removals even when the key didn’t exist. Use an existence check before delete (or return a delete count from the underlying cache API).
    _cache_configured = False


    def _ensure_cache_configured() -> None:
        """Configure aiocache on first use (lazy initialization)."""
        global _cache_configured
        if _cache_configured:
            return
        caches.set_config(get_aiocache_config())
        _cache_configured = True
Race condition in cache configuration.
The _cache_configured flag and _ensure_cache_configured() function are not thread-safe. Multiple concurrent calls to afetch_agent_card() could both see _cache_configured=False and call caches.set_config() multiple times simultaneously, potentially causing configuration conflicts or unexpected behavior.
🔒 Proposed fix to add thread safety
    +import threading
    +
    -_cache_configured = False
    +_cache_lock = threading.Lock()
    +_cache_configured = False

     def _ensure_cache_configured() -> None:
         """Configure aiocache on first use (lazy initialization)."""
         global _cache_configured
    -    if _cache_configured:
    -        return
    -    caches.set_config(get_aiocache_config())
    -    _cache_configured = True
    +    with _cache_lock:
    +        if _cache_configured:
    +            return
    +        caches.set_config(get_aiocache_config())
    +        _cache_configured = True
+ _cache_configured = True🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/a2a/utils/agent_card.py` around lines 48 - 57,
Concurrent calls to _ensure_cache_configured() can race on the module-level
_cache_configured flag; add a module-level lock (e.g., _cache_config_lock) and
wrap the check-and-set sequence in _ensure_cache_configured() with the lock so
only one caller calls caches.set_config(get_aiocache_config()) and sets
_cache_configured = True; locate and modify _cache_configured and
_ensure_cache_configured() (and any callers like afetch_agent_card()) to use the
lock and re-check the flag inside the locked section before configuring.
    # Clean up cancellation flag
    if _task_cache is not None:
        await _task_cache.delete(f"cancel:{task_id}")
    else:
        cache = caches.get("default")
        await cache.delete(f"cancel:{task_id}")
Cleanup delete errors in finally can override task outcome.
If cache deletion fails here, it can raise from finally and mask a successful result or the original exception/cancellation. Wrap cleanup delete in try/except and only log failures.
    async def _get_client(self) -> GlideClient:
        """Get or create Valkey client (lazy initialization).

        Returns:
            Initialized GlideClient instance.

        Raises:
            RuntimeError: If connection to Valkey fails.
            TimeoutError: If connection attempt times out (10 seconds).
        """
        import asyncio

        if self._client is None:
            host = self._host
            port = self._port
            db = self._db
            try:
                from glide import ServerCredentials

                config = GlideClientConfiguration(
                    addresses=[NodeAddress(host, port)],
                    database_id=db,
                    credentials=(
                        ServerCredentials(password=self._password)
                        if self._password
                        else None
                    ),
                )

                # Add connection timeout (10 seconds)
                try:
                    self._client = await asyncio.wait_for(
                        GlideClient.create(config), timeout=10.0
                    )
                except asyncio.TimeoutError as e:
                    _logger.error("Connection timeout connecting to Valkey")
                    raise TimeoutError(
                        "Connection timeout to Valkey. "
                        "Ensure Valkey is running and accessible."
                    ) from e

                _logger.info("Valkey cache client initialized")
            except (TimeoutError, RuntimeError):
                raise
            except Exception as e:
                _logger.error(
                    "Failed to create Valkey cache client: %s", type(e).__name__
                )
                raise RuntimeError(
                    "Cannot connect to Valkey. Check connection settings."
                ) from e

        return self._client
Lazy client initialization is racy under concurrent access.
If multiple coroutines hit _get_client() while _client is None, they can all create separate clients. Add an async lock (double-checked) around initialization to avoid duplicate connections/leaks.
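The double-checked pattern the comment asks for can be sketched like this; create_client stands in for the GlideClient.create(config) call:

```python
import asyncio

class LazyValkeyClient:
    """Sketch: double-checked async lazy initialization of a shared client."""

    def __init__(self, create_client) -> None:
        self._create = create_client   # e.g. lambda: GlideClient.create(config)
        self._client = None
        self._lock = asyncio.Lock()

    async def get(self):
        if self._client is None:          # fast path, no lock
            async with self._lock:
                if self._client is None:  # re-check under the lock
                    self._client = await self._create()
        return self._client

calls = {"n": 0}

async def _make():
    calls["n"] += 1
    await asyncio.sleep(0)  # yield so concurrent callers overlap
    return object()

async def main():
    lazy = LazyValkeyClient(_make)
    return await asyncio.gather(*(lazy.get() for _ in range(16)))

clients = asyncio.run(main())
print(calls["n"])                             # 1 - only one client was created
print(all(c is clients[0] for c in clients))  # True - everyone shares it
```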
    "importance": str(record.importance),
    "created_at": record.created_at.isoformat(),
🧩 Analysis chain
🌐 Web query:
For Valkey Search / RediSearch HASH indexes, can a NumericField index an ISO 8601 string like 2026-05-13T12:34:56? What happens when a HASH field mapped as NumericField contains a non-numeric value?
💡 Result:
No, a NumericField in Valkey Search / RediSearch cannot index an ISO 8601 string like "2026-05-13T12:34:56". Numeric fields parse string values from HASH fields as 64-bit double-precision floating-point numbers, accepting only specific numeric formats such as integers ("42"), floats ("3.14"), scientific notation ("1e2"), and infinities ("inf", "-inf"). ISO 8601 strings are non-numeric and fall under "rejected" formats like "hello" [1]. When a HASH field mapped as NumericField contains a non-numeric value, the value is rejected during ingestion: the key becomes untracked for that numeric field (i.e., not indexed for numeric queries), but this is not treated as an error—the document remains indexed for other fields, and the key simply does not appear in numeric range queries [1]. This behavior applies to HASH indexing, as shown in the official examples using HSET on hash keys like "product:7 price 'hello'" [1].
🏁 Scripts executed against the repository (crewAIInc/crewAI) to verify the schema and the created_at read/write paths:

    wc -l lib/crewai/src/crewai/memory/storage/valkey_storage.py
    rg -n "NumericField.*created_at|created_at.*NumericField" lib/crewai/src/crewai/memory/storage/valkey_storage.py
    sed -n '1,150p' lib/crewai/src/crewai/memory/storage/valkey_storage.py | rg -n "Schema|NumericField|created_at" -A 2 -B 2
    sed -n '385,395p' lib/crewai/src/crewai/memory/storage/valkey_storage.py
    sed -n '471,481p' lib/crewai/src/crewai/memory/storage/valkey_storage.py
    sed -n '550,560p' lib/crewai/src/crewai/memory/storage/valkey_storage.py
    rg -n "Schema\(|FT\.SCHEMA" lib/crewai/src/crewai/memory/storage/valkey_storage.py -A 30
    sed -n '545,570p' lib/crewai/src/crewai/memory/storage/valkey_storage.py
    rg -n "importance" lib/crewai/src/crewai/memory/storage/valkey_storage.py | head -20
    sed -n '380,400p' lib/crewai/src/crewai/memory/storage/valkey_storage.py
    sed -n '470,485p' lib/crewai/src/crewai/memory/storage/valkey_storage.py

These confirmed that the schema declares created_at as a NumericField while the storage path writes record.created_at.isoformat().
created_at must be stored as a numeric timestamp, not ISO8601 text.
created_at is declared as NumericField in the schema (line 554), but the code stores record.created_at.isoformat() (line 389) and then parses it back (line 475). Valkey Search rejects ISO8601 strings as invalid for numeric fields—the field will not be indexed for numeric queries, silently breaking range-based searches on created_at.
Store a numeric timestamp (e.g., Unix time as float) instead and parse it back accordingly on retrieval.
Affected locations:
- Line 389: storage of created_at
- Line 475: retrieval of created_at
- Line 554: schema definition
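The numeric round-trip the review recommends can be sketched as follows (UTC handling here is an assumption; the real code should match the timezone conventions used elsewhere in the module):

```python
from datetime import datetime, timezone

def serialize_created_at(dt: datetime) -> str:
    # HASH field values are strings; Valkey Search parses this as a float,
    # so a Unix timestamp is indexable where ISO 8601 text is not.
    return str(dt.timestamp())

def deserialize_created_at(raw: str) -> datetime:
    return datetime.fromtimestamp(float(raw), tz=timezone.utc)

orig = datetime(2026, 5, 13, 12, 34, 56, tzinfo=timezone.utc)
stored = serialize_created_at(orig)
print(stored)                                  # numeric string, valid for NumericField
print(deserialize_created_at(stored) == orig)  # True - lossless round-trip
```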
    # When metadata_filter is supplied, overfetch candidates from KNN so
    # the Python post-filter can still satisfy ``limit`` results.
    if metadata_filter:
        fetch_limit = min(
            max(limit * self._METADATA_POSTFILTER_OVERFETCH, limit),
            self._METADATA_POSTFILTER_MAX_FETCH,
        )
    else:
        fetch_limit = limit

    # Build KNN query with filters
    # Format: (filter)=>[KNN limit @field $BLOB AS score]
    # Note: Don't wrap single "*" in parentheses
    if filter_query == "*":
        query = f"{filter_query}=>[KNN {fetch_limit} @embedding $BLOB AS score]"
    else:
        query = f"({filter_query})=>[KNN {fetch_limit} @embedding $BLOB AS score]"

    # Prepare embedding blob for PARAMS
    embedding_blob = self._embedding_to_bytes(query_embedding)

    # Build FT.SEARCH options
    # Note: Vector search results are sorted by distance ascending (nearest first).
    # We convert distance to similarity in _parse_search_result and re-sort descending.
    return_fields = [
        ReturnField(field_identifier="id"),
        ReturnField(field_identifier="content"),
        ReturnField(field_identifier="scope"),
        ReturnField(field_identifier="categories"),
        ReturnField(field_identifier="metadata"),
        ReturnField(field_identifier="importance"),
        ReturnField(field_identifier="created_at"),
        ReturnField(field_identifier="last_accessed"),
        ReturnField(field_identifier="source"),
        ReturnField(field_identifier="private"),
        ReturnField(field_identifier="score"),
    ]

    search_options = FtSearchOptions(
        return_fields=return_fields,
        params={"BLOB": embedding_blob},
        limit=FtSearchLimit(0, fetch_limit),
    )

    try:
        # Execute native ft.search
        result = await ft.search(client, "memory_index", query, search_options)

        # Native ft.search returns: [count, {key1: {fields...}, key2: {fields...}}]
        if not result or not isinstance(result, list) or len(result) < 1:
            return []

        # First element is total count
        total_count_raw = result[0]
        if isinstance(total_count_raw, (int, str)):
            total_count = int(total_count_raw) if total_count_raw else 0
        else:
            total_count = 0
        if total_count == 0:
            return []

        # Parse documents from dict format
        records: list[tuple[MemoryRecord, float]] = []
        if len(result) > 1 and isinstance(result[1], dict):
            docs_dict = result[1]
            for doc_fields in docs_dict.values():
                field_dict = self._normalize_field_dict(doc_fields)
                parsed = self._parse_search_result(field_dict, min_score)
                if parsed is not None:
                    records.append(parsed)

        # Sort by score descending (should already be sorted, but ensure)
        records.sort(key=lambda x: x[1], reverse=True)

        # Post-filter for scope boundary correctness.
        # The FT.SEARCH tag query uses prefix matching (@scope:{prefix*})
        # which can match siblings (e.g., /crew/a matches /crew/ab).
        # Apply strict boundary check here.
        if scope_prefix and scope_prefix != "/":
            normalized = scope_prefix.rstrip("/")
            records = [
                (rec, score)
                for rec, score in records
                if rec.scope == normalized or rec.scope.startswith(f"{normalized}/")
            ]

        # Apply metadata_filter as a Python post-filter. The FT index does
        # not materialize arbitrary metadata keys, so this is the only
        # correct place to enforce metadata predicates.
        if metadata_filter:
            records = [
                (rec, score)
                for rec, score in records
                if self._matches_metadata_filter(rec.metadata, metadata_filter)
            ]

        # Truncate to the caller-requested limit after post-filtering.
        if len(records) > limit:
            records = records[:limit]
Overfetch when strict scope post-filtering can discard sibling hits.
@scope:{prefix*} is intentionally approximate, and Lines 1412-1418 remove the sibling-scope false positives client-side. Right now fetch_limit only expands for metadata_filter, so a scoped search can return fewer than limit hits even when more valid in-scope matches exist just below the KNN cutoff.
💡 Proposed fix
    - if metadata_filter:
    + needs_overfetch = metadata_filter is not None or (
    +     scope_prefix is not None and scope_prefix != "/"
    + )
    + if needs_overfetch:
          fetch_limit = min(
              max(limit * self._METADATA_POSTFILTER_OVERFETCH, limit),
              self._METADATA_POSTFILTER_MAX_FETCH,
          )
      else:
    else:

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py` around lines 1334 -
1432, The KNN overfetch only considers metadata_filter, so strict scope
post-filtering in _search (scope_prefix logic in records filtering) can drop
sibling hits and return fewer than limit; update the fetch_limit calculation
(the block that sets fetch_limit before building query) to also overfetch when
scope_prefix is provided and not "/" (i.e., when strict scope post-filtering
will run), using the same overfetch formula (self._METADATA_POSTFILTER_OVERFETCH
and self._METADATA_POSTFILTER_MAX_FETCH) so the subsequent KNN call (ft.search)
retrieves extra candidates for the client-side scope filter to trim.
async def _areset(self, scope_prefix: str | None = None) -> None:
    """Reset (delete all) memories in scope (async implementation).

    Args:
        scope_prefix: Optional scope path (None = reset all).
    """
    # Use delete with scope_prefix to remove all records
    await self.adelete(scope_prefix=scope_prefix)
`reset()` with the default argument currently deletes nothing.
`_areset(None)` forwards `scope_prefix=None` into `adelete()`, and `adelete()` explicitly no-ops when every selector is unset. The documented "reset all" path therefore leaves all records intact.
💡 Proposed fix
async def _areset(self, scope_prefix: str | None = None) -> None:
"""Reset (delete all) memories in scope (async implementation).
Args:
scope_prefix: Optional scope path (None = reset all).
"""
- # Use delete with scope_prefix to remove all records
- await self.adelete(scope_prefix=scope_prefix)
+ await self.adelete(scope_prefix="/" if scope_prefix is None else scope_prefix)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py` around lines 1960 -
1967, The _areset implementation forwards scope_prefix=None into adelete which
triggers adelete's "no-op when all selectors unset" behavior, so calling reset()
does nothing; change _areset so that when scope_prefix is None it calls adelete
with an explicit selector that means "delete everything" (e.g. pass an
empty-string scope_prefix or the delete-all flag expected by adelete) instead of
None, leaving the call as await self.adelete(...) when scope_prefix is provided;
update the docstring accordingly and reference _areset and adelete when making
the change.
def drain_writes(self, timeout_per_save: float = 60.0) -> None:
    """Block until all pending background saves have completed.

    Called automatically by ``recall()`` and should be called by the
    crew at shutdown to ensure no saves are lost.

    Args:
        timeout_per_save: Maximum seconds to wait per save operation.
            Default 60s. If a save times out, logs a warning
            but continues to avoid blocking crew completion.
    """
    with self._pending_lock:
        pending = list(self._pending_saves)

    if pending:
        _logger.debug(
            "[DRAIN_WRITES] Waiting for %d pending saves...", len(pending)
        )

    failed_saves = 0
    for i, future in enumerate(pending):
        try:
            _logger.debug(
                "[DRAIN_WRITES] Waiting for save %d/%d...", i + 1, len(pending)
            )
            future.result(timeout=timeout_per_save)
            _logger.debug(
                "[DRAIN_WRITES] Save %d/%d completed", i + 1, len(pending)
            )
        except (TimeoutError, concurrent.futures.TimeoutError):  # noqa: PERF203
            failed_saves += 1
            _logger.warning(
                "[DRAIN_WRITES] Save %d/%d timed out after %ss. "
                "This save will be abandoned. Consider increasing timeout or checking "
                "LLM/embedder performance.",
                i + 1,
                len(pending),
                timeout_per_save,
            )
            # Don't raise - just log and continue to avoid blocking crew completion
        except Exception as e:
            failed_saves += 1
            _logger.error(
                "[DRAIN_WRITES] Save %d/%d failed: %s", i + 1, len(pending), e
            )
            # Don't raise - just log and continue

    if failed_saves > 0:
        _logger.warning(
            "[DRAIN_WRITES] %d/%d saves failed or timed out. "
            "Some memories may not have been persisted.",
            failed_saves,
            len(pending),
        )
`drain_writes()` timeout can still lead to a blocking shutdown.
This method treats timed-out saves as abandoned, but `close()` still does `_save_pool.shutdown(wait=True)`, which can block on the same long-running save and negate the timeout behavior during shutdown.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/memory/unified_memory.py` around lines 336 - 389,
drain_writes() treats timed-out saves as abandoned but close() still calls
_save_pool.shutdown(wait=True) which can block shutdown; change close() to avoid
blocking by calling self._save_pool.shutdown(wait=False) (or shutdown(...,
cancel_futures=True) if you require cancellation and runtime supports it), then
iterate over the executor's tracked futures in self._pending_saves to cancel any
still-pending futures (future.cancel()) and log cancellations; update references
in close() and use the existing _pending_saves/_pending_lock and drain_writes()
behavior to ensure shutdown does not block on long-running save tasks.
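One way to address this, sketched with stdlib primitives: shut the pool down without joining, cancel anything still queued, and let already-running saves finish in the background. The `pool`/`pending_saves` parameters stand in for the PR's `_save_pool`/`_pending_saves`; the function itself is hypothetical, not code from the PR.

```python
import concurrent.futures


def close_pool_without_blocking(
    pool: concurrent.futures.ThreadPoolExecutor,
    pending_saves: list,
) -> list:
    """Shut down without re-blocking on saves drain_writes() already abandoned.

    Returns the futures that were successfully cancelled (queued but not yet
    started); futures already running cannot be cancelled and keep running.
    """
    # wait=False returns immediately instead of joining worker threads;
    # cancel_futures=True (Python 3.9+) drops queued-but-unstarted work.
    pool.shutdown(wait=False, cancel_futures=True)
    return [f for f in pending_saves if f.cancel()]
```

The trade-off is that a save that already started may still be writing after `close()` returns; callers who need a hard stop would have to add cooperative cancellation inside the save task itself.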
def parse_cache_url() -> dict[str, Any] | None:
    """Parse VALKEY_URL or REDIS_URL from environment.

    Priority: VALKEY_URL > REDIS_URL.

    Returns:
        Dict with host, port, db, password, and use_tls keys,
        or None if no URL is set.
    """
    url = os.environ.get("VALKEY_URL") or os.environ.get("REDIS_URL")
    if not url:
        return None
    parsed = urlparse(url)
    return {
        "host": parsed.hostname or "localhost",
        "port": parsed.port or 6379,
        "db": _parse_db_from_path(parsed.path),
        "password": parsed.password,
        "use_tls": parsed.scheme in ("rediss", "valkeys"),
    }
The `use_tls` field from `parse_cache_url()` is not consumed by all cache clients.
The function correctly extracts the TLS flag from the URL scheme (`rediss`/`valkeys`), but `ValkeyCache` (used in `a2a/utils/task.py`) and `ValkeyCacheBackend` (used in `crewai-files/cache/upload_cache.py`) don't accept this parameter in their `__init__` methods. Only `ValkeyStorage` respects the TLS flag. This means TLS configuration from environment variables is silently dropped for some cache implementations.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/utilities/cache_config.py` around lines 14 - 32,
Summary: parse_cache_url() extracts a use_tls flag but ValkeyCache and
ValkeyCacheBackend ignore it, so TLS config is dropped. Fix: add an optional
use_tls: bool = False parameter to ValkeyCache.__init__ and
ValkeyCacheBackend.__init__, accept the value produced by parse_cache_url(), and
propagate it into the underlying ValkeyStorage/connection creation (or pass
through to whichever constructor currently accepts TLS) so that the TLS behavior
from parse_cache_url() is honored; keep default False to preserve backward
compatibility and update any call sites that construct
ValkeyCache/ValkeyCacheBackend to forward parse_cache_url()["use_tls"] when
available.
Summary
Fixes #5794. The `memory_index` FT schema only materializes `embedding`, `scope`, `categories`, `created_at`, and `importance`. `ValkeyStorage._vector_search` was emitting `metadata_filter` clauses as `@{key}:{value}` against that index, but metadata keys are user-defined and not part of the schema, so the FT.SEARCH server would either error out or silently return results that the metadata predicate failed to narrow.

This PR implements Option B from the issue: apply `metadata_filter` as a Python post-filter outside `FT.SEARCH`. Scope and category predicates remain pushed down into `FT.SEARCH` because they are valid index fields. The chosen approach avoids changing the index schema and works for arbitrary, dynamic metadata keys.

Key changes in `lib/crewai/src/crewai/memory/storage/valkey_storage.py`:
- `_vector_search` no longer emits `@<key>:{value}` clauses for `metadata_filter` keys.
- When a `metadata_filter` is supplied, KNN is overfetched (`limit * 10`, capped at `1000`) so the post-filter still returns the caller-requested number of hits in the common case. The final result is truncated back to `limit`.
- A new `_matches_metadata_filter` helper performs string-coerced equality so callers can pass numeric / boolean / string filter values interchangeably.
- `metadata_filter={}` is normalized to `None` (no overfetch, no post-filter work).
- `_vector_search`, `asearch`, and `search` updated to reflect the new contract.

Tests in `lib/crewai/tests/memory/storage/test_valkey_storage_search.py`:
- `TestValkeyStorageMetadataPostFilter` class: regression tests for #5794 ("feat(valkey): metadata_filter fields not indexed in memory_index FT schema") covering: predicates are not pushed into the FT query, missing-key records are dropped, mismatched-value records are dropped, multi-key AND logic, numeric values, empty filter dict, overfetch preserves caller `limit`, truncation, and scope/categories pushdown is unaffected.
- `TestValkeyStorageMatchesMetadataFilter` class: unit tests for the new helper.

Dependency note

This PR builds on the unmerged #5703 (the PR that introduces `valkey_storage.py`). The two changed files are exclusive to #5703, so the diff against `main` includes the parent PR's contents. Recommend merging #5703 first, then rebasing this PR onto `main`; at that point the diff against `main` will collapse to only this fix.

Verification

Locally on `python3.13` + `uv sync --all-extras --dev`.

Review & Testing Checklist for Human
- `_METADATA_POSTFILTER_OVERFETCH = 10` and `_METADATA_POSTFILTER_MAX_FETCH = 1000`. These trade off correctness (more candidates, more likely to satisfy `limit`) vs. latency. For workloads where most records match the metadata predicate the defaults are likely fine; for highly selective predicates the overfetch may still not return `limit` results, an inherent post-filter limitation.
- Test against a live Valkey instance (`valkey/valkey-bundle:latest`) to confirm the post-filter behaves correctly when records are returned from a live FT.SEARCH, not just mocks.
- The diff is not against `main` and is gated on #5703 (Feat/valkey 4 storage) merging first.

Notes
- The helper compares `str(record_metadata[key]) != str(expected)` so callers can pass numeric or boolean filter values without first coercing them to strings. This mirrors how the legacy code stringified values via `f"@{key}:{{{str(value)}}}"`.
- Truncation to `limit` happens after both the scope boundary check and the metadata post-filter, so records that pass all filters are returned in descending-score order.

Link to Devin session: https://app.devin.ai/sessions/7edf70b38f1f4d9597b636d3fd0a31e5
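The string-coerced equality described in the notes can be sketched standalone. The real helper is `_matches_metadata_filter` on `ValkeyStorage`; this free-function form is illustrative only:

```python
def matches_metadata_filter(record_metadata: dict, metadata_filter: dict) -> bool:
    """AND semantics across filter keys; values compared after str() coercion."""
    for key, expected in metadata_filter.items():
        if key not in record_metadata:
            return False  # records missing a filtered key are dropped
        if str(record_metadata[key]) != str(expected):
            return False
    return True
```

The coercion means `{"retries": 3}` and `{"retries": "3"}` behave identically as filters, matching how the legacy query builder stringified values into the FT clause.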