-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
132 lines (106 loc) · 4.47 KB
/
cli.py
File metadata and controls
132 lines (106 loc) · 4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""ARCP command-line interface (Click)."""
from __future__ import annotations
import asyncio
import contextlib
import json
import sys
from typing import Any
import click
from . import ClientInfo, Lease, WebSocketTransport
from ._envelope import Envelope
from ._runtime.server import ARCPRuntime, RuntimeInfo
from ._store.eventlog import EventLog, SqliteEventLog
from ._transport.websocket import serve_websocket
from .client import ARCPClient
from .runtime import StaticBearerVerifier
# Root command group. The callback intentionally has no body beyond its
# docstring: Click uses that docstring as the top-level --help text, and the
# subcommands attach themselves through @main.command().
@click.group()
@click.version_option(prog_name="arcp")
def main() -> None:
    """ARCP reference CLI."""
@main.command()
@click.option("--host", default="127.0.0.1", show_default=True)
@click.option("--port", default=7777, show_default=True, type=int)
@click.option("--token", required=True, help="Demo bearer token to accept.")
@click.option("--principal", default="cli-user", show_default=True)
@click.option("--db", default=None, help="Optional SQLite event log path.")
def serve(host: str, port: int, token: str, principal: str, db: str | None) -> None:
    """Run a minimal ARCP runtime serving the `echo` demo agent."""
    # NOTE: the docstring above is what Click prints for `serve --help`.
    #
    # Parameters (all supplied by the Click options above):
    #   host, port -- address the WebSocket server binds to.
    #   token      -- the single bearer token accepted; it maps to `principal`.
    #   principal  -- identity associated with `token`.
    #   db         -- SQLite event log path; when empty/None no log is attached.
    #
    # Ctrl-C (KeyboardInterrupt) is treated as a normal shutdown, not an error.

    async def echo(input_value: Any, ctx: Any) -> Any:
        # Demo agent: emit one log line, then hand the input back unchanged.
        await ctx.log("info", "echo started")
        return {"echoed": input_value}

    log: EventLog | None = SqliteEventLog(db) if db else None
    runtime = ARCPRuntime(
        runtime=RuntimeInfo(name="arcp-cli", version="1.1.0"),
        bearer=StaticBearerVerifier({token: principal}),
        event_log=log,
    )
    runtime.register_agent("echo", echo)

    async def go() -> None:
        try:
            server = await serve_websocket(
                runtime.accept, host=host, port=port, path="/arcp"
            )
            # Status goes to stderr so stdout stays free for machine-readable output.
            click.echo(f"arcp serve listening on ws://{host}:{port}/arcp", err=True)
            async with server:
                await server.serve_forever()
        finally:
            # Release the SQLite event log on every exit path (bind failure,
            # cancellation triggered by Ctrl-C, normal return). Previously the
            # log was opened here but never closed.
            if log is not None:
                await log.close()

    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(go())
def _parse_json_option(option: str, raw: str) -> Any:
    # Decode `raw` as JSON; on failure raise a Click usage error naming `option`
    # so the user sees a one-line message instead of a traceback.
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        raise click.BadParameter(f"invalid JSON: {exc}", param_hint=option) from exc


@main.command()
@click.option("--url", required=True, help="ws://host:port/arcp")
@click.option("--token", required=True)
@click.option("--agent", required=True)
@click.option("--input", "input_json", default="null")
@click.option(
    "--lease",
    default="{}",
    help='Lease as JSON object: {"net.fetch": ["https://*"]}',
)
def submit(url: str, token: str, agent: str, input_json: str, lease: str) -> None:
    """Submit a single job and print the terminal `job.result` JSON."""
    # NOTE: the docstring above is what Click prints for `submit --help`.
    #
    # Parameters (all supplied by the Click options above):
    #   url        -- WebSocket endpoint to connect to.
    #   token      -- bearer token handed to the client.
    #   agent      -- name of the agent the job is submitted to.
    #   input_json -- JSON text used as the job input (defaults to `null`).
    #   lease      -- JSON text used as the lease request (defaults to `{}`).
    #
    # Raises click.BadParameter when --input or --lease is not valid JSON.

    # Decode both JSON options before touching the network: malformed input
    # should fail fast, without opening a connection first.
    job_input = _parse_json_option("--input", input_json)
    lease_obj: Lease = _parse_json_option("--lease", lease)

    async def go() -> None:
        client = ARCPClient(client=ClientInfo(name="arcp-cli", version="1.1.0"), token=token)
        transport = await WebSocketTransport.connect(url)
        await client.connect(transport)
        try:
            handle = await client.submit(
                agent=agent, input=job_input, lease_request=lease_obj
            )
            # `handle.done` is awaited for the job's final result object.
            result = await handle.done
            click.echo(json.dumps(result.model_dump(mode="json", exclude_none=True), indent=2))
        finally:
            await client.close()

    asyncio.run(go())
@main.command()
@click.option("--url", required=True)
@click.option("--token", required=True)
@click.option("--job-id", required=True)
def tail(url: str, token: str, job_id: str) -> None:
    """Subscribe to `job_id` and stream events to stdout as JSON lines."""
    # NOTE: the docstring above is what Click prints for `tail --help`.

    async def _stream() -> None:
        identity = ClientInfo(name="arcp-cli", version="1.1.0")
        session = ARCPClient(client=identity, token=token)
        socket = await WebSocketTransport.connect(url)
        await session.connect(socket)
        try:
            # Subscribe with history=True, then print one JSON document per event.
            subscription = await session.subscribe(job_id, history=True)
            async for event in subscription.handle.events():
                click.echo(json.dumps(event))
            # Once the event stream ends, print the final result on its own line.
            outcome = await subscription.handle.done
            payload = outcome.model_dump(mode="json", exclude_none=True)
            click.echo(json.dumps(payload))
        finally:
            # Always close the client, even if streaming fails part-way.
            await session.close()

    asyncio.run(_stream())
@main.command()
@click.option("--db", required=True, help="SQLite event log path.")
@click.option("--session", required=True, help="Session id to replay.")
@click.option("--after-seq", default=0, type=int)
def replay(db: str, session: str, after_seq: int) -> None:
    """Replay envelopes from a SQLite event log to stdout."""
    # NOTE: the docstring above is what Click prints for `replay --help`.

    async def _dump() -> None:
        event_log = SqliteEventLog(db)
        try:
            records = event_log.read_since_seq(session, after_seq)
            async for raw in records:
                # Round-trip each stored record through Envelope before printing.
                envelope = Envelope.from_wire(raw)
                line = json.dumps(envelope.to_wire())
                click.echo(line)
        finally:
            # Close the log whether or not the read loop completed.
            await event_log.close()

    asyncio.run(_dump())
# Script entry point: run the Click group and exit with whatever it returns.
if __name__ == "__main__":  # pragma: no cover
    exit_status = main()
    sys.exit(exit_status)