Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions CONTEXT.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ User's Python app (this library)
Other KBs in the network
```

**Key runtime model**: The `KnowledgeBase` registers itself and its KIs with the SC, then enters a **long-polling loop** (`start_handling_loop()`). On each poll the SC either returns an incoming KI call to handle or asks to re-poll. The KB dispatches calls to registered handler functions, serializes the result, and replies to the SC. For outgoing interactions (`ask()` / `post()`), the KB sends a request to the SC which fans out through the network.
**Key runtime model**: The `KnowledgeBase` registers itself and its KIs with the SC, then enters a **concurrent long-polling loop** (`start_handling_loop()`). The loop runs multiple poll-dispatch cycles concurrently, bounded by a semaphore (`max_concurrent_handlers`, default 10). Each cycle acquires the semaphore, polls the SC, and on HANDLE spawns an `asyncio.Task` that runs the handler, posts the response, and releases the semaphore. Handler exceptions are caught — an empty binding set is posted back so the SC doesn't hang. On EXIT, the loop stops polling and awaits all in-flight handler tasks. For outgoing interactions (`ask()` / `post()`), the KB sends a request to the SC which fans out through the network.

---

Expand Down Expand Up @@ -129,9 +129,10 @@ builder = KnowledgeBase.from_settings(settings) # settings: KnowledgeBaseSettin

#### Lifecycle
```python
kb.connect() # Verify SC is reachable (raises KnowledgeEngineNotAvailableError if not)
kb.register() # Register KB + sync all KIs with the SC (re-registers if already registered)
kb.unregister() # Unregister KB from SC (KIs automatically unregistered)
await kb.connect() # Verify SC is reachable (raises KnowledgeEngineNotAvailableError if not)
await kb.register() # Register KB + sync all KIs with the SC (re-registers if already registered)
await kb.unregister() # Unregister KB from SC (KIs automatically unregistered)
await kb.close() # Close the underlying HTTP client and release resources
```

#### Registering KIs (decorator pattern)
Expand Down Expand Up @@ -163,15 +164,16 @@ kb.post_ki(name="...", argument_graph_pattern="...", result_graph_pattern="...",
#### Outgoing interactions

```python
result = kb.ask(binding_set, ki_name="...") # Returns BindingSet or list[BindingModel]
result = kb.post(binding_set, ki_name="...") # Returns result BindingSet or list[BindingModel]
result = await kb.ask(binding_set, ki_name="...") # Returns BindingSet or list[BindingModel]
result = await kb.post(binding_set, ki_name="...") # Returns result BindingSet or list[BindingModel]
```

#### Handling loop

```python
kb.start_handling_loop() # Blocks, handles incoming KIs forever
kb.start_handling_loop(loops=10) # Runs exactly 10 poll iterations (useful for testing)
await kb.start_handling_loop() # Concurrent dispatch, up to 10 in-flight
await kb.start_handling_loop(max_concurrent_handlers=5) # Limit to 5 concurrent handlers
await kb.start_handling_loop(loops=10) # Runs exactly 10 poll cycles (useful for testing)
```

---
Expand Down Expand Up @@ -199,8 +201,8 @@ def handler(

**Behaviour:**
- The framework inspects handler signatures at registration time and resolves `Depends` params at call time.
- Dependency factories are **sync-only** (async support is out of scope).
- Factories can themselves declare `Depends` parameters — nested/transitive resolution is supported.
- Dependency factories can be **sync (`def`) or async (`async def`)** async factories are detected via `asyncio.iscoroutinefunction()` and awaited automatically; sync factories are called directly.
- Factories can themselves declare `Depends` parameters — nested/transitive resolution is supported, including mixed sync/async chains.
- `cache=True` (default): factory called once per KI-call invocation; result shared across all uses.
- `cache=False`: factory called fresh every time it is needed.

Expand Down Expand Up @@ -444,5 +446,5 @@ These are excluded from linting (`ruff`) and are kept for historical reference o
- **KI registry indexed by ID after registration**: `KnowledgeBase` maintains a secondary index (`_ki_registry_by_id`) populated once a KI is registered with the SC and assigned an ID. The handling loop dispatches by ID using this index.
- **Handler introspection**: `KnowledgeInteractionContext.__post_init__` inspects handler signatures to auto-detect binding models, enabling transparent (de)serialization without manual type dispatch. Dispatch logic (validate → call → serialize for ANSWER/REACT; prepare_outgoing + parse_result for ASK/POST) lives in `KnowledgeInteractionContext`, not in `KnowledgeBase`.
- **`KnowledgeBaseBuilder` wraps `KnowledgeBase`**: Settings-based KI registration belongs to `KnowledgeBaseBuilder`, not to `KnowledgeBase`. `KnowledgeBase.from_settings()` returns a builder; `builder.build()` returns the finished `KnowledgeBase`. `KnowledgeBase` itself has no knowledge of settings. ASK/POST KIs are auto-registered at `build()` time; ANSWER/REACT KIs require a handler attached via `builder.handler(name, func)` before `build()` is called.
- **Dependency injection via `Depends`**: `KnowledgeInteractionContext.dispatch()` calls `resolve_dependencies(handler)` before invoking the handler, passing resolved values as kwargs. The resolver (`src/dependency_injection.py`) uses `get_type_hints(include_extras=True)` to find `Annotated[T, Depends(factory)]` params, recursively resolves factory deps (transitive), and caches results per invocation when `cache=True`. `@wraps` on the decorator wrapper preserves `__annotations__`, so the resolver sees the original handler's hints.
- **Dependency injection via `Depends`**: `KnowledgeInteractionContext.dispatch()` calls `resolve_dependencies(handler)` before invoking the handler, passing resolved values as kwargs. The resolver (`src/dependency_injection.py`) uses `get_type_hints(include_extras=True)` to find `Annotated[T, Depends(factory)]` params, recursively resolves factory deps (transitive), and caches results per invocation when `cache=True`. Factories can be sync (`def`) or async (`async def`) — async factories are detected via `asyncio.iscoroutinefunction()` and awaited; sync factories are called directly. `@wraps` on the decorator wrapper preserves `__annotations__`, so the resolver sees the original handler's hints.
- **`dependency_overrides`**: `KnowledgeBase.dependency_overrides` is a `dict[Callable, Callable]` (à la FastAPI) that substitutes dependency factories at resolution time. Overrides are checked transitively at every level and inherit the original `Depends(cache=...)` setting. The dict is passed explicitly from `KnowledgeBase.call()` → `dispatch()` → `resolve_dependencies()` to keep `KnowledgeInteractionContext` decoupled from `KnowledgeBase`.
14 changes: 10 additions & 4 deletions examples/01-basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,16 @@ def example_answer_ki(binding_set, info):
return binding_set


if __name__ == "__main__":
async def main():
# Connect to the KE, then register and unregister this KB.
kb.connect()
kb.register()
await kb.connect()
await kb.register()
logger.info("Registered a Knowledge Base in the basic example!")
kb.unregister()
await kb.unregister()
logger.info("Unregistered the Knowledge Base in the basic example!")


if __name__ == "__main__":
import asyncio

asyncio.run(main())
14 changes: 10 additions & 4 deletions examples/02-binding_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,17 @@ def binding_models_raw_answer_ki(
]


if __name__ == "__main__":
async def main():
# Register both KIs, then cleanly unregister.
kb.connect()
kb.register()
await kb.connect()
await kb.register()
logger.info("Registered the binding models example KB!")

kb.unregister()
await kb.unregister()
logger.info("Unregistered the binding models example KB!")


if __name__ == "__main__":
import asyncio

asyncio.run(main())
26 changes: 19 additions & 7 deletions examples/03-ask_interaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
and typed results work end-to-end.
"""

from rdflib import URIRef
from shared import get_example_logger

from knowledge_mapper import BindingModel, KnowledgeBase, Literal, Uri
Expand Down Expand Up @@ -38,19 +39,30 @@ class PersonBinding(BindingModel):
prefixes={"ex": "http://example.org/knowledge-mapper/ask-interaction#"},
)

if __name__ == "__main__":

async def main():
# Register this KB, execute one ASK request, and then unregister.
kb.register()
await kb.register()
logger.info("KB registered.")
result = kb.ask(
result = await kb.ask(
[
{
"person": "http://example.org/knowledge-mapper/ask-interaction#person1",
}
PersonBinding(
person=URIRef(
"http://example.org/knowledge-mapper/ask-interaction#person1"
),
name=None,
age=None,
)
],
"ask-ki",
)
logger.info(f"Received result from ASK KI: {result}")

kb.unregister()
await kb.unregister()
logger.info("KB unregistered.")


if __name__ == "__main__":
import asyncio

asyncio.run(main())
16 changes: 11 additions & 5 deletions examples/04-post_measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ class ResultBinding(BindingModel):
)


if __name__ == "__main__":
# Register KB, wait briefly for manual testing, then execute one POST.
kb.register()
async def main():
# Register this KB, wait briefly for manual testing, then execute one POST.
await kb.register()
logger.info("KB registered.")
time.sleep(
5
) # Sleep for a bit to allow time for testing the POST KI with an external client
logger.info("Posting...")
result_bindings = kb.post(
result_bindings = await kb.post(
[
MeasurementBinding(
measurement=URIRef(
Expand All @@ -82,5 +82,11 @@ class ResultBinding(BindingModel):
"post-measurement-ki",
)
logger.info(f"Received result bindings: {result_bindings}")
kb.unregister()
await kb.unregister()
logger.info("KB unregistered.")


if __name__ == "__main__":
import asyncio

asyncio.run(main())
15 changes: 11 additions & 4 deletions examples/06-dependency_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,15 @@ def answer_sensor_readings(
]


if __name__ == "__main__":
kb.connect()
kb.register()
async def main():
await kb.connect()
await kb.register()
logger.info("Registered the dependency-injection example KB!")
kb.unregister()
await kb.unregister()
logger.info("Unregistered the dependency-injection example KB!")


if __name__ == "__main__":
import asyncio

asyncio.run(main())
10 changes: 4 additions & 6 deletions examples/07-testing/kb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import sys
from pathlib import Path
from time import sleep
from typing import cast

from rdflib import URIRef
Expand Down Expand Up @@ -47,8 +46,8 @@ class ExampleBinding(BindingModel):
)


def ask_for_values_of_subject(subject_name: str) -> list[str]:
result_binding_set: list[ExampleBinding] = kb.ask(
async def ask_for_values_of_subject(subject_name: str) -> list[str]:
result_binding_set: list[ExampleBinding] = await kb.ask(
[
ExampleBinding(
s=URIRef(f"http://example.org/knowledge-mapper/testing#{subject_name}"),
Expand Down Expand Up @@ -85,11 +84,11 @@ class ResultBinding(BindingModel):
)


def repeat_value_post(value: str, iterations: int) -> list[URIRef]:
async def repeat_value_post(value: str, iterations: int) -> list[URIRef]:
result_binding_set: list[ResultBinding] = []
for i in range(iterations):
result_binding_set.extend(
kb.post(
await kb.post(
[
ExampleBinding(
s=URIRef(
Expand All @@ -101,7 +100,6 @@ def repeat_value_post(value: str, iterations: int) -> list[URIRef]:
"post-ki",
) # type: ignore
)
sleep(1)
return [cast(URIRef, binding.other) for binding in result_binding_set]


Expand Down
31 changes: 17 additions & 14 deletions examples/07-testing/test_kb.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
# to the KE, so its important to replace it with a TestClient
test_client = TestClient(fake_url="http://fake-ke")
kb.client = test_client
# Here the KB and its interactions are registered with the TestClient, which always
# succeeds. This registration is necessary for the KB to be able to execute
# interactions in the tests.
kb.register()


@pytest.fixture(autouse=True)
async def _register_kb():
if not kb.is_registered:
await kb.register()
yield


@pytest.fixture()
Expand All @@ -26,15 +29,15 @@ def client():
# In a test you can do any ASK interaction that is registered.
# The TestClient will return an empty result binding set by default, disregarding the
# input.
def test_ask_ki_no_resuls():
result_binding_set = kb.ask([], "ask-ki-no-binding-model")
async def test_ask_ki_no_resuls():
result_binding_set = await kb.ask([], "ask-ki-no-binding-model")
assert result_binding_set == []


# You likely want to mock result binding sets, which can be done using the TestClient as
# in this test. The mocked result is returned when the ASK interaction is executed,
# disregarding the input.
def test_ask_ki_with_result(client: TestClient):
async def test_ask_ki_with_result(client: TestClient):
client.mock_result_binding_set(
"ask-ki-no-binding-model",
[
Expand All @@ -44,7 +47,7 @@ def test_ask_ki_with_result(client: TestClient):
}
],
)
result_binding_set = kb.ask([], "ask-ki-no-binding-model")
result_binding_set = await kb.ask([], "ask-ki-no-binding-model")
assert result_binding_set == [
{
"s": "<http://example.org/knowledge-mapper/testing#Subject>",
Expand All @@ -56,7 +59,7 @@ def test_ask_ki_with_result(client: TestClient):
# This is a little more useful when you have a binding model, testing the correctness of
# the binding model according to the graph pattern. One test per interaction like this
# is probably a good idea, to isolate issues with the binding model from other issues.
def test_ask_ki_with_binding_model(client: TestClient):
async def test_ask_ki_with_binding_model(client: TestClient):
client.mock_result_binding_set(
"ask-ki-with-binding-model",
[
Expand All @@ -67,7 +70,7 @@ def test_ask_ki_with_binding_model(client: TestClient):
],
)

result_binding_set = kb.ask(
result_binding_set = await kb.ask(
[
ExampleBinding(
s=URIRef("http://example.org/knowledge-mapper/testing#Subject"),
Expand All @@ -86,7 +89,7 @@ def test_ask_ki_with_binding_model(client: TestClient):

# However, most likely you will want to test the logic around interactions, where you
# might want to mock different results for different inputs.
def test_function_containing_ask(client: TestClient):
async def test_function_containing_ask(client: TestClient):
client.mock_result_binding_set(
ki_name="ask-ki-with-binding-model",
binding_set=[
Expand All @@ -97,12 +100,12 @@ def test_function_containing_ask(client: TestClient):
],
)

result = ask_for_values_of_subject("Subject")
result = await ask_for_values_of_subject("Subject")
assert result == ["test value"]


# Similar approaches can be taken for POST interactions.
def test_function_containing_post(client: TestClient):
async def test_function_containing_post(client: TestClient):
client.mock_result_binding_set(
ki_name="post-ki",
binding_set=[
Expand All @@ -113,5 +116,5 @@ def test_function_containing_post(client: TestClient):
],
)

result = repeat_value_post("test value", 1)
result = await repeat_value_post("test value", 1)
assert result == [URIRef("http://example.org/knowledge-mapper/testing#Other")]
Loading
Loading