Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 34 additions & 13 deletions src/ray/util/event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,40 @@ LogEventReporter::LogEventReporter(SourceTypeVariant source_type,
file_name_ = "event_" + source_type_name +
(add_pid_to_file ? "_" + std::to_string(getpid()) : "") + ".log";

std::string log_sink_key = GetReporterKey() + log_dir_ + file_name_;
log_sink_ = spdlog::get(log_sink_key);
// If the file size is over {rotate_max_file_size_} MB, this file would be renamed
// for example event_GCS.0.log, event_GCS.1.log, event_GCS.2.log ...
// We allow to rotate for {rotate_max_file_num_} times.
if (log_sink_ == nullptr) {
log_sink_ = spdlog::rotating_logger_mt(log_sink_key,
log_dir_ + file_name_,
1048576 * rotate_max_file_size_,
rotate_max_file_num_);
}
log_sink_->set_pattern("%v");
// Store the sink key for lazy initialization.
// The log file will only be created on the first call to Report() or
// ReportExportEvent(), avoiding stale/empty log files at startup.
// See: https://github.com/ray-project/ray/issues/64153
log_sink_key_ = GetReporterKey() + log_dir_ + file_name_;
// Do NOT initialize log_sink_ here. It is lazily initialized in
// EnsureSinkInitialized(), which is called on the first write.
}

// Destructor: calls Flush() before the reporter is destroyed.
LogEventReporter::~LogEventReporter() { Flush(); }

// Flushes log_sink_ unconditionally.
// NOTE(review): this dereferences log_sink_ without a null check, while the
// constructor comments above say log_sink_ is no longer initialized there.
// This line appears to be the pre-change form of Flush() that the guarded
// definition further down replaces -- confirm it is removed by the diff.
void LogEventReporter::Flush() { log_sink_->flush(); }
/// Sets up log_sink_ the first time it is needed.
///
/// The setup runs under std::call_once on sink_init_once_, so it executes a
/// single time for this reporter even if several threads arrive together.
void LogEventReporter::EnsureSinkInitialized() {
  std::call_once(sink_init_once_, [this]() {
    // Prefer a logger that is already registered under this key.
    log_sink_ = spdlog::get(log_sink_key_);
    if (!log_sink_) {
      // Nothing registered yet: create a rotating file logger. Once the file
      // grows past {rotate_max_file_size_} MB it is renamed (event_GCS.0.log,
      // event_GCS.1.log, event_GCS.2.log, ...), up to {rotate_max_file_num_}
      // rotations.
      const auto log_path = log_dir_ + file_name_;
      const auto max_bytes = 1048576 * rotate_max_file_size_;
      log_sink_ = spdlog::rotating_logger_mt(
          log_sink_key_, log_path, max_bytes, rotate_max_file_num_);
    }
    // "%v" tells spdlog to write only the message text itself.
    log_sink_->set_pattern("%v");
  });
}
Comment on lines +86 to 100

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In a multi-threaded environment, EventManager::Publish can be called concurrently from multiple threads because it only acquires a reader lock (absl::ReaderMutexLock). This means LogEventReporter::Report (and consequently EnsureSinkInitialized) can be executed concurrently by multiple threads.

Without synchronization, the check if (log_sink_ != nullptr) and the subsequent initialization of log_sink_ (which is a std::shared_ptr) constitute a data race, leading to undefined behavior. Furthermore, concurrent calls to spdlog::rotating_logger_mt with the same key will throw an spdlog_ex exception, causing the process to crash.

To resolve this, we should use std::call_once with a std::once_flag to ensure that log_sink_ is initialized safely and exactly once.

// Initializes log_sink_ inside std::call_once (guarded by sink_init_once_):
// looks up an existing spdlog logger by log_sink_key_ and creates a rotating
// file logger only when none is found.
void LogEventReporter::EnsureSinkInitialized() {
  std::call_once(sink_init_once_, [this]() {
    log_sink_ = spdlog::get(log_sink_key_);
    // If the file size is over {rotate_max_file_size_} MB, this file would be renamed
    // for example event_GCS.0.log, event_GCS.1.log, event_GCS.2.log ...
    // We allow to rotate for {rotate_max_file_num_} times.
    if (log_sink_ == nullptr) {
      log_sink_ = spdlog::rotating_logger_mt(log_sink_key_,
                                             log_dir_ + file_name_,
                                             1048576 * rotate_max_file_size_,
                                             rotate_max_file_num_);
    }
    log_sink_->set_pattern("%v");
  });
}


/// Flushes the underlying logger, if one has been created.
void LogEventReporter::Flush() {
  // A null sink means no logger has been set up for this reporter yet,
  // so there is nothing buffered to flush.
  if (log_sink_ == nullptr) {
    return;
  }
  log_sink_->flush();
}

std::string LogEventReporter::replaceLineFeed(std::string message) {
std::stringstream ss;
Expand Down Expand Up @@ -169,6 +186,8 @@ std::string LogEventReporter::ExportEventToString(const rpc::ExportEvent &export
void LogEventReporter::Report(const rpc::Event &event, const json &custom_fields) {
RAY_CHECK(Event_SourceType_IsValid(event.source_type()));
RAY_CHECK(Event_Severity_IsValid(event.severity()));
// Lazily create the log file on the first write.
EnsureSinkInitialized();
std::string result = EventToString(event, custom_fields);

log_sink_->info(result);
Expand All @@ -179,6 +198,8 @@ void LogEventReporter::Report(const rpc::Event &event, const json &custom_fields

void LogEventReporter::ReportExportEvent(const rpc::ExportEvent &export_event) {
RAY_CHECK(ExportEvent_SourceType_IsValid(export_event.source_type()));
// Lazily create the log file on the first write.
EnsureSinkInitialized();
std::string result = ExportEventToString(export_event);

log_sink_->info(result);
Expand Down
18 changes: 18 additions & 0 deletions src/ray/util/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <utility>
Expand Down Expand Up @@ -126,6 +127,15 @@ class LogEventReporter : public BaseEventReporter {

virtual void Flush();

/**
* @brief Lazily initializes log_sink_ on the first write.
*
* Ensures that no log file is created on disk unless at least one event
* is actually written, avoiding stale/empty log files.
* Fixes: https://github.com/ray-project/ray/issues/64153
*/
void EnsureSinkInitialized();
Comment thread
cursor[bot] marked this conversation as resolved.

// Returns the fixed key string for this reporter type. The constructor also
// uses it as the prefix of the spdlog logger key (log_sink_key_).
std::string GetReporterKey() override { return "log.event.reporter"; }

protected:
Expand All @@ -136,7 +146,15 @@ class LogEventReporter : public BaseEventReporter {

std::string file_name_;

// The key used to look up / register the spdlog logger. Stored to enable
// lazy initialization of log_sink_ in EnsureSinkInitialized().
std::string log_sink_key_;

// The underlying spdlog logger. nullptr until the first event is written.
std::shared_ptr<spdlog::logger> log_sink_;
Comment on lines +153 to 154

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

To support thread-safe lazy initialization of log_sink_ using std::call_once, we need to declare a std::once_flag member variable.

  // The underlying spdlog logger. nullptr until the first event is written.
  std::shared_ptr<spdlog::logger> log_sink_;

  // Ensure thread-safe lazy initialization of log_sink_.
  std::once_flag sink_init_once_;


// Ensure thread-safe lazy initialization of log_sink_.
std::once_flag sink_init_once_;
};

// store the reporters, add reporters and clean reporters
Expand Down
Loading