From a16fff37eb3e3cc383c4ff71556694228487023b Mon Sep 17 00:00:00 2001 From: tangkikodo Date: Thu, 11 Jun 2026 12:24:48 +0800 Subject: [PATCH] feat(use_case): add gRPC transport for UseCaseService New `create_use_case_grpc_server()` exposes UseCaseService methods as unary gRPC RPCs using JSON-over-gRPC (no protobuf code generation). Each UseCaseService becomes a gRPC service, each @query/@mutation method becomes a unary RPC. FromContext params are injected from gRPC metadata. Error codes map to gRPC status codes (NOT_FOUND, INVALID_ARGUMENT, PERMISSION_DENIED, etc). Optional dependency: `pip install nexusx[grpc]` Co-Authored-By: Claude Opus 4.7 --- demo/use_case/grpc_demo.py | 103 ++++++++++ pyproject.toml | 3 + src/nexusx/__init__.py | 2 + src/nexusx/use_case/__init__.py | 2 + src/nexusx/use_case/grpc_server.py | 282 +++++++++++++++++++++++++++ tests/test_grpc.py | 295 +++++++++++++++++++++++++++++ 6 files changed, 687 insertions(+) create mode 100644 demo/use_case/grpc_demo.py create mode 100644 src/nexusx/use_case/grpc_server.py create mode 100644 tests/test_grpc.py diff --git a/demo/use_case/grpc_demo.py b/demo/use_case/grpc_demo.py new file mode 100644 index 0000000..3f9fb4a --- /dev/null +++ b/demo/use_case/grpc_demo.py @@ -0,0 +1,103 @@ +"""gRPC demo — UseCaseService exposed as gRPC server. + +Run: + uv run --with grpcio python -m demo.use_case.grpc_demo +""" + +import asyncio +import json + +from pydantic import BaseModel + +from nexusx import UseCaseAppConfig, UseCaseService, create_use_case_grpc_server, query + +# ── DTOs ────────────────────────────────────────── + + +class ItemDTO(BaseModel): + id: int + name: str + price: float + + +# ── Service ─────────────────────────────────────── + + +class CatalogService(UseCaseService): + """Product catalog service.""" + + @query + async def list_items(cls) -> list[ItemDTO]: + return [ + ItemDTO(id=1, name="Widget", price=9.99), + ItemDTO(id=2, name="Gadget", price=19.99), + ItemDTO(id=3, name="Doohickey", price=4.99), + ] + + @query + async def get_item(cls, item_id: int) -> ItemDTO | None: + items = { + 1: ItemDTO(id=1, name="Widget", price=9.99), + 2: ItemDTO(id=2, name="Gadget", price=19.99), + 3: ItemDTO(id=3, name="Doohickey", price=4.99), + } + return items.get(item_id) + + +# ── Main ────────────────────────────────────────── + +PORT = 50051 + + +async def server(): + config = UseCaseAppConfig( + name="catalog", + services=[CatalogService], + ) + server = create_use_case_grpc_server(config, port=PORT) + await server.start() + print(f"gRPC server listening on port {PORT}") + await server.wait_for_termination() + + +async def client(): + from grpc import aio as grpc_aio + + await asyncio.sleep(0.5) # wait for server + + async with grpc_aio.insecure_channel(f"localhost:{PORT}") as channel: + call = channel.unary_unary( + "/CatalogService/list_items", + request_serializer=lambda d: json.dumps(d).encode(), + response_deserializer=lambda b: b, + ) + response = await call({}) + data = json.loads(response) + print("\n── list_items ──") + for item in data["result"]: + print(f" {item['id']}: {item['name']} — ${item['price']}") + + # Get single item + call = channel.unary_unary( + "/CatalogService/get_item", + request_serializer=lambda d: json.dumps(d).encode(), + response_deserializer=lambda b: b, + ) + response = await call({"item_id": 2}) + data = json.loads(response) + print("\n── get_item(2) ──") + print(f" {data['result']}") + + +async def main(): + server_task = asyncio.create_task(server()) + await client() + server_task.cancel() + try: + await server_task + except asyncio.CancelledError: + pass + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 805ca3b..484d549 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,9 @@ fastmcp = [ cli = [ "typer>=0.12.0", ] +grpc = [ + "grpcio>=1.60.0", +] [project.urls] Homepage = "https://github.com/allmonday/nexusx" diff --git a/src/nexusx/__init__.py b/src/nexusx/__init__.py index 6a6f574..a908b96 100644 --- a/src/nexusx/__init__.py +++ b/src/nexusx/__init__.py @@ -71,6 +71,7 @@ def resolve_author(self, loader=Loader('author')): create_jsonrpc_router, create_use_case_cli, create_use_case_flat_server, + create_use_case_grpc_server, create_use_case_mcp_server, ) from nexusx.use_case import ( @@ -109,6 +110,7 @@ def resolve_author(self, loader=Loader('author')): "SelectionError", "create_use_case_mcp_server", "create_use_case_flat_server", + "create_use_case_grpc_server", "create_use_case_cli", "create_jsonrpc_router", "create_use_case_router", diff --git a/src/nexusx/use_case/__init__.py b/src/nexusx/use_case/__init__.py index 034b61f..0246f9a 100644 --- a/src/nexusx/use_case/__init__.py +++ b/src/nexusx/use_case/__init__.py @@ -8,6 +8,7 @@ from nexusx.use_case.cli import create_use_case_cli from nexusx.use_case.context import FromContext from nexusx.use_case.flat_server import create_use_case_flat_server +from nexusx.use_case.grpc_server import create_use_case_grpc_server from nexusx.use_case.jsonrpc import create_jsonrpc_router from nexusx.use_case.router import create_router from nexusx.use_case.selection import SelectionError @@ -17,6 +18,7 @@ __all__ = [ "create_use_case_cli", "create_use_case_flat_server", + "create_use_case_grpc_server", "create_jsonrpc_router", "create_router", "create_use_case_mcp_server", diff --git a/src/nexusx/use_case/grpc_server.py b/src/nexusx/use_case/grpc_server.py new file mode 100644 index 0000000..ac9091b --- /dev/null +++ b/src/nexusx/use_case/grpc_server.py @@ -0,0 +1,282 @@ +"""gRPC Server — expose UseCaseService methods as unary RPCs over gRPC. + +Uses JSON-over-gRPC: request/response bodies are JSON-encoded bytes. +This avoids protobuf code generation while leveraging gRPC's transport layer +(retry, load balancing, TLS, interceptors). + +Usage:: + + from nexusx import UseCaseAppConfig, create_use_case_grpc_server + + server = create_use_case_grpc_server( + UseCaseAppConfig(name="project", services=[UserService]), + port=50051, + ) + await server.start() +""" + +from __future__ import annotations + +import inspect +import json +from typing import Annotated, Any, get_args, get_origin, get_type_hints + +try: + import grpc + from grpc import aio as grpc_aio +except ImportError as exc: + raise ImportError( + "grpcio is required for gRPC support: pip install nexusx[grpc]" + ) from exc + +from nexusx.use_case.business import USE_CASE_METHODS_ATTR +from nexusx.use_case.context import FromContext +from nexusx.use_case.manager import UseCaseManager +from nexusx.use_case.server import _coerce_kwargs, _serialize_result +from nexusx.use_case.types import UseCaseAppConfig + + +def _get_from_context_params(method: Any) -> set[str]: + from_context_params: set[str] = set() + try: + hints = get_type_hints(method, include_extras=True) + except Exception: + hints = {} + sig = inspect.signature(method) + for name in sig.parameters: + annotation = hints.get(name) + if annotation is not None and get_origin(annotation) is Annotated: + for arg in get_args(annotation): + if isinstance(arg, FromContext): + from_context_params.add(name) + break + return from_context_params + + +class _GrpcContextAdapter: + """Adapts gRPC ServicerContext metadata to dict-like access for context_extractor.""" + + def __init__(self, context: grpc_aio.ServicerContext): + metadata = context.invocation_metadata() or [] + self._metadata = dict(metadata) + + def get(self, key: str, default: str | None = None) -> str | None: + return self._metadata.get(key, default) + + def __getitem__(self, key: str) -> str: + return self._metadata[key] + + def __contains__(self, key: str) -> bool: + return key in self._metadata + + @property + def headers(self) -> dict[str, str]: + return self._metadata + + +async def _extract_context( + app: Any, + grpc_context: grpc_aio.ServicerContext, +) -> dict[str, Any]: + if app.context_extractor is None: + return {} + adapter = _GrpcContextAdapter(grpc_context) + result = app.context_extractor(adapter) + if inspect.isawaitable(result): + result = await result + return result if isinstance(result, dict) else {} + + +async def _handle_rpc( + app: Any, + service_name: str, + method_name: str, + request_bytes: bytes, + grpc_context: grpc_aio.ServicerContext, +) -> bytes: + # Deserialize + try: + params = json.loads(request_bytes.decode("utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + await grpc_context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + f"Invalid JSON: {e}", + ) + return b"" # unreachable, but helps type checker + + if not isinstance(params, dict): + await grpc_context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + "Request body must be a JSON object", + ) + return b"" + + # Service lookup + service_cls = app.services.get(service_name) + if service_cls is None: + available = list(app.services.keys()) + await grpc_context.abort( + grpc.StatusCode.NOT_FOUND, + f"Service '{service_name}' not found. Available: {available}", + ) + return b"" + + # Method lookup + methods = getattr(service_cls, USE_CASE_METHODS_ATTR) + if method_name not in methods: + available = list(methods.keys()) + await grpc_context.abort( + grpc.StatusCode.NOT_FOUND, + f"Method '{method_name}' not found. Available: {available}", + ) + return b"" + + # Mutation check + method_meta = methods.get(method_name, {}) + method_kind = method_meta.get("kind", "query") if isinstance(method_meta, dict) else "query" + if not app.enable_mutation and method_kind == "mutation": + await grpc_context.abort( + grpc.StatusCode.PERMISSION_DENIED, + f"Method '{method_name}' is a mutation and mutations are disabled", + ) + return b"" + + # Execute + method = getattr(service_cls, method_name) + func = method.__func__ if isinstance(method, classmethod) else method + + kwargs = _coerce_kwargs(func, dict(params)) + + # FromContext injection + from_context_params = _get_from_context_params(method) + if from_context_params: + context = await _extract_context(app, grpc_context) + sig = inspect.signature(method) + for param_name in from_context_params: + if param_name in context: + kwargs[param_name] = context[param_name] + elif ( + param_name not in kwargs + and sig.parameters[param_name].default is inspect.Parameter.empty + ): + await grpc_context.abort( + grpc.StatusCode.FAILED_PRECONDITION, + f"Required FromContext parameter '{param_name}' not found", + ) + return b"" + + try: + result = await method(**kwargs) + except Exception as e: + await grpc_context.abort( + grpc.StatusCode.INTERNAL, + f"Error executing {service_name}.{method_name}: {e}", + ) + return b"" + + serialized = _serialize_result(result) + return json.dumps({"result": serialized}, ensure_ascii=False).encode("utf-8") + + +class _UseCaseServiceHandler(grpc.ServiceRpcHandler): + """Routes /ServiceName/MethodName to UseCaseService methods.""" + + def __init__(self, app: Any): + self._app = app + self._service_name: str | None = None + self._methods: dict[str, Any] = {} + + services = app.services + if len(services) == 1: + svc_cls = list(services.values())[0] + self._service_name = svc_cls.__name__ + self._methods = getattr(svc_cls, USE_CASE_METHODS_ATTR, {}) + else: + # Multi-service: accept any registered service + self._service_name = app.name + for svc_cls in services.values(): + svc_methods = getattr(svc_cls, USE_CASE_METHODS_ATTR, {}) + for mname, meta in svc_methods.items(): + qualified = f"{svc_cls.__name__}.{mname}" + self._methods[qualified] = (svc_cls, mname, meta) + + def service_name(self) -> str: + return self._service_name or "" + + def service( + self, + handler_call_details: grpc.HandlerCallDetails, + ) -> grpc.RpcMethodHandler | None: + # Format: /ServiceName/MethodName + method_path = handler_call_details.method + if not method_path: + return None + + parts = method_path.strip("/").split("/") + if len(parts) != 2: + return None + + called_service = parts[0] + called_method = parts[1] + + # Accept any registered service name + if called_service not in self._app.services: + return None + + app = self._app + service_name = called_service + method_name = called_method + + async def handler( + request: bytes, + context: grpc_aio.ServicerContext, + ) -> bytes: + return await _handle_rpc(app, service_name, method_name, request, context) + + return grpc.unary_unary_rpc_method_handler( + handler, + request_deserializer=lambda b: b, + response_serializer=lambda r: r, + ) + + +def create_use_case_grpc_server( + config: UseCaseAppConfig, + host: str = "[::]", + port: int = 50051, +) -> grpc_aio.Server: + """Create an async gRPC server from UseCaseAppConfig. + + Each UseCaseService becomes a gRPC service, each ``@query``/``@mutation`` + method becomes a unary RPC. Request/response bodies are JSON-encoded bytes. + + Args: + config: A ``UseCaseAppConfig`` with services. + host: Bind address. Defaults to ``[::]`` (all interfaces). + port: Listen port. Defaults to 50051. + + Returns: + A ``grpc.aio.Server`` instance. Call ``await server.start()`` to start. + + Example:: + + server = create_use_case_grpc_server( + UseCaseAppConfig(name="project", services=[UserService]), + port=50051, + ) + await server.start() + await server.wait_for_termination() + """ + if not isinstance(config, UseCaseAppConfig): + raise TypeError("config must be a UseCaseAppConfig") + + manager = UseCaseManager([config]) + app = manager.apps[config.name] + + server = grpc_aio.server() + server.add_generic_rpc_handlers( + (_UseCaseServiceHandler(app),) + ) + server.add_insecure_port(f"{host}:{port}") + + return server diff --git a/tests/test_grpc.py b/tests/test_grpc.py new file mode 100644 index 0000000..673f4e4 --- /dev/null +++ b/tests/test_grpc.py @@ -0,0 +1,295 @@ +"""Tests for create_use_case_grpc_server — gRPC transport for UseCaseService.""" + +from __future__ import annotations + +import json +from typing import Annotated + +import pytest +from pydantic import BaseModel + +from nexusx.decorator import mutation, query +from nexusx.use_case.business import UseCaseService +from nexusx.use_case.context import FromContext +from nexusx.use_case.grpc_server import create_use_case_grpc_server +from nexusx.use_case.types import UseCaseAppConfig + +grpc = pytest.importorskip("grpc") +grpc_aio = pytest.importorskip("grpc.aio") + +# ────────────────────────────────────────────────── +# Test DTOs +# ────────────────────────────────────────────────── + + +class UserDTO(BaseModel): + id: int + name: str + + +# ────────────────────────────────────────────────── +# Test Services +# ────────────────────────────────────────────────── + + +class UserService(UseCaseService): + """User management service.""" + + @query + async def list_users(cls) -> list[UserDTO]: + return [UserDTO(id=1, name="Alice"), UserDTO(id=2, name="Bob")] + + @query + async def get_user(cls, user_id: int) -> UserDTO | None: + if user_id == 1: + return UserDTO(id=1, name="Alice") + return None + + @mutation + async def create_user(cls, name: str) -> UserDTO: + return UserDTO(id=99, name=name) + + +class PingService(UseCaseService): + """Ping service.""" + + @query + async def ping(cls) -> str: + return "pong" + + +class ContextService(UseCaseService): + """Service using FromContext.""" + + @query + async def whoami(cls, user_id: Annotated[int, FromContext()]) -> UserDTO: + return UserDTO(id=user_id, name="from_context") + + @query + async def greet( + cls, + user_id: Annotated[int, FromContext()], + message: str, + ) -> str: + return f"user={user_id},msg={message}" + + +# ────────────────────────────────────────────────── +# Helpers +# ────────────────────────────────────────────────── + + +def _extract_user(adapter): + return {"user_id": int(adapter.headers.get("x-user-id", "0"))} + + +TEST_PORT = 50099 +TEST_HOST = "localhost" + + +@pytest.fixture +async def grpc_server_and_channel(): + config = UseCaseAppConfig( + name="test", + services=[UserService, PingService], + ) + server = create_use_case_grpc_server(config, host=TEST_HOST, port=TEST_PORT) + await server.start() + async with grpc_aio.insecure_channel(f"{TEST_HOST}:{TEST_PORT}") as channel: + yield channel + await server.stop(grace=0) + + +TEST_PORT_CTX = 50098 + + +@pytest.fixture +async def grpc_server_context(): + config = UseCaseAppConfig( + name="test", + services=[ContextService], + context_extractor=_extract_user, + ) + server = create_use_case_grpc_server(config, host=TEST_HOST, port=TEST_PORT_CTX) + await server.start() + async with grpc_aio.insecure_channel(f"{TEST_HOST}:{TEST_PORT_CTX}") as channel: + yield channel + await server.stop(grace=0) + + +async def _call( + channel: grpc_aio.Channel, + service: str, + method: str, + params: dict | None = None, + metadata: list[tuple[str, str]] | None = None, +) -> tuple[bytes, grpc.StatusCode | None]: + """Call a gRPC method and return (response_bytes, status_code).""" + method_path = f"/{service}/{method}" + call = channel.unary_unary( + method_path, + request_serializer=lambda d: json.dumps(d).encode("utf-8"), + response_deserializer=lambda b: b, + ) + request = params or {} + try: + response = await call(request, metadata=metadata) + return response, None + except grpc.aio.AioRpcError as e: + return b"", e.code() + + +# ────────────────────────────────────────────────── +# Tests +# ────────────────────────────────────────────────── + + +class TestBasicInvocation: + @pytest.mark.asyncio + async def test_list_users(self, grpc_server_and_channel): + channel = grpc_server_and_channel + response, status = await _call(channel, "UserService", "list_users") + assert status is None + data = json.loads(response) + assert len(data["result"]) == 2 + assert data["result"][0]["name"] == "Alice" + + @pytest.mark.asyncio + async def test_get_user(self, grpc_server_and_channel): + channel = grpc_server_and_channel + response, status = await _call(channel, "UserService", "get_user", {"user_id": 1}) + assert status is None + data = json.loads(response) + assert data["result"]["id"] == 1 + assert data["result"]["name"] == "Alice" + + @pytest.mark.asyncio + async def test_get_user_not_found(self, grpc_server_and_channel): + channel = grpc_server_and_channel + response, status = await _call(channel, "UserService", "get_user", {"user_id": 999}) + assert status is None + data = json.loads(response) + assert data["result"] is None + + @pytest.mark.asyncio + async def test_mutation(self, grpc_server_and_channel): + channel = grpc_server_and_channel + response, status = await _call(channel, "UserService", "create_user", {"name": "Charlie"}) + assert status is None + data = json.loads(response) + assert data["result"]["id"] == 99 + assert data["result"]["name"] == "Charlie" + + @pytest.mark.asyncio + async def test_ping(self, grpc_server_and_channel): + channel = grpc_server_and_channel + response, status = await _call(channel, "PingService", "ping") + assert status is None + data = json.loads(response) + assert data["result"] == "pong" + + +class TestErrors: + @pytest.mark.asyncio + async def test_service_not_found(self, grpc_server_and_channel): + channel = grpc_server_and_channel + _, status = await _call(channel, "UnknownService", "list_users") + # gRPC returns UNIMPLEMENTED when no handler matches the service path + assert status in (grpc.StatusCode.NOT_FOUND, grpc.StatusCode.UNIMPLEMENTED) + + @pytest.mark.asyncio + async def test_method_not_found(self, grpc_server_and_channel): + channel = grpc_server_and_channel + _, status = await _call(channel, "UserService", "unknown_method") + assert status == grpc.StatusCode.NOT_FOUND + + @pytest.mark.asyncio + async def test_invalid_json(self, grpc_server_and_channel): + channel = grpc_server_and_channel + call = channel.unary_unary( + "/UserService/get_user", + request_serializer=lambda d: d, + response_deserializer=lambda b: b, + ) + try: + await call(b"not json") + raise AssertionError("Should have raised") + except grpc.aio.AioRpcError as e: + assert e.code() == grpc.StatusCode.INVALID_ARGUMENT + + @pytest.mark.asyncio + async def test_non_object_params(self, grpc_server_and_channel): + channel = grpc_server_and_channel + call = channel.unary_unary( + "/UserService/get_user", + request_serializer=lambda d: json.dumps(d).encode("utf-8"), + response_deserializer=lambda b: b, + ) + try: + await call([1, 2, 3]) + raise AssertionError("Should have raised") + except grpc.aio.AioRpcError as e: + assert e.code() == grpc.StatusCode.INVALID_ARGUMENT + + @pytest.mark.asyncio + async def test_mutation_disabled(self): + config = UseCaseAppConfig( + name="test", + services=[UserService], + enable_mutation=False, + ) + server = create_use_case_grpc_server(config, host=TEST_HOST, port=50097) + await server.start() + async with grpc_aio.insecure_channel(f"{TEST_HOST}:50097") as channel: + _, status = await _call(channel, "UserService", "create_user", {"name": "X"}) + assert status == grpc.StatusCode.PERMISSION_DENIED + await server.stop(grace=0) + + +class TestFromContext: + @pytest.mark.asyncio + async def test_context_from_metadata(self, grpc_server_context): + channel = grpc_server_context + response, status = await _call( + channel, + "ContextService", + "whoami", + metadata=[("x-user-id", "42")], + ) + assert status is None + data = json.loads(response) + assert data["result"]["id"] == 42 + + @pytest.mark.asyncio + async def test_context_with_body_param(self, grpc_server_context): + channel = grpc_server_context + response, status = await _call( + channel, + "ContextService", + "greet", + params={"message": "hello"}, + metadata=[("x-user-id", "7")], + ) + assert status is None + data = json.loads(response) + assert data["result"] == "user=7,msg=hello" + + +class TestServerLifecycle: + def test_invalid_config_type(self): + with pytest.raises(TypeError, match="UseCaseAppConfig"): + create_use_case_grpc_server("not a config") + + @pytest.mark.asyncio + async def test_start_and_stop(self): + config = UseCaseAppConfig(name="test", services=[PingService]) + server = create_use_case_grpc_server(config, host=TEST_HOST, port=50096) + await server.start() + + async with grpc_aio.insecure_channel(f"{TEST_HOST}:50096") as channel: + response, status = await _call(channel, "PingService", "ping") + assert status is None + data = json.loads(response) + assert data["result"] == "pong" + + await server.stop(grace=0)