From c5dc7ee0a110c1e35dad2de99c1eb818f2b5f278 Mon Sep 17 00:00:00 2001 From: Alexey Ozeritskiy Date: Sun, 26 Apr 2026 00:01:21 +0100 Subject: [PATCH 1/2] Fix future wait any --- coroio/corochain.hpp | 16 ++++++++++++++ coroio/poller.hpp | 37 +++++++++++++++++++++----------- coroio/socket.hpp | 50 +++++++++++++++++++++++++++++++++++++------- tests/tests.cpp | 32 ++++++++++++++++++++++++++++ 4 files changed, 115 insertions(+), 20 deletions(-) diff --git a/coroio/corochain.hpp b/coroio/corochain.hpp index 5503eae..1db3ee4 100644 --- a/coroio/corochain.hpp +++ b/coroio/corochain.hpp @@ -163,6 +163,12 @@ struct TFutureBase { Coro.promise().Caller = caller; } + void detach() { + if (Coro) { + Coro.promise().Caller = std::noop_coroutine(); + } + } + using promise_type = TPromise; protected: @@ -363,6 +369,11 @@ TFuture Any(std::vector>&& futures) { f.await_suspend(self); } co_await std::suspend_always(); + for (auto& f : all) { + if (!f.done()) { + f.detach(); + } + } co_return std::find_if(all.begin(), all.end(), [](auto& f) { return f.done(); })->await_resume(); } @@ -384,6 +395,11 @@ inline TFuture Any(std::vector>&& futures) { f.await_suspend(self); } co_await std::suspend_always(); + for (auto& f : all) { + if (!f.done()) { + f.detach(); + } + } co_return; } diff --git a/coroio/poller.hpp b/coroio/poller.hpp index fa7a902..9c945eb 100644 --- a/coroio/poller.hpp +++ b/coroio/poller.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -120,21 +121,17 @@ class TPollerBase { * @param fd The file descriptor. */ void RemoveEvent(int fd) { - // TODO: resume waiting coroutines here MaxFd_ = std::max(MaxFd_, fd); Changes_.emplace_back(TEvent{fd, TEvent::READ|TEvent::WRITE|TEvent::RHUP, {}}); + if (!ReadyEvents_.empty()) { + RemovedFdsCurrentLoop_.insert(fd); + } } - /** - * @brief No-op placeholder for future cleanup by handle. - * - * Intended to be called by destructors of unfinished futures so that - * pending waits can be unregistered. Currently unimplemented. - * - * @param h The coroutine handle (unused). - */ - void RemoveEvent(THandle /*h*/) { - // TODO: Add new vector for this type of removing - // Will be called in destuctor of unfinished futures + + void RemoveReadyHandle(THandle h) { + if (!ReadyEvents_.empty()) { + RemovedHandlesCurrentLoop_.insert(h.address()); + } } /** * @brief Suspends execution until the specified time. @@ -254,8 +251,20 @@ class TPollerBase { */ void WakeupReadyHandles() { for (auto&& ev : ReadyEvents_) { + if (!ev.Handle) { continue; } + if (!RemovedFdsCurrentLoop_.empty()) [[unlikely]] { + if (ev.Fd >= 0 && RemovedFdsCurrentLoop_.count(ev.Fd)) { continue; } + } + if (!RemovedHandlesCurrentLoop_.empty()) [[unlikely]] { + if (RemovedHandlesCurrentLoop_.count(ev.Handle.address())) { + RemovedHandlesCurrentLoop_.erase(ev.Handle.address()); + continue; + } + } Wakeup(std::move(ev)); } + RemovedFdsCurrentLoop_.clear(); + RemovedHandlesCurrentLoop_.clear(); } /** * @brief Sets the maximum polling duration. @@ -303,6 +312,8 @@ class TPollerBase { ReadyEvents_.clear(); Changes_.clear(); MaxFd_ = 0; + RemovedFdsCurrentLoop_.clear(); + RemovedHandlesCurrentLoop_.clear(); } /** * @brief Processes scheduled timers. @@ -333,6 +344,8 @@ class TPollerBase { int MaxFd_ = 0; ///< Highest file descriptor in use. std::vector Changes_; ///< Pending changes (registered events). std::vector ReadyEvents_; ///< Events ready to wake up their coroutines. + std::unordered_set RemovedFdsCurrentLoop_; ///< Fds cancelled mid-loop (fd-based backends). + std::unordered_set RemovedHandlesCurrentLoop_; ///< Handles cancelled mid-loop (uring/IOCP). unsigned TimerId_ = 0; ///< Counter for generating unique timer IDs. std::priority_queue Timers_; ///< Priority queue for scheduled timers. TTime LastTimersProcessTime_; ///< Last time timers were processed. diff --git a/coroio/socket.hpp b/coroio/socket.hpp index fe70044..c3217ef 100644 --- a/coroio/socket.hpp +++ b/coroio/socket.hpp @@ -573,10 +573,12 @@ class TPollerDrivenSocket: public TSocket struct TAwaitable { bool await_ready() const { return false; } void await_suspend(std::coroutine_handle<> h) { + handle_ = h; poller->Accept(fd, reinterpret_cast(&addr[0]), &len, h); } TPollerDrivenSocket await_resume() { + handle_ = {}; int clientfd = poller->Result(); if (clientfd < 0) { throw std::system_error(-clientfd, std::generic_category(), "accept"); @@ -585,8 +587,13 @@ class TPollerDrivenSocket: public TSocket return TPollerDrivenSocket{TAddress{reinterpret_cast(&addr[0]), len}, clientfd, *poller}; } + ~TAwaitable() { + if (handle_) { poller->RemoveReadyHandle(handle_); } + } + T* poller; int fd; + std::coroutine_handle<> handle_; char addr[2*(sizeof(sockaddr_in6)+16)] = {0}; // use additional memory for windows socklen_t len = static_cast(sizeof(addr)); @@ -616,6 +623,7 @@ class TPollerDrivenSocket: public TSocket bool await_ready() const { return false; } void await_suspend(std::coroutine_handle<> h) { + handle_ = h; poller->Connect(fd, addr.first, addr.second, h); if (deadline != TTime::max()) { timerId = poller->AddTimer(deadline, h); @@ -623,6 +631,7 @@ class TPollerDrivenSocket: public TSocket } void await_resume() { + handle_ = {}; if (deadline != TTime::max() && poller->RemoveTimer(timerId, deadline)) { poller->Cancel(fd); throw std::system_error(std::make_error_code(std::errc::timed_out)); @@ -633,11 +642,16 @@ class TPollerDrivenSocket: public TSocket } } + ~TAwaitable() { + if (handle_) { poller->RemoveReadyHandle(handle_); } + } + T* poller; int fd; std::pair addr; TTime deadline; unsigned timerId = 0; + std::coroutine_handle<> handle_; }; return TAwaitable{Poller_, Fd_, RemoteAddr()->RawAddr(), deadline}; } @@ -656,10 +670,12 @@ class TPollerDrivenSocket: public TSocket struct TAwaitable { bool await_ready() const { return false; } void await_suspend(std::coroutine_handle<> h) { + handle_ = h; poller->Recv(fd, buf, size, h); } auto await_resume() { + handle_ = {}; auto ret = poller->Result(); if (ret < 0) { #ifdef _WIN32 @@ -678,11 +694,15 @@ class TPollerDrivenSocket: public TSocket return ret; } + ~TAwaitable() { + if (handle_) { poller->RemoveReadyHandle(handle_); } + } + T* poller; int fd; - void* buf; size_t size; + std::coroutine_handle<> handle_; }; return TAwaitable{Poller_, Fd_, buf, size}; @@ -702,10 +722,12 @@ class TPollerDrivenSocket: public TSocket struct TAwaitable { bool await_ready() const { return false; } void await_suspend(std::coroutine_handle<> h) { + handle_ = h; poller->Send(fd, buf, size, h); } auto await_resume() { + handle_ = {}; auto ret = poller->Result(); if (ret < 0) { #ifdef _WIN32 @@ -724,22 +746,24 @@ class TPollerDrivenSocket: public TSocket return ret; } + ~TAwaitable() { + if (handle_) { poller->RemoveReadyHandle(handle_); } + } + T* poller; int fd; - const void* buf; size_t size; + std::coroutine_handle<> handle_; }; return TAwaitable{Poller_, Fd_, buf, size}; } - /// The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome. auto WriteSomeYield(const void* buf, size_t size) { return WriteSome(buf, size); } - /// The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome. auto ReadSomeYield(void* buf, size_t size) { return ReadSome(buf, size); } @@ -795,10 +819,12 @@ class TPollerDrivenFileHandle: public TFileHandle struct TAwaitable { bool await_ready() const { return false; } void await_suspend(std::coroutine_handle<> h) { + handle_ = h; poller->Read(fd, buf, size, h); } auto await_resume() { + handle_ = {}; auto ret = poller->Result(); if (ret < 0) { #ifdef _WIN32 @@ -817,11 +843,15 @@ class TPollerDrivenFileHandle: public TFileHandle return ret; } + ~TAwaitable() { + if (handle_) { poller->RemoveReadyHandle(handle_); } + } + T* poller; int fd; - void* buf; size_t size; + std::coroutine_handle<> handle_; }; return TAwaitable{Poller_, Fd_, buf, size}; @@ -841,10 +871,12 @@ class TPollerDrivenFileHandle: public TFileHandle struct TAwaitable { bool await_ready() const { return false; } void await_suspend(std::coroutine_handle<> h) { + handle_ = h; poller->Write(fd, buf, size, h); } auto await_resume() { + handle_ = {}; auto ret = poller->Result(); if (ret < 0) { #ifdef _WIN32 @@ -863,22 +895,24 @@ class TPollerDrivenFileHandle: public TFileHandle return ret; } + ~TAwaitable() { + if (handle_) { poller->RemoveReadyHandle(handle_); } + } + T* poller; int fd; - const void* buf; size_t size; + std::coroutine_handle<> handle_; }; return TAwaitable{Poller_, Fd_, buf, size}; } - /// The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome. auto WriteSomeYield(const void* buf, size_t size) { return WriteSome(buf, size); } - /// The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome. auto ReadSomeYield(void* buf, size_t size) { return ReadSome(buf, size); } diff --git a/tests/tests.cpp b/tests/tests.cpp index bf05a95..b3312b0 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -781,6 +781,37 @@ void test_zero_copy_line_splitter_wrap(void**) { assert_string_equal("cc\n", result.data()); } +template +void test_any_simultaneous(void**) { + TLoop loop; + + int fds1[2], fds2[2]; + assert_int_equal(pipe(fds1), 0); + assert_int_equal(pipe(fds2), 0); + assert_int_equal(write(fds1[1], "x", 1), 1); + assert_int_equal(write(fds2[1], "x", 1), 1); + + auto read_one = [](TPoller& poller, int fd) -> TFuture { + typename TPoller::TFileHandle fh(fd, poller); + char buf[1]; + co_return co_await fh.ReadSomeYield(buf, 1); + }; + + TFuture task = [&](TPoller& poller) -> TFuture { + std::vector> futures; + futures.push_back(read_one(poller, fds1[0])); + futures.push_back(read_one(poller, fds2[0])); + int r = co_await Any(std::move(futures)); + assert_true(r == 1 || r == 2); + }(loop.Poller()); + + while (!task.done()) { + loop.Step(); + } + close(fds1[1]); + close(fds2[1]); +} + void test_self_id(void**) { void* id; TFuture h = [](void** id) -> TFuture { @@ -1338,6 +1369,7 @@ int main(int argc, char* argv[]) { ADD_TEST(cmocka_unit_test, test_zero_copy_line_splitter); ADD_TEST(cmocka_unit_test, test_line_splitter_wrap); ADD_TEST(cmocka_unit_test, test_zero_copy_line_splitter_wrap); + ADD_TEST(my_unit_poller, test_any_simultaneous); ADD_TEST(cmocka_unit_test, test_self_id); ADD_TEST(cmocka_unit_test, test_resolv_nameservers); ADD_TEST(my_unit_poller, test_listen); From db4a42f352ae9f32403c93bc300172b5b0d480ef Mon Sep 17 00:00:00 2001 From: Alexey Ozeritskiy Date: Sun, 26 Apr 2026 17:06:49 +0100 Subject: [PATCH 2/2] Update --- coroio/backends/iocp.hpp | 1 - coroio/backends/uring.hpp | 1 - coroio/poller.hpp | 12 ++++++++++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/coroio/backends/iocp.hpp b/coroio/backends/iocp.hpp index 3428186..afe9e7c 100644 --- a/coroio/backends/iocp.hpp +++ b/coroio/backends/iocp.hpp @@ -142,7 +142,6 @@ class TIOCp: public TPollerBase { // Allocator to avoid dynamic memory allocation for each IOCP event structure. TArenaAllocator Allocator_; std::vector Entries_; - std::queue Results_; }; } diff --git a/coroio/backends/uring.hpp b/coroio/backends/uring.hpp index b390510..791a16a 100644 --- a/coroio/backends/uring.hpp +++ b/coroio/backends/uring.hpp @@ -188,7 +188,6 @@ class TUring: public TPollerBase { int RingFd_; ///< File descriptor for the io_uring. int EpollFd_; ///< Epoll file descriptor (for integration with epoll). struct io_uring Ring_; ///< The io_uring structure. - std::queue Results_; ///< Queue of results for completed operations. std::vector Buffer_; ///< Buffer used for internal I/O operations. }; diff --git a/coroio/poller.hpp b/coroio/poller.hpp index 9c945eb..5ac3372 100644 --- a/coroio/poller.hpp +++ b/coroio/poller.hpp @@ -251,13 +251,20 @@ class TPollerBase { */ void WakeupReadyHandles() { for (auto&& ev : ReadyEvents_) { - if (!ev.Handle) { continue; } + if (!ev.Handle) { + continue; + } if (!RemovedFdsCurrentLoop_.empty()) [[unlikely]] { - if (ev.Fd >= 0 && RemovedFdsCurrentLoop_.count(ev.Fd)) { continue; } + if (ev.Fd >= 0 && RemovedFdsCurrentLoop_.count(ev.Fd)) { + continue; + } } if (!RemovedHandlesCurrentLoop_.empty()) [[unlikely]] { if (RemovedHandlesCurrentLoop_.count(ev.Handle.address())) { RemovedHandlesCurrentLoop_.erase(ev.Handle.address()); + if (ev.Fd < 0 && !Results_.empty()) { + Results_.pop(); + } continue; } } @@ -344,6 +351,7 @@ class TPollerBase { int MaxFd_ = 0; ///< Highest file descriptor in use. std::vector Changes_; ///< Pending changes (registered events). std::vector ReadyEvents_; ///< Events ready to wake up their coroutines. + std::queue Results_; ///< Results queue for uring/IOCP completions (shared to allow mid-loop discard). std::unordered_set RemovedFdsCurrentLoop_; ///< Fds cancelled mid-loop (fd-based backends). std::unordered_set RemovedHandlesCurrentLoop_; ///< Handles cancelled mid-loop (uring/IOCP). unsigned TimerId_ = 0; ///< Counter for generating unique timer IDs.