From 5c92e69a49cd1bf1f4b90ca3bdb77fb9715dae57 Mon Sep 17 00:00:00 2001 From: whoisasx Date: Wed, 17 Jun 2026 00:48:37 +0530 Subject: [PATCH 1/2] feat: add desktop notifications v1 --- backend/internal/httpd/apispec/openapi.yaml | 102 +++++++++ .../internal/httpd/apispec/specgen/build.go | 90 +++++--- backend/internal/httpd/controllers/dto.go | 20 ++ .../httpd/controllers/notifications.go | 75 +++++-- .../httpd/controllers/notifications_test.go | 94 +++++++- .../internal/service/notification/service.go | 35 +++ .../service/notification/service_test.go | 58 ++++- .../internal/service/notification/store.go | 2 + .../storage/sqlite/gen/notifications.sql.go | 64 ++++++ .../storage/sqlite/queries/notifications.sql | 11 + .../sqlite/store/notification_store.go | 25 +++ .../sqlite/store/notification_store_test.go | 89 ++++++++ docs/status.md | 8 +- frontend/src/api/schema.ts | 148 ++++++++++++ frontend/src/main.ts | 28 ++- frontend/src/preload.ts | 11 + .../components/NotificationCenter.tsx | 211 ++++++++++++++++++ .../src/renderer/components/ShellTopbar.tsx | 2 + .../renderer/hooks/useNotificationsQuery.ts | 38 ++++ frontend/src/renderer/lib/bridge.ts | 4 + .../src/renderer/lib/notifications.test.ts | 153 +++++++++++++ frontend/src/renderer/lib/notifications.ts | 138 ++++++++++++ frontend/src/renderer/test/setup.ts | 4 + 23 files changed, 1354 insertions(+), 56 deletions(-) create mode 100644 frontend/src/renderer/components/NotificationCenter.tsx create mode 100644 frontend/src/renderer/hooks/useNotificationsQuery.ts create mode 100644 frontend/src/renderer/lib/notifications.test.ts create mode 100644 frontend/src/renderer/lib/notifications.ts diff --git a/backend/internal/httpd/apispec/openapi.yaml b/backend/internal/httpd/apispec/openapi.yaml index 3e067d4d..5b8cfce7 100644 --- a/backend/internal/httpd/apispec/openapi.yaml +++ b/backend/internal/httpd/apispec/openapi.yaml @@ -100,6 +100,82 @@ paths: summary: List unread notifications tags: - notifications + /api/v1/notifications/{id}: + patch: + operationId: markNotificationRead + parameters: + - description: Notification identifier. + in: path + name: id + required: true + schema: + description: Notification identifier. + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/MarkNotificationReadRequest' + required: true + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/NotificationEnvelope' + description: OK + "400": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Bad Request + "404": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Not Found + "500": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Internal Server Error + "501": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Not Implemented + summary: Mark a notification read + tags: + - notifications + /api/v1/notifications/read-all: + post: + operationId: markAllNotificationsRead + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/MarkAllNotificationsReadResponse' + description: OK + "500": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Internal Server Error + "501": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Not Implemented + summary: Mark all unread notifications read + tags: + - notifications /api/v1/notifications/stream: get: operationId: streamNotifications @@ -1357,6 +1433,25 @@ components: required: - sessions type: object + MarkAllNotificationsReadResponse: + properties: + notifications: + items: + $ref: '#/components/schemas/NotificationResponse' + type: array + required: + - notifications + type: object + MarkNotificationReadRequest: + properties: + status: + description: V1 supports only marking an unread notification read. + enum: + - read + type: string + required: + - status + type: object MergePRResponse: properties: method: @@ -1370,6 +1465,13 @@ components: - prNumber - method type: object + NotificationEnvelope: + properties: + notification: + $ref: '#/components/schemas/NotificationResponse' + required: + - notification + type: object NotificationResponse: properties: body: diff --git a/backend/internal/httpd/apispec/specgen/build.go b/backend/internal/httpd/apispec/specgen/build.go index 2aeca734..2887130f 100644 --- a/backend/internal/httpd/apispec/specgen/build.go +++ b/backend/internal/httpd/apispec/specgen/build.go @@ -130,38 +130,42 @@ var schemaNames = map[string]string{ "DomainAgentConfig": "AgentConfig", "DomainRoleOverride": "RoleOverride", // httpd/controllers (wire envelopes) - "ControllersListProjectsResponse": "ListProjectsResponse", - "ControllersProjectResponse": "ProjectResponse", - "ControllersGetProjectResponse": "ProjectGetResponse", - "ControllersProjectOrDegraded": "ProjectOrDegraded", - "ControllersListSessionsQuery": "ListSessionsQuery", - "ControllersCleanupSessionsQuery": "CleanupSessionsQuery", - "ControllersListSessionsResponse": "ListSessionsResponse", - "ControllersSpawnSessionRequest": "SpawnSessionRequest", - "ControllersSessionResponse": "SessionResponse", - "ControllersRenameSessionRequest": "RenameSessionRequest", - "ControllersRenameSessionResponse": "RenameSessionResponse", - "ControllersRestoreSessionResponse": "RestoreSessionResponse", - "ControllersCleanupSessionsResponse": "CleanupSessionsResponse", - "ControllersCleanupSkippedSession": "CleanupSkippedSession", - "ControllersKillSessionResponse": "KillSessionResponse", - "ControllersRollbackSessionResponse": "RollbackSessionResponse", - "ControllersSendSessionMessageRequest": "SendSessionMessageRequest", - "ControllersSendSessionMessageResponse": "SendSessionMessageResponse", - "ControllersClaimPRResponse": "ClaimPRResponse", - "ControllersClaimPRRequest": "ClaimPRRequest", - "ControllersSessionPRFacts": "SessionPRFacts", - "ControllersListSessionPRsResponse": "ListSessionPRsResponse", - "ControllersSetActivityRequest": "SetActivityRequest", - "ControllersSetActivityResponse": "SetActivityResponse", - "ControllersSpawnOrchestratorRequest": "SpawnOrchestratorRequest", - "ControllersSpawnOrchestratorResponse": "SpawnOrchestratorResponse", - "ControllersOrchestratorResponse": "OrchestratorResponse", - "ControllersListNotificationsQuery": "ListNotificationsQuery", - "ControllersNotificationStreamQuery": "NotificationStreamQuery", - "ControllersNotificationTarget": "NotificationTarget", - "ControllersNotificationResponse": "NotificationResponse", - "ControllersListNotificationsResponse": "ListNotificationsResponse", + "ControllersListProjectsResponse": "ListProjectsResponse", + "ControllersProjectResponse": "ProjectResponse", + "ControllersGetProjectResponse": "ProjectGetResponse", + "ControllersProjectOrDegraded": "ProjectOrDegraded", + "ControllersListSessionsQuery": "ListSessionsQuery", + "ControllersCleanupSessionsQuery": "CleanupSessionsQuery", + "ControllersListSessionsResponse": "ListSessionsResponse", + "ControllersSpawnSessionRequest": "SpawnSessionRequest", + "ControllersSessionResponse": "SessionResponse", + "ControllersRenameSessionRequest": "RenameSessionRequest", + "ControllersRenameSessionResponse": "RenameSessionResponse", + "ControllersRestoreSessionResponse": "RestoreSessionResponse", + "ControllersCleanupSessionsResponse": "CleanupSessionsResponse", + "ControllersCleanupSkippedSession": "CleanupSkippedSession", + "ControllersKillSessionResponse": "KillSessionResponse", + "ControllersRollbackSessionResponse": "RollbackSessionResponse", + "ControllersSendSessionMessageRequest": "SendSessionMessageRequest", + "ControllersSendSessionMessageResponse": "SendSessionMessageResponse", + "ControllersClaimPRResponse": "ClaimPRResponse", + "ControllersClaimPRRequest": "ClaimPRRequest", + "ControllersSessionPRFacts": "SessionPRFacts", + "ControllersListSessionPRsResponse": "ListSessionPRsResponse", + "ControllersSetActivityRequest": "SetActivityRequest", + "ControllersSetActivityResponse": "SetActivityResponse", + "ControllersSpawnOrchestratorRequest": "SpawnOrchestratorRequest", + "ControllersSpawnOrchestratorResponse": "SpawnOrchestratorResponse", + "ControllersOrchestratorResponse": "OrchestratorResponse", + "ControllersListNotificationsQuery": "ListNotificationsQuery", + "ControllersNotificationStreamQuery": "NotificationStreamQuery", + "ControllersNotificationIDParam": "NotificationIDParam", + "ControllersNotificationTarget": "NotificationTarget", + "ControllersNotificationResponse": "NotificationResponse", + "ControllersListNotificationsResponse": "ListNotificationsResponse", + "ControllersMarkNotificationReadRequest": "MarkNotificationReadRequest", + "ControllersNotificationEnvelope": "NotificationEnvelope", + "ControllersMarkAllNotificationsReadResponse": "MarkAllNotificationsReadResponse", // httpd/controllers — PR wire envelopes "ControllersMergePRResponse": "MergePRResponse", "ControllersResolveCommentsRequest": "ResolveCommentsRequest", @@ -275,6 +279,28 @@ func notificationOperations() []operation { {http.StatusNotImplemented, envelope.APIError{}}, }, }, + { + method: http.MethodPatch, path: "/api/v1/notifications/{id}", id: "markNotificationRead", tag: "notifications", + summary: "Mark a notification read", + pathParams: []any{controllers.NotificationIDParam{}}, + reqBody: controllers.MarkNotificationReadRequest{}, + resps: []respUnit{ + {http.StatusOK, controllers.NotificationEnvelope{}}, + {http.StatusBadRequest, envelope.APIError{}}, + {http.StatusNotFound, envelope.APIError{}}, + {http.StatusInternalServerError, envelope.APIError{}}, + {http.StatusNotImplemented, envelope.APIError{}}, + }, + }, + { + method: http.MethodPost, path: "/api/v1/notifications/read-all", id: "markAllNotificationsRead", tag: "notifications", + summary: "Mark all unread notifications read", + resps: []respUnit{ + {http.StatusOK, controllers.MarkAllNotificationsReadResponse{}}, + {http.StatusInternalServerError, envelope.APIError{}}, + {http.StatusNotImplemented, envelope.APIError{}}, + }, + }, { method: http.MethodGet, path: "/api/v1/notifications/stream", id: "streamNotifications", tag: "notifications", summary: "Stream created notifications", diff --git a/backend/internal/httpd/controllers/dto.go b/backend/internal/httpd/controllers/dto.go index d2bb10a8..eae67d1e 100644 --- a/backend/internal/httpd/controllers/dto.go +++ b/backend/internal/httpd/controllers/dto.go @@ -273,6 +273,11 @@ type NotificationStreamQuery struct { ProjectID string `query:"projectId,omitempty" description:"Optional project id filter for live notifications."` } +// NotificationIDParam is the {id} path parameter shared by notification routes. +type NotificationIDParam struct { + ID string `path:"id" description:"Notification identifier."` +} + // NotificationTarget is the dashboard navigation target for a notification. type NotificationTarget struct { Kind string `json:"kind" enum:"session,pr"` @@ -299,6 +304,21 @@ type ListNotificationsResponse struct { Notifications []NotificationResponse `json:"notifications"` } +// MarkNotificationReadRequest is the body of PATCH /api/v1/notifications/{id}. +type MarkNotificationReadRequest struct { + Status string `json:"status" enum:"read" description:"V1 supports only marking an unread notification read."` +} + +// NotificationEnvelope is the { notification } response body for notification mutations. +type NotificationEnvelope struct { + Notification NotificationResponse `json:"notification"` +} + +// MarkAllNotificationsReadResponse is the body of POST /api/v1/notifications/read-all. +type MarkAllNotificationsReadResponse struct { + Notifications []NotificationResponse `json:"notifications"` +} + // PRIDParam is the {id} path parameter shared by the /prs/{id} routes. type PRIDParam struct { ID string `path:"id" description:"PR number."` diff --git a/backend/internal/httpd/controllers/notifications.go b/backend/internal/httpd/controllers/notifications.go index 273d52aa..94ec31c7 100644 --- a/backend/internal/httpd/controllers/notifications.go +++ b/backend/internal/httpd/controllers/notifications.go @@ -18,6 +18,8 @@ import ( // NotificationService is the controller-facing notification service contract. type NotificationService interface { ListUnread(ctx context.Context, filter notificationsvc.ListFilter) ([]notificationsvc.Notification, error) + MarkRead(ctx context.Context, id string) (notificationsvc.Notification, bool, error) + MarkAllRead(ctx context.Context) ([]notificationsvc.Notification, error) } // NotificationStream is the live notification stream used by SSE clients. @@ -34,6 +36,8 @@ type NotificationsController struct { // Register mounts bounded notification REST routes on the supplied router. func (c *NotificationsController) Register(r chi.Router) { r.Get("/notifications", c.list) + r.Post("/notifications/read-all", c.markAllRead) + r.Patch("/notifications/{id}", c.markRead) } // RegisterStream mounts long-lived notification stream routes on the supplied router. @@ -59,6 +63,41 @@ func (c *NotificationsController) list(w http.ResponseWriter, r *http.Request) { envelope.WriteJSON(w, http.StatusOK, ListNotificationsResponse{Notifications: notificationResponses(notifications)}) } +func (c *NotificationsController) markRead(w http.ResponseWriter, r *http.Request) { + if c.Svc == nil { + apispec.NotImplemented(w, r, "PATCH", "/api/v1/notifications/{id}") + return + } + var req MarkNotificationReadRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + envelope.WriteAPIError(w, r, http.StatusBadRequest, "bad_request", "INVALID_JSON", "Invalid JSON body", nil) + return + } + if req.Status != string(domain.NotificationRead) { + envelope.WriteAPIError(w, r, http.StatusBadRequest, "bad_request", "INVALID_NOTIFICATION_STATUS", "Notification status must be read", nil) + return + } + notification, _, err := c.Svc.MarkRead(r.Context(), chi.URLParam(r, "id")) + if err != nil { + envelope.WriteError(w, r, err) + return + } + envelope.WriteJSON(w, http.StatusOK, NotificationEnvelope{Notification: notificationResponse(notification)}) +} + +func (c *NotificationsController) markAllRead(w http.ResponseWriter, r *http.Request) { + if c.Svc == nil { + apispec.NotImplemented(w, r, "POST", "/api/v1/notifications/read-all") + return + } + notifications, err := c.Svc.MarkAllRead(r.Context()) + if err != nil { + envelope.WriteError(w, r, err) + return + } + envelope.WriteJSON(w, http.StatusOK, MarkAllNotificationsReadResponse{Notifications: notificationResponses(notifications)}) +} + func (c *NotificationsController) stream(w http.ResponseWriter, r *http.Request) { if c.Stream == nil { apispec.NotImplemented(w, r, "GET", "/api/v1/notifications/stream") @@ -142,26 +181,30 @@ func (e notificationQueryError) Error() string { return string(e) } func notificationResponses(in []notificationsvc.Notification) []NotificationResponse { out := make([]NotificationResponse, 0, len(in)) for _, n := range in { - out = append(out, NotificationResponse{ - ID: n.ID, - SessionID: string(n.SessionID), - ProjectID: string(n.ProjectID), - PRURL: n.PRURL, - Type: string(n.Type), - Title: n.Title, - Body: n.Body, - Status: string(n.Status), - CreatedAt: n.CreatedAt, - Target: NotificationTarget{ - Kind: string(n.Target.Kind), - SessionID: string(n.Target.SessionID), - PRURL: n.Target.PRURL, - }, - }) + out = append(out, notificationResponse(n)) } return out } +func notificationResponse(n notificationsvc.Notification) NotificationResponse { + return NotificationResponse{ + ID: n.ID, + SessionID: string(n.SessionID), + ProjectID: string(n.ProjectID), + PRURL: n.PRURL, + Type: string(n.Type), + Title: n.Title, + Body: n.Body, + Status: string(n.Status), + CreatedAt: n.CreatedAt, + Target: NotificationTarget{ + Kind: string(n.Target.Kind), + SessionID: string(n.Target.SessionID), + PRURL: n.Target.PRURL, + }, + } +} + func notificationResponseFromRecord(rec domain.NotificationRecord) NotificationResponse { return NotificationResponse{ ID: rec.ID, diff --git a/backend/internal/httpd/controllers/notifications_test.go b/backend/internal/httpd/controllers/notifications_test.go index def49923..1c8c8e3d 100644 --- a/backend/internal/httpd/controllers/notifications_test.go +++ b/backend/internal/httpd/controllers/notifications_test.go @@ -14,14 +14,18 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/config" "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/httpd" + "github.com/aoagents/agent-orchestrator/backend/internal/httpd/apierr" "github.com/aoagents/agent-orchestrator/backend/internal/httpd/controllers" notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification" ) type fakeNotificationService struct { - gotFilter notificationsvc.ListFilter - items []notificationsvc.Notification - err error + gotFilter notificationsvc.ListFilter + gotMarkID string + items []notificationsvc.Notification + markItem notificationsvc.Notification + markAllItems []notificationsvc.Notification + err error } type fakeNotificationStream struct { @@ -34,6 +38,15 @@ func (f *fakeNotificationService) ListUnread(_ context.Context, filter notificat return f.items, f.err } +func (f *fakeNotificationService) MarkRead(_ context.Context, id string) (notificationsvc.Notification, bool, error) { + f.gotMarkID = id + return f.markItem, f.err == nil, f.err +} + +func (f *fakeNotificationService) MarkAllRead(context.Context) ([]notificationsvc.Notification, error) { + return f.markAllItems, f.err +} + func (f *fakeNotificationStream) Subscribe(projectID domain.ProjectID) (<-chan domain.NotificationRecord, func()) { f.gotProject = projectID if f.ch == nil { @@ -55,6 +68,10 @@ func newNotificationStreamTestServer(t *testing.T, svc controllers.NotificationS return srv } +func notificationsvcNotFound() error { + return apierr.NotFound("NOTIFICATION_NOT_FOUND", "Unknown unread notification") +} + func TestNotificationsAPI_ListUnread(t *testing.T) { now := time.Date(2026, 6, 11, 10, 0, 0, 0, time.UTC) svc := &fakeNotificationService{items: []notificationsvc.Notification{{ @@ -109,6 +126,77 @@ func TestNotificationsAPI_RejectsUnsupportedStatus(t *testing.T) { assertErrorCode(t, body, status, http.StatusBadRequest, "INVALID_QUERY") } +func TestNotificationsAPI_MarkRead(t *testing.T) { + now := time.Date(2026, 6, 11, 10, 0, 0, 0, time.UTC) + svc := &fakeNotificationService{markItem: notificationsvc.Notification{ + NotificationRecord: domain.NotificationRecord{ + ID: "ntf_1", SessionID: "mer-1", ProjectID: "mer", Type: domain.NotificationNeedsInput, + Title: "checkout-flow needs input", Status: domain.NotificationRead, CreatedAt: now, + }, + Target: notificationsvc.Target{Kind: notificationsvc.TargetSession, SessionID: "mer-1"}, + }} + srv := newNotificationTestServer(t, svc) + + body, status, _ := doRequest(t, srv, "PATCH", "/api/v1/notifications/ntf_1", `{"status":"read"}`) + if status != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", status, body) + } + if svc.gotMarkID != "ntf_1" { + t.Fatalf("gotMarkID = %q", svc.gotMarkID) + } + var resp struct { + Notification struct { + ID string `json:"id"` + Status string `json:"status"` + Target struct { + Kind string `json:"kind"` + } `json:"target"` + } `json:"notification"` + } + mustJSON(t, body, &resp) + if resp.Notification.ID != "ntf_1" || resp.Notification.Status != "read" || resp.Notification.Target.Kind != "session" { + t.Fatalf("resp = %+v", resp) + } +} + +func TestNotificationsAPI_MarkReadRejectsUnsupportedStatus(t *testing.T) { + srv := newNotificationTestServer(t, &fakeNotificationService{}) + + body, status, _ := doRequest(t, srv, "PATCH", "/api/v1/notifications/ntf_1", `{"status":"unread"}`) + assertErrorCode(t, body, status, http.StatusBadRequest, "INVALID_NOTIFICATION_STATUS") +} + +func TestNotificationsAPI_MarkReadUnknownNotification(t *testing.T) { + srv := newNotificationTestServer(t, &fakeNotificationService{err: notificationsvcNotFound()}) + + body, status, _ := doRequest(t, srv, "PATCH", "/api/v1/notifications/missing", `{"status":"read"}`) + assertErrorCode(t, body, status, http.StatusNotFound, "NOTIFICATION_NOT_FOUND") +} + +func TestNotificationsAPI_MarkAllRead(t *testing.T) { + now := time.Date(2026, 6, 11, 10, 0, 0, 0, time.UTC) + svc := &fakeNotificationService{markAllItems: []notificationsvc.Notification{{ + NotificationRecord: domain.NotificationRecord{ID: "ntf_1", SessionID: "mer-1", ProjectID: "mer", Type: domain.NotificationNeedsInput, Title: "needs", Status: domain.NotificationRead, CreatedAt: now}, + Target: notificationsvc.Target{Kind: notificationsvc.TargetSession, SessionID: "mer-1"}, + }}} + srv := newNotificationTestServer(t, svc) + + body, status, _ := doRequest(t, srv, "POST", "/api/v1/notifications/read-all", "") + if status != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", status, body) + } + var resp struct { + Notifications []struct { + ID string `json:"id"` + Status string `json:"status"` + } `json:"notifications"` + } + mustJSON(t, body, &resp) + if len(resp.Notifications) != 1 || resp.Notifications[0].ID != "ntf_1" || resp.Notifications[0].Status != "read" { + t.Fatalf("resp = %+v", resp) + } +} + func TestNotificationsAPI_WithoutServiceIs501(t *testing.T) { srv := newNotificationTestServer(t, nil) diff --git a/backend/internal/service/notification/service.go b/backend/internal/service/notification/service.go index 25f92edc..dcbbb66b 100644 --- a/backend/internal/service/notification/service.go +++ b/backend/internal/service/notification/service.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/httpd/apierr" ) const ( @@ -46,6 +47,40 @@ func (m *Manager) ListUnread(ctx context.Context, filter ListFilter) ([]Notifica return out, nil } +// MarkRead marks one unread notification read. +func (m *Manager) MarkRead(ctx context.Context, id string) (Notification, bool, error) { + if m == nil || m.store == nil { + return Notification{}, false, errors.New("notification: store is required") + } + if id == "" { + return Notification{}, false, apierr.Invalid("INVALID_NOTIFICATION_ID", "Notification id is required", nil) + } + row, ok, err := m.store.MarkNotificationRead(ctx, id) + if err != nil { + return Notification{}, false, err + } + if !ok { + return Notification{}, false, apierr.NotFound("NOTIFICATION_NOT_FOUND", "Unknown unread notification") + } + return notificationFromRecord(row), true, nil +} + +// MarkAllRead marks all unread notifications read. +func (m *Manager) MarkAllRead(ctx context.Context) ([]Notification, error) { + if m == nil || m.store == nil { + return nil, errors.New("notification: store is required") + } + rows, err := m.store.MarkAllNotificationsRead(ctx) + if err != nil { + return nil, err + } + out := make([]Notification, 0, len(rows)) + for _, row := range rows { + out = append(out, notificationFromRecord(row)) + } + return out, nil +} + func normalizeLimit(limit int) int { if limit <= 0 { return DefaultListLimit diff --git a/backend/internal/service/notification/service_test.go b/backend/internal/service/notification/service_test.go index 3330fa53..31f13528 100644 --- a/backend/internal/service/notification/service_test.go +++ b/backend/internal/service/notification/service_test.go @@ -2,15 +2,20 @@ package notification import ( "context" + "errors" "testing" "time" "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/httpd/apierr" ) type fakeStore struct { - rows []domain.NotificationRecord - err error + rows []domain.NotificationRecord + markRow domain.NotificationRecord + markOK bool + markAllRows []domain.NotificationRecord + err error } func (f *fakeStore) CreateNotification(context.Context, domain.NotificationRecord) (domain.NotificationRecord, bool, error) { @@ -21,6 +26,14 @@ func (f *fakeStore) ListUnreadNotifications(_ context.Context, _ int) ([]domain. return f.rows, f.err } +func (f *fakeStore) MarkNotificationRead(_ context.Context, _ string) (domain.NotificationRecord, bool, error) { + return f.markRow, f.markOK, f.err +} + +func (f *fakeStore) MarkAllNotificationsRead(context.Context) ([]domain.NotificationRecord, error) { + return f.markAllRows, f.err +} + func TestListUnreadAddsTargets(t *testing.T) { st := &fakeStore{rows: []domain.NotificationRecord{ {ID: "n1", SessionID: "mer-1", ProjectID: "mer", Type: domain.NotificationNeedsInput, Title: "needs", Status: domain.NotificationUnread, CreatedAt: time.Now()}, @@ -36,6 +49,47 @@ func TestListUnreadAddsTargets(t *testing.T) { } } +func TestMarkReadAddsTarget(t *testing.T) { + st := &fakeStore{ + markRow: domain.NotificationRecord{ + ID: "n2", SessionID: "mer-1", ProjectID: "mer", PRURL: "https://github.com/o/r/pull/1", + Type: domain.NotificationReadyToMerge, Title: "ready", Status: domain.NotificationRead, CreatedAt: time.Now(), + }, + markOK: true, + } + mgr := New(Deps{Store: st}) + got, ok, err := mgr.MarkRead(context.Background(), "n2") + if err != nil || !ok { + t.Fatalf("MarkRead ok=%v err=%v", ok, err) + } + if got.Status != domain.NotificationRead || got.Target.Kind != TargetPR || got.Target.PRURL == "" { + t.Fatalf("notification = %+v", got) + } +} + +func TestMarkReadMissingReturnsNotFound(t *testing.T) { + mgr := New(Deps{Store: &fakeStore{}}) + _, _, err := mgr.MarkRead(context.Background(), "missing") + var apiErr *apierr.Error + if !errors.As(err, &apiErr) || apiErr.Kind != apierr.KindNotFound || apiErr.Code != "NOTIFICATION_NOT_FOUND" { + t.Fatalf("err = %v, want notification not found", err) + } +} + +func TestMarkAllReadAddsTargets(t *testing.T) { + st := &fakeStore{markAllRows: []domain.NotificationRecord{{ + ID: "n1", SessionID: "mer-1", ProjectID: "mer", Type: domain.NotificationNeedsInput, Title: "needs", Status: domain.NotificationRead, CreatedAt: time.Now(), + }}} + mgr := New(Deps{Store: st}) + got, err := mgr.MarkAllRead(context.Background()) + if err != nil { + t.Fatalf("MarkAllRead: %v", err) + } + if len(got) != 1 || got[0].Target.Kind != TargetSession || got[0].Status != domain.NotificationRead { + t.Fatalf("notifications = %+v", got) + } +} + func TestListUnreadRequiresStore(t *testing.T) { _, err := New(Deps{}).ListUnread(context.Background(), ListFilter{}) if err == nil { diff --git a/backend/internal/service/notification/store.go b/backend/internal/service/notification/store.go index 58be2625..294917a9 100644 --- a/backend/internal/service/notification/store.go +++ b/backend/internal/service/notification/store.go @@ -9,4 +9,6 @@ import ( // Store is the notification service's read persistence surface. type Store interface { ListUnreadNotifications(ctx context.Context, limit int) ([]domain.NotificationRecord, error) + MarkNotificationRead(ctx context.Context, id string) (domain.NotificationRecord, bool, error) + MarkAllNotificationsRead(ctx context.Context) ([]domain.NotificationRecord, error) } diff --git a/backend/internal/storage/sqlite/gen/notifications.sql.go b/backend/internal/storage/sqlite/gen/notifications.sql.go index a70c6df5..c4f9c205 100644 --- a/backend/internal/storage/sqlite/gen/notifications.sql.go +++ b/backend/internal/storage/sqlite/gen/notifications.sql.go @@ -128,3 +128,67 @@ func (q *Queries) ListUnreadNotifications(ctx context.Context, limit int64) ([]N } return items, nil } + +const markAllNotificationsRead = `-- name: MarkAllNotificationsRead :many +UPDATE notifications +SET status = 'read' +WHERE status = 'unread' +RETURNING id, session_id, project_id, pr_url, type, title, body, status, created_at +` + +func (q *Queries) MarkAllNotificationsRead(ctx context.Context) ([]Notification, error) { + rows, err := q.db.QueryContext(ctx, markAllNotificationsRead) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Notification{} + for rows.Next() { + var i Notification + if err := rows.Scan( + &i.ID, + &i.SessionID, + &i.ProjectID, + &i.PRURL, + &i.Type, + &i.Title, + &i.Body, + &i.Status, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const markNotificationRead = `-- name: MarkNotificationRead :one +UPDATE notifications +SET status = 'read' +WHERE id = ? AND status = 'unread' +RETURNING id, session_id, project_id, pr_url, type, title, body, status, created_at +` + +func (q *Queries) MarkNotificationRead(ctx context.Context, id string) (Notification, error) { + row := q.db.QueryRowContext(ctx, markNotificationRead, id) + var i Notification + err := row.Scan( + &i.ID, + &i.SessionID, + &i.ProjectID, + &i.PRURL, + &i.Type, + &i.Title, + &i.Body, + &i.Status, + &i.CreatedAt, + ) + return i, err +} diff --git a/backend/internal/storage/sqlite/queries/notifications.sql b/backend/internal/storage/sqlite/queries/notifications.sql index bd8a75b5..a91f4511 100644 --- a/backend/internal/storage/sqlite/queries/notifications.sql +++ b/backend/internal/storage/sqlite/queries/notifications.sql @@ -11,6 +11,17 @@ WHERE status = 'unread' ORDER BY created_at DESC LIMIT ?; +-- name: MarkNotificationRead :one +UPDATE notifications +SET status = 'read' +WHERE id = ? AND status = 'unread' +RETURNING *; + +-- name: MarkAllNotificationsRead :many +UPDATE notifications +SET status = 'read' +WHERE status = 'unread' +RETURNING *; -- name: GetUnreadNotificationByDedupe :one SELECT * diff --git a/backend/internal/storage/sqlite/store/notification_store.go b/backend/internal/storage/sqlite/store/notification_store.go index ce54293f..6bec9ec1 100644 --- a/backend/internal/storage/sqlite/store/notification_store.go +++ b/backend/internal/storage/sqlite/store/notification_store.go @@ -62,6 +62,31 @@ func (s *Store) ListUnreadNotifications(ctx context.Context, limit int) ([]domai return notificationsFromGen(rows), nil } +// MarkNotificationRead marks one unread notification read. +func (s *Store) MarkNotificationRead(ctx context.Context, id string) (domain.NotificationRecord, bool, error) { + s.writeMu.Lock() + defer s.writeMu.Unlock() + row, err := s.qw.MarkNotificationRead(ctx, id) + if errors.Is(err, sql.ErrNoRows) { + return domain.NotificationRecord{}, false, nil + } + if err != nil { + return domain.NotificationRecord{}, false, fmt.Errorf("mark notification read %s: %w", id, err) + } + return notificationFromGen(row), true, nil +} + +// MarkAllNotificationsRead marks every unread notification read. +func (s *Store) MarkAllNotificationsRead(ctx context.Context) ([]domain.NotificationRecord, error) { + s.writeMu.Lock() + defer s.writeMu.Unlock() + rows, err := s.qw.MarkAllNotificationsRead(ctx) + if err != nil { + return nil, fmt.Errorf("mark all notifications read: %w", err) + } + return notificationsFromGen(rows), nil +} + func (s *Store) getUnreadNotificationByDedupe(ctx context.Context, rec domain.NotificationRecord) (domain.NotificationRecord, bool, error) { row, err := s.qw.GetUnreadNotificationByDedupe(ctx, gen.GetUnreadNotificationByDedupeParams{ SessionID: rec.SessionID, diff --git a/backend/internal/storage/sqlite/store/notification_store_test.go b/backend/internal/storage/sqlite/store/notification_store_test.go index 0b2ebf9b..a01a7d87 100644 --- a/backend/internal/storage/sqlite/store/notification_store_test.go +++ b/backend/internal/storage/sqlite/store/notification_store_test.go @@ -49,6 +49,95 @@ func TestNotificationStore_InsertListAndDedupe(t *testing.T) { } } +func TestNotificationStore_MarkReadReopensUnreadDedupe(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + seedProject(t, s, "mer") + sess, err := s.CreateSession(ctx, sampleRecord("mer")) + if err != nil { + t.Fatalf("create session: %v", err) + } + now := time.Now().UTC().Truncate(time.Second) + rec := domain.NotificationRecord{ + ID: "ntf_1", + SessionID: sess.ID, + ProjectID: sess.ProjectID, + Type: domain.NotificationNeedsInput, + Title: "checkout-flow needs input", + Status: domain.NotificationUnread, + CreatedAt: now, + } + if _, inserted, err := s.CreateNotification(ctx, rec); err != nil || !inserted { + t.Fatalf("CreateNotification inserted=%v err=%v", inserted, err) + } + read, ok, err := s.MarkNotificationRead(ctx, rec.ID) + if err != nil || !ok { + t.Fatalf("MarkNotificationRead ok=%v err=%v", ok, err) + } + if read.Status != domain.NotificationRead { + t.Fatalf("status = %q, want read", read.Status) + } + rows, err := s.ListUnreadNotifications(ctx, 10) + if err != nil { + t.Fatalf("ListUnreadNotifications: %v", err) + } + if len(rows) != 0 { + t.Fatalf("rows = %+v, want none", rows) + } + again := rec + again.ID = "ntf_2" + again.CreatedAt = now.Add(time.Minute) + if _, inserted, err := s.CreateNotification(ctx, again); err != nil || !inserted { + t.Fatalf("CreateNotification after read inserted=%v err=%v", inserted, err) + } +} + +func TestNotificationStore_MarkReadMissing(t *testing.T) { + s := newTestStore(t) + _, ok, err := s.MarkNotificationRead(context.Background(), "missing") + if err != nil || ok { + t.Fatalf("MarkNotificationRead ok=%v err=%v, want false nil", ok, err) + } +} + +func TestNotificationStore_MarkAllRead(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + seedProject(t, s, "mer") + sess, err := s.CreateSession(ctx, sampleRecord("mer")) + if err != nil { + t.Fatalf("create session: %v", err) + } + base := time.Now().UTC().Truncate(time.Second) + for _, rec := range []domain.NotificationRecord{ + {ID: "ntf_1", SessionID: sess.ID, ProjectID: sess.ProjectID, Type: domain.NotificationNeedsInput, Title: "one", Status: domain.NotificationUnread, CreatedAt: base}, + {ID: "ntf_2", SessionID: sess.ID, ProjectID: sess.ProjectID, PRURL: "https://github.com/o/r/pull/1", Type: domain.NotificationReadyToMerge, Title: "two", Status: domain.NotificationUnread, CreatedAt: base.Add(time.Minute)}, + } { + if _, inserted, err := s.CreateNotification(ctx, rec); err != nil || !inserted { + t.Fatalf("insert %s inserted=%v err=%v", rec.ID, inserted, err) + } + } + read, err := s.MarkAllNotificationsRead(ctx) + if err != nil { + t.Fatalf("MarkAllNotificationsRead: %v", err) + } + if len(read) != 2 { + t.Fatalf("read rows = %+v", read) + } + for _, row := range read { + if row.Status != domain.NotificationRead { + t.Fatalf("row = %+v, want read", row) + } + } + rows, err := s.ListUnreadNotifications(ctx, 10) + if err != nil { + t.Fatalf("ListUnreadNotifications: %v", err) + } + if len(rows) != 0 { + t.Fatalf("unread rows = %+v, want none", rows) + } +} + func TestNotificationStore_ListUnreadNewestFirstAcrossProjects(t *testing.T) { s := newTestStore(t) ctx := context.Background() diff --git a/docs/status.md b/docs/status.md index 70320e07..75e76b34 100644 --- a/docs/status.md +++ b/docs/status.md @@ -36,6 +36,9 @@ surface (`npm run sqlc`, `npm run api`). `/prs/{id}/resolve-comments`. - Review routes registered: `GET /reviews`, `POST /reviews/execute`, `POST /reviews/{id}/send`. +- Durable dashboard notifications for `needs_input`, `ready_to_merge`, + `pr_merged`, and `pr_closed_unmerged`: backend enrichment/persistence, + unread list, live notification stream, and read acknowledgement API. - SCM observer (`internal/observe/scm`) wired into the daemon: GitHub provider, lazy/non-blocking auth, per-PR polling with ETag guards and semantic diffing, feeding PR facts into lifecycle, which sends agent nudges for CI failures, @@ -58,14 +61,15 @@ surface (`npm run sqlc`, `npm run api`). spawn-orchestrator flow. - Terminal pane (xterm) over the mux WebSocket, with a live SSE events connection and port-rebind on daemon restart. +- In-app notification center with unread catch-up over REST, live notification + stream updates, explicit open-target actions, mark-read controls, and + Electron app toasts while the app is running. ## In flight / not yet a runtime feature - **Tracker lane**: GitHub tracker adapter exists, but there is no daemon observer loop or agent-lifecycle→issue mirroring yet, so the tracker does nothing at runtime ([#112](https://github.com/aoagents/agent-orchestrator/issues/112)). -- **Notifications**: design/in-flight only; no shipped backend notifier or UI - center. - **Live PR/tracker fact surfacing**: the observer writes facts, but exposing the full `pr_*` / `tracker_*` CDC events to live consumers ([#110](https://github.com/aoagents/agent-orchestrator/issues/110)) and in diff --git a/frontend/src/api/schema.ts b/frontend/src/api/schema.ts index 92a84d15..bf59efbc 100644 --- a/frontend/src/api/schema.ts +++ b/frontend/src/api/schema.ts @@ -38,6 +38,40 @@ export interface paths { patch?: never; trace?: never; }; + "/api/v1/notifications/{id}": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + get?: never; + put?: never; + post?: never; + delete?: never; + options?: never; + head?: never; + /** Mark a notification read */ + patch: operations["markNotificationRead"]; + trace?: never; + }; + "/api/v1/notifications/read-all": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + get?: never; + put?: never; + /** Mark all unread notifications read */ + post: operations["markAllNotificationsRead"]; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/api/v1/notifications/stream": { parameters: { query?: never; @@ -481,11 +515,24 @@ export interface components { ListSessionsResponse: { sessions: components["schemas"]["Session"][]; }; + MarkAllNotificationsReadResponse: { + notifications: components["schemas"]["NotificationResponse"][]; + }; + MarkNotificationReadRequest: { + /** + * @description V1 supports only marking an unread notification read. + * @enum {string} + */ + status: "read"; + }; MergePRResponse: { method: string; ok: boolean; prNumber: number; }; + NotificationEnvelope: { + notification: components["schemas"]["NotificationResponse"]; + }; NotificationResponse: { body: string; /** Format: date-time */ @@ -796,6 +843,107 @@ export interface operations { }; }; }; + markNotificationRead: { + parameters: { + query?: never; + header?: never; + path: { + /** @description Notification identifier. */ + id: string; + }; + cookie?: never; + }; + requestBody: { + content: { + "application/json": components["schemas"]["MarkNotificationReadRequest"]; + }; + }; + responses: { + /** @description OK */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["NotificationEnvelope"]; + }; + }; + /** @description Bad Request */ + 400: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + /** @description Not Found */ + 404: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + /** @description Internal Server Error */ + 500: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + /** @description Not Implemented */ + 501: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + }; + }; + markAllNotificationsRead: { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description OK */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MarkAllNotificationsReadResponse"]; + }; + }; + /** @description Internal Server Error */ + 500: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + /** @description Not Implemented */ + 501: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + }; + }; streamNotifications: { parameters: { query?: { diff --git a/frontend/src/main.ts b/frontend/src/main.ts index ca05ed6f..8afea20d 100644 --- a/frontend/src/main.ts +++ b/frontend/src/main.ts @@ -1,4 +1,14 @@ -import { app, BrowserWindow, dialog, ipcMain, net, protocol, shell, type OpenDialogOptions } from "electron"; +import { + app, + BrowserWindow, + dialog, + ipcMain, + net, + Notification as ElectronNotification, + protocol, + shell, + type OpenDialogOptions, +} from "electron"; import { updateElectronApp } from "update-electron-app"; import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process"; import { existsSync } from "node:fs"; @@ -319,6 +329,22 @@ ipcMain.handle("app:chooseDirectory", async () => { return result.filePaths[0] ?? null; }); +ipcMain.handle("notifications:show", (_event, notification: { id: string; title: string; body?: string }) => { + if (!notification.id || !notification.title || !ElectronNotification.isSupported()) return; + const toast = new ElectronNotification({ + title: notification.title, + body: notification.body, + }); + toast.on("click", () => { + if (!mainWindow) return; + if (mainWindow.isMinimized()) mainWindow.restore(); + mainWindow.show(); + mainWindow.focus(); + mainWindow.webContents.send("notifications:click", notification.id); + }); + toast.show(); +}); + // Auto-update only runs for packaged builds reading the GitHub Releases feed // (see forge.config.ts publishers). In dev there is no feed, so it is skipped. // A live updater additionally requires a signed + notarized build — see diff --git a/frontend/src/preload.ts b/frontend/src/preload.ts index 2e762f2c..8d85500b 100644 --- a/frontend/src/preload.ts +++ b/frontend/src/preload.ts @@ -18,6 +18,17 @@ const api = { }; }, }, + notifications: { + show: (notification: { id: string; title: string; body?: string }) => + ipcRenderer.invoke("notifications:show", notification) as Promise, + onClick: (listener: (id: string) => void) => { + const wrapped = (_event: Electron.IpcRendererEvent, id: string) => listener(id); + ipcRenderer.on("notifications:click", wrapped); + return () => { + ipcRenderer.off("notifications:click", wrapped); + }; + }, + }, }; contextBridge.exposeInMainWorld("ao", api); diff --git a/frontend/src/renderer/components/NotificationCenter.tsx b/frontend/src/renderer/components/NotificationCenter.tsx new file mode 100644 index 00000000..d6de7816 --- /dev/null +++ b/frontend/src/renderer/components/NotificationCenter.tsx @@ -0,0 +1,211 @@ +import { useQueryClient } from "@tanstack/react-query"; +import { useNavigate } from "@tanstack/react-router"; +import { Bell, Check, CheckCheck, CircleAlert, ExternalLink, GitMerge, GitPullRequest, XCircle } from "lucide-react"; +import { useCallback, useEffect, useMemo, useState } from "react"; +import { useMarkAllNotificationsReadMutation, useMarkNotificationReadMutation, useNotificationsQuery } from "../hooks/useNotificationsQuery"; +import { aoBridge } from "../lib/bridge"; +import { formatTimeCompact } from "../lib/format-time"; +import { createNotificationsTransport, type NotificationDTO, unreadNotificationsQueryKey } from "../lib/notifications"; +import { cn } from "../lib/utils"; +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuLabel, + DropdownMenuSeparator, + DropdownMenuTrigger, +} from "./ui/dropdown-menu"; + +type NotificationCenterProps = { + style?: React.CSSProperties; +}; + +export function NotificationCenter({ style }: NotificationCenterProps) { + const navigate = useNavigate(); + const queryClient = useQueryClient(); + const notificationsQuery = useNotificationsQuery(); + const markRead = useMarkNotificationReadMutation(); + const markAllRead = useMarkAllNotificationsReadMutation(); + const [actionError, setActionError] = useState(null); + const notifications = useMemo(() => notificationsQuery.data ?? [], [notificationsQuery.data]); + const unreadCount = notifications.length; + + const openTarget = useCallback( + (notification: NotificationDTO) => { + const target = notification.target; + if (target.kind === "pr" && target.prUrl) { + window.open(target.prUrl, "_blank", "noopener,noreferrer"); + return; + } + const sessionId = target.sessionId || notification.sessionId; + if (!sessionId) return; + if (notification.projectId) { + void navigate({ + to: "/projects/$projectId/sessions/$sessionId", + params: { projectId: notification.projectId, sessionId }, + }); + return; + } + void navigate({ to: "/sessions/$sessionId", params: { sessionId } }); + }, + [navigate], + ); + + useEffect(() => createNotificationsTransport(queryClient).connect(), [queryClient]); + + useEffect(() => { + return aoBridge.notifications.onClick((id) => { + const current = queryClient.getQueryData(unreadNotificationsQueryKey) ?? []; + const notification = current.find((item) => item.id === id); + if (notification) openTarget(notification); + }); + }, [openTarget, queryClient]); + + const markOneRead = async (id: string) => { + setActionError(null); + try { + await markRead.mutateAsync(id); + } catch (error) { + setActionError(error instanceof Error ? error.message : "Could not mark notification read"); + } + }; + + const markAll = async () => { + setActionError(null); + try { + await markAllRead.mutateAsync(); + } catch (error) { + setActionError(error instanceof Error ? error.message : "Could not mark notifications read"); + } + }; + + return ( + + + + + +
+ Notifications + +
+ {actionError ? ( +
{actionError}
+ ) : null} + {notificationsQuery.isError && unreadCount === 0 ? ( +
Could not load notifications.
+ ) : unreadCount === 0 ? ( +
No unread notifications.
+ ) : ( +
+ {notifications.map((notification, index) => ( +
+ + {index < notifications.length - 1 ? : null} +
+ ))} +
+ )} +
+
+ ); +} + +function NotificationItem({ + disabled, + notification, + onMarkRead, + onOpen, +}: { + disabled: boolean; + notification: NotificationDTO; + onMarkRead: (id: string) => Promise; + onOpen: (notification: NotificationDTO) => void; +}) { + const Icon = notificationIcon(notification.type); + return ( +
+
+
+
+
+

{notification.title}

+ {formatTimeCompact(notification.createdAt)} +
+ {notification.body ? ( +

{notification.body}

+ ) : null} +
+
+ + +
+
+ ); +} + +function notificationIcon(type: string) { + switch (type) { + case "needs_input": + return CircleAlert; + case "ready_to_merge": + return GitPullRequest; + case "pr_merged": + return GitMerge; + case "pr_closed_unmerged": + return XCircle; + default: + return Bell; + } +} diff --git a/frontend/src/renderer/components/ShellTopbar.tsx b/frontend/src/renderer/components/ShellTopbar.tsx index 63539287..d111b529 100644 --- a/frontend/src/renderer/components/ShellTopbar.tsx +++ b/frontend/src/renderer/components/ShellTopbar.tsx @@ -2,6 +2,7 @@ import { useQueryClient } from "@tanstack/react-query"; import { useNavigate, useParams } from "@tanstack/react-router"; import { GitBranch, LayoutGrid, PanelRightClose, PanelRightOpen, Waypoints } from "lucide-react"; import { useState } from "react"; +import { NotificationCenter } from "./NotificationCenter"; import { findProjectOrchestrator, isOrchestratorSession, @@ -126,6 +127,7 @@ export function ShellTopbar() {
+ {isSessionRoute ? ( <> {isOrchestrator ? ( diff --git a/frontend/src/renderer/hooks/useNotificationsQuery.ts b/frontend/src/renderer/hooks/useNotificationsQuery.ts new file mode 100644 index 00000000..fb39bdfc --- /dev/null +++ b/frontend/src/renderer/hooks/useNotificationsQuery.ts @@ -0,0 +1,38 @@ +import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; +import { + clearUnreadNotifications, + fetchUnreadNotifications, + markAllNotificationsRead, + markNotificationRead, + removeUnreadNotification, + unreadNotificationsQueryKey, +} from "../lib/notifications"; + +export function useNotificationsQuery() { + return useQuery({ + queryKey: unreadNotificationsQueryKey, + queryFn: fetchUnreadNotifications, + retry: 1, + }); +} + +export function useMarkNotificationReadMutation() { + const queryClient = useQueryClient(); + return useMutation({ + mutationFn: markNotificationRead, + onSuccess: (notification) => { + removeUnreadNotification(queryClient, notification.id); + }, + }); +} + +export function useMarkAllNotificationsReadMutation() { + const queryClient = useQueryClient(); + return useMutation({ + mutationFn: markAllNotificationsRead, + onSuccess: () => { + clearUnreadNotifications(queryClient); + void queryClient.invalidateQueries({ queryKey: unreadNotificationsQueryKey }); + }, + }); +} diff --git a/frontend/src/renderer/lib/bridge.ts b/frontend/src/renderer/lib/bridge.ts index 6e907e13..e358ca1b 100644 --- a/frontend/src/renderer/lib/bridge.ts +++ b/frontend/src/renderer/lib/bridge.ts @@ -16,4 +16,8 @@ export const aoBridge: AoBridge = stop: async () => ({ state: "stopped" }), onStatus: () => () => undefined, }, + notifications: { + show: async () => undefined, + onClick: () => () => undefined, + }, } satisfies AoBridge); diff --git a/frontend/src/renderer/lib/notifications.test.ts b/frontend/src/renderer/lib/notifications.test.ts new file mode 100644 index 00000000..f375dfd1 --- /dev/null +++ b/frontend/src/renderer/lib/notifications.test.ts @@ -0,0 +1,153 @@ +import { QueryClient } from "@tanstack/react-query"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { NotificationDTO } from "./notifications"; + +const { + getApiBaseUrlMock, + onStatusMock, + removeStatusMock, + showNotificationMock, + subscribeApiBaseUrlMock, + unsubscribeBaseUrlMock, +} = vi.hoisted(() => ({ + getApiBaseUrlMock: vi.fn(() => "http://127.0.0.1:3001"), + onStatusMock: vi.fn(), + removeStatusMock: vi.fn(), + showNotificationMock: vi.fn(), + subscribeApiBaseUrlMock: vi.fn(), + unsubscribeBaseUrlMock: vi.fn(), +})); + +vi.mock("./api-client", () => ({ + apiClient: {}, + apiErrorMessage: () => "Request failed", + getApiBaseUrl: getApiBaseUrlMock, + subscribeApiBaseUrl: subscribeApiBaseUrlMock, +})); + +vi.mock("./bridge", () => ({ + aoBridge: { + daemon: { onStatus: onStatusMock }, + notifications: { show: showNotificationMock }, + }, +})); + +import { createNotificationsTransport, mergeUnreadNotification, unreadNotificationsQueryKey } from "./notifications"; + +class EventSourceStub { + static instances: EventSourceStub[] = []; + url: string; + closed = false; + readyState = 0; + onopen: (() => void) | null = null; + onerror: (() => void) | null = null; + listeners = new Map) => void>(); + + constructor(url: string) { + this.url = url; + EventSourceStub.instances.push(this); + } + + addEventListener(type: string, listener: EventListener) { + this.listeners.set(type, listener as (event: MessageEvent) => void); + } + + dispatch(type: string, data: unknown) { + this.listeners.get(type)?.({ data: JSON.stringify(data) } as MessageEvent); + } + + close() { + this.closed = true; + this.readyState = 2; + } +} + +function notification(overrides: Partial = {}): NotificationDTO { + return { + id: "ntf_1", + sessionId: "mer-1", + projectId: "mer", + prUrl: "", + type: "needs_input", + title: "checkout-flow needs input", + body: "The agent is waiting for your response.", + status: "unread", + createdAt: "2026-06-16T10:00:00Z", + target: { kind: "session", sessionId: "mer-1" }, + ...overrides, + }; +} + +function queryClient() { + return new QueryClient({ defaultOptions: { queries: { retry: false } } }); +} + +beforeEach(() => { + EventSourceStub.instances = []; + getApiBaseUrlMock.mockReset().mockReturnValue("http://127.0.0.1:3001"); + onStatusMock.mockReset().mockReturnValue(removeStatusMock); + removeStatusMock.mockReset(); + showNotificationMock.mockReset().mockResolvedValue(undefined); + subscribeApiBaseUrlMock.mockReset().mockReturnValue(unsubscribeBaseUrlMock); + unsubscribeBaseUrlMock.mockReset(); + (globalThis as unknown as { EventSource: unknown }).EventSource = EventSourceStub; +}); + +afterEach(() => { + delete (globalThis as unknown as { EventSource?: unknown }).EventSource; +}); + +describe("notification cache helpers", () => { + it("merges unread notifications by id", () => { + const qc = queryClient(); + + expect(mergeUnreadNotification(qc, notification())).toBe(true); + expect(mergeUnreadNotification(qc, notification())).toBe(false); + + expect(qc.getQueryData(unreadNotificationsQueryKey)).toHaveLength(1); + }); +}); + +describe("createNotificationsTransport", () => { + it("opens the notification stream and invalidates unread notifications on open", () => { + const qc = queryClient(); + const invalidateSpy = vi.spyOn(qc, "invalidateQueries"); + + createNotificationsTransport(qc).connect(); + EventSourceStub.instances[0].onopen?.(); + + expect(EventSourceStub.instances).toHaveLength(1); + expect(EventSourceStub.instances[0].url).toBe("http://127.0.0.1:3001/api/v1/notifications/stream"); + expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: unreadNotificationsQueryKey }); + }); + + it("merges live notifications and shows one toast for a new id", () => { + const qc = queryClient(); + createNotificationsTransport(qc).connect(); + const source = EventSourceStub.instances[0]; + + source.dispatch("notification_created", notification()); + source.dispatch("notification_created", notification()); + + expect(qc.getQueryData(unreadNotificationsQueryKey)).toHaveLength(1); + expect(showNotificationMock).toHaveBeenCalledTimes(1); + expect(showNotificationMock).toHaveBeenCalledWith({ + id: "ntf_1", + title: "checkout-flow needs input", + body: "The agent is waiting for your response.", + }); + }); + + it("reconnects when the API base URL changes", () => { + createNotificationsTransport(queryClient()).connect(); + const onBaseUrlChange = subscribeApiBaseUrlMock.mock.calls[0][0] as () => void; + const first = EventSourceStub.instances[0]; + + getApiBaseUrlMock.mockReturnValue("http://127.0.0.1:4555"); + onBaseUrlChange(); + + expect(first.closed).toBe(true); + expect(EventSourceStub.instances).toHaveLength(2); + expect(EventSourceStub.instances[1].url).toBe("http://127.0.0.1:4555/api/v1/notifications/stream"); + }); +}); diff --git a/frontend/src/renderer/lib/notifications.ts b/frontend/src/renderer/lib/notifications.ts new file mode 100644 index 00000000..0379d2f3 --- /dev/null +++ b/frontend/src/renderer/lib/notifications.ts @@ -0,0 +1,138 @@ +import type { QueryClient } from "@tanstack/react-query"; +import type { components } from "../../api/schema"; +import { aoBridge } from "./bridge"; +import { apiClient, apiErrorMessage, getApiBaseUrl, subscribeApiBaseUrl } from "./api-client"; + +export type NotificationDTO = components["schemas"]["NotificationResponse"]; + +export const unreadNotificationsQueryKey = ["notifications", "unread"] as const; + +const SSE_RETRY_MS = 5_000; +const EVENTSOURCE_CLOSED = 2; + +export async function fetchUnreadNotifications(): Promise { + const { data, error } = await apiClient.GET("/api/v1/notifications", { + params: { query: { status: "unread", limit: 100 } }, + }); + if (error) throw new Error(apiErrorMessage(error, "Could not load notifications")); + return sortNotifications(data?.notifications ?? []); +} + +export async function markNotificationRead(id: string): Promise { + const { data, error } = await apiClient.PATCH("/api/v1/notifications/{id}", { + params: { path: { id } }, + body: { status: "read" }, + }); + if (error) throw new Error(apiErrorMessage(error, "Could not mark notification read")); + if (!data?.notification) throw new Error("Notification update returned no notification"); + return data.notification; +} + +export async function markAllNotificationsRead(): Promise { + const { data, error } = await apiClient.POST("/api/v1/notifications/read-all"); + if (error) throw new Error(apiErrorMessage(error, "Could not mark notifications read")); + return data?.notifications ?? []; +} + +export function mergeUnreadNotification(queryClient: QueryClient, notification: NotificationDTO): boolean { + let inserted = false; + queryClient.setQueryData(unreadNotificationsQueryKey, (current = []) => { + if (current.some((item) => item.id === notification.id)) return current; + inserted = true; + return sortNotifications([notification, ...current]); + }); + return inserted; +} + +export function removeUnreadNotification(queryClient: QueryClient, id: string): void { + queryClient.setQueryData(unreadNotificationsQueryKey, (current = []) => + current.filter((item) => item.id !== id), + ); +} + +export function clearUnreadNotifications(queryClient: QueryClient): void { + queryClient.setQueryData(unreadNotificationsQueryKey, []); +} + +export function createNotificationsTransport(queryClient: QueryClient) { + return { + connect() { + let retryTimer: ReturnType | undefined; + let source: EventSource | undefined; + let sourceBaseUrl: string | undefined; + + const invalidateUnread = () => { + void queryClient.invalidateQueries({ queryKey: unreadNotificationsQueryKey }); + }; + + const scheduleRetry = () => { + if (retryTimer) return; + retryTimer = setTimeout(() => { + retryTimer = undefined; + connectSource(); + }, SSE_RETRY_MS); + }; + + const connectSource = () => { + if (typeof EventSource === "undefined") return; + const baseUrl = getApiBaseUrl(); + if (source && sourceBaseUrl === baseUrl && source.readyState !== EVENTSOURCE_CLOSED) return; + source?.close(); + source = undefined; + sourceBaseUrl = baseUrl; + try { + source = new EventSource(`${baseUrl.replace(/\/+$/, "")}/api/v1/notifications/stream`); + source.onopen = invalidateUnread; + source.onerror = () => { + if (source?.readyState === EVENTSOURCE_CLOSED) scheduleRetry(); + }; + source.addEventListener("notification_created", (event) => { + const notification = parseNotificationEvent(event); + if (!notification) return; + const inserted = mergeUnreadNotification(queryClient, notification); + if (inserted) { + void aoBridge.notifications.show({ + id: notification.id, + title: notification.title, + body: notification.body || undefined, + }); + } + }); + } catch { + source = undefined; + } + }; + + const removeDaemonListener = aoBridge.daemon.onStatus(() => { + connectSource(); + invalidateUnread(); + }); + const removeBaseUrlListener = subscribeApiBaseUrl(() => { + connectSource(); + invalidateUnread(); + }); + connectSource(); + + return () => { + if (retryTimer) clearTimeout(retryTimer); + removeDaemonListener(); + removeBaseUrlListener(); + source?.close(); + }; + }, + }; +} + +function parseNotificationEvent(event: Event): NotificationDTO | null { + const data = (event as MessageEvent).data; + if (typeof data !== "string" || data === "") return null; + try { + return JSON.parse(data) as NotificationDTO; + } catch { + return null; + } +} + +function sortNotifications(notifications: NotificationDTO[]): NotificationDTO[] { + return [...notifications].sort((a, b) => Date.parse(b.createdAt) - Date.parse(a.createdAt)); +} diff --git a/frontend/src/renderer/test/setup.ts b/frontend/src/renderer/test/setup.ts index 6ad62a0c..69e8157a 100644 --- a/frontend/src/renderer/test/setup.ts +++ b/frontend/src/renderer/test/setup.ts @@ -61,4 +61,8 @@ window.ao = { stop: async () => ({ state: "stopped" }), onStatus: () => () => undefined, }, + notifications: { + show: async () => undefined, + onClick: () => () => undefined, + }, }; From bbde99d766a778bcdd6cbf920afdbc1f973f4908 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 16 Jun 2026 19:19:04 +0000 Subject: [PATCH 2/2] chore: format with prettier [skip ci] --- frontend/src/renderer/components/NotificationCenter.tsx | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/frontend/src/renderer/components/NotificationCenter.tsx b/frontend/src/renderer/components/NotificationCenter.tsx index d6de7816..1a81b449 100644 --- a/frontend/src/renderer/components/NotificationCenter.tsx +++ b/frontend/src/renderer/components/NotificationCenter.tsx @@ -2,7 +2,11 @@ import { useQueryClient } from "@tanstack/react-query"; import { useNavigate } from "@tanstack/react-router"; import { Bell, Check, CheckCheck, CircleAlert, ExternalLink, GitMerge, GitPullRequest, XCircle } from "lucide-react"; import { useCallback, useEffect, useMemo, useState } from "react"; -import { useMarkAllNotificationsReadMutation, useMarkNotificationReadMutation, useNotificationsQuery } from "../hooks/useNotificationsQuery"; +import { + useMarkAllNotificationsReadMutation, + useMarkNotificationReadMutation, + useNotificationsQuery, +} from "../hooks/useNotificationsQuery"; import { aoBridge } from "../lib/bridge"; import { formatTimeCompact } from "../lib/format-time"; import { createNotificationsTransport, type NotificationDTO, unreadNotificationsQueryKey } from "../lib/notifications";