refactor(store): simplify _row_object_refs with set operations#9
refactor(store): simplify _row_object_refs with set operations#9jolovicdev wants to merge 1 commit into
Conversation
|
@ds-review[bot] review |
|
@ds-review review |
There was a problem hiding this comment.
PR Review
PR: Refactor _row_object_refs to use set operations; add _SyncSQLiteFingerprintLock, _DirectFingerprintLock, and _DirectAsyncSQLiteStore classes
| Severity | Issue |
|---|---|
P1 |
_DirectFingerprintLock async methods block event loop and risk deadlock on exception — src/cashet/store.py:866-873 |
P1 |
_SyncSQLiteFingerprintLock.__enter__ deadlock if file_lock.acquire() fails — src/cashet/store.py:853 |
P1 |
_DirectAsyncSQLiteStore methods call synchronous _core methods without asyncio.to_thread(), blocking the event loop — src/cashet/store.py:894-941 |
P1 |
_DirectAsyncSQLiteStore.close() calls sync self._core.close() directly — src/cashet/store.py:941 |
P2 |
_DirectAsyncSQLiteStore lacks write lock, risking data corruption from concurrent writes — src/cashet/store.py:894 |
P3 |
SQLiteStore._fingerprint_lock_sync accesses private _async_store._lock_paths directly — src/cashet/store.py:1074 |
The new _DirectFingerprintLock and _DirectAsyncSQLiteStore classes have critical issues: synchronous blocking calls inside async methods, missing error handling that can cause deadlocks, and absent concurrency control for write operations. These must be fixed before merging.
Verdict
Request changes — all P1 and P2 issues are blocking. The new async classes will stall the event loop and risk deadlock/data corruption in their current form.
|
|
||
| class _DirectFingerprintLock: | ||
| def __init__(self, lock_path: str) -> None: | ||
| self._state = _sqlite_lock_state(lock_path) |
There was a problem hiding this comment.
P1 Both __aenter__ and __aexit__ call thread_lock.acquire() and file_lock.acquire() synchronously, blocking the event loop. Additionally, if file_lock.acquire() raises in __aenter__, thread_lock is never released, causing a deadlock. Use asyncio.to_thread() and wrap with try/except:
| self._state = _sqlite_lock_state(lock_path) | |
| async def __aenter__(self) -> _DirectFingerprintLock: | |
| await asyncio.to_thread(self._state.thread_lock.acquire) | |
| try: | |
| await asyncio.to_thread(self._state.file_lock.acquire) | |
| except Exception: | |
| self._state.thread_lock.release() | |
| raise | |
| return self | |
| async def __aexit__(self, *args: Any) -> None: | |
| try: | |
| await asyncio.to_thread(self._state.file_lock.release) | |
| finally: | |
| self._state.thread_lock.release() |
| class _SyncSQLiteFingerprintLock: | ||
| def __init__(self, lock_path: str) -> None: | ||
| self._state = _sqlite_lock_state(lock_path) | ||
|
|
There was a problem hiding this comment.
P1 If file_lock.acquire() fails in __enter__, the already-acquired thread_lock is never released, leading to a permanent lock. Wrap the acquisition in a try/except:
| def __enter__(self) -> _SyncSQLiteFingerprintLock: | |
| self._state.thread_lock.acquire() | |
| try: | |
| self._state.file_lock.acquire() | |
| except Exception: | |
| self._state.thread_lock.release() | |
| raise | |
| return self |
| @property | ||
| def root(self) -> Path: | ||
| return self._core.root | ||
|
|
There was a problem hiding this comment.
P1 All async methods in _DirectAsyncSQLiteStore call synchronous _core methods directly (e.g., self._core.put_blob(data)) without asyncio.to_thread(), blocking the event loop. Wrap every call with await asyncio.to_thread(...):
| async def put_blob(self, data: bytes) -> ObjectRef: | |
| return await asyncio.to_thread(self._core.put_blob, data) | |
| # Repeat for all other methods (get_blob, put_commit, etc.) |
|
|
||
| async def delete_by_tags(self, tags: dict[str, str | None]) -> int: | ||
| return self._core.delete_by_tags(tags) | ||
|
|
There was a problem hiding this comment.
P1 close() calls self._core.close() synchronously, blocking the event loop. Wrap in asyncio.to_thread():
| async def close(self) -> None: | |
| await asyncio.to_thread(self._core.close) | |
| for lock_path in self._lock_paths: | |
| with contextlib.suppress(OSError): | |
| Path(lock_path).unlink(missing_ok=True) | |
| self._lock_paths.clear() |
| @property | ||
| def root(self) -> Path: | ||
| return self._core.root | ||
|
|
There was a problem hiding this comment.
P2 _DirectAsyncSQLiteStore performs write operations (put_blob, put_commit, etc.) without any mutual exclusion. Concurrent coroutines may interleave writes to SQLite, leading to data corruption. Add an asyncio.Lock similar to AsyncSQLiteStore:
| class _DirectAsyncSQLiteStore: | |
| def __init__(self, core: _SQLiteStoreCore) -> None: | |
| self._core = core | |
| self._lock_paths: set[str] = set() | |
| self._write_lock = asyncio.Lock() | |
| async def put_blob(self, data: bytes) -> ObjectRef: | |
| async with self._write_lock: | |
| return await asyncio.to_thread(self._core.put_blob, data) | |
| # Similarly for put_commit and other write methods |
| return _DirectAsyncSQLiteStore(self._core) | ||
|
|
||
| def _fingerprint_lock_sync(self, fingerprint: str) -> _SyncSQLiteFingerprintLock: | ||
| import hashlib |
There was a problem hiding this comment.
P3 SQLiteStore._fingerprint_lock_sync directly accesses self._async_store._lock_paths, breaking encapsulation and requiring # pyright: ignore[reportPrivateUsage]. Add a public method in AsyncSQLiteStore (e.g., register_lock_path(path)) to avoid this.
|
|
||
| class _DirectFingerprintLock: | ||
| def __init__(self, lock_path: str) -> None: | ||
| self._state = _sqlite_lock_state(lock_path) |
There was a problem hiding this comment.
P1 Both __aenter__ and __aexit__ call thread_lock.acquire() and file_lock.acquire() synchronously, blocking the event loop. Additionally, if file_lock.acquire() raises in __aenter__, thread_lock is never released, causing a deadlock. Use asyncio.to_thread() and wrap with try/except:
| self._state = _sqlite_lock_state(lock_path) | |
| async def __aenter__(self) -> _DirectFingerprintLock: | |
| await asyncio.to_thread(self._state.thread_lock.acquire) | |
| try: | |
| await asyncio.to_thread(self._state.file_lock.acquire) | |
| except Exception: | |
| self._state.thread_lock.release() | |
| raise | |
| return self | |
| async def __aexit__(self, *args: Any) -> None: | |
| try: | |
| await asyncio.to_thread(self._state.file_lock.release) | |
| finally: | |
| self._state.thread_lock.release() |
| class _SyncSQLiteFingerprintLock: | ||
| def __init__(self, lock_path: str) -> None: | ||
| self._state = _sqlite_lock_state(lock_path) | ||
|
|
There was a problem hiding this comment.
P1 If file_lock.acquire() fails in __enter__, the already-acquired thread_lock is never released, leading to a permanent lock. Wrap the acquisition in a try/except:
| def __enter__(self) -> _SyncSQLiteFingerprintLock: | |
| self._state.thread_lock.acquire() | |
| try: | |
| self._state.file_lock.acquire() | |
| except Exception: | |
| self._state.thread_lock.release() | |
| raise | |
| return self |
| @property | ||
| def root(self) -> Path: | ||
| return self._core.root | ||
|
|
There was a problem hiding this comment.
P1 All async methods in _DirectAsyncSQLiteStore call synchronous _core methods directly (e.g., self._core.put_blob(data)) without asyncio.to_thread(), blocking the event loop. Wrap every call with await asyncio.to_thread(...):
| async def put_blob(self, data: bytes) -> ObjectRef: | |
| return await asyncio.to_thread(self._core.put_blob, data) | |
| # Repeat for all other methods (get_blob, put_commit, etc.) |
|
|
||
| async def delete_by_tags(self, tags: dict[str, str | None]) -> int: | ||
| return self._core.delete_by_tags(tags) | ||
|
|
There was a problem hiding this comment.
P1 close() calls self._core.close() synchronously, blocking the event loop. Wrap in asyncio.to_thread():
| async def close(self) -> None: | |
| await asyncio.to_thread(self._core.close) | |
| for lock_path in self._lock_paths: | |
| with contextlib.suppress(OSError): | |
| Path(lock_path).unlink(missing_ok=True) | |
| self._lock_paths.clear() |
| @property | ||
| def root(self) -> Path: | ||
| return self._core.root | ||
|
|
There was a problem hiding this comment.
P2 _DirectAsyncSQLiteStore performs write operations (put_blob, put_commit, etc.) without any mutual exclusion. Concurrent coroutines may interleave writes to SQLite, leading to data corruption. Add an asyncio.Lock similar to AsyncSQLiteStore:
| class _DirectAsyncSQLiteStore: | |
| def __init__(self, core: _SQLiteStoreCore) -> None: | |
| self._core = core | |
| self._lock_paths: set[str] = set() | |
| self._write_lock = asyncio.Lock() | |
| async def put_blob(self, data: bytes) -> ObjectRef: | |
| async with self._write_lock: | |
| return await asyncio.to_thread(self._core.put_blob, data) | |
| # Similarly for put_commit and other write methods |
| return _DirectAsyncSQLiteStore(self._core) | ||
|
|
||
| def _fingerprint_lock_sync(self, fingerprint: str) -> _SyncSQLiteFingerprintLock: | ||
| import hashlib |
There was a problem hiding this comment.
P3 SQLiteStore._fingerprint_lock_sync directly accesses self._async_store._lock_paths, breaking encapsulation and requiring # pyright: ignore[reportPrivateUsage]. Add a public method in AsyncSQLiteStore (e.g., register_lock_path(path)) to avoid this.
|
trash — testing reviewer, disregard |
Small cleanup in SQLiteStore to make the object-ref collection a bit more readable. Switching to a local set avoids manual list append/extend and makes the dedup intent clearer. Also pulls in a few staged client/hashing improvements that were waiting on the release branch.