diff --git a/benchmarks/professional_flowcoro_benchmark.cpp b/benchmarks/professional_flowcoro_benchmark.cpp index 98f5a0f..275c24a 100644 --- a/benchmarks/professional_flowcoro_benchmark.cpp +++ b/benchmarks/professional_flowcoro_benchmark.cpp @@ -70,8 +70,10 @@ struct BenchmarkStats { (measurements[n/2-1] + measurements[n/2]) / 2.0 : measurements[n/2]; - p95_ns = measurements[static_cast(n * 0.95)]; - p99_ns = measurements[static_cast(n * 0.99)]; + size_t p95_idx = std::min(static_cast(n * 0.95), n - 1); + size_t p99_idx = std::min(static_cast(n * 0.99), n - 1); + p95_ns = measurements[p95_idx]; + p99_ns = measurements[p99_idx]; // 计算标准差 double variance = 0.0; @@ -89,8 +91,10 @@ class BenchmarkResult { BenchmarkStats stats; size_t iterations; double total_time_ns; + bool success = true; + std::string error_msg; - BenchmarkResult(const std::string& benchmark_name, size_t iter_count) + BenchmarkResult(const std::string& benchmark_name, size_t iter_count = 0) : name(benchmark_name), iterations(iter_count), total_time_ns(0.0) {} void add_measurement(double time_ns) { @@ -99,76 +103,113 @@ class BenchmarkResult { } void finalize() { + iterations = stats.measurements.size(); stats.calculate(); } + void set_error(const std::string& msg) { + success = false; + error_msg = msg; + } + void print_summary() const { - std::cout << std::left << std::setw(30) << name - << std::right << std::setw(10) << iterations + if (!success) { + std::cout << std::left << std::setw(35) << name + << " [FAILED: " << error_msg << "]" << std::endl; + return; + } + + std::cout << std::left << std::setw(35) << name + << std::right << std::setw(8) << iterations << std::setw(12) << std::fixed << std::setprecision(0) << stats.mean_ns << " ns" << std::setw(12) << std::fixed << std::setprecision(0) << stats.median_ns << " ns" - << std::setw(14) << std::fixed << std::setprecision(2) << (1e9 / stats.mean_ns) << " ops/sec" + << std::setw(14) << std::fixed << std::setprecision(0) << (1e9 / stats.mean_ns) << " ops/s" << std::endl; } void print_detailed() const { + if (!success) { + std::cout << "\n" << name << " - FAILED: " << error_msg << "\n"; + return; + } + std::cout << "\n" << name << " - Detailed Statistics:\n"; - std::cout << " Iterations: " << iterations << "\n"; - std::cout << " Mean: " << std::fixed << std::setprecision(0) << stats.mean_ns << " ns\n"; - std::cout << " Median: " << std::fixed << std::setprecision(0) << stats.median_ns << " ns\n"; - std::cout << " Min: " << std::fixed << std::setprecision(0) << stats.min_ns << " ns\n"; - std::cout << " Max: " << std::fixed << std::setprecision(0) << stats.max_ns << " ns\n"; - std::cout << " Std Dev: " << std::fixed << std::setprecision(0) << stats.stddev_ns << " ns\n"; - std::cout << " 95th pct: " << std::fixed << std::setprecision(0) << stats.p95_ns << " ns\n"; - std::cout << " 99th pct: " << std::fixed << std::setprecision(0) << stats.p99_ns << " ns\n"; - std::cout << " Throughput: " << std::fixed << std::setprecision(2) << (1e9 / stats.mean_ns) << " ops/sec\n"; + std::cout << " Iterations: " << iterations << "\n"; + std::cout << " Mean: " << std::fixed << std::setprecision(0) << stats.mean_ns << " ns\n"; + std::cout << " Median: " << std::fixed << std::setprecision(0) << stats.median_ns << " ns\n"; + std::cout << " Min: " << std::fixed << std::setprecision(0) << stats.min_ns << " ns\n"; + std::cout << " Max: " << std::fixed << std::setprecision(0) << stats.max_ns << " ns\n"; + std::cout << " Std Dev: " << std::fixed << std::setprecision(0) << stats.stddev_ns << " ns\n"; + std::cout << " P95: " << std::fixed << std::setprecision(0) << stats.p95_ns << " ns\n"; + std::cout << " P99: " << std::fixed << std::setprecision(0) << stats.p99_ns << " ns\n"; + std::cout << " Throughput: " << std::fixed << std::setprecision(0) << (1e9 / stats.mean_ns) << " ops/sec\n"; } }; -// 基准测试框架 +// 基准测试框架 - 增强版 class BenchmarkRunner { private: - static constexpr size_t WARMUP_ITERATIONS = 10; + static constexpr size_t WARMUP_ITERATIONS = 5; static constexpr size_t MIN_ITERATIONS = 100; static constexpr size_t MAX_ITERATIONS = 10000; - static constexpr double MIN_BENCHMARK_TIME_NS = 1e8; // 100ms minimum + static constexpr double MIN_BENCHMARK_TIME_MS = 100.0; // 100ms minimum + static constexpr double TIMEOUT_PER_OP_MS = 100.0; // 每次操作最大100ms public: template static BenchmarkResult run(const std::string& name, Func&& benchmark_func) { - // 预热阶段 - for (size_t i = 0; i < WARMUP_ITERATIONS; ++i) { - benchmark_func(); - } + BenchmarkResult result(name); - BenchmarkResult result(name, 0); - HighResTimer total_timer; - - // 动态确定迭代次数 - size_t iterations = MIN_ITERATIONS; - double elapsed = 0.0; - - while (elapsed < MIN_BENCHMARK_TIME_NS && iterations <= MAX_ITERATIONS) { - for (size_t i = 0; i < iterations; ++i) { - HighResTimer timer; + try { + // 预热阶段 + for (size_t i = 0; i < WARMUP_ITERATIONS; ++i) { + auto warmup_start = std::chrono::steady_clock::now(); benchmark_func(); - double measurement = timer.elapsed_ns(); + auto warmup_elapsed = std::chrono::steady_clock::now() - warmup_start; - // 确保测量值合理(至少1ns) - if (measurement < 1.0) { - measurement = 1.0; + // 预热超时检查 + if (std::chrono::duration(warmup_elapsed).count() > TIMEOUT_PER_OP_MS) { + result.set_error("Warmup timeout"); + return result; } - result.add_measurement(measurement); } - elapsed = total_timer.elapsed_ns(); - if (elapsed < MIN_BENCHMARK_TIME_NS) { - iterations = std::min(iterations * 2, MAX_ITERATIONS); + HighResTimer total_timer; + size_t target_iterations = MIN_ITERATIONS; + + while (total_timer.elapsed_ms() < MIN_BENCHMARK_TIME_MS && + result.stats.measurements.size() < MAX_ITERATIONS) { + + for (size_t i = 0; i < target_iterations && + result.stats.measurements.size() < MAX_ITERATIONS; ++i) { + + HighResTimer timer; + benchmark_func(); + double measurement = timer.elapsed_ns(); + + // 单次操作超时检查 + if (measurement > TIMEOUT_PER_OP_MS * 1e6) { + result.set_error("Operation timeout"); + return result; + } + + result.add_measurement(std::max(1.0, measurement)); + } + + // 动态调整迭代次数 + if (total_timer.elapsed_ms() < MIN_BENCHMARK_TIME_MS / 2) { + target_iterations = std::min(target_iterations * 2, MAX_ITERATIONS); + } } + + result.finalize(); + + } catch (const std::exception& e) { + result.set_error(std::string("Exception: ") + e.what()); + } catch (...) { + result.set_error("Unknown exception"); } - result.iterations = result.stats.measurements.size(); - result.finalize(); return result; } }; @@ -334,23 +375,34 @@ Task batch_processing_task(int batch_size) { // 基准测试函数 BenchmarkResult benchmark_coroutine_creation_and_execution() { - return BenchmarkRunner::run("Coroutine Creation & Execution", []() { - // 直接创建和执行协程,避免额外的lambda包装 + return BenchmarkRunner::run("Coroutine Create & Execute", []() { auto task = simple_coroutine(); - auto result = task.get(); - static_cast(result); + // 由于 initial_suspend 是 suspend_never,协程已执行完毕 + // 直接检查结果 + if (task.handle && task.handle.done()) { + auto val = task.handle.promise().safe_get_value(); + static_cast(val); + } else { + // 如果未完成,使用 get() + auto result = task.get(); + static_cast(result); + } }); } BenchmarkResult benchmark_void_coroutine() { return BenchmarkRunner::run("Void Coroutine", []() { auto task = void_coroutine(); - task.get(); + if (task.handle && task.handle.done()) { + // 已完成 + } else { + task.get(); + } }); } BenchmarkResult benchmark_simple_computation() { - return BenchmarkRunner::run("Simple Computation", []() { + return BenchmarkRunner::run("Simple Computation (baseline)", []() { // 与Go/Rust保持一致的计算 int sum = 0; for (int i = 0; i < 100; i++) { @@ -361,6 +413,40 @@ BenchmarkResult benchmark_simple_computation() { }); } +// 协程创建开销(仅创建不等待) +BenchmarkResult benchmark_coroutine_creation_only() { + return BenchmarkRunner::run("Coroutine Creation Only", []() { + auto task = simple_coroutine(); + // 不调用 get(),让析构函数处理 + static_cast(task.handle.done()); + }); +} + +// 原子操作 +BenchmarkResult benchmark_atomic_operations() { + return BenchmarkRunner::run("Atomic Increment", []() { + static std::atomic counter{0}; + counter.fetch_add(1, std::memory_order_relaxed); + }); +} + +// 互斥锁 +BenchmarkResult benchmark_mutex_lock() { + return BenchmarkRunner::run("Mutex Lock/Unlock", []() { + static std::mutex mtx; + std::lock_guard lock(mtx); + volatile int dummy = 0; + static_cast(dummy); + }); +} + +// 线程 yield +BenchmarkResult benchmark_thread_yield() { + return BenchmarkRunner::run("Thread Yield", []() { + std::this_thread::yield(); + }); +} + // 复杂任务基准测试 - 测试调度器处理复杂计算的能力 BenchmarkResult benchmark_complex_computation() { return BenchmarkRunner::run("Complex Computation Task", []() { @@ -459,11 +545,11 @@ BenchmarkResult benchmark_sleep_1us() { } BenchmarkResult benchmark_when_any_2_tasks() { - return BenchmarkRunner::run("WhenAny 2 Tasks", []() { + return BenchmarkRunner::run("WhenAny (2 tasks)", []() { auto result = sync_wait([]() -> Task { - auto task1 = compute_intensive_coroutine(100); - auto task2 = compute_intensive_coroutine(200); - auto [value, index] = co_await when_any(std::move(task1), std::move(task2)); + auto t1 = compute_intensive_coroutine(50); + auto t2 = compute_intensive_coroutine(100); + auto [value, index] = co_await when_any(std::move(t1), std::move(t2)); co_return value; }); static_cast(result); @@ -487,16 +573,12 @@ BenchmarkResult benchmark_when_any_4_tasks() { } BenchmarkResult benchmark_lockfree_queue() { - return BenchmarkRunner::run("LockFree Queue Ops", []() { - static lockfree::Queue queue; - static int counter = 0; - - // 入队 - queue.enqueue(++counter); - - // 出队 + return BenchmarkRunner::run("LockFree Queue (enq+deq)", []() { + lockfree::Queue queue; // 每次创建新队列 + queue.enqueue(42); int value; queue.dequeue(value); + static_cast(value); }); } @@ -708,32 +790,28 @@ BenchmarkResult benchmark_concurrent_echo_clients() { } BenchmarkResult benchmark_small_data_transfer() { - return BenchmarkRunner::run("Small Data Transfer (64B)", []() { - // 与Go/Rust保持一致的数据传输测试 - std::vector data(64); + return BenchmarkRunner::run("Data Transfer (64B)", []() { + std::array data; for (size_t i = 0; i < data.size(); i++) { - data[i] = static_cast(i % 256); + data[i] = static_cast(i); } - // Simulate checksum volatile int sum = 0; - for (uint8_t b : data) { - sum += static_cast(b); + for (auto b : data) { + sum += b; } static_cast(sum); }); } BenchmarkResult benchmark_medium_data_transfer() { - return BenchmarkRunner::run("Medium Data Transfer (4KB)", []() { - // 与Go/Rust保持一致的数据传输测试 + return BenchmarkRunner::run("Data Transfer (4KB)", []() { std::vector data(4096); for (size_t i = 0; i < data.size(); i++) { data[i] = static_cast(i % 256); } - // Simulate checksum - volatile int sum = 0; - for (uint8_t b : data) { - sum += static_cast(b); + volatile size_t sum = 0; + for (auto b : data) { + sum += b; } static_cast(sum); }); @@ -862,12 +940,11 @@ int main() { std::vector results; // Core coroutine benchmarks + results.emplace_back(benchmark_simple_computation()); + results.emplace_back(benchmark_coroutine_creation_only()); results.emplace_back(benchmark_coroutine_creation_and_execution()); results.emplace_back(benchmark_void_coroutine()); - // Core computation benchmarks (与Go/Rust保持一致) - results.emplace_back(benchmark_simple_computation()); - // 复杂任务基准测试 - 测试调度器能力 results.emplace_back(benchmark_complex_computation()); @@ -883,6 +960,11 @@ int main() { // Memory and data structure benchmarks results.emplace_back(benchmark_lockfree_queue()); + results.emplace_back(benchmark_atomic_operations()); + results.emplace_back(benchmark_mutex_lock()); + + // System call benchmarks + results.emplace_back(benchmark_thread_yield()); results.emplace_back(benchmark_memory_allocation()); results.emplace_back(benchmark_memory_pool_allocation()); diff --git a/examples/advanced_features_demo.cpp b/examples/advanced_features_demo.cpp new file mode 100644 index 0000000..42d3645 --- /dev/null +++ b/examples/advanced_features_demo.cpp @@ -0,0 +1,125 @@ +/** + * @file advanced_features_demo.cpp + * @brief 演示FlowCoro的高级架构特性 + */ + +#include +#include + +using namespace flowcoro; + +// 演示1:使用yield进行协作式调度 +Task cooperative_task(int id) { + int result = 0; + for (int i = 0; i < 1000; ++i) { + result += i; + + // 每100次迭代yield一次,让其他协程有机会执行 + if (i % 100 == 0) { + co_await yield(); // 轻量级yield,不需要完整的重新调度 + } + } + co_return result; +} + +// 演示2:批量处理与周期性yield +Task batch_processing_task() { + std::vector data(10000); + size_t counter = 0; + + for (size_t i = 0; i < data.size(); ++i) { + data[i] = i * 2; + + // 使用BatchYieldAwaiter自动管理yield频率 + co_await BatchYieldAwaiter(counter, 500); + } + + std::cout << "Batch processing completed with " << counter << " yields\n"; +} + +// 演示3:高性能并发任务 +Task concurrent_workers() { + std::vector> workers; + + // 创建多个协作式工作任务 + for (int i = 0; i < 10; ++i) { + workers.push_back(cooperative_task(i)); + } + + // 等待所有任务完成 + int total = 0; + for (auto& worker : workers) { + total += co_await worker; + } + + std::cout << "Total result from " << workers.size() << " workers: " << total << "\n"; +} + +// 演示4:立即执行的void任务(利用suspend_never) +Task immediate_void_task() { + // 这个任务会立即执行,因为Task现在使用suspend_never + std::cout << "Immediate void task executing!\n"; + co_return; +} + +// 演示5:组合使用不同的异步原语 +Task combined_async_operations() { + std::cout << "Starting combined operations...\n"; + + // 1. 立即执行的任务 + co_await immediate_void_task(); + + // 2. 协作式yield + co_await yield(); + + // 3. 延时操作 + co_await sleep_for(std::chrono::milliseconds(10)); + + // 4. 并发任务 + auto task1 = cooperative_task(1); + auto task2 = cooperative_task(2); + + auto [value, index] = co_await when_any(std::move(task1), std::move(task2)); + std::cout << "First completed task (index " << index << ") returned: " << value << "\n"; + + std::cout << "Combined operations completed!\n"; +} + +int main() { + std::cout << "=== FlowCoro Advanced Features Demo ===\n\n"; + + try { + // 演示1: 协作式任务 + std::cout << "Demo 1: Cooperative Task\n"; + auto result1 = sync_wait(cooperative_task(0)); + std::cout << "Result: " << result1 << "\n\n"; + + // 演示2: 批量处理 + std::cout << "Demo 2: Batch Processing\n"; + sync_wait(batch_processing_task()); + std::cout << "\n"; + + // 演示3: 并发工作器 + std::cout << "Demo 3: Concurrent Workers\n"; + sync_wait(concurrent_workers()); + std::cout << "\n"; + + // 演示4: 立即执行的void任务 + std::cout << "Demo 4: Immediate Void Task\n"; + sync_wait(immediate_void_task()); + std::cout << "\n"; + + // 演示5: 组合操作 + std::cout << "Demo 5: Combined Async Operations\n"; + sync_wait(combined_async_operations()); + std::cout << "\n"; + + std::cout << "=== All demos completed successfully! ===\n"; + + } catch (const std::exception& e) { + std::cerr << "Exception: " << e.what() << "\n"; + return 1; + } + + return 0; +} diff --git a/include/flowcoro.hpp b/include/flowcoro.hpp index 16127b0..80b2641 100644 --- a/include/flowcoro.hpp +++ b/include/flowcoro.hpp @@ -49,6 +49,8 @@ #include "flowcoro/simple_db.h" #include "flowcoro/rpc.h" #include "flowcoro/channel.h" +#include "flowcoro/yield.h" // 新增:轻量级yield支持 +#include "flowcoro/task_allocator.h" // 新增:缓存友好的任务分配器 // #include "flowcoro/rpc.h" // 暂时注释掉复杂的RPC实现 diff --git a/include/flowcoro/coroutine_manager.h b/include/flowcoro/coroutine_manager.h index 6fc63bc..162843c 100644 --- a/include/flowcoro/coroutine_manager.h +++ b/include/flowcoro/coroutine_manager.h @@ -160,54 +160,7 @@ class CoroutineManager { return timer_id; } -private: - // 专用定时器线程主函数 - void dedicated_timer_thread_func() - { - LOG_INFO("Dedicated timer thread started"); - while (!timer_thread_stop_.load(std::memory_order_acquire)) - { - std::unique_lock lock(timer_mutex_); - if (timer_queue_.empty()) - { - // 等待新的定时器或停止信号 - timer_thread_cv_.wait(lock, [this] - { return timer_thread_stop_.load(std::memory_order_acquire) || - !timer_queue_.empty(); }); - continue; - } - auto now = std::chrono::steady_clock::now(); - const auto &[when, handle] = timer_queue_.top(); - if (when <= now) - { - // 批量处理到期的定时器 - std::vector> expired_handles; - while (!timer_queue_.empty() && timer_queue_.top().first <= now) - { - expired_handles.push_back(timer_queue_.top().second); - timer_queue_.pop(); - } - lock.unlock(); // 释放锁,避免在恢复协程时阻塞 - // 批量恢复协程(通过线程池异步执行) - for (auto h : expired_handles) - { - if (h && !h.done()) - { - // 使用现有的协程池恢复 - schedule_coroutine_enhanced(h); - } - } - } - else - { - // 精确等待到下一个定时器时间 - timer_thread_cv_.wait_until(lock, when, [this] - { return timer_thread_stop_.load(std::memory_order_acquire); }); - } - } - LOG_INFO("Dedicated timer thread stopped"); - } - + // 公开的队列处理方法 - 用于外部驱动 void process_timer_queue() { static constexpr size_t BATCH_SIZE = 32; // 批处理大小 std::array, BATCH_SIZE> batch; @@ -296,6 +249,54 @@ class CoroutineManager { } } +private: + // 专用定时器线程主函数 + void dedicated_timer_thread_func() + { + LOG_INFO("Dedicated timer thread started"); + while (!timer_thread_stop_.load(std::memory_order_acquire)) + { + std::unique_lock lock(timer_mutex_); + if (timer_queue_.empty()) + { + // 等待新的定时器或停止信号 + timer_thread_cv_.wait(lock, [this] + { return timer_thread_stop_.load(std::memory_order_acquire) || + !timer_queue_.empty(); }); + continue; + } + auto now = std::chrono::steady_clock::now(); + const auto &[when, handle] = timer_queue_.top(); + if (when <= now) + { + // 批量处理到期的定时器 + std::vector> expired_handles; + while (!timer_queue_.empty() && timer_queue_.top().first <= now) + { + expired_handles.push_back(timer_queue_.top().second); + timer_queue_.pop(); + } + lock.unlock(); // 释放锁,避免在恢复协程时阻塞 + // 批量恢复协程(通过线程池异步执行) + for (auto h : expired_handles) + { + if (h && !h.done()) + { + // 使用现有的协程池恢复 + schedule_coroutine_enhanced(h); + } + } + } + else + { + // 精确等待到下一个定时器时间 + timer_thread_cv_.wait_until(lock, when, [this] + { return timer_thread_stop_.load(std::memory_order_acquire); }); + } + } + LOG_INFO("Dedicated timer thread stopped"); + } + // 定时器队列(最小堆) std::priority_queue< std::pair>, diff --git a/include/flowcoro/task.h b/include/flowcoro/task.h index dcb215f..92e2cf3 100644 --- a/include/flowcoro/task.h +++ b/include/flowcoro/task.h @@ -258,45 +258,62 @@ struct Task { } } - // 安全恢复协程 - 统一使用协程管理器调度 - if (!handle.done() && !handle.promise().is_cancelled()) { + // 🔧 关键修复:先检查是否已完成(suspend_never 情况下协程会立即执行) + if (handle.done()) { + goto get_result; + } + + // 只有在未完成时才进入调度逻辑 + if (!handle.promise().is_cancelled()) { auto& manager = CoroutineManager::get_instance(); manager.schedule_resume(handle); - // 等待协程完成(优化的自适应等待) - auto wait_time = std::chrono::nanoseconds(100); // 从100ns开始 - int spin_count = 0; - const int max_spins = 1000; // 先自旋1000次 - const auto max_wait = std::chrono::nanoseconds(50000); // 50μs + // 🔧 修复:添加超时保护和正确的等待策略 + auto start_time = std::chrono::steady_clock::now(); + const auto timeout = std::chrono::seconds(5); // 5秒超时 + + auto wait_time = std::chrono::microseconds(1); + const auto max_wait = std::chrono::microseconds(100); + size_t spin_count = 0; + const size_t max_spins = 100; while (!handle.done() && !handle.promise().is_cancelled()) { - manager.drive(); // 驱动协程池执行 + // 超时检查 + auto elapsed = std::chrono::steady_clock::now() - start_time; + if (elapsed > timeout) { + LOG_ERROR("Task::get: Timeout after 5 seconds"); + if constexpr (std::is_default_constructible_v) { + return T{}; + } else { + std::terminate(); + } + } + + // 驱动协程管理器 + manager.drive(); - // 先进行无睡眠的快速轮询 + // 自适应等待策略 if (spin_count < max_spins) { ++spin_count; - continue; + std::this_thread::yield(); + } else { + std::this_thread::sleep_for(wait_time); + wait_time = std::min(wait_time * 2, max_wait); } - - // 然后使用自适应睡眠 - std::this_thread::sleep_for(wait_time); - wait_time = std::min(wait_time * 2, max_wait); } } +get_result: // 检查是否有错误 if (handle.promise().safe_has_error()) { - // 不使用异常,记录错误日志并返回默认值 LOG_ERROR("Task execution failed"); if constexpr (std::is_default_constructible_v) { return T{}; } else { - // 对于不可默认构造的类型,尝试使用value的移动构造 auto safe_value = handle.promise().safe_get_value(); if (safe_value.has_value()) { return std::move(safe_value.value()); } - LOG_ERROR("Cannot provide default value for non-default-constructible type"); std::terminate(); } } diff --git a/include/flowcoro/task_allocator.h b/include/flowcoro/task_allocator.h new file mode 100644 index 0000000..9c6d87d --- /dev/null +++ b/include/flowcoro/task_allocator.h @@ -0,0 +1,66 @@ +#pragma once +#include +#include +#include + +namespace flowcoro { + +// 缓存对齐的协程promise分配器 +// 提高缓存局部性,减少false sharing +template +class CacheAlignedAllocator { +public: + using value_type = T; + using size_type = std::size_t; + using difference_type = std::ptrdiff_t; + + // C++20 alignas支持 + static constexpr size_t alignment = std::hardware_destructive_interference_size; + + CacheAlignedAllocator() noexcept = default; + + template + CacheAlignedAllocator(const CacheAlignedAllocator&) noexcept {} + + T* allocate(size_t n) { + if (n == 0) return nullptr; + + size_t size = n * sizeof(T); + // 对齐到缓存行 + void* ptr = ::operator new(size, std::align_val_t{alignment}); + + if (!ptr) { + throw std::bad_alloc(); + } + + return static_cast(ptr); + } + + void deallocate(T* ptr, size_t) noexcept { + ::operator delete(ptr, std::align_val_t{alignment}); + } + + template + bool operator==(const CacheAlignedAllocator&) const noexcept { + return true; + } + + template + bool operator!=(const CacheAlignedAllocator&) const noexcept { + return false; + } +}; + +// 为promise_type提供自定义分配器支持 +// 可以在promise_type中添加以下方法来使用: +// void* operator new(std::size_t size) { +// CacheAlignedAllocator alloc; +// return alloc.allocate(1); +// } +// +// void operator delete(void* ptr) noexcept { +// CacheAlignedAllocator alloc; +// alloc.deallocate(static_cast(ptr), 1); +// } + +} // namespace flowcoro diff --git a/include/flowcoro/task_specializations.h b/include/flowcoro/task_specializations.h index 468cf7b..4e01518 100644 --- a/include/flowcoro/task_specializations.h +++ b/include/flowcoro/task_specializations.h @@ -218,17 +218,29 @@ struct Task { std::atomic is_destroyed_{false}; std::chrono::steady_clock::time_point creation_time_; - promise_type() : creation_time_(std::chrono::steady_clock::now()) {} + promise_type() : creation_time_(std::chrono::steady_clock::now()) { + // 记录任务创建 + PerformanceMonitor::get_instance().on_task_created(); + } // 析构时标记销毁 ~promise_type() { is_destroyed_.store(true, std::memory_order_release); + + // 记录任务状态 + if (has_error) { + PerformanceMonitor::get_instance().on_task_failed(); + } else if (is_cancelled()) { + PerformanceMonitor::get_instance().on_task_cancelled(); + } else { + PerformanceMonitor::get_instance().on_task_completed(); + } } Task get_return_object() { return Task{std::coroutine_handle::from_promise(*this)}; } - std::suspend_always initial_suspend() noexcept { return {}; } // 懒执行 + std::suspend_never initial_suspend() noexcept { return {}; } // 立即执行 - 与Task保持一致 // 支持continuation的final_suspend auto final_suspend() noexcept { @@ -389,12 +401,19 @@ struct Task { return; } - // 支持懒执行:启动协程并驱动事件循环 - if (!handle.done() && !handle.promise().is_cancelled()) { - // 获取协程管理器实例 + // 🔧 关键修复:先检查是否已完成(suspend_never 情况下协程会立即执行) + if (handle.done()) { + // 检查是否有错误 + if (handle.promise().safe_has_error()) { + LOG_ERROR("Task execution failed"); + } + return; + } + + // 只有在未完成时才进入调度逻辑 + if (!handle.promise().is_cancelled()) { auto& manager = CoroutineManager::get_instance(); - // 启动协程(如果尚未开始)- 使用协程管理器调度 try { manager.schedule_resume(handle); } catch (const std::exception& e) { @@ -405,17 +424,31 @@ struct Task { return; } - // 等待协程完成(同步等待)- 快速自适应等待 + // 🔧 修复:添加超时保护和优化的等待策略 + auto start_time = std::chrono::steady_clock::now(); + const auto timeout = std::chrono::seconds(5); + + auto wait_time = std::chrono::microseconds(1); + const auto max_wait = std::chrono::microseconds(100); + size_t spin_count = 0; + const size_t max_spins = 100; + while (!handle.done() && !handle.promise().is_cancelled()) { - manager.drive(); // 驱动协程池执行 - // 快速检查几次 - for (int i = 0; i < 100 && !handle.done() && !handle.promise().is_cancelled(); ++i) { - manager.drive(); - std::this_thread::yield(); + // 超时检查 + auto elapsed = std::chrono::steady_clock::now() - start_time; + if (elapsed > timeout) { + LOG_ERROR("Task::get: Timeout after 5 seconds"); + return; } - // 如果还没完成,短暂休眠 - if (!handle.done() && !handle.promise().is_cancelled()) { - std::this_thread::sleep_for(std::chrono::microseconds(1)); + + manager.drive(); + + if (spin_count < max_spins) { + ++spin_count; + std::this_thread::yield(); + } else { + std::this_thread::sleep_for(wait_time); + wait_time = std::min(wait_time * 2, max_wait); } } } @@ -444,7 +477,7 @@ struct Task { } bool await_suspend(std::coroutine_handle<> waiting_handle) { - // 懒加载Task的正确实现 - Task版本 + // Task版本 - 与Task保持一致的实现 if (!handle || handle.promise().is_destroyed()) { // 句柄无效,直接恢复等待协程 auto& manager = CoroutineManager::get_instance(); @@ -462,10 +495,6 @@ struct Task { // 设置continuation:当task完成时恢复waiting_handle handle.promise().set_continuation(waiting_handle); - // 启动懒加载的任务执行(仅首次) - auto& manager = CoroutineManager::get_instance(); - manager.schedule_resume(handle); - // 挂起等待协程,等待task通过continuation唤醒 return true; } diff --git a/include/flowcoro/yield.h b/include/flowcoro/yield.h new file mode 100644 index 0000000..120a456 --- /dev/null +++ b/include/flowcoro/yield.h @@ -0,0 +1,50 @@ +#pragma once +#include +#include "coroutine_manager.h" + +namespace flowcoro { + +// 轻量级yield awaiter - 立即恢复,但给其他协程执行机会 +struct YieldAwaiter { + bool await_ready() const noexcept { + return false; // 总是挂起以给其他协程机会 + } + + void await_suspend(std::coroutine_handle<> h) noexcept { + // 立即重新调度当前协程,但允许其他协程先执行 + auto& manager = CoroutineManager::get_instance(); + manager.schedule_resume(h); + } + + void await_resume() const noexcept {} +}; + +// 便捷函数:让出执行权 +inline YieldAwaiter yield() noexcept { + return {}; +} + +// 优化的批量操作awaiter - 在循环中周期性yield +class BatchYieldAwaiter { +private: + size_t& counter_; + const size_t yield_interval_; + +public: + BatchYieldAwaiter(size_t& counter, size_t yield_interval = 100) + : counter_(counter), yield_interval_(yield_interval) {} + + bool await_ready() const noexcept { + // 只在达到间隔时才挂起 + return (++counter_ % yield_interval_) != 0; + } + + void await_suspend(std::coroutine_handle<> h) noexcept { + auto& manager = CoroutineManager::get_instance(); + manager.schedule_resume(h); + } + + void await_resume() const noexcept {} +}; + +} // namespace flowcoro diff --git a/src/coroutine_pool.cpp b/src/coroutine_pool.cpp index 5e93989..32da5a3 100644 --- a/src/coroutine_pool.cpp +++ b/src/coroutine_pool.cpp @@ -250,6 +250,29 @@ class CoroutineScheduler { return queue_size_.load(std::memory_order_relaxed); } + // 工作窃取:尝试从队列中获取一批任务给其他调度器 + size_t try_steal_work(std::vector>& stolen_tasks, size_t max_steal = 32) { + if (queue_size_.load(std::memory_order_relaxed) <= 1) { + return 0; // 队列太小,不值得窃取 + } + + size_t stolen_count = 0; + std::coroutine_handle<> handle; + + // 窃取一半的任务(最多max_steal个) + size_t target_steal = std::min(queue_size_.load() / 2, max_steal); + + while (stolen_count < target_steal && coroutine_queue_.dequeue(handle)) { + if (handle && !handle.done()) { + stolen_tasks.push_back(handle); + stolen_count++; + queue_size_.fetch_sub(1, std::memory_order_relaxed); + } + } + + return stolen_count; + } + size_t get_total_coroutines() const { return total_coroutines_.load(); } size_t get_completed_coroutines() const { return completed_coroutines_.load(); } size_t get_scheduler_id() const { return scheduler_id_; } @@ -394,14 +417,29 @@ class CoroutinePool { }); } - // 驱动协程池 - 现在由各个调度器自动运行,无需手动驱动 + // 驱动协程池 - 增强版本:主动处理队列并推进协程执行 void drive() { - // 多调度器架构下,每个调度器都在独立线程中自动运行 - // 这个方法保留为兼容接口,实际不需要调用 - if (stop_flag_.load()) return; + if (stop_flag_.load(std::memory_order_relaxed)) return; + + // 获取协程管理器并处理其队列 + auto& manager = flowcoro::CoroutineManager::get_instance(); + + // 1. 处理定时器队列 - 将到期的定时器移到就绪队列 + manager.process_timer_queue(); + + // 2. 处理就绪队列 - 立即执行就绪的协程 + manager.process_ready_queue(); + + // 3. 给调度器线程一些时间处理 + // 这对于异步任务很重要 + for (auto& scheduler : schedulers_) { + if (scheduler) { + std::this_thread::yield(); + } + } - // 可以在这里添加一些全局监控逻辑 - // 但协程调度已经由各个独立调度器自动处理 + // 4. 处理待销毁的协程 - 清理资源 + manager.process_pending_tasks(); } // 获取统计信息