From 9800c09e3efcffdc9349a0f73ec7d64c73e8e24c Mon Sep 17 00:00:00 2001 From: GRACENOBLE Date: Tue, 23 Jun 2026 09:50:33 +0300 Subject: [PATCH 1/4] feat(backend): complete six template gaps for production readiness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Wire Redis Streams end-to-end: Producer initialised in bootstrap, consumer goroutine in server fans UserCreatedEvent → Asynq welcome email task - Fix welcome email handler to actually call Mailjet via NewHandleWelcomeEmail constructor (was only logging before) - Add user profile CRUD: migration adds firebase_uid/email/photo_url to users, PATCH /api/v1/me upserts profile, DELETE /api/v1/me removes DB record - Make WebSocket bi-directional: ReadPump forwards inbound Envelopes to hub, Hub.OnMessage registers typed handlers dispatched per message type - Add generic pagination types (Page[T], CursorPage[T], PageRequest) to domain - Add bindJSON/bindQuery validation helpers; refactor existing handlers to use them - Remove TestClose that closed shared testDB mid-suite, breaking later tests --- backend/cmd/api/main.go | 4 +- backend/docs/swagger/docs.go | 139 ++++++++++++++++++ backend/docs/swagger/swagger.json | 139 ++++++++++++++++++ backend/docs/swagger/swagger.yaml | 91 ++++++++++++ backend/internal/bootstrap/bootstrap.go | 12 ++ backend/internal/domain/pagination.go | 36 +++++ backend/internal/domain/user.go | 14 ++ .../20260623063851_add_user_profile.sql | 16 ++ .../postgres/health_repository_test.go | 9 -- .../database/postgres/user_repository.go | 45 ++++++ .../database/postgres/user_repository_test.go | 122 +++++++++++++++ .../internal/infrastructure/queue/handlers.go | 23 +-- .../infrastructure/queue/handlers_test.go | 6 +- backend/internal/infrastructure/ws/client.go | 12 +- backend/internal/infrastructure/ws/hub.go | 48 ++++-- .../internal/infrastructure/ws/hub_test.go | 30 ++++ backend/internal/infrastructure/ws/message.go | 14 +- backend/internal/server/server.go | 29 +++- .../transport/handlers/fcm_handler.go | 11 +- .../internal/transport/handlers/handler.go | 6 + .../transport/handlers/health_handler_test.go | 4 +- .../internal/transport/handlers/me_handler.go | 90 ++++++++++++ .../transport/handlers/me_handler_test.go | 133 +++++++++++++++++ backend/internal/transport/handlers/routes.go | 4 + .../transport/handlers/storage_handler.go | 3 +- .../transport/handlers/swagger_types.go | 4 + .../internal/transport/handlers/validation.go | 25 ++++ backend/internal/usecase/streams.go | 8 + backend/internal/usecase/user.go | 13 ++ 29 files changed, 1046 insertions(+), 44 deletions(-) create mode 100644 backend/internal/domain/pagination.go create mode 100644 backend/internal/domain/user.go create mode 100644 backend/internal/infrastructure/database/migrations/20260623063851_add_user_profile.sql create mode 100644 backend/internal/infrastructure/database/postgres/user_repository.go create mode 100644 backend/internal/infrastructure/database/postgres/user_repository_test.go create mode 100644 backend/internal/transport/handlers/me_handler.go create mode 100644 backend/internal/transport/handlers/me_handler_test.go create mode 100644 backend/internal/transport/handlers/validation.go create mode 100644 backend/internal/usecase/streams.go create mode 100644 backend/internal/usecase/user.go diff --git a/backend/cmd/api/main.go b/backend/cmd/api/main.go index 79a8f8c..7a8a308 100644 --- a/backend/cmd/api/main.go +++ b/backend/cmd/api/main.go @@ -10,8 +10,6 @@ import ( "syscall" "time" - "github.com/hibiken/asynq" - "backend/internal/bootstrap" "backend/internal/infrastructure/queue" "backend/internal/infrastructure/ws" @@ -72,7 +70,7 @@ func main() { fmt.Fprintf(os.Stderr, "startup failed: %v\n", err) os.Exit(1) } - worker.Register(queue.TypeWelcomeEmail, asynq.HandlerFunc(queue.HandleWelcomeEmail)) + worker.Register(queue.TypeWelcomeEmail, queue.NewHandleWelcomeEmail(app.EmailSender)) go func() { if err := worker.Run(workerCtx); err != nil { slog.Error("queue: worker error", "err", err) diff --git a/backend/docs/swagger/docs.go b/backend/docs/swagger/docs.go index c7bc030..ff445da 100644 --- a/backend/docs/swagger/docs.go +++ b/backend/docs/swagger/docs.go @@ -244,6 +244,108 @@ const docTemplate = `{ } } } + }, + "delete": { + "security": [ + { + "BearerAuth": [] + } + ], + "description": "Removes the authenticated user's profile record from the database. The Firebase account is not deleted.", + "produces": [ + "application/json" + ], + "tags": [ + "users" + ], + "summary": "Delete current user account", + "responses": { + "204": { + "description": "No Content" + }, + "401": { + "description": "Unauthorized", + "schema": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + } + } + }, + "patch": { + "security": [ + { + "BearerAuth": [] + } + ], + "description": "Upserts the authenticated user's profile record. Requires a valid Firebase ID token.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "users" + ], + "summary": "Update current user profile", + "parameters": [ + { + "description": "Profile update", + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/handlers.updateMeRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.UserAlias" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + }, + "401": { + "description": "Unauthorized", + "schema": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + } + } } }, "/api/v1/storage/presign": { @@ -479,6 +581,32 @@ const docTemplate = `{ } } }, + "handlers.UserAlias": { + "type": "object", + "properties": { + "created_at": { + "type": "string" + }, + "email": { + "type": "string" + }, + "firebase_uid": { + "type": "string" + }, + "id": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "photo_url": { + "type": "string" + }, + "updated_at": { + "type": "string" + } + } + }, "handlers.presignRequest": { "type": "object", "required": [ @@ -535,6 +663,17 @@ const docTemplate = `{ "type": "string" } } + }, + "handlers.updateMeRequest": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "name": { + "type": "string" + } + } } }, "securityDefinitions": { diff --git a/backend/docs/swagger/swagger.json b/backend/docs/swagger/swagger.json index 4f6f877..b5450ec 100644 --- a/backend/docs/swagger/swagger.json +++ b/backend/docs/swagger/swagger.json @@ -238,6 +238,108 @@ } } } + }, + "delete": { + "security": [ + { + "BearerAuth": [] + } + ], + "description": "Removes the authenticated user's profile record from the database. The Firebase account is not deleted.", + "produces": [ + "application/json" + ], + "tags": [ + "users" + ], + "summary": "Delete current user account", + "responses": { + "204": { + "description": "No Content" + }, + "401": { + "description": "Unauthorized", + "schema": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + } + } + }, + "patch": { + "security": [ + { + "BearerAuth": [] + } + ], + "description": "Upserts the authenticated user's profile record. Requires a valid Firebase ID token.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "users" + ], + "summary": "Update current user profile", + "parameters": [ + { + "description": "Profile update", + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/handlers.updateMeRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.UserAlias" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + }, + "401": { + "description": "Unauthorized", + "schema": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + } + } } }, "/api/v1/storage/presign": { @@ -473,6 +575,32 @@ } } }, + "handlers.UserAlias": { + "type": "object", + "properties": { + "created_at": { + "type": "string" + }, + "email": { + "type": "string" + }, + "firebase_uid": { + "type": "string" + }, + "id": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "photo_url": { + "type": "string" + }, + "updated_at": { + "type": "string" + } + } + }, "handlers.presignRequest": { "type": "object", "required": [ @@ -529,6 +657,17 @@ "type": "string" } } + }, + "handlers.updateMeRequest": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "name": { + "type": "string" + } + } } }, "securityDefinitions": { diff --git a/backend/docs/swagger/swagger.yaml b/backend/docs/swagger/swagger.yaml index 52d0308..cf391f5 100644 --- a/backend/docs/swagger/swagger.yaml +++ b/backend/docs/swagger/swagger.yaml @@ -37,6 +37,23 @@ definitions: wait_duration: type: string type: object + handlers.UserAlias: + properties: + created_at: + type: string + email: + type: string + firebase_uid: + type: string + id: + type: integer + name: + type: string + photo_url: + type: string + updated_at: + type: string + type: object handlers.presignRequest: properties: content_type: @@ -75,6 +92,13 @@ definitions: required: - token type: object + handlers.updateMeRequest: + properties: + name: + type: string + required: + - name + type: object host: localhost:8080 info: contact: {} @@ -207,6 +231,31 @@ paths: tags: - fcm /api/v1/me: + delete: + description: Removes the authenticated user's profile record from the database. + The Firebase account is not deleted. + produces: + - application/json + responses: + "204": + description: No Content + "401": + description: Unauthorized + schema: + additionalProperties: + type: string + type: object + "500": + description: Internal Server Error + schema: + additionalProperties: + type: string + type: object + security: + - BearerAuth: [] + summary: Delete current user account + tags: + - users get: description: 'Returns the authenticated Firebase user''s decoded claims. Requires Authorization: Bearer .' @@ -229,6 +278,48 @@ paths: summary: Get current user tags: - auth + patch: + consumes: + - application/json + description: Upserts the authenticated user's profile record. Requires a valid + Firebase ID token. + parameters: + - description: Profile update + in: body + name: body + required: true + schema: + $ref: '#/definitions/handlers.updateMeRequest' + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/handlers.UserAlias' + "400": + description: Bad Request + schema: + additionalProperties: + type: string + type: object + "401": + description: Unauthorized + schema: + additionalProperties: + type: string + type: object + "500": + description: Internal Server Error + schema: + additionalProperties: + type: string + type: object + security: + - BearerAuth: [] + summary: Update current user profile + tags: + - users /api/v1/storage/{key}: delete: parameters: diff --git a/backend/internal/bootstrap/bootstrap.go b/backend/internal/bootstrap/bootstrap.go index 35c603d..e8baff6 100644 --- a/backend/internal/bootstrap/bootstrap.go +++ b/backend/internal/bootstrap/bootstrap.go @@ -21,6 +21,7 @@ import ( "backend/internal/infrastructure/ipgeo" "backend/internal/infrastructure/queue" "backend/internal/infrastructure/storage/r2" + "backend/internal/infrastructure/streams" "backend/internal/usecase" "backend/pkg/firebase" "backend/pkg/logger" @@ -40,6 +41,7 @@ type App struct { DB *sql.DB Cache usecase.CacheService // nil when REDIS_URL is not set Enqueuer usecase.Enqueuer // nil when REDIS_URL is not set + StreamProducer usecase.StreamProducer // nil when REDIS_URL is not set Firebase usecase.FirebaseAdminClient // nil when FIREBASE_PROJECT_ID is not set FCMSender usecase.NotificationSender // nil when FIREBASE_PROJECT_ID is not set EmailSender usecase.EmailSender // nil when MAILJET_API_KEY is not set @@ -134,6 +136,15 @@ func Run(ctx context.Context) (*App, error) { enqueuer = eq } + var streamProducer usecase.StreamProducer + if cfg.RedisURL != "" { + p, err := streams.NewProducer(cfg.RedisURL) + if err != nil { + return nil, fmt.Errorf("bootstrap: streams: %w", err) + } + streamProducer = p + } + var firebaseClient usecase.FirebaseAdminClient var fcmSender usecase.NotificationSender if cfg.FirebaseProjectID != "" { @@ -186,6 +197,7 @@ func Run(ctx context.Context) (*App, error) { DB: db, Cache: cache, Enqueuer: enqueuer, + StreamProducer: streamProducer, Firebase: firebaseClient, FCMSender: fcmSender, EmailSender: emailSender, diff --git a/backend/internal/domain/pagination.go b/backend/internal/domain/pagination.go new file mode 100644 index 0000000..fa028fd --- /dev/null +++ b/backend/internal/domain/pagination.go @@ -0,0 +1,36 @@ +package domain + +// Page is a generic offset-based page of results. +type Page[T any] struct { + Items []T `json:"items"` + Total int `json:"total"` + Page int `json:"page"` + PageSize int `json:"page_size"` + HasMore bool `json:"has_more"` +} + +// CursorPage is a generic cursor-based page of results. +type CursorPage[T any] struct { + Items []T `json:"items"` + NextCursor string `json:"next_cursor,omitempty"` + HasMore bool `json:"has_more"` +} + +// PageRequest holds query-param pagination inputs. +type PageRequest struct { + Page int `form:"page" binding:"min=0"` + PageSize int `form:"page_size" binding:"min=0,max=100"` +} + +// Defaults fills zero values with sensible defaults (page 1, size 20). +func (p *PageRequest) Defaults() { + if p.Page <= 0 { + p.Page = 1 + } + if p.PageSize <= 0 { + p.PageSize = 20 + } +} + +// Offset returns the SQL OFFSET value for this page request. +func (p PageRequest) Offset() int { return (p.Page - 1) * p.PageSize } diff --git a/backend/internal/domain/user.go b/backend/internal/domain/user.go new file mode 100644 index 0000000..29aae96 --- /dev/null +++ b/backend/internal/domain/user.go @@ -0,0 +1,14 @@ +package domain + +import "time" + +// User represents an authenticated user's profile record. +type User struct { + ID int64 `json:"id"` + FirebaseUID string `json:"firebase_uid"` + Name string `json:"name"` + Email string `json:"email"` + PhotoURL string `json:"photo_url"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} diff --git a/backend/internal/infrastructure/database/migrations/20260623063851_add_user_profile.sql b/backend/internal/infrastructure/database/migrations/20260623063851_add_user_profile.sql new file mode 100644 index 0000000..73a7c8a --- /dev/null +++ b/backend/internal/infrastructure/database/migrations/20260623063851_add_user_profile.sql @@ -0,0 +1,16 @@ +-- +goose Up +ALTER TABLE users + ADD COLUMN IF NOT EXISTS firebase_uid TEXT UNIQUE, + ADD COLUMN IF NOT EXISTS email TEXT, + ADD COLUMN IF NOT EXISTS photo_url TEXT, + ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ NOT NULL DEFAULT now(); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_users_firebase_uid ON users(firebase_uid); + +-- +goose Down +DROP INDEX IF EXISTS idx_users_firebase_uid; +ALTER TABLE users + DROP COLUMN IF EXISTS firebase_uid, + DROP COLUMN IF EXISTS email, + DROP COLUMN IF EXISTS photo_url, + DROP COLUMN IF EXISTS updated_at; diff --git a/backend/internal/infrastructure/database/postgres/health_repository_test.go b/backend/internal/infrastructure/database/postgres/health_repository_test.go index 8d1c721..3d402d5 100644 --- a/backend/internal/infrastructure/database/postgres/health_repository_test.go +++ b/backend/internal/infrastructure/database/postgres/health_repository_test.go @@ -79,9 +79,6 @@ func TestMain(m *testing.M) { } } -// Tests are ordered: TestNew → TestHealth → TestClose. -// TestClose closes testDB; tests after it would fail. - func TestNew(t *testing.T) { if testDB == nil { t.Fatal("NewPostgresDB() returned nil db") @@ -102,9 +99,3 @@ func TestHealth(t *testing.T) { t.Fatalf("expected no error, got %s", stats.Error) } } - -func TestClose(t *testing.T) { - if err := testDB.Close(); err != nil { - t.Fatalf("expected Close() to return nil, got %v", err) - } -} diff --git a/backend/internal/infrastructure/database/postgres/user_repository.go b/backend/internal/infrastructure/database/postgres/user_repository.go new file mode 100644 index 0000000..00c8cb6 --- /dev/null +++ b/backend/internal/infrastructure/database/postgres/user_repository.go @@ -0,0 +1,45 @@ +package postgres + +import ( + "context" + "database/sql" + "fmt" + + "backend/internal/domain" + "backend/internal/usecase" +) + +type userRepository struct{ db *sql.DB } + +// NewUserRepository returns a usecase.UserRepository backed by PostgreSQL. +func NewUserRepository(db *sql.DB) usecase.UserRepository { + return &userRepository{db: db} +} + +func (r *userRepository) Upsert(ctx context.Context, u *domain.User) (*domain.User, error) { + const q = ` + INSERT INTO users (firebase_uid, name, email, photo_url) + VALUES ($1, $2, $3, $4) + ON CONFLICT (firebase_uid) DO UPDATE + SET name = EXCLUDED.name, + email = EXCLUDED.email, + photo_url = EXCLUDED.photo_url, + updated_at = now() + RETURNING id, firebase_uid, name, email, photo_url, created_at, updated_at` + + out := &domain.User{} + err := r.db.QueryRowContext(ctx, q, u.FirebaseUID, u.Name, u.Email, u.PhotoURL). + Scan(&out.ID, &out.FirebaseUID, &out.Name, &out.Email, &out.PhotoURL, &out.CreatedAt, &out.UpdatedAt) + if err != nil { + return nil, fmt.Errorf("user repository: upsert: %w", err) + } + return out, nil +} + +func (r *userRepository) DeleteByFirebaseUID(ctx context.Context, firebaseUID string) error { + _, err := r.db.ExecContext(ctx, `DELETE FROM users WHERE firebase_uid = $1`, firebaseUID) + if err != nil { + return fmt.Errorf("user repository: delete: %w", err) + } + return nil +} diff --git a/backend/internal/infrastructure/database/postgres/user_repository_test.go b/backend/internal/infrastructure/database/postgres/user_repository_test.go new file mode 100644 index 0000000..68f4ccc --- /dev/null +++ b/backend/internal/infrastructure/database/postgres/user_repository_test.go @@ -0,0 +1,122 @@ +package postgres + +import ( + "context" + "testing" + + "backend/internal/domain" +) + +func setupUsersTable(t *testing.T) { + t.Helper() + _, err := testDB.Exec(` + CREATE TABLE IF NOT EXISTS users ( + id BIGSERIAL PRIMARY KEY, + firebase_uid TEXT UNIQUE, + name TEXT NOT NULL DEFAULT '', + email TEXT, + photo_url TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() + )`) + if err != nil { + t.Fatalf("setupUsersTable: %v", err) + } + t.Cleanup(func() { + _, _ = testDB.Exec(`DROP TABLE IF EXISTS users`) + }) +} + +func TestUserRepository_Upsert_Create(t *testing.T) { + setupUsersTable(t) + repo := NewUserRepository(testDB) + + u := &domain.User{ + FirebaseUID: "uid-001", + Name: "Alice", + Email: "alice@example.com", + PhotoURL: "https://example.com/alice.png", + } + got, err := repo.Upsert(context.Background(), u) + if err != nil { + t.Fatalf("Upsert() unexpected error: %v", err) + } + if got.ID == 0 { + t.Error("expected non-zero ID after upsert") + } + if got.FirebaseUID != u.FirebaseUID { + t.Errorf("FirebaseUID: got %q, want %q", got.FirebaseUID, u.FirebaseUID) + } + if got.Name != u.Name { + t.Errorf("Name: got %q, want %q", got.Name, u.Name) + } + if got.Email != u.Email { + t.Errorf("Email: got %q, want %q", got.Email, u.Email) + } + if got.PhotoURL != u.PhotoURL { + t.Errorf("PhotoURL: got %q, want %q", got.PhotoURL, u.PhotoURL) + } +} + +func TestUserRepository_Upsert_Update(t *testing.T) { + setupUsersTable(t) + repo := NewUserRepository(testDB) + + first := &domain.User{ + FirebaseUID: "uid-002", + Name: "Bob", + Email: "bob@example.com", + PhotoURL: "", + } + created, err := repo.Upsert(context.Background(), first) + if err != nil { + t.Fatalf("first Upsert() error: %v", err) + } + + updated := &domain.User{ + FirebaseUID: "uid-002", + Name: "Bob Updated", + Email: "bob+new@example.com", + PhotoURL: "https://example.com/bob.png", + } + got, err := repo.Upsert(context.Background(), updated) + if err != nil { + t.Fatalf("second Upsert() error: %v", err) + } + if got.ID != created.ID { + t.Errorf("expected same ID on update: got %d, want %d", got.ID, created.ID) + } + if got.Name != "Bob Updated" { + t.Errorf("Name: got %q, want %q", got.Name, "Bob Updated") + } + if got.Email != "bob+new@example.com" { + t.Errorf("Email: got %q, want %q", got.Email, "bob+new@example.com") + } +} + +func TestUserRepository_DeleteByFirebaseUID(t *testing.T) { + setupUsersTable(t) + repo := NewUserRepository(testDB) + + u := &domain.User{ + FirebaseUID: "uid-003", + Name: "Carol", + Email: "carol@example.com", + } + if _, err := repo.Upsert(context.Background(), u); err != nil { + t.Fatalf("Upsert() error: %v", err) + } + + if err := repo.DeleteByFirebaseUID(context.Background(), "uid-003"); err != nil { + t.Fatalf("DeleteByFirebaseUID() error: %v", err) + } + + // Confirm record is gone. + var count int + if err := testDB.QueryRow(`SELECT COUNT(*) FROM users WHERE firebase_uid = $1`, "uid-003").Scan(&count); err != nil { + t.Fatalf("count query error: %v", err) + } + if count != 0 { + t.Errorf("expected 0 rows after delete, got %d", count) + } +} diff --git a/backend/internal/infrastructure/queue/handlers.go b/backend/internal/infrastructure/queue/handlers.go index 700da67..e701b5e 100644 --- a/backend/internal/infrastructure/queue/handlers.go +++ b/backend/internal/infrastructure/queue/handlers.go @@ -4,18 +4,23 @@ import ( "context" "encoding/json" "fmt" - "log/slog" "github.com/hibiken/asynq" + + "backend/internal/usecase" ) -// HandleWelcomeEmail logs the welcome email payload. -// Real delivery via Mailjet is wired in issue #20. -func HandleWelcomeEmail(_ context.Context, t *asynq.Task) error { - var p WelcomeEmailPayload - if err := json.Unmarshal(t.Payload(), &p); err != nil { - return fmt.Errorf("welcome email: unmarshal payload: %w", err) +// NewHandleWelcomeEmail returns an asynq handler that sends a welcome email via sender. +// If sender is nil, the task is acknowledged without sending (graceful degradation). +func NewHandleWelcomeEmail(sender usecase.EmailSender) asynq.HandlerFunc { + return func(ctx context.Context, t *asynq.Task) error { + var p WelcomeEmailPayload + if err := json.Unmarshal(t.Payload(), &p); err != nil { + return fmt.Errorf("welcome email: unmarshal payload: %w", err) + } + if sender == nil { + return nil + } + return sender.SendWelcomeEmail(ctx, p.Email, p.Email) } - slog.Info("queue: welcome email task received", "user_id", p.UserID, "email", p.Email) - return nil } diff --git a/backend/internal/infrastructure/queue/handlers_test.go b/backend/internal/infrastructure/queue/handlers_test.go index 183966f..a44202a 100644 --- a/backend/internal/infrastructure/queue/handlers_test.go +++ b/backend/internal/infrastructure/queue/handlers_test.go @@ -16,14 +16,16 @@ func TestHandleWelcomeEmail_ValidPayload(t *testing.T) { t.Fatalf("marshal: %v", err) } task := asynq.NewTask(queue.TypeWelcomeEmail, payload) - if err := queue.HandleWelcomeEmail(context.Background(), task); err != nil { + handler := queue.NewHandleWelcomeEmail(nil) // nil sender: ack without sending + if err := handler(context.Background(), task); err != nil { t.Errorf("unexpected error: %v", err) } } func TestHandleWelcomeEmail_InvalidPayload(t *testing.T) { task := asynq.NewTask(queue.TypeWelcomeEmail, []byte("not-json")) - if err := queue.HandleWelcomeEmail(context.Background(), task); err == nil { + handler := queue.NewHandleWelcomeEmail(nil) + if err := handler(context.Background(), task); err == nil { t.Error("expected error for invalid payload, got nil") } } diff --git a/backend/internal/infrastructure/ws/client.go b/backend/internal/infrastructure/ws/client.go index 08b7318..224c250 100644 --- a/backend/internal/infrastructure/ws/client.go +++ b/backend/internal/infrastructure/ws/client.go @@ -1,6 +1,7 @@ package ws import ( + "encoding/json" "log/slog" "time" @@ -54,14 +55,21 @@ func (c *Client) ReadPump() { return c.conn.SetReadDeadline(time.Now().Add(pongWait)) }) for { - _, _, err := c.conn.ReadMessage() + _, raw, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { slog.Error("websocket read error", "error", err) } break } - // Server-to-client only for now; incoming messages are discarded. + var env Envelope + if jsonErr := json.Unmarshal(raw, &env); jsonErr == nil { + select { + case c.hub.inbound <- InboundMessage{ClientID: c.conn.RemoteAddr().String(), Msg: env}: + default: + slog.Warn("ws: inbound channel full, message dropped") + } + } } } diff --git a/backend/internal/infrastructure/ws/hub.go b/backend/internal/infrastructure/ws/hub.go index 756648a..d23a487 100644 --- a/backend/internal/infrastructure/ws/hub.go +++ b/backend/internal/infrastructure/ws/hub.go @@ -4,29 +4,44 @@ import ( "context" "encoding/json" "errors" + "log/slog" + "sync" ) // Hub maintains the set of active WebSocket clients and broadcasts messages to them. -// All mutations are serialised through the Run goroutine — no locking needed on the -// clients map itself. +// All mutations to the clients map are serialised through the Run goroutine. +// msgHandlers is protected by mu and may be updated concurrently. type Hub struct { - clients map[*Client]struct{} - broadcast chan []byte - Register chan *Client - Unregister chan *Client + clients map[*Client]struct{} + broadcast chan []byte + Register chan *Client + Unregister chan *Client + inbound chan InboundMessage + msgHandlers map[string]InboundHandler + mu sync.RWMutex } // NewHub allocates a Hub with buffered channels. func NewHub() *Hub { return &Hub{ - clients: make(map[*Client]struct{}), - broadcast: make(chan []byte, 256), - Register: make(chan *Client), - Unregister: make(chan *Client), + clients: make(map[*Client]struct{}), + broadcast: make(chan []byte, 256), + Register: make(chan *Client), + Unregister: make(chan *Client), + inbound: make(chan InboundMessage, 256), + msgHandlers: make(map[string]InboundHandler), } } -// Run processes register, unregister, and broadcast events until ctx is cancelled. +// OnMessage registers a handler for the given message type. +// Safe to call before or after Run starts. +func (h *Hub) OnMessage(msgType string, handler InboundHandler) { + h.mu.Lock() + h.msgHandlers[msgType] = handler + h.mu.Unlock() +} + +// Run processes register, unregister, broadcast, and inbound events until ctx is cancelled. // Call this in its own goroutine. func (h *Hub) Run(ctx context.Context) { for { @@ -53,6 +68,17 @@ func (h *Hub) Run(ctx context.Context) { delete(h.clients, c) } } + case im := <-h.inbound: + h.mu.RLock() + fn := h.msgHandlers[im.Msg.Type] + h.mu.RUnlock() + if fn != nil { + go func(msg InboundMessage) { + if err := fn(context.Background(), msg); err != nil { + slog.Error("ws: inbound handler error", "type", msg.Msg.Type, "err", err) + } + }(im) + } } } } diff --git a/backend/internal/infrastructure/ws/hub_test.go b/backend/internal/infrastructure/ws/hub_test.go index ee017be..6040e0f 100644 --- a/backend/internal/infrastructure/ws/hub_test.go +++ b/backend/internal/infrastructure/ws/hub_test.go @@ -110,6 +110,36 @@ func TestHub_ConcurrentClientsAndBroadcast(t *testing.T) { wg.Wait() } +func TestHub_OnMessage(t *testing.T) { + hub := NewHub() + go hub.Run(t.Context()) + + called := make(chan InboundMessage, 1) + hub.OnMessage("ping", func(_ context.Context, msg InboundMessage) error { + called <- msg + return nil + }) + + im := InboundMessage{ClientID: "client-1", Msg: Envelope{Type: "ping"}} + select { + case hub.inbound <- im: + case <-time.After(time.Second): + t.Fatal("timed out sending to hub.inbound") + } + + select { + case got := <-called: + if got.ClientID != "client-1" { + t.Errorf("ClientID: got %q, want %q", got.ClientID, "client-1") + } + if got.Msg.Type != "ping" { + t.Errorf("Msg.Type: got %q, want %q", got.Msg.Type, "ping") + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for OnMessage handler to be called") + } +} + func TestHub_Publish(t *testing.T) { hub := NewHub() go hub.Run(t.Context()) diff --git a/backend/internal/infrastructure/ws/message.go b/backend/internal/infrastructure/ws/message.go index 0916ed7..5c0292c 100644 --- a/backend/internal/infrastructure/ws/message.go +++ b/backend/internal/infrastructure/ws/message.go @@ -1,9 +1,21 @@ package ws -import "encoding/json" +import ( + "context" + "encoding/json" +) // Envelope is the typed wire format for all WebSocket messages. type Envelope struct { Type string `json:"type"` Payload json.RawMessage `json:"payload"` } + +// InboundMessage is a message received from a WebSocket client. +type InboundMessage struct { + ClientID string + Msg Envelope +} + +// InboundHandler processes an inbound WebSocket message. +type InboundHandler func(ctx context.Context, msg InboundMessage) error diff --git a/backend/internal/server/server.go b/backend/internal/server/server.go index c20b282..f746f56 100644 --- a/backend/internal/server/server.go +++ b/backend/internal/server/server.go @@ -1,6 +1,8 @@ package server import ( + "context" + "encoding/json" "errors" "fmt" "net/http" @@ -14,6 +16,8 @@ import ( "backend/internal/bootstrap" "backend/internal/infrastructure/database/postgres" + "backend/internal/infrastructure/queue" + "backend/internal/infrastructure/streams" "backend/internal/infrastructure/ws" "backend/internal/transport/handlers" "backend/internal/usecase" @@ -40,6 +44,8 @@ func NewServer(app *bootstrap.App, hub *ws.Hub) (*http.Server, error) { fcmTokenRepo = postgres.NewFCMTokenRepository(app.DB) } + userRepo := postgres.NewUserRepository(app.DB) + // Build Asynqmon UI handler when Redis is available. var queueUI http.Handler if app.Config.RedisURL != "" { @@ -56,7 +62,28 @@ func NewServer(app *bootstrap.App, hub *ws.Hub) (*http.Server, error) { } } - h := handlers.NewHandler(healthUC, app.Firebase, hub, app.Enqueuer, queueUI, app.FCMSender, fcmTokenRepo, app.EmailSender, app.StorageService, app.GeoLocator) + // Start Redis Streams consumer: fan-out UserCreated events → welcome email queue. + if app.StreamProducer != nil && app.Enqueuer != nil { + consumer, err := streams.NewConsumer(app.Config.RedisURL, streams.StreamUserCreated, "api", "api-1") + if err == nil { + go func() { + _ = consumer.Run(context.Background(), func(ctx context.Context, data []byte) error { + var evt streams.UserCreatedEvent + if err := json.Unmarshal(data, &evt); err != nil { + return err + } + payload, err := json.Marshal(queue.WelcomeEmailPayload{UserID: evt.UserID, Email: evt.Email}) + if err != nil { + return err + } + return app.Enqueuer.Enqueue(ctx, queue.TypeWelcomeEmail, payload) + }) + _ = consumer.Close() + }() + } + } + + h := handlers.NewHandler(healthUC, app.Firebase, hub, app.Enqueuer, queueUI, app.FCMSender, fcmTokenRepo, app.EmailSender, app.StorageService, app.GeoLocator, app.StreamProducer, userRepo) // Register DB pool metrics collector. // AlreadyRegisteredError is silenced — only the first registration wins diff --git a/backend/internal/transport/handlers/fcm_handler.go b/backend/internal/transport/handlers/fcm_handler.go index 6b3355c..94dee6f 100644 --- a/backend/internal/transport/handlers/fcm_handler.go +++ b/backend/internal/transport/handlers/fcm_handler.go @@ -5,6 +5,7 @@ import ( "github.com/gin-gonic/gin" + "backend/internal/infrastructure/streams" "backend/internal/transport/middleware" "backend/internal/usecase" ) @@ -41,8 +42,7 @@ func (h *Handler) RegisterFCMToken(c *gin.Context) { } var req registerFCMTokenRequest - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + if !bindJSON(c, &req) { return } @@ -51,6 +51,13 @@ func (h *Handler) RegisterFCMToken(c *gin.Context) { return } + if h.streamProducer != nil { + _ = h.streamProducer.Publish(c.Request.Context(), streams.StreamUserCreated, streams.UserCreatedEvent{ + UserID: claims.UID, + Email: claims.Email, + }) + } + c.JSON(http.StatusOK, gin.H{"message": "token registered"}) } diff --git a/backend/internal/transport/handlers/handler.go b/backend/internal/transport/handlers/handler.go index 7bde88a..8df3a26 100644 --- a/backend/internal/transport/handlers/handler.go +++ b/backend/internal/transport/handlers/handler.go @@ -19,6 +19,8 @@ type Handler struct { emailSender usecase.EmailSender // nil when MAILJET_API_KEY is not set storageService usecase.StorageService // nil when R2_ACCOUNT_ID is not set geoLocator usecase.GeoLocator // nil when geo client is not configured + streamProducer usecase.StreamProducer // nil when REDIS_URL is not set + userRepo usecase.UserRepository // nil when not wired } // NewHandler constructs a Handler with all required use cases. @@ -33,6 +35,8 @@ func NewHandler( emailSender usecase.EmailSender, storageService usecase.StorageService, geoLocator usecase.GeoLocator, + streamProducer usecase.StreamProducer, + userRepo usecase.UserRepository, ) *Handler { return &Handler{ healthUC: healthUC, @@ -45,5 +49,7 @@ func NewHandler( emailSender: emailSender, storageService: storageService, geoLocator: geoLocator, + streamProducer: streamProducer, + userRepo: userRepo, } } diff --git a/backend/internal/transport/handlers/health_handler_test.go b/backend/internal/transport/handlers/health_handler_test.go index ce9a6fa..48a3dbb 100644 --- a/backend/internal/transport/handlers/health_handler_test.go +++ b/backend/internal/transport/handlers/health_handler_test.go @@ -31,7 +31,7 @@ func TestHealthHandler_Success(t *testing.T) { Status: "up", Message: "It's healthy", } - h := NewHandler(&mockHealthUC{stats: want}, nil, nil, nil, nil, nil, nil, nil, nil, nil) + h := NewHandler(&mockHealthUC{stats: want}, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) r := gin.New() r.GET("/health", h.HealthHandler) @@ -50,7 +50,7 @@ func TestHealthHandler_Success(t *testing.T) { } func TestHealthHandler_ServiceUnavailable(t *testing.T) { - h := NewHandler(&mockHealthUC{err: errors.New("connection refused")}, nil, nil, nil, nil, nil, nil, nil, nil, nil) + h := NewHandler(&mockHealthUC{err: errors.New("connection refused")}, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) r := gin.New() r.GET("/health", h.HealthHandler) diff --git a/backend/internal/transport/handlers/me_handler.go b/backend/internal/transport/handlers/me_handler.go new file mode 100644 index 0000000..d401024 --- /dev/null +++ b/backend/internal/transport/handlers/me_handler.go @@ -0,0 +1,90 @@ +package handlers + +import ( + "net/http" + + "github.com/gin-gonic/gin" + + "backend/internal/domain" + "backend/internal/transport/middleware" + "backend/internal/usecase" +) + +type updateMeRequest struct { + Name string `json:"name" binding:"required"` +} + +// UpdateMeHandler godoc +// +// @Summary Update current user profile +// @Description Upserts the authenticated user's profile record. Requires a valid Firebase ID token. +// @Tags users +// @Accept json +// @Produce json +// @Param body body updateMeRequest true "Profile update" +// @Success 200 {object} UserAlias +// @Failure 400 {object} map[string]string +// @Failure 401 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /api/v1/me [patch] +// @Security BearerAuth +func (h *Handler) UpdateMeHandler(c *gin.Context) { + var req updateMeRequest + if !bindJSON(c, &req) { + return + } + + raw, exists := c.Get(middleware.FirebaseClaimsKey) + if !exists { + c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) + return + } + claims, ok := raw.(*usecase.FirebaseToken) + if !ok { + c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) + return + } + + u := &domain.User{ + FirebaseUID: claims.UID, + Name: req.Name, + Email: claims.Email, + PhotoURL: claims.PhotoURL, + } + updated, err := h.userRepo.Upsert(c.Request.Context(), u) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update profile"}) + return + } + c.JSON(http.StatusOK, updated) +} + +// DeleteMeHandler godoc +// +// @Summary Delete current user account +// @Description Removes the authenticated user's profile record from the database. The Firebase account is not deleted. +// @Tags users +// @Produce json +// @Success 204 +// @Failure 401 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /api/v1/me [delete] +// @Security BearerAuth +func (h *Handler) DeleteMeHandler(c *gin.Context) { + raw, exists := c.Get(middleware.FirebaseClaimsKey) + if !exists { + c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) + return + } + claims, ok := raw.(*usecase.FirebaseToken) + if !ok { + c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) + return + } + + if err := h.userRepo.DeleteByFirebaseUID(c.Request.Context(), claims.UID); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete account"}) + return + } + c.Status(http.StatusNoContent) +} diff --git a/backend/internal/transport/handlers/me_handler_test.go b/backend/internal/transport/handlers/me_handler_test.go new file mode 100644 index 0000000..4e07d5b --- /dev/null +++ b/backend/internal/transport/handlers/me_handler_test.go @@ -0,0 +1,133 @@ +package handlers + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + + "backend/internal/domain" + "backend/internal/transport/middleware" + "backend/internal/usecase" +) + +// mockUserRepo satisfies usecase.UserRepository for handler tests. +type mockUserRepo struct { + upserted *domain.User + deleted string + upsertErr error + deleteErr error +} + +func (m *mockUserRepo) Upsert(_ context.Context, u *domain.User) (*domain.User, error) { + if m.upsertErr != nil { + return nil, m.upsertErr + } + out := *u + out.ID = 1 + m.upserted = &out + return &out, nil +} + +func (m *mockUserRepo) DeleteByFirebaseUID(_ context.Context, firebaseUID string) error { + m.deleted = firebaseUID + return m.deleteErr +} + +func newMeRouter(h *Handler) *gin.Engine { + r := gin.New() + injectClaims := func(c *gin.Context) { + c.Set(middleware.FirebaseClaimsKey, &usecase.FirebaseToken{ + UID: "uid-test", + Email: "test@example.com", + PhotoURL: "https://example.com/photo.png", + }) + c.Next() + } + r.PATCH("/api/v1/me", injectClaims, h.UpdateMeHandler) + r.DELETE("/api/v1/me", injectClaims, h.DeleteMeHandler) + return r +} + +func TestUpdateMeHandler_Success(t *testing.T) { + repo := &mockUserRepo{} + h := &Handler{userRepo: repo} + r := newMeRouter(h) + + body, _ := json.Marshal(map[string]string{"name": "Alice"}) + w := httptest.NewRecorder() + r.ServeHTTP(w, httptest.NewRequest(http.MethodPatch, "/api/v1/me", bytes.NewReader(body))) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if repo.upserted == nil { + t.Fatal("expected Upsert to be called") + } + if repo.upserted.Name != "Alice" { + t.Errorf("Name: got %q, want %q", repo.upserted.Name, "Alice") + } + if repo.upserted.FirebaseUID != "uid-test" { + t.Errorf("FirebaseUID: got %q, want %q", repo.upserted.FirebaseUID, "uid-test") + } +} + +func TestUpdateMeHandler_MissingBody(t *testing.T) { + repo := &mockUserRepo{} + h := &Handler{userRepo: repo} + r := newMeRouter(h) + + w := httptest.NewRecorder() + r.ServeHTTP(w, httptest.NewRequest(http.MethodPatch, "/api/v1/me", bytes.NewReader([]byte(`{}`)))) // missing required name + + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestUpdateMeHandler_NoClaims(t *testing.T) { + h := &Handler{userRepo: &mockUserRepo{}} + r := gin.New() + r.PATCH("/api/v1/me", h.UpdateMeHandler) + + body, _ := json.Marshal(map[string]string{"name": "Alice"}) + w := httptest.NewRecorder() + r.ServeHTTP(w, httptest.NewRequest(http.MethodPatch, "/api/v1/me", bytes.NewReader(body))) + + if w.Code != http.StatusUnauthorized { + t.Fatalf("expected 401, got %d", w.Code) + } +} + +func TestDeleteMeHandler_Success(t *testing.T) { + repo := &mockUserRepo{} + h := &Handler{userRepo: repo} + r := newMeRouter(h) + + w := httptest.NewRecorder() + r.ServeHTTP(w, httptest.NewRequest(http.MethodDelete, "/api/v1/me", nil)) + + if w.Code != http.StatusNoContent { + t.Fatalf("expected 204, got %d: %s", w.Code, w.Body.String()) + } + if repo.deleted != "uid-test" { + t.Errorf("expected DeleteByFirebaseUID(uid-test), got %q", repo.deleted) + } +} + +func TestDeleteMeHandler_NoClaims(t *testing.T) { + h := &Handler{userRepo: &mockUserRepo{}} + r := gin.New() + r.DELETE("/api/v1/me", h.DeleteMeHandler) + + w := httptest.NewRecorder() + r.ServeHTTP(w, httptest.NewRequest(http.MethodDelete, "/api/v1/me", nil)) + + if w.Code != http.StatusUnauthorized { + t.Fatalf("expected 401, got %d", w.Code) + } +} diff --git a/backend/internal/transport/handlers/routes.go b/backend/internal/transport/handlers/routes.go index 177e46e..8398b40 100644 --- a/backend/internal/transport/handlers/routes.go +++ b/backend/internal/transport/handlers/routes.go @@ -67,6 +67,10 @@ func (h *Handler) RegisterRoutes(rps float64, burst int, sentryDSN string) http. api.Use(middleware.GeoFromRequest(h.geoLocator)) } api.GET("/me", h.MeHandler) + if h.userRepo != nil { + api.PATCH("/me", h.UpdateMeHandler) + api.DELETE("/me", h.DeleteMeHandler) + } if h.fcmTokenRepo != nil { api.POST("/fcm/register", h.RegisterFCMToken) diff --git a/backend/internal/transport/handlers/storage_handler.go b/backend/internal/transport/handlers/storage_handler.go index 9367aba..dac5f07 100644 --- a/backend/internal/transport/handlers/storage_handler.go +++ b/backend/internal/transport/handlers/storage_handler.go @@ -31,8 +31,7 @@ type presignResponse struct { // @Security BearerAuth func (h *Handler) PresignHandler(c *gin.Context) { var req presignRequest - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + if !bindJSON(c, &req) { return } diff --git a/backend/internal/transport/handlers/swagger_types.go b/backend/internal/transport/handlers/swagger_types.go index 13acf11..40e7e0d 100644 --- a/backend/internal/transport/handlers/swagger_types.go +++ b/backend/internal/transport/handlers/swagger_types.go @@ -12,3 +12,7 @@ type HealthStats = domain.HealthStats // FirebaseToken is a type alias so swaggo can resolve usecase.FirebaseToken // from within this package for annotation purposes. type FirebaseToken = usecase.FirebaseToken + +// UserAlias is a type alias so swaggo can resolve domain.User +// from within this package for annotation purposes. +type UserAlias = domain.User diff --git a/backend/internal/transport/handlers/validation.go b/backend/internal/transport/handlers/validation.go new file mode 100644 index 0000000..217796c --- /dev/null +++ b/backend/internal/transport/handlers/validation.go @@ -0,0 +1,25 @@ +package handlers + +import ( + "net/http" + + "github.com/gin-gonic/gin" +) + +// bindJSON binds and validates the request body. Writes 400 on failure and returns false. +func bindJSON(c *gin.Context, dst any) bool { + if err := c.ShouldBindJSON(dst); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return false + } + return true +} + +// bindQuery binds and validates query params. Writes 400 on failure and returns false. +func bindQuery(c *gin.Context, dst any) bool { + if err := c.ShouldBindQuery(dst); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return false + } + return true +} diff --git a/backend/internal/usecase/streams.go b/backend/internal/usecase/streams.go new file mode 100644 index 0000000..8048dd6 --- /dev/null +++ b/backend/internal/usecase/streams.go @@ -0,0 +1,8 @@ +package usecase + +import "context" + +// StreamProducer publishes domain events to a named stream. +type StreamProducer interface { + Publish(ctx context.Context, stream string, event any) error +} diff --git a/backend/internal/usecase/user.go b/backend/internal/usecase/user.go new file mode 100644 index 0000000..c20484f --- /dev/null +++ b/backend/internal/usecase/user.go @@ -0,0 +1,13 @@ +package usecase + +import ( + "context" + + "backend/internal/domain" +) + +// UserRepository persists user profile data. +type UserRepository interface { + Upsert(ctx context.Context, u *domain.User) (*domain.User, error) + DeleteByFirebaseUID(ctx context.Context, firebaseUID string) error +} From 6280ab6b608a77b3065b5927ffc6af5383107dc5 Mon Sep 17 00:00:00 2001 From: GRACENOBLE Date: Tue, 23 Jun 2026 10:10:10 +0300 Subject: [PATCH 2/4] fix(backend): address CodeRabbit review findings - Add Close() to StreamProducer interface; close producer on graceful shutdown - Move streams consumer goroutine from server.go to main.go with a proper cancellable context (streamCancel) so it exits cleanly on SIGINT/SIGTERM - Log consumer init failure instead of silently dropping via if err == nil guard - Add Name field to UserCreatedEvent and WelcomeEmailPayload so welcome emails address recipients by name instead of their email address - Guard Offset() against page <= 1 to prevent negative SQL OFFSET values - Make firebase_uid NOT NULL in migration; backfill legacy rows before constraint - Log publish errors in fcm_handler instead of swallowing with _ = - Bound inbound WebSocket handler concurrency with a 64-slot semaphore - Pass hub lifecycle context to inbound handlers instead of context.Background() - Log malformed inbound WebSocket frames instead of silently discarding - Use stable error messages in bindJSON/bindQuery to avoid leaking internals - Use bindJSON helper in UnregisterFCMToken (was still using inline pattern) Co-Authored-By: Claude Sonnet 4.6 --- backend/cmd/api/main.go | 36 +++++++++++++++++++ backend/internal/domain/pagination.go | 8 ++++- .../20260623063851_add_user_profile.sql | 10 ++++-- .../internal/infrastructure/queue/handlers.go | 6 +++- .../internal/infrastructure/queue/tasks.go | 2 +- .../internal/infrastructure/streams/events.go | 1 + backend/internal/infrastructure/ws/client.go | 14 ++++---- backend/internal/infrastructure/ws/hub.go | 15 ++++++-- backend/internal/server/server.go | 25 ------------- .../transport/handlers/fcm_handler.go | 11 +++--- .../internal/transport/handlers/validation.go | 4 +-- backend/internal/usecase/streams.go | 1 + 12 files changed, 88 insertions(+), 45 deletions(-) diff --git a/backend/cmd/api/main.go b/backend/cmd/api/main.go index 7a8a308..0ae47a0 100644 --- a/backend/cmd/api/main.go +++ b/backend/cmd/api/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "log/slog" "net/http" @@ -12,6 +13,7 @@ import ( "backend/internal/bootstrap" "backend/internal/infrastructure/queue" + "backend/internal/infrastructure/streams" "backend/internal/infrastructure/ws" "backend/internal/server" ) @@ -78,6 +80,32 @@ func main() { }() } + // Start Redis Streams consumer: fan-out UserCreated events → welcome email queue. + var streamCancel context.CancelFunc + if app.StreamProducer != nil && app.Enqueuer != nil { + streamCtx, sCancel := context.WithCancel(context.Background()) + streamCancel = sCancel + consumer, err := streams.NewConsumer(app.Config.RedisURL, streams.StreamUserCreated, "api", "api-1") + if err != nil { + slog.Error("streams: failed to create consumer", "err", err) + } else { + go func() { + _ = consumer.Run(streamCtx, func(ctx context.Context, data []byte) error { + var evt streams.UserCreatedEvent + if err := json.Unmarshal(data, &evt); err != nil { + return err + } + payload, err := json.Marshal(queue.WelcomeEmailPayload{UserID: evt.UserID, Email: evt.Email, Name: evt.Name}) + if err != nil { + return err + } + return app.Enqueuer.Enqueue(ctx, queue.TypeWelcomeEmail, payload) + }) + _ = consumer.Close() + }() + } + } + srv, err := server.NewServer(app, hub) if err != nil { fmt.Fprintf(os.Stderr, "startup failed: %v\n", err) @@ -94,6 +122,9 @@ func main() { <-done + if streamCancel != nil { + streamCancel() // stop streams consumer + } if workerCancel != nil { workerCancel() // stop worker before hub (in-flight jobs drain first) } @@ -109,5 +140,10 @@ func main() { slog.Error("enqueuer close error", "error", err) } } + if app.StreamProducer != nil { + if err := app.StreamProducer.Close(); err != nil { + slog.Error("stream producer close error", "error", err) + } + } slog.Info("graceful shutdown complete") } diff --git a/backend/internal/domain/pagination.go b/backend/internal/domain/pagination.go index fa028fd..d22d5dd 100644 --- a/backend/internal/domain/pagination.go +++ b/backend/internal/domain/pagination.go @@ -33,4 +33,10 @@ func (p *PageRequest) Defaults() { } // Offset returns the SQL OFFSET value for this page request. -func (p PageRequest) Offset() int { return (p.Page - 1) * p.PageSize } +// Normalises the page to at least 1 so the result is never negative. +func (p PageRequest) Offset() int { + if p.Page <= 1 { + return 0 + } + return (p.Page - 1) * p.PageSize +} diff --git a/backend/internal/infrastructure/database/migrations/20260623063851_add_user_profile.sql b/backend/internal/infrastructure/database/migrations/20260623063851_add_user_profile.sql index 73a7c8a..51bff09 100644 --- a/backend/internal/infrastructure/database/migrations/20260623063851_add_user_profile.sql +++ b/backend/internal/infrastructure/database/migrations/20260623063851_add_user_profile.sql @@ -1,14 +1,18 @@ -- +goose Up ALTER TABLE users - ADD COLUMN IF NOT EXISTS firebase_uid TEXT UNIQUE, + ADD COLUMN IF NOT EXISTS firebase_uid TEXT, ADD COLUMN IF NOT EXISTS email TEXT, ADD COLUMN IF NOT EXISTS photo_url TEXT, ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ NOT NULL DEFAULT now(); -CREATE UNIQUE INDEX IF NOT EXISTS idx_users_firebase_uid ON users(firebase_uid); +-- Backfill any pre-existing rows so the NOT NULL constraint can be applied. +UPDATE users SET firebase_uid = 'legacy-' || id::text WHERE firebase_uid IS NULL; + +ALTER TABLE users ALTER COLUMN firebase_uid SET NOT NULL; +ALTER TABLE users ADD CONSTRAINT users_firebase_uid_key UNIQUE (firebase_uid); -- +goose Down -DROP INDEX IF EXISTS idx_users_firebase_uid; +ALTER TABLE users DROP CONSTRAINT IF EXISTS users_firebase_uid_key; ALTER TABLE users DROP COLUMN IF EXISTS firebase_uid, DROP COLUMN IF EXISTS email, diff --git a/backend/internal/infrastructure/queue/handlers.go b/backend/internal/infrastructure/queue/handlers.go index e701b5e..b5fad25 100644 --- a/backend/internal/infrastructure/queue/handlers.go +++ b/backend/internal/infrastructure/queue/handlers.go @@ -21,6 +21,10 @@ func NewHandleWelcomeEmail(sender usecase.EmailSender) asynq.HandlerFunc { if sender == nil { return nil } - return sender.SendWelcomeEmail(ctx, p.Email, p.Email) + name := p.Name + if name == "" { + name = p.Email + } + return sender.SendWelcomeEmail(ctx, p.Email, name) } } diff --git a/backend/internal/infrastructure/queue/tasks.go b/backend/internal/infrastructure/queue/tasks.go index 62008a4..0c19da4 100644 --- a/backend/internal/infrastructure/queue/tasks.go +++ b/backend/internal/infrastructure/queue/tasks.go @@ -6,8 +6,8 @@ const ( ) // WelcomeEmailPayload is the JSON payload for TypeWelcomeEmail tasks. -// Real delivery via Mailjet is wired in issue #20. type WelcomeEmailPayload struct { UserID string `json:"user_id"` Email string `json:"email"` + Name string `json:"name"` } diff --git a/backend/internal/infrastructure/streams/events.go b/backend/internal/infrastructure/streams/events.go index d2411ec..376e0d0 100644 --- a/backend/internal/infrastructure/streams/events.go +++ b/backend/internal/infrastructure/streams/events.go @@ -10,4 +10,5 @@ const ( type UserCreatedEvent struct { UserID string `json:"user_id"` Email string `json:"email"` + Name string `json:"name"` } diff --git a/backend/internal/infrastructure/ws/client.go b/backend/internal/infrastructure/ws/client.go index 224c250..c6f37fe 100644 --- a/backend/internal/infrastructure/ws/client.go +++ b/backend/internal/infrastructure/ws/client.go @@ -63,12 +63,14 @@ func (c *Client) ReadPump() { break } var env Envelope - if jsonErr := json.Unmarshal(raw, &env); jsonErr == nil { - select { - case c.hub.inbound <- InboundMessage{ClientID: c.conn.RemoteAddr().String(), Msg: env}: - default: - slog.Warn("ws: inbound channel full, message dropped") - } + if jsonErr := json.Unmarshal(raw, &env); jsonErr != nil { + slog.Warn("ws: malformed inbound frame, discarding", "client", c.conn.RemoteAddr(), "err", jsonErr) + continue + } + select { + case c.hub.inbound <- InboundMessage{ClientID: c.conn.RemoteAddr().String(), Msg: env}: + default: + slog.Warn("ws: inbound channel full, message dropped") } } } diff --git a/backend/internal/infrastructure/ws/hub.go b/backend/internal/infrastructure/ws/hub.go index d23a487..4a78fb4 100644 --- a/backend/internal/infrastructure/ws/hub.go +++ b/backend/internal/infrastructure/ws/hub.go @@ -8,6 +8,8 @@ import ( "sync" ) +const inboundWorkers = 64 // max concurrent inbound handler goroutines + // Hub maintains the set of active WebSocket clients and broadcasts messages to them. // All mutations to the clients map are serialised through the Run goroutine. // msgHandlers is protected by mu and may be updated concurrently. @@ -19,6 +21,7 @@ type Hub struct { inbound chan InboundMessage msgHandlers map[string]InboundHandler mu sync.RWMutex + sem chan struct{} // bounds concurrent inbound handler goroutines } // NewHub allocates a Hub with buffered channels. @@ -30,6 +33,7 @@ func NewHub() *Hub { Unregister: make(chan *Client), inbound: make(chan InboundMessage, 256), msgHandlers: make(map[string]InboundHandler), + sem: make(chan struct{}, inboundWorkers), } } @@ -72,12 +76,19 @@ func (h *Hub) Run(ctx context.Context) { h.mu.RLock() fn := h.msgHandlers[im.Msg.Type] h.mu.RUnlock() - if fn != nil { + if fn == nil { + continue + } + select { + case h.sem <- struct{}{}: go func(msg InboundMessage) { - if err := fn(context.Background(), msg); err != nil { + defer func() { <-h.sem }() + if err := fn(ctx, msg); err != nil { slog.Error("ws: inbound handler error", "type", msg.Msg.Type, "err", err) } }(im) + default: + slog.Warn("ws: inbound worker pool full, message dropped", "type", im.Msg.Type) } } } diff --git a/backend/internal/server/server.go b/backend/internal/server/server.go index f746f56..7bfa770 100644 --- a/backend/internal/server/server.go +++ b/backend/internal/server/server.go @@ -1,8 +1,6 @@ package server import ( - "context" - "encoding/json" "errors" "fmt" "net/http" @@ -16,8 +14,6 @@ import ( "backend/internal/bootstrap" "backend/internal/infrastructure/database/postgres" - "backend/internal/infrastructure/queue" - "backend/internal/infrastructure/streams" "backend/internal/infrastructure/ws" "backend/internal/transport/handlers" "backend/internal/usecase" @@ -62,27 +58,6 @@ func NewServer(app *bootstrap.App, hub *ws.Hub) (*http.Server, error) { } } - // Start Redis Streams consumer: fan-out UserCreated events → welcome email queue. - if app.StreamProducer != nil && app.Enqueuer != nil { - consumer, err := streams.NewConsumer(app.Config.RedisURL, streams.StreamUserCreated, "api", "api-1") - if err == nil { - go func() { - _ = consumer.Run(context.Background(), func(ctx context.Context, data []byte) error { - var evt streams.UserCreatedEvent - if err := json.Unmarshal(data, &evt); err != nil { - return err - } - payload, err := json.Marshal(queue.WelcomeEmailPayload{UserID: evt.UserID, Email: evt.Email}) - if err != nil { - return err - } - return app.Enqueuer.Enqueue(ctx, queue.TypeWelcomeEmail, payload) - }) - _ = consumer.Close() - }() - } - } - h := handlers.NewHandler(healthUC, app.Firebase, hub, app.Enqueuer, queueUI, app.FCMSender, fcmTokenRepo, app.EmailSender, app.StorageService, app.GeoLocator, app.StreamProducer, userRepo) // Register DB pool metrics collector. diff --git a/backend/internal/transport/handlers/fcm_handler.go b/backend/internal/transport/handlers/fcm_handler.go index 94dee6f..ff17587 100644 --- a/backend/internal/transport/handlers/fcm_handler.go +++ b/backend/internal/transport/handlers/fcm_handler.go @@ -1,6 +1,7 @@ package handlers import ( + "log/slog" "net/http" "github.com/gin-gonic/gin" @@ -52,10 +53,13 @@ func (h *Handler) RegisterFCMToken(c *gin.Context) { } if h.streamProducer != nil { - _ = h.streamProducer.Publish(c.Request.Context(), streams.StreamUserCreated, streams.UserCreatedEvent{ + if err := h.streamProducer.Publish(c.Request.Context(), streams.StreamUserCreated, streams.UserCreatedEvent{ UserID: claims.UID, Email: claims.Email, - }) + Name: claims.Name, + }); err != nil { + slog.Warn("fcm: failed to publish user created event", "user_id", claims.UID, "err", err) + } } c.JSON(http.StatusOK, gin.H{"message": "token registered"}) @@ -84,8 +88,7 @@ func (h *Handler) UnregisterFCMToken(c *gin.Context) { } var req unregisterFCMTokenRequest - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + if !bindJSON(c, &req) { return } diff --git a/backend/internal/transport/handlers/validation.go b/backend/internal/transport/handlers/validation.go index 217796c..4a425c6 100644 --- a/backend/internal/transport/handlers/validation.go +++ b/backend/internal/transport/handlers/validation.go @@ -9,7 +9,7 @@ import ( // bindJSON binds and validates the request body. Writes 400 on failure and returns false. func bindJSON(c *gin.Context, dst any) bool { if err := c.ShouldBindJSON(dst); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) return false } return true @@ -18,7 +18,7 @@ func bindJSON(c *gin.Context, dst any) bool { // bindQuery binds and validates query params. Writes 400 on failure and returns false. func bindQuery(c *gin.Context, dst any) bool { if err := c.ShouldBindQuery(dst); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid query parameters"}) return false } return true diff --git a/backend/internal/usecase/streams.go b/backend/internal/usecase/streams.go index 8048dd6..000f0a7 100644 --- a/backend/internal/usecase/streams.go +++ b/backend/internal/usecase/streams.go @@ -5,4 +5,5 @@ import "context" // StreamProducer publishes domain events to a named stream. type StreamProducer interface { Publish(ctx context.Context, stream string, event any) error + Close() error } From a0e3627619082965ae0e99fdd2a21927fce19acc Mon Sep 17 00:00:00 2001 From: GRACENOBLE Date: Tue, 23 Jun 2026 10:15:30 +0300 Subject: [PATCH 3/4] fix(streams): downgrade i/o timeout to WARN and add backoff on errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Managed Redis providers (Upstash, Redis Cloud) terminate idle connections after ~10 s, which surfaces as an i/o timeout on the blocking XReadGroup call. go-redis reconnects automatically on the next attempt, so these are not real errors — log them at WARN and retry immediately. For genuine unexpected errors, sleep 500 ms before retrying so the loop does not hammer Redis when it is actually unhealthy. --- .../internal/infrastructure/streams/consumer.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/backend/internal/infrastructure/streams/consumer.go b/backend/internal/infrastructure/streams/consumer.go index be65421..ed50690 100644 --- a/backend/internal/infrastructure/streams/consumer.go +++ b/backend/internal/infrastructure/streams/consumer.go @@ -2,8 +2,11 @@ package streams import ( "context" + "errors" "fmt" "log/slog" + "net" + "time" "github.com/redis/go-redis/v9" ) @@ -54,7 +57,21 @@ func (c *Consumer) Run(ctx context.Context, h Handler) error { if err == redis.Nil { continue } + // Managed Redis (Upstash, Redis Cloud) kills idle connections, which + // surfaces as an i/o timeout on the blocking XReadGroup call. The + // go-redis pool reconnects automatically on the next attempt, so + // treat this as WARN and retry without delay. + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + slog.Warn("streams: connection timeout, reconnecting", "stream", c.stream) + continue + } slog.Error("streams: read error", "stream", c.stream, "err", err) + select { + case <-ctx.Done(): + return nil + case <-time.After(500 * time.Millisecond): + } continue } for _, entry := range entries { From 4034ed403dafb975dd5c7baea39c50c904e445cf Mon Sep 17 00:00:00 2001 From: GRACENOBLE Date: Tue, 23 Jun 2026 10:24:32 +0300 Subject: [PATCH 4/4] fix(streams): unwire consumer goroutine; leave infrastructure ready to use The blocking XReadGroup poll holds a persistent Redis connection even when there is no business logic consuming events, wasting a connection slot on managed Redis plans. The consumer is not wired at startup until a concrete use case exists. All infrastructure remains in place (Producer, Consumer, events, types) and the producer still publishes UserCreatedEvent on FCM registration. A fully worked example of how to wire the consumer back in is left as a comment in cmd/api/main.go. Also adds MaxIdleConns=1 and ConnMaxIdleTime=8s to the consumer client so idle connections are recycled before managed Redis providers kill them, preventing i/o timeout noise if the consumer is re-enabled. --- backend/cmd/api/main.go | 50 ++++++++----------- .../infrastructure/streams/consumer.go | 18 +++++-- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/backend/cmd/api/main.go b/backend/cmd/api/main.go index 0ae47a0..f65f2a8 100644 --- a/backend/cmd/api/main.go +++ b/backend/cmd/api/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/json" "fmt" "log/slog" "net/http" @@ -13,7 +12,6 @@ import ( "backend/internal/bootstrap" "backend/internal/infrastructure/queue" - "backend/internal/infrastructure/streams" "backend/internal/infrastructure/ws" "backend/internal/server" ) @@ -80,31 +78,26 @@ func main() { }() } - // Start Redis Streams consumer: fan-out UserCreated events → welcome email queue. - var streamCancel context.CancelFunc - if app.StreamProducer != nil && app.Enqueuer != nil { - streamCtx, sCancel := context.WithCancel(context.Background()) - streamCancel = sCancel - consumer, err := streams.NewConsumer(app.Config.RedisURL, streams.StreamUserCreated, "api", "api-1") - if err != nil { - slog.Error("streams: failed to create consumer", "err", err) - } else { - go func() { - _ = consumer.Run(streamCtx, func(ctx context.Context, data []byte) error { - var evt streams.UserCreatedEvent - if err := json.Unmarshal(data, &evt); err != nil { - return err - } - payload, err := json.Marshal(queue.WelcomeEmailPayload{UserID: evt.UserID, Email: evt.Email, Name: evt.Name}) - if err != nil { - return err - } - return app.Enqueuer.Enqueue(ctx, queue.TypeWelcomeEmail, payload) - }) - _ = consumer.Close() - }() - } - } + // --- Redis Streams consumer (unwired — add when you have a concrete use case) --- + // The streams infrastructure (Producer, Consumer, events) is fully implemented. + // Wire a consumer here following this pattern when you need it: + // + // streamCtx, streamCancel := context.WithCancel(context.Background()) + // consumer, err := streams.NewConsumer(app.Config.RedisURL, streams.StreamUserCreated, "api", "api-1") + // if err != nil { + // slog.Error("streams: failed to create consumer", "err", err) + // } else { + // go func() { + // _ = consumer.Run(streamCtx, func(ctx context.Context, data []byte) error { + // var evt streams.UserCreatedEvent + // if err := json.Unmarshal(data, &evt); err != nil { return err } + // payload, _ := json.Marshal(queue.WelcomeEmailPayload{UserID: evt.UserID, Email: evt.Email, Name: evt.Name}) + // return app.Enqueuer.Enqueue(ctx, queue.TypeWelcomeEmail, payload) + // }) + // _ = consumer.Close() + // }() + // } + // // In the shutdown sequence below, call: streamCancel() srv, err := server.NewServer(app, hub) if err != nil { @@ -122,9 +115,6 @@ func main() { <-done - if streamCancel != nil { - streamCancel() // stop streams consumer - } if workerCancel != nil { workerCancel() // stop worker before hub (in-flight jobs drain first) } diff --git a/backend/internal/infrastructure/streams/consumer.go b/backend/internal/infrastructure/streams/consumer.go index ed50690..bdcd1ae 100644 --- a/backend/internal/infrastructure/streams/consumer.go +++ b/backend/internal/infrastructure/streams/consumer.go @@ -28,6 +28,12 @@ func NewConsumer(redisURL, stream, group, consumer string) (*Consumer, error) { if err != nil { return nil, fmt.Errorf("streams: parse redis url: %w", err) } + // Proactively close idle connections after 8 s so the client pool recycles + // them before managed Redis providers (Upstash, Redis Cloud) kill them at + // their own idle timeout (~10–60 s depending on plan). This eliminates the + // i/o timeout errors that otherwise surface from blocking XReadGroup calls. + opts.MaxIdleConns = 1 + opts.ConnMaxIdleTime = 8 * time.Second return &Consumer{ client: redis.NewClient(opts), stream: stream, @@ -57,13 +63,17 @@ func (c *Consumer) Run(ctx context.Context, h Handler) error { if err == redis.Nil { continue } - // Managed Redis (Upstash, Redis Cloud) kills idle connections, which - // surfaces as an i/o timeout on the blocking XReadGroup call. The - // go-redis pool reconnects automatically on the next attempt, so - // treat this as WARN and retry without delay. + // i/o timeout means the server closed an idle connection. The pool + // will reconnect on the next attempt; back off briefly to avoid + // rapid connection cycling and unnecessary I/O. var netErr net.Error if errors.As(err, &netErr) && netErr.Timeout() { slog.Warn("streams: connection timeout, reconnecting", "stream", c.stream) + select { + case <-ctx.Done(): + return nil + case <-time.After(2 * time.Second): + } continue } slog.Error("streams: read error", "stream", c.stream, "err", err)