diff --git a/compiler/code-gen/vertex-compiler.cpp b/compiler/code-gen/vertex-compiler.cpp index 0ca596f4c0..bba72d1a88 100644 --- a/compiler/code-gen/vertex-compiler.cpp +++ b/compiler/code-gen/vertex-compiler.cpp @@ -7,7 +7,9 @@ #include #include #include +#include +#include "auto/compiler/vertex/vertex-types.h" #include "common/containers/final_action.h" #include "common/wrappers/field_getter.h" #include "common/wrappers/likely.h" @@ -459,6 +461,21 @@ inline int64_t can_use_precomputed_hash_indexing_array(VertexPtr key) { return 0; } +// Helper to recursively check if a vertex subtree contains interruptible function calls +bool contains_interruptible_call(VertexPtr vertex) { + if (auto func_call = vertex.try_as(); func_call && func_call->func_id && func_call->func_id->is_interruptible) { + return true; + } + + for (auto child : *vertex) { + if (contains_interruptible_call(child)) { + return true; + } + } + + return false; +} + void compile_null_coalesce(VertexAdaptor root, CodeGenerator &W) { const TypeData *type = tinf::get_type(root); auto lhs = root->lhs(); @@ -469,18 +486,12 @@ void compile_null_coalesce(VertexAdaptor root, CodeGenerator & W << "TRY_CALL_ " << MacroBegin{} << TypeName{type} << ", "; } - /* TODO:K2 - * In current implementation all non-trivial finialize block marked as cpp coroutine. - * This leads to redundant coroutines, but eliminates the need to traverse the rhs subtree - * to find whether it actually contains interruptible call. It can be fixed in the future - * to reduce count of cpp coroutines. - */ - bool interruptible_call = G->is_output_mode_k2() && - !vk::any_of_equal(rhs->type(), op_var, op_int_const, op_float_const, op_false, op_null) && - W.get_context().parent_func->is_interruptible; + const bool interruptible_rhs = + G->is_output_mode_k2() && !vk::any_of_equal(rhs->type(), op_var, op_int_const, op_float_const, op_false, op_null) && contains_interruptible_call(rhs); - if (interruptible_call) { + if (interruptible_rhs) { W << "co_await "; + G->stats.on_interruptible_null_coalescing(); } W << "NullCoalesce< " << TypeName{type} << " >("; @@ -499,11 +510,11 @@ void compile_null_coalesce(VertexAdaptor root, CodeGenerator & } W << ").finalize("; - if (vk::any_of_equal(rhs->type(), op_var, op_int_const, op_float_const, op_false, op_null)) { W << rhs; } else { - auto &context = W.get_context(); + auto& context = W.get_context(); + const auto saved_interruptible_flag = std::exchange(context.interruptible_flag, interruptible_rhs); context.catch_labels.emplace_back(); ++context.inside_null_coalesce_fallback; /* TODO: K2 @@ -520,20 +531,22 @@ void compile_null_coalesce(VertexAdaptor root, CodeGenerator & W.get_context().null_coalescing_rhs_t = tinf::get_type(rhs); FunctionSignatureGenerator(W) << "[&] ()"; - W << " -> " << (interruptible_call ? "kphp::coro::task<" : "") << TypeName{tinf::get_type(rhs)} << (interruptible_call ? "> " : " ") << BEGIN; - W << (interruptible_call ? "co_return " : "return ") << rhs << ";" << NL; + W << " -> " << (interruptible_rhs ? "kphp::coro::task<" : "") << TypeName{tinf::get_type(rhs)} << (interruptible_rhs ? "> " : " ") << BEGIN; + W << (interruptible_rhs ? "co_return " : "return ") << rhs << ";" << NL; W << END; } context.catch_labels.pop_back(); kphp_assert(context.inside_null_coalesce_fallback > 0); context.inside_null_coalesce_fallback--; + context.interruptible_flag = saved_interruptible_flag; } W << ")"; if (rhs->throw_flag) { W << ", " << ThrowAction{} << MacroEnd{}; } + } void compile_binary_func_op(VertexAdaptor root, CodeGenerator &W) { @@ -921,8 +934,13 @@ void compile_func_call(VertexAdaptor root, CodeGenerator &W, func_ } if (mode == func_call_mode::fork_call) { - if (func->is_interruptible) { - W << "(kphp::forks::start(" << FunctionName(func); + if (G->is_output_mode_k2()) { + if (func->is_interruptible) { + W << "kphp::forks::start(" << FunctionName(func); + } else { + // For non-interruptible functions, wrap with lift_sync(f, args...) + W << "kphp::forks::start(kphp::coro::lift_sync(" << FunctionName(func) << (!root->args().empty() ? ", " : ""); + } } else { W << FunctionForkName(func); } @@ -934,10 +952,13 @@ void compile_func_call(VertexAdaptor root, CodeGenerator &W, func_ } } if (func && func->cpp_template_call) { - const TypeData *tp = tinf::get_type(root); + const TypeData* tp = tinf::get_type(root); W << "< " << TypeName(tp) << " >"; } - W << "("; + // Don't emit argument parentheses when wrapping non-interruptible function with kphp::coro::list_sync + if (mode != func_call_mode::fork_call || !G->is_output_mode_k2() || func->is_interruptible) { + W << "("; + } if (func && func->is_extern() && vk::any_of_equal(func->name, "JsonEncoder$$to_json_impl", "JsonEncoder$$from_json_impl")) { root = patch_compiling_json_impl_call(W, root); @@ -956,12 +977,18 @@ void compile_func_call(VertexAdaptor root, CodeGenerator &W, func_ if (is_function_call_should_be_tracked(func)) { W << "))"; } - W << ")"; - if (func->is_interruptible) { + + // Don't emit argument parentheses when wrapping non-interruptible function with kphp::coro::list_sync + if (mode != func_call_mode::fork_call || !G->is_output_mode_k2() || func->is_interruptible) { + W << ")"; + } + + // For K2 fork call, close forks::start (and lift_sync for non-interruptible functions) + if (G->is_output_mode_k2()) { if (mode == func_call_mode::fork_call) { - W << "))"; + W << (func->is_interruptible ? ")" : "))"); } else { - W << ")"; + W << (func->is_interruptible ? ")" : ""); } } } diff --git a/compiler/pipes/calc-bad-vars.cpp b/compiler/pipes/calc-bad-vars.cpp index 1337d3d3ee..04cc2b71c3 100644 --- a/compiler/pipes/calc-bad-vars.cpp +++ b/compiler/pipes/calc-bad-vars.cpp @@ -538,17 +538,17 @@ class CalcBadVars { } } - static void calc_interruptible(const FuncCallGraph &call_graph) { + static void calc_interruptible(const FuncCallGraph& call_graph) { IdMap into_interruptible(call_graph.n); IdMap to_parents(call_graph.n); - for (const auto &func : call_graph.functions) { + for (const auto& func : call_graph.functions) { if (func->is_interruptible) { mark(call_graph.rev_graph, into_interruptible, func, to_parents); } } - for (const auto &func : call_graph.functions) { + for (const auto& func : call_graph.functions) { if (into_interruptible[func]) { func->is_interruptible = true; if (unlikely(func->class_id && func->class_id == G->get_class("ArrayAccess"))) { @@ -562,12 +562,11 @@ class CalcBadVars { static void calc_k2_fork(const FuncCallGraph& call_graph, const std::vector& dep_data) { for (int i = 0; i < call_graph.n; ++i) { - for (const auto& fork : dep_data[i].forks) { - fork->is_interruptible = true; - if (!std::exchange(fork->is_k2_fork, true)) { // check only once - for (VarPtr param : fork->param_ids) { - kphp_error(!param->is_reference, fmt_format("Function '{}' cannot be forked since it has a reference parameter '{}'\n", fork->as_human_readable(), - param->as_human_readable())); + for (const auto& forked_function : dep_data[i].forks) { + if (!std::exchange(forked_function->is_k2_fork, true)) { // check only once + for (VarPtr param : forked_function->param_ids) { + kphp_error(!param->is_reference, fmt_format("Function '{}' cannot be forked since it has a reference parameter '{}'\n", + forked_function->as_human_readable(), param->as_human_readable())); } } } diff --git a/compiler/pipes/calc-func-dep.cpp b/compiler/pipes/calc-func-dep.cpp index b6adbfa017..02bcf435ee 100644 --- a/compiler/pipes/calc-func-dep.cpp +++ b/compiler/pipes/calc-func-dep.cpp @@ -137,7 +137,6 @@ VertexPtr CalcFuncDepPass::on_exit_vertex(VertexPtr vertex) { return vertex; } - DepData CalcFuncDepPass::get_data() { my_unique(&data.dep); my_unique(&data.modified_global_vars); diff --git a/compiler/stats.cpp b/compiler/stats.cpp index 86bfb04207..3f6d84ae52 100644 --- a/compiler/stats.cpp +++ b/compiler/stats.cpp @@ -52,6 +52,10 @@ void Stats::on_function_processed(FunctionPtr function) { } } +void Stats::on_interruptible_null_coalescing() { + ++interruptible_null_coalescings_; +} + void Stats::update_memory_stats() { const mem_info_t mem_info = get_self_mem_stats(); memory_rss_ = mem_info.rss; @@ -82,6 +86,7 @@ void Stats::write_to(std::ostream &out, bool with_indent) const { out << indent << "functions.total_throwing: " << total_throwing_functions_ << std::endl; out << indent << "functions.total_resumable: " << total_resumable_functions_ << std::endl; out << indent << "functions.total_interruptible: " << total_interruptible_functions_ << std::endl; + out << indent << "codegen.interruptible_null_coalescings: " << interruptible_null_coalescings_ << std::endl; out << block_sep; out << indent << "memory.rss: " << memory_rss_ * 1024 << std::endl; out << indent << "memory.rss_peak: " << memory_rss_peak_ * 1024 << std::endl; diff --git a/compiler/stats.h b/compiler/stats.h index 5639ed2406..16931aa9d8 100644 --- a/compiler/stats.h +++ b/compiler/stats.h @@ -14,6 +14,7 @@ class Stats { public: void on_var_inserting(VarData::Type type); void on_function_processed(FunctionPtr function); + void on_interruptible_null_coalescing(); void update_memory_stats(); @@ -69,6 +70,7 @@ class Stats { std::atomic total_resumable_functions_{0u}; std::atomic total_interruptible_functions_{0u}; std::atomic total_inline_functions_{0u}; + std::atomic interruptible_null_coalescings_{0u}; std::atomic memory_rss_{0}; std::atomic memory_rss_peak_{0}; diff --git a/runtime-light/coroutine/lift-sync.h b/runtime-light/coroutine/lift-sync.h new file mode 100644 index 0000000000..b61087d113 --- /dev/null +++ b/runtime-light/coroutine/lift-sync.h @@ -0,0 +1,58 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2026 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include +#include +#include + +#include "runtime-light/coroutine/type-traits.h" + +namespace kphp::coro { + +namespace details { + +template +class lift_sync_awaiter { + F m_func; + std::tuple m_args; + +public: + template func_type, std::same_as... args_type> + explicit lift_sync_awaiter(func_type&& f, args_type&&... args) noexcept + : m_func(std::forward(f)), + m_args(std::forward(args)...) {} + + lift_sync_awaiter(lift_sync_awaiter&& other) noexcept = default; + lift_sync_awaiter& operator=(lift_sync_awaiter&& other) noexcept = default; + ~lift_sync_awaiter() = default; + + lift_sync_awaiter(const lift_sync_awaiter&) = delete; + lift_sync_awaiter& operator=(const lift_sync_awaiter&) = delete; + + constexpr auto await_ready() const noexcept -> bool { + return true; + } + + constexpr auto await_suspend(std::coroutine_handle<> /*unused*/) const noexcept -> void { + std::unreachable(); + } + + auto await_resume() noexcept -> std::invoke_result_t { + return std::apply(std::move(m_func), std::move(m_args)); + } +}; + +} // namespace details + +template +requires std::invocable && (!kphp::coro::is_async_function_v) +auto lift_sync(F&& f, Args&&... args) noexcept { + return details::lift_sync_awaiter{std::forward(f), std::forward(args)...}; +} + +} // namespace kphp::coro diff --git a/runtime-light/stdlib/fork/fork-functions.h b/runtime-light/stdlib/fork/fork-functions.h index fd2c4de625..328d355daa 100644 --- a/runtime-light/stdlib/fork/fork-functions.h +++ b/runtime-light/stdlib/fork/fork-functions.h @@ -59,10 +59,10 @@ auto id_managed(awaitable_type awaitable) noexcept -> kphp::coro::task -auto start(kphp::coro::task task) noexcept -> int64_t { +template +auto start(awaitable_type awaitable) noexcept -> int64_t { auto& fork_instance_st{ForkInstanceState::get()}; - auto [fork_id, fork_task]{fork_instance_st.create_fork(std::move(task))}; + auto [fork_id, fork_task]{fork_instance_st.create_fork(std::move(awaitable))}; auto saved_fork_id{fork_instance_st.current_id}; kphp::log::assertion(kphp::coro::io_scheduler::get().start(std::move(fork_task))); fork_instance_st.current_id = saved_fork_id; diff --git a/runtime-light/stdlib/fork/fork-state.h b/runtime-light/stdlib/fork/fork-state.h index d98eddccb5..0cb7ed7199 100644 --- a/runtime-light/stdlib/fork/fork-state.h +++ b/runtime-light/stdlib/fork/fork-state.h @@ -4,6 +4,7 @@ #pragma once +#include #include #include #include @@ -13,8 +14,9 @@ #include "common/mixin/not_copyable.h" #include "runtime-common/core/allocator/script-allocator.h" #include "runtime-common/core/std/containers.h" +#include "runtime-light/coroutine/concepts.h" #include "runtime-light/coroutine/shared-task.h" -#include "runtime-light/coroutine/task.h" +#include "runtime-light/coroutine/type-traits.h" #include "runtime-light/stdlib/diagnostics/exception-types.h" #include "runtime-light/stdlib/diagnostics/logs.h" #include "runtime-light/stdlib/fork/fork-storage.h" @@ -49,29 +51,8 @@ struct ForkInstanceState final : private vk::not_copyable { static ForkInstanceState& get() noexcept; - template - std::pair> create_fork(kphp::coro::task task) noexcept { - static constexpr auto fork_coroutine{ - [](kphp::coro::task task, int64_t fork_id) noexcept -> kphp::coro::shared_task { - ForkInstanceState::get().current_id = fork_id; - - kphp::forks::details::storage s{}; - if constexpr (std::same_as) { - co_await std::move(task); - s.store(); - } else { - s.store(co_await std::move(task)); - } - co_return s; - }}; - - const int64_t fork_id{next_fork_id--}; - auto fork_task{std::invoke(fork_coroutine, std::move(task), fork_id)}; - forks.emplace( - fork_id, - fork_info{.awaited = {}, .thrown_exception = {}, .opt_handle = static_cast>(fork_task)}); - return std::make_pair(fork_id, std::move(fork_task)); - } + template + auto create_fork(awaitable_type awaitable) noexcept -> std::pair>; std::optional> get_info(int64_t fork_id) noexcept { if (auto it{forks.find(fork_id)}; it != forks.end()) [[likely]] { @@ -86,3 +67,25 @@ struct ForkInstanceState final : private vk::not_copyable { return *opt_fork_info; } }; + +template +auto ForkInstanceState::create_fork(awaitable_type awaitable) noexcept -> std::pair> { + static constexpr auto fork_coroutine{ + [](awaitable_type awaitable, int64_t fork_id, ForkInstanceState& fork_instance_state) noexcept -> kphp::coro::shared_task { + fork_instance_state.current_id = fork_id; + + kphp::forks::details::storage s{}; + if constexpr (std::same_as::awaiter_return_type, void>) { + co_await std::move(awaitable); + s.store(); + } else { + s.store::awaiter_return_type>(co_await std::move(awaitable)); + } + co_return s; + }}; + + const int64_t fork_id{next_fork_id--}; + auto fork_task{std::invoke(fork_coroutine, std::move(awaitable), fork_id, *this)}; + forks.emplace(fork_id, fork_info{.awaited = {}, .thrown_exception = {}, .opt_handle = fork_task}); + return std::make_pair(fork_id, std::move(fork_task)); +} diff --git a/runtime-light/stdlib/time/timer-functions.h b/runtime-light/stdlib/time/timer-functions.h index e6e0bdbc1f..7d874f573a 100644 --- a/runtime-light/stdlib/time/timer-functions.h +++ b/runtime-light/stdlib/time/timer-functions.h @@ -24,6 +24,7 @@ void f$set_timer(int64_t timeout_ms, T&& on_timer_callback) noexcept { // TODO choose from: // 1. someone should pop that fork from ForkInstanceState since it will stay there unless we perform f$wait on fork // 2. start timer_task via kphp::coro::io_scheduler::spawn (it won't have distinct fork id) + // FIXME it doesn't work with async `on_timer_callback` auto timer_task{std::invoke( [](std::chrono::milliseconds duration, T on_timer_callback) noexcept -> kphp::coro::task<> { co_await kphp::forks::id_managed(kphp::coro::io_scheduler::get().schedule(duration));