Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## vTBD
- [BP-1714](https://movai.atlassian.net/browse/BP-1714): Validate file size and safeguard for noneviction raised errors
- Add validation for file size is lower than available memory before writing to Redis.

## v3.23.7
- [BP-1704](https://movai.atlassian.net/browse/BP-1704): Optimize and remove unused imports
- Use lazy imports for dal/scopes
Expand Down
54 changes: 54 additions & 0 deletions dal/movaidb/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,26 @@
StrOrDictRecursive = Union[str, None, Dict[str, "StrOrDictRecursive"]]
DB_CONNECT_RETRIES = 3
DB_CONNECT_BASE_DELAY = 0.1
REDIS_WRITE_BUFFER = 64 * 1024 * 1024

LOGGER = Log.get_logger("dal.mov.ai")


dal_directory = path.dirname(dal.__file__)


def redis_value_size(value: Any) -> int:
"""Best-effort byte size for Redis write diagnostics."""
if isinstance(value, bytes):
return len(value)
if isinstance(value, str):
return len(value.encode("utf-8"))
try:
return len(pickle.dumps(value))
except Exception:
return 0


def longest_common_prefix(strings: List[str]) -> str:
"""
Finds the longest common prefix string amongst an array of strings.
Expand Down Expand Up @@ -342,6 +355,21 @@

self._background_tasks = set()

def validate_file_write(self, key, value):
payload_size = redis_value_size(value)
required_memory = payload_size + REDIS_WRITE_BUFFER

memory = self.db_write.info("memory")
maxmemory = memory.get("maxmemory", 0)
used_memory = memory.get("used_memory", 0)

available_memory = maxmemory - used_memory

if required_memory > available_memory:
raise Exception(
f"Cannot write key '{key}': payload size {payload_size} bytes exceeds available memory {available_memory} bytes."

Check warning on line 370 in dal/movaidb/database.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

line too long (129 > 100 characters)

See more on https://sonarcloud.io/project/issues?id=MOV-AI_data-access-layer&issues=AZ7Lh_6ICXkBVYQip6Ab&open=AZ7Lh_6ICXkBVYQip6Ab&pullRequest=439
)

Check warning on line 371 in dal/movaidb/database.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this generic exception class with a more specific one.

See more on https://sonarcloud.io/project/issues?id=MOV-AI_data-access-layer&issues=AZ7Lh_6ICXkBVYQip6Aa&open=AZ7Lh_6ICXkBVYQip6Aa&pullRequest=439

def search(self, _input: dict) -> list:
"""
Search redis for a certain structure, returns a list of matching
Expand Down Expand Up @@ -498,6 +526,32 @@
db_set = pipe if isinstance(pipe, Pipeline) else self.db_write.pipeline()
# Save each key value in redis according to template value type
for key, value, source in kvs:
if source == "file":
# For files, validate if file size is within Redis limits before writing
self.validate_file_write(key, value)

if pickl:
value = pickle.dumps(value)

try:
# The file may still not fit in Redis, even if it passed validation
# as there is some object and allocation overhead.
# If the write fails (due to noneviction policy), an OOM error is raised.

# Pipeline errors include the full command in redis-py 3.x.
# For large values, formatting that error can exhaust the
# backend memory when Redis uses the noeviction policy.
self.db_write.set(key, value, ex=ex, px=px, nx=nx, xx=xx)
except Exception as error:
LOGGER.error(
"Redis file write failed for key '%s' (%s bytes) with error: %s",
key,
redis_value_size(value),
error,
)
raise
continue

if pickl and source not in ["hash", "list"]:
value = pickle.dumps(value)
try:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ dal = [
line-length = 100

[tool.bumpversion]
current_version = "3.23.7.1"
current_version = "3.23.8.0"
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)?(\\.(?P<build>\\d+))?"
serialize = ["{major}.{minor}.{patch}.{build}"]

Expand Down
Loading