From 16ba4d5de7ddfbac1b12be3ed714107ad0a27c8a Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 3 Jul 2026 19:58:13 +0800 Subject: [PATCH 1/5] feat(publish): route publish jobs by workspace Publish workers need tenant routing before project-domain tables can move toward Citus distribution. Add workspace ids to publish job payloads and scope publication, schedule, account, activity, and replay lookups through the project workspace. Mismatched worker payloads are discarded after releasing their publish lock while legacy payloads derive a workspace from the project. --- .../internal/services/publish/lifecycle.go | 72 ++++++++++++--- backend/internal/services/publish/outbox.go | 5 +- backend/internal/services/publish/queue.go | 92 ++++++++++++++++--- backend/internal/services/publish/schedule.go | 25 ++--- backend/internal/services/publish/service.go | 51 ++++++++-- 5 files changed, 193 insertions(+), 52 deletions(-) diff --git a/backend/internal/services/publish/lifecycle.go b/backend/internal/services/publish/lifecycle.go index d368a8b7..8f4b1c62 100644 --- a/backend/internal/services/publish/lifecycle.go +++ b/backend/internal/services/publish/lifecycle.go @@ -39,8 +39,8 @@ func (s *Service) lifecycle() PublicationLifecycle { } func (l PublicationLifecycle) MarkQueued(pub *models.ProjectPlatformPublication, queuedAt time.Time) error { - result := l.db.Model(&models.ProjectPlatformPublication{}). - Where("id = ? AND status = ?", pub.ID, pub.Status). + result := l.publicationQuery(pub). + Where("status = ?", pub.Status). Updates(map[string]any{ "status": models.PublicationStatusQueued, "error_message": "", @@ -58,8 +58,8 @@ func (l PublicationLifecycle) MarkQueued(pub *models.ProjectPlatformPublication, } func (l PublicationLifecycle) MarkPublishing(pub *models.ProjectPlatformPublication, startedAt time.Time) error { - result := l.db.Model(&models.ProjectPlatformPublication{}). - Where("id = ? AND status = ?", pub.ID, pub.Status). 
+ result := l.publicationQuery(pub). + Where("status = ?", pub.Status). Updates(map[string]any{ "status": models.PublicationStatusPublishing, "error_message": "", @@ -77,12 +77,18 @@ func (l PublicationLifecycle) MarkPublishing(pub *models.ProjectPlatformPublicat } func (l PublicationLifecycle) MarkFailed(ctx context.Context, projectID uuid.UUID, platform string, message string) error { + return l.MarkFailedForWorkspace(ctx, uuid.Nil, projectID, platform, message) +} + +func (l PublicationLifecycle) MarkFailedForWorkspace(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID, platform string, message string) error { db := l.db if ctx != nil { db = db.WithContext(ctx) } - return db.Model(&models.ProjectPlatformPublication{}). + query := db.Model(&models.ProjectPlatformPublication{}). Where("project_id = ? AND platform = ?", projectID, platform). + Scopes(workspaceScope(workspaceID)) + return query. Updates(map[string]any{ "status": models.PublicationStatusFailed, "error_message": SanitizeUserFacingErrorMessage(message), @@ -103,8 +109,12 @@ func (l PublicationLifecycle) CompletePublication(pub *models.ProjectPlatformPub } else { updates["retry_count"] = gorm.Expr("retry_count + ?", 1) } - if err := l.db.Model(pub).Updates(updates).Error; err != nil { - return err + result := l.publicationQuery(pub).Updates(updates) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + return l.publicationStateChangedError(pub.ID) } pub.Status = completion.Status pub.RemoteID = completion.RemoteID @@ -113,7 +123,15 @@ func (l PublicationLifecycle) CompletePublication(pub *models.ProjectPlatformPub return nil } -func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedAt time.Time) (models.PublishAttempt, bool, error) { +func (l PublicationLifecycle) publicationQuery(pub *models.ProjectPlatformPublication) *gorm.DB { + query := l.db.Model(&models.ProjectPlatformPublication{}).Where("id = ?", pub.ID) + if pub.WorkspaceID 
!= uuid.Nil { + query = query.Where("workspace_id = ?", pub.WorkspaceID) + } + return query +} + +func (l PublicationLifecycle) StartPublishAttemptForWorkspace(workspaceID uuid.UUID, scheduleID uuid.UUID, startedAt time.Time) (models.PublishAttempt, bool, error) { if scheduleID == uuid.Nil { return models.PublishAttempt{}, false, nil } @@ -122,22 +140,23 @@ func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedA } var attempt models.PublishAttempt if err := l.db.Transaction(func(tx *gorm.DB) error { - result := tx.Model(&models.ScheduledPublication{}). + scheduleQuery := tx.Model(&models.ScheduledPublication{}). Where("id = ? AND status IN ?", scheduleID, []string{ models.ScheduledPublicationStatusScheduled, models.ScheduledPublicationStatusFailed, models.ScheduledPublicationStatusNeedsManualAction, }). - Updates(map[string]any{ - "status": models.ScheduledPublicationStatusRunning, - "last_error": "", - }) + Scopes(workspaceScope(workspaceID)) + result := scheduleQuery.Updates(map[string]any{ + "status": models.ScheduledPublicationStatusRunning, + "last_error": "", + }) if result.Error != nil { return result.Error } if result.RowsAffected == 0 { var schedule models.ScheduledPublication - if err := tx.Select("status").First(&schedule, "id = ?", scheduleID).Error; err != nil { + if err := tx.Select("status").Scopes(workspaceScope(workspaceID)).First(&schedule, "id = ?", scheduleID).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return errScheduledPublicationMissing } @@ -173,7 +192,15 @@ func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedA return attempt, true, nil } +func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedAt time.Time) (models.PublishAttempt, bool, error) { + return l.StartPublishAttemptForWorkspace(uuid.Nil, scheduleID, startedAt) +} + func (l PublicationLifecycle) FinishPublishAttempt(attempt *models.PublishAttempt, completion publishAttemptCompletion) 
error { + return l.FinishPublishAttemptForWorkspace(uuid.Nil, attempt, completion) +} + +func (l PublicationLifecycle) FinishPublishAttemptForWorkspace(workspaceID uuid.UUID, attempt *models.PublishAttempt, completion publishAttemptCompletion) error { if attempt == nil { return nil } @@ -199,6 +226,7 @@ func (l PublicationLifecycle) FinishPublishAttempt(attempt *models.PublishAttemp } return tx.Model(&models.ScheduledPublication{}). Where("id = ?", attempt.ScheduledPublicationID). + Scopes(workspaceScope(workspaceID)). Updates(map[string]any{ "status": scheduleStatus, "last_error": completion.ErrorMessage, @@ -207,6 +235,10 @@ func (l PublicationLifecycle) FinishPublishAttempt(attempt *models.PublishAttemp } func (l PublicationLifecycle) MarkPrepublishSyncing(projectID uuid.UUID, platforms []string) error { + return l.MarkPrepublishSyncingForWorkspace(uuid.Nil, projectID, platforms) +} + +func (l PublicationLifecycle) MarkPrepublishSyncingForWorkspace(workspaceID uuid.UUID, projectID uuid.UUID, platforms []string) error { if len(platforms) == 0 { return nil } @@ -214,6 +246,7 @@ func (l PublicationLifecycle) MarkPrepublishSyncing(projectID uuid.UUID, platfor var activeCount int64 if err := tx.Model(&models.ProjectPlatformPublication{}). Where("project_id = ? AND platform IN ? AND status IN ?", projectID, platforms, activePublicationStatuses()). + Scopes(workspaceScope(workspaceID)). Count(&activeCount).Error; err != nil { return err } @@ -223,6 +256,7 @@ func (l PublicationLifecycle) MarkPrepublishSyncing(projectID uuid.UUID, platfor if err := tx.Model(&models.ProjectPlatformPublication{}). Where("project_id = ? AND platform IN ? AND status NOT IN ?", projectID, platforms, activePublicationStatuses()). + Scopes(workspaceScope(workspaceID)). 
Updates(map[string]any{ "draft_status": models.PublicationDraftStatusSyncing, "error_message": "", @@ -233,6 +267,7 @@ func (l PublicationLifecycle) MarkPrepublishSyncing(projectID uuid.UUID, platfor if err := tx.Model(&models.ProjectPlatformPublication{}). Where("project_id = ? AND platform IN ? AND status IN ?", projectID, platforms, activePublicationStatuses()). + Scopes(workspaceScope(workspaceID)). Count(&activeCount).Error; err != nil { return err } @@ -251,6 +286,15 @@ func activePublicationStatuses() []string { } } +func workspaceScope(workspaceID uuid.UUID) func(*gorm.DB) *gorm.DB { + return func(db *gorm.DB) *gorm.DB { + if workspaceID == uuid.Nil { + return db + } + return db.Where("workspace_id = ?", workspaceID) + } +} + func (l PublicationLifecycle) publicationStateChangedError(publicationID uuid.UUID) error { var pub models.ProjectPlatformPublication if err := l.db.Select("status", "last_attempt_at").First(&pub, "id = ?", publicationID).Error; err != nil { diff --git a/backend/internal/services/publish/outbox.go b/backend/internal/services/publish/outbox.go index f43c1506..2cad89ed 100644 --- a/backend/internal/services/publish/outbox.go +++ b/backend/internal/services/publish/outbox.go @@ -151,7 +151,10 @@ func (s *Service) dispatchClaimedOutboxEvent(ctx context.Context, event models.O if err := json.Unmarshal(event.Payload, &job); err != nil { return fmt.Errorf("decode publish outbox payload: %w", err) } - if job.JobID == uuid.Nil || job.ProjectID == uuid.Nil || job.UserID == uuid.Nil || job.Platform == "" { + if err := s.ensurePublishJobWorkspaceID(ctx, &job); err != nil { + return fmt.Errorf("resolve publish outbox workspace for event %s: %w", event.ID, err) + } + if job.JobID == uuid.Nil || job.ProjectID == uuid.Nil || job.WorkspaceID == uuid.Nil || job.UserID == uuid.Nil || job.Platform == "" { return fmt.Errorf("invalid publish outbox payload for event %s", event.ID) } return s.queue.Enqueue(ctx, job) diff --git 
a/backend/internal/services/publish/queue.go b/backend/internal/services/publish/queue.go index f0d24770..0bd44ed4 100644 --- a/backend/internal/services/publish/queue.go +++ b/backend/internal/services/publish/queue.go @@ -41,12 +41,14 @@ var ( ErrPublicationAlreadyPublishing = errors.New("publication is already publishing") ErrPublishQueueEmpty = errors.New("publish queue empty") errPublishLockOwnershipLost = errors.New("publish lock ownership lost") + errPublishJobWorkspaceMismatch = errors.New("publish job workspace does not match project") publishLockRefreshInterval = publishLockRefreshEvery ) type PublishJob struct { JobID uuid.UUID `json:"job_id"` ProjectID uuid.UUID `json:"project_id"` + WorkspaceID uuid.UUID `json:"workspace_id"` UserID uuid.UUID `json:"user_id"` Platform string `json:"platform"` PublicationID uuid.UUID `json:"publication_id,omitempty"` @@ -197,18 +199,22 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID return PublishResponse{}, ErrForbidden } req.IdempotencyKey = normalizeIdempotencyKey(req.IdempotencyKey) + project, projectErr := s.projectForPublish(ctx, projectID, *scopeUserID) + if projectErr != nil { + return PublishResponse{}, projectErr + } + workspaceID := models.ProjectWorkspaceID(project) if req.IdempotencyKey != "" { - if resp, ok, err := s.findIdempotentPublishResponse(projectID, platform, *scopeUserID, req.IdempotencyKey); err != nil { + if resp, ok, err := s.findIdempotentPublishResponseForWorkspace(workspaceID, project.ID, platform, *scopeUserID, req.IdempotencyKey); err != nil { return PublishResponse{}, err } else if ok { return resp, nil } } - - project, pub, err := s.preparePublishJob(ctx, projectID, platform, *scopeUserID) + project, pub, err := s.preparePublishJobForProject(ctx, project, platform, *scopeUserID) if err != nil { if req.IdempotencyKey != "" { - if resp, ok, lookupErr := s.findIdempotentPublishResponse(projectID, platform, *scopeUserID, req.IdempotencyKey); lookupErr != 
nil { + if resp, ok, lookupErr := s.findIdempotentPublishResponseForWorkspace(workspaceID, project.ID, platform, *scopeUserID, req.IdempotencyKey); lookupErr != nil { return PublishResponse{}, lookupErr } else if ok { return resp, nil @@ -223,7 +229,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID if err != nil { return PublishResponse{}, err } - resp, err := s.PublishProjectWithContext(ctx, projectID, platform, scopeUserID, schedule.ID) + resp, err := s.PublishProjectInWorkspaceWithContext(ctx, workspaceID, project.ID, platform, scopeUserID, schedule.ID) if err != nil { return PublishResponse{}, err } @@ -237,6 +243,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID job := PublishJob{ JobID: uuid.New(), ProjectID: project.ID, + WorkspaceID: workspaceID, UserID: *scopeUserID, Platform: platform, PublicationID: pub.ID, @@ -254,7 +261,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID } if !acquired { if req.IdempotencyKey != "" { - if resp, ok, err := s.waitForIdempotentPublishResponse(ctx, project.ID, platform, *scopeUserID, req.IdempotencyKey); err != nil { + if resp, ok, err := s.waitForIdempotentPublishResponseForWorkspace(ctx, workspaceID, project.ID, platform, *scopeUserID, req.IdempotencyKey); err != nil { return PublishResponse{}, err } else if ok { return resp, nil @@ -277,6 +284,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID job.ScheduleID = schedule.ID if err := txService.recordPublishEvent(models.PublishEvent{ PublicationID: pub.ID, + WorkspaceID: job.WorkspaceID, ProjectID: project.ID, UserID: *scopeUserID, Platform: platform, @@ -287,7 +295,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID }); err != nil { return err } - if err := txService.recordProjectPublishActivity(project.ID, *scopeUserID, models.ProjectActivityPublishRequested, map[string]any{ + if err := 
txService.recordProjectPublishActivityForWorkspace(job.WorkspaceID, project.ID, *scopeUserID, models.ProjectActivityPublishRequested, map[string]any{ "platform": platform, "job_id": job.JobID.String(), }); err != nil { @@ -298,6 +306,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID } if err := txService.recordPublishEvent(models.PublishEvent{ PublicationID: pub.ID, + WorkspaceID: job.WorkspaceID, ProjectID: project.ID, UserID: *scopeUserID, Platform: platform, @@ -308,7 +317,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID }); err != nil { return err } - if err := txService.recordProjectPublishActivity(project.ID, *scopeUserID, models.ProjectActivityPublishQueued, map[string]any{ + if err := txService.recordProjectPublishActivityForWorkspace(job.WorkspaceID, project.ID, *scopeUserID, models.ProjectActivityPublishQueued, map[string]any{ "platform": platform, "job_id": job.JobID.String(), }); err != nil { @@ -397,7 +406,18 @@ func (s *Service) StartPublishWorkerWithErrors(ctx context.Context) <-chan error } func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { - if job.JobID == uuid.Nil || job.ProjectID == uuid.Nil || job.UserID == uuid.Nil || strings.TrimSpace(job.Platform) == "" { + if err := s.ensurePublishJobWorkspaceID(ctx, &job); err != nil { + if errors.Is(err, errPublishJobWorkspaceMismatch) { + log.Printf("discarding publish job %s because workspace does not match project", job.JobID) + if coordinationQueue := s.coordinationQueueOrDefault(); coordinationQueue != nil && job.JobID != uuid.Nil && job.ProjectID != uuid.Nil && strings.TrimSpace(job.Platform) != "" { + _ = coordinationQueue.ReleaseLock(context.Background(), publishLockKey(job.ProjectID, job.Platform), job.JobID.String()) + } + return nil + } + log.Printf("failed to resolve workspace for publish job %s: %v", job.JobID, err) + return err + } + if job.JobID == uuid.Nil || job.ProjectID == uuid.Nil || 
job.WorkspaceID == uuid.Nil || job.UserID == uuid.Nil || strings.TrimSpace(job.Platform) == "" { log.Printf("discarding invalid publish job: %+v", job) return nil } @@ -431,6 +451,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { if err := s.recordPublishEvent(models.PublishEvent{ PublicationID: job.PublicationID, + WorkspaceID: job.WorkspaceID, ProjectID: job.ProjectID, UserID: job.UserID, Platform: job.Platform, @@ -442,7 +463,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { log.Printf("failed to record publish job %s start event: %v", job.JobID, err) } - resp, err := s.PublishProjectWithContext(publishCtx, job.ProjectID, job.Platform, &job.UserID, job.ScheduleID) + resp, err := s.PublishProjectInWorkspaceWithContext(publishCtx, job.WorkspaceID, job.ProjectID, job.Platform, &job.UserID, job.ScheduleID) if err != nil { if errors.Is(context.Cause(publishCtx), errPublishLockOwnershipLost) { log.Printf("publish job %s stopped because lock ownership was lost", job.JobID) @@ -451,7 +472,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { log.Printf("publish job %s failed: %v", job.JobID, err) observeJob(publishJobResultError) cleanupCtx, cancelCleanup := publishCleanupContext(ctx) - if markErr := s.lifecycle().MarkFailed(cleanupCtx, job.ProjectID, job.Platform, err.Error()); markErr != nil { + if markErr := s.lifecycle().MarkFailedForWorkspace(cleanupCtx, job.WorkspaceID, job.ProjectID, job.Platform, err.Error()); markErr != nil { log.Printf("failed to mark publish job %s as failed: %v", job.JobID, markErr) } else { s.invalidateDashboardCaches(cleanupCtx) @@ -460,6 +481,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { cancelCleanup() _ = s.recordPublishEvent(models.PublishEvent{ PublicationID: job.PublicationID, + WorkspaceID: job.WorkspaceID, ProjectID: job.ProjectID, UserID: job.UserID, Platform: job.Platform, @@ -480,6 
+502,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { observeJob(publishJobResultError) _ = s.recordPublishEvent(models.PublishEvent{ PublicationID: job.PublicationID, + WorkspaceID: job.WorkspaceID, ProjectID: job.ProjectID, UserID: job.UserID, Platform: job.Platform, @@ -496,6 +519,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { } _ = s.recordPublishEvent(models.PublishEvent{ PublicationID: job.PublicationID, + WorkspaceID: job.WorkspaceID, ProjectID: job.ProjectID, UserID: job.UserID, Platform: job.Platform, @@ -514,6 +538,40 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { return nil } +func (s *Service) ensurePublishJobWorkspaceID(ctx context.Context, job *PublishJob) error { + if job == nil { + return nil + } + if job.ProjectID == uuid.Nil { + if job.UserID != uuid.Nil { + job.WorkspaceID = models.PersonalWorkspaceID(job.UserID) + } + return nil + } + + queryCtx := ctx + if queryCtx == nil || queryCtx.Err() != nil { + queryCtx = s.requestContext() + } + var project models.Project + err := s.writerDB(queryCtx).Select("id", "user_id", "workspace_id").First(&project, "id = ?", job.ProjectID).Error + if err == nil { + projectWorkspaceID := models.ProjectWorkspaceID(project) + if job.WorkspaceID != uuid.Nil && job.WorkspaceID != projectWorkspaceID { + return errPublishJobWorkspaceMismatch + } + job.WorkspaceID = projectWorkspaceID + return nil + } + if errors.Is(err, gorm.ErrRecordNotFound) { + if job.UserID != uuid.Nil { + job.WorkspaceID = models.PersonalWorkspaceID(job.UserID) + } + return nil + } + return err +} + func (s *Service) ensurePublishJobLock(ctx context.Context, job PublishJob, lockKey string) (bool, error) { coordinationQueue := s.coordinationQueueOrDefault() if coordinationQueue == nil { @@ -530,7 +588,7 @@ func (s *Service) ensurePublishJobLock(ctx context.Context, job PublishJob, lock return false, nil } - retriable, err := 
s.publicationRetriableForJob(job.ProjectID, job.Platform) + retriable, err := s.publicationRetriableForJob(job.WorkspaceID, job.ProjectID, job.Platform) if err != nil { return false, err } @@ -585,9 +643,13 @@ func (s *Service) preparePublishJob(ctx context.Context, projectID uuid.UUID, pl if err != nil { return models.Project{}, models.ProjectPlatformPublication{}, ErrForbidden } + return s.preparePublishJobForProject(ctx, project, platform, userID) +} +func (s *Service) preparePublishJobForProject(ctx context.Context, project models.Project, platform string, userID uuid.UUID) (models.Project, models.ProjectPlatformPublication, error) { var pub models.ProjectPlatformPublication - if err := s.strongReadDB(ctx).Where("project_id = ? AND platform = ?", projectID, platform).First(&pub).Error; err != nil { + workspaceID := models.ProjectWorkspaceID(project) + if err := s.strongReadDB(ctx).Where("workspace_id = ? AND project_id = ? AND platform = ?", workspaceID, project.ID, platform).First(&pub).Error; err != nil { return models.Project{}, models.ProjectPlatformPublication{}, fmt.Errorf("publication record not found for platform: %s", platform) } if !pub.Enabled || pub.Status == models.PublicationStatusCancelled { @@ -614,9 +676,9 @@ func (s *Service) preparePublishJob(ctx context.Context, projectID uuid.UUID, pl return project, pub, nil } -func (s *Service) publicationRetriableForJob(projectID uuid.UUID, platform string) (bool, error) { +func (s *Service) publicationRetriableForJob(workspaceID uuid.UUID, projectID uuid.UUID, platform string) (bool, error) { var pub models.ProjectPlatformPublication - if err := s.writerDB(s.requestContext()).Select("enabled", "status").Where("project_id = ? AND platform = ?", projectID, platform).First(&pub).Error; err != nil { + if err := s.writerDB(s.requestContext()).Select("enabled", "status").Where("workspace_id = ? AND project_id = ? 
AND platform = ?", workspaceID, projectID, platform).First(&pub).Error; err != nil { return false, err } if !pub.Enabled || pub.Status == models.PublicationStatusCancelled { diff --git a/backend/internal/services/publish/schedule.go b/backend/internal/services/publish/schedule.go index 17969a41..9581a64d 100644 --- a/backend/internal/services/publish/schedule.go +++ b/backend/internal/services/publish/schedule.go @@ -29,10 +29,7 @@ func (s *Service) createScheduledPublication(ctx context.Context, project models if !s.writerDB(ctx).Migrator().HasTable(&models.ScheduledPublication{}) { return models.ScheduledPublication{}, nil } - workspaceID := models.PersonalWorkspaceID(project.UserID) - if project.WorkspaceID != nil && *project.WorkspaceID != uuid.Nil { - workspaceID = *project.WorkspaceID - } + workspaceID := models.ProjectWorkspaceID(project) schedule := models.ScheduledPublication{ WorkspaceID: workspaceID, ProjectID: project.ID, @@ -48,7 +45,7 @@ func (s *Service) createScheduledPublication(ctx context.Context, project models if schedule.Status == "" { schedule.Status = models.ScheduledPublicationStatusScheduled } - if latestVersionID, err := s.latestProjectVersionID(ctx, project.ID); err != nil { + if latestVersionID, err := s.latestProjectVersionID(ctx, workspaceID, project.ID); err != nil { return models.ScheduledPublication{}, err } else if latestVersionID != uuid.Nil { schedule.ProjectVersionID = &latestVersionID @@ -73,7 +70,7 @@ func (s *Service) ScheduleProjectPublication(ctx context.Context, projectID uuid } idempotencyKey := normalizeIdempotencyKey(req.IdempotencyKey) if idempotencyKey != "" { - if existing, found, err := s.findIdempotentScheduledPublication(ctx, project.ID, pub.ID, userID, idempotencyKey); err != nil { + if existing, found, err := s.findIdempotentScheduledPublication(ctx, models.ProjectWorkspaceID(project), project.ID, pub.ID, userID, idempotencyKey); err != nil { return nil, err } else if found { item := 
scheduledPublicationFromModel(existing, project, pub, nil) @@ -97,13 +94,13 @@ func (s *Service) ScheduleProjectPublication(ctx context.Context, projectID uuid return &item, nil } -func (s *Service) findIdempotentScheduledPublication(ctx context.Context, projectID uuid.UUID, publicationID uuid.UUID, userID uuid.UUID, key string) (models.ScheduledPublication, bool, error) { +func (s *Service) findIdempotentScheduledPublication(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID, publicationID uuid.UUID, userID uuid.UUID, key string) (models.ScheduledPublication, bool, error) { if strings.TrimSpace(key) == "" { return models.ScheduledPublication{}, false, nil } var schedule models.ScheduledPublication err := s.writerDB(ctx). - Where("project_id = ? AND publication_id = ? AND created_by = ? AND idempotency_key = ?", projectID, publicationID, userID, key). + Where("workspace_id = ? AND project_id = ? AND publication_id = ? AND created_by = ? AND idempotency_key = ?", workspaceID, projectID, publicationID, userID, key). Order("created_at DESC"). First(&schedule).Error if errors.Is(err, gorm.ErrRecordNotFound) { @@ -160,7 +157,7 @@ func (s *Service) RetryScheduledPublication(ctx context.Context, projectID uuid. 
if schedule.Status != models.ScheduledPublicationStatusFailed && schedule.Status != models.ScheduledPublicationStatusNeedsManualAction { return nil, ErrPublicationAlreadyPublishing } - if _, err := s.PublishProjectWithContext(ctx, projectID, schedule.Publication.Platform, &userID, scheduleID); err != nil { + if _, err := s.PublishProjectInWorkspaceWithContext(ctx, schedule.WorkspaceID, projectID, schedule.Publication.Platform, &userID, scheduleID); err != nil { return nil, err } return s.scheduledPublicationDetail(ctx, scheduleID) @@ -289,7 +286,7 @@ func (s *Service) dispatchScheduledPublication(ctx context.Context, schedule mod platform := strings.TrimSpace(schedule.Publication.Platform) if platform == "" { var publication models.ProjectPlatformPublication - if err := s.writerDB(ctx).Select("platform").First(&publication, "id = ?", schedule.PublicationID).Error; err != nil { + if err := s.writerDB(ctx).Select("platform").First(&publication, "id = ? AND workspace_id = ?", schedule.PublicationID, schedule.WorkspaceID).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil } @@ -300,10 +297,14 @@ func (s *Service) dispatchScheduledPublication(ctx context.Context, schedule mod if platform == "" { return nil } + if schedule.WorkspaceID == uuid.Nil { + return nil + } job := PublishJob{ JobID: uuid.New(), ProjectID: schedule.ProjectID, + WorkspaceID: schedule.WorkspaceID, UserID: schedule.CreatedBy, Platform: platform, PublicationID: schedule.PublicationID, @@ -326,13 +327,13 @@ func (s *Service) dispatchScheduledPublication(ctx context.Context, schedule mod return s.processPublishJob(ctx, job) } -func (s *Service) latestProjectVersionID(ctx context.Context, projectID uuid.UUID) (uuid.UUID, error) { +func (s *Service) latestProjectVersionID(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID) (uuid.UUID, error) { if !s.writerDB(ctx).Migrator().HasTable(&models.ProjectVersion{}) { return uuid.Nil, nil } var version models.ProjectVersion 
err := s.writerDB(ctx).Select("id"). - Where("project_id = ?", projectID). + Where("workspace_id = ? AND project_id = ?", workspaceID, projectID). Order("version_number DESC, created_at DESC"). First(&version).Error if err == nil { diff --git a/backend/internal/services/publish/service.go b/backend/internal/services/publish/service.go index 4058b72d..51b64613 100644 --- a/backend/internal/services/publish/service.go +++ b/backend/internal/services/publish/service.go @@ -270,6 +270,10 @@ func (s *Service) PublishProject(projectID uuid.UUID, platform string, scopeUser } func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid.UUID, platform string, scopeUserID *uuid.UUID, scheduleID uuid.UUID) (PublishResponse, error) { + return s.PublishProjectInWorkspaceWithContext(ctx, uuid.Nil, projectID, platform, scopeUserID, scheduleID) +} + +func (s *Service) PublishProjectInWorkspaceWithContext(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID, platform string, scopeUserID *uuid.UUID, scheduleID uuid.UUID) (PublishResponse, error) { // Remote browser sessions are only for account connection and cookie capture. // Publish jobs must be durable across Redis workers, so they load saved credentials instead. if ctx == nil { @@ -284,9 +288,15 @@ func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid. if err != nil { return PublishResponse{}, err } + projectWorkspaceID := models.ProjectWorkspaceID(proj) + if workspaceID == uuid.Nil { + workspaceID = projectWorkspaceID + } else if workspaceID != projectWorkspaceID { + return PublishResponse{}, ErrForbidden + } var pub models.ProjectPlatformPublication - if err := s.strongReadDB(ctx).Where("project_id = ? AND platform = ?", projectID, platform).First(&pub).Error; err != nil { + if err := s.strongReadDB(ctx).Where("workspace_id = ? AND project_id = ? 
AND platform = ?", workspaceID, projectID, platform).First(&pub).Error; err != nil { return PublishResponse{}, fmt.Errorf("publication record not found for platform: %s", platform) } if !pub.Enabled || pub.Status == models.PublicationStatusCancelled { @@ -295,13 +305,13 @@ func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid. startedAt := time.Now().UTC() lifecycle := s.lifecycle() - attempt, hasAttempt, err := lifecycle.StartPublishAttempt(scheduleID, startedAt) + attempt, hasAttempt, err := lifecycle.StartPublishAttemptForWorkspace(workspaceID, scheduleID, startedAt) if err != nil { return PublishResponse{}, err } failAttempt := func(err error) error { if hasAttempt { - _ = lifecycle.FinishPublishAttempt(&attempt, publishAttemptCompletion{ + _ = lifecycle.FinishPublishAttemptForWorkspace(workspaceID, &attempt, publishAttemptCompletion{ Status: models.PublishAttemptStatusFailed, ErrorMessage: SanitizeUserFacingErrorMessage(err.Error()), }) @@ -330,7 +340,7 @@ func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid. var account models.PlatformAccount if pub.PlatformAccountID != nil && *pub.PlatformAccountID != uuid.Nil { - if err := s.strongReadDB(ctx).Where("id = ?", *pub.PlatformAccountID).First(&account).Error; err != nil { + if err := s.strongReadDB(ctx).Where("id = ? AND workspace_id = ?", *pub.PlatformAccountID, workspaceID).First(&account).Error; err != nil { return PublishResponse{}, err } } @@ -391,7 +401,7 @@ func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid. attemptStatus = models.PublishAttemptStatusFailed } if hasAttempt { - if err := lifecycle.FinishPublishAttempt(&attempt, publishAttemptCompletion{ + if err := lifecycle.FinishPublishAttemptForWorkspace(workspaceID, &attempt, publishAttemptCompletion{ Status: attemptStatus, RemoteID: remoteID, PublishURL: publishURL, @@ -401,7 +411,7 @@ func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid. 
} } s.invalidateDashboardCaches(ctx) - if err := s.recordProjectPublishActivity(projectID, *scopeUserID, models.ProjectActivityPublishCompleted, map[string]any{ + if err := s.recordProjectPublishActivityForWorkspace(workspaceID, projectID, *scopeUserID, models.ProjectActivityPublishCompleted, map[string]any{ "platform": platform, "status": status, "remote_id": remoteID, @@ -482,6 +492,10 @@ func (s *Service) recordPublishEvent(event models.PublishEvent) error { } func (s *Service) recordProjectPublishActivity(projectID uuid.UUID, userID uuid.UUID, eventType string, metadata map[string]any) error { + return s.recordProjectPublishActivityForWorkspace(uuid.Nil, projectID, userID, eventType, metadata) +} + +func (s *Service) recordProjectPublishActivityForWorkspace(workspaceID uuid.UUID, projectID uuid.UUID, userID uuid.UUID, eventType string, metadata map[string]any) error { if projectID == uuid.Nil || userID == uuid.Nil || strings.TrimSpace(eventType) == "" { return nil } @@ -489,6 +503,12 @@ func (s *Service) recordProjectPublishActivity(projectID uuid.UUID, userID uuid. if err := s.writerDB(s.requestContext()).Select("id", "user_id", "workspace_id").First(&project, "id = ?", projectID).Error; err != nil { return err } + projectWorkspaceID := models.ProjectWorkspaceID(project) + if workspaceID == uuid.Nil { + workspaceID = projectWorkspaceID + } else if workspaceID != projectWorkspaceID { + return ErrForbidden + } payload := datatypes.JSON([]byte(`{}`)) if metadata != nil { encoded, err := json.Marshal(metadata) @@ -498,7 +518,7 @@ func (s *Service) recordProjectPublishActivity(projectID uuid.UUID, userID uuid. payload = datatypes.JSON(encoded) } return s.writerDB(s.requestContext()).Create(&models.ProjectActivity{ - WorkspaceID: models.ProjectWorkspaceID(project), + WorkspaceID: workspaceID, ProjectID: projectID, ActorUserID: userID, EventType: eventType, @@ -507,6 +527,10 @@ func (s *Service) recordProjectPublishActivity(projectID uuid.UUID, userID uuid. 
} func (s *Service) findIdempotentPublishResponse(projectID uuid.UUID, platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { + return s.findIdempotentPublishResponseForWorkspace(uuid.Nil, projectID, platform, userID, key) +} + +func (s *Service) findIdempotentPublishResponseForWorkspace(workspaceID uuid.UUID, projectID uuid.UUID, platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { if strings.TrimSpace(key) == "" { return PublishResponse{}, false, nil } @@ -516,9 +540,11 @@ func (s *Service) findIdempotentPublishResponse(projectID uuid.UUID, platform st } var queued models.PublishEvent - err := db. + query := db. + Scopes(workspaceScope(workspaceID)). Where("project_id = ? AND platform = ? AND user_id = ? AND idempotency_key = ? AND event_type = ?", projectID, platform, userID, key, "queued"). - Order("created_at DESC"). + Order("created_at DESC") + err := query. First(&queued).Error if errors.Is(err, gorm.ErrRecordNotFound) { return PublishResponse{}, false, nil @@ -530,6 +556,7 @@ func (s *Service) findIdempotentPublishResponse(projectID uuid.UUID, platform st event := queued var events []models.PublishEvent err = db. + Scopes(workspaceScope(workspaceID)). Where("project_id = ? AND platform = ? AND user_id = ? AND job_id = ?", projectID, platform, userID, queued.JobID). Order("created_at ASC"). 
Find(&events).Error @@ -581,6 +608,10 @@ func publishEventReplayRank(eventType string) int { } func (s *Service) waitForIdempotentPublishResponse(ctx context.Context, projectID uuid.UUID, platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { + return s.waitForIdempotentPublishResponseForWorkspace(ctx, uuid.Nil, projectID, platform, userID, key) +} + +func (s *Service) waitForIdempotentPublishResponseForWorkspace(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID, platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { deadline := time.NewTimer(publishReplayWait) defer deadline.Stop() @@ -588,7 +619,7 @@ func (s *Service) waitForIdempotentPublishResponse(ctx context.Context, projectI defer ticker.Stop() for { - resp, ok, err := s.findIdempotentPublishResponse(projectID, platform, userID, key) + resp, ok, err := s.findIdempotentPublishResponseForWorkspace(workspaceID, projectID, platform, userID, key) if err != nil || ok { return resp, ok, err } From da45c4bb307c0e1dda291ecdbaa0bd97510f4e03 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 3 Jul 2026 19:58:37 +0800 Subject: [PATCH 2/5] test(publish): cover workspace-routed publish jobs Publish queue tests need to pin the tenant routing behavior added to durable worker payloads. Assert workspace ids in queue and outbox payloads, including legacy outbox replay derivation. Cover mismatched workspace jobs so they leave publication state untouched and release their lock. 
--- .../internal/services/publish/queue_test.go | 87 ++++++++++++++++++- 1 file changed, 84 insertions(+), 3 deletions(-) diff --git a/backend/internal/services/publish/queue_test.go b/backend/internal/services/publish/queue_test.go index 92974be6..10f628ac 100644 --- a/backend/internal/services/publish/queue_test.go +++ b/backend/internal/services/publish/queue_test.go @@ -407,6 +407,7 @@ func TestEnqueuePublishProjectQueuesAndLocksPublication(t *testing.T) { require.NoError(t, err) require.Equal(t, models.PublicationStatusQueued, resp.Status) require.Len(t, queue.jobs, 1) + require.Equal(t, models.PersonalWorkspaceID(user.ID), queue.jobs[0].WorkspaceID) require.Equal(t, uuid.Nil, queue.jobs[0].BrowserSessionID) lockKey := publishLockKey(project.ID, "wechat") @@ -419,6 +420,9 @@ func TestEnqueuePublishProjectQueuesAndLocksPublication(t *testing.T) { require.Equal(t, models.OutboxStatusDispatched, outbox.Status) require.Equal(t, 1, outbox.Attempts) require.NotNil(t, outbox.ProcessedAt) + var outboxJob PublishJob + require.NoError(t, json.Unmarshal(outbox.Payload, &outboxJob)) + require.Equal(t, models.PersonalWorkspaceID(user.ID), outboxJob.WorkspaceID) var saved models.ProjectPlatformPublication require.NoError(t, db.First(&saved, "project_id = ? 
AND platform = ?", project.ID, "wechat").Error) @@ -844,6 +848,7 @@ func TestEnqueuePublishProjectLeavesFailedDispatchInOutboxForRetry(t *testing.T) require.NoError(t, service.FlushPublishOutbox(context.Background(), 10)) require.Len(t, queue.jobs, 1) require.Equal(t, resp.JobID, queue.jobs[0].JobID.String()) + require.Equal(t, models.PersonalWorkspaceID(user.ID), queue.jobs[0].WorkspaceID) require.NoError(t, db.First(&outbox, "id = ?", outbox.ID).Error) require.Equal(t, models.OutboxStatusDispatched, outbox.Status) @@ -857,14 +862,36 @@ func TestFlushPublishOutboxRetriesStaleProcessingEvent(t *testing.T) { queue := newTestPublishQueue() service.queue = queue + user := models.User{Username: "legacy-outbox-owner"} + require.NoError(t, db.Create(&user).Error) + project := models.Project{ + UserID: user.ID, + Title: "Legacy queued post", + SourceContent: "

ready

", + Status: models.ProjectStatusReady, + } + require.NoError(t, db.Create(&project).Error) + job := PublishJob{ JobID: uuid.New(), - ProjectID: uuid.New(), - UserID: uuid.New(), + ProjectID: project.ID, + UserID: user.ID, Platform: "wechat", EnqueuedAt: time.Now().UTC(), } - payload, err := json.Marshal(job) + payload, err := json.Marshal(struct { + JobID uuid.UUID `json:"job_id"` + ProjectID uuid.UUID `json:"project_id"` + UserID uuid.UUID `json:"user_id"` + Platform string `json:"platform"` + EnqueuedAt time.Time `json:"enqueued_at"` + }{ + JobID: job.JobID, + ProjectID: job.ProjectID, + UserID: job.UserID, + Platform: job.Platform, + EnqueuedAt: job.EnqueuedAt, + }) require.NoError(t, err) staleUpdatedAt := time.Now().UTC().Add(-publishOutboxClaimTimeout - time.Second) outbox := models.OutboxEvent{ @@ -882,6 +909,7 @@ func TestFlushPublishOutboxRetriesStaleProcessingEvent(t *testing.T) { require.NoError(t, service.FlushPublishOutbox(context.Background(), 10)) require.Len(t, queue.jobs, 1) + job.WorkspaceID = models.PersonalWorkspaceID(user.ID) require.Equal(t, job, queue.jobs[0]) require.NoError(t, db.First(&outbox, "id = ?", outbox.ID).Error) require.Equal(t, models.OutboxStatusDispatched, outbox.Status) @@ -1166,6 +1194,58 @@ func TestProcessPublishJobPublishesAndReleasesLock(t *testing.T) { }, observer.observations) } +func TestProcessPublishJobRejectsWorkspaceMismatch(t *testing.T) { + db := setupPublishQueueTestDB(t) + service := newPublishTestService(db) + queue := newTestPublishQueue() + service.queue = queue + + publisher.Factory.Register("wechat", queueTestPublisher{}) + defer publisher.Factory.Register("wechat", &publisher.WechatPublisher{}) + + user := models.User{Username: "owner"} + require.NoError(t, db.Create(&user).Error) + project := models.Project{ + UserID: user.ID, + Title: "Queued post", + SourceContent: "

ready

", + Status: models.ProjectStatusReady, + } + require.NoError(t, db.Create(&project).Error) + createConnectedQueueAccount(t, db, user.ID, "wechat") + require.NoError(t, db.Create(&models.ProjectPlatformPublication{ + ProjectID: project.ID, + Platform: "wechat", + Enabled: true, + Status: models.PublicationStatusPublishing, + Config: datatypes.JSON(`{"title":"Queued post"}`), + AdaptedContent: datatypes.JSON(`{"format":"html","html":"ready"}`), + }).Error) + + wrongWorkspaceID := uuid.New() + job := PublishJob{ + JobID: uuid.New(), + ProjectID: project.ID, + WorkspaceID: wrongWorkspaceID, + UserID: user.ID, + Platform: "wechat", + EnqueuedAt: time.Now().UTC(), + } + lockKey := publishLockKey(project.ID, "wechat") + queue.locks[lockKey] = job.JobID.String() + + require.NoError(t, service.processPublishJob(context.Background(), job)) + + var saved models.ProjectPlatformPublication + require.NoError(t, db.First(&saved, "project_id = ? AND platform = ?", project.ID, "wechat").Error) + require.Equal(t, models.PublicationStatusPublishing, saved.Status) + require.Empty(t, queue.locks[lockKey]) + + var wrongWorkspaceEvents int64 + require.NoError(t, db.Model(&models.PublishEvent{}).Where("workspace_id = ?", wrongWorkspaceID).Count(&wrongWorkspaceEvents).Error) + require.Zero(t, wrongWorkspaceEvents) +} + func TestProcessPublishJobReacquiresExpiredLock(t *testing.T) { db := setupPublishQueueTestDB(t) service := newPublishTestService(db) @@ -1476,6 +1556,7 @@ func TestRedisPublishQueueEnqueuesAsynqTask(t *testing.T) { var payload PublishJob require.NoError(t, json.Unmarshal(tasks[0].Payload, &payload)) require.Equal(t, job, payload) + require.Contains(t, string(tasks[0].Payload), `"workspace_id"`) } func TestStartPublishWorkerWithErrorsReportsRunnerFailure(t *testing.T) { From fc83ff6764010e5bea5d82b6dee55d9a85e300b5 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 3 Jul 2026 19:59:16 +0800 Subject: [PATCH 3/5] docs(database): record Citus constraint review Phase 5 needs 
explicit constraint and join evidence before distributed-table rehearsal. Add a constraint review manifest and update table-group readiness terms with required primary-key, unique, and partition follow-ups. Update the database optimization plan to mark the review and worker-routing evidence as completed. --- doc/plan/database-optimization.md | 33 ++++-- script/db/citus_constraint_review.yml | 143 ++++++++++++++++++++++++++ script/db/citus_table_groups.yml | 108 +++++++++---------- 3 files changed, 222 insertions(+), 62 deletions(-) create mode 100644 script/db/citus_constraint_review.yml diff --git a/doc/plan/database-optimization.md b/doc/plan/database-optimization.md index 843f2644..5f5563cc 100644 --- a/doc/plan/database-optimization.md +++ b/doc/plan/database-optimization.md @@ -11,7 +11,7 @@ Status definitions: - `Not started`: no clear implementation has been found yet. - `Deferred`: not recommended for the current business stage; only trigger conditions are retained. -Current overall progress: about `80%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. +Current overall progress: about `86%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. | Phase | Weight | Current completion | Status | Completed | Not done / next steps | | ----- | ------ | ------------------ | ------ | --------- | --------------------- | @@ -20,7 +20,7 @@ Current overall progress: about `80%`. 
This number is manually estimated by phas | Phase 2: Read models and cache first | 15% | 100% | Done | Redis and Asynq dependencies are reusable; admin dashboard stats, admin project list, and dashboard account summary have short-TTL Redis cache; stats/project list/account cache misses are merged with singleflight; project, prepublish, publish, and account write paths invalidate the related dashboard cache; `workspace_dashboard_stats` and `project_list_summaries` read models are in place, and APIs prefer read models when coverage is complete; async refresh triggers after project save, platform sync, publish completion, and member changes; admin rebuild API, Asynq queue, and worker support full read-model rebuild | None | | Phase 3: Read/write splitting | 15% | 100% | Done | Optional `DB_READER_*` connection, application-level DB Router, signed sticky writer, consistency routing for project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension, consistency-level inventories for dashboard/publish/collab-service, self-hosted PostgreSQL read replica, managed `postgres-reader` entry point, PgBouncer reader pool, replica lag monitoring and automatic fallback to writer when over threshold | None; Phase 4 continues partitioning, archiving, and recovery flows | | Phase 4: Single-database partitioning, archiving, and hot/cold tiering | 15% | 100% | Done | Collaborative editing already has state + update batch + compaction foundation; `collab_document_update_batches` has PostgreSQL `document_id` hash partition target schema; event and terminal-session history already have row-level R2/S3 archive worker; `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and `remote_browser_sessions` have PostgreSQL monthly partition target schema; the archive worker exports whole cold monthly partitions to R2/S3 before detaching and dropping them; archive recovery procedure is documented | None | -| Phase 5: Citus 
preparation | 20% | 50% | In progress | Workspace model, `projects.workspace_id`, personal workspace ID, explicit or derived `workspace_id` coverage for project-domain tables, and Citus distributed/reference/colocated table-group design | Unique constraint and foreign-key review, worker payload routing | +| Phase 5: Citus preparation | 20% | 80% | In progress | Workspace model, `projects.workspace_id`, personal workspace ID, explicit or derived `workspace_id` coverage for project-domain tables, Citus distributed/reference/colocated table-group design, unique constraint / foreign-key / join review, and publish worker payload routing | Citus-safe constraint refactors, workspace-scoped repository boundary, validation-cluster rehearsal | | Phase 6: Citus distributed PostgreSQL operation | 10% | 0% | Deferred | None | Future Citus cluster design, worker/coordinator monitoring and backup, large-tenant isolation strategy | ### 0.1 Progress Update Rules @@ -60,14 +60,14 @@ Atomic commit guidance: | Application connection pools | Done | backend/publish-worker support `DB_MAX_OPEN_CONNS`, `DB_MAX_IDLE_CONNS`, `DB_CONN_MAX_LIFETIME`, and `DB_CONN_MAX_IDLE_TIME`; collab-service node-postgres pool supports `DB_MAX_OPEN_CONNS`, `DB_CONN_MAX_LIFETIME`, and `DB_CONN_MAX_IDLE_TIME`; Redis client supports `REDIS_POOL_SIZE`, `REDIS_MIN_IDLE_CONNS`, `REDIS_MAX_IDLE_CONNS`, `REDIS_CONN_MAX_IDLE_TIME`, and `REDIS_CONN_MAX_LIFETIME`; Docker Compose and self-hosted Kubernetes reuse PostgreSQL connections through PgBouncer writer pool; GORM PostgreSQL driver uses simple protocol to remain compatible with transaction pooling; self-hosted Kubernetes added PgBouncer reader pool and points app `DB_READER_HOST` to the reader pool | None | `backend/internal/db/db.go`, `backend/internal/redisclient/redisclient.go`, `collab-service/src/config.ts`, `collab-service/src/persistence/document-persistence.ts`, `collab-service/src/persistence/document-persistence.test.ts`, 
`deploy/docker/docker-compose.yml`, `deploy/kubernetes/data-services/self-hosted/pgbouncer.yaml`, `deploy/kubernetes/app-baseline/app-config.yaml` | | Query observability | Done | GORM QueryObserver, slow-query logs, `mpp_db_queries_total`, `mpp_db_query_duration_seconds`, `mpp_db_slow_queries_total`, self-hosted PostgreSQL `pg_stat_statements`, PostgreSQL exporter table-level health metrics, Grafana table row count / 24h growth / table size / index size / dead tuples / vacuum panels | Threshold alerts can continue to be added by production baseline in Phase 1/4 | `backend/internal/db/query_observer.go`, `backend/internal/observability/observability.go`, `script/db/audit_database_baseline.sql`, `deploy/docker/observability/postgres-exporter/queries.yml`, `deploy/docker/observability/grafana/dashboards/mpp-observability-baseline.json` | | Dashboard query audit | Done | Existing audit script covers dashboard Count, list, platform filter, publication preload, account query, and active session query plans | Not yet turned into a periodic CI/ops gate | `script/db/audit_dashboard_query_plans.sql` | -| Tenant boundary | In progress | Existing `workspaces`, `workspace_members`, `projects.workspace_id`, and personal workspace rules; project-domain publishing, activity, comment, version, share-link, media, AI, extension, and collaboration rows now carry `workspace_id` directly, with model hooks or service write paths deriving it from the owning project, document, or media asset for new writes; Citus target table groups are mapped to the `tenant_workspace` colocation group | Distributed-table constraints and worker payload ownership are still open | `backend/internal/models/hooks.go`, `backend/internal/models/ai_hooks.go`, `backend/internal/models/collab.go`, `backend/internal/db/monthly_partitions.go`, `backend/internal/db/hash_partitions.go`, `collab-service/src/persistence/document-persistence.ts`, `backend/internal/db/db_test.go`, 
`collab-service/src/persistence/document-persistence.test.ts`, `script/db/citus_table_groups.yml`, `script/db/test_citus_table_groups.rb` | +| Tenant boundary | In progress | Existing `workspaces`, `workspace_members`, `projects.workspace_id`, and personal workspace rules; project-domain publishing, activity, comment, version, share-link, media, AI, extension, and collaboration rows now carry `workspace_id` directly, with model hooks or service write paths deriving it from the owning project, document, or media asset for new writes; Citus target table groups are mapped to the `tenant_workspace` colocation group; unique constraints, foreign keys, and cross-tenant joins have a documented Citus review; publish worker payloads carry `workspace_id` | Citus-safe constraint refactors and workspace-scoped repository boundaries are still open before validation-cluster DDL | `backend/internal/models/hooks.go`, `backend/internal/models/ai_hooks.go`, `backend/internal/models/collab.go`, `backend/internal/db/monthly_partitions.go`, `backend/internal/db/hash_partitions.go`, `collab-service/src/persistence/document-persistence.ts`, `backend/internal/db/db_test.go`, `collab-service/src/persistence/document-persistence.test.ts`, `script/db/citus_table_groups.yml`, `script/db/citus_constraint_review.yml`, `script/db/test_citus_table_groups.rb`, `script/db/test_citus_constraint_review.rb`, `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/services/publish/schedule.go` | | Dashboard read models | Done | Added `workspace_dashboard_stats` and `project_list_summaries` read models, idempotently recomputed from fact tables by a centralized readmodel service; async refresh is triggered after project save, platform sync, publish completion, and member changes; admin stats and admin project list prefer read models when coverage is complete; admin rebuild API enqueues through Asynq, and API/worker processes can start readmodel 
workers for full rebuild from fact tables | None | `backend/internal/models/models.go`, `backend/internal/services/readmodel/service.go`, `backend/internal/services/readmodel/queue.go`, `backend/internal/services/readmodel/service_test.go`, `backend/internal/services/readmodel/queue_test.go`, `backend/internal/services/stats/overview.go`, `backend/internal/services/project/lifecycle.go`, `backend/internal/handlers/dashboard.go`, `backend/cmd/api/main.go`, `backend/cmd/publish-worker/main.go` | | Redis read cache | Done | Redis is already used for queues, locks, OAuth, browser sessions, and short-term coordination; admin dashboard stats, admin project list, and dashboard account summary use 15s TTL cache and bypass scoped/sticky-writer strong-consistency paths; stats/project list/account cache misses use singleflight to prevent process-local stampede; stats and account caches use versioned payloads and semantic validation, and Redis read-error fallback is also merged into one DB computation per key; project create/edit/platform save, prepublish sync/draft update, publish queue/execute/fail, and platform account write paths invalidate the related dashboard cache; full read-model rebuild reuses the Redis/Asynq queue | None | `backend/internal/services/stats/overview.go`, `backend/internal/services/stats/overview_test.go`, `backend/internal/services/project/list_cache.go`, `backend/internal/services/project/list_cache_test.go`, `backend/internal/services/prepublish/drafts.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/publication_flow_test.go`, `backend/internal/services/publish/queue_test.go`, `backend/internal/services/platform_account/account_cache.go`, `backend/internal/services/platform_account/account_cache_test.go`, `backend/internal/services/browser_session/complete.go`, `backend/internal/services/browser_session/service_test.go`, 
`backend/internal/services/readmodel/queue.go` | | Read/write splitting | Done | Supports optional `DB_READER_*` read-replica connection, `DefaultRouter`, and signed sticky writer; project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension are wired to strong/eventual/writer routing; dashboard, publish, and collab-service consistency-level inventories are complete, with collab-service online path kept writer-only; writer/reader pools are in self-hosted Kubernetes, and managed overlay provides a `postgres-reader` ExternalName entry point; `DB_READER_MAX_REPLICA_LAG` configures the replica lag threshold, eventual/analytics reads automatically fall back to writer when over threshold or lag is unknown, and `mpp_db_replica_lag_seconds` and `mpp_db_replica_healthy` metrics are exposed | None | `backend/internal/db/db.go`, `backend/internal/db/router.go`, `backend/internal/db/replica_lag.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/prepublish/service.go`, `backend/internal/services/mediaasset/service.go`, `backend/internal/services/browser_session/service.go`, `backend/internal/services/extension/service.go`, `backend/internal/app/runtime.go`, `deploy/kubernetes/data-services/self-hosted/postgres.yaml`, `deploy/kubernetes/data-services/self-hosted/pgbouncer.yaml`, `deploy/kubernetes/data-services/managed/services.yaml`, `script/kubernetes/validation/data_services.rb` | | Event-table partitioning and archiving | Done | `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and terminal `remote_browser_sessions` have default retention periods; the `archive` worker can batch-export JSONL to R2/S3 and delete old hot-table rows after successful upload; PostgreSQL schema initialization now creates monthly `created_at` partitions for `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and `remote_browser_sessions`, with 
partition-compatible `(id, created_at)` primary keys and rolling partition creation; the archive worker exports whole cold monthly partitions as JSONL to R2/S3, then detaches and drops the partition after successful upload; PostgreSQL browser-session active-row fallback uses a scoped advisory transaction lock because partitioned unique constraints must include the partition key; the archive recovery procedure defines inspection, staging restore, optional hot-table reinsertion, and audit checks | None | `backend/internal/db/monthly_partitions.go`, `backend/internal/db/db.go`, `backend/internal/models/models.go`, `backend/internal/db/db_test.go`, `backend/internal/services/browser_session/start.go`, `backend/internal/services/browser_session/cleanup.go`, `backend/internal/services/archive/worker.go`, `backend/internal/services/archive/partitions.go`, `backend/internal/services/archive/worker_test.go`, `backend/internal/services/archive/partitions_test.go`, Phase 4 archive recovery procedure in this document | | Collaboration batch governance | In progress | `collab_document_states`, `collab_document_update_batches`, and compaction/retention foundations exist; PostgreSQL schema initialization creates a 16-way `document_id` hash-partitioned `collab_document_update_batches` target table and migrates existing regular-table rows into it | Cold archiving is not implemented | `backend/internal/db/hash_partitions.go`, `backend/internal/db/db.go`, `backend/internal/models/collab.go`, `backend/internal/db/db_test.go`, `collab-service/src/persistence/document-persistence.ts` | -| Outbox/CDC/event stream | In progress | The publishing queue path has a transactional Outbox: `EnqueuePublishProject` writes `outbox_events` in the same transaction and dispatches immediately after commit; publish worker starts an outbox dispatcher and supports retries for failed/stale processing records; Asynq continues to serve as the task-execution queue, and `PublishEvent` continues to serve as 
publishing audit | Currently covers only `publish.job_requested`; general business-event outbox, Debezium, and Redpanda/Kafka CDC are not implemented | `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/models/models.go` | -| Citus target state | In progress | Confirmed `workspace_id` as the most suitable distribution-column direction; project-domain tables now have explicit or stable derived tenant routing coverage, including direct-insert fallbacks for scheduled publishing, media, and AI rows; distributed tables, reference-table candidates, control-domain tables, and deferred colocation gaps are mapped in a machine-readable design manifest | Unique-constraint / foreign-key review and worker payload routing are not implemented | Phase 5 checklist, `script/db/citus_table_groups.yml`, `script/db/test_citus_table_groups.rb`, `backend/internal/models/ai_hooks.go`, and `backend/internal/db/db_test.go` | +| Outbox/CDC/event stream | In progress | The publishing queue path has a transactional Outbox: `EnqueuePublishProject` writes `outbox_events` in the same transaction and dispatches immediately after commit; publish worker starts an outbox dispatcher and supports retries for failed/stale processing records; publish job payloads include `workspace_id` for worker routing, while `outbox_events` remains coordinator/control-domain state; Asynq continues to serve as the task-execution queue, and `PublishEvent` continues to serve as publishing audit | Currently covers only `publish.job_requested`; general business-event outbox, Debezium, Redpanda/Kafka CDC, and relational `outbox_events.workspace_id` are not implemented | `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/services/publish/schedule.go`, `backend/internal/models/models.go` | +| Citus target state | In progress | Confirmed `workspace_id` as the most suitable distribution-column direction; 
project-domain tables now have explicit or stable derived tenant routing coverage, including direct-insert fallbacks for scheduled publishing, media, and AI rows; distributed tables, reference-table candidates, control-domain tables, and deferred colocation gaps are mapped in a machine-readable design manifest; unique constraints, foreign keys, cross-tenant joins, and worker payload routing now have reviewed evidence | Actual Citus-safe constraint refactors, workspace-scoped repository entry points, relational outbox routing, and validation-cluster rehearsal are not implemented | Phase 5 checklist, `script/db/citus_table_groups.yml`, `script/db/citus_constraint_review.yml`, `script/db/test_citus_table_groups.rb`, `script/db/test_citus_constraint_review.rb`, `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/services/publish/schedule.go`, `backend/internal/models/ai_hooks.go`, and `backend/internal/db/db_test.go` | ### 0.3 Phase Checklist @@ -129,8 +129,8 @@ Atomic commit guidance: - [x] Confirm `workspace_id` as the preferred Citus distribution column. - [x] Complete `workspace_id` or a stable derivation path for project-domain tables. Verification entry point: `backend/internal/models/hooks.go`, `backend/internal/models/ai_hooks.go`, `backend/internal/models/collab.go`, `backend/internal/db/monthly_partitions.go`, `backend/internal/db/hash_partitions.go`, `collab-service/src/persistence/document-persistence.ts`, `backend/internal/db/db_test.go`, `collab-service/src/persistence/document-persistence.test.ts`. - [x] Design Citus distributed tables, reference tables, and colocated table groups. Verification entry point: `script/db/citus_table_groups.yml`, `script/db/test_citus_table_groups.rb`, and Section 7.2.1 Citus table group design. -- [ ] Review unique constraints, foreign keys, and cross-tenant joins. -- [ ] Add `workspace_id` to worker payloads. 
+- [x] Review unique constraints, foreign keys, and cross-tenant joins. Verification entry point: `script/db/citus_constraint_review.yml`, `script/db/test_citus_constraint_review.rb`, and Section 7.2.2 Citus constraint, foreign-key, and join review. +- [x] Add `workspace_id` to worker payloads. Verification entry point: `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/services/publish/schedule.go`, and `backend/internal/services/publish/queue_test.go`. #### Phase 6: Citus Distributed PostgreSQL Operation @@ -599,7 +599,7 @@ Control-domain tables for the first Citus validation: | ----- | --------------------------------------------- | | `platform_accounts`, `platform_account_grants` | Credential ownership is still user/platform scoped, and `platform_accounts.workspace_id` is nullable | | `remote_browser_sessions` | Short-lived runtime state is governed by Redis and the existing monthly archive path | -| `outbox_events` | Queue dispatch remains coordinator/control-domain state until worker payloads carry `workspace_id` | +| `outbox_events` | Queue dispatch remains coordinator/control-domain state; publish worker payloads now carry `workspace_id`, but `outbox_events` has no relational routing column yet | | `extension_execution_event_claims` | Global idempotency claims do not carry tenant routing yet | Deferred colocation gaps: @@ -611,6 +611,23 @@ Deferred colocation gaps: | `content_templates` | Split system templates from workspace templates or make tenant-scoped rows non-null by design | | `ai_drafting_messages`, `ai_tool_calls`, `ai_drafting_session_summaries`, `ai_session_events` | Add `workspace_id` derived from `ai_drafting_sessions` and review session-local uniqueness | +#### 7.2.2 Citus Constraint, Foreign-key, and Join Review + +The Phase 5 constraint review is captured in `script/db/citus_constraint_review.yml` and checked by `script/db/test_citus_constraint_review.rb`. 
+ +Constraint review findings: + +- Ready without primary-key redesign: `workspaces`, `workspace_members`, `workspace_dashboard_stats`, and `workspace_quota_aggregates`, because their current key already contains the distribution value. +- Needs Citus-safe unique or primary-key redesign before distributed DDL: project rows, project publications, project summaries, share links, workspace invites, media assets/usages, and extension callback tokens. The common pattern is to include `workspace_id` in tenant-local uniqueness or keep globally unique token/object lookups in a control-domain table. +- Needs partition strategy rehearsal before distributed DDL: monthly event/activity tables and `collab_document_update_batches`, because existing partition-compatible primary keys are shaped around `created_at` or `document_id` rather than `workspace_id`. + +Foreign-key and join review findings: + +- Workspace-owned child rows should point through colocated `workspace_id` where possible. +- User identity joins can be served by treating `users` as a reference-table candidate during the first validation cluster, or by denormalizing display fields into read models if reference replication becomes undesirable. +- `platform_accounts` and grants remain control-domain data until account ownership becomes non-null workspace ownership. +- Cross-workspace write transactions and hot request-path cross-tenant joins are not allowed; admin/global statistics should use read models, CDC consumers, or offline warehouse queries. 
+ Initial Citus validation DDL shape: ```sql diff --git a/script/db/citus_constraint_review.yml b/script/db/citus_constraint_review.yml new file mode 100644 index 00000000..9c0fe9fc --- /dev/null +++ b/script/db/citus_constraint_review.yml @@ -0,0 +1,143 @@ +version: 1 +distribution_key: workspace_id +review_scope: + models: + - backend/internal/models/models.go + - backend/internal/models/collab.go + - backend/internal/models/ai.go + - backend/internal/models/ai_quota.go + table_group_manifest: script/db/citus_table_groups.yml +unique_constraints: + reviewed_ready: + - table: workspaces + current_constraints: + - PRIMARY KEY (id) + reason: The root tenant table is distributed by id, so the primary key includes the distribution value. + - table: workspace_members + current_constraints: + - PRIMARY KEY (workspace_id, user_id) + reason: The member primary key already includes workspace_id. + - table: workspace_dashboard_stats + current_constraints: + - PRIMARY KEY (workspace_id) + reason: The read-model row is keyed by workspace_id. + - table: workspace_quota_aggregates + current_constraints: + - PRIMARY KEY (workspace_id) + reason: The quota aggregate row is keyed by workspace_id. + requires_refactor: + - table: projects + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (collab_document_id) + required_change: Use a Citus-safe tenant key such as (workspace_id, id), and move collab-document uniqueness to a workspace-scoped invariant. + - table: project_platform_publications + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (project_id, platform) + required_change: Include workspace_id in the primary key and project/platform uniqueness strategy. + - table: project_list_summaries + current_constraints: + - PRIMARY KEY (project_id) + required_change: Include workspace_id in the summary key or make it dependent on a colocated projects key.
+ - table: project_share_links + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (token_hash) + required_change: Keep token uniqueness in a control-domain lookup table or make the token invariant workspace-scoped. + - table: workspace_invites + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (token_hash) + required_change: Keep invite token uniqueness in a control-domain lookup table or make the token invariant workspace-scoped. + - table: media_assets + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (object_key) + required_change: Include workspace routing in the metadata key or keep object-key uniqueness in object storage/control metadata. + - table: media_asset_usages + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (media_asset_id, resource_type, resource_id) + required_change: Include workspace_id in the asset/resource uniqueness strategy. + - table: extension_callback_tokens + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (token) + required_change: Keep token uniqueness in a control-domain lookup table or make the token invariant workspace-scoped. + - table: extension_execution_event_claims + current_constraints: + - PRIMARY KEY (event_id) + - UNIQUE (record_id) + required_change: Keep as control-domain idempotency state until claims carry workspace routing. +partitioned_tables: + require_workspace_partition_strategy: + - table: publish_events + current_primary_key: [id, created_at] + - table: extension_execution_events + current_primary_key: [id, created_at] + - table: project_activities + current_primary_key: [id, created_at] + - table: workspace_activities + current_primary_key: [id, created_at] + - table: collab_document_update_batches + current_primary_key: [id, document_id] + note: Existing document_id hash partitioning must be rehearsed with Citus and workspace-compatible uniqueness. 
+foreign_keys: + colocated_ready: + - child: workspace_members.workspace_id + parent: workspaces.id + - child: workspace_dashboard_stats.workspace_id + parent: workspaces.id + - child: workspace_quota_aggregates.workspace_id + parent: workspaces.id + reference_table_dependencies: + - parent_table: users + reason: Owner, actor, author, recipient, and creator references cross many tenant tables; first Citus validation should treat users as a reference-table candidate. + control_domain_boundaries: + - tables: [platform_accounts, platform_account_grants] + reason: Credential ownership remains user/platform scoped and platform_accounts.workspace_id is nullable. + - tables: [outbox_events] + reason: Dispatch state remains coordinator-owned; publish job payloads now carry workspace_id for worker routing. + deferred_until_workspace_key: + - project_collaborators + - collab_document_collaborators + - publish_attempts + - ai_drafting_messages + - ai_tool_calls + - ai_drafting_session_summaries + - ai_session_events +cross_tenant_joins: + shard_local: + - query_family: workspace project lists + rule: Filter by projects.workspace_id before joining project publications or summaries. + - query_family: project experience views + rule: Use workspace_id plus project_id when reading comments, versions, activity, share links, and media usage. + - query_family: publishing state transitions + rule: Carry workspace_id in publish jobs and scheduled-publication rows before loading project/publication state. + reference_or_control: + - query_family: user identity preloads + rule: Use users as a reference-table candidate, or denormalize display fields into read models if reference-table replication is rejected. + - query_family: platform account resolution + rule: Keep credential/account joins in the control domain until accounts become non-null workspace assets. 
+ aggregate_offline: + - query_family: cross-workspace admin statistics + rule: Build read models, CDC consumers, or offline warehouse queries instead of making cross-tenant joins a hot request path. + prohibited_core_paths: + - Cross-workspace write transactions. + - Project/publication lookups by project_id alone in worker entry points. + - Distributed-table foreign keys that do not include compatible workspace routing. +worker_payloads: + tenant_routed: + - task_type: publish:project + payload_field: workspace_id + producers: + - backend/internal/services/publish/queue.go + - backend/internal/services/publish/schedule.go + replay_paths: + - backend/internal/services/publish/outbox.go + not_tenant_scoped: + - task_type: email:code + reason: Email verification and password reset jobs do not read or write tenant project-domain rows. + - task_type: readmodel:dashboard:rebuild + reason: Full dashboard rebuild is a global maintenance task with no single workspace owner. diff --git a/script/db/citus_table_groups.yml b/script/db/citus_table_groups.yml index 8b53d628..1e0edf87 100644 --- a/script/db/citus_table_groups.yml +++ b/script/db/citus_table_groups.yml @@ -10,177 +10,177 @@ physical_colocation_groups: - table: workspaces domain: tenant_core distribution_column: id - readiness: ready_after_constraint_review + readiness: reviewed_ready ddl: "create_distributed_table('workspaces', 'id')" notes: Root tenant table; the distributed value is the workspace UUID used by child workspace_id columns. - table: workspace_members domain: tenant_core distribution_column: workspace_id - readiness: ready_after_constraint_review + readiness: reviewed_ready ddl: "create_distributed_table('workspace_members', 'workspace_id', colocate_with => 'workspaces')" notes: Permission checks should remain shard-local with the workspace row. 
- table: workspace_invites domain: tenant_core distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('workspace_invites', 'workspace_id', colocate_with => 'workspaces')" - notes: Token uniqueness must be reviewed before distribution. + notes: Token uniqueness must be made Citus-safe before distribution. - table: workspace_activities domain: tenant_core distribution_column: workspace_id - readiness: target_after_partition_constraint_review + readiness: requires_partition_workspace_refactor ddl: "create_distributed_table('workspace_activities', 'workspace_id', colocate_with => 'workspaces')" - notes: Monthly partition primary key currently follows id and created_at. + notes: Monthly partition primary key currently follows id and created_at; Citus validation needs a workspace-compatible partition key strategy. - table: workspace_dashboard_stats domain: read_model distribution_column: workspace_id - readiness: ready_after_constraint_review + readiness: reviewed_ready ddl: "create_distributed_table('workspace_dashboard_stats', 'workspace_id', colocate_with => 'workspaces')" notes: Workspace-scoped aggregate read model. - table: notifications domain: tenant_core distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('notifications', 'workspace_id', colocate_with => 'workspaces')" - notes: User notification fan-out stays tenant-local; cross-workspace inboxes need read-model aggregation. + notes: User notification fan-out stays tenant-local; the id primary key needs workspace routing before distribution, and cross-workspace inboxes need read-model aggregation. 
- table: workspace_quota_aggregates domain: metering distribution_column: workspace_id - readiness: ready_after_constraint_review + readiness: reviewed_ready ddl: "create_distributed_table('workspace_quota_aggregates', 'workspace_id', colocate_with => 'workspaces')" notes: Workspace quota row is naturally keyed by workspace_id. - table: ai_usage_records domain: metering distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('ai_usage_records', 'workspace_id', colocate_with => 'workspaces')" - notes: Usage history is tenant-local and can aggregate into workspace_quota_aggregates. + notes: Usage history is tenant-local and can aggregate into workspace_quota_aggregates; the id primary key needs workspace routing before distribution. - table: projects domain: project distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('projects', 'workspace_id', colocate_with => 'workspaces')" - notes: Primary project row; project_id-only uniqueness must be reviewed before distribution. + notes: Primary project row; id primary key and collab-document uniqueness need workspace routing before distribution. - table: project_platform_publications domain: project distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('project_platform_publications', 'workspace_id', colocate_with => 'workspaces')" - notes: Draft/status rows should colocate with projects; project/platform uniqueness needs workspace_id. + notes: Draft/status rows should colocate with projects; id primary key and project/platform uniqueness need workspace_id. 
- table: project_list_summaries domain: read_model distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('project_list_summaries', 'workspace_id', colocate_with => 'workspaces')" - notes: Dashboard list read model follows the project workspace. + notes: Dashboard list read model follows the project workspace; the project_id primary key needs workspace routing before distribution. - table: project_activities domain: project distribution_column: workspace_id - readiness: target_after_partition_constraint_review + readiness: requires_partition_workspace_refactor ddl: "create_distributed_table('project_activities', 'workspace_id', colocate_with => 'workspaces')" - notes: Monthly partition primary key currently follows id and created_at. + notes: Monthly partition primary key currently follows id and created_at; Citus validation needs a workspace-compatible partition key strategy. - table: project_comments domain: project distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('project_comments', 'workspace_id', colocate_with => 'workspaces')" - notes: Comment reads and writes should remain local to the project workspace. + notes: Comment reads and writes should remain local to the project workspace; the id primary key needs workspace routing before distribution. - table: project_versions domain: project distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('project_versions', 'workspace_id', colocate_with => 'workspaces')" - notes: Version history should colocate with projects and collaboration state. + notes: Version history should colocate with projects and collaboration state; the id primary key needs workspace routing before distribution. 
- table: project_share_links domain: project distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('project_share_links', 'workspace_id', colocate_with => 'workspaces')" - notes: Token uniqueness must be reviewed before distribution. + notes: Token uniqueness must be made Citus-safe before distribution. - table: scheduled_publications domain: publishing distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('scheduled_publications', 'workspace_id', colocate_with => 'workspaces')" - notes: Scheduled publishing workers should route by workspace_id. + notes: Scheduled publishing workers route by workspace_id; the id primary key needs workspace routing before distribution. - table: publish_events domain: publishing distribution_column: workspace_id - readiness: target_after_partition_constraint_review + readiness: requires_partition_workspace_refactor ddl: "create_distributed_table('publish_events', 'workspace_id', colocate_with => 'workspaces')" - notes: Monthly partition primary key currently follows id and created_at. + notes: Monthly partition primary key currently follows id and created_at; Citus validation needs a workspace-compatible partition key strategy. - table: extension_callback_tokens domain: extension distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('extension_callback_tokens', 'workspace_id', colocate_with => 'workspaces')" - notes: Token uniqueness must be reviewed before distribution. + notes: Token uniqueness must be made Citus-safe before distribution. 
- table: extension_execution_events domain: extension distribution_column: workspace_id - readiness: target_after_partition_constraint_review + readiness: requires_partition_workspace_refactor ddl: "create_distributed_table('extension_execution_events', 'workspace_id', colocate_with => 'workspaces')" - notes: Monthly partition primary key currently follows id and created_at. + notes: Monthly partition primary key currently follows id and created_at; Citus validation needs a workspace-compatible partition key strategy. - table: media_assets domain: media distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('media_assets', 'workspace_id', colocate_with => 'workspaces')" - notes: Object key uniqueness must be reviewed; object bytes stay in R2/S3. + notes: Object key uniqueness must be made Citus-safe; object bytes stay in R2/S3. - table: media_asset_usages domain: media distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('media_asset_usages', 'workspace_id', colocate_with => 'workspaces')" notes: Asset/resource uniqueness must include workspace routing before distribution. - table: brand_profiles domain: content_setup distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('brand_profiles', 'workspace_id', colocate_with => 'workspaces')" - notes: Workspace-owned brand profiles should colocate with projects and AI context. + notes: Workspace-owned brand profiles should colocate with projects and AI context; the id primary key needs workspace routing before distribution. 
- table: collab_documents domain: collaboration distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('collab_documents', 'workspace_id', colocate_with => 'workspaces')" - notes: Collaboration document metadata follows the workspace tenant. + notes: Collaboration document metadata follows the workspace tenant; the id primary key needs workspace routing before distribution. - table: collab_document_states domain: collaboration distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('collab_document_states', 'workspace_id', colocate_with => 'workspaces')" - notes: State snapshots should colocate with their document metadata. + notes: State snapshots should colocate with their document metadata; the document_id primary key needs workspace routing before distribution. - table: collab_document_update_batches domain: collaboration distribution_column: workspace_id - readiness: target_after_partition_constraint_review + readiness: requires_partition_workspace_refactor ddl: "create_distributed_table('collab_document_update_batches', 'workspace_id', colocate_with => 'workspaces')" - notes: Existing document_id hash partitioning must be rehearsed with Citus partitioned-table support. + notes: Existing document_id hash partitioning must be rehearsed with Citus partitioned-table support and workspace-compatible uniqueness. - table: ai_context_snapshots domain: ai distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('ai_context_snapshots', 'workspace_id', colocate_with => 'workspaces')" - notes: Context snapshots follow the owning project workspace. + notes: Context snapshots follow the owning project workspace; the id primary key needs workspace routing before distribution. 
- table: ai_growth_optimization_runs domain: ai distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('ai_growth_optimization_runs', 'workspace_id', colocate_with => 'workspaces')" - notes: Growth runs should colocate with snapshots and proposals. + notes: Growth runs should colocate with snapshots and proposals; the id primary key needs workspace routing before distribution. - table: ai_proposals domain: ai distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('ai_proposals', 'workspace_id', colocate_with => 'workspaces')" - notes: Proposal decisions should stay shard-local with the project. + notes: Proposal decisions should stay shard-local with the project; the id primary key needs workspace routing before distribution. - table: ai_drafting_sessions domain: ai distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('ai_drafting_sessions', 'workspace_id', colocate_with => 'workspaces')" - notes: Drafting sessions are the parent route for AI chat child rows. + notes: Drafting sessions are the parent route for AI chat child rows; the id primary key needs workspace routing before distribution. reference_tables: - table: users readiness: reference_candidate_for_validation @@ -198,7 +198,7 @@ control_domain_tables: - table: remote_browser_sessions reason: Short-lived runtime state is governed by Redis and monthly archive paths before Citus. - table: outbox_events - reason: Queue dispatch remains coordinator/control-domain state until worker payloads carry workspace_id. + reason: Queue dispatch remains coordinator/control-domain state; publish worker payloads now carry workspace_id, but outbox_events has no relational routing column yet. 
- table: extension_execution_event_claims reason: Global idempotency claim table does not have workspace routing today. deferred_colocation_tables: From 721e86a08f0ca01307f8c978c6932066ca43b65f Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 3 Jul 2026 19:59:32 +0800 Subject: [PATCH 4/5] test(database): validate Citus constraint review The constraint review should stay machine-checkable as Phase 5 routing evidence evolves. Add a Minitest guard for required unique, partition, foreign-key, join, and worker-payload review sections. Also ensure table-group readiness terms no longer defer unresolved review work behind generic labels. --- script/db/test_citus_constraint_review.rb | 101 ++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 script/db/test_citus_constraint_review.rb diff --git a/script/db/test_citus_constraint_review.rb b/script/db/test_citus_constraint_review.rb new file mode 100644 index 00000000..367e8e99 --- /dev/null +++ b/script/db/test_citus_constraint_review.rb @@ -0,0 +1,101 @@ +# frozen_string_literal: true + +require "minitest/autorun" +require "yaml" + +class CitusConstraintReviewTest < Minitest::Test + REVIEW_PATH = File.expand_path("citus_constraint_review.yml", __dir__) + GROUPS_PATH = File.expand_path("citus_table_groups.yml", __dir__) + + REQUIRED_UNIQUE_REFACTOR_TABLES = %w[ + projects + project_platform_publications + project_list_summaries + project_share_links + workspace_invites + media_assets + media_asset_usages + extension_callback_tokens + extension_execution_event_claims + ].freeze + + REQUIRED_PARTITION_TABLES = %w[ + publish_events + extension_execution_events + project_activities + workspace_activities + collab_document_update_batches + ].freeze + + REQUIRED_WORKER_TASKS = %w[ + publish:project + ].freeze + + def setup + @review = YAML.safe_load_file(REVIEW_PATH, aliases: false) + @groups = YAML.safe_load_file(GROUPS_PATH, aliases: false) + end + + def test_review_declares_distribution_scope + 
assert_equal 1, @review.fetch("version") + assert_equal "workspace_id", @review.fetch("distribution_key") + refute_empty @review.fetch("review_scope").fetch("models") + assert_equal "script/db/citus_table_groups.yml", @review.fetch("review_scope").fetch("table_group_manifest") + end + + def test_unique_constraint_review_names_required_refactors + reviewed = table_names(@review.fetch("unique_constraints").fetch("requires_refactor")) + + missing = REQUIRED_UNIQUE_REFACTOR_TABLES - reviewed + + assert_empty missing, "missing unique constraint review entries: #{missing.join(", ")}" + end + + def test_partition_review_names_workspace_strategy_gaps + reviewed = table_names(@review.fetch("partitioned_tables").fetch("require_workspace_partition_strategy")) + + missing = REQUIRED_PARTITION_TABLES - reviewed + + assert_empty missing, "missing partition review entries: #{missing.join(", ")}" + end + + def test_foreign_key_and_join_review_has_actionable_decisions + foreign_keys = @review.fetch("foreign_keys") + cross_tenant_joins = @review.fetch("cross_tenant_joins") + + refute_empty foreign_keys.fetch("colocated_ready") + refute_empty foreign_keys.fetch("reference_table_dependencies") + refute_empty foreign_keys.fetch("control_domain_boundaries") + refute_empty foreign_keys.fetch("deferred_until_workspace_key") + refute_empty cross_tenant_joins.fetch("shard_local") + refute_empty cross_tenant_joins.fetch("reference_or_control") + refute_empty cross_tenant_joins.fetch("aggregate_offline") + refute_empty cross_tenant_joins.fetch("prohibited_core_paths") + end + + def test_worker_payload_review_names_tenant_routed_tasks + reviewed_tasks = @review.fetch("worker_payloads").fetch("tenant_routed").map { |entry| entry.fetch("task_type") } + + missing = REQUIRED_WORKER_TASKS - reviewed_tasks + + assert_empty missing, "missing worker payload routing entries: #{missing.join(", ")}" + end + + def test_table_group_manifest_uses_reviewed_readiness_terms + colocated_tables.each do |entry| + 
readiness = entry.fetch("readiness") + + refute_match(/after_.*review/, readiness, entry.fetch("table")) + end + end + + private + + def table_names(entries) + entries.map { |entry| entry.fetch("table") } + end + + def colocated_tables + @groups.fetch("physical_colocation_groups").fetch("tenant_workspace").fetch("tables") + end +end From 2d692c8e82ac90dccff25cbf9c025c60e28d856a Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Fri, 3 Jul 2026 20:39:57 +0800 Subject: [PATCH 5/5] refactor(publish): remove unused publish workspace wrappers Workspace-aware publish helpers replaced the project-only private wrappers during worker routing. Remove the unused compatibility methods that now only forward to workspace-scoped implementations. This keeps backend lint clean without changing publish behavior. --- backend/internal/services/publish/service.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/backend/internal/services/publish/service.go b/backend/internal/services/publish/service.go index 51b64613..11d26bd0 100644 --- a/backend/internal/services/publish/service.go +++ b/backend/internal/services/publish/service.go @@ -491,10 +491,6 @@ func (s *Service) recordPublishEvent(event models.PublishEvent) error { return s.writerDB(s.requestContext()).Create(&event).Error } -func (s *Service) recordProjectPublishActivity(projectID uuid.UUID, userID uuid.UUID, eventType string, metadata map[string]any) error { - return s.recordProjectPublishActivityForWorkspace(uuid.Nil, projectID, userID, eventType, metadata) -} - func (s *Service) recordProjectPublishActivityForWorkspace(workspaceID uuid.UUID, projectID uuid.UUID, userID uuid.UUID, eventType string, metadata map[string]any) error { if projectID == uuid.Nil || userID == uuid.Nil || strings.TrimSpace(eventType) == "" { return nil @@ -526,10 +522,6 @@ func (s *Service) recordProjectPublishActivityForWorkspace(workspaceID uuid.UUID }).Error } -func (s *Service) findIdempotentPublishResponse(projectID uuid.UUID, 
platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { - return s.findIdempotentPublishResponseForWorkspace(uuid.Nil, projectID, platform, userID, key) -} - func (s *Service) findIdempotentPublishResponseForWorkspace(workspaceID uuid.UUID, projectID uuid.UUID, platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { if strings.TrimSpace(key) == "" { return PublishResponse{}, false, nil @@ -607,10 +599,6 @@ func publishEventReplayRank(eventType string) int { } } -func (s *Service) waitForIdempotentPublishResponse(ctx context.Context, projectID uuid.UUID, platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { - return s.waitForIdempotentPublishResponseForWorkspace(ctx, uuid.Nil, projectID, platform, userID, key) -} - func (s *Service) waitForIdempotentPublishResponseForWorkspace(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID, platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { deadline := time.NewTimer(publishReplayWait) defer deadline.Stop()