Migrate Knowledge Mapper SDK to fully async architecture #27

@DaviddeBest-TNO

Problem Statement

The Knowledge Mapper SDK is fully synchronous: it uses the requests library for HTTP, runs a sequential poll → dispatch → respond handling loop, and requires all handler functions and dependency factories to be synchronous. This means a slow handler (e.g. one that queries a database or calls kb.ask() to reach another KB on the network) blocks the entire handling loop — no other incoming Knowledge Interaction (KI) calls can be processed until it finishes. As the Knowledge Engine network grows and KBs participate in more complex multi-hop interactions, this sequential bottleneck becomes a real limitation.

Solution

Migrate the Knowledge Mapper SDK to a fully async architecture using asyncio and httpx. The handling loop will dispatch multiple handler invocations concurrently via asyncio.create_task, gated by a configurable semaphore. Handlers, dependency factories, and outgoing KI calls (ask()/post()) all become async-native, while still accepting synchronous handler functions (run in a thread pool) for ergonomic backward compatibility. The requests library is replaced entirely by httpx.AsyncClient.

User Stories

  1. As a KB developer, I want my REACT handler to process incoming POST calls without blocking other handlers, so that my KB can serve multiple requesters concurrently.
  2. As a KB developer, I want to write async def handlers that await async I/O (database queries, HTTP calls, file reads), so that I can use modern Python async libraries without blocking the event loop.
  3. As a KB developer, I want to write plain def handlers when I don't need async, so that simple handlers remain simple and I don't need to learn asyncio for trivial use cases.
  4. As a KB developer, I want my REACT handler to call await kb.ask(...) to query the KE network before responding, so that my KB can compose knowledge from multiple sources in a single handler invocation.
  5. As a KB developer, I want my sync handler to call kb.ask_sync(...) without understanding event loop mechanics, so that I can make outgoing KI calls from sync handlers running in a thread pool.
  6. As a KB developer, I want to configure the maximum number of concurrent handlers via max_concurrent_handlers, so that I can tune resource usage for my deployment.
  7. As a KB developer, I want the handling loop to continue processing other requests when one handler raises an exception, so that a single bad invocation doesn't crash my entire KB.
  8. As a KB developer, I want the handling loop to wait for all in-flight handlers to finish before shutting down on an EXIT signal, so that no work is lost during graceful shutdown.
  9. As a KB developer, I want to use async def dependency factories with Depends(), so that I can initialize async resources (e.g. async DB connection pools) as handler dependencies.
  10. As a KB developer, I want sync dependency factories to still work without changes, so that I don't have to rewrite existing factories.
  11. As a KB developer, I want await kb.connect(), await kb.register(), and await kb.close() lifecycle methods, so that KB setup and teardown integrate naturally into my async application.
  12. As a KB developer, I want await kb.ask(...) and await kb.post(...) to be non-blocking, so that multiple outgoing KI calls can be made concurrently (e.g. via asyncio.gather).
  13. As a test author, I want a TestClient that uses asyncio.Queue for incoming KI calls, so that concurrent poll tasks block realistically instead of spinning on empty REPOLL.
  14. As a test author, I want all existing test patterns (enqueue_handle_request, mock_result_binding_set, enqueue_exit) to work with the async TestClient, so that migrating tests is mechanical.
  15. As a KB developer, I want httpx as the sole HTTP dependency (replacing requests), so that the dependency footprint is minimal and consistent.

Implementation Decisions

Fully Async Architecture (No Sync/Async Split)

The entire SDK adopts a single async code path. There is no parallel sync ClientProtocol or sync KnowledgeBase. All ClientProtocol methods become coroutines. All KnowledgeBase lifecycle and I/O methods become async def. Users call their KB from within an asyncio.run() context. This was chosen over maintaining dual sync/async code paths to avoid duplication and the complexity of bridging two worlds.
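A minimal sketch of what calling a KB from an asyncio.run() context could look like. StubKnowledgeBase is a hypothetical stand-in that only mimics the method names mentioned in this issue (connect, register, ask, close); the ask() signature shown here is an assumption, not the SDK's actual API.

```python
import asyncio


class StubKnowledgeBase:
    """Hypothetical stand-in; only the async method names come from this issue."""

    def __init__(self):
        self.events = []

    async def connect(self):
        self.events.append("connect")

    async def register(self):
        self.events.append("register")

    async def ask(self, ki_name, bindings):
        await asyncio.sleep(0.01)  # simulated network round trip
        return [{**b, "from": ki_name} for b in bindings]

    async def close(self):
        self.events.append("close")


async def main():
    kb = StubKnowledgeBase()
    await kb.connect()
    await kb.register()
    try:
        # Both outgoing calls are in flight at the same time.
        answers = await asyncio.gather(
            kb.ask("sensors", [{"id": 1}]),
            kb.ask("rooms", [{"id": 2}]),
        )
    finally:
        await kb.close()  # always release the client, even on errors
    return kb, answers
```

An application would start this with asyncio.run(main()). The try/finally keeps close() on the teardown path without needing the async context manager that is listed as out of scope.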

Dual Handler Support (sync and async)

The framework accepts both def and async def handler functions. Detection uses asyncio.iscoroutinefunction(). Async handlers are awaited directly; sync handlers are run via asyncio.to_thread() so they don't block the event loop. This preserves ergonomics for simple handlers while enabling full async capabilities. The decorator wrapper in _register_ki_decorator must preserve the async-ness of the original function.
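The dispatch rule above can be sketched roughly as follows. invoke_handler and preserve_asyncness are hypothetical names, not the SDK's; the sketch uses inspect.iscoroutinefunction(), the standard-library check that asyncio.iscoroutinefunction() is deprecated in favor of in recent Python versions.

```python
import asyncio
import functools
import inspect
import time


async def invoke_handler(handler, *args, **kwargs):
    """Await async handlers directly; run sync handlers in a worker thread."""
    if inspect.iscoroutinefunction(handler):
        return await handler(*args, **kwargs)
    return await asyncio.to_thread(handler, *args, **kwargs)


def preserve_asyncness(func):
    """Hypothetical decorator wrapper: an async handler is still async after wrapping."""
    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            return await func(*args, **kwargs)
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return sync_wrapper


@preserve_asyncness
async def async_handler(bindings):
    await asyncio.sleep(0.01)  # non-blocking wait
    return [{"kind": "async", **b} for b in bindings]


@preserve_asyncness
def sync_handler(bindings):
    time.sleep(0.01)  # blocking call, executed off the event loop thread
    return [{"kind": "sync", **b} for b in bindings]


async def demo():
    return await asyncio.gather(
        invoke_handler(async_handler, [{"x": 1}]),
        invoke_handler(sync_handler, [{"x": 2}]),
    )
```

If the wrapper were always a plain def, the detection step would classify every decorated handler as sync and hand the un-awaited coroutine object back from the worker thread, which is why the wrapper has to branch on the original function's type.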

httpx Replaces requests Entirely

requests is removed as a dependency. Client uses httpx.AsyncClient internally. The AsyncClient is created in Client.__init__() (httpx does not require an active event loop at construction time) and closed via an async close() method that awaits AsyncClient.aclose(). KnowledgeBase exposes a close() that delegates to the client.

Concurrent Handling Loop with Semaphore-Gated Polling

start_handling_loop() accepts a max_concurrent_handlers parameter (default 10). The loop runs multiple concurrent poll tasks gated by an asyncio.Semaphore. Each iteration: acquire semaphore → poll SC → on HANDLE, spawn a task that runs the handler, posts the response, and releases the semaphore → loop continues polling. A poll that does not result in a handler task (for example REPOLL or EXIT) must release the semaphore itself, otherwise the permit is lost. This naturally bounds both in-flight polls and running handlers.
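One possible shape for this loop, as a sketch under stated assumptions: it uses a single polling coroutine, and FakeSmartConnector, its poll()/respond() methods, and the (PollResult, payload) tuple are hypothetical stand-ins for the real client. It also waits for in-flight tasks on EXIT, and it leaves out error handling.

```python
import asyncio
from enum import Enum, auto


class PollResult(Enum):
    HANDLE = auto()
    REPOLL = auto()
    EXIT = auto()


class FakeSmartConnector:
    """Hypothetical SC stand-in serving queued (PollResult, payload) pairs."""

    def __init__(self, calls):
        self._queue = asyncio.Queue()
        for call in calls:
            self._queue.put_nowait(call)
        self.responses = []

    async def poll(self):
        return await self._queue.get()

    async def respond(self, payload, result):
        self.responses.append((payload, result))


async def handling_loop(sc, handler, max_concurrent_handlers=10):
    semaphore = asyncio.Semaphore(max_concurrent_handlers)
    in_flight = set()  # strong references so tasks are not garbage collected

    async def run_one(payload):
        try:
            await sc.respond(payload, await handler(payload))
        finally:
            semaphore.release()  # free the slot whether or not the handler failed

    while True:
        await semaphore.acquire()  # waits here once the limit is reached
        kind, payload = await sc.poll()
        if kind is PollResult.HANDLE:
            task = asyncio.create_task(run_one(payload))
            in_flight.add(task)
            task.add_done_callback(in_flight.discard)
            continue
        semaphore.release()  # no handler started for this poll
        if kind is PollResult.EXIT:
            break

    if in_flight:
        await asyncio.gather(*in_flight)  # graceful shutdown: run to completion


async def demo():
    calls = [(PollResult.HANDLE, i) for i in range(6)] + [(PollResult.EXIT, None)]
    sc = FakeSmartConnector(calls)  # created inside the running loop
    running = 0
    peak = 0

    async def handler(payload):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.05)
        running -= 1
        return payload * 2

    await handling_loop(sc, handler, max_concurrent_handlers=3)
    return sc.responses, peak
```

In the demo, six requests are served with at most three handlers running at once. Keeping the tasks in a set serves two purposes: the event loop only holds weak references to tasks, and the shutdown step needs to know what is still running.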

Handler Error Policy: Log and Continue

When a concurrent handler task raises an exception, the error is logged (including KI name and context), an empty binding set is posted back to the SC (to unblock it), and the loop continues. The handling loop is a long-running service; one bad invocation must not crash the entire KB.
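A minimal sketch of the log-and-continue policy, assuming a hypothetical post_response(ki_name, result) coroutine and a plain list as the binding set; the real SDK types and names may differ.

```python
import asyncio
import logging

logger = logging.getLogger("knowledge_mapper.sketch")  # hypothetical logger name


async def run_handler_safely(ki_name, handler, bindings, post_response):
    """Run one handler; on failure, log it and answer with an empty binding set."""
    try:
        result = await handler(bindings)
    except Exception:
        # logger.exception records the traceback along with the KI context.
        logger.exception("Handler for KI %r failed; bindings=%r", ki_name, bindings)
        result = []  # empty binding set so the SC is not left waiting
    await post_response(ki_name, result)


async def demo():
    posted = []

    async def post_response(ki_name, result):
        posted.append((ki_name, result))

    async def bad_handler(bindings):
        raise ValueError("boom")

    async def good_handler(bindings):
        return bindings

    await asyncio.gather(
        run_handler_safely("bad-ki", bad_handler, [{"x": 1}], post_response),
        run_handler_safely("good-ki", good_handler, [{"x": 2}], post_response),
    )
    return posted
```

Catching Exception rather than BaseException leaves asyncio.CancelledError untouched, so task cancellation still propagates normally.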

Graceful Shutdown

On receiving PollResult.EXIT, the loop stops issuing new polls and waits for all in-flight handler tasks to complete before returning. No timeout or cancellation — handlers run to completion.

Sync Bridge for Outgoing Calls

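ask_sync() and post_sync() convenience methods on KnowledgeBase use asyncio.run_coroutine_threadsafe() to schedule the async call on the running event loop and block the current thread until the result is ready. This enables sync handlers (running in to_thread) to make outgoing KI calls without understanding event loop mechanics. Because they block the calling thread, these methods must only be called from a worker thread; calling them on the event loop thread would deadlock, so async handlers should use await kb.ask(...) and await kb.post(...) instead. The KnowledgeBase stores a reference to the running event loop (captured when start_handling_loop() begins).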
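The bridge could look roughly like this. BridgeDemo is a hypothetical reduction of KnowledgeBase to the parts that matter here, and the guard against calling ask_sync() on the loop thread is an added assumption rather than something this issue specifies.

```python
import asyncio


class BridgeDemo:
    """Hypothetical stand-in for KnowledgeBase, reduced to the sync bridge."""

    def __init__(self):
        self._loop = None  # set when the handling loop starts

    async def ask(self, bindings):
        await asyncio.sleep(0.01)  # pretend to query the KE network
        return [{**b, "answered": True} for b in bindings]

    def ask_sync(self, bindings, timeout=None):
        loop = self._loop
        if loop is None:
            raise RuntimeError("handling loop is not running")
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:
            running = None  # expected: worker threads have no running loop
        if running is loop:
            raise RuntimeError("ask_sync() would deadlock here; use 'await ask()'")
        # Schedule the coroutine on the loop thread, then block this thread.
        future = asyncio.run_coroutine_threadsafe(self.ask(bindings), loop)
        return future.result(timeout)

    async def start(self, sync_handler, bindings):
        self._loop = asyncio.get_running_loop()  # capture the loop reference
        return await asyncio.to_thread(sync_handler, bindings)


async def demo():
    kb = BridgeDemo()

    def sync_handler(bindings):
        # Plain function running in a worker thread.
        return kb.ask_sync(bindings)

    return await kb.start(sync_handler, [{"x": 1}])
```

The optional timeout is passed to concurrent.futures.Future.result(), so a sync handler can bound how long it waits for the network.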

Async Dependency Injection

resolve_dependencies() becomes async def. It detects whether each Depends factory is a coroutine function and either awaits it directly or runs it via to_thread (for sync factories). Caching semantics are unchanged. The resolver is called from KnowledgeInteractionContext.dispatch(), which is also now async def.
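As an illustration only, an async resolver might look like the sketch below. This Depends class is a simplified stand-in, and the per-call cache keyed by factory is an assumption about the existing caching semantics, which this issue does not spell out.

```python
import asyncio
import inspect


class Depends:
    """Simplified marker; the SDK's real Depends may carry more options."""

    def __init__(self, factory):
        self.factory = factory


async def resolve_dependencies(handler, cache=None):
    """Build keyword arguments for every parameter whose default is a Depends."""
    cache = {} if cache is None else cache
    resolved = {}
    for name, param in inspect.signature(handler).parameters.items():
        dep = param.default
        if not isinstance(dep, Depends):
            continue
        if dep.factory not in cache:
            if inspect.iscoroutinefunction(dep.factory):
                cache[dep.factory] = await dep.factory()
            else:
                # Sync factories may block, so keep them off the event loop.
                cache[dep.factory] = await asyncio.to_thread(dep.factory)
        resolved[name] = cache[dep.factory]
    return resolved


calls = {"pool": 0}


async def make_pool():
    calls["pool"] += 1
    await asyncio.sleep(0)  # stands in for async pool setup
    return "async-pool"


def make_settings():
    return {"debug": True}


def handler(bindings, pool=Depends(make_pool), same_pool=Depends(make_pool),
            settings=Depends(make_settings)):
    return pool, same_pool, settings


async def demo():
    deps = await resolve_dependencies(handler)
    return handler([], **deps)
```

In this sketch, make_pool runs once even though two parameters depend on it, because the second lookup hits the cache.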

TestClient with asyncio.Queue

TestClient._incoming_calls changes from collections.deque to asyncio.Queue. poll_ki_call() becomes async def and uses await self._incoming_calls.get(), which naturally blocks until something is enqueued. This accurately simulates the SC's long-polling behavior and prevents spin loops in tests with concurrent poll tasks. Test helper methods (enqueue_handle_request, mock_result_binding_set, enqueue_exit) remain synchronous — they just call queue.put_nowait().
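A reduced sketch of the queue-backed fake. The helper names come from this issue, but their signatures and the (PollResult, payload) tuple are assumptions.

```python
import asyncio
from enum import Enum, auto


class PollResult(Enum):
    HANDLE = auto()
    EXIT = auto()


class FakeClient:
    """Hypothetical reduction of TestClient to its incoming-call queue."""

    def __init__(self):
        self._incoming_calls = asyncio.Queue()

    # Helpers stay synchronous: put_nowait() never waits on an unbounded queue.
    def enqueue_handle_request(self, ki_id, bindings):
        self._incoming_calls.put_nowait(
            (PollResult.HANDLE, {"ki_id": ki_id, "bindings": bindings})
        )

    def enqueue_exit(self):
        self._incoming_calls.put_nowait((PollResult.EXIT, None))

    async def poll_ki_call(self):
        # Suspends until something is enqueued, like a long poll against the SC.
        return await self._incoming_calls.get()


async def demo():
    client = FakeClient()
    poll = asyncio.create_task(client.poll_ki_call())
    await asyncio.sleep(0.01)
    was_blocked = not poll.done()  # nothing enqueued yet, so the poll waits
    client.enqueue_handle_request("my-ki", [{"x": 1}])
    first = await poll
    client.enqueue_exit()
    second = await client.poll_ki_call()
    return was_blocked, first, second
```

asyncio.Queue is not thread-safe, so in this sketch the helpers are meant to be called from test code running on the event loop thread.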

Decorators Remain Synchronous

@kb.answer_ki() and @kb.react_ki() decorators do not become async. They only register the handler and KI info in the local registry (no I/O with defer_ke_registration=True). The wrapper function inside the decorator is updated to detect and preserve async handler signatures.

KnowledgeBaseBuilder Unchanged

KnowledgeBaseBuilder.build() remains synchronous — it configures and returns a KnowledgeBase whose lifecycle methods are then called asynchronously by the user. No changes to the builder API.

Modules to Build/Modify

  1. ke/client.py: ClientProtocol becomes fully async. Client migrates from requests to httpx.AsyncClient. Add close() method to both protocol and implementation.
  2. knowledge_interaction.py: dispatch() becomes async def. Handler type alias updated to accept both sync and async callables. resolve_dependencies call becomes awaited.
  3. dependency_injection.py: resolve_dependencies() becomes async def. Adds async factory detection via asyncio.iscoroutinefunction().
  4. kb/knowledge_base.py: All lifecycle/IO methods become async def. Handling loop redesigned with concurrent polling + semaphore. Add ask_sync()/post_sync() bridge methods, close(), and event loop reference.
  5. testing/fake_client.py: All ClientProtocol methods become async def. _incoming_calls changes to asyncio.Queue. poll_ki_call blocks until queue has items.
  6. kb/builder.py: Minor change. build() stays sync, but the returned KB has async methods.
  7. pyproject.toml: Replace requests with httpx. Add pytest-asyncio as a test dependency.

Testing Decisions

What Makes a Good Test

Tests should verify external behavior through the public API, not internal implementation details. A good test:

  • Sets up a KnowledgeBase with a TestClient, registers KIs, and asserts on the results of handler invocations, outgoing calls, or lifecycle transitions.
  • Does not assert on internal data structures like _ki_registry_by_id layout or semaphore internals.
  • Uses @pytest.mark.asyncio and async def test functions throughout.

Modules to Test

  1. ke/client.py — Test that Client makes correct HTTP calls via httpx.AsyncClient (can use httpx's built-in mock transport or respx). Test close() properly closes the underlying client.
  2. knowledge_interaction.py — Test dispatch() with async handlers, sync handlers, handlers with binding models, and handlers with dependencies. Verify that sync handlers don't block the event loop (timing-based test with concurrent dispatches).
  3. dependency_injection.py — Test resolve_dependencies() with async factories, sync factories, cached vs uncached, nested/transitive async factories, and dependency_overrides with async replacements.
  4. kb/knowledge_base.py — Test the concurrent handling loop: multiple enqueued handle requests are dispatched concurrently (not sequentially). Test graceful shutdown waits for in-flight handlers. Test error logging + continuation on handler failure. Test ask_sync()/post_sync() from a sync handler context. Test max_concurrent_handlers is respected.
  5. testing/fake_client.py — Test that poll_ki_call blocks when queue is empty and unblocks when a request is enqueued. Test enqueue_exit terminates polling.
  6. kb/builder.py — Test that build() returns a KB whose async lifecycle methods work correctly.
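The timing-based check mentioned in item 2 could be approached along these lines. This is a sketch with hypothetical names; dispatch() stands in for the SDK's dispatch path, and in the test suite it would live in an async def test marked with @pytest.mark.asyncio rather than being run through asyncio.run().

```python
import asyncio
import time


def slow_sync_handler(bindings):
    time.sleep(0.3)  # blocking work that would stall the loop if run inline
    return bindings


async def dispatch(handler, bindings):
    # Stand-in for the SDK dispatch path for sync handlers.
    return await asyncio.to_thread(handler, bindings)


async def measure():
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1  # only advances while the event loop is responsive

    beat = asyncio.create_task(heartbeat())
    start = time.perf_counter()
    results = await asyncio.gather(
        dispatch(slow_sync_handler, [1]),
        dispatch(slow_sync_handler, [2]),
    )
    elapsed = time.perf_counter() - start
    beat.cancel()
    return results, elapsed, ticks
```

Two signals are available: the total time stays well under the 0.6 s that sequential execution would need, and the heartbeat counter keeps advancing while the handlers sleep. Thresholds for such assertions should be generous, since timing tests can be flaky on loaded CI machines.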

Prior Art

Existing tests in tests/ use the TestClient pattern: create a KnowledgeBase, inject a TestClient, register KIs, enqueue handle requests or mock results, and assert on responses. The migration to async follows the same patterns but wraps everything in async def test functions. See tests/test_handlers.py and tests/test_ask_and_post.py for the established patterns.

Out of Scope

  • Async context manager (async with KnowledgeBase(...) as kb) — can be added later as syntactic sugar.
  • WebSocket or SSE-based polling — the SC uses HTTP long-polling; changing the transport protocol is a separate concern.
  • Async CLI entry point — the config-driven CLI (issue CLI: knowledge-mapper run — start a Python-defined KnowledgeBase #10) is a separate feature; it can adopt async when implemented.
  • Connection retry/backoff — resilience features for the HTTP client are orthogonal to the async migration.
  • Structured concurrency via TaskGroup — the fire-and-forget + semaphore model is sufficient; TaskGroup can be evaluated later.
  • Shutdown timeout — graceful shutdown waits indefinitely for in-flight handlers; a configurable timeout can be added if needed.

Further Notes

  • The Knowledge Engine Smart Connector's long-polling model (GET /sc/handle) provides natural back-pressure: each poll blocks until the SC has a request. With concurrent polls gated by the semaphore, the system self-regulates.
  • httpx.AsyncClient provides connection pooling and keepalive out of the box, and supports HTTP/2 as an opt-in (it requires the httpx[http2] extra and http2=True). Reusing one client is a performance improvement over per-request requests calls.
  • The ask_sync()/post_sync() bridge follows the same idea as asgiref.sync.async_to_sync: blocking code hands a coroutine to an event loop and waits for its result. It relies on asyncio.run_coroutine_threadsafe(), which is designed for submitting coroutines to a loop from another thread.
  • Since there are no existing users of the current (non-legacy) SDK, there is no backward compatibility concern. The migration can be done in a single pass.
Labels: enhancement, ready-for-agent
