[core] fix: replace per-run log sinks with single dispatch sink to fix leaks and HDFS stalls#63
Conversation
…urrency Replace the "one loguru sink per run" design with a single global dispatch sink that routes each record to its run's file in O(1) by run_id, backed by one background writer thread. Previously every record fanned out across all live sinks (O(num_runs) filtering per message) and each run leaked a sink + thread + file descriptor when not cleaned up, which is the main reason logging fell over under large-scale concurrent training. Also call cleanup_handlers(run_id) in UniAgentLoop.run()'s finally block so the per-run file is closed on the training path, and fix DEBUG_MODE parsing so values like "0"/"false" are no longer treated as enabled. Co-authored-by: Cursor <cursoragent@cursor.com>
There was a problem hiding this comment.
Code Review
This pull request refactors the logging system to use a single global Loguru sink with asynchronous queueing and custom dispatching to run-specific files, replacing the previous dynamic handler registry. In the agent loop, a cleanup call is added to close these files when a run finishes. The review feedback identifies a critical issue where closing the file descriptor immediately in cleanup_handlers can cause pending asynchronous log messages to be lost. To resolve this, it is recommended to make cleanup_handlers asynchronous and await logger.complete() before closing the file, and consequently await cleanup_handlers in the agent loop's finally block.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def cleanup_handlers(run_id: str) -> None: | ||
| with _lock: | ||
| entry = _run_files.pop(run_id, None) | ||
| if entry is not None: | ||
| try: | ||
| entry[0].close() | ||
| except OSError: | ||
| pass |
There was a problem hiding this comment.
Since the global loguru sink is configured with enqueue=True, log messages are processed asynchronously in a background thread. If cleanup_handlers immediately pops and closes the file object, any pending log messages still in the queue for this run_id will be silently discarded because the file will either be closed or removed from _run_files before the background thread can process them.
To prevent losing critical trailing logs (such as final metrics, status, or error details), cleanup_handlers should be made asynchronous and await logger.complete() to ensure all enqueued log messages are fully flushed and written before the file is closed.
| def cleanup_handlers(run_id: str) -> None: | |
| with _lock: | |
| entry = _run_files.pop(run_id, None) | |
| if entry is not None: | |
| try: | |
| entry[0].close() | |
| except OSError: | |
| pass | |
| async def cleanup_handlers(run_id: str) -> None: | |
| await logger.complete() | |
| with _lock: | |
| entry = _run_files.pop(run_id, None) | |
| if entry is not None: | |
| try: | |
| entry[0].close() | |
| except OSError: | |
| pass |
| output = await self._build_empty_agent_output(exit_reason="agent_loop_failed") | ||
| finally: | ||
| await self.env.close() | ||
| cleanup_handlers(self.run_id) |
Summary
Rework the per-run logging backend so it survives large-scale concurrent
training. Replaces the "one loguru sink per run" design with a single global
dispatch sink, fixes a sink/thread/fd leak, and stops per-record flushes from
stalling the writer thread when
log_dirlives on HDFS/FUSE.Problem
The previous implementation called
logger.add(<file>)once per run:each re-evaluating a per-run
case_filter. Cost grew with the number ofconcurrent runs.
If
cleanup_handlerswasn't called (and it wasn't, on the training path),these accumulated until logging fell over / fds were exhausted.
log_dir, each flush is a synchronous remotewrite(); with a single sharedwriter thread, one slow flush blocked logging for all runs.
DEBUG_MODEparsing treated"0"/"false"/""as enabled (non-empty stringis truthy).
What changed
uni_agent/async_logging.pylogger.add(_dispatch, enqueue=True)routes eachrecord to its run's file in O(1) by
run_id, backed by one background writerthread.
cleanup_handlers(run_id)closes the run's file object anddrops it from the registry (no more leaked sinks/threads/fds).
and reach the OS ~every
io.DEFAULT_BUFFER_SIZEor onclose(), drasticallycutting HDFS/FUSE
write()frequency. SetLOG_FLUSH_EACH_LINE=1fornear-real-time tailing on local disks.
runs outside it, so a slow write can't block run registration/cleanup. The
write-after-close race is handled by catching
ValueError/OSError.DEBUG_MODEparsing via explicit truthy-value parsing.uni_agent/agent_loop.pycleanup_handlers(self.run_id)inUniAgentLoop.run()'sfinallyblockso the per-run file is closed on the training path (success and failure).