Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions wavefront/server/apps/floware/floware/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ async def start_redis_listener(
queue = asyncio.Queue()

pubsub = cache_manager.subscribe(channels=[REDIS_API_SERVICE_UPDATES_CHANNEL])

if pubsub is None:
logger.info('Cache is disabled — skipping Redis pubsub listener')
return

logger.info('Subscribed to Redis channel: %s', REDIS_API_SERVICE_UPDATES_CHANNEL)

# Capture the running loop from the main thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def __init__(
self.initial_backoff = initial_backoff
self.max_backoff = max_backoff

self.cache_enabled = (
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DId you test this? Is it working fine with cache disabled ?

os.getenv('WAVEFRONT_CACHE_ENABLED', 'true').lower() == 'true'
)

self.pool = self._create_connection_pool(
connection_timeout=connection_timeout,
socket_timeout=socket_timeout,
Expand All @@ -50,6 +54,12 @@ def __init__(
logger.error('Server will not start without Redis connectivity')
raise RuntimeError(f'Redis connection test failed: {e}') from e

if not self.cache_enabled:
logger.info(
'Cache get/set operations are disabled via WAVEFRONT_CACHE_ENABLED; '
'pub/sub remains active'
)

def _create_connection_pool(
self,
connection_timeout: int,
Expand Down Expand Up @@ -100,6 +110,9 @@ def add(
expiry: int = 3600,
nx: bool = False,
) -> bool:
if not self.cache_enabled:
return False

try:
logger.info(f'Adding key: {key} to cache with expiry: {expiry} seconds')
return bool(
Expand All @@ -115,6 +128,9 @@ def add(
retry=retry_if_exception_type((RedisError, ConnectionError, TimeoutError)),
)
def get_str(self, key: str, default: Any = None) -> Optional[str]:
if not self.cache_enabled:
return default

try:
value = self.redis.get(f'{self.namespace}/{key}')
return value if value is not None else default
Expand All @@ -124,10 +140,16 @@ def get_str(self, key: str, default: Any = None) -> Optional[str]:
raise

def get_int(self, key: str, default: int = 0) -> int:
if not self.cache_enabled:
return default

value = self.get_str(key, default)
return int(value) if value is not None else default

def remove(self, key: str) -> bool:
if not self.cache_enabled:
return False

try:
return bool(self.redis.delete(f'{self.namespace}/{key}'))
except (RedisError, ConnectionError, TimeoutError) as e:
Expand All @@ -136,6 +158,9 @@ def remove(self, key: str) -> bool:

def invalidate_query(self, pattern: str) -> int:
"""Remove all keys matching the given pattern"""
if not self.cache_enabled:
return 0

try:
# Get all keys matching the pattern
search_pattern = f'{self.namespace}/{pattern}'
Expand Down Expand Up @@ -170,6 +195,7 @@ def publish(self, channel: str, message: str) -> int:
Raises:
RedisError: If publishing fails
"""

try:
full_channel = f'{self.namespace}/{channel}'
logger.info(f'Publishing message to channel: {full_channel}')
Expand Down Expand Up @@ -203,6 +229,7 @@ def subscribe(
if message['type'] == 'message':
print(f"Received: {message['data']}")
"""

try:
pubsub = self.redis.pubsub()

Expand Down
Loading