diff --git a/backend/internal/services/publish/lifecycle.go b/backend/internal/services/publish/lifecycle.go index d368a8b7..8f4b1c62 100644 --- a/backend/internal/services/publish/lifecycle.go +++ b/backend/internal/services/publish/lifecycle.go @@ -39,8 +39,8 @@ func (s *Service) lifecycle() PublicationLifecycle { } func (l PublicationLifecycle) MarkQueued(pub *models.ProjectPlatformPublication, queuedAt time.Time) error { - result := l.db.Model(&models.ProjectPlatformPublication{}). - Where("id = ? AND status = ?", pub.ID, pub.Status). + result := l.publicationQuery(pub). + Where("status = ?", pub.Status). Updates(map[string]any{ "status": models.PublicationStatusQueued, "error_message": "", @@ -58,8 +58,8 @@ func (l PublicationLifecycle) MarkQueued(pub *models.ProjectPlatformPublication, } func (l PublicationLifecycle) MarkPublishing(pub *models.ProjectPlatformPublication, startedAt time.Time) error { - result := l.db.Model(&models.ProjectPlatformPublication{}). - Where("id = ? AND status = ?", pub.ID, pub.Status). + result := l.publicationQuery(pub). + Where("status = ?", pub.Status). Updates(map[string]any{ "status": models.PublicationStatusPublishing, "error_message": "", @@ -77,12 +77,18 @@ func (l PublicationLifecycle) MarkPublishing(pub *models.ProjectPlatformPublicat } func (l PublicationLifecycle) MarkFailed(ctx context.Context, projectID uuid.UUID, platform string, message string) error { + return l.MarkFailedForWorkspace(ctx, uuid.Nil, projectID, platform, message) +} + +func (l PublicationLifecycle) MarkFailedForWorkspace(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID, platform string, message string) error { db := l.db if ctx != nil { db = db.WithContext(ctx) } - return db.Model(&models.ProjectPlatformPublication{}). + query := db.Model(&models.ProjectPlatformPublication{}). Where("project_id = ? AND platform = ?", projectID, platform). + Scopes(workspaceScope(workspaceID)) + return query. Updates(map[string]any{ "status": models.PublicationStatusFailed, "error_message": SanitizeUserFacingErrorMessage(message), @@ -103,8 +109,12 @@ func (l PublicationLifecycle) CompletePublication(pub *models.ProjectPlatformPub } else { updates["retry_count"] = gorm.Expr("retry_count + ?", 1) } - if err := l.db.Model(pub).Updates(updates).Error; err != nil { - return err + result := l.publicationQuery(pub).Updates(updates) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + return l.publicationStateChangedError(pub.ID) } pub.Status = completion.Status pub.RemoteID = completion.RemoteID @@ -113,7 +123,15 @@ func (l PublicationLifecycle) CompletePublication(pub *models.ProjectPlatformPub return nil } -func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedAt time.Time) (models.PublishAttempt, bool, error) { +func (l PublicationLifecycle) publicationQuery(pub *models.ProjectPlatformPublication) *gorm.DB { + query := l.db.Model(&models.ProjectPlatformPublication{}).Where("id = ?", pub.ID) + if pub.WorkspaceID != uuid.Nil { + query = query.Where("workspace_id = ?", pub.WorkspaceID) + } + return query +} + +func (l PublicationLifecycle) StartPublishAttemptForWorkspace(workspaceID uuid.UUID, scheduleID uuid.UUID, startedAt time.Time) (models.PublishAttempt, bool, error) { if scheduleID == uuid.Nil { return models.PublishAttempt{}, false, nil } @@ -122,22 +140,23 @@ func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedA } var attempt models.PublishAttempt if err := l.db.Transaction(func(tx *gorm.DB) error { - result := tx.Model(&models.ScheduledPublication{}). + scheduleQuery := tx.Model(&models.ScheduledPublication{}). Where("id = ? AND status IN ?", scheduleID, []string{ models.ScheduledPublicationStatusScheduled, models.ScheduledPublicationStatusFailed, models.ScheduledPublicationStatusNeedsManualAction, }). - Updates(map[string]any{ - "status": models.ScheduledPublicationStatusRunning, - "last_error": "", - }) + Scopes(workspaceScope(workspaceID)) + result := scheduleQuery.Updates(map[string]any{ + "status": models.ScheduledPublicationStatusRunning, + "last_error": "", + }) if result.Error != nil { return result.Error } if result.RowsAffected == 0 { var schedule models.ScheduledPublication - if err := tx.Select("status").First(&schedule, "id = ?", scheduleID).Error; err != nil { + if err := tx.Select("status").Scopes(workspaceScope(workspaceID)).First(&schedule, "id = ?", scheduleID).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return errScheduledPublicationMissing } @@ -173,7 +192,15 @@ func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedA return attempt, true, nil } +func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedAt time.Time) (models.PublishAttempt, bool, error) { + return l.StartPublishAttemptForWorkspace(uuid.Nil, scheduleID, startedAt) +} + func (l PublicationLifecycle) FinishPublishAttempt(attempt *models.PublishAttempt, completion publishAttemptCompletion) error { + return l.FinishPublishAttemptForWorkspace(uuid.Nil, attempt, completion) +} + +func (l PublicationLifecycle) FinishPublishAttemptForWorkspace(workspaceID uuid.UUID, attempt *models.PublishAttempt, completion publishAttemptCompletion) error { if attempt == nil { return nil } @@ -199,6 +226,7 @@ func (l PublicationLifecycle) FinishPublishAttempt(attempt *models.PublishAttemp } return tx.Model(&models.ScheduledPublication{}). Where("id = ?", attempt.ScheduledPublicationID). + Scopes(workspaceScope(workspaceID)). Updates(map[string]any{ "status": scheduleStatus, "last_error": completion.ErrorMessage, @@ -207,6 +235,10 @@ func (l PublicationLifecycle) FinishPublishAttempt(attempt *models.PublishAttemp } func (l PublicationLifecycle) MarkPrepublishSyncing(projectID uuid.UUID, platforms []string) error { + return l.MarkPrepublishSyncingForWorkspace(uuid.Nil, projectID, platforms) +} + +func (l PublicationLifecycle) MarkPrepublishSyncingForWorkspace(workspaceID uuid.UUID, projectID uuid.UUID, platforms []string) error { if len(platforms) == 0 { return nil } @@ -214,6 +246,7 @@ func (l PublicationLifecycle) MarkPrepublishSyncing(projectID uuid.UUID, platfor var activeCount int64 if err := tx.Model(&models.ProjectPlatformPublication{}). Where("project_id = ? AND platform IN ? AND status IN ?", projectID, platforms, activePublicationStatuses()). + Scopes(workspaceScope(workspaceID)). Count(&activeCount).Error; err != nil { return err } @@ -223,6 +256,7 @@ func (l PublicationLifecycle) MarkPrepublishSyncing(projectID uuid.UUID, platfor if err := tx.Model(&models.ProjectPlatformPublication{}). Where("project_id = ? AND platform IN ? AND status NOT IN ?", projectID, platforms, activePublicationStatuses()). + Scopes(workspaceScope(workspaceID)). Updates(map[string]any{ "draft_status": models.PublicationDraftStatusSyncing, "error_message": "", @@ -233,6 +267,7 @@ func (l PublicationLifecycle) MarkPrepublishSyncing(projectID uuid.UUID, platfor if err := tx.Model(&models.ProjectPlatformPublication{}). Where("project_id = ? AND platform IN ? AND status IN ?", projectID, platforms, activePublicationStatuses()). + Scopes(workspaceScope(workspaceID)). Count(&activeCount).Error; err != nil { return err } @@ -251,6 +286,15 @@ func activePublicationStatuses() []string { } } +func workspaceScope(workspaceID uuid.UUID) func(*gorm.DB) *gorm.DB { + return func(db *gorm.DB) *gorm.DB { + if workspaceID == uuid.Nil { + return db + } + return db.Where("workspace_id = ?", workspaceID) + } +} + func (l PublicationLifecycle) publicationStateChangedError(publicationID uuid.UUID) error { var pub models.ProjectPlatformPublication if err := l.db.Select("status", "last_attempt_at").First(&pub, "id = ?", publicationID).Error; err != nil { diff --git a/backend/internal/services/publish/outbox.go b/backend/internal/services/publish/outbox.go index f43c1506..2cad89ed 100644 --- a/backend/internal/services/publish/outbox.go +++ b/backend/internal/services/publish/outbox.go @@ -151,7 +151,10 @@ func (s *Service) dispatchClaimedOutboxEvent(ctx context.Context, event models.O if err := json.Unmarshal(event.Payload, &job); err != nil { return fmt.Errorf("decode publish outbox payload: %w", err) } - if job.JobID == uuid.Nil || job.ProjectID == uuid.Nil || job.UserID == uuid.Nil || job.Platform == "" { + if err := s.ensurePublishJobWorkspaceID(ctx, &job); err != nil { + return fmt.Errorf("resolve publish outbox workspace for event %s: %w", event.ID, err) + } + if job.JobID == uuid.Nil || job.ProjectID == uuid.Nil || job.WorkspaceID == uuid.Nil || job.UserID == uuid.Nil || job.Platform == "" { return fmt.Errorf("invalid publish outbox payload for event %s", event.ID) } return s.queue.Enqueue(ctx, job) diff --git a/backend/internal/services/publish/queue.go b/backend/internal/services/publish/queue.go index f0d24770..0bd44ed4 100644 --- a/backend/internal/services/publish/queue.go +++ b/backend/internal/services/publish/queue.go @@ -41,12 +41,14 @@ var ( ErrPublicationAlreadyPublishing = errors.New("publication is already publishing") ErrPublishQueueEmpty = errors.New("publish queue empty") errPublishLockOwnershipLost = errors.New("publish lock ownership lost") + errPublishJobWorkspaceMismatch = errors.New("publish job workspace does not match project") publishLockRefreshInterval = publishLockRefreshEvery ) type PublishJob struct { JobID uuid.UUID `json:"job_id"` ProjectID uuid.UUID `json:"project_id"` + WorkspaceID uuid.UUID `json:"workspace_id"` UserID uuid.UUID `json:"user_id"` Platform string `json:"platform"` PublicationID uuid.UUID `json:"publication_id,omitempty"` @@ -197,18 +199,22 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID return PublishResponse{}, ErrForbidden } req.IdempotencyKey = normalizeIdempotencyKey(req.IdempotencyKey) + project, projectErr := s.projectForPublish(ctx, projectID, *scopeUserID) + if projectErr != nil { + return PublishResponse{}, projectErr + } + workspaceID := models.ProjectWorkspaceID(project) if req.IdempotencyKey != "" { - if resp, ok, err := s.findIdempotentPublishResponse(projectID, platform, *scopeUserID, req.IdempotencyKey); err != nil { + if resp, ok, err := s.findIdempotentPublishResponseForWorkspace(workspaceID, project.ID, platform, *scopeUserID, req.IdempotencyKey); err != nil { return PublishResponse{}, err } else if ok { return resp, nil } } - - project, pub, err := s.preparePublishJob(ctx, projectID, platform, *scopeUserID) + project, pub, err := s.preparePublishJobForProject(ctx, project, platform, *scopeUserID) if err != nil { if req.IdempotencyKey != "" { - if resp, ok, lookupErr := s.findIdempotentPublishResponse(projectID, platform, *scopeUserID, req.IdempotencyKey); lookupErr != nil { + if resp, ok, lookupErr := s.findIdempotentPublishResponseForWorkspace(workspaceID, project.ID, platform, *scopeUserID, req.IdempotencyKey); lookupErr != nil { return PublishResponse{}, lookupErr } else if ok { return resp, nil @@ -223,7 +229,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID if err != nil { return PublishResponse{}, err } - resp, err := s.PublishProjectWithContext(ctx, projectID, platform, scopeUserID, schedule.ID) + resp, err := s.PublishProjectInWorkspaceWithContext(ctx, workspaceID, project.ID, platform, scopeUserID, schedule.ID) if err != nil { return PublishResponse{}, err } @@ -237,6 +243,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID job := PublishJob{ JobID: uuid.New(), ProjectID: project.ID, + WorkspaceID: workspaceID, UserID: *scopeUserID, Platform: platform, PublicationID: pub.ID, @@ -254,7 +261,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID } if !acquired { if req.IdempotencyKey != "" { - if resp, ok, err := s.waitForIdempotentPublishResponse(ctx, project.ID, platform, *scopeUserID, req.IdempotencyKey); err != nil { + if resp, ok, err := s.waitForIdempotentPublishResponseForWorkspace(ctx, workspaceID, project.ID, platform, *scopeUserID, req.IdempotencyKey); err != nil { return PublishResponse{}, err } else if ok { return resp, nil @@ -277,6 +284,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID job.ScheduleID = schedule.ID if err := txService.recordPublishEvent(models.PublishEvent{ PublicationID: pub.ID, + WorkspaceID: job.WorkspaceID, ProjectID: project.ID, UserID: *scopeUserID, Platform: platform, @@ -287,7 +295,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID }); err != nil { return err } - if err := txService.recordProjectPublishActivity(project.ID, *scopeUserID, models.ProjectActivityPublishRequested, map[string]any{ + if err := txService.recordProjectPublishActivityForWorkspace(job.WorkspaceID, project.ID, *scopeUserID, models.ProjectActivityPublishRequested, map[string]any{ "platform": platform, "job_id": job.JobID.String(), }); err != nil { @@ -298,6 +306,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID } if err := txService.recordPublishEvent(models.PublishEvent{ PublicationID: pub.ID, + WorkspaceID: job.WorkspaceID, ProjectID: project.ID, UserID: *scopeUserID, Platform: platform, @@ -308,7 +317,7 @@ func (s *Service) EnqueuePublishProject(ctx context.Context, projectID uuid.UUID }); err != nil { return err } - if err := txService.recordProjectPublishActivity(project.ID, *scopeUserID, models.ProjectActivityPublishQueued, map[string]any{ + if err := txService.recordProjectPublishActivityForWorkspace(job.WorkspaceID, project.ID, *scopeUserID, models.ProjectActivityPublishQueued, map[string]any{ "platform": platform, "job_id": job.JobID.String(), }); err != nil { @@ -397,7 +406,18 @@ func (s *Service) StartPublishWorkerWithErrors(ctx context.Context) <-chan error } func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { - if job.JobID == uuid.Nil || job.ProjectID == uuid.Nil || job.UserID == uuid.Nil || strings.TrimSpace(job.Platform) == "" { + if err := s.ensurePublishJobWorkspaceID(ctx, &job); err != nil { + if errors.Is(err, errPublishJobWorkspaceMismatch) { + log.Printf("discarding publish job %s because workspace does not match project", job.JobID) + if coordinationQueue := s.coordinationQueueOrDefault(); coordinationQueue != nil && job.JobID != uuid.Nil && job.ProjectID != uuid.Nil && strings.TrimSpace(job.Platform) != "" { + _ = coordinationQueue.ReleaseLock(context.Background(), publishLockKey(job.ProjectID, job.Platform), job.JobID.String()) + } + return nil + } + log.Printf("failed to resolve workspace for publish job %s: %v", job.JobID, err) + return err + } + if job.JobID == uuid.Nil || job.ProjectID == uuid.Nil || job.WorkspaceID == uuid.Nil || job.UserID == uuid.Nil || strings.TrimSpace(job.Platform) == "" { log.Printf("discarding invalid publish job: %+v", job) return nil } @@ -431,6 +451,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { if err := s.recordPublishEvent(models.PublishEvent{ PublicationID: job.PublicationID, + WorkspaceID: job.WorkspaceID, ProjectID: job.ProjectID, UserID: job.UserID, Platform: job.Platform, @@ -442,7 +463,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { log.Printf("failed to record publish job %s start event: %v", job.JobID, err) } - resp, err := s.PublishProjectWithContext(publishCtx, job.ProjectID, job.Platform, &job.UserID, job.ScheduleID) + resp, err := s.PublishProjectInWorkspaceWithContext(publishCtx, job.WorkspaceID, job.ProjectID, job.Platform, &job.UserID, job.ScheduleID) if err != nil { if errors.Is(context.Cause(publishCtx), errPublishLockOwnershipLost) { log.Printf("publish job %s stopped because lock ownership was lost", job.JobID) @@ -451,7 +472,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { log.Printf("publish job %s failed: %v", job.JobID, err) observeJob(publishJobResultError) cleanupCtx, cancelCleanup := publishCleanupContext(ctx) - if markErr := s.lifecycle().MarkFailed(cleanupCtx, job.ProjectID, job.Platform, err.Error()); markErr != nil { + if markErr := s.lifecycle().MarkFailedForWorkspace(cleanupCtx, job.WorkspaceID, job.ProjectID, job.Platform, err.Error()); markErr != nil { log.Printf("failed to mark publish job %s as failed: %v", job.JobID, markErr) } else { s.invalidateDashboardCaches(cleanupCtx) @@ -460,6 +481,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { cancelCleanup() _ = s.recordPublishEvent(models.PublishEvent{ PublicationID: job.PublicationID, + WorkspaceID: job.WorkspaceID, ProjectID: job.ProjectID, UserID: job.UserID, Platform: job.Platform, @@ -480,6 +502,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { observeJob(publishJobResultError) _ = s.recordPublishEvent(models.PublishEvent{ PublicationID: job.PublicationID, + WorkspaceID: job.WorkspaceID, ProjectID: job.ProjectID, UserID: job.UserID, Platform: job.Platform, @@ -496,6 +519,7 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { } _ = s.recordPublishEvent(models.PublishEvent{ PublicationID: job.PublicationID, + WorkspaceID: job.WorkspaceID, ProjectID: job.ProjectID, UserID: job.UserID, Platform: job.Platform, @@ -514,6 +538,40 @@ func (s *Service) processPublishJob(ctx context.Context, job PublishJob) error { return nil } +func (s *Service) ensurePublishJobWorkspaceID(ctx context.Context, job *PublishJob) error { + if job == nil { + return nil + } + if job.ProjectID == uuid.Nil { + if job.UserID != uuid.Nil { + job.WorkspaceID = models.PersonalWorkspaceID(job.UserID) + } + return nil + } + + queryCtx := ctx + if queryCtx == nil || queryCtx.Err() != nil { + queryCtx = s.requestContext() + } + var project models.Project + err := s.writerDB(queryCtx).Select("id", "user_id", "workspace_id").First(&project, "id = ?", job.ProjectID).Error + if err == nil { + projectWorkspaceID := models.ProjectWorkspaceID(project) + if job.WorkspaceID != uuid.Nil && job.WorkspaceID != projectWorkspaceID { + return errPublishJobWorkspaceMismatch + } + job.WorkspaceID = projectWorkspaceID + return nil + } + if errors.Is(err, gorm.ErrRecordNotFound) { + if job.UserID != uuid.Nil { + job.WorkspaceID = models.PersonalWorkspaceID(job.UserID) + } + return nil + } + return err +} + func (s *Service) ensurePublishJobLock(ctx context.Context, job PublishJob, lockKey string) (bool, error) { coordinationQueue := s.coordinationQueueOrDefault() if coordinationQueue == nil { @@ -530,7 +588,7 @@ func (s *Service) ensurePublishJobLock(ctx context.Context, job PublishJob, lock return false, nil } - retriable, err := s.publicationRetriableForJob(job.ProjectID, job.Platform) + retriable, err := s.publicationRetriableForJob(job.WorkspaceID, job.ProjectID, job.Platform) if err != nil { return false, err } @@ -585,9 +643,13 @@ func (s *Service) preparePublishJob(ctx context.Context, projectID uuid.UUID, pl if err != nil { return models.Project{}, models.ProjectPlatformPublication{}, ErrForbidden } + return s.preparePublishJobForProject(ctx, project, platform, userID) +} +func (s *Service) preparePublishJobForProject(ctx context.Context, project models.Project, platform string, userID uuid.UUID) (models.Project, models.ProjectPlatformPublication, error) { var pub models.ProjectPlatformPublication - if err := s.strongReadDB(ctx).Where("project_id = ? AND platform = ?", projectID, platform).First(&pub).Error; err != nil { + workspaceID := models.ProjectWorkspaceID(project) + if err := s.strongReadDB(ctx).Where("workspace_id = ? AND project_id = ? AND platform = ?", workspaceID, project.ID, platform).First(&pub).Error; err != nil { return models.Project{}, models.ProjectPlatformPublication{}, fmt.Errorf("publication record not found for platform: %s", platform) } if !pub.Enabled || pub.Status == models.PublicationStatusCancelled { @@ -614,9 +676,9 @@ func (s *Service) preparePublishJob(ctx context.Context, projectID uuid.UUID, pl return project, pub, nil } -func (s *Service) publicationRetriableForJob(projectID uuid.UUID, platform string) (bool, error) { +func (s *Service) publicationRetriableForJob(workspaceID uuid.UUID, projectID uuid.UUID, platform string) (bool, error) { var pub models.ProjectPlatformPublication - if err := s.writerDB(s.requestContext()).Select("enabled", "status").Where("project_id = ? AND platform = ?", projectID, platform).First(&pub).Error; err != nil { + if err := s.writerDB(s.requestContext()).Select("enabled", "status").Where("workspace_id = ? AND project_id = ? AND platform = ?", workspaceID, projectID, platform).First(&pub).Error; err != nil { return false, err } if !pub.Enabled || pub.Status == models.PublicationStatusCancelled { diff --git a/backend/internal/services/publish/queue_test.go b/backend/internal/services/publish/queue_test.go index 92974be6..10f628ac 100644 --- a/backend/internal/services/publish/queue_test.go +++ b/backend/internal/services/publish/queue_test.go @@ -407,6 +407,7 @@ func TestEnqueuePublishProjectQueuesAndLocksPublication(t *testing.T) { require.NoError(t, err) require.Equal(t, models.PublicationStatusQueued, resp.Status) require.Len(t, queue.jobs, 1) + require.Equal(t, models.PersonalWorkspaceID(user.ID), queue.jobs[0].WorkspaceID) require.Equal(t, uuid.Nil, queue.jobs[0].BrowserSessionID) lockKey := publishLockKey(project.ID, "wechat") @@ -419,6 +420,9 @@ func TestEnqueuePublishProjectQueuesAndLocksPublication(t *testing.T) { require.Equal(t, models.OutboxStatusDispatched, outbox.Status) require.Equal(t, 1, outbox.Attempts) require.NotNil(t, outbox.ProcessedAt) + var outboxJob PublishJob + require.NoError(t, json.Unmarshal(outbox.Payload, &outboxJob)) + require.Equal(t, models.PersonalWorkspaceID(user.ID), outboxJob.WorkspaceID) var saved models.ProjectPlatformPublication require.NoError(t, db.First(&saved, "project_id = ? AND platform = ?", project.ID, "wechat").Error) @@ -844,6 +848,7 @@ func TestEnqueuePublishProjectLeavesFailedDispatchInOutboxForRetry(t *testing.T) require.NoError(t, service.FlushPublishOutbox(context.Background(), 10)) require.Len(t, queue.jobs, 1) require.Equal(t, resp.JobID, queue.jobs[0].JobID.String()) + require.Equal(t, models.PersonalWorkspaceID(user.ID), queue.jobs[0].WorkspaceID) require.NoError(t, db.First(&outbox, "id = ?", outbox.ID).Error) require.Equal(t, models.OutboxStatusDispatched, outbox.Status) @@ -857,14 +862,36 @@ func TestFlushPublishOutboxRetriesStaleProcessingEvent(t *testing.T) { queue := newTestPublishQueue() service.queue = queue + user := models.User{Username: "legacy-outbox-owner"} + require.NoError(t, db.Create(&user).Error) + project := models.Project{ + UserID: user.ID, + Title: "Legacy queued post", + SourceContent: "
ready
", + Status: models.ProjectStatusReady, + } + require.NoError(t, db.Create(&project).Error) + job := PublishJob{ JobID: uuid.New(), - ProjectID: uuid.New(), - UserID: uuid.New(), + ProjectID: project.ID, + UserID: user.ID, Platform: "wechat", EnqueuedAt: time.Now().UTC(), } - payload, err := json.Marshal(job) + payload, err := json.Marshal(struct { + JobID uuid.UUID `json:"job_id"` + ProjectID uuid.UUID `json:"project_id"` + UserID uuid.UUID `json:"user_id"` + Platform string `json:"platform"` + EnqueuedAt time.Time `json:"enqueued_at"` + }{ + JobID: job.JobID, + ProjectID: job.ProjectID, + UserID: job.UserID, + Platform: job.Platform, + EnqueuedAt: job.EnqueuedAt, + }) require.NoError(t, err) staleUpdatedAt := time.Now().UTC().Add(-publishOutboxClaimTimeout - time.Second) outbox := models.OutboxEvent{ @@ -882,6 +909,7 @@ func TestFlushPublishOutboxRetriesStaleProcessingEvent(t *testing.T) { require.NoError(t, service.FlushPublishOutbox(context.Background(), 10)) require.Len(t, queue.jobs, 1) + job.WorkspaceID = models.PersonalWorkspaceID(user.ID) require.Equal(t, job, queue.jobs[0]) require.NoError(t, db.First(&outbox, "id = ?", outbox.ID).Error) require.Equal(t, models.OutboxStatusDispatched, outbox.Status) @@ -1166,6 +1194,58 @@ func TestProcessPublishJobPublishesAndReleasesLock(t *testing.T) { }, observer.observations) } +func TestProcessPublishJobRejectsWorkspaceMismatch(t *testing.T) { + db := setupPublishQueueTestDB(t) + service := newPublishTestService(db) + queue := newTestPublishQueue() + service.queue = queue + + publisher.Factory.Register("wechat", queueTestPublisher{}) + defer publisher.Factory.Register("wechat", &publisher.WechatPublisher{}) + + user := models.User{Username: "owner"} + require.NoError(t, db.Create(&user).Error) + project := models.Project{ + UserID: user.ID, + Title: "Queued post", + SourceContent: "ready
", + Status: models.ProjectStatusReady, + } + require.NoError(t, db.Create(&project).Error) + createConnectedQueueAccount(t, db, user.ID, "wechat") + require.NoError(t, db.Create(&models.ProjectPlatformPublication{ + ProjectID: project.ID, + Platform: "wechat", + Enabled: true, + Status: models.PublicationStatusPublishing, + Config: datatypes.JSON(`{"title":"Queued post"}`), + AdaptedContent: datatypes.JSON(`{"format":"html","html":"ready"}`), + }).Error) + + wrongWorkspaceID := uuid.New() + job := PublishJob{ + JobID: uuid.New(), + ProjectID: project.ID, + WorkspaceID: wrongWorkspaceID, + UserID: user.ID, + Platform: "wechat", + EnqueuedAt: time.Now().UTC(), + } + lockKey := publishLockKey(project.ID, "wechat") + queue.locks[lockKey] = job.JobID.String() + + require.NoError(t, service.processPublishJob(context.Background(), job)) + + var saved models.ProjectPlatformPublication + require.NoError(t, db.First(&saved, "project_id = ? AND platform = ?", project.ID, "wechat").Error) + require.Equal(t, models.PublicationStatusPublishing, saved.Status) + require.Empty(t, queue.locks[lockKey]) + + var wrongWorkspaceEvents int64 + require.NoError(t, db.Model(&models.PublishEvent{}).Where("workspace_id = ?", wrongWorkspaceID).Count(&wrongWorkspaceEvents).Error) + require.Zero(t, wrongWorkspaceEvents) +} + func TestProcessPublishJobReacquiresExpiredLock(t *testing.T) { db := setupPublishQueueTestDB(t) service := newPublishTestService(db) @@ -1476,6 +1556,7 @@ func TestRedisPublishQueueEnqueuesAsynqTask(t *testing.T) { var payload PublishJob require.NoError(t, json.Unmarshal(tasks[0].Payload, &payload)) require.Equal(t, job, payload) + require.Contains(t, string(tasks[0].Payload), `"workspace_id"`) } func TestStartPublishWorkerWithErrorsReportsRunnerFailure(t *testing.T) { diff --git a/backend/internal/services/publish/schedule.go b/backend/internal/services/publish/schedule.go index 17969a41..9581a64d 100644 --- a/backend/internal/services/publish/schedule.go +++ b/backend/internal/services/publish/schedule.go @@ -29,10 +29,7 @@ func (s *Service) createScheduledPublication(ctx context.Context, project models if !s.writerDB(ctx).Migrator().HasTable(&models.ScheduledPublication{}) { return models.ScheduledPublication{}, nil } - workspaceID := models.PersonalWorkspaceID(project.UserID) - if project.WorkspaceID != nil && *project.WorkspaceID != uuid.Nil { - workspaceID = *project.WorkspaceID - } + workspaceID := models.ProjectWorkspaceID(project) schedule := models.ScheduledPublication{ WorkspaceID: workspaceID, ProjectID: project.ID, @@ -48,7 +45,7 @@ func (s *Service) createScheduledPublication(ctx context.Context, project models if schedule.Status == "" { schedule.Status = models.ScheduledPublicationStatusScheduled } - if latestVersionID, err := s.latestProjectVersionID(ctx, project.ID); err != nil { + if latestVersionID, err := s.latestProjectVersionID(ctx, workspaceID, project.ID); err != nil { return models.ScheduledPublication{}, err } else if latestVersionID != uuid.Nil { schedule.ProjectVersionID = &latestVersionID @@ -73,7 +70,7 @@ func (s *Service) ScheduleProjectPublication(ctx context.Context, projectID uuid } idempotencyKey := normalizeIdempotencyKey(req.IdempotencyKey) if idempotencyKey != "" { - if existing, found, err := s.findIdempotentScheduledPublication(ctx, project.ID, pub.ID, userID, idempotencyKey); err != nil { + if existing, found, err := s.findIdempotentScheduledPublication(ctx, models.ProjectWorkspaceID(project), project.ID, pub.ID, userID, idempotencyKey); err != nil { return nil, err } else if found { item := scheduledPublicationFromModel(existing, project, pub, nil) @@ -97,13 +94,13 @@ func (s *Service) ScheduleProjectPublication(ctx context.Context, projectID uuid return &item, nil } -func (s *Service) findIdempotentScheduledPublication(ctx context.Context, projectID uuid.UUID, publicationID uuid.UUID, userID uuid.UUID, key string) (models.ScheduledPublication, bool, error) { +func (s *Service) findIdempotentScheduledPublication(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID, publicationID uuid.UUID, userID uuid.UUID, key string) (models.ScheduledPublication, bool, error) { if strings.TrimSpace(key) == "" { return models.ScheduledPublication{}, false, nil } var schedule models.ScheduledPublication err := s.writerDB(ctx). - Where("project_id = ? AND publication_id = ? AND created_by = ? AND idempotency_key = ?", projectID, publicationID, userID, key). + Where("workspace_id = ? AND project_id = ? AND publication_id = ? AND created_by = ? AND idempotency_key = ?", workspaceID, projectID, publicationID, userID, key). Order("created_at DESC"). First(&schedule).Error if errors.Is(err, gorm.ErrRecordNotFound) { @@ -160,7 +157,7 @@ func (s *Service) RetryScheduledPublication(ctx context.Context, projectID uuid. if schedule.Status != models.ScheduledPublicationStatusFailed && schedule.Status != models.ScheduledPublicationStatusNeedsManualAction { return nil, ErrPublicationAlreadyPublishing } - if _, err := s.PublishProjectWithContext(ctx, projectID, schedule.Publication.Platform, &userID, scheduleID); err != nil { + if _, err := s.PublishProjectInWorkspaceWithContext(ctx, schedule.WorkspaceID, projectID, schedule.Publication.Platform, &userID, scheduleID); err != nil { return nil, err } return s.scheduledPublicationDetail(ctx, scheduleID) @@ -289,7 +286,7 @@ func (s *Service) dispatchScheduledPublication(ctx context.Context, schedule mod platform := strings.TrimSpace(schedule.Publication.Platform) if platform == "" { var publication models.ProjectPlatformPublication - if err := s.writerDB(ctx).Select("platform").First(&publication, "id = ?", schedule.PublicationID).Error; err != nil { + if err := s.writerDB(ctx).Select("platform").First(&publication, "id = ? AND workspace_id = ?", schedule.PublicationID, schedule.WorkspaceID).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil } @@ -300,10 +297,14 @@ func (s *Service) dispatchScheduledPublication(ctx context.Context, schedule mod if platform == "" { return nil } + if schedule.WorkspaceID == uuid.Nil { + return nil + } job := PublishJob{ JobID: uuid.New(), ProjectID: schedule.ProjectID, + WorkspaceID: schedule.WorkspaceID, UserID: schedule.CreatedBy, Platform: platform, PublicationID: schedule.PublicationID, @@ -326,13 +327,13 @@ func (s *Service) dispatchScheduledPublication(ctx context.Context, schedule mod return s.processPublishJob(ctx, job) } -func (s *Service) latestProjectVersionID(ctx context.Context, projectID uuid.UUID) (uuid.UUID, error) { +func (s *Service) latestProjectVersionID(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID) (uuid.UUID, error) { if !s.writerDB(ctx).Migrator().HasTable(&models.ProjectVersion{}) { return uuid.Nil, nil } var version models.ProjectVersion err := s.writerDB(ctx).Select("id"). - Where("project_id = ?", projectID). + Where("workspace_id = ? AND project_id = ?", workspaceID, projectID). Order("version_number DESC, created_at DESC"). First(&version).Error if err == nil { diff --git a/backend/internal/services/publish/service.go b/backend/internal/services/publish/service.go index 4058b72d..11d26bd0 100644 --- a/backend/internal/services/publish/service.go +++ b/backend/internal/services/publish/service.go @@ -270,6 +270,10 @@ func (s *Service) PublishProject(projectID uuid.UUID, platform string, scopeUser } func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid.UUID, platform string, scopeUserID *uuid.UUID, scheduleID uuid.UUID) (PublishResponse, error) { + return s.PublishProjectInWorkspaceWithContext(ctx, uuid.Nil, projectID, platform, scopeUserID, scheduleID) +} + +func (s *Service) PublishProjectInWorkspaceWithContext(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID, platform string, scopeUserID *uuid.UUID, scheduleID uuid.UUID) (PublishResponse, error) { // Remote browser sessions are only for account connection and cookie capture. // Publish jobs must be durable across Redis workers, so they load saved credentials instead. if ctx == nil { @@ -284,9 +288,15 @@ func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid. if err != nil { return PublishResponse{}, err } + projectWorkspaceID := models.ProjectWorkspaceID(proj) + if workspaceID == uuid.Nil { + workspaceID = projectWorkspaceID + } else if workspaceID != projectWorkspaceID { + return PublishResponse{}, ErrForbidden + } var pub models.ProjectPlatformPublication - if err := s.strongReadDB(ctx).Where("project_id = ? AND platform = ?", projectID, platform).First(&pub).Error; err != nil { + if err := s.strongReadDB(ctx).Where("workspace_id = ? AND project_id = ? AND platform = ?", workspaceID, projectID, platform).First(&pub).Error; err != nil { return PublishResponse{}, fmt.Errorf("publication record not found for platform: %s", platform) } if !pub.Enabled || pub.Status == models.PublicationStatusCancelled { @@ -295,13 +305,13 @@ func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid. startedAt := time.Now().UTC() lifecycle := s.lifecycle() - attempt, hasAttempt, err := lifecycle.StartPublishAttempt(scheduleID, startedAt) + attempt, hasAttempt, err := lifecycle.StartPublishAttemptForWorkspace(workspaceID, scheduleID, startedAt) if err != nil { return PublishResponse{}, err } failAttempt := func(err error) error { if hasAttempt { - _ = lifecycle.FinishPublishAttempt(&attempt, publishAttemptCompletion{ + _ = lifecycle.FinishPublishAttemptForWorkspace(workspaceID, &attempt, publishAttemptCompletion{ Status: models.PublishAttemptStatusFailed, ErrorMessage: SanitizeUserFacingErrorMessage(err.Error()), }) @@ -330,7 +340,7 @@ func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid. var account models.PlatformAccount if pub.PlatformAccountID != nil && *pub.PlatformAccountID != uuid.Nil { - if err := s.strongReadDB(ctx).Where("id = ?", *pub.PlatformAccountID).First(&account).Error; err != nil { + if err := s.strongReadDB(ctx).Where("id = ? AND workspace_id = ?", *pub.PlatformAccountID, workspaceID).First(&account).Error; err != nil { return PublishResponse{}, err } } @@ -391,7 +401,7 @@ func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid. attemptStatus = models.PublishAttemptStatusFailed } if hasAttempt { - if err := lifecycle.FinishPublishAttempt(&attempt, publishAttemptCompletion{ + if err := lifecycle.FinishPublishAttemptForWorkspace(workspaceID, &attempt, publishAttemptCompletion{ Status: attemptStatus, RemoteID: remoteID, PublishURL: publishURL, @@ -401,7 +411,7 @@ func (s *Service) PublishProjectWithContext(ctx context.Context, projectID uuid. } } s.invalidateDashboardCaches(ctx) - if err := s.recordProjectPublishActivity(projectID, *scopeUserID, models.ProjectActivityPublishCompleted, map[string]any{ + if err := s.recordProjectPublishActivityForWorkspace(workspaceID, projectID, *scopeUserID, models.ProjectActivityPublishCompleted, map[string]any{ "platform": platform, "status": status, "remote_id": remoteID, @@ -481,7 +491,7 @@ func (s *Service) recordPublishEvent(event models.PublishEvent) error { return s.writerDB(s.requestContext()).Create(&event).Error } -func (s *Service) recordProjectPublishActivity(projectID uuid.UUID, userID uuid.UUID, eventType string, metadata map[string]any) error { +func (s *Service) recordProjectPublishActivityForWorkspace(workspaceID uuid.UUID, projectID uuid.UUID, userID uuid.UUID, eventType string, metadata map[string]any) error { if projectID == uuid.Nil || userID == uuid.Nil || strings.TrimSpace(eventType) == "" { return nil } @@ -489,6 +499,12 @@ func (s *Service) recordProjectPublishActivity(projectID uuid.UUID, userID uuid. if err := s.writerDB(s.requestContext()).Select("id", "user_id", "workspace_id").First(&project, "id = ?", projectID).Error; err != nil { return err } + projectWorkspaceID := models.ProjectWorkspaceID(project) + if workspaceID == uuid.Nil { + workspaceID = projectWorkspaceID + } else if workspaceID != projectWorkspaceID { + return ErrForbidden + } payload := datatypes.JSON([]byte(`{}`)) if metadata != nil { encoded, err := json.Marshal(metadata) @@ -498,7 +514,7 @@ func (s *Service) recordProjectPublishActivity(projectID uuid.UUID, userID uuid. payload = datatypes.JSON(encoded) } return s.writerDB(s.requestContext()).Create(&models.ProjectActivity{ - WorkspaceID: models.ProjectWorkspaceID(project), + WorkspaceID: workspaceID, ProjectID: projectID, ActorUserID: userID, EventType: eventType, @@ -506,7 +522,7 @@ func (s *Service) recordProjectPublishActivity(projectID uuid.UUID, userID uuid. }).Error } -func (s *Service) findIdempotentPublishResponse(projectID uuid.UUID, platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { +func (s *Service) findIdempotentPublishResponseForWorkspace(workspaceID uuid.UUID, projectID uuid.UUID, platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { if strings.TrimSpace(key) == "" { return PublishResponse{}, false, nil } @@ -516,9 +532,11 @@ func (s *Service) findIdempotentPublishResponse(projectID uuid.UUID, platform st } var queued models.PublishEvent - err := db. + query := db. + Scopes(workspaceScope(workspaceID)). Where("project_id = ? AND platform = ? AND user_id = ? AND idempotency_key = ? AND event_type = ?", projectID, platform, userID, key, "queued"). - Order("created_at DESC"). + Order("created_at DESC") + err := query. First(&queued).Error if errors.Is(err, gorm.ErrRecordNotFound) { return PublishResponse{}, false, nil @@ -530,6 +548,7 @@ func (s *Service) findIdempotentPublishResponse(projectID uuid.UUID, platform st event := queued var events []models.PublishEvent err = db. + Scopes(workspaceScope(workspaceID)). Where("project_id = ? AND platform = ? AND user_id = ? AND job_id = ?", projectID, platform, userID, queued.JobID). Order("created_at ASC"). Find(&events).Error @@ -580,7 +599,7 @@ func publishEventReplayRank(eventType string) int { } } -func (s *Service) waitForIdempotentPublishResponse(ctx context.Context, projectID uuid.UUID, platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { +func (s *Service) waitForIdempotentPublishResponseForWorkspace(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID, platform string, userID uuid.UUID, key string) (PublishResponse, bool, error) { deadline := time.NewTimer(publishReplayWait) defer deadline.Stop() @@ -588,7 +607,7 @@ func (s *Service) waitForIdempotentPublishResponse(ctx context.Context, projectI defer ticker.Stop() for { - resp, ok, err := s.findIdempotentPublishResponse(projectID, platform, userID, key) + resp, ok, err := s.findIdempotentPublishResponseForWorkspace(workspaceID, projectID, platform, userID, key) if err != nil || ok { return resp, ok, err } diff --git a/doc/plan/database-optimization.md b/doc/plan/database-optimization.md index 843f2644..5f5563cc 100644 --- a/doc/plan/database-optimization.md +++ b/doc/plan/database-optimization.md @@ -11,7 +11,7 @@ Status definitions: - `Not started`: no clear implementation has been found yet. - `Deferred`: not recommended for the current business stage; only trigger conditions are retained. -Current overall progress: about `80%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. +Current overall progress: about `86%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. | Phase | Weight | Current completion | Status | Completed | Not done / next steps | | ----- | ------ | ------------------ | ------ | --------- | --------------------- | @@ -20,7 +20,7 @@ Current overall progress: about `80%`. This number is manually estimated by phas | Phase 2: Read models and cache first | 15% | 100% | Done | Redis and Asynq dependencies are reusable; admin dashboard stats, admin project list, and dashboard account summary have short-TTL Redis cache; stats/project list/account cache misses are merged with singleflight; project, prepublish, publish, and account write paths invalidate the related dashboard cache; `workspace_dashboard_stats` and `project_list_summaries` read models are in place, and APIs prefer read models when coverage is complete; async refresh triggers after project save, platform sync, publish completion, and member changes; admin rebuild API, Asynq queue, and worker support full read-model rebuild | None | | Phase 3: Read/write splitting | 15% | 100% | Done | Optional `DB_READER_*` connection, application-level DB Router, signed sticky writer, consistency routing for project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension, consistency-level inventories for dashboard/publish/collab-service, self-hosted PostgreSQL read replica, managed `postgres-reader` entry point, PgBouncer reader pool, replica lag monitoring and automatic fallback to writer when over threshold | None; Phase 4 continues partitioning, archiving, and recovery flows | | Phase 4: Single-database partitioning, archiving, and hot/cold tiering | 15% | 100% | Done | Collaborative editing already has state + update batch + compaction foundation; `collab_document_update_batches` has PostgreSQL `document_id` hash partition target schema; event and terminal-session history already have row-level R2/S3 archive worker; `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and `remote_browser_sessions` have PostgreSQL monthly partition target schema; the archive worker exports whole cold monthly partitions to R2/S3 before detaching and dropping them; archive recovery procedure is documented | None | -| Phase 5: Citus preparation | 20% | 50% | In progress | Workspace model, `projects.workspace_id`, personal workspace ID, explicit or derived `workspace_id` coverage for project-domain tables, and Citus distributed/reference/colocated table-group design | Unique constraint and foreign-key review, worker payload routing | +| Phase 5: Citus preparation | 20% | 80% | In progress | Workspace model, `projects.workspace_id`, personal workspace ID, explicit or derived `workspace_id` coverage for project-domain tables, Citus distributed/reference/colocated table-group design, unique constraint / foreign-key / join review, and publish worker payload routing | Citus-safe constraint refactors, workspace-scoped repository boundary, validation-cluster rehearsal | | Phase 6: Citus distributed PostgreSQL operation | 10% | 0% | Deferred | None | Future Citus cluster design, worker/coordinator monitoring and backup, large-tenant isolation strategy | ### 0.1 Progress Update Rules @@ -60,14 +60,14 @@ Atomic commit guidance: | Application connection pools | Done | backend/publish-worker support `DB_MAX_OPEN_CONNS`, `DB_MAX_IDLE_CONNS`, `DB_CONN_MAX_LIFETIME`, and `DB_CONN_MAX_IDLE_TIME`; collab-service node-postgres pool supports `DB_MAX_OPEN_CONNS`, `DB_CONN_MAX_LIFETIME`, and `DB_CONN_MAX_IDLE_TIME`; Redis client supports `REDIS_POOL_SIZE`, `REDIS_MIN_IDLE_CONNS`, `REDIS_MAX_IDLE_CONNS`, `REDIS_CONN_MAX_IDLE_TIME`, and `REDIS_CONN_MAX_LIFETIME`; Docker Compose and self-hosted Kubernetes reuse PostgreSQL connections through PgBouncer writer pool; GORM PostgreSQL driver uses simple protocol to remain compatible with transaction pooling; self-hosted Kubernetes added PgBouncer reader pool and points app `DB_READER_HOST` to the reader pool | None | `backend/internal/db/db.go`, `backend/internal/redisclient/redisclient.go`, `collab-service/src/config.ts`, `collab-service/src/persistence/document-persistence.ts`, `collab-service/src/persistence/document-persistence.test.ts`, `deploy/docker/docker-compose.yml`, `deploy/kubernetes/data-services/self-hosted/pgbouncer.yaml`, `deploy/kubernetes/app-baseline/app-config.yaml` | | Query observability | Done | GORM QueryObserver, slow-query logs, `mpp_db_queries_total`, `mpp_db_query_duration_seconds`, `mpp_db_slow_queries_total`, self-hosted PostgreSQL `pg_stat_statements`, PostgreSQL exporter table-level health metrics, Grafana table row count / 24h growth / table size / index size / dead tuples / vacuum panels | Threshold alerts can continue to be added by production baseline in Phase 1/4 | `backend/internal/db/query_observer.go`, `backend/internal/observability/observability.go`, `script/db/audit_database_baseline.sql`, `deploy/docker/observability/postgres-exporter/queries.yml`, `deploy/docker/observability/grafana/dashboards/mpp-observability-baseline.json` | | Dashboard query audit | Done | Existing audit script covers dashboard Count, list, platform filter, publication preload, account query, and active session query plans | Not yet turned into a periodic CI/ops gate | `script/db/audit_dashboard_query_plans.sql` | -| Tenant boundary | In progress | Existing `workspaces`, `workspace_members`, `projects.workspace_id`, and personal workspace rules; project-domain publishing, activity, comment, version, share-link, media, AI, extension, and collaboration rows now carry `workspace_id` directly, with model hooks or service write paths deriving it from the owning project, document, or media asset for new writes; Citus target table groups are mapped to the `tenant_workspace` colocation group | Distributed-table constraints and worker payload ownership are still open | `backend/internal/models/hooks.go`, `backend/internal/models/ai_hooks.go`, `backend/internal/models/collab.go`, `backend/internal/db/monthly_partitions.go`, `backend/internal/db/hash_partitions.go`, `collab-service/src/persistence/document-persistence.ts`, `backend/internal/db/db_test.go`, `collab-service/src/persistence/document-persistence.test.ts`, `script/db/citus_table_groups.yml`, `script/db/test_citus_table_groups.rb` | +| Tenant boundary | In progress | Existing `workspaces`, `workspace_members`, `projects.workspace_id`, and personal workspace rules; project-domain publishing, activity, comment, version, share-link, media, AI, extension, and collaboration rows now carry `workspace_id` directly, with model hooks or service write paths deriving it from the owning project, document, or media asset for new writes; Citus target table groups are mapped to the `tenant_workspace` colocation group; unique constraints, foreign keys, and cross-tenant joins have a documented Citus review; publish worker payloads carry `workspace_id` | Citus-safe constraint refactors and workspace-scoped repository boundaries are still open before validation-cluster DDL | `backend/internal/models/hooks.go`, `backend/internal/models/ai_hooks.go`, `backend/internal/models/collab.go`, `backend/internal/db/monthly_partitions.go`, `backend/internal/db/hash_partitions.go`, `collab-service/src/persistence/document-persistence.ts`, `backend/internal/db/db_test.go`, `collab-service/src/persistence/document-persistence.test.ts`, `script/db/citus_table_groups.yml`, `script/db/citus_constraint_review.yml`, `script/db/test_citus_table_groups.rb`, `script/db/test_citus_constraint_review.rb`, `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/services/publish/schedule.go` | | Dashboard read models | Done | Added `workspace_dashboard_stats` and `project_list_summaries` read models, idempotently recomputed from fact tables by a centralized readmodel service; async refresh is triggered after project save, platform sync, publish completion, and member changes; admin stats and admin project list prefer read models when coverage is complete; admin rebuild API enqueues through Asynq, and API/worker processes can start readmodel workers for full rebuild from fact tables | None | `backend/internal/models/models.go`, `backend/internal/services/readmodel/service.go`, `backend/internal/services/readmodel/queue.go`, `backend/internal/services/readmodel/service_test.go`, `backend/internal/services/readmodel/queue_test.go`, `backend/internal/services/stats/overview.go`, `backend/internal/services/project/lifecycle.go`, `backend/internal/handlers/dashboard.go`, `backend/cmd/api/main.go`, `backend/cmd/publish-worker/main.go` | | Redis read cache | Done | Redis is already used for queues, locks, OAuth, browser sessions, and short-term coordination; admin dashboard stats, admin project list, and dashboard account summary use 15s TTL cache and bypass scoped/sticky-writer strong-consistency paths; stats/project list/account cache misses use singleflight to prevent process-local stampede; stats and account caches use versioned payloads and semantic validation, and Redis read-error fallback is also merged into one DB computation per key; project create/edit/platform save, prepublish sync/draft update, publish queue/execute/fail, and platform account write paths invalidate the related dashboard cache; full read-model rebuild reuses the Redis/Asynq queue | None | `backend/internal/services/stats/overview.go`, `backend/internal/services/stats/overview_test.go`, `backend/internal/services/project/list_cache.go`, `backend/internal/services/project/list_cache_test.go`, `backend/internal/services/prepublish/drafts.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/publication_flow_test.go`, `backend/internal/services/publish/queue_test.go`, `backend/internal/services/platform_account/account_cache.go`, `backend/internal/services/platform_account/account_cache_test.go`, `backend/internal/services/browser_session/complete.go`, `backend/internal/services/browser_session/service_test.go`, `backend/internal/services/readmodel/queue.go` | | Read/write splitting | Done | Supports optional `DB_READER_*` read-replica connection, `DefaultRouter`, and signed sticky writer; project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension are wired to strong/eventual/writer routing; dashboard, publish, and collab-service consistency-level inventories are complete, with collab-service online path kept writer-only; writer/reader pools are in self-hosted Kubernetes, and managed overlay provides a `postgres-reader` ExternalName entry point; `DB_READER_MAX_REPLICA_LAG` configures the replica lag threshold, eventual/analytics reads automatically fall back to writer when over threshold or lag is unknown, and `mpp_db_replica_lag_seconds` and `mpp_db_replica_healthy` metrics are exposed | None | `backend/internal/db/db.go`, `backend/internal/db/router.go`, `backend/internal/db/replica_lag.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/prepublish/service.go`, `backend/internal/services/mediaasset/service.go`, `backend/internal/services/browser_session/service.go`, `backend/internal/services/extension/service.go`, `backend/internal/app/runtime.go`, `deploy/kubernetes/data-services/self-hosted/postgres.yaml`, `deploy/kubernetes/data-services/self-hosted/pgbouncer.yaml`, `deploy/kubernetes/data-services/managed/services.yaml`, `script/kubernetes/validation/data_services.rb` | | Event-table partitioning and archiving | Done | `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and terminal `remote_browser_sessions` have default retention periods; the `archive` worker can batch-export JSONL to R2/S3 and delete old hot-table rows after successful upload; PostgreSQL schema initialization now creates monthly `created_at` partitions for `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and `remote_browser_sessions`, with partition-compatible `(id, created_at)` primary keys and rolling partition creation; the archive worker exports whole cold monthly partitions as JSONL to R2/S3, then detaches and drops the partition after successful upload; PostgreSQL browser-session active-row fallback uses a scoped advisory transaction lock because partitioned unique constraints must include the partition key; the archive recovery procedure defines inspection, staging restore, optional hot-table reinsertion, and audit checks | None | `backend/internal/db/monthly_partitions.go`, `backend/internal/db/db.go`, `backend/internal/models/models.go`, `backend/internal/db/db_test.go`, `backend/internal/services/browser_session/start.go`, `backend/internal/services/browser_session/cleanup.go`, `backend/internal/services/archive/worker.go`, `backend/internal/services/archive/partitions.go`, `backend/internal/services/archive/worker_test.go`, `backend/internal/services/archive/partitions_test.go`, Phase 4 archive recovery procedure in this document | | Collaboration batch governance | In progress | `collab_document_states`, `collab_document_update_batches`, and compaction/retention foundations exist; PostgreSQL schema initialization creates a 16-way `document_id` hash-partitioned `collab_document_update_batches` target table and migrates existing regular-table rows into it | Cold archiving is not implemented | `backend/internal/db/hash_partitions.go`, `backend/internal/db/db.go`, `backend/internal/models/collab.go`, `backend/internal/db/db_test.go`, `collab-service/src/persistence/document-persistence.ts` | -| Outbox/CDC/event stream | In progress | The publishing queue path has a transactional Outbox: `EnqueuePublishProject` writes `outbox_events` in the same transaction and dispatches immediately after commit; publish worker starts an outbox dispatcher and supports retries for failed/stale processing records; Asynq continues to serve as the task-execution queue, and `PublishEvent` continues to serve as publishing audit | Currently covers only `publish.job_requested`; general business-event outbox, Debezium, and Redpanda/Kafka CDC are not implemented | `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/models/models.go` | -| Citus target state | In progress | Confirmed `workspace_id` as the most suitable distribution-column direction; project-domain tables now have explicit or stable derived tenant routing coverage, including direct-insert fallbacks for scheduled publishing, media, and AI rows; distributed tables, reference-table candidates, control-domain tables, and deferred colocation gaps are mapped in a machine-readable design manifest | Unique-constraint / foreign-key review and worker payload routing are not implemented | Phase 5 checklist, `script/db/citus_table_groups.yml`, `script/db/test_citus_table_groups.rb`, `backend/internal/models/ai_hooks.go`, and `backend/internal/db/db_test.go` | +| Outbox/CDC/event stream | In progress | The publishing queue path has a transactional Outbox: `EnqueuePublishProject` writes `outbox_events` in the same transaction and dispatches immediately after commit; publish worker starts an outbox dispatcher and supports retries for failed/stale processing records; publish job payloads include `workspace_id` for worker routing, while `outbox_events` remains coordinator/control-domain state; Asynq continues to serve as the task-execution queue, and `PublishEvent` continues to serve as publishing audit | Currently covers only `publish.job_requested`; general business-event outbox, Debezium, Redpanda/Kafka CDC, and relational `outbox_events.workspace_id` are not implemented | `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/services/publish/schedule.go`, `backend/internal/models/models.go` | +| Citus target state | In progress | Confirmed `workspace_id` as the most suitable distribution-column direction; project-domain tables now have explicit or stable derived tenant routing coverage, including direct-insert fallbacks for scheduled publishing, media, and AI rows; distributed tables, reference-table candidates, control-domain tables, and deferred colocation gaps are mapped in a machine-readable design manifest; unique constraints, foreign keys, cross-tenant joins, and worker payload routing now have reviewed evidence | Actual Citus-safe constraint refactors, workspace-scoped repository entry points, relational outbox routing, and validation-cluster rehearsal are not implemented | Phase 5 checklist, `script/db/citus_table_groups.yml`, `script/db/citus_constraint_review.yml`, `script/db/test_citus_table_groups.rb`, `script/db/test_citus_constraint_review.rb`, `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/services/publish/schedule.go`, `backend/internal/models/ai_hooks.go`, and `backend/internal/db/db_test.go` | ### 0.3 Phase Checklist @@ -129,8 +129,8 @@ Atomic commit guidance: - [x] Confirm `workspace_id` as the preferred Citus distribution column. - [x] Complete `workspace_id` or a stable derivation path for project-domain tables. Verification entry point: `backend/internal/models/hooks.go`, `backend/internal/models/ai_hooks.go`, `backend/internal/models/collab.go`, `backend/internal/db/monthly_partitions.go`, `backend/internal/db/hash_partitions.go`, `collab-service/src/persistence/document-persistence.ts`, `backend/internal/db/db_test.go`, `collab-service/src/persistence/document-persistence.test.ts`. - [x] Design Citus distributed tables, reference tables, and colocated table groups. Verification entry point: `script/db/citus_table_groups.yml`, `script/db/test_citus_table_groups.rb`, and Section 7.2.1 Citus table group design. -- [ ] Review unique constraints, foreign keys, and cross-tenant joins. -- [ ] Add `workspace_id` to worker payloads. +- [x] Review unique constraints, foreign keys, and cross-tenant joins. Verification entry point: `script/db/citus_constraint_review.yml`, `script/db/test_citus_constraint_review.rb`, and Section 7.2.2 Citus constraint, foreign-key, and join review. +- [x] Add `workspace_id` to worker payloads. Verification entry point: `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/services/publish/schedule.go`, and `backend/internal/services/publish/queue_test.go`. #### Phase 6: Citus Distributed PostgreSQL Operation @@ -599,7 +599,7 @@ Control-domain tables for the first Citus validation: | ----- | --------------------------------------------- | | `platform_accounts`, `platform_account_grants` | Credential ownership is still user/platform scoped, and `platform_accounts.workspace_id` is nullable | | `remote_browser_sessions` | Short-lived runtime state is governed by Redis and the existing monthly archive path | -| `outbox_events` | Queue dispatch remains coordinator/control-domain state until worker payloads carry `workspace_id` | +| `outbox_events` | Queue dispatch remains coordinator/control-domain state; publish worker payloads now carry `workspace_id`, but `outbox_events` has no relational routing column yet | | `extension_execution_event_claims` | Global idempotency claims do not carry tenant routing yet | Deferred colocation gaps: @@ -611,6 +611,23 @@ Deferred colocation gaps: | `content_templates` | Split system templates from workspace templates or make tenant-scoped rows non-null by design | | `ai_drafting_messages`, `ai_tool_calls`, `ai_drafting_session_summaries`, `ai_session_events` | Add `workspace_id` derived from `ai_drafting_sessions` and review session-local uniqueness | +#### 7.2.2 Citus Constraint, Foreign-key, and Join Review + +The Phase 5 constraint review is captured in `script/db/citus_constraint_review.yml` and checked by `script/db/test_citus_constraint_review.rb`. + +Constraint review findings: + +- Ready without primary-key redesign: `workspaces`, `workspace_members`, `workspace_dashboard_stats`, and `workspace_quota_aggregates`, because their current key already contains the distribution value. +- Needs Citus-safe unique or primary-key redesign before distributed DDL: project rows, project publications, project summaries, share links, workspace invites, media assets/usages, and extension callback tokens. The common pattern is to include `workspace_id` in tenant-local uniqueness or keep globally unique token/object lookups in a control-domain table. +- Needs partition strategy rehearsal before distributed DDL: monthly event/activity tables and `collab_document_update_batches`, because existing partition-compatible primary keys are shaped around `created_at` or `document_id` rather than `workspace_id`. + +Foreign-key and join review findings: + +- Workspace-owned child rows should point through colocated `workspace_id` where possible. +- User identity joins can be served by treating `users` as a reference-table candidate during the first validation cluster, or by denormalizing display fields into read models if reference replication becomes undesirable. +- `platform_accounts` and grants remain control-domain data until account ownership becomes non-null workspace ownership. +- Cross-workspace write transactions and hot request-path cross-tenant joins are not allowed; admin/global statistics should use read models, CDC consumers, or offline warehouse queries. + Initial Citus validation DDL shape: ```sql diff --git a/script/db/citus_constraint_review.yml b/script/db/citus_constraint_review.yml new file mode 100644 index 00000000..9c0fe9fc --- /dev/null +++ b/script/db/citus_constraint_review.yml @@ -0,0 +1,143 @@ +version: 1 +distribution_key: workspace_id +review_scope: + models: + - backend/internal/models/models.go + - backend/internal/models/collab.go + - backend/internal/models/ai.go + - backend/internal/models/ai_quota.go + table_group_manifest: script/db/citus_table_groups.yml +unique_constraints: + reviewed_ready: + - table: workspaces + current_constraints: + - PRIMARY KEY (id) + reason: The root tenant table is distributed by id, so the primary key includes the distribution value. + - table: workspace_members + current_constraints: + - PRIMARY KEY (workspace_id, user_id) + reason: The member primary key already includes workspace_id. + - table: workspace_dashboard_stats + current_constraints: + - PRIMARY KEY (workspace_id) + reason: The read-model row is keyed by workspace_id. + - table: workspace_quota_aggregates + current_constraints: + - PRIMARY KEY (workspace_id) + reason: The quota aggregate row is keyed by workspace_id. + requires_refactor: + - table: projects + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (collab_document_id) + required_change: Use a Citus-safe tenant key such as (workspace_id, id), and move collab-document uniqueness to a workspace-scoped invariant. + - table: project_platform_publications + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (project_id, platform) + required_change: Include workspace_id in the primary and project/platform uniqueness strategy. + - table: project_list_summaries + current_constraints: + - PRIMARY KEY (project_id) + required_change: Include workspace_id in the summary key or make it dependent on a colocated projects key. + - table: project_share_links + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (token_hash) + required_change: Keep token uniqueness in a control-domain lookup table or make the token invariant workspace-scoped. + - table: workspace_invites + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (token_hash) + required_change: Keep invite token uniqueness in a control-domain lookup table or make the token invariant workspace-scoped. + - table: media_assets + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (object_key) + required_change: Include workspace routing in the metadata key or keep object-key uniqueness in object storage/control metadata. + - table: media_asset_usages + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (media_asset_id, resource_type, resource_id) + required_change: Include workspace_id in the asset/resource uniqueness strategy. + - table: extension_callback_tokens + current_constraints: + - PRIMARY KEY (id) + - UNIQUE (token) + required_change: Keep token uniqueness in a control-domain lookup table or make the token invariant workspace-scoped. + - table: extension_execution_event_claims + current_constraints: + - PRIMARY KEY (event_id) + - UNIQUE (record_id) + required_change: Keep as control-domain idempotency state until claims carry workspace routing. +partitioned_tables: + require_workspace_partition_strategy: + - table: publish_events + current_primary_key: [id, created_at] + - table: extension_execution_events + current_primary_key: [id, created_at] + - table: project_activities + current_primary_key: [id, created_at] + - table: workspace_activities + current_primary_key: [id, created_at] + - table: collab_document_update_batches + current_primary_key: [id, document_id] + note: Existing document_id hash partitioning must be rehearsed with Citus and workspace-compatible uniqueness. +foreign_keys: + colocated_ready: + - child: workspace_members.workspace_id + parent: workspaces.id + - child: workspace_dashboard_stats.workspace_id + parent: workspaces.id + - child: workspace_quota_aggregates.workspace_id + parent: workspaces.id + reference_table_dependencies: + - parent_table: users + reason: Owner, actor, author, recipient, and creator references cross many tenant tables; first Citus validation should treat users as a reference-table candidate. + control_domain_boundaries: + - tables: [platform_accounts, platform_account_grants] + reason: Credential ownership remains user/platform scoped and platform_accounts.workspace_id is nullable. + - tables: [outbox_events] + reason: Dispatch state remains coordinator-owned; publish job payloads now carry workspace_id for worker routing. + deferred_until_workspace_key: + - project_collaborators + - collab_document_collaborators + - publish_attempts + - ai_drafting_messages + - ai_tool_calls + - ai_drafting_session_summaries + - ai_session_events +cross_tenant_joins: + shard_local: + - query_family: workspace project lists + rule: Filter by projects.workspace_id before joining project publications or summaries. + - query_family: project experience views + rule: Use workspace_id plus project_id when reading comments, versions, activity, share links, and media usage. + - query_family: publishing state transitions + rule: Carry workspace_id in publish jobs and scheduled-publication rows before loading project/publication state. + reference_or_control: + - query_family: user identity preloads + rule: Use users as a reference-table candidate, or denormalize display fields into read models if reference-table replication is rejected. + - query_family: platform account resolution + rule: Keep credential/account joins in the control domain until accounts become non-null workspace assets. + aggregate_offline: + - query_family: cross-workspace admin statistics + rule: Build read models, CDC consumers, or offline warehouse queries instead of making cross-tenant joins a hot request path. + prohibited_core_paths: + - Cross-workspace write transactions. + - Project/publication lookups by project_id alone in worker entry points. + - Distributed-table foreign keys that do not include compatible workspace routing. +worker_payloads: + tenant_routed: + - task_type: publish:project + payload_field: workspace_id + producers: + - backend/internal/services/publish/queue.go + - backend/internal/services/publish/schedule.go + replay_paths: + - backend/internal/services/publish/outbox.go + not_tenant_scoped: + - task_type: email:code + reason: Email verification and password reset jobs do not read or write tenant project-domain rows. + - task_type: readmodel:dashboard:rebuild + reason: Full dashboard rebuild is a global maintenance task with no single workspace owner. diff --git a/script/db/citus_table_groups.yml b/script/db/citus_table_groups.yml index 8b53d628..1e0edf87 100644 --- a/script/db/citus_table_groups.yml +++ b/script/db/citus_table_groups.yml @@ -10,177 +10,177 @@ physical_colocation_groups: - table: workspaces domain: tenant_core distribution_column: id - readiness: ready_after_constraint_review + readiness: reviewed_ready ddl: "create_distributed_table('workspaces', 'id')" notes: Root tenant table; the distributed value is the workspace UUID used by child workspace_id columns. - table: workspace_members domain: tenant_core distribution_column: workspace_id - readiness: ready_after_constraint_review + readiness: reviewed_ready ddl: "create_distributed_table('workspace_members', 'workspace_id', colocate_with => 'workspaces')" notes: Permission checks should remain shard-local with the workspace row. - table: workspace_invites domain: tenant_core distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('workspace_invites', 'workspace_id', colocate_with => 'workspaces')" - notes: Token uniqueness must be reviewed before distribution. + notes: Token uniqueness must be made Citus-safe before distribution. - table: workspace_activities domain: tenant_core distribution_column: workspace_id - readiness: target_after_partition_constraint_review + readiness: requires_partition_workspace_refactor ddl: "create_distributed_table('workspace_activities', 'workspace_id', colocate_with => 'workspaces')" - notes: Monthly partition primary key currently follows id and created_at. + notes: Monthly partition primary key currently follows id and created_at; Citus validation needs a workspace-compatible partition key strategy. - table: workspace_dashboard_stats domain: read_model distribution_column: workspace_id - readiness: ready_after_constraint_review + readiness: reviewed_ready ddl: "create_distributed_table('workspace_dashboard_stats', 'workspace_id', colocate_with => 'workspaces')" notes: Workspace-scoped aggregate read model. - table: notifications domain: tenant_core distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('notifications', 'workspace_id', colocate_with => 'workspaces')" - notes: User notification fan-out stays tenant-local; cross-workspace inboxes need read-model aggregation. + notes: User notification fan-out stays tenant-local; the id primary key needs workspace routing before distribution, and cross-workspace inboxes need read-model aggregation. - table: workspace_quota_aggregates domain: metering distribution_column: workspace_id - readiness: ready_after_constraint_review + readiness: reviewed_ready ddl: "create_distributed_table('workspace_quota_aggregates', 'workspace_id', colocate_with => 'workspaces')" notes: Workspace quota row is naturally keyed by workspace_id. - table: ai_usage_records domain: metering distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('ai_usage_records', 'workspace_id', colocate_with => 'workspaces')" - notes: Usage history is tenant-local and can aggregate into workspace_quota_aggregates. + notes: Usage history is tenant-local and can aggregate into workspace_quota_aggregates; the id primary key needs workspace routing before distribution. - table: projects domain: project distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('projects', 'workspace_id', colocate_with => 'workspaces')" - notes: Primary project row; project_id-only uniqueness must be reviewed before distribution. + notes: Primary project row; id primary key and collab-document uniqueness need workspace routing before distribution. - table: project_platform_publications domain: project distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('project_platform_publications', 'workspace_id', colocate_with => 'workspaces')" - notes: Draft/status rows should colocate with projects; project/platform uniqueness needs workspace_id. + notes: Draft/status rows should colocate with projects; id primary key and project/platform uniqueness need workspace_id. - table: project_list_summaries domain: read_model distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('project_list_summaries', 'workspace_id', colocate_with => 'workspaces')" - notes: Dashboard list read model follows the project workspace. + notes: Dashboard list read model follows the project workspace; the project_id primary key needs workspace routing before distribution. - table: project_activities domain: project distribution_column: workspace_id - readiness: target_after_partition_constraint_review + readiness: requires_partition_workspace_refactor ddl: "create_distributed_table('project_activities', 'workspace_id', colocate_with => 'workspaces')" - notes: Monthly partition primary key currently follows id and created_at. + notes: Monthly partition primary key currently follows id and created_at; Citus validation needs a workspace-compatible partition key strategy. - table: project_comments domain: project distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('project_comments', 'workspace_id', colocate_with => 'workspaces')" - notes: Comment reads and writes should remain local to the project workspace. + notes: Comment reads and writes should remain local to the project workspace; the id primary key needs workspace routing before distribution. - table: project_versions domain: project distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('project_versions', 'workspace_id', colocate_with => 'workspaces')" - notes: Version history should colocate with projects and collaboration state. + notes: Version history should colocate with projects and collaboration state; the id primary key needs workspace routing before distribution. - table: project_share_links domain: project distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('project_share_links', 'workspace_id', colocate_with => 'workspaces')" - notes: Token uniqueness must be reviewed before distribution. + notes: Token uniqueness must be made Citus-safe before distribution. - table: scheduled_publications domain: publishing distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('scheduled_publications', 'workspace_id', colocate_with => 'workspaces')" - notes: Scheduled publishing workers should route by workspace_id. + notes: Scheduled publishing workers route by workspace_id; the id primary key needs workspace routing before distribution. - table: publish_events domain: publishing distribution_column: workspace_id - readiness: target_after_partition_constraint_review + readiness: requires_partition_workspace_refactor ddl: "create_distributed_table('publish_events', 'workspace_id', colocate_with => 'workspaces')" - notes: Monthly partition primary key currently follows id and created_at. + notes: Monthly partition primary key currently follows id and created_at; Citus validation needs a workspace-compatible partition key strategy. - table: extension_callback_tokens domain: extension distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('extension_callback_tokens', 'workspace_id', colocate_with => 'workspaces')" - notes: Token uniqueness must be reviewed before distribution. + notes: Token uniqueness must be made Citus-safe before distribution. - table: extension_execution_events domain: extension distribution_column: workspace_id - readiness: target_after_partition_constraint_review + readiness: requires_partition_workspace_refactor ddl: "create_distributed_table('extension_execution_events', 'workspace_id', colocate_with => 'workspaces')" - notes: Monthly partition primary key currently follows id and created_at. + notes: Monthly partition primary key currently follows id and created_at; Citus validation needs a workspace-compatible partition key strategy. - table: media_assets domain: media distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('media_assets', 'workspace_id', colocate_with => 'workspaces')" - notes: Object key uniqueness must be reviewed; object bytes stay in R2/S3. + notes: Object key uniqueness must be made Citus-safe; object bytes stay in R2/S3. - table: media_asset_usages domain: media distribution_column: workspace_id - readiness: target_after_unique_constraint_review + readiness: requires_unique_workspace_refactor ddl: "create_distributed_table('media_asset_usages', 'workspace_id', colocate_with => 'workspaces')" notes: Asset/resource uniqueness must include workspace routing before distribution. - table: brand_profiles domain: content_setup distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('brand_profiles', 'workspace_id', colocate_with => 'workspaces')" - notes: Workspace-owned brand profiles should colocate with projects and AI context. + notes: Workspace-owned brand profiles should colocate with projects and AI context; the id primary key needs workspace routing before distribution. - table: collab_documents domain: collaboration distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('collab_documents', 'workspace_id', colocate_with => 'workspaces')" - notes: Collaboration document metadata follows the workspace tenant. + notes: Collaboration document metadata follows the workspace tenant; the id primary key needs workspace routing before distribution. - table: collab_document_states domain: collaboration distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('collab_document_states', 'workspace_id', colocate_with => 'workspaces')" - notes: State snapshots should colocate with their document metadata. + notes: State snapshots should colocate with their document metadata; the document_id primary key needs workspace routing before distribution. - table: collab_document_update_batches domain: collaboration distribution_column: workspace_id - readiness: target_after_partition_constraint_review + readiness: requires_partition_workspace_refactor ddl: "create_distributed_table('collab_document_update_batches', 'workspace_id', colocate_with => 'workspaces')" - notes: Existing document_id hash partitioning must be rehearsed with Citus partitioned-table support. + notes: Existing document_id hash partitioning must be rehearsed with Citus partitioned-table support and workspace-compatible uniqueness. - table: ai_context_snapshots domain: ai distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('ai_context_snapshots', 'workspace_id', colocate_with => 'workspaces')" - notes: Context snapshots follow the owning project workspace. + notes: Context snapshots follow the owning project workspace; the id primary key needs workspace routing before distribution. - table: ai_growth_optimization_runs domain: ai distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('ai_growth_optimization_runs', 'workspace_id', colocate_with => 'workspaces')" - notes: Growth runs should colocate with snapshots and proposals. + notes: Growth runs should colocate with snapshots and proposals; the id primary key needs workspace routing before distribution. - table: ai_proposals domain: ai distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('ai_proposals', 'workspace_id', colocate_with => 'workspaces')" - notes: Proposal decisions should stay shard-local with the project. + notes: Proposal decisions should stay shard-local with the project; the id primary key needs workspace routing before distribution. - table: ai_drafting_sessions domain: ai distribution_column: workspace_id - readiness: target_after_constraint_review + readiness: requires_pk_workspace_refactor ddl: "create_distributed_table('ai_drafting_sessions', 'workspace_id', colocate_with => 'workspaces')" - notes: Drafting sessions are the parent route for AI chat child rows. + notes: Drafting sessions are the parent route for AI chat child rows; the id primary key needs workspace routing before distribution. reference_tables: - table: users readiness: reference_candidate_for_validation @@ -198,7 +198,7 @@ control_domain_tables: - table: remote_browser_sessions reason: Short-lived runtime state is governed by Redis and monthly archive paths before Citus. - table: outbox_events - reason: Queue dispatch remains coordinator/control-domain state until worker payloads carry workspace_id. + reason: Queue dispatch remains coordinator/control-domain state; publish worker payloads now carry workspace_id, but outbox_events has no relational routing column yet. - table: extension_execution_event_claims reason: Global idempotency claim table does not have workspace routing today. deferred_colocation_tables: diff --git a/script/db/test_citus_constraint_review.rb b/script/db/test_citus_constraint_review.rb new file mode 100644 index 00000000..367e8e99 --- /dev/null +++ b/script/db/test_citus_constraint_review.rb @@ -0,0 +1,101 @@ +# frozen_string_literal: true + +require "minitest/autorun" +require "yaml" + +class CitusConstraintReviewTest < Minitest::Test + REVIEW_PATH = File.expand_path("citus_constraint_review.yml", __dir__) + GROUPS_PATH = File.expand_path("citus_table_groups.yml", __dir__) + + REQUIRED_UNIQUE_REFACTOR_TABLES = %w[ + projects + project_platform_publications + project_list_summaries + project_share_links + workspace_invites + media_assets + media_asset_usages + extension_callback_tokens + extension_execution_event_claims + ].freeze + + REQUIRED_PARTITION_TABLES = %w[ + publish_events + extension_execution_events + project_activities + workspace_activities + collab_document_update_batches + ].freeze + + REQUIRED_WORKER_TASKS = %w[ + publish:project + ].freeze + + def setup + @review = YAML.safe_load_file(REVIEW_PATH, aliases: false) + @groups = YAML.safe_load_file(GROUPS_PATH, aliases: false) + end + + def test_review_declares_distribution_scope + assert_equal 1, @review.fetch("version") + assert_equal "workspace_id", @review.fetch("distribution_key") + refute_empty @review.fetch("review_scope").fetch("models") + assert_equal "script/db/citus_table_groups.yml", @review.fetch("review_scope").fetch("table_group_manifest") + end + + def test_unique_constraint_review_names_required_refactors + reviewed = table_names(@review.fetch("unique_constraints").fetch("requires_refactor")) + + missing = REQUIRED_UNIQUE_REFACTOR_TABLES - reviewed + + assert_empty missing, "missing unique constraint review entries: #{missing.join(", ")}" + end + + def test_partition_review_names_workspace_strategy_gaps + reviewed = table_names(@review.fetch("partitioned_tables").fetch("require_workspace_partition_strategy")) + + missing = REQUIRED_PARTITION_TABLES - reviewed + + assert_empty missing, "missing partition review entries: #{missing.join(", ")}" + end + + def test_foreign_key_and_join_review_has_actionable_decisions + foreign_keys = @review.fetch("foreign_keys") + cross_tenant_joins = @review.fetch("cross_tenant_joins") + + refute_empty foreign_keys.fetch("colocated_ready") + refute_empty foreign_keys.fetch("reference_table_dependencies") + refute_empty foreign_keys.fetch("control_domain_boundaries") + refute_empty foreign_keys.fetch("deferred_until_workspace_key") + refute_empty cross_tenant_joins.fetch("shard_local") + refute_empty cross_tenant_joins.fetch("reference_or_control") + refute_empty cross_tenant_joins.fetch("aggregate_offline") + refute_empty cross_tenant_joins.fetch("prohibited_core_paths") + end + + def test_worker_payload_review_names_tenant_routed_tasks + reviewed_tasks = @review.fetch("worker_payloads").fetch("tenant_routed").map { |entry| entry.fetch("task_type") } + + missing = REQUIRED_WORKER_TASKS - reviewed_tasks + + assert_empty missing, "missing worker payload routing entries: #{missing.join(", ")}" + end + + def test_table_group_manifest_uses_reviewed_readiness_terms + colocated_tables.each do |entry| + readiness = entry.fetch("readiness") + + refute_match(/after_.*review/, readiness, entry.fetch("table")) + end + end + + private + + def table_names(entries) + entries.map { |entry| entry.fetch("table") } + end + + def colocated_tables + @groups.fetch("physical_colocation_groups").fetch("tenant_workspace").fetch("tables") + end +end