Overview
PyNest has no WebSocket support. Issue #104 shows the community is actively looking for it. Real-time bi-directional communication is a table-stakes feature for modern APIs — especially for agentic systems where agents need to stream responses, receive live events, and maintain persistent sessions.
This feature request proposes NestJS-compatible WebSocket Gateways as a first-class PyNest primitive, built on FastAPI's native WebSocket support and optionally python-socketio.
Motivation
Use cases that require WebSockets:
Streaming AI model responses to clients in real time
Live dashboards with push updates
Multi-agent coordination (agent A notifies agent B via WS)
Chat applications
Collaborative editing
Background job progress tracking
Without WebSocket support, PyNest developers must add raw FastAPI WebSocket routes outside the module system, losing DI, guards, and structured event handling.
Proposed API
@WebSocketGateway(port?, namespace?, options?)
```python
from nest.websockets import WebSocketGateway, SubscribeMessage, ConnectedSocket, MessageBody
from nest.websockets import OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
@WebSocketGateway(namespace="/chat")
class ChatGateway(OnGatewayConnection, OnGatewayDisconnect):
def __init__(self, chat_service: ChatService): # DI works!
self.service = chat_service
async def on_connection(self, client: WebSocket) -> None:
print(f"Client connected: {client.client}")
async def on_disconnect(self, client: WebSocket) -> None:
print(f"Client disconnected")
@SubscribeMessage("send_message")
async def handle_message(
self,
@MessageBody() data: SendMessageDto,
@ConnectedSocket() client: WebSocket,
) -> MessageAckDto:
message = await self.service.save(data)
return MessageAckDto(id=message.id, status="delivered")
```
@SubscribeMessage(event) — handle incoming WS events
```python
@SubscribeMessage("join_room")
async def handle_join(self, @MessageBody() room: JoinRoomDto, @ConnectedSocket() client: WebSocket):
await self.rooms.add(client, room.name)
await client.send_json({"event": "joined", "room": room.name})
@SubscribeMessage("ping")
async def handle_ping(self, @ConnectedSocket() client: WebSocket):
return {"event": "pong", "data": {}} # auto-sent back to client
```
Lifecycle Interfaces
```python
from nest.websockets import OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
class OnGatewayInit:
async def after_init(self, server: WebSocketServer) -> None: ...
class OnGatewayConnection:
async def on_connection(self, client: WebSocket, *args) -> None: ...
class OnGatewayDisconnect:
async def on_disconnect(self, client: WebSocket) -> None: ...
```
WebSocketServer — broadcast to rooms/all clients
```python
from nest.websockets import WebSocketGateway, WebSocketServer
@WebSocketGateway(namespace="/events")
class EventsGateway(OnGatewayInit):
server: WebSocketServer # auto-injected after after_init()
async def after_init(self, server: WebSocketServer):
self.server = server
async def broadcast_update(self, payload: dict):
await self.server.emit("update", payload) # all clients
async def send_to_room(self, room: str, payload: dict):
await self.server.to(room).emit("update", payload) # room only
async def send_to_client(self, client_id: str, payload: dict):
await self.server.to(client_id).emit("update", payload)
```
Guards on WebSocket Events
```python
@WebSocketGateway(namespace="/private")
@UseGuards(WsJwtGuard) # guard applies to all messages
class PrivateGateway:
@SubscribeMessage("secret")
@UseGuards(AdminWsGuard) # route-level guard
async def handle_secret(self, @MessageBody() data: dict):
...
```
WS-specific guards receive a WsArgumentsHost (via context.switch_to_ws()):
```python
class WsJwtGuard(BaseGuard):
async def can_activate(self, context: ExecutionContext) -> bool:
ws_host = context.switch_to_ws()
client = ws_host.get_client()
token = client.headers.get("authorization")
return await self.jwt.verify(token)
```
Module Registration
```python
@module (
providers=[ChatService, ChatGateway], # Gateway is a provider
controllers=[],
)
class ChatModule:
pass
```
Gateways are registered as providers — they participate in DI and their WebSocket routes are mounted alongside the HTTP routes automatically.
Streaming Support (Agent-Friendly)
```python
@WebSocketGateway(namespace="/ai")
class AgentGateway:
def __init__(self, llm_service: LLMService):
self.llm = llm_service
@SubscribeMessage("prompt")
async def handle_prompt(
self,
@MessageBody() data: PromptDto,
@ConnectedSocket() client: WebSocket,
):
# Stream tokens back to the client as they arrive
async for token in self.llm.stream(data.prompt):
await client.send_json({"event": "token", "data": token})
await client.send_json({"event": "done", "data": {}})
```
Transport Adapters
Adapter
Protocol
Use Case
FastAPIAdapter
Native WS (RFC 6455)
Default, no extra deps
SocketIOAdapter
Socket.IO
Browser clients, rooms, reconnect
```python
Default (FastAPI native WS):
@WebSocketGateway(namespace="/ws")
class MyGateway: ...
Socket.IO adapter (optional):
@WebSocketGateway(namespace="/socket.io", adapter=SocketIOAdapter)
class MySocketGateway: ...
```
pynest generate gateway CLI
```bash
pynest generate gateway chat
Creates:
chat/
chat.gateway.py
chat.module.py (with gateway registered)
```
Acceptance Criteria
@WebSocketGateway(namespace?, options?) class decorator in nest/websockets/
@SubscribeMessage(event) method decorator
@ConnectedSocket() and @MessageBody() param decorators for WS handlers
OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect lifecycle interfaces
WebSocketServer with emit(), to(room).emit(), broadcast() API
Gateways register as providers and participate in DI
Guards work on WS events (@UseGuards, context.switch_to_ws(), WsArgumentsHost)
Interceptors work on WS events (Feature update docs #6 integration)
Exception Filters work on WS events (Feature update readme #1 integration)
Native FastAPI WebSocket adapter (default, zero extra deps)
Optional Socket.IO adapter (pip install pynest-api[socketio])
Token streaming pattern documented and tested
pynest generate gateway <name> CLI command
Unit tests for message routing, guards, and lifecycle hooks
Integration test with an async WS client
Full documentation page
Dependencies
Related
Overview
PyNest has no WebSocket support. Issue #104 shows the community is actively looking for it. Real-time bi-directional communication is a table-stakes feature for modern APIs — especially for agentic systems where agents need to stream responses, receive live events, and maintain persistent sessions.
This feature request proposes NestJS-compatible WebSocket Gateways as a first-class PyNest primitive, built on FastAPI's native WebSocket support and optionally
python-socketio.Motivation
Use cases that require WebSockets:
Without WebSocket support, PyNest developers must add raw FastAPI WebSocket routes outside the module system, losing DI, guards, and structured event handling.
Proposed API
@WebSocketGateway(port?, namespace?, options?)```python
from nest.websockets import WebSocketGateway, SubscribeMessage, ConnectedSocket, MessageBody
from nest.websockets import OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
@WebSocketGateway(namespace="/chat")
class ChatGateway(OnGatewayConnection, OnGatewayDisconnect):
```
@SubscribeMessage(event)— handle incoming WS events```python
@SubscribeMessage("join_room")
async def handle_join(self, @MessageBody() room: JoinRoomDto, @ConnectedSocket() client: WebSocket):
await self.rooms.add(client, room.name)
await client.send_json({"event": "joined", "room": room.name})
@SubscribeMessage("ping")
async def handle_ping(self, @ConnectedSocket() client: WebSocket):
return {"event": "pong", "data": {}} # auto-sent back to client
```
Lifecycle Interfaces
```python
from nest.websockets import OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
class OnGatewayInit:
async def after_init(self, server: WebSocketServer) -> None: ...
class OnGatewayConnection:
async def on_connection(self, client: WebSocket, *args) -> None: ...
class OnGatewayDisconnect:
async def on_disconnect(self, client: WebSocket) -> None: ...
```
WebSocketServer— broadcast to rooms/all clients```python
from nest.websockets import WebSocketGateway, WebSocketServer
@WebSocketGateway(namespace="/events")
class EventsGateway(OnGatewayInit):
server: WebSocketServer # auto-injected after after_init()
```
Guards on WebSocket Events
```python
@WebSocketGateway(namespace="/private")
@UseGuards(WsJwtGuard) # guard applies to all messages
class PrivateGateway:
```
WS-specific guards receive a
WsArgumentsHost(viacontext.switch_to_ws()):```python
class WsJwtGuard(BaseGuard):
async def can_activate(self, context: ExecutionContext) -> bool:
ws_host = context.switch_to_ws()
client = ws_host.get_client()
token = client.headers.get("authorization")
return await self.jwt.verify(token)
```
Module Registration
```python
@module(
providers=[ChatService, ChatGateway], # Gateway is a provider
controllers=[],
)
class ChatModule:
pass
```
Gateways are registered as providers — they participate in DI and their WebSocket routes are mounted alongside the HTTP routes automatically.
Streaming Support (Agent-Friendly)
```python
@WebSocketGateway(namespace="/ai")
class AgentGateway:
```
Transport Adapters
FastAPIAdapterSocketIOAdapter```python
Default (FastAPI native WS):
@WebSocketGateway(namespace="/ws")
class MyGateway: ...
Socket.IO adapter (optional):
@WebSocketGateway(namespace="/socket.io", adapter=SocketIOAdapter)
class MySocketGateway: ...
```
pynest generate gatewayCLI```bash
pynest generate gateway chat
Creates:
chat/
chat.gateway.py
chat.module.py (with gateway registered)
```
Acceptance Criteria
@WebSocketGateway(namespace?, options?)class decorator innest/websockets/@SubscribeMessage(event)method decorator@ConnectedSocket()and@MessageBody()param decorators for WS handlersOnGatewayInit,OnGatewayConnection,OnGatewayDisconnectlifecycle interfacesWebSocketServerwithemit(),to(room).emit(),broadcast()API@UseGuards,context.switch_to_ws(),WsArgumentsHost)pip install pynest-api[socketio])pynest generate gateway <name>CLI commandDependencies
WsExceptionFiltervariant@MessageBody(),@ConnectedSocket()follow the same patternExecutionContext.switch_to_ws()requires this feature'sWsArgumentsHostRelated