Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitleaksignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# PostHog public project API key — intentionally committed, not a secret
src/authsome/server/analytics.py:generic-api-key:50

# Logo.dev public project token — intentionally committed for provider logos
ui/src/components/dashboard/dashboard-primitives.tsx:generic-api-key:12
15 changes: 15 additions & 0 deletions docs/site/reference/audit-log.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ authsome log # Print recent audit events
authsome log --json # Output JSON format
```

### User-scoped access

`GET /api/audit/events` is role-aware. Admin principals can review the global audit log.
Non-admin principals receive only events scoped to their own principal, including their
claimed identities, vault, providers, and credential lifecycle activity.

Supported query parameters:

| Parameter | Description |
| --- | --- |
| `limit` | Number of events to return, clamped to the server maximum. |
| `cursor` | Cursor returned by the previous page. |

Results are sorted newest-first and include `next_cursor` when another page is available.

## Privacy and Secrets

**What the log contains:**
Expand Down

This file was deleted.

4 changes: 2 additions & 2 deletions src/authsome/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
async def _cleanup_startup_resources(store, audit_log, runtime_state) -> None:
with suppress(Exception):
if audit_log is not None:
audit_log.shutdown()
await audit_log.async_shutdown()
with suppress(Exception):
if store is not None:
await store.close()
Expand Down Expand Up @@ -99,7 +99,7 @@ async def lifespan(app: FastAPI):
try:
shutdown_posthog()
if audit_log is not None:
audit_log.shutdown()
await audit_log.async_shutdown()
if store is not None:
await store.close()
finally:
Expand Down
18 changes: 14 additions & 4 deletions src/authsome/server/routes/audit.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Audit event routes."""

from typing import Any
from typing import Any, Literal

from fastapi import APIRouter, Depends, Request
from fastapi import APIRouter, Depends, HTTPException, Request, status

from authsome import audit
from authsome.identity.principal import PrincipalRole
Expand All @@ -19,10 +19,20 @@
async def list_audit_events(
request: Request,
limit: int = 50,
cursor: str | None = None,
auth: CredentialService = Depends(get_daemon_or_browser_auth_service),
) -> dict[str, Any]:
principal_id = None if auth.principal_role == PrincipalRole.ADMIN else auth.principal_id
return {"entries": await request.app.state.audit_log.list_events(limit=limit, principal_id=principal_id)}
effective_principal_id = None if auth.principal_role == PrincipalRole.ADMIN else auth.principal_id
scope: Literal["global", "principal"] = "global" if effective_principal_id is None else "principal"
try:
page = await request.app.state.audit_log.query_events(
limit=limit,
principal_id=effective_principal_id,
cursor=cursor,
)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) from exc
return {"entries": page.entries, "next_cursor": page.next_cursor, "scope": scope}


@router.post("/events")
Expand Down
114 changes: 99 additions & 15 deletions src/authsome/server/store/repositories.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Typed repositories for server-owned relational Store records."""

import asyncio
import base64
import binascii
import builtins
import json
import threading
Expand Down Expand Up @@ -65,6 +67,42 @@ class AuditEventInsert:
payload: dict[str, Any]


@dataclass(frozen=True)
class AuditEventPage:
"""Paged audit event query result."""

entries: list[dict[str, Any]]
next_cursor: str | None


def _encode_audit_cursor(*, timestamp: str, event_id: str) -> str:
payload = json.dumps(
{"event_id": event_id, "timestamp": timestamp},
separators=(",", ":"),
sort_keys=True,
).encode("utf-8")
return base64.urlsafe_b64encode(payload).decode("ascii").rstrip("=")


def _decode_audit_cursor(cursor: str) -> tuple[str, str]:
valid_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"
if not cursor or any(char not in valid_chars for char in cursor):
raise ValueError("Invalid audit cursor")
try:
padded_cursor = cursor + "=" * (-len(cursor) % 4)
decoded = base64.urlsafe_b64decode(padded_cursor.encode("ascii")).decode("utf-8")
payload = json.loads(decoded)
if not isinstance(payload, dict):
raise ValueError
timestamp = payload.get("timestamp")
event_id = payload.get("event_id")
if not isinstance(timestamp, str) or not timestamp or not isinstance(event_id, str) or not event_id:
raise ValueError
except (binascii.Error, json.JSONDecodeError, UnicodeError, ValueError, TypeError):
raise ValueError("Invalid audit cursor") from None
return timestamp, event_id


class AuditEventRegistry:
"""Relational audit event registry."""

Expand Down Expand Up @@ -96,21 +134,44 @@ async def insert_many(self, events: list[AuditEventInsert]) -> None:
],
)

async def list_recent(self, *, limit: int = 50, principal_id: str | None = None) -> list[dict[str, Any]]:
async def query_events(
self,
*,
limit: int = 50,
principal_id: str | None = None,
cursor: str | None = None,
) -> AuditEventPage:
bounded_limit = min(max(limit, 1), 500)
if principal_id is None:
rows = await self._db.fetch_all(
"SELECT payload_json FROM audit_events ORDER BY timestamp DESC, event_id DESC LIMIT ?",
[bounded_limit],
)
else:
rows = await self._db.fetch_all(
"SELECT payload_json FROM audit_events "
"WHERE principal_id = ? "
"ORDER BY timestamp DESC, event_id DESC LIMIT ?",
[principal_id, bounded_limit],
)
return [json.loads(row["payload_json"]) for row in rows]
conditions: list[str] = []
params: list[Any] = []

if principal_id is not None:
conditions.append("principal_id = ?")
params.append(principal_id)

if cursor:
cursor_timestamp, cursor_event_id = _decode_audit_cursor(cursor)
conditions.append("(timestamp < ? OR (timestamp = ? AND event_id < ?))")
params.extend([cursor_timestamp, cursor_timestamp, cursor_event_id])

where_clause = f" WHERE {' AND '.join(conditions)}" if conditions else ""
rows = await self._db.fetch_all(
"SELECT event_id, timestamp, payload_json FROM audit_events"
f"{where_clause} "
"ORDER BY timestamp DESC, event_id DESC LIMIT ?",
[*params, bounded_limit + 1],
)
visible_rows = rows[:bounded_limit]
entries = [json.loads(row["payload_json"]) for row in visible_rows]
next_cursor = None
if len(rows) > bounded_limit and visible_rows:
last = visible_rows[-1]
next_cursor = _encode_audit_cursor(timestamp=last["timestamp"], event_id=last["event_id"])
return AuditEventPage(entries=entries, next_cursor=next_cursor)

async def list_recent(self, *, limit: int = 50, principal_id: str | None = None) -> list[dict[str, Any]]:
page = await self.query_events(limit=limit, principal_id=principal_id)
return page.entries

def configure_exporter(self, loop: asyncio.AbstractEventLoop | None = None):
"""Configure the process OTel logger provider to export audit logs to Store."""
Expand Down Expand Up @@ -191,6 +252,10 @@ def shutdown(self) -> None:
self._closed = True
self.force_flush()

def close(self) -> None:
self._closed = True
self._drop_finished_futures()

def _is_loop_thread(self) -> bool:
try:
return asyncio.get_running_loop() is self._loop
Expand Down Expand Up @@ -257,9 +322,28 @@ def force_flush(self) -> None:
self._exporter.force_flush()

async def async_force_flush(self) -> None:
self._provider.force_flush()
await asyncio.to_thread(self._provider.force_flush)
await self._exporter.async_force_flush()

async def async_shutdown(self) -> None:
await self.async_force_flush()
_delegating_audit_exporter.set_active(None)
self._exporter.close()

async def query_events(
self,
*,
limit: int = 50,
principal_id: str | None = None,
cursor: str | None = None,
) -> AuditEventPage:
await self.async_force_flush()
return await self._registry.query_events(
limit=limit,
principal_id=principal_id,
cursor=cursor,
)

async def list_events(self, *, limit: int = 50, principal_id: str | None = None) -> list[dict[str, Any]]:
await self.async_force_flush()
return await self._registry.list_recent(limit=limit, principal_id=principal_id)
Expand Down
2 changes: 1 addition & 1 deletion tests/auth/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async def audit_log(self, tmp_path) -> ServerAuditLog: # noqa: ANN001
try:
yield log
finally:
log.shutdown()
await log.async_shutdown()
await store.close()

@pytest.fixture
Expand Down
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
os.environ["AUTHSOME_BASE_URL"] = TEST_AUTHSOME_BASE_URL
os.environ["AUTHSOME_ENV"] = "test"
os.environ["AUTHSOME_DO_NOT_TRACK"] = "true"
os.environ.pop("AUTHSOME_DATABASE_URL", None)
os.environ.pop("DATABASE_URL", None)
os.environ.pop("AUTHSOME_REDIS_URL", None)
os.environ.pop("AUTHSOME_POSTHOG_API_KEY", None)
os.environ.pop("POSTHOG_API_KEY", None)

Expand All @@ -26,6 +29,9 @@ def _disable_analytics(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("AUTHSOME_BASE_URL", TEST_AUTHSOME_BASE_URL)
monkeypatch.setenv("AUTHSOME_ENV", "test")
monkeypatch.setenv("AUTHSOME_DO_NOT_TRACK", "true")
monkeypatch.delenv("AUTHSOME_DATABASE_URL", raising=False)
monkeypatch.delenv("DATABASE_URL", raising=False)
monkeypatch.delenv("AUTHSOME_REDIS_URL", raising=False)
monkeypatch.delenv("AUTHSOME_POSTHOG_API_KEY", raising=False)
monkeypatch.delenv("POSTHOG_API_KEY", raising=False)
analytics.shutdown_posthog()
Expand Down
Loading
Loading