Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 49 additions & 22 deletions compiler/code-gen/vertex-compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
#include <iterator>
#include <string_view>
#include <unordered_map>
#include <utility>

#include "auto/compiler/vertex/vertex-types.h"
#include "common/containers/final_action.h"
#include "common/wrappers/field_getter.h"
#include "common/wrappers/likely.h"
Expand Down Expand Up @@ -459,6 +461,21 @@ inline int64_t can_use_precomputed_hash_indexing_array(VertexPtr key) {
return 0;
}

// Helper to recursively check if a vertex subtree contains interruptible function calls
bool contains_interruptible_call(VertexPtr vertex) {
if (auto func_call = vertex.try_as<op_func_call>(); func_call && func_call->func_id && func_call->func_id->is_interruptible) {
return true;
}

for (auto child : *vertex) {
if (contains_interruptible_call(child)) {
return true;
}
}

return false;
}

void compile_null_coalesce(VertexAdaptor<op_null_coalesce> root, CodeGenerator &W) {
const TypeData *type = tinf::get_type(root);
auto lhs = root->lhs();
Expand All @@ -469,18 +486,12 @@ void compile_null_coalesce(VertexAdaptor<op_null_coalesce> root, CodeGenerator &
W << "TRY_CALL_ " << MacroBegin{} << TypeName{type} << ", ";
}

/* TODO:K2
* In current implementation all non-trivial finialize block marked as cpp coroutine.
* This leads to redundant coroutines, but eliminates the need to traverse the rhs subtree
* to find whether it actually contains interruptible call. It can be fixed in the future
* to reduce count of cpp coroutines.
*/
bool interruptible_call = G->is_output_mode_k2() &&
!vk::any_of_equal(rhs->type(), op_var, op_int_const, op_float_const, op_false, op_null) &&
W.get_context().parent_func->is_interruptible;
const bool interruptible_rhs =
G->is_output_mode_k2() && !vk::any_of_equal(rhs->type(), op_var, op_int_const, op_float_const, op_false, op_null) && contains_interruptible_call(rhs);

if (interruptible_call) {
if (interruptible_rhs) {
W << "co_await ";
G->stats.on_interruptible_null_coalescing();
}

W << "NullCoalesce< " << TypeName{type} << " >(";
Expand All @@ -499,11 +510,11 @@ void compile_null_coalesce(VertexAdaptor<op_null_coalesce> root, CodeGenerator &
}

W << ").finalize(";

if (vk::any_of_equal(rhs->type(), op_var, op_int_const, op_float_const, op_false, op_null)) {
W << rhs;
} else {
auto &context = W.get_context();
auto& context = W.get_context();
const auto saved_interruptible_flag = std::exchange(context.interruptible_flag, interruptible_rhs);
context.catch_labels.emplace_back();
++context.inside_null_coalesce_fallback;
/* TODO: K2
Expand All @@ -520,20 +531,22 @@ void compile_null_coalesce(VertexAdaptor<op_null_coalesce> root, CodeGenerator &
W.get_context().null_coalescing_rhs_t = tinf::get_type(rhs);

FunctionSignatureGenerator(W) << "[&] ()";
W << " -> " << (interruptible_call ? "kphp::coro::task<" : "") << TypeName{tinf::get_type(rhs)} << (interruptible_call ? "> " : " ") << BEGIN;
W << (interruptible_call ? "co_return " : "return ") << rhs << ";" << NL;
W << " -> " << (interruptible_rhs ? "kphp::coro::task<" : "") << TypeName{tinf::get_type(rhs)} << (interruptible_rhs ? "> " : " ") << BEGIN;
W << (interruptible_rhs ? "co_return " : "return ") << rhs << ";" << NL;
W << END;
}

context.catch_labels.pop_back();
kphp_assert(context.inside_null_coalesce_fallback > 0);
context.inside_null_coalesce_fallback--;
context.interruptible_flag = saved_interruptible_flag;
}

W << ")";
if (rhs->throw_flag) {
W << ", " << ThrowAction{} << MacroEnd{};
}

}

void compile_binary_func_op(VertexAdaptor<meta_op_binary> root, CodeGenerator &W) {
Expand Down Expand Up @@ -921,8 +934,13 @@ void compile_func_call(VertexAdaptor<op_func_call> root, CodeGenerator &W, func_
}

if (mode == func_call_mode::fork_call) {
if (func->is_interruptible) {
W << "(kphp::forks::start(" << FunctionName(func);
if (G->is_output_mode_k2()) {
if (func->is_interruptible) {
W << "kphp::forks::start(" << FunctionName(func);
} else {
// For non-interruptible functions, wrap with lift_sync(f, args...)
W << "kphp::forks::start(kphp::coro::lift_sync(" << FunctionName(func) << (!root->args().empty() ? ", " : "");
}
} else {
W << FunctionForkName(func);
}
Expand All @@ -934,10 +952,13 @@ void compile_func_call(VertexAdaptor<op_func_call> root, CodeGenerator &W, func_
}
}
if (func && func->cpp_template_call) {
const TypeData *tp = tinf::get_type(root);
const TypeData* tp = tinf::get_type(root);
W << "< " << TypeName(tp) << " >";
}
W << "(";
// Don't emit argument parentheses when wrapping non-interruptible function with kphp::coro::list_sync
if (mode != func_call_mode::fork_call || !G->is_output_mode_k2() || func->is_interruptible) {
W << "(";
}

if (func && func->is_extern() && vk::any_of_equal(func->name, "JsonEncoder$$to_json_impl", "JsonEncoder$$from_json_impl")) {
root = patch_compiling_json_impl_call(W, root);
Expand All @@ -956,12 +977,18 @@ void compile_func_call(VertexAdaptor<op_func_call> root, CodeGenerator &W, func_
if (is_function_call_should_be_tracked(func)) {
W << "))";
}
W << ")";
if (func->is_interruptible) {

// Don't emit argument parentheses when wrapping non-interruptible function with kphp::coro::list_sync
if (mode != func_call_mode::fork_call || !G->is_output_mode_k2() || func->is_interruptible) {
W << ")";
}

// For K2 fork call, close forks::start (and lift_sync for non-interruptible functions)
if (G->is_output_mode_k2()) {
if (mode == func_call_mode::fork_call) {
W << "))";
W << (func->is_interruptible ? ")" : "))");
} else {
W << ")";
W << (func->is_interruptible ? ")" : "");
}
}
}
Expand Down
17 changes: 8 additions & 9 deletions compiler/pipes/calc-bad-vars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,17 +538,17 @@ class CalcBadVars {
}
}

static void calc_interruptible(const FuncCallGraph &call_graph) {
static void calc_interruptible(const FuncCallGraph& call_graph) {
IdMap<char> into_interruptible(call_graph.n);
IdMap<FunctionPtr> to_parents(call_graph.n);

for (const auto &func : call_graph.functions) {
for (const auto& func : call_graph.functions) {
if (func->is_interruptible) {
mark(call_graph.rev_graph, into_interruptible, func, to_parents);
}
}

for (const auto &func : call_graph.functions) {
for (const auto& func : call_graph.functions) {
if (into_interruptible[func]) {
func->is_interruptible = true;
if (unlikely(func->class_id && func->class_id == G->get_class("ArrayAccess"))) {
Expand All @@ -562,12 +562,11 @@ class CalcBadVars {

static void calc_k2_fork(const FuncCallGraph& call_graph, const std::vector<DepData>& dep_data) {
for (int i = 0; i < call_graph.n; ++i) {
for (const auto& fork : dep_data[i].forks) {
fork->is_interruptible = true;
if (!std::exchange(fork->is_k2_fork, true)) { // check only once
for (VarPtr param : fork->param_ids) {
kphp_error(!param->is_reference, fmt_format("Function '{}' cannot be forked since it has a reference parameter '{}'\n", fork->as_human_readable(),
param->as_human_readable()));
for (const auto& forked_function : dep_data[i].forks) {
if (!std::exchange(forked_function->is_k2_fork, true)) { // check only once
for (VarPtr param : forked_function->param_ids) {
kphp_error(!param->is_reference, fmt_format("Function '{}' cannot be forked since it has a reference parameter '{}'\n",
forked_function->as_human_readable(), param->as_human_readable()));
}
}
}
Expand Down
1 change: 0 additions & 1 deletion compiler/pipes/calc-func-dep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ VertexPtr CalcFuncDepPass::on_exit_vertex(VertexPtr vertex) {
return vertex;
}


DepData CalcFuncDepPass::get_data() {
my_unique(&data.dep);
my_unique(&data.modified_global_vars);
Expand Down
5 changes: 5 additions & 0 deletions compiler/stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ void Stats::on_function_processed(FunctionPtr function) {
}
}

void Stats::on_interruptible_null_coalescing() {
++interruptible_null_coalescings_;
}

void Stats::update_memory_stats() {
const mem_info_t mem_info = get_self_mem_stats();
memory_rss_ = mem_info.rss;
Expand Down Expand Up @@ -82,6 +86,7 @@ void Stats::write_to(std::ostream &out, bool with_indent) const {
out << indent << "functions.total_throwing: " << total_throwing_functions_ << std::endl;
out << indent << "functions.total_resumable: " << total_resumable_functions_ << std::endl;
out << indent << "functions.total_interruptible: " << total_interruptible_functions_ << std::endl;
out << indent << "codegen.interruptible_null_coalescings: " << interruptible_null_coalescings_ << std::endl;
out << block_sep;
out << indent << "memory.rss: " << memory_rss_ * 1024 << std::endl;
out << indent << "memory.rss_peak: " << memory_rss_peak_ * 1024 << std::endl;
Expand Down
2 changes: 2 additions & 0 deletions compiler/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Stats {
public:
void on_var_inserting(VarData::Type type);
void on_function_processed(FunctionPtr function);
void on_interruptible_null_coalescing();

void update_memory_stats();

Expand Down Expand Up @@ -69,6 +70,7 @@ class Stats {
std::atomic<std::uint64_t> total_resumable_functions_{0u};
std::atomic<std::uint64_t> total_interruptible_functions_{0u};
std::atomic<std::uint64_t> total_inline_functions_{0u};
std::atomic<std::uint64_t> interruptible_null_coalescings_{0u};

std::atomic<std::uint64_t> memory_rss_{0};
std::atomic<std::uint64_t> memory_rss_peak_{0};
Expand Down
58 changes: 58 additions & 0 deletions runtime-light/coroutine/lift-sync.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2026 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <concepts>
#include <coroutine>
#include <tuple>
#include <type_traits>
#include <utility>

#include "runtime-light/coroutine/type-traits.h"

namespace kphp::coro {

namespace details {

template<typename F, typename... Args>
class lift_sync_awaiter {
F m_func;
std::tuple<Args...> m_args;

public:
template<std::same_as<F> func_type, std::same_as<Args>... args_type>
explicit lift_sync_awaiter(func_type&& f, args_type&&... args) noexcept
: m_func(std::forward<func_type>(f)),
m_args(std::forward<args_type>(args)...) {}

lift_sync_awaiter(lift_sync_awaiter&& other) noexcept = default;
lift_sync_awaiter& operator=(lift_sync_awaiter&& other) noexcept = default;
~lift_sync_awaiter() = default;

lift_sync_awaiter(const lift_sync_awaiter&) = delete;
lift_sync_awaiter& operator=(const lift_sync_awaiter&) = delete;

constexpr auto await_ready() const noexcept -> bool {
return true;
}

constexpr auto await_suspend(std::coroutine_handle<> /*unused*/) const noexcept -> void {
std::unreachable();
}

auto await_resume() noexcept -> std::invoke_result_t<F, Args...> {
return std::apply(std::move(m_func), std::move(m_args));
}
};

} // namespace details

template<typename F, typename... Args>
requires std::invocable<F, Args...> && (!kphp::coro::is_async_function_v<F, Args...>)
auto lift_sync(F&& f, Args&&... args) noexcept {
return details::lift_sync_awaiter<F, Args...>{std::forward<F>(f), std::forward<Args>(args)...};
}

} // namespace kphp::coro
6 changes: 3 additions & 3 deletions runtime-light/stdlib/fork/fork-functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ auto id_managed(awaitable_type awaitable) noexcept -> kphp::coro::task<typename
}
}

template<typename return_type>
auto start(kphp::coro::task<return_type> task) noexcept -> int64_t {
template<kphp::coro::concepts::awaitable awaitable_type>
auto start(awaitable_type awaitable) noexcept -> int64_t {
auto& fork_instance_st{ForkInstanceState::get()};
auto [fork_id, fork_task]{fork_instance_st.create_fork(std::move(task))};
auto [fork_id, fork_task]{fork_instance_st.create_fork(std::move(awaitable))};
auto saved_fork_id{fork_instance_st.current_id};
kphp::log::assertion(kphp::coro::io_scheduler::get().start(std::move(fork_task)));
fork_instance_st.current_id = saved_fork_id;
Expand Down
51 changes: 27 additions & 24 deletions runtime-light/stdlib/fork/fork-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once

#include <concepts>
#include <cstdint>
#include <functional>
#include <limits>
Expand All @@ -13,8 +14,9 @@
#include "common/mixin/not_copyable.h"
#include "runtime-common/core/allocator/script-allocator.h"
#include "runtime-common/core/std/containers.h"
#include "runtime-light/coroutine/concepts.h"
#include "runtime-light/coroutine/shared-task.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/coroutine/type-traits.h"
#include "runtime-light/stdlib/diagnostics/exception-types.h"
#include "runtime-light/stdlib/diagnostics/logs.h"
#include "runtime-light/stdlib/fork/fork-storage.h"
Expand Down Expand Up @@ -49,29 +51,8 @@ struct ForkInstanceState final : private vk::not_copyable {

static ForkInstanceState& get() noexcept;

template<typename return_type>
std::pair<int64_t, kphp::coro::shared_task<kphp::forks::details::storage>> create_fork(kphp::coro::task<return_type> task) noexcept {
static constexpr auto fork_coroutine{
[](kphp::coro::task<return_type> task, int64_t fork_id) noexcept -> kphp::coro::shared_task<kphp::forks::details::storage> {
ForkInstanceState::get().current_id = fork_id;

kphp::forks::details::storage s{};
if constexpr (std::same_as<return_type, void>) {
co_await std::move(task);
s.store();
} else {
s.store<return_type>(co_await std::move(task));
}
co_return s;
}};

const int64_t fork_id{next_fork_id--};
auto fork_task{std::invoke(fork_coroutine, std::move(task), fork_id)};
forks.emplace(
fork_id,
fork_info{.awaited = {}, .thrown_exception = {}, .opt_handle = static_cast<kphp::coro::shared_task<kphp::forks::details::storage>>(fork_task)});
return std::make_pair(fork_id, std::move(fork_task));
}
template<kphp::coro::concepts::awaitable awaitable_type>
auto create_fork(awaitable_type awaitable) noexcept -> std::pair<int64_t, kphp::coro::shared_task<kphp::forks::details::storage>>;

std::optional<std::reference_wrapper<fork_info>> get_info(int64_t fork_id) noexcept {
if (auto it{forks.find(fork_id)}; it != forks.end()) [[likely]] {
Expand All @@ -86,3 +67,25 @@ struct ForkInstanceState final : private vk::not_copyable {
return *opt_fork_info;
}
};

template<kphp::coro::concepts::awaitable awaitable_type>
auto ForkInstanceState::create_fork(awaitable_type awaitable) noexcept -> std::pair<int64_t, kphp::coro::shared_task<kphp::forks::details::storage>> {
static constexpr auto fork_coroutine{
[](awaitable_type awaitable, int64_t fork_id, ForkInstanceState& fork_instance_state) noexcept -> kphp::coro::shared_task<kphp::forks::details::storage> {
fork_instance_state.current_id = fork_id;

kphp::forks::details::storage s{};
if constexpr (std::same_as<typename kphp::coro::awaitable_traits<awaitable_type>::awaiter_return_type, void>) {
co_await std::move(awaitable);
s.store();
} else {
s.store<typename kphp::coro::awaitable_traits<awaitable_type>::awaiter_return_type>(co_await std::move(awaitable));
}
co_return s;
}};

const int64_t fork_id{next_fork_id--};
auto fork_task{std::invoke(fork_coroutine, std::move(awaitable), fork_id, *this)};
forks.emplace(fork_id, fork_info{.awaited = {}, .thrown_exception = {}, .opt_handle = fork_task});
return std::make_pair(fork_id, std::move(fork_task));
}
1 change: 1 addition & 0 deletions runtime-light/stdlib/time/timer-functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void f$set_timer(int64_t timeout_ms, T&& on_timer_callback) noexcept {
// TODO choose from:
// 1. someone should pop that fork from ForkInstanceState since it will stay there unless we perform f$wait on fork
// 2. start timer_task via kphp::coro::io_scheduler::spawn (it won't have distinct fork id)
// FIXME it doesn't work with async `on_timer_callback`
auto timer_task{std::invoke(
[](std::chrono::milliseconds duration, T on_timer_callback) noexcept -> kphp::coro::task<> {
co_await kphp::forks::id_managed(kphp::coro::io_scheduler::get().schedule(duration));
Expand Down
Loading