diff --git a/.github/actions/install-build-deps/action.yml b/.github/actions/install-build-deps/action.yml index 17d7c14..3a91200 100644 --- a/.github/actions/install-build-deps/action.yml +++ b/.github/actions/install-build-deps/action.yml @@ -94,6 +94,113 @@ runs: sudo update-alternatives --install /usr/bin/clang-tidy clang-tidy /usr/bin/clang-tidy-18 100 shell: bash + - name: Install Podman and podman-compose + run: | + # shellcheck source=/dev/null + source "${GITHUB_ACTION_PATH}/podman-version.env" + # Prefer the reproducibility-pinned apt version, but the ubuntu-24.04 + # runner image periodically rotates the available podman patch revision + # out of its apt repo (e.g. 5.0.2+ds1-4ubuntu1 disappeared, turning the + # required jobs red with "Version '...' for 'podman' was not found"). + # Fall back to the newest available podman so an upstream apt rotation + # cannot hard-break CI, while still pinning when the exact version is + # present (whole-field match: 4ubuntu1 must not match 4ubuntu1.1). + if apt-cache madison podman | awk '{print $3}' | grep -qxF "${PODMAN_APT_VERSION}"; then + echo "Installing pinned podman=${PODMAN_APT_VERSION}" + sudo apt-get install -y "podman=${PODMAN_APT_VERSION}" podman-compose + else + echo "Pinned podman=${PODMAN_APT_VERSION} not available in apt; installing latest available podman" >&2 + apt-cache madison podman >&2 || true + sudo apt-get install -y podman podman-compose + fi + shell: bash + + - name: Start Podman rootless socket + run: | + systemctl --user start podman.socket + echo "DOCKER_HOST=unix:///run/user/$(id -u)/podman/podman.sock" >> "$GITHUB_ENV" + shell: bash + + - name: Configure host for in-container sanitizers + run: | + # ASan/TSan/LSan abort with "Shadow memory range interleaves with an + # existing memory mapping" on the ubuntu-24.04 noble kernel unless the + # ASLR entropy is lowered. The sanitizer runs inside the dev container + # but shares the host kernel, so this must be set on the runner host.
+ sudo sysctl -w vm.mmap_rnd_bits=28 + shell: bash + + - name: Set container build environment + run: | + echo "GIT_COMMIT=${{ github.sha }}" >> "$GITHUB_ENV" + echo "BUILD_UID=$(id -u)" >> "$GITHUB_ENV" + echo "BUILD_GID=$(id -g)" >> "$GITHUB_ENV" + shell: bash + + - name: Fix workspace permissions for Podman bind mounts + run: chmod -R a+rwX . + shell: bash + + - name: Restore dev container image cache + id: image_cache + uses: actions/cache@v5 + with: + path: /tmp/dev-image.tar + # Exact-match only: a partial restore-key (e.g. `podman-`) would load a + # stale tarball built from a different Containerfile/conanfile, so omit + # restore-keys and rebuild on any input change. + key: podman-${{ hashFiles('Containerfile', 'docker-compose.yml', 'conanfile.py') }} + + - name: Load or build dev container image + run: | + loaded=false + if [ "${{ steps.image_cache.outputs.cache-hit }}" = "true" ] && podman load -i /tmp/dev-image.tar 2>/dev/null; then + # Verify the tarball actually contained the tag compose resolves; + # a stale/mistagged tarball would otherwise pass here and fail at + # `podman-compose up`. + if podman image exists projectkeystone-dev:latest; then + echo "Loaded dev image from cache" + loaded=true + else + echo "Cached tarball missing projectkeystone-dev:latest tag; rebuilding" >&2 + fi + fi + if [ "$loaded" != "true" ]; then + DOCKER_HOST="$DOCKER_HOST" podman-compose build dev + podman save -o /tmp/dev-image.tar projectkeystone-dev:latest + fi + shell: bash + + - name: Start dev container + run: | + DOCKER_HOST="$DOCKER_HOST" podman-compose up -d dev + shell: bash + + - name: Wait for dev container readiness + run: | + for i in $(seq 1 10); do + if DOCKER_HOST="$DOCKER_HOST" podman-compose exec -T dev true 2>/dev/null; then + echo "Container ready" + exit 0 + fi + echo "Waiting for container... 
attempt $i/10" + sleep 2 + done + echo "Container failed to become ready" >&2 + DOCKER_HOST="$DOCKER_HOST" podman-compose logs dev >&2 + exit 1 + shell: bash + + - name: Verify Podman works (rootless) + run: | + rootless="$(podman info --format '{{.Host.Security.Rootless}}')" + echo "Podman rootless: ${rootless}" + if [ "${rootless}" != "true" ]; then + echo "Expected rootless Podman but got '${rootless}'" >&2 + exit 1 + fi + shell: bash + - name: Verify installation run: | echo "Clang version:" diff --git a/.github/actions/install-build-deps/podman-version.env b/.github/actions/install-build-deps/podman-version.env new file mode 100644 index 0000000..35c1801 --- /dev/null +++ b/.github/actions/install-build-deps/podman-version.env @@ -0,0 +1 @@ +PODMAN_APT_VERSION=5.0.2+ds1-4ubuntu1 diff --git a/.github/workflows/_required.yml b/.github/workflows/_required.yml index 3a2d245..1d933a5 100644 --- a/.github/workflows/_required.yml +++ b/.github/workflows/_required.yml @@ -240,21 +240,34 @@ jobs: fetchcontent-clang-tidy-${{ runner.os }}- - name: Install Conan dependencies - run: make deps.native + run: make deps - name: Configure CMake with clang-tidy + # `make deps` runs Conan inside the dev container, so the generated + # conan_toolchain.cmake references container-internal compiler/library + # paths. Configure must therefore run inside the same container or the + # toolchain will not resolve — this preserves the environment-parity + # goal. We reuse the Makefile's container invocation pattern + # (DOCKER_HOST + podman-compose exec -T dev). run: | - CONAN_TOOLCHAIN="" - if [ -f build/conan-deps/conan_toolchain.cmake ]; then - CONAN_TOOLCHAIN="-DCMAKE_TOOLCHAIN_FILE=build/conan-deps/conan_toolchain.cmake" - fi - cmake -S . 
-B build/x86.debug.clang-tidy \ - -G Ninja \ - -DCMAKE_BUILD_TYPE=Debug \ - -DENABLE_CLANG_TIDY=ON \ - $CONAN_TOOLCHAIN + DOCKER_HOST="${DOCKER_HOST:-}" podman-compose exec -T dev bash -c ' + CONAN_TOOLCHAIN="" + if [ -f build/conan-deps/conan_toolchain.cmake ]; then + CONAN_TOOLCHAIN="-DCMAKE_TOOLCHAIN_FILE=build/conan-deps/conan_toolchain.cmake" + fi + cmake -S . -B build/x86.debug.clang-tidy \ + -G Ninja \ + -DCMAKE_BUILD_TYPE=Debug \ + -DENABLE_CLANG_TIDY=ON \ + $CONAN_TOOLCHAIN + ' - name: Build with clang-tidy + # Must run inside the dev container for the same reason as the configure + # step: the build consumes the container-generated Conan toolchain and + # uses the container's clang/clang-tidy. clang-tidy-output.txt and + # clang-tidy-build.rc are written under /workspace, which is bind-mounted + # back to the host so the gating step below can read them. run: | # clang-tidy build often returns non-zero when diagnostics are found. # The next step parses clang-tidy-output.txt and decides whether those @@ -262,13 +275,15 @@ jobs: # We must capture the build output without aborting the job — but we # also must record the build rc for the next step to inspect, rather # than silently masking it with continue-on-error. 
- set +e - set -o pipefail - cmake --build build/x86.debug.clang-tidy -j"$(nproc)" 2>&1 | tee clang-tidy-output.txt - rc=${PIPESTATUS[0]} - set -e - echo "$rc" > clang-tidy-build.rc - echo "clang-tidy build exited rc=$rc (gating happens in next step)" + DOCKER_HOST="${DOCKER_HOST:-}" podman-compose exec -T dev bash -c ' + set +e + set -o pipefail + cmake --build build/x86.debug.clang-tidy -j"$(nproc)" 2>&1 | tee clang-tidy-output.txt + rc=${PIPESTATUS[0]} + set -e + echo "$rc" > clang-tidy-build.rc + echo "clang-tidy build exited rc=$rc (gating happens in next step)" + ' - name: Fail on clang-tidy errors run: | @@ -439,10 +454,10 @@ jobs: echo "CMAKE_CXX_COMPILER_LAUNCHER=sccache" >> $GITHUB_ENV - name: Install Conan dependencies - run: make deps.native + run: make deps - name: Build debug (C++) - run: make compile.debug.native + run: make compile.debug - name: Run C++ unit tests run: | @@ -507,21 +522,21 @@ jobs: echo "CMAKE_CXX_COMPILER_LAUNCHER=sccache" >> $GITHUB_ENV - name: Install Conan dependencies - run: make deps.native + run: make deps - name: Build + test (ASan) - run: make compile.debug.asan.native && make test.debug.asan.native + run: make compile.debug.asan && make test.debug.asan - name: Build + test (UBSan) - run: make compile.debug.ubsan.native && make test.debug.ubsan.native + run: make compile.debug.ubsan && make test.debug.ubsan - name: Build + test (TSan) - run: make compile.debug.tsan.native && make test.debug.tsan.native + run: make compile.debug.tsan && make test.debug.tsan env: TSAN_OPTIONS: "suppressions=${{ github.workspace }}/tsan.supp:second_deadlock_stack=1" - name: Build + test (LSan) - run: make compile.debug.lsan.native && make test.debug.lsan.native + run: make compile.debug.lsan && make test.debug.lsan - name: Show sccache stats if: always() @@ -577,10 +592,10 @@ jobs: echo "CMAKE_CXX_COMPILER_LAUNCHER=sccache" >> $GITHUB_ENV - name: Install Conan dependencies - run: make deps.native + run: make deps - name: Build release - run: 
make compile.release.native + run: make compile.release - name: Show sccache stats if: always() @@ -1022,17 +1037,17 @@ jobs: echo "CMAKE_CXX_COMPILER_LAUNCHER=sccache" >> $GITHUB_ENV - name: Install Conan dependencies - run: make deps.native + run: make deps - name: Build with coverage - run: make compile.debug.coverage.native + run: make compile.debug.coverage - name: Show sccache stats if: always() run: sccache --show-stats - name: Run tests for coverage - run: make test.debug.coverage.native + run: make test.debug.coverage - name: Generate coverage report run: | diff --git a/.github/workflows/extras.yml b/.github/workflows/extras.yml index 8293613..74373f0 100644 --- a/.github/workflows/extras.yml +++ b/.github/workflows/extras.yml @@ -56,17 +56,17 @@ jobs: echo "CMAKE_CXX_COMPILER_LAUNCHER=sccache" >> $GITHUB_ENV - name: Install Conan dependencies - run: make deps.native + run: make deps - name: Build release - run: make compile.release.native + run: make compile.release - name: Show sccache stats if: always() run: sccache --show-stats - name: Run benchmarks - run: make benchmark.native + run: make benchmark - name: Upload benchmark results uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 @@ -121,10 +121,10 @@ jobs: echo "CMAKE_CXX_COMPILER_LAUNCHER=sccache" >> $GITHUB_ENV - name: Install Conan dependencies - run: make deps.native + run: make deps - name: Build release - run: make compile.release.native + run: make compile.release - name: Show sccache stats if: always() @@ -189,17 +189,17 @@ jobs: echo "CMAKE_CXX_COMPILER_LAUNCHER=sccache" >> $GITHUB_ENV - name: Install Conan dependencies - run: make deps.native + run: make deps - name: Build with coverage - run: make compile.debug.coverage.native + run: make compile.debug.coverage - name: Show sccache stats if: always() run: sccache --show-stats - name: Run tests for coverage - run: make test.debug.coverage.native + run: make test.debug.coverage - name: Generate coverage report 
run: | diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index aa6b18e..acf53be 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -47,10 +47,10 @@ jobs: uses: ./.github/actions/install-build-deps - name: Build release - run: make compile.release.native + run: make compile.release - name: Run tests on release build - run: make test.release.native + run: make test.release - name: Build CPack packages run: | diff --git a/CMakeLists.txt b/CMakeLists.txt index 440a547..a518eb2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -324,9 +324,8 @@ add_executable( # The gRPC test fixture (grpc_test_fixture.cpp) and TLS test (test_grpc_tls.cpp) # were removed with the HMAS orchestration layer (ADR-015/016). -target_link_libraries( - unit_tests keystone_core keystone_concurrency - keystone_transport GTest::gtest_main) +target_link_libraries(unit_tests keystone_core keystone_concurrency + keystone_transport GTest::gtest_main) gtest_discover_tests(unit_tests) @@ -383,9 +382,10 @@ gtest_discover_tests(simulation_unit_tests) # Transport library — NATS connection with TLS support (issue #122), # NATSListener pull-based fetch loop (issues #178, #205, #307), and # TransparentBridge automatic off-host forwarding (issue #512). -add_library(keystone_transport src/transport/nats_connection.cpp - src/network/nats_listener.cpp - src/transport/transparent_bridge.cpp) +add_library( + keystone_transport + src/transport/nats_connection.cpp src/network/nats_listener.cpp + src/transport/transparent_bridge.cpp) target_include_directories( keystone_transport PUBLIC $ @@ -399,8 +399,8 @@ target_include_directories( keystone_transport PUBLIC ${spdlog_INCLUDE_DIRS_RELEASE} ${fmt_INCLUDE_DIRS_RELEASE}) -# Daemon — production service binary (issue #513) -# Must come after keystone_core and keystone_transport are defined. 
+# Daemon — production service binary (issue #513) Must come after keystone_core +# and keystone_transport are defined. add_subdirectory(src/daemon) # Unit Tests — NATS transport (issue #122) @@ -618,12 +618,8 @@ install( # Install test executables (keystone-test package) install( - TARGETS distributed_hierarchy_tests - unit_tests - concurrency_unit_tests - simulation_unit_tests - transport_unit_tests - bridge_unit_tests + TARGETS distributed_hierarchy_tests unit_tests concurrency_unit_tests + simulation_unit_tests transport_unit_tests bridge_unit_tests RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}/tests COMPONENT keystone-test) # Install benchmarks (keystone-misc package) diff --git a/Makefile b/Makefile index 83aee74..6ea3542 100644 --- a/Makefile +++ b/Makefile @@ -17,14 +17,10 @@ # Number of processors for parallel builds NPROC ?= $(shell nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null || echo 4) -# Container runtime (Podman) — pass NATIVE=1 to bypass container on CI/host -ifeq ($(NATIVE),1) - CONTAINER_CHECK := - CONTAINER_PREFIX := -else - CONTAINER_CHECK := podman compose up -d dev >/dev/null 2>&1 || true; - CONTAINER_PREFIX := podman compose exec -T dev -endif +# Container runtime (Podman) +# Use podman-compose instead of docker compose CLI plugin (which delegates to snap) +CONTAINER_CHECK := DOCKER_HOST="$(DOCKER_HOST)" podman-compose up -d dev >/dev/null 2>&1 || true; +CONTAINER_PREFIX := DOCKER_HOST="$(DOCKER_HOST)" podman-compose exec -T dev # Compiler flags BUILD_FLAGS_debug := -O0 -g -D_DEBUG @@ -63,8 +59,10 @@ endif .PHONY: deps deps: @echo "Installing Conan dependencies (Debug + Release)..." - conan install . --output-folder=$(CONAN_OUTPUT_DIR) --build=missing -s build_type=Debug -s compiler.cppstd=20 - conan install . --output-folder=$(CONAN_OUTPUT_DIR) --build=missing -s build_type=Release -s compiler.cppstd=20 + $(CONTAINER_CHECK) + $(CONTAINER_PREFIX) conan profile detect --exist-ok + $(CONTAINER_PREFIX) conan install . 
--output-folder=$(CONAN_OUTPUT_DIR) --build=missing -s build_type=Debug -s compiler.cppstd=20 + $(CONTAINER_PREFIX) conan install . --output-folder=$(CONAN_OUTPUT_DIR) --build=missing -s build_type=Release -s compiler.cppstd=20 # ============================================================================ # Default target @@ -281,25 +279,25 @@ clean: container.build: @echo "Building container image: dev..." - podman compose build dev + DOCKER_HOST="$(DOCKER_HOST)" podman-compose build dev container.build.%: @echo "Building container image: $*..." - podman compose build $* + DOCKER_HOST="$(DOCKER_HOST)" podman-compose build $* container.up: @echo "Starting dev container..." - podman compose up -d dev + DOCKER_HOST="$(DOCKER_HOST)" podman-compose up -d dev sleep 2 container.clean: @echo "Cleaning container resources..." - podman compose down -v + DOCKER_HOST="$(DOCKER_HOST)" podman-compose down -v podman rmi -f projectkeystone-dev:latest projectkeystone:latest || true container.down: @echo "Stopping containers..." - podman compose down + DOCKER_HOST="$(DOCKER_HOST)" podman-compose down container.shell: container.up $(CONTAINER_PREFIX) /bin/bash @@ -340,11 +338,6 @@ container.shell: container.up %.release: @$(MAKE) $* BUILD_FLAGS="$(BUILD_FLAGS) $(BUILD_FLAGS_release)" BUILD_SUBDIR="$(BUILD_SUBDIR)$(suffix $@)" CMAKE_BUILD_TYPE=Release -# Pattern rule for native variants — matches any target with .native suffix. -# Bypasses the container and runs the underlying target directly on the host. 
-%.native: - @$(MAKE) $* NATIVE=1 - # ============================================================================ # Help & Info # ============================================================================ @@ -423,6 +416,5 @@ help: @echo "Examples:" @echo " make compile.debug.asan # Build debug with ASan (in container)" @echo " make test.debug.asan # Run tests with ASan (in container)" - @echo " make compile.debug.asan.native # Build debug with ASan on host (no container)" - @echo " make test.debug.tsan.native # Run TSan tests on host (no container)" - @echo " make benchmark.native # Run benchmarks on host (no container)" + @echo " make test.debug.tsan # Run TSan tests (in container)" + @echo " make benchmark # Run benchmarks (in container)" diff --git a/docker-compose.yml b/docker-compose.yml index 7457deb..8043d01 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -41,15 +41,19 @@ services: args: BUILD_UID: "${BUILD_UID}" BUILD_GID: "${BUILD_GID}" - image: projectkeystone-dev:${GIT_COMMIT}-dev - container_name: projectkeystone-dev-${GIT_COMMIT} + image: projectkeystone-dev:latest + container_name: projectkeystone-dev user: "${BUILD_UID}:${BUILD_GID}" # Run as host user volumes: - - .:/workspace + - .:/workspace:Z working_dir: /workspace stdin_open: true tty: true command: /bin/bash + cap_add: + - SYS_PTRACE + security_opt: + - seccomp:unconfined environment: - HOME=/workspace/.docker-home # Avoid permission issues with $HOME - GIT_COMMIT=${GIT_COMMIT} @@ -64,7 +68,7 @@ services: image: projectkeystone-builder:${GIT_COMMIT}-latest container_name: projectkeystone-build-${GIT_COMMIT} volumes: - - .:/workspace + - .:/workspace:Z environment: - GIT_COMMIT=${GIT_COMMIT} - BUILD_UID=${BUILD_UID} diff --git a/include/concurrency/logger.hpp b/include/concurrency/logger.hpp index e2b65e2..519b738 100644 --- a/include/concurrency/logger.hpp +++ b/include/concurrency/logger.hpp @@ -1,13 +1,13 @@ #pragma once -#include -#include -#include - #include #include 
#include +#include +#include +#include + namespace keystone { namespace concurrency { @@ -44,7 +44,8 @@ class LogContext { * @param worker_id Worker thread index * @param session_id Session identifier */ - static void set(const std::string& agent_id, int32_t worker_id, const std::string& session_id); + static void set(const std::string& agent_id, int32_t worker_id, + const std::string& session_id); /** * @brief Clear the thread-local logging context (including correlation ID) @@ -236,7 +237,8 @@ class Logger { static std::shared_ptr logger_; template - static void log(spdlog::level::level_enum level, const std::string& fmt, Args&&... args) { + static void log(spdlog::level::level_enum level, const std::string& fmt, + Args&&... args) { // init() is idempotent and thread-safe (guarded by an internal mutex), so a // racing first-log from multiple threads creates the "keystone" logger // exactly once instead of throwing spdlog_ex on the loser of the race. @@ -249,7 +251,12 @@ class Logger { std::string full_fmt = context + " " + fmt; // Use runtime format to avoid compile-time format string requirement - logger_->log(spdlog::source_loc{}, level, fmt::runtime(full_fmt), std::forward(args)...); + if constexpr (sizeof...(args) > 0) { + logger_->log(spdlog::source_loc{}, level, fmt::runtime(full_fmt), + std::forward(args)...); + } else { + logger_->log(spdlog::source_loc{}, level, fmt::runtime(full_fmt)); + } } }; diff --git a/include/core/message.hpp b/include/core/message.hpp index 2af99f2..cb2ed4e 100644 --- a/include/core/message.hpp +++ b/include/core/message.hpp @@ -117,9 +117,11 @@ struct KeystoneMessage { correlation_id; ///< Optional correlation ID for distributed tracing // Payload and timing - [[deprecated("command is a legacy/convenience field; use payload with ActionType instead")]] - std::string command; ///< Command string to execute (legacy/convenience) - std::optional payload; ///< Optional payload data + [[deprecated( + "command is a legacy/convenience 
field; use payload with ActionType " + "instead")]] + std::string command; ///< Command string to execute (legacy/convenience) + std::optional payload; ///< Optional payload data std::chrono::system_clock::time_point timestamp; ///< Message timestamp // Declare special members out-of-line so their definitions (in message.cpp) diff --git a/include/transport/nats_connection.hpp b/include/transport/nats_connection.hpp index 44ca3c7..fb4a291 100644 --- a/include/transport/nats_connection.hpp +++ b/include/transport/nats_connection.hpp @@ -14,6 +14,8 @@ #pragma once +#include + #include #include #include @@ -22,8 +24,6 @@ #include #include -#include - namespace keystone { namespace transport { @@ -285,7 +285,8 @@ class NatsConnection { * @param timeout_ms Fetch timeout in milliseconds (default 30000) * @return NatsMsgPtr owning the fetched message, or a null * NatsMsgPtr on timeout. Ownership is transferred via - * NatsMsgPtr; the caller must NOT call natsMsg_Destroy(). + * NatsMsgPtr; the caller must NOT call + * natsMsg_Destroy(). * * @throws std::system_error if a network error occurs (transient) * @throws std::domain_error if consumer or stream not found (configuration) @@ -298,8 +299,7 @@ class NatsConnection { * - std::system_error: Transient errors (network, timeout) * - std::runtime_error: Permanent errors (auth, permission denied) */ - NatsMsgPtr fetch(std::string_view subject, - std::string_view consumer_name, + NatsMsgPtr fetch(std::string_view subject, std::string_view consumer_name, int64_t timeout_ms = 30000); // ========================================================================= @@ -324,9 +324,7 @@ class NatsConnection { // nats.c static callback shims — nats.c passes a void* user data pointer // which we cast back to NatsConnection*. Protected to allow test subclasses // to invoke them directly without a live nats.c connection. 
- static void onError(natsConnection* nc, - natsSubscription* sub, - natsStatus err, + static void onError(natsConnection* nc, natsSubscription* sub, natsStatus err, void* closure) noexcept; static void onDisconnected(natsConnection* nc, void* closure) noexcept; static void onReconnected(natsConnection* nc, void* closure) noexcept; diff --git a/include/transport/transparent_bridge.hpp b/include/transport/transparent_bridge.hpp index b1ed3f7..ef9b762 100644 --- a/include/transport/transparent_bridge.hpp +++ b/include/transport/transparent_bridge.hpp @@ -22,12 +22,12 @@ #pragma once +#include + #include #include #include -#include - // Forward declarations — avoid pulling in full nats.h types in callers. namespace keystone { namespace core { @@ -64,9 +64,9 @@ struct BridgeConfig { * After attach() is called the bridge: * 1. Registers an outbound NATS publisher with MessageBus so that messages for * unregistered (off-host) agents are serialized and published automatically. - * 2. Starts an inbound pull loop that subscribes to BridgeConfig::inbound_subject, - * deserializes each payload, and routes the resulting KeystoneMessage into - * the local MessageBus. + * 2. Starts an inbound pull loop that subscribes to + * BridgeConfig::inbound_subject, deserializes each payload, and routes the + * resulting KeystoneMessage into the local MessageBus. * * No component needs to know whether its peer is local or remote. */ @@ -77,7 +77,8 @@ class TransparentBridge { * @param conn NATS connection. Must outlive this object. * @param cfg Optional configuration override. 
*/ - TransparentBridge(core::MessageBus& bus, NatsConnection& conn, BridgeConfig cfg = {}); + TransparentBridge(core::MessageBus& bus, NatsConnection& conn, + BridgeConfig cfg = {}); ~TransparentBridge(); diff --git a/src/concurrency/logger.cpp b/src/concurrency/logger.cpp index 17ac527..a322c93 100644 --- a/src/concurrency/logger.cpp +++ b/src/concurrency/logger.cpp @@ -5,12 +5,12 @@ #include "concurrency/logger.hpp" +#include + #include #include #include -#include - namespace keystone { namespace concurrency { @@ -32,14 +32,8 @@ std::string generateCorrelationId() { c = (c & 0x3FFFFFFFu) | 0x80000000u; // variant 10xx char buf[37]; - std::snprintf(buf, - sizeof(buf), - "%08x-%04x-%04x-%04x-%04x%08x", - a, - (b >> 16) & 0xFFFF, - b & 0xFFFF, - (c >> 16) & 0xFFFF, - c & 0xFFFF, + std::snprintf(buf, sizeof(buf), "%08x-%04x-%04x-%04x-%04x%08x", a, + (b >> 16) & 0xFFFF, b & 0xFFFF, (c >> 16) & 0xFFFF, c & 0xFFFF, d); return std::string(buf); } @@ -47,8 +41,7 @@ std::string generateCorrelationId() { // LogContext thread-local storage thread_local LogContext::Context LogContext::context_; -void LogContext::set(const std::string& agent_id, - int32_t worker_id, +void LogContext::set(const std::string& agent_id, int32_t worker_id, const std::string& session_id) { context_.agent_id = agent_id; context_.worker_id = worker_id; @@ -62,29 +55,19 @@ void LogContext::clear() { context_.correlation_id.clear(); } -std::string LogContext::getAgentId() { - return context_.agent_id; -} +std::string LogContext::getAgentId() { return context_.agent_id; } -int32_t LogContext::getWorkerId() { - return context_.worker_id; -} +int32_t LogContext::getWorkerId() { return context_.worker_id; } -std::string LogContext::getSessionId() { - return context_.session_id; -} +std::string LogContext::getSessionId() { return context_.session_id; } void LogContext::setCorrelationId(const std::string& correlation_id) { context_.correlation_id = correlation_id; } -void LogContext::clearCorrelationId() { - 
context_.correlation_id.clear(); -} +void LogContext::clearCorrelationId() { context_.correlation_id.clear(); } -std::string LogContext::getCorrelationId() { - return context_.correlation_id; -} +std::string LogContext::getCorrelationId() { return context_.correlation_id; } std::string LogContext::getContextString() { if (context_.agent_id.empty()) { @@ -92,7 +75,8 @@ std::string LogContext::getContextString() { } std::ostringstream oss; - oss << "[" << context_.agent_id << ":" << context_.worker_id << ":" << context_.session_id; + oss << "[" << context_.agent_id << ":" << context_.worker_id << ":" + << context_.session_id; if (!context_.correlation_id.empty()) { oss << ":corr=" << context_.correlation_id; } @@ -102,10 +86,12 @@ std::string LogContext::getContextString() { // CorrelationScope -CorrelationScope::CorrelationScope() : CorrelationScope(generateCorrelationId()) {} +CorrelationScope::CorrelationScope() + : CorrelationScope(generateCorrelationId()) {} CorrelationScope::CorrelationScope(std::string correlation_id) - : previous_id_(LogContext::getCorrelationId()), current_id_(std::move(correlation_id)) { + : previous_id_(LogContext::getCorrelationId()), + current_id_(std::move(correlation_id)) { LogContext::setCorrelationId(current_id_); } @@ -118,12 +104,13 @@ std::shared_ptr Logger::logger_; namespace { // Guards the check-then-act in init()/log(). Without it, two threads can both -// observe a null logger_, both call spdlog::stdout_color_mt("keystone"), and the -// second throws spdlog::spdlog_ex("logger with name 'keystone' already exists"). -// That exception is thrown from worker threads (e.g. the backpressure warning -// path in AgentCore::receiveMessage), is never caught there, and terminates the -// process. The -O0 --coverage build widens the race window enough to reproduce -// it deterministically (see AgentCoreTest.BackpressureConcurrentTrigger). 
+// observe a null logger_, both call spdlog::stdout_color_mt("keystone"), and +// the second throws spdlog::spdlog_ex("logger with name 'keystone' already +// exists"). That exception is thrown from worker threads (e.g. the backpressure +// warning path in AgentCore::receiveMessage), is never caught there, and +// terminates the process. The -O0 --coverage build widens the race window +// enough to reproduce it deterministically (see +// AgentCoreTest.BackpressureConcurrentTrigger). std::mutex& loggerInitMutex() { static std::mutex m; return m; diff --git a/src/core/message.cpp b/src/core/message.cpp index 824c28c..74f1a51 100644 --- a/src/core/message.cpp +++ b/src/core/message.cpp @@ -16,9 +16,9 @@ namespace core { // warning. // --------------------------------------------------------------------------- _Pragma("GCC diagnostic push") -_Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") + _Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") -KeystoneMessage::KeystoneMessage() = default; + KeystoneMessage::KeystoneMessage() = default; KeystoneMessage::KeystoneMessage(const KeystoneMessage&) = default; KeystoneMessage::KeystoneMessage(KeystoneMessage&&) noexcept = default; KeystoneMessage& KeystoneMessage::operator=(const KeystoneMessage&) = default; @@ -28,24 +28,24 @@ KeystoneMessage::~KeystoneMessage() = default; _Pragma("GCC diagnostic pop") -namespace { -// Simple UUID generation (not cryptographically secure, but sufficient for -// Phase 1) Thread-safe: uses thread_local to avoid data races across threads -std::string generate_uuid() { - thread_local std::random_device rd; - thread_local std::mt19937 gen(rd()); - thread_local std::uniform_int_distribution<> dis(0, 15); - static const char* hex = "0123456789abcdef"; - - std::stringstream ss; - for (int32_t i = 0; i < 32; ++i) { - if (i == 8 || i == 12 || i == 16 || i == 20) { - ss << '-'; + namespace { + // Simple UUID generation (not cryptographically secure, but sufficient for + // 
Phase 1) Thread-safe: uses thread_local to avoid data races across threads + std::string generate_uuid() { + thread_local std::random_device rd; + thread_local std::mt19937 gen(rd()); + thread_local std::uniform_int_distribution<> dis(0, 15); + static const char* hex = "0123456789abcdef"; + + std::stringstream ss; + for (int32_t i = 0; i < 32; ++i) { + if (i == 8 || i == 12 || i == 16 || i == 20) { + ss << '-'; + } + ss << hex[dis(gen)]; } - ss << hex[dis(gen)]; + return ss.str(); } - return ss.str(); -} } // namespace KeystoneMessage KeystoneMessage::create( @@ -56,10 +56,9 @@ KeystoneMessage KeystoneMessage::create( msg.sender_id = sender; msg.receiver_id = receiver; _Pragma("GCC diagnostic push") - _Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") - msg.command = cmd; - _Pragma("GCC diagnostic pop") - msg.payload = data; + _Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") + msg.command = cmd; + _Pragma("GCC diagnostic pop") msg.payload = data; msg.timestamp = std::chrono::system_clock::now(); // Initialize new fields with defaults for backward compatibility @@ -88,12 +87,12 @@ KeystoneMessage KeystoneMessage::create(const std::string& sender, // Legacy field: set command based on action type _Pragma("GCC diagnostic push") - _Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") - msg.command = actionTypeToString(action); + _Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") + msg.command = actionTypeToString(action); _Pragma("GCC diagnostic pop") - // Phase C: Initialize priority and deadline (FIX: was missing!) - msg.priority = Priority::NORMAL; + // Phase C: Initialize priority and deadline (FIX: was missing!) 
+ msg.priority = Priority::NORMAL; msg.deadline = std::nullopt; return msg; diff --git a/src/core/message_bus.cpp b/src/core/message_bus.cpp index 4efe5f0..c346760 100644 --- a/src/core/message_bus.cpp +++ b/src/core/message_bus.cpp @@ -1,12 +1,12 @@ #include "core/message_bus.hpp" +#include + #include "concurrency/work_stealing_scheduler.hpp" #include "core/message_serializer.hpp" #include "core/metrics.hpp" #include "core/subject_validator.hpp" -#include - namespace keystone { namespace core { @@ -34,7 +34,8 @@ void MessageBus::registerAgent(const std::string& agent_id, // FIX P2-10: Enforce maximum agent limit to prevent DoS if (agents_.size() >= Config::MAX_AGENTS) { - throw std::runtime_error("Maximum agent count exceeded: " + std::to_string(Config::MAX_AGENTS)); + throw std::runtime_error("Maximum agent count exceeded: " + + std::to_string(Config::MAX_AGENTS)); } // Phase A2: Intern the agent_id string to get integer ID @@ -117,7 +118,8 @@ bool MessageBus::routeMessage(const KeystoneMessage& msg) { } // ✅ Lock released before external calls // Load scheduler atomically (thread-safe) - concurrency::WorkStealingScheduler* sched = scheduler_.load(std::memory_order_acquire); + concurrency::WorkStealingScheduler* sched = + scheduler_.load(std::memory_order_acquire); // Record message sent to metrics for tracking Metrics::getInstance().recordMessageSent(msg.msg_id, msg.priority); @@ -164,12 +166,15 @@ std::vector MessageBus::listAgents() const { } void MessageBus::setNatsPublisher( - std::function payload)> publisher) { + std::function payload)> + publisher) { std::lock_guard lock(nats_publisher_mutex_); nats_publisher_ = std::move(publisher); } -std::function payload)> +std::function payload)> MessageBus::getNatsPublisher() const { std::lock_guard lock(nats_publisher_mutex_); return nats_publisher_; diff --git a/src/core/message_pool.cpp b/src/core/message_pool.cpp index 97efc8f..c152654 100644 --- a/src/core/message_pool.cpp +++ b/src/core/message_pool.cpp @@ 
-46,10 +46,9 @@ void MessagePool::release(KeystoneMessage&& msg) { msg.sender_id.clear(); msg.receiver_id.clear(); _Pragma("GCC diagnostic push") - _Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") - msg.command.clear(); - _Pragma("GCC diagnostic pop") - msg.payload.reset(); + _Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") + msg.command.clear(); + _Pragma("GCC diagnostic pop") msg.payload.reset(); msg.priority = Priority::NORMAL; msg.deadline.reset(); // timestamp will be overwritten on next use diff --git a/src/core/message_serializer.cpp b/src/core/message_serializer.cpp index 0faac53..79e1807 100644 --- a/src/core/message_serializer.cpp +++ b/src/core/message_serializer.cpp @@ -22,14 +22,15 @@ SerializableMessage SerializableMessage::fromKeystoneMessage( smsg.content_type = static_cast(msg.content_type); _Pragma("GCC diagnostic push") - _Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") - smsg.command = cista::offset::string{msg.command.c_str()}; + _Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") + smsg.command = cista::offset::string{msg.command.c_str()}; _Pragma("GCC diagnostic pop") - if (msg.payload.has_value()) { + if (msg.payload.has_value()) { smsg.payload = cista::offset::string{msg.payload.value().c_str()}; smsg.has_payload = true; - } else { + } + else { smsg.payload = cista::offset::string{""}; smsg.has_payload = false; } @@ -63,13 +64,14 @@ KeystoneMessage SerializableMessage::toKeystoneMessage() const { msg.content_type = static_cast(content_type); _Pragma("GCC diagnostic push") - _Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") - msg.command = std::string{command.data(), command.size()}; + _Pragma("GCC diagnostic ignored \"-Wdeprecated-declarations\"") + msg.command = std::string{command.data(), command.size()}; _Pragma("GCC diagnostic pop") - if (has_payload) { + if (has_payload) { msg.payload = std::string{payload.data(), payload.size()}; - } else { + } + else { msg.payload 
= std::nullopt; } diff --git a/src/daemon/CMakeLists.txt b/src/daemon/CMakeLists.txt index 3650e21..2b8ba21 100644 --- a/src/daemon/CMakeLists.txt +++ b/src/daemon/CMakeLists.txt @@ -1,7 +1,7 @@ # Keystone daemon — the production service binary. # -# This target packages the Keystone transport daemon as a deployable binary. -# It depends on keystone_core (MessageBus, health check), keystone_transport +# This target packages the Keystone transport daemon as a deployable binary. It +# depends on keystone_core (MessageBus, health check), keystone_transport # (NatsConnection, NATSListener), and the nats.c library transitively. # # The daemon is installed to ${CMAKE_INSTALL_BINDIR}/keystone-server as part of @@ -12,8 +12,7 @@ add_executable(keystone_server main.cpp) target_include_directories( - keystone_server - PRIVATE $) + keystone_server PRIVATE $) target_link_libraries( keystone_server @@ -24,7 +23,5 @@ target_link_libraries( set_target_properties(keystone_server PROPERTIES OUTPUT_NAME "keystone-server") -install( - TARGETS keystone_server - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - COMPONENT keystone) +install(TARGETS keystone_server RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + COMPONENT keystone) diff --git a/src/daemon/main.cpp b/src/daemon/main.cpp index 91f220c..45b8993 100644 --- a/src/daemon/main.cpp +++ b/src/daemon/main.cpp @@ -1,10 +1,3 @@ -#include "core/message_bus.hpp" -#include "monitoring/health_check_server.hpp" -#include "monitoring/nats_status.hpp" -#include "network/nats_listener.hpp" -#include "transport/nats_connection.hpp" -#include "transport/transparent_bridge.hpp" - #include #include #include @@ -13,6 +6,13 @@ #include #include +#include "core/message_bus.hpp" +#include "monitoring/health_check_server.hpp" +#include "monitoring/nats_status.hpp" +#include "network/nats_listener.hpp" +#include "transport/nats_connection.hpp" +#include "transport/transparent_bridge.hpp" + namespace { std::atomic g_stop{false}; @@ -31,7 +31,8 @@ int main() 
{ std::signal(SIGINT, signalHandler); keystone::monitoring::NatsStatusTracker nats_status; - keystone::monitoring::HealthCheckServer health_server(8080, nullptr, &nats_status); + keystone::monitoring::HealthCheckServer health_server(8080, nullptr, + &nats_status); if (!health_server.start()) { std::cerr << "keystone-daemon: failed to start health check server\n"; @@ -69,7 +70,8 @@ int main() { // DAG-advance callback: log the event (production code would call the real // DAG advancer once it is wired in from ProjectAgamemnon). auto dag_advance = [](std::string_view team_id, std::string_view task_id) { - std::cout << "keystone-daemon: dag_advance team=" << team_id << " task=" << task_id << '\n'; + std::cout << "keystone-daemon: dag_advance team=" << team_id + << " task=" << task_id << '\n'; }; keystone::transport::NatsConnection nats_conn(nats_cfg); @@ -83,8 +85,10 @@ int main() { // Wire NatsStatusTracker callbacks into NATS connection lifecycle (Issue // #210). - nats_conn.setDisconnectedCallback([&nats_status]() { nats_status.setDisconnected(); }); - nats_conn.setReconnectedCallback([&nats_status]() { nats_status.setConnected(); }); + nats_conn.setDisconnectedCallback( + [&nats_status]() { nats_status.setDisconnected(); }); + nats_conn.setReconnectedCallback( + [&nats_status]() { nats_status.setConnected(); }); // Attempt to connect to NATS; log a warning but continue if unavailable so // the health endpoint remains reachable. 
@@ -97,20 +101,22 @@ int main() { natsStatus bridge_s = bridge.attach(); if (bridge_s != NATS_OK) { std::cerr << "keystone-daemon: TransparentBridge::attach failed status=" - << static_cast(bridge_s) << " (continuing without bridge)\n"; + << static_cast(bridge_s) + << " (continuing without bridge)\n"; } else { - std::cout << "keystone-daemon: TransparentBridge attached subject=hi.agents.>\n"; + std::cout << "keystone-daemon: TransparentBridge attached " + "subject=hi.agents.>\n"; } jsCtx* js = nats_conn.jsContext(); if (js != nullptr) { natsStatus s = listener.start(js); if (s != NATS_OK) { - std::cerr << "keystone-daemon: NATSListener::start failed status=" << static_cast(s) - << " (continuing without NATS)\n"; + std::cerr << "keystone-daemon: NATSListener::start failed status=" + << static_cast(s) << " (continuing without NATS)\n"; } else { - std::cout << "keystone-daemon: NATSListener active subject=" << listener_cfg.subject - << '\n'; + std::cout << "keystone-daemon: NATSListener active subject=" + << listener_cfg.subject << '\n'; } } else { std::cerr << "keystone-daemon: failed to obtain JetStream context " diff --git a/src/transport/nats_connection.cpp b/src/transport/nats_connection.cpp index b6c971d..01fc807 100644 --- a/src/transport/nats_connection.cpp +++ b/src/transport/nats_connection.cpp @@ -5,6 +5,9 @@ #include "transport/nats_connection.hpp" +#include +#include + #include #include #include @@ -12,9 +15,6 @@ #include #include -#include -#include - namespace keystone { namespace transport { @@ -117,11 +117,13 @@ void NatsTlsConfig::validate() const { // cachedTlsEnvVars() reads the environment exactly once (thread-safe static // initialisation); see the implementation note in the anonymous namespace. const TlsEnvVars& env = cachedTlsEnvVars(); - std::string cert_path = env.cert_path.empty() ? client_cert_path : env.cert_path; + std::string cert_path = + env.cert_path.empty() ? 
client_cert_path : env.cert_path; std::string key_path = env.key_path.empty() ? client_key_path : env.key_path; // Both must be set or both must be empty - if ((!cert_path.empty() && key_path.empty()) || (cert_path.empty() && !key_path.empty())) { + if ((!cert_path.empty() && key_path.empty()) || + (cert_path.empty() && !key_path.empty())) { throw std::invalid_argument( "NatsTlsConfig: client certificate and key must both be set or both " "be empty; cert_path='" + @@ -133,11 +135,10 @@ void NatsTlsConfig::validate() const { // Construction / destruction // --------------------------------------------------------------------------- -NatsConnection::NatsConnection(NatsConfig config) : config_(std::move(config)) {} +NatsConnection::NatsConnection(NatsConfig config) + : config_(std::move(config)) {} -NatsConnection::~NatsConnection() { - disconnect(); -} +NatsConnection::~NatsConnection() { disconnect(); } // --------------------------------------------------------------------------- // Callback registration @@ -192,22 +193,27 @@ bool NatsConnection::applyTlsOptions(natsOptions* opts) const { const TlsEnvVars& env = cachedTlsEnvVars(); std::string ca_path = env.ca_path.empty() ? tls.ca_cert_path : env.ca_path; if (!ca_path.empty()) { - if (natsOptions_LoadCATrustedCertificates(opts, ca_path.c_str()) != NATS_OK) { - spdlog::error("NatsConnection: failed to load CA certificate from {}", ca_path); + if (natsOptions_LoadCATrustedCertificates(opts, ca_path.c_str()) != + NATS_OK) { + spdlog::error("NatsConnection: failed to load CA certificate from {}", + ca_path); return false; } } // Client certificate (mutual TLS): env vars take precedence over config // fields - std::string cert_path = env.cert_path.empty() ? tls.client_cert_path : env.cert_path; - std::string key_path = env.key_path.empty() ? tls.client_key_path : env.key_path; + std::string cert_path = + env.cert_path.empty() ? tls.client_cert_path : env.cert_path; + std::string key_path = + env.key_path.empty() ? 
tls.client_key_path : env.key_path; if (!cert_path.empty() && !key_path.empty()) { - if (natsOptions_LoadCertificatesChain(opts, cert_path.c_str(), key_path.c_str()) != NATS_OK) { - spdlog::error("NatsConnection: failed to load client certificate from {} / {}", - cert_path, - key_path); + if (natsOptions_LoadCertificatesChain(opts, cert_path.c_str(), + key_path.c_str()) != NATS_OK) { + spdlog::error( + "NatsConnection: failed to load client certificate from {} / {}", + cert_path, key_path); return false; } } @@ -253,7 +259,8 @@ bool NatsConnection::connect() { } // Reconnection policy - if (natsOptions_SetMaxReconnect(opts, config_.max_reconnect_attempts) != NATS_OK) { + if (natsOptions_SetMaxReconnect(opts, config_.max_reconnect_attempts) != + NATS_OK) { return false; } @@ -277,16 +284,20 @@ bool NatsConnection::connect() { } // Lifecycle callbacks — pass `this` as closure so static shims can dispatch - if (natsOptions_SetErrorHandler(opts, NatsConnection::onError, this) != NATS_OK) { + if (natsOptions_SetErrorHandler(opts, NatsConnection::onError, this) != + NATS_OK) { return false; } - if (natsOptions_SetDisconnectedCB(opts, NatsConnection::onDisconnected, this) != NATS_OK) { + if (natsOptions_SetDisconnectedCB(opts, NatsConnection::onDisconnected, + this) != NATS_OK) { return false; } - if (natsOptions_SetReconnectedCB(opts, NatsConnection::onReconnected, this) != NATS_OK) { + if (natsOptions_SetReconnectedCB(opts, NatsConnection::onReconnected, this) != + NATS_OK) { return false; } - if (natsOptions_SetClosedCB(opts, NatsConnection::onClosed, this) != NATS_OK) { + if (natsOptions_SetClosedCB(opts, NatsConnection::onClosed, this) != + NATS_OK) { return false; } @@ -323,8 +334,9 @@ jsCtx* NatsConnection::jsContext() noexcept { } const natsStatus status = natsConnection_JetStream(&js_ctx_, conn_, nullptr); if (status != NATS_OK) { - spdlog::error("NatsConnection::jsContext: natsConnection_JetStream failed: {}", - natsStatus_GetText(status)); + spdlog::error( + 
"NatsConnection::jsContext: natsConnection_JetStream failed: {}", + natsStatus_GetText(status)); js_ctx_ = nullptr; return nullptr; } @@ -343,19 +355,15 @@ bool NatsConnection::isConnected() const noexcept { return getState() == NatsConnectionState::CONNECTED; } -natsConnection* NatsConnection::handle() const noexcept { - return conn_; -} +natsConnection* NatsConnection::handle() const noexcept { return conn_; } // --------------------------------------------------------------------------- // Static callback shims // --------------------------------------------------------------------------- // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) -void NatsConnection::onError(natsConnection* /*nc*/, - natsSubscription* /*sub*/, - natsStatus err, - void* closure) noexcept { +void NatsConnection::onError(natsConnection* /*nc*/, natsSubscription* /*sub*/, + natsStatus err, void* closure) noexcept { auto* self = static_cast(closure); ErrorCallback cb; { @@ -368,9 +376,11 @@ void NatsConnection::onError(natsConnection* /*nc*/, } } -void NatsConnection::onDisconnected(natsConnection* /*nc*/, void* closure) noexcept { +void NatsConnection::onDisconnected(natsConnection* /*nc*/, + void* closure) noexcept { auto* self = static_cast(closure); - self->state_.store(NatsConnectionState::RECONNECTING, std::memory_order_release); + self->state_.store(NatsConnectionState::RECONNECTING, + std::memory_order_release); DisconnectedCallback cb; { std::lock_guard lock(self->callbacks_mutex_); @@ -381,7 +391,8 @@ void NatsConnection::onDisconnected(natsConnection* /*nc*/, void* closure) noexc } } -void NatsConnection::onReconnected(natsConnection* /*nc*/, void* closure) noexcept { +void NatsConnection::onReconnected(natsConnection* /*nc*/, + void* closure) noexcept { auto* self = static_cast(closure); self->state_.store(NatsConnectionState::CONNECTED, std::memory_order_release); ReconnectedCallback cb; @@ -411,14 +422,16 @@ void NatsConnection::onClosed(natsConnection* /*nc*/, void* 
closure) noexcept { // Exception mapping (ADR-014: exception contract) // --------------------------------------------------------------------------- -void NatsConnection::throwForNatsStatus(natsStatus status, const std::string& context) { +void NatsConnection::throwForNatsStatus(natsStatus status, + const std::string& context) { if (status == NATS_OK) { return; // No error } const char* nats_text = natsStatus_GetText(status); - std::string error_msg = context + ": " + (nats_text != nullptr ? nats_text : "unknown error") + - " (nats_status=" + std::to_string(static_cast(status)) + ")"; + std::string error_msg = + context + ": " + (nats_text != nullptr ? nats_text : "unknown error") + + " (nats_status=" + std::to_string(static_cast(status)) + ")"; NatsErrorCategory category = categorizeNatsError(status); @@ -427,7 +440,8 @@ void NatsConnection::throwForNatsStatus(natsStatus status, const std::string& co throw std::domain_error(error_msg); case NatsErrorCategory::kTransient: - throw std::system_error(std::error_code(EAGAIN, std::generic_category()), error_msg); + throw std::system_error(std::error_code(EAGAIN, std::generic_category()), + error_msg); case NatsErrorCategory::kPermanent: throw std::runtime_error(error_msg); @@ -443,11 +457,13 @@ NatsMsgPtr NatsConnection::fetch(std::string_view subject, int64_t timeout_ms) { jsCtx* js = jsContext(); if (js == nullptr) { - throw std::runtime_error("NatsConnection::fetch: not connected to NATS (jsContext is null)"); + throw std::runtime_error( + "NatsConnection::fetch: not connected to NATS (jsContext is null)"); } if (subject.empty() || consumer_name.empty()) { - throw std::domain_error("NatsConnection::fetch: subject and consumer_name must not be empty"); + throw std::domain_error( + "NatsConnection::fetch: subject and consumer_name must not be empty"); } // Subscribe to the subject with durable consumer semantics @@ -457,15 +473,16 @@ NatsMsgPtr NatsConnection::fetch(std::string_view subject, 
sub_opts.Config.MaxAckPending = 1; // Rate-limiting per CLAUDE.md natsSubscription* sub = nullptr; - natsStatus s = js_Subscribe( - &sub, js, std::string(subject).c_str(), nullptr, nullptr, nullptr, &sub_opts, nullptr); + natsStatus s = js_Subscribe(&sub, js, std::string(subject).c_str(), nullptr, + nullptr, nullptr, &sub_opts, nullptr); if (s != NATS_OK) { throwForNatsStatus(s, "NatsConnection::fetch subscribe"); } if (sub == nullptr) { - throw std::runtime_error("NatsConnection::fetch: subscription returned null"); + throw std::runtime_error( + "NatsConnection::fetch: subscription returned null"); } // Fetch a single message with timeout using natsMsgList diff --git a/src/transport/transparent_bridge.cpp b/src/transport/transparent_bridge.cpp index b142a31..5729b9d 100644 --- a/src/transport/transparent_bridge.cpp +++ b/src/transport/transparent_bridge.cpp @@ -1,8 +1,6 @@ #include "transport/transparent_bridge.hpp" -#include "core/message_bus.hpp" -#include "core/message_serializer.hpp" -#include "transport/nats_connection.hpp" +#include #include #include @@ -14,7 +12,9 @@ #include #include -#include +#include "core/message_bus.hpp" +#include "core/message_serializer.hpp" +#include "transport/nats_connection.hpp" namespace keystone { namespace transport { @@ -31,12 +31,11 @@ std::string deriveNatsSubject(std::string_view receiver_id) { // TransparentBridge // --------------------------------------------------------------------------- -TransparentBridge::TransparentBridge(core::MessageBus& bus, NatsConnection& conn, BridgeConfig cfg) +TransparentBridge::TransparentBridge(core::MessageBus& bus, + NatsConnection& conn, BridgeConfig cfg) : bus_(bus), conn_(conn), cfg_(std::move(cfg)) {} -TransparentBridge::~TransparentBridge() { - stop(); -} +TransparentBridge::~TransparentBridge() { stop(); } natsStatus TransparentBridge::attach() { // ------------------------------------------------------------------------- @@ -44,23 +43,22 @@ natsStatus TransparentBridge::attach() 
{ // MessageBus::routeMessage() serialises the KeystoneMessage and calls this // lambda with (subject, serialized_bytes) when local lookup fails (#512). // ------------------------------------------------------------------------- - bus_.setNatsPublisher([this](std::string_view subject, std::span payload) { - natsConnection* nc = conn_.handle(); - if (nc == nullptr || payload.empty()) { - return; - } - natsStatus s = natsConnection_Publish(nc, - subject.data(), - reinterpret_cast(payload.data()), - static_cast(payload.size())); - if (s != NATS_OK) { - spdlog::error( - "TransparentBridge: natsConnection_Publish failed subject={} " - "status={}", - subject, - static_cast(s)); - } - }); + bus_.setNatsPublisher( + [this](std::string_view subject, std::span payload) { + natsConnection* nc = conn_.handle(); + if (nc == nullptr || payload.empty()) { + return; + } + natsStatus s = natsConnection_Publish( + nc, subject.data(), reinterpret_cast(payload.data()), + static_cast(payload.size())); + if (s != NATS_OK) { + spdlog::error( + "TransparentBridge: natsConnection_Publish failed subject={} " + "status={}", + subject, static_cast(s)); + } + }); // ------------------------------------------------------------------------- // Inbound path: subscribe to cfg_.inbound_subject and start pull loop. 
@@ -82,16 +80,14 @@ natsStatus TransparentBridge::attach() { for (int attempt = 1; attempt <= attempts; ++attempt) { jsErrCode jerr = static_cast(0); - s = js_Subscribe( - &sub_, js, cfg_.inbound_subject.c_str(), nullptr, nullptr, nullptr, &sub_opts, &jerr); + s = js_Subscribe(&sub_, js, cfg_.inbound_subject.c_str(), nullptr, nullptr, + nullptr, &sub_opts, &jerr); if (s == NATS_OK) { break; } - spdlog::warn("TransparentBridge: subscribe attempt {}/{} failed status={} jerr={}", - attempt, - attempts, - static_cast(s), - static_cast(jerr)); + spdlog::warn( + "TransparentBridge: subscribe attempt {}/{} failed status={} jerr={}", + attempt, attempts, static_cast(s), static_cast(jerr)); } if (s != NATS_OK) { @@ -106,7 +102,8 @@ natsStatus TransparentBridge::attach() { try { inbound_thread_ = std::thread(&TransparentBridge::inbound_loop, this); } catch (const std::exception& ex) { - spdlog::error("TransparentBridge: failed to start inbound thread: {}", ex.what()); + spdlog::error("TransparentBridge: failed to start inbound thread: {}", + ex.what()); natsSubscription_Unsubscribe(sub_); natsSubscription_Destroy(sub_); sub_ = nullptr; @@ -152,8 +149,9 @@ void TransparentBridge::inbound_loop() noexcept { } if (s != NATS_OK) { - spdlog::error("TransparentBridge: natsSubscription_Fetch failed status={}", - static_cast(s)); + spdlog::error( + "TransparentBridge: natsSubscription_Fetch failed status={}", + static_cast(s)); std::this_thread::sleep_for(std::chrono::milliseconds(100)); continue; } @@ -182,8 +180,8 @@ void TransparentBridge::inbound_loop() noexcept { try { const auto* bytes = static_cast(data); - core::KeystoneMessage km = - core::MessageSerializer::deserialize(bytes, static_cast(data_len)); + core::KeystoneMessage km = core::MessageSerializer::deserialize( + bytes, static_cast(data_len)); // Route to local MessageBus. 
If no local agent is registered for this // receiver_id the message is dropped (avoid re-publishing to NATS and @@ -197,17 +195,21 @@ void TransparentBridge::inbound_loop() noexcept { } should_ack = true; } catch (const std::exception& ex) { - spdlog::error("TransparentBridge: deserialization failed: {}", ex.what()); + spdlog::error("TransparentBridge: deserialization failed: {}", + ex.what()); // nak — allow redelivery } catch (...) { - spdlog::error("TransparentBridge: deserialization threw unknown exception"); + spdlog::error( + "TransparentBridge: deserialization threw unknown exception"); // nak } }(); - natsStatus ack_s = should_ack ? natsMsg_Ack(msg, nullptr) : natsMsg_Nak(msg, nullptr); + natsStatus ack_s = + should_ack ? natsMsg_Ack(msg, nullptr) : natsMsg_Nak(msg, nullptr); if (ack_s != NATS_OK) { - spdlog::warn("TransparentBridge: ack/nak failed status={}", static_cast(ack_s)); + spdlog::warn("TransparentBridge: ack/nak failed status={}", + static_cast(ack_s)); } natsMsg_Destroy(msg); } diff --git a/tests/unit/test_message_serializer.cpp b/tests/unit/test_message_serializer.cpp index 8a2e165..7d0c3b0 100644 --- a/tests/unit/test_message_serializer.cpp +++ b/tests/unit/test_message_serializer.cpp @@ -68,9 +68,8 @@ TEST(MessageSerializerTest, DifferentActionTypes) { // Test: Serialize different content types TEST(MessageSerializerTest, DifferentContentTypes) { - auto msg1 = - KeystoneMessage::create("agent1", "agent2", ActionType::EXECUTE, - "text data", ContentType::TEXT_PLAIN); + auto msg1 = KeystoneMessage::create("agent1", "agent2", ActionType::EXECUTE, + "text data", ContentType::TEXT_PLAIN); auto msg2 = KeystoneMessage::create("agent1", "agent2", ActionType::EXECUTE, "binary data", ContentType::BINARY_CISTA); @@ -89,8 +88,8 @@ TEST(MessageSerializerTest, DifferentContentTypes) { TEST(MessageSerializerTest, LargePayload) { std::string large_payload(10000, 'x'); // 10KB payload - auto msg = KeystoneMessage::create( - "agent1", "agent2", 
ActionType::RETURN_RESULT, large_payload); + auto msg = KeystoneMessage::create("agent1", "agent2", + ActionType::RETURN_RESULT, large_payload); auto buffer = MessageSerializer::serialize(msg); auto deserialized = MessageSerializer::deserialize(buffer); @@ -133,9 +132,9 @@ TEST(MessageSerializerTest, TimestampPreservation) { // Test: Special characters in strings TEST(MessageSerializerTest, SpecialCharacters) { - auto msg = KeystoneMessage::create( - "agent-1.test", "agent@2#special", ActionType::EXECUTE, - "payload with\nnewlines\tand\ttabs"); + auto msg = KeystoneMessage::create("agent-1.test", "agent@2#special", + ActionType::EXECUTE, + "payload with\nnewlines\tand\ttabs"); auto buffer = MessageSerializer::serialize(msg); auto deserialized = MessageSerializer::deserialize(buffer); diff --git a/tests/unit/test_nats_connection.cpp b/tests/unit/test_nats_connection.cpp index f4ef5dc..2d43560 100644 --- a/tests/unit/test_nats_connection.cpp +++ b/tests/unit/test_nats_connection.cpp @@ -35,14 +35,14 @@ * the definitive oracle that the fix is correct. 
*/ -#include "transport/nats_connection.hpp" +#include #include #include #include #include -#include +#include "transport/nats_connection.hpp" using namespace keystone::transport; @@ -54,7 +54,9 @@ class NatsConnectionTestPeer : public NatsConnection { public: using NatsConnection::NatsConnection; - void fireError() { NatsConnection::onError(nullptr, nullptr, static_cast(0), this); } + void fireError() { + NatsConnection::onError(nullptr, nullptr, static_cast(0), this); + } void fireDisconnected() { NatsConnection::onDisconnected(nullptr, this); } void fireReconnected() { NatsConnection::onReconnected(nullptr, this); } @@ -495,15 +497,18 @@ TEST_F(NatsJsContextTest, JsContextNullDoesNotAffectOtherMethods) { class NatsFetchOwnershipTest : public ::testing::Test { protected: - NatsConnectionTestPeer conn_; // never connected — jsContext() returns nullptr + NatsConnectionTestPeer + conn_; // never connected — jsContext() returns nullptr }; // --- Static type check ------------------------------------------------- // NatsMsgPtr must be a specialisation of std::unique_ptr whose element type is // natsMsg and whose deleter is a function pointer (not a stateful object). -static_assert(std::is_same_v>, - "NatsMsgPtr must be unique_ptr"); +static_assert( + std::is_same_v>, + "NatsMsgPtr must be unique_ptr"); // --- Runtime tests ------------------------------------------------------ @@ -530,7 +535,8 @@ TEST_F(NatsFetchOwnershipTest, FetchThrowsRuntimeErrorWhenNotConnected) { // fetch() must throw std::runtime_error when jsContext() returns nullptr // (i.e., the connection was never established). This confirms the guard // at the top of the implementation is intact after the RAII refactor. 
- EXPECT_THROW(conn_.fetch("hi.tasks.>", "my-consumer", 5000), std::runtime_error); + EXPECT_THROW(conn_.fetch("hi.tasks.>", "my-consumer", 5000), + std::runtime_error); } TEST_F(NatsFetchOwnershipTest, FetchThrowsRuntimeErrorBeforeDomainCheck) { @@ -589,7 +595,8 @@ TEST_F(NatsTlsValidateStructFieldsTest, KeyStructFieldOnlyThrows) { EXPECT_THROW(tls.validate(), std::invalid_argument); } -TEST_F(NatsTlsValidateStructFieldsTest, ValidateCalledMultipleTimesIsIdempotent) { +TEST_F(NatsTlsValidateStructFieldsTest, + ValidateCalledMultipleTimesIsIdempotent) { // Calling validate() multiple times on a valid config must not throw and // must not corrupt state. This also exercises the static-cache path being // called repeatedly — safe because cachedTlsEnvVars() returns a const ref. diff --git a/tests/unit/test_subject_validator.cpp b/tests/unit/test_subject_validator.cpp index a42e807..ca32302 100644 --- a/tests/unit/test_subject_validator.cpp +++ b/tests/unit/test_subject_validator.cpp @@ -11,15 +11,15 @@ * (Issue #280). */ +#include + +#include + #include "core/message.hpp" #include "core/message_bus.hpp" #include "core/message_sink.hpp" #include "core/subject_validator.hpp" -#include - -#include - namespace { // Minimal non-agent message sink used purely as a registration fixture for the @@ -27,7 +27,8 @@ namespace { // core::IMessageSink (the agent layer was extracted to ProjectAgamemnon per // ADR-015), so these tests no longer need a concrete agent type. 
struct StubSink : public keystone::core::IMessageSink { - void receiveMessage(const keystone::core::KeystoneMessage& /*msg*/) override {} + void receiveMessage(const keystone::core::KeystoneMessage& /*msg*/) override { + } }; } // namespace diff --git a/tests/unit/test_transparent_bridge.cpp b/tests/unit/test_transparent_bridge.cpp index b9ad0a3..03abf8d 100644 --- a/tests/unit/test_transparent_bridge.cpp +++ b/tests/unit/test_transparent_bridge.cpp @@ -14,12 +14,7 @@ * NatsConnection has no JetStream context (not connected) */ -#include "core/message.hpp" -#include "core/message_bus.hpp" -#include "core/message_serializer.hpp" -#include "core/message_sink.hpp" -#include "transport/nats_connection.hpp" -#include "transport/transparent_bridge.hpp" +#include #include #include @@ -30,7 +25,12 @@ #include #include -#include +#include "core/message.hpp" +#include "core/message_bus.hpp" +#include "core/message_serializer.hpp" +#include "core/message_sink.hpp" +#include "transport/nats_connection.hpp" +#include "transport/transparent_bridge.hpp" using namespace keystone::core; using namespace keystone::transport; @@ -67,11 +67,13 @@ TEST(MessageBusOutbound, ForwardsOffHostViaPublisher) { std::string captured_subject; std::vector captured_payload; - bus.setNatsPublisher([&](std::string_view subject, std::span payload) { - captured_subject = std::string(subject); - captured_payload.assign(reinterpret_cast(payload.data()), - reinterpret_cast(payload.data()) + payload.size()); - }); + bus.setNatsPublisher( + [&](std::string_view subject, std::span payload) { + captured_subject = std::string(subject); + captured_payload.assign( + reinterpret_cast(payload.data()), + reinterpret_cast(payload.data()) + payload.size()); + }); auto msg = KeystoneMessage::create("sender", "off-host-agent", "ping"); // No local agent registered → should forward via NATS publisher. 
@@ -90,19 +92,21 @@ TEST(MessageBusOutbound, OutboundPayloadRoundTrips) { std::vector captured_payload; - bus.setNatsPublisher([&](std::string_view /*subject*/, std::span payload) { - captured_payload.assign(reinterpret_cast(payload.data()), - reinterpret_cast(payload.data()) + payload.size()); - }); + bus.setNatsPublisher( + [&](std::string_view /*subject*/, std::span payload) { + captured_payload.assign( + reinterpret_cast(payload.data()), + reinterpret_cast(payload.data()) + payload.size()); + }); - auto msg = KeystoneMessage::create( - "alice", "remote-bob", ActionType::EXECUTE, std::string("hello remote")); + auto msg = KeystoneMessage::create("alice", "remote-bob", ActionType::EXECUTE, + std::string("hello remote")); bus.routeMessage(msg); ASSERT_FALSE(captured_payload.empty()); - KeystoneMessage decoded = MessageSerializer::deserialize(captured_payload.data(), - captured_payload.size()); + KeystoneMessage decoded = MessageSerializer::deserialize( + captured_payload.data(), captured_payload.size()); EXPECT_EQ(decoded.sender_id, "alice"); EXPECT_EQ(decoded.receiver_id, "remote-bob"); @@ -117,9 +121,9 @@ TEST(MessageBusOutbound, LocalDeliveryDoesNotInvokePublisher) { MessageBus bus; std::atomic publish_calls{0}; - bus.setNatsPublisher([&](std::string_view /*subject*/, std::span /*payload*/) { - ++publish_calls; - }); + bus.setNatsPublisher( + [&](std::string_view /*subject*/, + std::span /*payload*/) { ++publish_calls; }); // Register a minimal non-agent message sink. The transport core depends only // on core::IMessageSink (the agent layer was extracted to ProjectAgamemnon @@ -165,7 +169,8 @@ TEST(TransparentBridge, StopClearsNatsPublisher) { NatsConnection conn; // Manually set a publisher to simulate what attach() would do. 
- bus.setNatsPublisher([](std::string_view /*s*/, std::span /*p*/) {}); + bus.setNatsPublisher( + [](std::string_view /*s*/, std::span /*p*/) {}); EXPECT_NE(bus.getNatsPublisher(), nullptr); @@ -212,9 +217,10 @@ TEST(TransparentBridge, AttachFailureStillRegistersOutboundPublisher) { // We check indirectly: routeMessage should invoke it. std::string captured_subject; // Replace with our test publisher to verify. - bus.setNatsPublisher([&](std::string_view subject, std::span /*payload*/) { - captured_subject = std::string(subject); - }); + bus.setNatsPublisher( + [&](std::string_view subject, std::span /*payload*/) { + captured_subject = std::string(subject); + }); auto msg = KeystoneMessage::create("a", "remote-x", "cmd"); bus.routeMessage(msg);