diff --git a/CHANGELOG.md b/CHANGELOG.md index 74993e85..8abdaf41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## vTBD +- [BP-1714](https://movai.atlassian.net/browse/BP-1714): Validate file size and safeguard for noneviction raised errors + - Add validation for file size is lower than available memory before writing to Redis. + ## v3.25.4 - [BP-1715](https://movai.atlassian.net/browse/BP-1715): Add tests for scopes import/export functionality diff --git a/dal/movaidb/database.py b/dal/movaidb/database.py index 0dd6aec8..d96775df 100755 --- a/dal/movaidb/database.py +++ b/dal/movaidb/database.py @@ -32,6 +32,7 @@ StrOrDictRecursive = Union[str, None, Dict[str, "StrOrDictRecursive"]] DB_CONNECT_RETRIES = 3 DB_CONNECT_BASE_DELAY = 0.1 +REDIS_WRITE_BUFFER = 64 * 1024 * 1024 LOGGER = Log.get_logger("dal.mov.ai") @@ -39,6 +40,18 @@ dal_directory = path.dirname(dal.__file__) +def redis_value_size(value: Any) -> int: + """Best-effort byte size for Redis write diagnostics.""" + if isinstance(value, bytes): + return len(value) + if isinstance(value, str): + return len(value.encode("utf-8")) + try: + return len(pickle.dumps(value)) + except Exception: + return 0 + + def longest_common_prefix(strings: List[str]) -> str: """ Finds the longest common prefix string amongst an array of strings. @@ -342,6 +355,21 @@ def __init__( self._background_tasks = set() + def validate_file_write(self, key, value): + payload_size = redis_value_size(value) + required_memory = payload_size + REDIS_WRITE_BUFFER + + memory = self.db_write.info("memory") + maxmemory = memory.get("maxmemory", 0) + used_memory = memory.get("used_memory", 0) + + available_memory = maxmemory - used_memory + + if required_memory > available_memory: + raise Exception( + f"Cannot write key '{key}': payload size {payload_size} bytes exceeds available memory {available_memory} bytes." + ) + def search(self, _input: dict) -> list: """ Search redis for a certain structure, returns a list of matching @@ -498,6 +526,32 @@ def set( db_set = pipe if isinstance(pipe, Pipeline) else self.db_write.pipeline() # Save each key value in redis according to template value type for key, value, source in kvs: + if source == "file": + # For files, validate if file size is within Redis limits before writing + self.validate_file_write(key, value) + + if pickl: + value = pickle.dumps(value) + + try: + # The file may still not fit in Redis, even if it passed validation + # as there is some object and allocation overhead. + # If the write fails (due to noneviction policy), an OOM error is raised. + + # Pipeline errors include the full command in redis-py 3.x. + # For large values, formatting that error can exhaust the + # backend memory when Redis uses the noeviction policy. + self.db_write.set(key, value, ex=ex, px=px, nx=nx, xx=xx) + except Exception as error: + LOGGER.error( + "Redis file write failed for key '%s' (%s bytes) with error: %s", + key, + redis_value_size(value), + error, + ) + raise + continue + if pickl and source not in ["hash", "list"]: value = pickle.dumps(value) try: diff --git a/pyproject.toml b/pyproject.toml index 5d78d5a1..e5cff8d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,7 @@ dal = [ line-length = 100 [tool.bumpversion] -current_version = "3.25.4.1" +current_version = "3.25.5.0" parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)?(\\.(?P\\d+))?" serialize = ["{major}.{minor}.{patch}.{build}"]