diff --git a/src/daemon/include/core/base_event_handler.h b/src/daemon/include/core/base_event_handler.h index c2abc75..921a996 100755 --- a/src/daemon/include/core/base_event_handler.h +++ b/src/daemon/include/core/base_event_handler.h @@ -18,7 +18,9 @@ ANYTHING_NAMESPACE_BEGIN enum class index_job_type : char { - add, remove, update, scan, recursive_update, init_scan, refresh + add, remove, update, scan, recursive_update, init_scan, refresh, + commit_volatile_index, + commit_persistent_index }; struct index_job { @@ -45,6 +47,8 @@ class base_event_handler void set_index_invalid_and_restart(); virtual bool handle_config_change(const std::string &key, const event_handler_config &new_config); + static gboolean trigger_commit_persistent_index(base_event_handler *handler); + protected: void set_batch_size(std::size_t size); @@ -60,6 +64,7 @@ class base_event_handler private: void eat_jobs(std::vector& jobs, std::size_t number); void eat_job(const anything::index_job& job); + void check_jobs_load(); void jobs_push(std::string src, anything::index_job_type type, std::optional dst = std::nullopt); @@ -82,19 +87,17 @@ class base_event_handler std::thread timer_; bool index_dirty_; - bool volatile_index_dirty_; int commit_volatile_index_timeout_; - int commit_persistent_index_timeout_; anything::index_status index_status_; - gint event_process_thread_count_; - std::atomic stop_scan_directory_; std::mutex config_access_mtx_; gint batch_count_; + + bool wait_commit_persistent_index_; }; #endif // ANYTHING_BASE_EVENT_HANDLER_H_ diff --git a/src/daemon/src/core/base_event_handler.cpp b/src/daemon/src/core/base_event_handler.cpp index b82ca44..fe13fb4 100755 --- a/src/daemon/src/core/base_event_handler.cpp +++ b/src/daemon/src/core/base_event_handler.cpp @@ -14,6 +14,7 @@ #include #define REFRESH_INDEX_FILE "refresh_index" +#define JOB_BATCH_COUNT_FOR_LIGHT_LOAD 4 static std::string get_refresh_index_path(const std::string &volatile_index_dir) @@ -67,13 +68,11 @@ base_event_handler::base_event_handler(const event_handler_config &config) pool_(1), cancellable_(g_cancellable_new()), index_dirty_(false), - volatile_index_dirty_(false), commit_volatile_index_timeout_(config_.commit_volatile_index_timeout), - commit_persistent_index_timeout_(config_.commit_persistent_index_timeout), index_status_(anything::index_status::loading), - event_process_thread_count_(0), stop_scan_directory_(false), - batch_count_(0) { + batch_count_(0), + wait_commit_persistent_index_(false) { // 若发现索引数量为空或异常退出, 则设置 refresh_index 标志以触发扫盘逻辑 // 索引版本号会保存为一条记录 if (index_manager_.document_size(false) <= 1 || !is_last_time_normal_quit()) { @@ -135,6 +134,15 @@ bool base_event_handler::handle_config_change(const std::string &key, const even } } +gboolean base_event_handler::trigger_commit_persistent_index(base_event_handler *handler) +{ + // 不使用 jobs_push 避免将 index_dirty_ 置为 true + std::lock_guard lock(handler->jobs_mtx_); + spdlog::debug("Push job: commit_persistent_index"); + handler->jobs_.emplace_back("", anything::index_job_type::commit_persistent_index); + return FALSE; +} + void base_event_handler::set_batch_size(std::size_t size) { batch_size_ = size; } @@ -204,26 +212,22 @@ void base_event_handler::init_refresh_scan_indexes(std::vector& ind void base_event_handler::eat_jobs(std::vector& jobs, std::size_t number) { std::vector processing_jobs; + processing_jobs.insert( processing_jobs.end(), std::make_move_iterator(jobs.begin()), std::make_move_iterator(jobs.begin() + number)); jobs.erase(jobs.begin(), jobs.begin() + number); + g_atomic_int_inc(&batch_count_); pool_.enqueue_detach([this, processing_jobs = std::move(processing_jobs)]() { - g_atomic_int_inc(&this->event_process_thread_count_); for (const auto& job : processing_jobs) { eat_job(job); } - g_atomic_int_dec_and_test(&this->event_process_thread_count_); g_atomic_int_dec_and_test(&this->batch_count_); + check_jobs_load(); }); - if ((g_atomic_int_get(&batch_count_)*(int)batch_size_) >= config_.pending_events_trigger_updating && - index_status_ == anything::index_status::monitoring) { - index_status_ = anything::index_status::updating; - index_manager_.set_index_updating(); - } } void base_event_handler::eat_job(const anything::index_job& job) { @@ -298,6 +302,34 @@ void base_event_handler::eat_job(const anything::index_job& job) { index_manager_.refresh_indexes(get_blacklist_paths(), true, false); ret = true; break; + case anything::index_job_type::commit_volatile_index: + spdlog::debug("Eat job: commit_volatile_index"); + if (g_atomic_int_get(&batch_count_) <= JOB_BATCH_COUNT_FOR_LIGHT_LOAD) { + // 现在线程池中待处理的任务较少 + if (index_status_ == anything::index_status::updating) + index_status_ = anything::index_status::monitoring; + ret = index_manager_.commit(index_status_); + if (ret) { + if (!wait_commit_persistent_index_) { + wait_commit_persistent_index_ = true; + g_timeout_add_seconds (config_.commit_persistent_index_timeout, + (GSourceFunc)trigger_commit_persistent_index, + this); + } + } else { + spdlog::error("Failed to commit index"); + } + } else { + ret = true; + spdlog::debug("Skip to commit volatile index due to a large number of pending events"); + } + break; + case anything::index_job_type::commit_persistent_index: + spdlog::debug("Eat job: commit_persistent_index"); + index_manager_.persist_index(); + wait_commit_persistent_index_ = false; + ret = true; + break; default: spdlog::error("Invalid job type: {}", static_cast(job.type)); break; @@ -309,6 +341,16 @@ void base_event_handler::eat_job(const anything::index_job& job) { } } +void base_event_handler::check_jobs_load() +{ + if ((g_atomic_int_get(&batch_count_)*(int)batch_size_) >= config_.pending_events_trigger_updating && + index_status_ == anything::index_status::monitoring) { + index_status_ = anything::index_status::updating; + index_manager_.set_index_updating(); + spdlog::info("Set index status to updating"); + } +} + void base_event_handler::jobs_push(std::string src, anything::index_job_type type, std::optional dst) { @@ -366,49 +408,19 @@ void base_event_handler::timer_worker(int64_t interval) { while(!cancellable_wait(cancellable_, cancellable_fd, (gint)interval)) { std::lock_guard lock(jobs_mtx_); - if (!jobs_.empty()) { - eat_jobs(jobs_, std::min(batch_size_, jobs_.size())); - } - // Commit volatile index + // trigger commit volatile index if (index_dirty_ && commit_volatile_index_timeout_ > 0) --commit_volatile_index_timeout_; - if (commit_volatile_index_timeout_ == 0 && jobs_.empty()) { - int retry_count = 1; - bool cancellable_wait_ret = false; - while (true) { - if (!pool_.busy() && g_atomic_int_get(&event_process_thread_count_) == 0) { - if (index_status_ == anything::index_status::updating) - index_status_ = anything::index_status::monitoring; - if (!index_manager_.commit(index_status_)) { - spdlog::info("Failed to commit index"); - set_index_invalid_and_restart(); - } - commit_volatile_index_timeout_ = config_.commit_volatile_index_timeout; - index_dirty_ = false; - volatile_index_dirty_ = true; - break; - } else { - if (retry_count-- <= 0) - break; - cancellable_wait_ret = cancellable_wait(cancellable_, cancellable_fd, (gint)interval); - if (cancellable_wait_ret) - break; - } - } - if (cancellable_wait_ret) - break; + if (commit_volatile_index_timeout_ == 0) { + spdlog::debug("Push job: commit_volatile_index"); + jobs_.emplace_back("", anything::index_job_type::commit_volatile_index); + commit_volatile_index_timeout_ = config_.commit_volatile_index_timeout; + index_dirty_ = false; } - // Commit persistent index - if (volatile_index_dirty_ && commit_persistent_index_timeout_ > 0) - --commit_persistent_index_timeout_; - if (commit_persistent_index_timeout_ == 0 && jobs_.empty() && !pool_.busy() && - g_atomic_int_get(&event_process_thread_count_) == 0) { - index_manager_.persist_index(); - commit_persistent_index_timeout_ = config_.commit_persistent_index_timeout; - volatile_index_dirty_ = false; - } + if (!jobs_.empty()) + eat_jobs(jobs_, jobs_.size()); } if (cancellable_fd != -1) {