Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/daemon/include/core/base_event_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
ANYTHING_NAMESPACE_BEGIN

enum class index_job_type : char {
add, remove, update, scan, recursive_update, init_scan, refresh
add, remove, update, scan, recursive_update, init_scan, refresh,
commit_volatile_index,
commit_persistent_index
};

struct index_job {
Expand All @@ -45,6 +47,8 @@ class base_event_handler
void set_index_invalid_and_restart();
virtual bool handle_config_change(const std::string &key, const event_handler_config &new_config);

static gboolean trigger_commit_persistent_index(base_event_handler *handler);

protected:
void set_batch_size(std::size_t size);

Expand All @@ -60,6 +64,7 @@ class base_event_handler
private:
void eat_jobs(std::vector<anything::index_job>& jobs, std::size_t number);
void eat_job(const anything::index_job& job);
void check_jobs_load();

void jobs_push(std::string src, anything::index_job_type type, std::optional<std::string> dst = std::nullopt);

Expand All @@ -82,19 +87,17 @@ class base_event_handler
std::thread timer_;

bool index_dirty_;
bool volatile_index_dirty_;
int commit_volatile_index_timeout_;
int commit_persistent_index_timeout_;

anything::index_status index_status_;

gint event_process_thread_count_;

std::atomic<bool> stop_scan_directory_;

std::mutex config_access_mtx_;

gint batch_count_;

bool wait_commit_persistent_index_;
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
};

#endif // ANYTHING_BASE_EVENT_HANDLER_H_
110 changes: 61 additions & 49 deletions src/daemon/src/core/base_event_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
#include "utils/running_flag.h"

#include <QCoreApplication>
#include <glib/gstdio.h>

Check warning on line 14 in src/daemon/src/core/base_event_handler.cpp

View workflow job for this annotation

GitHub Actions / cppcheck

Include file: <glib/gstdio.h> not found. Please note: Cppcheck does not need standard library headers to get proper results.

Check warning on line 14 in src/daemon/src/core/base_event_handler.cpp

View workflow job for this annotation

GitHub Actions / static-check / static-check

Include file: <glib/gstdio.h> not found. Please note: Cppcheck does not need standard library headers to get proper results.

#define REFRESH_INDEX_FILE "refresh_index"
#define JOB_BATCH_COUNT_FOR_LIGHT_LOAD 4


static std::string get_refresh_index_path(const std::string &volatile_index_dir)
Expand Down Expand Up @@ -67,13 +68,11 @@
pool_(1),
cancellable_(g_cancellable_new()),
index_dirty_(false),
volatile_index_dirty_(false),
commit_volatile_index_timeout_(config_.commit_volatile_index_timeout),
commit_persistent_index_timeout_(config_.commit_persistent_index_timeout),
index_status_(anything::index_status::loading),
event_process_thread_count_(0),
stop_scan_directory_(false),
batch_count_(0) {
batch_count_(0),
wait_commit_persistent_index_(false) {
// 若发现索引数量为空或异常退出, 则设置 refresh_index 标志以触发扫盘逻辑
// 索引版本号会保存为一条记录
if (index_manager_.document_size(false) <= 1 || !is_last_time_normal_quit()) {
Expand Down Expand Up @@ -135,6 +134,15 @@
}
}

gboolean base_event_handler::trigger_commit_persistent_index(base_event_handler *handler)
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
{
// 不使用 jobs_push 避免将 index_dirty_ 置为 true
std::lock_guard<std::mutex> lock(handler->jobs_mtx_);
spdlog::debug("Push job: commit_persistent_index");
handler->jobs_.emplace_back("", anything::index_job_type::commit_persistent_index);
return FALSE;
}

void base_event_handler::set_batch_size(std::size_t size) {
batch_size_ = size;
}
Expand Down Expand Up @@ -204,26 +212,22 @@

void base_event_handler::eat_jobs(std::vector<anything::index_job>& jobs, std::size_t number) {
std::vector<anything::index_job> processing_jobs;

processing_jobs.insert(
processing_jobs.end(),
std::make_move_iterator(jobs.begin()),
std::make_move_iterator(jobs.begin() + number));
jobs.erase(jobs.begin(), jobs.begin() + number);

g_atomic_int_inc(&batch_count_);
pool_.enqueue_detach([this, processing_jobs = std::move(processing_jobs)]() {
g_atomic_int_inc(&this->event_process_thread_count_);
for (const auto& job : processing_jobs) {
eat_job(job);
}
g_atomic_int_dec_and_test(&this->event_process_thread_count_);
g_atomic_int_dec_and_test(&this->batch_count_);
check_jobs_load();
});

if ((g_atomic_int_get(&batch_count_)*(int)batch_size_) >= config_.pending_events_trigger_updating &&
index_status_ == anything::index_status::monitoring) {
index_status_ = anything::index_status::updating;
index_manager_.set_index_updating();
}
}

void base_event_handler::eat_job(const anything::index_job& job) {
Expand Down Expand Up @@ -298,6 +302,34 @@
index_manager_.refresh_indexes(get_blacklist_paths(), true, false);
ret = true;
break;
case anything::index_job_type::commit_volatile_index:
spdlog::debug("Eat job: commit_volatile_index");
if (g_atomic_int_get(&batch_count_) <= JOB_BATCH_COUNT_FOR_LIGHT_LOAD) {
// 现在线程池中待处理的任务较少
if (index_status_ == anything::index_status::updating)
index_status_ = anything::index_status::monitoring;
ret = index_manager_.commit(index_status_);
if (ret) {
if (!wait_commit_persistent_index_) {
wait_commit_persistent_index_ = true;
g_timeout_add_seconds (config_.commit_persistent_index_timeout,
(GSourceFunc)trigger_commit_persistent_index,
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
this);
}
} else {
spdlog::error("Failed to commit index");
}
} else {
ret = true;
spdlog::debug("Skip to commit volatile index due to a large number of pending events");
}
break;
case anything::index_job_type::commit_persistent_index:
spdlog::debug("Eat job: commit_persistent_index");
index_manager_.persist_index();
wait_commit_persistent_index_ = false;
ret = true;
break;
default:
spdlog::error("Invalid job type: {}", static_cast<int>(job.type));
break;
Expand All @@ -309,6 +341,16 @@
}
}

void base_event_handler::check_jobs_load()
{
if ((g_atomic_int_get(&batch_count_)*(int)batch_size_) >= config_.pending_events_trigger_updating &&
index_status_ == anything::index_status::monitoring) {
index_status_ = anything::index_status::updating;
index_manager_.set_index_updating();
spdlog::info("Set index status to updating");
}
}

void base_event_handler::jobs_push(std::string src,
anything::index_job_type type, std::optional<std::string> dst) {

Expand Down Expand Up @@ -366,49 +408,19 @@

while(!cancellable_wait(cancellable_, cancellable_fd, (gint)interval)) {
std::lock_guard<std::mutex> lock(jobs_mtx_);
if (!jobs_.empty()) {
eat_jobs(jobs_, std::min(batch_size_, jobs_.size()));
}

// Commit volatile index
// trigger commit volatile index
if (index_dirty_ && commit_volatile_index_timeout_ > 0)
--commit_volatile_index_timeout_;
if (commit_volatile_index_timeout_ == 0 && jobs_.empty()) {
int retry_count = 1;
bool cancellable_wait_ret = false;
while (true) {
if (!pool_.busy() && g_atomic_int_get(&event_process_thread_count_) == 0) {
if (index_status_ == anything::index_status::updating)
index_status_ = anything::index_status::monitoring;
if (!index_manager_.commit(index_status_)) {
spdlog::info("Failed to commit index");
set_index_invalid_and_restart();
}
commit_volatile_index_timeout_ = config_.commit_volatile_index_timeout;
index_dirty_ = false;
volatile_index_dirty_ = true;
break;
} else {
if (retry_count-- <= 0)
break;
cancellable_wait_ret = cancellable_wait(cancellable_, cancellable_fd, (gint)interval);
if (cancellable_wait_ret)
break;
}
}
if (cancellable_wait_ret)
break;
if (commit_volatile_index_timeout_ == 0) {
spdlog::debug("Push job: commit_volatile_index");
jobs_.emplace_back("", anything::index_job_type::commit_volatile_index);
commit_volatile_index_timeout_ = config_.commit_volatile_index_timeout;
index_dirty_ = false;
}

// Commit persistent index
if (volatile_index_dirty_ && commit_persistent_index_timeout_ > 0)
--commit_persistent_index_timeout_;
if (commit_persistent_index_timeout_ == 0 && jobs_.empty() && !pool_.busy() &&
g_atomic_int_get(&event_process_thread_count_) == 0) {
index_manager_.persist_index();
commit_persistent_index_timeout_ = config_.commit_persistent_index_timeout;
volatile_index_dirty_ = false;
}
if (!jobs_.empty())
eat_jobs(jobs_, jobs_.size());
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
}

if (cancellable_fd != -1) {
Expand Down
Loading