From 4423aedb0788c14021730a43bc9c5a485ee07a61 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:52:22 +0800 Subject: [PATCH 01/14] feat(models): derive workspace ids for project rows Citus preparation needs project-domain records to carry a stable tenant key on new writes. Add workspace_id fields, derivation helpers, and create hooks for project publications, events, activity, comments, versions, share links, and extension records. New model writes can now resolve workspace ownership without startup-time compatibility backfills. --- backend/internal/models/models.go | 110 +++++++++++------- backend/internal/models/workspace_identity.go | 60 ++++++++++ 2 files changed, 126 insertions(+), 44 deletions(-) create mode 100644 backend/internal/models/workspace_identity.go diff --git a/backend/internal/models/models.go b/backend/internal/models/models.go index 21733650..c852da26 100644 --- a/backend/internal/models/models.go +++ b/backend/internal/models/models.go @@ -284,33 +284,36 @@ type ProjectCollaborator struct { type ProjectActivity struct { ID uuid.UUID `gorm:"type:uuid;primaryKey;index:idx_project_activities_archive_created_id,priority:2"` + WorkspaceID uuid.UUID `gorm:"type:uuid;not null;index:idx_project_activities_workspace_created_at,priority:1"` ProjectID uuid.UUID `gorm:"type:uuid;not null;index:idx_project_activities_project_created_at,priority:1"` ActorUserID uuid.UUID `gorm:"type:uuid;not null;index"` TargetUserID *uuid.UUID `gorm:"type:uuid;index"` EventType string `gorm:"not null;index"` Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_project_activities_project_created_at,priority:2;index:idx_project_activities_archive_created_id,priority:1"` + CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_project_activities_project_created_at,priority:2;index:idx_project_activities_workspace_created_at,priority:2;index:idx_project_activities_archive_created_id,priority:1"` Project Project `gorm:"foreignKey:ProjectID;constraint:OnDelete:CASCADE"` Actor User `gorm:"foreignKey:ActorUserID;constraint:OnDelete:CASCADE"` TargetUser *User `gorm:"foreignKey:TargetUserID;constraint:OnDelete:SET NULL"` } type ProjectComment struct { - ID uuid.UUID `gorm:"type:uuid;primaryKey"` - ProjectID uuid.UUID `gorm:"type:uuid;not null;index:idx_project_comments_project_created_at,priority:1"` - AuthorID uuid.UUID `gorm:"type:uuid;not null;index"` - Body string `gorm:"type:text;not null"` - AnchorText string `gorm:"type:text;not null;default:''"` - Status string `gorm:"not null;default:'open';index"` - Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"not null;index:idx_project_comments_project_created_at,priority:2"` - ResolvedAt *time.Time - Project Project `gorm:"foreignKey:ProjectID;constraint:OnDelete:CASCADE"` - Author User `gorm:"foreignKey:AuthorID;constraint:OnDelete:CASCADE"` + ID uuid.UUID `gorm:"type:uuid;primaryKey"` + WorkspaceID uuid.UUID `gorm:"type:uuid;not null;index"` + ProjectID uuid.UUID `gorm:"type:uuid;not null;index:idx_project_comments_project_created_at,priority:1"` + AuthorID uuid.UUID `gorm:"type:uuid;not null;index"` + Body string `gorm:"type:text;not null"` + AnchorText string `gorm:"type:text;not null;default:''"` + Status string `gorm:"not null;default:'open';index"` + Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` + CreatedAt time.Time `gorm:"not null;index:idx_project_comments_project_created_at,priority:2"` + ResolvedAt *time.Time + Project Project `gorm:"foreignKey:ProjectID;constraint:OnDelete:CASCADE"` + Author User `gorm:"foreignKey:AuthorID;constraint:OnDelete:CASCADE"` } type ProjectVersion struct { ID uuid.UUID `gorm:"type:uuid;primaryKey"` + WorkspaceID uuid.UUID `gorm:"type:uuid;not null;index"` ProjectID uuid.UUID `gorm:"type:uuid;not null;index:idx_project_versions_project_created_at,priority:1"` CreatedBy uuid.UUID `gorm:"type:uuid;not null;index"` VersionNumber int `gorm:"not null"` @@ -325,17 +328,18 @@ type ProjectVersion struct { } type ProjectShareLink struct { - ID uuid.UUID `gorm:"type:uuid;primaryKey"` - ProjectID uuid.UUID `gorm:"type:uuid;not null;index"` - CreatedBy uuid.UUID `gorm:"type:uuid;not null;index"` - TokenHash string `gorm:"not null;uniqueIndex"` - Role string `gorm:"not null"` - Status string `gorm:"not null;default:'active';index"` - ExpiresAt *time.Time - CreatedAt time.Time `gorm:"not null"` - RevokedAt *time.Time - Project Project `gorm:"foreignKey:ProjectID;constraint:OnDelete:CASCADE"` - Creator User `gorm:"foreignKey:CreatedBy;constraint:OnDelete:CASCADE"` + ID uuid.UUID `gorm:"type:uuid;primaryKey"` + WorkspaceID uuid.UUID `gorm:"type:uuid;not null;index"` + ProjectID uuid.UUID `gorm:"type:uuid;not null;index"` + CreatedBy uuid.UUID `gorm:"type:uuid;not null;index"` + TokenHash string `gorm:"not null;uniqueIndex"` + Role string `gorm:"not null"` + Status string `gorm:"not null;default:'active';index"` + ExpiresAt *time.Time + CreatedAt time.Time `gorm:"not null"` + RevokedAt *time.Time + Project Project `gorm:"foreignKey:ProjectID;constraint:OnDelete:CASCADE"` + Creator User `gorm:"foreignKey:CreatedBy;constraint:OnDelete:CASCADE"` } const ( @@ -361,16 +365,6 @@ const ( WorkspaceActivityInviteRevoked = "invite_revoked" ) -var personalWorkspaceNamespace = uuid.MustParse("03d32585-3f8c-48a8-bf40-53aa3f1698c1") - -func PersonalWorkspaceID(userID uuid.UUID) uuid.UUID { - return uuid.NewSHA1(personalWorkspaceNamespace, []byte(userID.String())) -} - -func PersonalWorkspaceSlug(userID uuid.UUID) string { - return "personal-" + userID.String() -} - type Workspace struct { ID uuid.UUID `gorm:"type:uuid;primaryKey"` OwnerUserID uuid.UUID `gorm:"type:uuid;not null;index"` @@ -470,11 +464,12 @@ type Notification struct { type ProjectPlatformPublication struct { ID uuid.UUID `gorm:"type:uuid;primaryKey"` + WorkspaceID uuid.UUID `gorm:"type:uuid;not null;index:idx_publications_workspace_status"` ProjectID uuid.UUID `gorm:"type:uuid;not null;uniqueIndex:idx_publications_project_platform"` Platform string `gorm:"not null;uniqueIndex:idx_publications_project_platform;index:idx_publications_platform_status"` PlatformAccountID *uuid.UUID `gorm:"type:uuid;index"` Enabled bool `gorm:"not null;default:true"` - Status string `gorm:"not null;index:idx_publications_platform_status"` + Status string `gorm:"not null;index:idx_publications_platform_status;index:idx_publications_workspace_status"` DraftStatus string `gorm:"not null;default:'unsynced';index"` ReviewStatus string `gorm:"not null;default:'draft';index"` SyncRequired bool `gorm:"not null;default:false;index"` @@ -511,6 +506,7 @@ type ProjectListSummary struct { type PublishEvent struct { ID uuid.UUID `gorm:"type:uuid;primaryKey;index:idx_publish_events_archive_created_id,priority:2"` PublicationID uuid.UUID `gorm:"type:uuid;not null;index"` + WorkspaceID uuid.UUID `gorm:"type:uuid;not null;index:idx_publish_events_workspace_created_at,priority:1"` ProjectID uuid.UUID `gorm:"type:uuid;not null;index"` UserID uuid.UUID `gorm:"type:uuid;not null;index:idx_publish_events_user_idempotency"` Platform string `gorm:"not null;index"` @@ -523,7 +519,7 @@ type PublishEvent struct { PublishURL string ErrorMessage string Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_publish_events_archive_created_id,priority:1"` + CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_publish_events_workspace_created_at,priority:2;index:idx_publish_events_archive_created_id,priority:1"` } type ScheduledPublication struct { @@ -653,6 +649,7 @@ type PlatformAccountGrant struct { type ExtensionCallbackToken struct { ID uuid.UUID `gorm:"type:uuid;primaryKey"` + WorkspaceID uuid.UUID `gorm:"type:uuid;not null;index"` ExecutionID string `gorm:"not null;index"` ProjectID uuid.UUID `gorm:"type:uuid;not null;index"` UserID uuid.UUID `gorm:"type:uuid;not null;index"` @@ -666,6 +663,7 @@ type ExtensionCallbackToken struct { type ExtensionExecutionEvent struct { ID uuid.UUID `gorm:"type:uuid;primaryKey;index:idx_extension_execution_events_archive_created_id,priority:2"` CallbackTokenID uuid.UUID `gorm:"type:uuid;not null;index"` + WorkspaceID uuid.UUID `gorm:"type:uuid;not null;index:idx_extension_execution_events_workspace_created_at,priority:1"` ExecutionID string `gorm:"not null;index"` ProjectID uuid.UUID `gorm:"type:uuid;not null;index"` UserID uuid.UUID `gorm:"type:uuid;not null;index"` @@ -677,7 +675,7 @@ type ExtensionExecutionEvent struct { PublishURL string ErrorMessage string Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_extension_execution_events_archive_created_id,priority:1"` + CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_extension_execution_events_workspace_created_at,priority:2;index:idx_extension_execution_events_archive_created_id,priority:1"` } type ExtensionExecutionEventClaim struct { @@ -756,10 +754,13 @@ func (u *MediaAssetUsage) BeforeCreate(_ *gorm.DB) (err error) { return } -func (p *ProjectPlatformPublication) BeforeCreate(_ *gorm.DB) (err error) { +func (p *ProjectPlatformPublication) BeforeCreate(tx *gorm.DB) (err error) { if p.ID == uuid.Nil { p.ID = uuid.New() } + if p.WorkspaceID == uuid.Nil { + p.WorkspaceID = deriveWorkspaceIDFromProject(tx, p.ProjectID, uuid.Nil) + } if p.DraftStatus == "" { p.DraftStatus = PublicationDraftStatusUnsynced } @@ -769,10 +770,13 @@ func (p *ProjectPlatformPublication) BeforeCreate(_ *gorm.DB) (err error) { return } -func (e *PublishEvent) BeforeCreate(_ *gorm.DB) (err error) { +func (e *PublishEvent) BeforeCreate(tx *gorm.DB) (err error) { if e.ID == uuid.Nil { e.ID = uuid.New() } + if e.WorkspaceID == uuid.Nil { + e.WorkspaceID = deriveWorkspaceIDFromProject(tx, e.ProjectID, e.UserID) + } if e.CreatedAt.IsZero() { e.CreatedAt = time.Now().UTC() } @@ -855,37 +859,49 @@ func (i *WorkspaceInvite) BeforeCreate(_ *gorm.DB) (err error) { return } -func (a *ProjectActivity) BeforeCreate(_ *gorm.DB) (err error) { +func (a *ProjectActivity) BeforeCreate(tx *gorm.DB) (err error) { if a.ID == uuid.Nil { a.ID = uuid.New() } + if a.WorkspaceID == uuid.Nil { + a.WorkspaceID = deriveWorkspaceIDFromProject(tx, a.ProjectID, a.ActorUserID) + } if a.CreatedAt.IsZero() { a.CreatedAt = time.Now().UTC() } return } -func (c *ProjectComment) BeforeCreate(_ *gorm.DB) (err error) { +func (c *ProjectComment) BeforeCreate(tx *gorm.DB) (err error) { if c.ID == uuid.Nil { c.ID = uuid.New() } + if c.WorkspaceID == uuid.Nil { + c.WorkspaceID = deriveWorkspaceIDFromProject(tx, c.ProjectID, c.AuthorID) + } if c.Status == "" { c.Status = ProjectCommentStatusOpen } return } -func (v *ProjectVersion) BeforeCreate(_ *gorm.DB) (err error) { +func (v *ProjectVersion) BeforeCreate(tx *gorm.DB) (err error) { if v.ID == uuid.Nil { v.ID = uuid.New() } + if v.WorkspaceID == uuid.Nil { + v.WorkspaceID = deriveWorkspaceIDFromProject(tx, v.ProjectID, v.CreatedBy) + } return } -func (l *ProjectShareLink) BeforeCreate(_ *gorm.DB) (err error) { +func (l *ProjectShareLink) BeforeCreate(tx *gorm.DB) (err error) { if l.ID == uuid.Nil { l.ID = uuid.New() } + if l.WorkspaceID == uuid.Nil { + l.WorkspaceID = deriveWorkspaceIDFromProject(tx, l.ProjectID, l.CreatedBy) + } if l.Status == "" { l.Status = ProjectShareLinkStatusActive } @@ -940,17 +956,23 @@ func (g *PlatformAccountGrant) BeforeCreate(_ *gorm.DB) (err error) { return } -func (t *ExtensionCallbackToken) BeforeCreate(_ *gorm.DB) (err error) { +func (t *ExtensionCallbackToken) BeforeCreate(tx *gorm.DB) (err error) { if t.ID == uuid.Nil { t.ID = uuid.New() } + if t.WorkspaceID == uuid.Nil { + t.WorkspaceID = deriveWorkspaceIDFromProject(tx, t.ProjectID, t.UserID) + } return } -func (e *ExtensionExecutionEvent) BeforeCreate(_ *gorm.DB) (err error) { +func (e *ExtensionExecutionEvent) BeforeCreate(tx *gorm.DB) (err error) { if e.ID == uuid.Nil { e.ID = uuid.New() } + if e.WorkspaceID == uuid.Nil { + e.WorkspaceID = deriveWorkspaceIDFromProject(tx, e.ProjectID, e.UserID) + } if e.CreatedAt.IsZero() { e.CreatedAt = time.Now().UTC() } diff --git a/backend/internal/models/workspace_identity.go b/backend/internal/models/workspace_identity.go new file mode 100644 index 00000000..4cf0fbeb --- /dev/null +++ b/backend/internal/models/workspace_identity.go @@ -0,0 +1,60 @@ +package models + +import ( + "github.com/google/uuid" + "gorm.io/gorm" +) + +var personalWorkspaceNamespace = uuid.MustParse("03d32585-3f8c-48a8-bf40-53aa3f1698c1") + +func PersonalWorkspaceID(userID uuid.UUID) uuid.UUID { + return uuid.NewSHA1(personalWorkspaceNamespace, []byte(userID.String())) +} + +func PersonalWorkspaceSlug(userID uuid.UUID) string { + return "personal-" + userID.String() +} + +func projectWorkspaceIDValue(project Project) uuid.UUID { + if project.WorkspaceID != nil && *project.WorkspaceID != uuid.Nil { + return *project.WorkspaceID + } + if project.UserID != uuid.Nil { + return PersonalWorkspaceID(project.UserID) + } + return uuid.Nil +} + +func ProjectWorkspaceID(project Project) uuid.UUID { + return projectWorkspaceIDValue(project) +} + +func deriveWorkspaceIDFromProject(db *gorm.DB, projectID uuid.UUID, fallbackUserID uuid.UUID) uuid.UUID { + if projectID == uuid.Nil { + if fallbackUserID != uuid.Nil { + return PersonalWorkspaceID(fallbackUserID) + } + return uuid.Nil + } + + var project Project + if err := db.Select("id", "user_id", "workspace_id").First(&project, "id = ?", projectID).Error; err == nil { + return projectWorkspaceIDValue(project) + } + if fallbackUserID != uuid.Nil { + return PersonalWorkspaceID(fallbackUserID) + } + return uuid.Nil +} + +func deriveWorkspaceIDFromDocument(db *gorm.DB, documentID uuid.UUID) uuid.UUID { + if documentID == uuid.Nil { + return uuid.Nil + } + + var document CollabDocument + if err := db.Select("id", "workspace_id").First(&document, "id = ?", documentID).Error; err == nil { + return document.WorkspaceID + } + return uuid.Nil +} From e25129536c6cf3ba7c5dec6f9602c3c9472b533d Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:52:29 +0800 Subject: [PATCH 02/14] feat(models): add collab workspace ownership Collaboration documents and update batches need an explicit tenant key before distributed storage planning. Add workspace_id columns and create hooks that derive ownership from the document or owner for new records. Collab model writes now expose stable workspace routing data without legacy backfill code. --- backend/internal/models/collab.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/backend/internal/models/collab.go b/backend/internal/models/collab.go index 4df3b463..21e65497 100644 --- a/backend/internal/models/collab.go +++ b/backend/internal/models/collab.go @@ -18,6 +18,7 @@ const ( type CollabDocument struct { ID uuid.UUID `gorm:"type:uuid;primaryKey"` + WorkspaceID uuid.UUID `gorm:"type:uuid;not null;index:idx_collab_documents_workspace_updated,priority:1"` OwnerUserID uuid.UUID `gorm:"type:uuid;not null;index:idx_collab_documents_owner_updated,priority:1"` Title string `gorm:"not null"` Status string `gorm:"not null;default:'active'"` @@ -26,7 +27,7 @@ type CollabDocument struct { LastEditedBy *uuid.UUID `gorm:"type:uuid"` LastEditedAt *time.Time CreatedAt time.Time `gorm:"not null"` - UpdatedAt time.Time `gorm:"not null;index:idx_collab_documents_owner_updated,priority:2,sort:desc"` + UpdatedAt time.Time `gorm:"not null;index:idx_collab_documents_owner_updated,priority:2,sort:desc;index:idx_collab_documents_workspace_updated,priority:2,sort:desc"` DeletedAt gorm.DeletedAt `gorm:"index"` Owner User `gorm:"foreignKey:OwnerUserID;references:ID"` @@ -48,6 +49,7 @@ type CollabDocumentCollaborator struct { type CollabDocumentState struct { DocumentID uuid.UUID `gorm:"type:uuid;primaryKey;not null"` + WorkspaceID uuid.UUID `gorm:"type:uuid;not null;index"` YDocState []byte `gorm:"type:bytea;not null"` StateVector []byte `gorm:"type:bytea"` CompactedUntilSeq int64 `gorm:"not null;default:0"` @@ -60,6 +62,7 @@ type CollabDocumentState struct { type CollabDocumentUpdateBatch struct { ID int64 `gorm:"primaryKey;autoIncrement:false"` DocumentID uuid.UUID `gorm:"type:uuid;primaryKey;not null;uniqueIndex:ux_collab_update_batch_doc_seq,priority:1;index:idx_collab_update_batches_doc_seq,priority:1"` + WorkspaceID uuid.UUID `gorm:"type:uuid;not null;index"` FromSeq int64 `gorm:"not null;uniqueIndex:ux_collab_update_batch_doc_seq,priority:2"` ToSeq int64 `gorm:"not null;uniqueIndex:ux_collab_update_batch_doc_seq,priority:3;index:idx_collab_update_batches_doc_seq,priority:2,sort:desc"` UpdatePayload []byte `gorm:"type:bytea;not null"` @@ -76,5 +79,22 @@ func (d *CollabDocument) BeforeCreate(_ *gorm.DB) (err error) { if d.ID == uuid.Nil { d.ID = uuid.New() } + if d.WorkspaceID == uuid.Nil && d.OwnerUserID != uuid.Nil { + d.WorkspaceID = PersonalWorkspaceID(d.OwnerUserID) + } + return +} + +func (s *CollabDocumentState) BeforeCreate(tx *gorm.DB) (err error) { + if s.WorkspaceID == uuid.Nil { + s.WorkspaceID = deriveWorkspaceIDFromDocument(tx, s.DocumentID) + } + return +} + +func (b *CollabDocumentUpdateBatch) BeforeCreate(tx *gorm.DB) (err error) { + if b.WorkspaceID == uuid.Nil { + b.WorkspaceID = deriveWorkspaceIDFromDocument(tx, b.DocumentID) + } return } From 85a8f0002f532917651d8bb566b1ac4e91d26765 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:52:36 +0800 Subject: [PATCH 03/14] feat(database): include workspace id in partition targets Partition target tables should match the new tenant-aware event and collaboration models. Add workspace_id to the fresh monthly event and hash-partitioned collab batch DDL and column lists. New Postgres schemas now create partitioned tables with tenant routing columns from the start. --- backend/internal/db/hash_partitions.go | 2 ++ backend/internal/db/monthly_partitions.go | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/backend/internal/db/hash_partitions.go b/backend/internal/db/hash_partitions.go index 3c889f28..5919b523 100644 --- a/backend/internal/db/hash_partitions.go +++ b/backend/internal/db/hash_partitions.go @@ -26,6 +26,7 @@ var collabUpdateBatchHashPartitionedTable = hashPartitionedTable{ CREATE TABLE IF NOT EXISTS collab_document_update_batches ( id bigserial NOT NULL, document_id uuid NOT NULL, + workspace_id uuid NOT NULL, from_seq bigint NOT NULL, to_seq bigint NOT NULL, update_payload bytea NOT NULL, @@ -39,6 +40,7 @@ var collabUpdateBatchHashPartitionedTable = hashPartitionedTable{ columns: []string{ "id", "document_id", + "workspace_id", "from_seq", "to_seq", "update_payload", diff --git a/backend/internal/db/monthly_partitions.go b/backend/internal/db/monthly_partitions.go index f8506d03..678d6ec6 100644 --- a/backend/internal/db/monthly_partitions.go +++ b/backend/internal/db/monthly_partitions.go @@ -27,6 +27,7 @@ var monthlyEventPartitionedTables = []monthlyPartitionedTable{ CREATE TABLE IF NOT EXISTS publish_events ( id uuid NOT NULL, publication_id uuid NOT NULL, + workspace_id uuid NOT NULL, project_id uuid NOT NULL, user_id uuid NOT NULL, platform text NOT NULL, @@ -46,6 +47,7 @@ var monthlyEventPartitionedTables = []monthlyPartitionedTable{ columns: []string{ "id", "publication_id", + "workspace_id", "project_id", "user_id", "platform", @@ -67,6 +69,7 @@ var monthlyEventPartitionedTables = []monthlyPartitionedTable{ CREATE TABLE IF NOT EXISTS extension_execution_events ( id uuid NOT NULL, callback_token_id uuid NOT NULL, + workspace_id uuid NOT NULL, execution_id text NOT NULL, project_id uuid NOT NULL, user_id uuid NOT NULL, @@ -85,6 +88,7 @@ var monthlyEventPartitionedTables = []monthlyPartitionedTable{ columns: []string{ "id", "callback_token_id", + "workspace_id", "execution_id", "project_id", "user_id", @@ -104,6 +108,7 @@ var monthlyEventPartitionedTables = []monthlyPartitionedTable{ createSQL: ` CREATE TABLE IF NOT EXISTS project_activities ( id uuid NOT NULL, + workspace_id uuid NOT NULL, project_id uuid NOT NULL, actor_user_id uuid NOT NULL, target_user_id uuid, @@ -115,6 +120,7 @@ var monthlyEventPartitionedTables = []monthlyPartitionedTable{ `, columns: []string{ "id", + "workspace_id", "project_id", "actor_user_id", "target_user_id", From b2ddd007326d80d3405a5620c101e21c40c3ef95 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:52:45 +0800 Subject: [PATCH 04/14] feat(project): stamp workspace on project writes Project service writes need to preserve the tenant key when creating collaboration documents and platform selections. Pass the resolved project workspace through access, collaboration, lifecycle, and publication-selection paths. Project-owned rows now receive workspace ownership at write time instead of relying on schema repair. --- backend/internal/services/project/access.go | 2 +- .../internal/services/project/collaboration.go | 1 + backend/internal/services/project/lifecycle.go | 6 +++--- .../project/publicationselection/selection.go | 17 +++++++++-------- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/backend/internal/services/project/access.go b/backend/internal/services/project/access.go index 637ba4ee..429f6e45 100644 --- a/backend/internal/services/project/access.go +++ b/backend/internal/services/project/access.go @@ -57,7 +57,7 @@ func (s *Service) requireProjectOwner(projectID uuid.UUID, actorUserID uuid.UUID } var project models.Project - if err := s.db.Select("id", "user_id").First(&project, "id = ?", projectID).Error; err != nil { + if err := s.db.Select("id", "user_id", "workspace_id").First(&project, "id = ?", projectID).Error; err != nil { return nil, err } if project.UserID != actorUserID { diff --git a/backend/internal/services/project/collaboration.go b/backend/internal/services/project/collaboration.go index 2581adf5..3b7f0afb 100644 --- a/backend/internal/services/project/collaboration.go +++ b/backend/internal/services/project/collaboration.go @@ -158,6 +158,7 @@ func (s *Service) ensureProjectCollabDocument(projectID uuid.UUID, userID uuid.U } document := models.CollabDocument{ + WorkspaceID: projectWorkspaceID(project), OwnerUserID: project.UserID, Title: project.Title, Status: models.CollabDocumentStatusActive, diff --git a/backend/internal/services/project/lifecycle.go b/backend/internal/services/project/lifecycle.go index 064d7a21..3e9a6b35 100644 --- a/backend/internal/services/project/lifecycle.go +++ b/backend/internal/services/project/lifecycle.go @@ -82,7 +82,7 @@ func (s *Service) CreateProjectWithWorkspace(userID uuid.UUID, workspaceID *uuid return err } - created, err := publicationselection.CreateSelected(tx, project.ID, platforms, pendingPublicationConfigForTemplate(title, req.Summary, req.CoverImageURL, template)) + created, err := publicationselection.CreateSelected(tx, project, platforms, pendingPublicationConfigForTemplate(title, req.Summary, req.CoverImageURL, template)) if err != nil { return err } @@ -234,7 +234,7 @@ func (s *Service) UpdateProject(projectID uuid.UUID, userID uuid.UUID, req dto.U return err } - publications, err := publicationselection.ReconcileSelected(tx, project.ID, platforms, publicationselection.ReconcileResetAll, pendingPublicationConfigForTemplate(title, req.Summary, req.CoverImageURL, template)) + publications, err := publicationselection.ReconcileSelected(tx, project, platforms, publicationselection.ReconcileResetAll, pendingPublicationConfigForTemplate(title, req.Summary, req.CoverImageURL, template)) if err != nil { return err } @@ -338,7 +338,7 @@ func (s *Service) SaveProjectPlatforms(projectID uuid.UUID, userID uuid.UUID, re return ErrForbidden } - publications, err := publicationselection.ReconcileSelected(tx, project.ID, platforms, publicationselection.ReconcileKeepActive, defaultPublicationConfigForProjectTitle(project.Title)) + publications, err := publicationselection.ReconcileSelected(tx, project, platforms, publicationselection.ReconcileKeepActive, defaultPublicationConfigForProjectTitle(project.Title)) if err != nil { return err } diff --git a/backend/internal/services/project/publicationselection/selection.go b/backend/internal/services/project/publicationselection/selection.go index 6e8a0d09..8621e00d 100644 --- a/backend/internal/services/project/publicationselection/selection.go +++ b/backend/internal/services/project/publicationselection/selection.go @@ -17,14 +17,14 @@ const ( ReconcileResetAll ) -func CreateSelected(tx *gorm.DB, projectID uuid.UUID, platforms []string, configForPlatform ConfigForPlatform) ([]models.ProjectPlatformPublication, error) { +func CreateSelected(tx *gorm.DB, project models.Project, platforms []string, configForPlatform ConfigForPlatform) ([]models.ProjectPlatformPublication, error) { publications := make([]models.ProjectPlatformPublication, 0, len(platforms)) for _, platform := range platforms { config, err := configForPlatform(platform) if err != nil { return nil, err } - publication := createPendingPublication(projectID, platform, config) + publication := createPendingPublication(project, platform, config) if err := tx.Create(&publication).Error; err != nil { return nil, err } @@ -33,9 +33,9 @@ func CreateSelected(tx *gorm.DB, projectID uuid.UUID, platforms []string, config return publications, nil } -func ReconcileSelected(tx *gorm.DB, projectID uuid.UUID, platforms []string, mode ReconcileMode, configForPlatform ConfigForPlatform) ([]models.ProjectPlatformPublication, error) { +func ReconcileSelected(tx *gorm.DB, project models.Project, platforms []string, mode ReconcileMode, configForPlatform ConfigForPlatform) ([]models.ProjectPlatformPublication, error) { var existing []models.ProjectPlatformPublication - if err := tx.Where("project_id = ?", projectID).Find(&existing).Error; err != nil { + if err := tx.Where("project_id = ?", project.ID).Find(&existing).Error; err != nil { return nil, err } @@ -72,14 +72,14 @@ func ReconcileSelected(tx *gorm.DB, projectID uuid.UUID, platforms []string, mod if err != nil { return nil, err } - publication := createPendingPublication(projectID, platform, config) + publication := createPendingPublication(project, platform, config) if err := tx.Create(&publication).Error; err != nil { return nil, err } } var publications []models.ProjectPlatformPublication - if err := tx.Where("project_id = ?", projectID).Find(&publications).Error; err != nil { + if err := tx.Where("project_id = ?", project.ID).Find(&publications).Error; err != nil { return nil, err } return publications, nil @@ -99,9 +99,10 @@ func MarkDraftsStale(tx *gorm.DB, projectID uuid.UUID) error { }).Error } -func createPendingPublication(projectID uuid.UUID, platform string, config datatypes.JSON) models.ProjectPlatformPublication { +func createPendingPublication(project models.Project, platform string, config datatypes.JSON) models.ProjectPlatformPublication { return models.ProjectPlatformPublication{ - ProjectID: projectID, + WorkspaceID: models.ProjectWorkspaceID(project), + ProjectID: project.ID, Platform: platform, Enabled: true, Status: models.PublicationStatusDraft, From f568856baa21758787496a22cf4e522172e50f19 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:52:56 +0800 Subject: [PATCH 05/14] feat(project): scope experience rows by workspace Project experience records need consistent tenant ownership for comments, versions, and activity history. Resolve accessible project workspace data once and stamp new activity, comment, and version rows with it. Experience APIs now write and query project-domain data with explicit workspace scope. --- .../services/project/experience/activity.go | 13 +++++++--- .../services/project/experience/comments.go | 24 +++++++++++-------- .../services/project/experience/service.go | 8 +++---- .../services/project/experience/versions.go | 6 +++-- 4 files changed, 32 insertions(+), 19 deletions(-) diff --git a/backend/internal/services/project/experience/activity.go b/backend/internal/services/project/experience/activity.go index ad74de91..af8084ac 100644 --- a/backend/internal/services/project/experience/activity.go +++ b/backend/internal/services/project/experience/activity.go @@ -12,7 +12,8 @@ import ( ) func (s *Service) ListProjectActivities(projectID uuid.UUID, userID uuid.UUID, limit int) (*dto.ProjectActivitiesResponse, error) { - if err := s.requireProjectAccess(projectID, userID); err != nil { + project, err := s.accessibleProject(projectID, userID) + if err != nil { return nil, err } if limit <= 0 || limit > 100 { @@ -23,7 +24,7 @@ func (s *Service) ListProjectActivities(projectID uuid.UUID, userID uuid.UUID, l if err := s.db. Preload("Actor", selectUserIdentity). Preload("TargetUser", selectUserIdentity). - Where("project_id = ?", projectID). + Where("workspace_id = ? AND project_id = ?", models.ProjectWorkspaceID(project), projectID). Order("created_at desc"). Order("id desc"). Limit(limit). @@ -46,11 +47,16 @@ func RecordProjectActivity(tx *gorm.DB, projectID uuid.UUID, actorUserID uuid.UU if err != nil { return err } + var project models.Project + if err := tx.Select("id", "user_id", "workspace_id").First(&project, "id = ?", projectID).Error; err != nil { + return err + } + workspaceID := models.ProjectWorkspaceID(project) createdAt := time.Now().UTC() var latestCreatedAt time.Time if err := tx. Model(&models.ProjectActivity{}). - Where("project_id = ?", projectID). + Where("workspace_id = ? AND project_id = ?", workspaceID, projectID). Select("created_at"). Order("created_at desc"). Limit(1). @@ -61,6 +67,7 @@ func RecordProjectActivity(tx *gorm.DB, projectID uuid.UUID, actorUserID uuid.UU createdAt = latestCreatedAt.Add(time.Nanosecond) } return tx.Create(&models.ProjectActivity{ + WorkspaceID: workspaceID, ProjectID: projectID, ActorUserID: actorUserID, TargetUserID: targetUserID, diff --git a/backend/internal/services/project/experience/comments.go b/backend/internal/services/project/experience/comments.go index f12cf840..95b84f50 100644 --- a/backend/internal/services/project/experience/comments.go +++ b/backend/internal/services/project/experience/comments.go @@ -13,14 +13,15 @@ import ( ) func (s *Service) ListProjectComments(projectID uuid.UUID, userID uuid.UUID) (*dto.ProjectCommentsResponse, error) { - if err := s.requireProjectAccess(projectID, userID); err != nil { + project, err := s.accessibleProject(projectID, userID) + if err != nil { return nil, err } var comments []models.ProjectComment if err := s.db. Preload("Author", selectUserIdentity). - Where("project_id = ?", projectID). + Where("workspace_id = ? AND project_id = ?", models.ProjectWorkspaceID(project), projectID). Order("created_at desc"). Find(&comments).Error; err != nil { return nil, err @@ -34,7 +35,8 @@ func (s *Service) ListProjectComments(projectID uuid.UUID, userID uuid.UUID) (*d } func (s *Service) CreateProjectComment(projectID uuid.UUID, userID uuid.UUID, req dto.CreateProjectCommentRequest) (*dto.ProjectComment, error) { - if err := s.requireProjectAccess(projectID, userID); err != nil { + project, err := s.accessibleProject(projectID, userID) + if err != nil { return nil, err } body := strings.TrimSpace(req.Body) @@ -47,12 +49,13 @@ func (s *Service) CreateProjectComment(projectID uuid.UUID, userID uuid.UUID, re return nil, err } comment := models.ProjectComment{ - ProjectID: projectID, - AuthorID: userID, - Body: body, - AnchorText: strings.TrimSpace(req.AnchorText), - Status: models.ProjectCommentStatusOpen, - Metadata: metadata, + WorkspaceID: models.ProjectWorkspaceID(project), + ProjectID: projectID, + AuthorID: userID, + Body: body, + AnchorText: strings.TrimSpace(req.AnchorText), + Status: models.ProjectCommentStatusOpen, + Metadata: metadata, } if err := s.db.Transaction(func(tx *gorm.DB) error { if err := tx.Create(&comment).Error; err != nil { @@ -86,11 +89,12 @@ func (s *Service) UpdateProjectComment(projectID uuid.UUID, userID uuid.UUID, co if strings.TrimSpace(req.Status) != models.ProjectCommentStatusResolved { return nil, ErrInvalidProjectComment } + workspaceID := models.ProjectWorkspaceID(project) now := time.Now().UTC() if err := s.db.Transaction(func(tx *gorm.DB) error { result := tx.Model(&models.ProjectComment{}). - Where("id = ? AND project_id = ?", commentID, projectID). + Where("id = ? AND workspace_id = ? AND project_id = ?", commentID, workspaceID, projectID). Updates(map[string]any{ "status": models.ProjectCommentStatusResolved, "resolved_at": &now, diff --git a/backend/internal/services/project/experience/service.go b/backend/internal/services/project/experience/service.go index 65c068b7..3f5e7430 100644 --- a/backend/internal/services/project/experience/service.go +++ b/backend/internal/services/project/experience/service.go @@ -46,16 +46,16 @@ func NewService( } } -func (s *Service) requireProjectAccess(projectID uuid.UUID, userID uuid.UUID) error { +func (s *Service) accessibleProject(projectID uuid.UUID, userID uuid.UUID) (models.Project, error) { if projectID == uuid.Nil || userID == uuid.Nil { - return projecterr.ErrInvalidProject + return models.Project{}, projecterr.ErrInvalidProject } var project models.Project if err := s.db.Select("id", "user_id", "workspace_id").First(&project, "id = ?", projectID).Error; err != nil { - return err + return models.Project{}, err } _, err := accesspolicy.ProjectAccessRoleWithDB(s.db, project, userID) - return err + return project, err } func selectUserIdentity(db *gorm.DB) *gorm.DB { diff --git a/backend/internal/services/project/experience/versions.go b/backend/internal/services/project/experience/versions.go index b14d8da1..79cef6f0 100644 --- a/backend/internal/services/project/experience/versions.go +++ b/backend/internal/services/project/experience/versions.go @@ -12,14 +12,15 @@ import ( ) func (s *Service) ListProjectVersions(projectID uuid.UUID, userID uuid.UUID) (*dto.ProjectVersionsResponse, error) { - if err := s.requireProjectAccess(projectID, userID); err != nil { + project, err := s.accessibleProject(projectID, userID) + if err != nil { return nil, err } var versions []models.ProjectVersion if err := s.db. Preload("Creator", selectUserIdentity). - Where("project_id = ?", projectID). + Where("workspace_id = ? AND project_id = ?", models.ProjectWorkspaceID(project), projectID). Order("version_number desc"). Find(&versions).Error; err != nil { return nil, err @@ -120,6 +121,7 @@ func CreateProjectVersion(tx *gorm.DB, project models.Project, userID uuid.UUID, } } return tx.Create(&models.ProjectVersion{ + WorkspaceID: models.ProjectWorkspaceID(project), ProjectID: project.ID, CreatedBy: userID, VersionNumber: latestVersionNumber + 1, From cc979412f4ae28b04cc90e3b4bf774d9b4d4987e Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:53:02 +0800 Subject: [PATCH 06/14] feat(project): scope share links by workspace Project share links need tenant ownership for Citus routing and acceptance checks. Stamp share links with the project workspace and validate accepted links against the resolved project tenant. Share-link access now keeps project ownership explicit on both creation and acceptance paths. --- .../project/experience/share_links.go | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/backend/internal/services/project/experience/share_links.go b/backend/internal/services/project/experience/share_links.go index 843b78f9..52d977e7 100644 --- a/backend/internal/services/project/experience/share_links.go +++ b/backend/internal/services/project/experience/share_links.go @@ -22,11 +22,12 @@ import ( const projectShareTokenBytes = 32 func (s *Service) ListProjectShareLinks(projectID uuid.UUID, userID uuid.UUID) (*dto.ProjectShareLinksResponse, error) { - if _, err := s.requireProjectOwner(projectID, userID); err != nil { + project, err := s.requireProjectOwner(projectID, userID) + if err != nil { return nil, err } var links []models.ProjectShareLink - if err := s.db.Where("project_id = ?", projectID).Order("created_at desc").Find(&links).Error; err != nil { + if err := s.db.Where("workspace_id = ? AND project_id = ?", models.ProjectWorkspaceID(*project), projectID).Order("created_at desc").Find(&links).Error; err != nil { return nil, err } items := make([]dto.ProjectShareLink, 0, len(links)) @@ -37,7 +38,8 @@ func (s *Service) ListProjectShareLinks(projectID uuid.UUID, userID uuid.UUID) ( } func (s *Service) CreateProjectShareLink(projectID uuid.UUID, userID uuid.UUID, req dto.CreateProjectShareLinkRequest, baseURL string) (*dto.ProjectShareLinkWithToken, error) { - if _, err := s.requireProjectOwner(projectID, userID); err != nil { + project, err := s.requireProjectOwner(projectID, userID) + if err != nil { return nil, err } role, err := normalizeProjectCollaboratorRole(req.Role) @@ -49,12 +51,13 @@ func (s *Service) CreateProjectShareLink(projectID uuid.UUID, userID uuid.UUID, return nil, err } link := models.ProjectShareLink{ - ProjectID: projectID, - CreatedBy: userID, - TokenHash: hashProjectShareToken(token), - Role: role, - Status: models.ProjectShareLinkStatusActive, - ExpiresAt: req.ExpiresAt, + WorkspaceID: models.ProjectWorkspaceID(*project), + ProjectID: projectID, + CreatedBy: userID, + TokenHash: hashProjectShareToken(token), + Role: role, + Status: models.ProjectShareLinkStatusActive, + ExpiresAt: req.ExpiresAt, } if err := s.db.Transaction(func(tx *gorm.DB) error { if err := tx.Create(&link).Error; err != nil { @@ -97,6 +100,9 @@ func (s *Service) AcceptProjectShareLink(token string, userID uuid.UUID) (*dto.A if err := tx.Select("id", "user_id", "workspace_id").First(&project, "id = ?", link.ProjectID).Error; err != nil { return err } + if models.ProjectWorkspaceID(project) != link.WorkspaceID { + return gorm.ErrRecordNotFound + } projectID = project.ID if project.UserID == userID { @@ -157,16 +163,18 @@ func (s *Service) AcceptProjectShareLink(token string, userID uuid.UUID) (*dto.A } func (s *Service) RevokeProjectShareLink(projectID uuid.UUID, userID uuid.UUID, linkID uuid.UUID) error { - if _, err := s.requireProjectOwner(projectID, userID); err != nil { + project, err := s.requireProjectOwner(projectID, userID) + if err != nil { return err } if linkID == uuid.Nil { return ErrInvalidProjectShareLink } + workspaceID := models.ProjectWorkspaceID(*project) now := time.Now().UTC() return s.db.Transaction(func(tx *gorm.DB) error { result := tx.Model(&models.ProjectShareLink{}). - Where("id = ? AND project_id = ? AND status = ?", linkID, projectID, models.ProjectShareLinkStatusActive). + Where("id = ? AND workspace_id = ? AND project_id = ? AND status = ?", linkID, workspaceID, projectID, models.ProjectShareLinkStatusActive). Updates(map[string]any{ "status": models.ProjectShareLinkStatusRevoked, "revoked_at": &now, @@ -189,7 +197,7 @@ func (s *Service) requireProjectOwner(projectID uuid.UUID, actorUserID uuid.UUID } var project models.Project - if err := s.db.Select("id", "user_id").First(&project, "id = ?", projectID).Error; err != nil { + if err := s.db.Select("id", "user_id", "workspace_id").First(&project, "id = ?", projectID).Error; err != nil { return nil, err } if project.UserID != actorUserID { From 4d8a22d7a3f87c8e2bcdc43060388bbe51ca8763 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:53:09 +0800 Subject: [PATCH 07/14] feat(publish): carry workspace through publish facts Publish facts and read-model joins need the same tenant key as their owning project. Derive workspace_id when recording publish events and project activity, and include workspace scope in publication counts. Publish-derived rows now remain colocatable with project workspace data. --- backend/internal/services/publish/service.go | 11 +++++++++++ backend/internal/services/readmodel/service.go | 1 + 2 files changed, 12 insertions(+) diff --git a/backend/internal/services/publish/service.go b/backend/internal/services/publish/service.go index 156c5bc2..4058b72d 100644 --- a/backend/internal/services/publish/service.go +++ b/backend/internal/services/publish/service.go @@ -466,6 +466,12 @@ func (s *Service) recordPublishEvent(event models.PublishEvent) error { if event.ProjectID == uuid.Nil || event.UserID == uuid.Nil || event.Platform == "" || event.JobID == uuid.Nil { return nil } + if event.WorkspaceID == uuid.Nil { + var project models.Project + if err := s.writerDB(s.requestContext()).Select("id", "user_id", "workspace_id").First(&project, "id = ?", event.ProjectID).Error; err == nil { + event.WorkspaceID = models.ProjectWorkspaceID(project) + } + } if event.Metadata == nil { event.Metadata = datatypes.JSON(`{}`) } @@ -479,6 +485,10 @@ func (s *Service) recordProjectPublishActivity(projectID uuid.UUID, userID uuid. if projectID == uuid.Nil || userID == uuid.Nil || strings.TrimSpace(eventType) == "" { return nil } + var project models.Project + if err := s.writerDB(s.requestContext()).Select("id", "user_id", "workspace_id").First(&project, "id = ?", projectID).Error; err != nil { + return err + } payload := datatypes.JSON([]byte(`{}`)) if metadata != nil { encoded, err := json.Marshal(metadata) @@ -488,6 +498,7 @@ func (s *Service) recordProjectPublishActivity(projectID uuid.UUID, userID uuid. payload = datatypes.JSON(encoded) } return s.writerDB(s.requestContext()).Create(&models.ProjectActivity{ + WorkspaceID: models.ProjectWorkspaceID(project), ProjectID: projectID, ActorUserID: userID, EventType: eventType, diff --git a/backend/internal/services/readmodel/service.go b/backend/internal/services/readmodel/service.go index f10241e5..ac5be231 100644 --- a/backend/internal/services/readmodel/service.go +++ b/backend/internal/services/readmodel/service.go @@ -194,6 +194,7 @@ func (s *Service) countWorkspacePublications(workspaceID uuid.UUID, status strin return s.db.Model(&models.ProjectPlatformPublication{}). Joins("JOIN projects ON projects.id = project_platform_publications.project_id"). Where(where, args...). + Where("project_platform_publications.workspace_id = ?", workspaceID). Where("project_platform_publications.status = ?", status). Count(count).Error } From 84f3f990db50f72df2cbe31cd2680351ae043f67 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:53:17 +0800 Subject: [PATCH 08/14] feat(extension): stamp handoff rows by workspace Extension callback tokens and execution events need tenant ownership for future distributed routing. Resolve the project workspace before creating handoff tokens and execution audit events. Extension handoff records now carry workspace_id on new writes. --- backend/internal/services/extension/handoffs.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/backend/internal/services/extension/handoffs.go b/backend/internal/services/extension/handoffs.go index 5bae5162..6da4427e 100644 --- a/backend/internal/services/extension/handoffs.go +++ b/backend/internal/services/extension/handoffs.go @@ -119,7 +119,7 @@ func (s *Service) CreateExtensionHandoff(userID uuid.UUID, req dto.CreateExtensi } var project models.Project - if err := s.strongReadDB().Select("id", "user_id", "title").First(&project, "id = ?", req.ProjectID).Error; err != nil { + if err := s.strongReadDB().Select("id", "user_id", "workspace_id", "title").First(&project, "id = ?", req.ProjectID).Error; err != nil { return nil, err } if project.UserID != userID { @@ -148,6 +148,7 @@ func (s *Service) CreateExtensionHandoff(userID uuid.UUID, req dto.CreateExtensi } callbackToken := uuid.NewString() if err := tx.Create(&models.ExtensionCallbackToken{ + WorkspaceID: models.ProjectWorkspaceID(project), ExecutionID: executionID, ProjectID: project.ID, UserID: userID, @@ -225,6 +226,7 @@ func (s *Service) RecordExtensionEvent(req dto.ExtensionEventCallbackRequest) (* event := models.ExtensionExecutionEvent{ ID: uuid.New(), CallbackTokenID: token.ID, + WorkspaceID: token.WorkspaceID, ExecutionID: token.ExecutionID, ProjectID: token.ProjectID, UserID: token.UserID, From e721b96079cdbf3a88c9aae2577a9972628f112e Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:53:25 +0800 Subject: [PATCH 09/14] feat(collab): persist workspace ids in collab storage Collaboration persistence writes rows outside the normal GORM model path and must provide tenant ownership directly. Stamp standalone documents with personal workspace ids and pass project or document workspace ids through direct SQL state and batch writes. Collab storage now writes tenant routing data consistently across Go and Node paths. --- .../internal/services/collabdoc/service.go | 1 + .../src/persistence/document-persistence.ts | 47 +++++++++++++++++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/backend/internal/services/collabdoc/service.go b/backend/internal/services/collabdoc/service.go index c800fd2f..d62797b7 100644 --- a/backend/internal/services/collabdoc/service.go +++ b/backend/internal/services/collabdoc/service.go @@ -121,6 +121,7 @@ func (s *Service) CreateDocument(ctx context.Context, ownerUserID uuid.UUID, tit } document := models.CollabDocument{ + WorkspaceID: models.PersonalWorkspaceID(ownerUserID), OwnerUserID: ownerUserID, Title: title, Status: models.CollabDocumentStatusActive, diff --git a/collab-service/src/persistence/document-persistence.ts b/collab-service/src/persistence/document-persistence.ts index f5c5bc4a..8c55d012 100644 --- a/collab-service/src/persistence/document-persistence.ts +++ b/collab-service/src/persistence/document-persistence.ts @@ -39,12 +39,17 @@ interface DocumentSeqRow extends Record { } interface ProjectDocumentRow extends Record { + workspace_id: string; source_content: string; current_seq: string | number; has_state: boolean; has_updates: boolean; } +interface DocumentWorkspaceRow extends Record { + workspace_id: string; +} + export interface DocumentPersistence { load(documentId: string, document: Document): Promise; initializeProjectDocument(documentId: string): Promise; @@ -124,6 +129,7 @@ export class PostgresDocumentPersistence implements DocumentPersistence { const result = await this.database.query( ` SELECT + projects.workspace_id, projects.source_content, collab_documents.current_seq, EXISTS ( @@ -165,16 +171,24 @@ export class PostgresDocumentPersistence implements DocumentPersistence { ` INSERT INTO collab_document_states ( document_id, + workspace_id, y_doc_state, state_vector, compacted_until_seq, state_size_bytes, updated_at ) - VALUES ($1, $2, $3, $4, $5, NOW()) + VALUES ($1, $2, $3, $4, $5, $6, NOW()) ON CONFLICT (document_id) DO NOTHING `, - [documentId, state, stateVector, currentSeq, state.length], + [ + documentId, + row.workspace_id, + state, + stateVector, + currentSeq, + state.length, + ], ); } finally { document.destroy(); @@ -251,25 +265,28 @@ export class PostgresDocumentPersistence implements DocumentPersistence { await this.database.query("BEGIN"); try { const seq = await this.lockDocumentAndReadSeq(documentId); + const workspaceId = await this.documentWorkspaceId(documentId); await this.database.query( ` INSERT INTO collab_document_states ( document_id, + workspace_id, y_doc_state, state_vector, compacted_until_seq, state_size_bytes, updated_at ) - VALUES ($1, $2, $3, $4, $5, NOW()) + VALUES ($1, $2, $3, $4, $5, $6, NOW()) ON CONFLICT (document_id) DO UPDATE SET + workspace_id = EXCLUDED.workspace_id, y_doc_state = EXCLUDED.y_doc_state, state_vector = EXCLUDED.state_vector, compacted_until_seq = EXCLUDED.compacted_until_seq, state_size_bytes = EXCLUDED.state_size_bytes, updated_at = EXCLUDED.updated_at `, - [documentId, state, stateVector, seq, state.length], + [documentId, workspaceId, state, stateVector, seq, state.length], ); await this.database.query("COMMIT"); await this.pruneCompactedBatches(documentId, seq).catch((error) => { @@ -397,12 +414,14 @@ export class PostgresDocumentPersistence implements DocumentPersistence { await this.database.query("BEGIN"); try { const currentSeq = await this.lockDocumentAndReadSeq(documentId); + const workspaceId = await this.documentWorkspaceId(documentId); const fromSeq = currentSeq + 1; const toSeq = currentSeq + pending.length; await this.database.query( ` INSERT INTO collab_document_update_batches ( document_id, + workspace_id, from_seq, to_seq, update_payload, @@ -411,10 +430,11 @@ export class PostgresDocumentPersistence implements DocumentPersistence { actor_user_id, created_at ) - VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW()) `, [ documentId, + workspaceId, fromSeq, toSeq, payload, @@ -467,6 +487,23 @@ export class PostgresDocumentPersistence implements DocumentPersistence { return Number(row.current_seq); } + private async documentWorkspaceId(documentId: string): Promise { + const result = await this.database.query( + ` + SELECT workspace_id + FROM collab_documents + WHERE id = $1 + `, + [documentId], + ); + + const workspaceId = result.rows[0]?.workspace_id; + if (!workspaceId) { + throw new Error("collaborative document workspace not found"); + } + return workspaceId; + } + private async pruneCompactedBatches( documentId: string, compactedUntilSeq: number, From e5e67d3a34fda860e770a6ba36a817153f1d9f6c Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:53:32 +0800 Subject: [PATCH 10/14] test(database): cover workspace columns and fixtures The new tenant columns need regression coverage in schema tests and existing SQL fixtures. Assert project-domain workspace columns and derivation on new rows, then update dashboard, publish, and helper fixtures for non-null workspace ids. Backend tests now exercise the clean target schema without historical compatibility backfills. --- backend/internal/db/db_test.go | 85 +++++++++++++++++++ backend/internal/handlers/dashboard_test.go | 8 ++ .../internal/services/publish/queue_test.go | 3 + .../internal/services/testsupport/helpers.go | 10 +++ 4 files changed, 106 insertions(+) diff --git a/backend/internal/db/db_test.go b/backend/internal/db/db_test.go index 02820c74..3431e1b9 100644 --- a/backend/internal/db/db_test.go +++ b/backend/internal/db/db_test.go @@ -121,6 +121,91 @@ func TestSyncSchemaAddsWorkspaceTeamModel(t *testing.T) { require.Equal(t, workspace.Name, loadedProject.Workspace.Name) } +func TestProjectDomainTablesCarryWorkspaceID(t *testing.T) { + database, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, syncSchema(database)) + + for _, tc := range []struct { + model any + column string + }{ + {&models.CollabDocument{}, "workspace_id"}, + {&models.CollabDocumentState{}, "workspace_id"}, + {&models.CollabDocumentUpdateBatch{}, "workspace_id"}, + {&models.ProjectPlatformPublication{}, "workspace_id"}, + {&models.ProjectActivity{}, "workspace_id"}, + {&models.ProjectComment{}, "workspace_id"}, + {&models.ProjectVersion{}, "workspace_id"}, + {&models.ProjectShareLink{}, "workspace_id"}, + {&models.PublishEvent{}, "workspace_id"}, + {&models.ScheduledPublication{}, "workspace_id"}, + {&models.MediaAsset{}, "workspace_id"}, + {&models.MediaAssetUsage{}, "workspace_id"}, + {&models.ExtensionCallbackToken{}, "workspace_id"}, + {&models.ExtensionExecutionEvent{}, "workspace_id"}, + {&models.AIContextSnapshot{}, "workspace_id"}, + {&models.AIGrowthOptimizationRun{}, "workspace_id"}, + {&models.AIProposal{}, "workspace_id"}, + {&models.AIDraftingSession{}, "workspace_id"}, + } { + require.True(t, database.Migrator().HasColumn(tc.model, tc.column), "%T", tc.model) + } +} + +func TestProjectDomainRowsDeriveWorkspaceID(t *testing.T) { + database, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, syncSchema(database)) + + owner := models.User{Username: "tenant-owner", Email: "tenant-owner@example.com"} + require.NoError(t, database.Create(&owner).Error) + workspace := models.Workspace{ + OwnerUserID: owner.ID, + Name: "Tenant workspace", + Slug: "tenant-workspace", + } + require.NoError(t, database.Create(&workspace).Error) + project := models.Project{ + UserID: owner.ID, + WorkspaceID: &workspace.ID, + Title: "Tenant project", + SourceContent: "content", + Status: models.ProjectStatusReady, + } + require.NoError(t, database.Create(&project).Error) + document := models.CollabDocument{ + OwnerUserID: owner.ID, + Title: "Tenant document", + } + require.NoError(t, database.Create(&document).Error) + + publication := models.ProjectPlatformPublication{ + ProjectID: project.ID, + Platform: "wechat", + Status: models.PublicationStatusDraft, + } + require.NoError(t, database.Create(&publication).Error) + require.Equal(t, workspace.ID, publication.WorkspaceID) + + activity := models.ProjectActivity{ + ProjectID: project.ID, + ActorUserID: owner.ID, + EventType: models.ProjectActivityContentSaved, + Metadata: []byte(`{}`), + } + require.NoError(t, database.Create(&activity).Error) + require.Equal(t, workspace.ID, activity.WorkspaceID) + + state := models.CollabDocumentState{ + DocumentID: document.ID, + YDocState: []byte("state"), + StateSizeBytes: 5, + } + require.NoError(t, database.Create(&state).Error) + require.Equal(t, models.PersonalWorkspaceID(owner.ID), state.WorkspaceID) +} + func TestSyncSchemaAddsArchiveScanIndexes(t *testing.T) { database, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) require.NoError(t, err) diff --git a/backend/internal/handlers/dashboard_test.go b/backend/internal/handlers/dashboard_test.go index b06cf992..7c7258fc 100644 --- a/backend/internal/handlers/dashboard_test.go +++ b/backend/internal/handlers/dashboard_test.go @@ -125,6 +125,7 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { require.NoError(t, db.Exec(`CREATE TABLE collab_documents ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, owner_user_id TEXT NOT NULL, title TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'active', @@ -148,6 +149,7 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { require.NoError(t, db.Exec(`CREATE TABLE project_activities ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, actor_user_id TEXT NOT NULL, target_user_id TEXT, @@ -158,6 +160,7 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { require.NoError(t, db.Exec(`CREATE TABLE project_comments ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, author_id TEXT NOT NULL, body TEXT NOT NULL, @@ -170,6 +173,7 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { require.NoError(t, db.Exec(`CREATE TABLE project_versions ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, created_by TEXT NOT NULL, version_number INTEGER NOT NULL, @@ -183,6 +187,7 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { require.NoError(t, db.Exec(`CREATE TABLE project_share_links ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, created_by TEXT NOT NULL, token_hash TEXT NOT NULL UNIQUE, @@ -235,6 +240,7 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { require.NoError(t, db.Exec(`CREATE TABLE project_platform_publications ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, platform TEXT NOT NULL, platform_account_id TEXT, @@ -286,6 +292,7 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { require.NoError(t, db.Exec(`CREATE TABLE extension_callback_tokens ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, execution_id TEXT NOT NULL, project_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -299,6 +306,7 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { require.NoError(t, db.Exec(`CREATE TABLE extension_execution_events ( id TEXT NOT NULL, callback_token_id TEXT NOT NULL, + workspace_id TEXT NOT NULL, execution_id TEXT NOT NULL, project_id TEXT NOT NULL, user_id TEXT NOT NULL, diff --git a/backend/internal/services/publish/queue_test.go b/backend/internal/services/publish/queue_test.go index 5212d5cb..92974be6 100644 --- a/backend/internal/services/publish/queue_test.go +++ b/backend/internal/services/publish/queue_test.go @@ -229,6 +229,7 @@ func setupPublishQueueTestDB(t *testing.T) *gorm.DB { )`).Error) require.NoError(t, db.Exec(`CREATE TABLE project_activities ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, actor_user_id TEXT NOT NULL, target_user_id TEXT, @@ -276,6 +277,7 @@ func setupPublishQueueTestDB(t *testing.T) *gorm.DB { )`).Error) require.NoError(t, db.Exec(`CREATE TABLE project_platform_publications ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, platform TEXT NOT NULL, platform_account_id TEXT, @@ -311,6 +313,7 @@ func setupPublishQueueTestDB(t *testing.T) *gorm.DB { require.NoError(t, db.Exec(`CREATE TABLE publish_events ( id TEXT PRIMARY KEY, publication_id TEXT NOT NULL, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, user_id TEXT NOT NULL, platform TEXT NOT NULL, diff --git a/backend/internal/services/testsupport/helpers.go b/backend/internal/services/testsupport/helpers.go index df115134..96808c4b 100644 --- a/backend/internal/services/testsupport/helpers.go +++ b/backend/internal/services/testsupport/helpers.go @@ -252,6 +252,7 @@ func SetupTestDB() *gorm.DB { db.Exec(`CREATE TABLE collab_documents ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, owner_user_id TEXT NOT NULL, title TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'active', @@ -266,6 +267,7 @@ func SetupTestDB() *gorm.DB { db.Exec(`CREATE TABLE collab_document_states ( document_id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, y_doc_state BLOB NOT NULL, state_vector BLOB, compacted_until_seq INTEGER NOT NULL DEFAULT 0, @@ -276,6 +278,7 @@ func SetupTestDB() *gorm.DB { db.Exec(`CREATE TABLE collab_document_update_batches ( id INTEGER PRIMARY KEY AUTOINCREMENT, document_id TEXT NOT NULL, + workspace_id TEXT NOT NULL, from_seq INTEGER NOT NULL, to_seq INTEGER NOT NULL, update_payload BLOB NOT NULL, @@ -296,6 +299,7 @@ func SetupTestDB() *gorm.DB { db.Exec(`CREATE TABLE project_activities ( id TEXT NOT NULL, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, actor_user_id TEXT NOT NULL, target_user_id TEXT, @@ -307,6 +311,7 @@ func SetupTestDB() *gorm.DB { db.Exec(`CREATE TABLE project_comments ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, author_id TEXT NOT NULL, body TEXT NOT NULL, @@ -319,6 +324,7 @@ func SetupTestDB() *gorm.DB { db.Exec(`CREATE TABLE project_versions ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, created_by TEXT NOT NULL, version_number INTEGER NOT NULL, @@ -332,6 +338,7 @@ func SetupTestDB() *gorm.DB { db.Exec(`CREATE TABLE project_share_links ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, created_by TEXT NOT NULL, token_hash TEXT NOT NULL UNIQUE, @@ -384,6 +391,7 @@ func SetupTestDB() *gorm.DB { db.Exec(`CREATE TABLE project_platform_publications ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, platform TEXT NOT NULL, platform_account_id TEXT, @@ -454,6 +462,7 @@ func SetupTestDB() *gorm.DB { db.Exec(`CREATE TABLE extension_callback_tokens ( id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, execution_id TEXT NOT NULL, project_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -467,6 +476,7 @@ func SetupTestDB() *gorm.DB { db.Exec(`CREATE TABLE extension_execution_events ( id TEXT NOT NULL, callback_token_id TEXT NOT NULL, + workspace_id TEXT NOT NULL, execution_id TEXT NOT NULL, project_id TEXT NOT NULL, user_id TEXT NOT NULL, From 3aad8d189873ef81868558a45814c03fdb3321c8 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:53:38 +0800 Subject: [PATCH 11/14] test(archive): include workspace ids in event fixtures Archive tests insert partitioned event rows that now require workspace ownership. Add workspace_id values to archive partition and worker event fixtures. Archive coverage remains aligned with the tenant-aware event schema. --- backend/internal/services/archive/partitions_test.go | 2 ++ backend/internal/services/archive/worker_test.go | 1 + 2 files changed, 3 insertions(+) diff --git a/backend/internal/services/archive/partitions_test.go b/backend/internal/services/archive/partitions_test.go index d1206cf6..a8fb87f5 100644 --- a/backend/internal/services/archive/partitions_test.go +++ b/backend/internal/services/archive/partitions_test.go @@ -78,6 +78,7 @@ func TestEncodePartitionJSONLinesPreservesStructFieldNames(t *testing.T) { if err := db.Exec(`CREATE TABLE publish_events_2026_01 ( id TEXT PRIMARY KEY, publication_id TEXT NOT NULL, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, user_id TEXT NOT NULL, platform TEXT NOT NULL, @@ -100,6 +101,7 @@ func TestEncodePartitionJSONLinesPreservesStructFieldNames(t *testing.T) { event := models.PublishEvent{ ID: uuid.New(), PublicationID: uuid.New(), + WorkspaceID: uuid.New(), ProjectID: uuid.New(), UserID: uuid.New(), Platform: "wechat", diff --git a/backend/internal/services/archive/worker_test.go b/backend/internal/services/archive/worker_test.go index 5954a92a..666d8aa2 100644 --- a/backend/internal/services/archive/worker_test.go +++ b/backend/internal/services/archive/worker_test.go @@ -431,6 +431,7 @@ func setupArchiveTestDB(t *testing.T) *gorm.DB { if err := db.Exec(`CREATE TABLE publish_events ( id TEXT PRIMARY KEY, publication_id TEXT NOT NULL, + workspace_id TEXT NOT NULL, project_id TEXT NOT NULL, user_id TEXT NOT NULL, platform TEXT NOT NULL, From 3285ddd1c45d333d9e024ed7744c6a1f110137b2 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:53:46 +0800 Subject: [PATCH 12/14] test(collab): cover workspace persistence SQL Direct collab-service SQL writes now provide workspace ids and need matching query expectations. Update persistence tests for project workspace lookup, state writes, and update batch workspace parameters. The Node persistence path is covered for tenant-aware collaboration storage. --- .../persistence/document-persistence.test.ts | 83 ++++++++++++++----- 1 file changed, 64 insertions(+), 19 deletions(-) diff --git a/collab-service/src/persistence/document-persistence.test.ts b/collab-service/src/persistence/document-persistence.test.ts index 45908ed0..f6f93db8 100644 --- a/collab-service/src/persistence/document-persistence.test.ts +++ b/collab-service/src/persistence/document-persistence.test.ts @@ -158,6 +158,7 @@ describe("PostgresDocumentPersistence", () => { database.results = [ [ { + workspace_id: "99999999-9999-4999-8999-999999999999", source_content: "

Project heading

Hello team

", current_seq: 0, @@ -180,10 +181,13 @@ describe("PostgresDocumentPersistence", () => { const insertCall = database.calls[1]; expect(insertCall?.text).toContain("INSERT INTO collab_document_states"); expect(insertCall?.text).toContain("ON CONFLICT (document_id) DO NOTHING"); - expect(insertCall?.values?.[3]).toBe(0); + expect(insertCall?.values?.[1]).toBe( + "99999999-9999-4999-8999-999999999999", + ); + expect(insertCall?.values?.[4]).toBe(0); const restored = new Document("restored"); - applyUpdate(restored, new Uint8Array(insertCall?.values?.[1] as Buffer)); + applyUpdate(restored, new Uint8Array(insertCall?.values?.[2] as Buffer)); expect(projectYDocToProseMirrorJSON(restored)).toMatchObject({ type: "doc", content: [ @@ -207,6 +211,7 @@ describe("PostgresDocumentPersistence", () => { database.results = [ [ { + workspace_id: "99999999-9999-4999-8999-999999999999", source_content: "

Stale project content

", current_seq: 4, has_state: true, @@ -303,6 +308,11 @@ describe("PostgresDocumentPersistence", () => { current_seq: 7, }, ], + [ + { + workspace_id: "99999999-9999-4999-8999-999999999999", + }, + ], ]; const persistence = new PostgresDocumentPersistence( database, @@ -334,23 +344,27 @@ describe("PostgresDocumentPersistence", () => { expect(database.calls.map((call) => call.text.trim())).toEqual([ "BEGIN", expect.stringContaining("SELECT current_seq"), + expect.stringContaining("SELECT workspace_id"), expect.stringContaining("INSERT INTO collab_document_update_batches"), expect.stringContaining("UPDATE collab_documents"), "COMMIT", ]); - const insertCall = database.calls[2]; + const insertCall = database.calls[3]; expect(insertCall?.values?.[0]).toBe( "11111111-1111-4111-8111-111111111111", ); - expect(insertCall?.values?.[1]).toBe(8); - expect(insertCall?.values?.[2]).toBe(9); - expect(insertCall?.values?.[4]).toBe(2); - expect(insertCall?.values?.[6]).toBe( + expect(insertCall?.values?.[1]).toBe( + "99999999-9999-4999-8999-999999999999", + ); + expect(insertCall?.values?.[2]).toBe(8); + expect(insertCall?.values?.[3]).toBe(9); + expect(insertCall?.values?.[5]).toBe(2); + expect(insertCall?.values?.[7]).toBe( "33333333-3333-4333-8333-333333333333", ); const restored = new Document("restored"); - applyUpdate(restored, new Uint8Array(insertCall?.values?.[3] as Buffer)); + applyUpdate(restored, new Uint8Array(insertCall?.values?.[4] as Buffer)); expect(restored.getMap("content").get("title")).toBe("First"); expect(restored.getMap("content").get("body")).toBe("Second"); expect(metrics.flushDurations).toHaveLength(1); @@ -365,6 +379,11 @@ describe("PostgresDocumentPersistence", () => { current_seq: 3, }, ], + [ + { + workspace_id: "99999999-9999-4999-8999-999999999999", + }, + ], ]; const persistence = new PostgresDocumentPersistence(database, 10_000, 2); const first = new Document("first"); @@ -386,12 +405,13 @@ describe("PostgresDocumentPersistence", () => { expect(database.calls.map((call) => call.text.trim())).toEqual([ "BEGIN", expect.stringContaining("SELECT current_seq"), + expect.stringContaining("SELECT workspace_id"), expect.stringContaining("INSERT INTO collab_document_update_batches"), expect.stringContaining("UPDATE collab_documents"), "COMMIT", ]); - expect(database.calls[2]?.values?.[1]).toBe(4); - expect(database.calls[2]?.values?.[2]).toBe(5); + expect(database.calls[3]?.values?.[2]).toBe(4); + expect(database.calls[3]?.values?.[3]).toBe(5); }); it("retries batch-limit flush failures without rejecting appendUpdate", async () => { @@ -402,11 +422,21 @@ describe("PostgresDocumentPersistence", () => { current_seq: 3, }, ], + [ + { + workspace_id: "99999999-9999-4999-8999-999999999999", + }, + ], [ { current_seq: 3, }, ], + [ + { + workspace_id: "99999999-9999-4999-8999-999999999999", + }, + ], ]; database.failures = [ { @@ -452,8 +482,11 @@ describe("PostgresDocumentPersistence", () => { const database = new FakeDatabase(); database.results = [ [{ current_seq: 3 }], + [{ workspace_id: "99999999-9999-4999-8999-999999999999" }], [{ current_seq: 3 }], + [{ workspace_id: "99999999-9999-4999-8999-999999999999" }], [{ current_seq: 3 }], + [{ workspace_id: "99999999-9999-4999-8999-999999999999" }], ]; database.failures = [ { @@ -498,6 +531,11 @@ describe("PostgresDocumentPersistence", () => { current_seq: 12, }, ], + [ + { + workspace_id: "99999999-9999-4999-8999-999999999999", + }, + ], ]; const persistence = new PostgresDocumentPersistence(database); const document = new Document("document"); @@ -506,22 +544,23 @@ describe("PostgresDocumentPersistence", () => { await persistence.store("11111111-1111-4111-8111-111111111111", document); expect(database.calls[0]?.text).toBe("BEGIN"); - const call = database.calls[2]; + const call = database.calls[3]; expect(call?.text).toContain("y_doc_state"); expect(call?.text).not.toContain("ydoc_state"); expect(call?.text).toContain("ON CONFLICT (document_id) DO UPDATE"); expect(call?.values?.[0]).toBe("11111111-1111-4111-8111-111111111111"); - expect(call?.values?.[1]).toBeInstanceOf(Buffer); + expect(call?.values?.[1]).toBe("99999999-9999-4999-8999-999999999999"); expect(call?.values?.[2]).toBeInstanceOf(Buffer); - const state = call?.values?.[1] as Buffer; - expect(call?.values?.[3]).toBe(12); - expect(call?.values?.[4]).toBe(state.length); - expect(call?.values?.[2]).toEqual(Buffer.from(encodeStateVector(document))); - expect(database.calls[3]?.text).toBe("COMMIT"); - expect(database.calls[4]?.text).toContain( + expect(call?.values?.[3]).toBeInstanceOf(Buffer); + const state = call?.values?.[2] as Buffer; + expect(call?.values?.[4]).toBe(12); + expect(call?.values?.[5]).toBe(state.length); + expect(call?.values?.[3]).toEqual(Buffer.from(encodeStateVector(document))); + expect(database.calls[4]?.text).toBe("COMMIT"); + expect(database.calls[5]?.text).toContain( "DELETE FROM collab_document_update_batches", ); - expect(database.calls[4]?.values).toEqual([ + expect(database.calls[5]?.values).toEqual([ "11111111-1111-4111-8111-111111111111", 12, 30, @@ -536,6 +575,11 @@ describe("PostgresDocumentPersistence", () => { current_seq: 12, }, ], + [ + { + workspace_id: "99999999-9999-4999-8999-999999999999", + }, + ], ]; database.failures = [ { @@ -563,6 +607,7 @@ describe("PostgresDocumentPersistence", () => { expect(database.calls.map((call) => call.text.trim())).toEqual([ "BEGIN", expect.stringContaining("SELECT current_seq"), + expect.stringContaining("SELECT workspace_id"), expect.stringContaining("INSERT INTO collab_document_states"), "COMMIT", expect.stringContaining("DELETE FROM collab_document_update_batches"), From a4e3af8e0afed32d0e72686e82c992d8afdb54d5 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 21:53:57 +0800 Subject: [PATCH 13/14] docs(database): update Citus preparation progress The Citus preparation checklist now reflects workspace_id coverage for project-domain data. Mark the completed Phase 5 item and list model, partition, service, and test verification points. The plan now leaves colocation, constraints, and worker payload routing as the remaining work. --- doc/plan/database-optimization.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/doc/plan/database-optimization.md b/doc/plan/database-optimization.md index d0b948c7..d44bd7d6 100644 --- a/doc/plan/database-optimization.md +++ b/doc/plan/database-optimization.md @@ -11,7 +11,7 @@ Status definitions: - `Not started`: no clear implementation has been found yet. - `Deferred`: not recommended for the current business stage; only trigger conditions are retained. -Current overall progress: about `71%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. +Current overall progress: about `77%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. | Phase | Weight | Current completion | Status | Completed | Not done / next steps | | ----- | ------ | ------------------ | ------ | --------- | --------------------- | @@ -20,7 +20,7 @@ Current overall progress: about `71%`. This number is manually estimated by phas | Phase 2: Read models and cache first | 15% | 100% | Done | Redis and Asynq dependencies are reusable; admin dashboard stats, admin project list, and dashboard account summary have short-TTL Redis cache; stats/project list/account cache misses are merged with singleflight; project, prepublish, publish, and account write paths invalidate the related dashboard cache; `workspace_dashboard_stats` and `project_list_summaries` read models are in place, and APIs prefer read models when coverage is complete; async refresh triggers after project save, platform sync, publish completion, and member changes; admin rebuild API, Asynq queue, and worker support full read-model rebuild | None | | Phase 3: Read/write splitting | 15% | 100% | Done | Optional `DB_READER_*` connection, application-level DB Router, signed sticky writer, consistency routing for project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension, consistency-level inventories for dashboard/publish/collab-service, self-hosted PostgreSQL read replica, managed `postgres-reader` entry point, PgBouncer reader pool, replica lag monitoring and automatic fallback to writer when over threshold | None; Phase 4 continues partitioning, archiving, and recovery flows | | Phase 4: Single-database partitioning, archiving, and hot/cold tiering | 15% | 100% | Done | Collaborative editing already has state + update batch + compaction foundation; `collab_document_update_batches` has PostgreSQL `document_id` hash partition target schema; event and terminal-session history already have row-level R2/S3 archive worker; `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and `remote_browser_sessions` have PostgreSQL monthly partition target schema; the archive worker exports whole cold monthly partitions to R2/S3 before detaching and dropping them; archive recovery procedure is documented | None | -| Phase 5: Citus preparation | 20% | 5% | Not started | Workspace model, `projects.workspace_id`, and personal workspace ID already exist | Global `workspace_id`, Citus distribution column/colocation design, unique constraint and foreign-key review | +| Phase 5: Citus preparation | 20% | 35% | In progress | Workspace model, `projects.workspace_id`, personal workspace ID, and explicit or derived `workspace_id` coverage for project-domain tables | Citus distribution column/colocation design, unique constraint and foreign-key review, worker payload routing | | Phase 6: Citus distributed PostgreSQL operation | 10% | 0% | Deferred | None | Future Citus cluster design, worker/coordinator monitoring and backup, large-tenant isolation strategy | ### 0.1 Progress Update Rules @@ -60,14 +60,14 @@ Atomic commit guidance: | Application connection pools | Done | backend/publish-worker support `DB_MAX_OPEN_CONNS`, `DB_MAX_IDLE_CONNS`, `DB_CONN_MAX_LIFETIME`, and `DB_CONN_MAX_IDLE_TIME`; collab-service node-postgres pool supports `DB_MAX_OPEN_CONNS`, `DB_CONN_MAX_LIFETIME`, and `DB_CONN_MAX_IDLE_TIME`; Redis client supports `REDIS_POOL_SIZE`, `REDIS_MIN_IDLE_CONNS`, `REDIS_MAX_IDLE_CONNS`, `REDIS_CONN_MAX_IDLE_TIME`, and `REDIS_CONN_MAX_LIFETIME`; Docker Compose and self-hosted Kubernetes reuse PostgreSQL connections through PgBouncer writer pool; GORM PostgreSQL driver uses simple protocol to remain compatible with transaction pooling; self-hosted Kubernetes added PgBouncer reader pool and points app `DB_READER_HOST` to the reader pool | None | `backend/internal/db/db.go`, `backend/internal/redisclient/redisclient.go`, `collab-service/src/config.ts`, `collab-service/src/persistence/document-persistence.ts`, `collab-service/src/persistence/document-persistence.test.ts`, `deploy/docker/docker-compose.yml`, `deploy/kubernetes/data-services/self-hosted/pgbouncer.yaml`, `deploy/kubernetes/app-baseline/app-config.yaml` | | Query observability | Done | GORM QueryObserver, slow-query logs, `mpp_db_queries_total`, `mpp_db_query_duration_seconds`, `mpp_db_slow_queries_total`, self-hosted PostgreSQL `pg_stat_statements`, PostgreSQL exporter table-level health metrics, Grafana table row count / 24h growth / table size / index size / dead tuples / vacuum panels | Threshold alerts can continue to be added by production baseline in Phase 1/4 | `backend/internal/db/query_observer.go`, `backend/internal/observability/observability.go`, `script/db/audit_database_baseline.sql`, `deploy/docker/observability/postgres-exporter/queries.yml`, `deploy/docker/observability/grafana/dashboards/mpp-observability-baseline.json` | | Dashboard query audit | Done | Existing audit script covers dashboard Count, list, platform filter, publication preload, account query, and active session query plans | Not yet turned into a periodic CI/ops gate | `script/db/audit_dashboard_query_plans.sql` | -| Tenant boundary | In progress | Existing `workspaces`, `workspace_members`, `projects.workspace_id`, and personal workspace rules | Publish events, collaborative state, media metadata, and other tables do not all explicitly carry `workspace_id` yet | `backend/internal/models/models.go` | +| Tenant boundary | In progress | Existing `workspaces`, `workspace_members`, `projects.workspace_id`, and personal workspace rules; project-domain publishing, activity, comment, version, share-link, extension, and collaboration rows now carry `workspace_id` directly, with model hooks or service write paths deriving it from the owning project or document for new writes | Citus colocation groups, distributed-table constraints, and worker payload ownership are still open | `backend/internal/models/models.go`, `backend/internal/models/collab.go`, `backend/internal/db/monthly_partitions.go`, `backend/internal/db/hash_partitions.go`, `collab-service/src/persistence/document-persistence.ts`, `backend/internal/db/db_test.go`, `collab-service/src/persistence/document-persistence.test.ts` | | Dashboard read models | Done | Added `workspace_dashboard_stats` and `project_list_summaries` read models, idempotently recomputed from fact tables by a centralized readmodel service; async refresh is triggered after project save, platform sync, publish completion, and member changes; admin stats and admin project list prefer read models when coverage is complete; admin rebuild API enqueues through Asynq, and API/worker processes can start readmodel workers for full rebuild from fact tables | None | `backend/internal/models/models.go`, `backend/internal/services/readmodel/service.go`, `backend/internal/services/readmodel/queue.go`, `backend/internal/services/readmodel/service_test.go`, `backend/internal/services/readmodel/queue_test.go`, `backend/internal/services/stats/overview.go`, `backend/internal/services/project/lifecycle.go`, `backend/internal/handlers/dashboard.go`, `backend/cmd/api/main.go`, `backend/cmd/publish-worker/main.go` | | Redis read cache | Done | Redis is already used for queues, locks, OAuth, browser sessions, and short-term coordination; admin dashboard stats, admin project list, and dashboard account summary use 15s TTL cache and bypass scoped/sticky-writer strong-consistency paths; stats/project list/account cache misses use singleflight to prevent process-local stampede; stats and account caches use versioned payloads and semantic validation, and Redis read-error fallback is also merged into one DB computation per key; project create/edit/platform save, prepublish sync/draft update, publish queue/execute/fail, and platform account write paths invalidate the related dashboard cache; full read-model rebuild reuses the Redis/Asynq queue | None | `backend/internal/services/stats/overview.go`, `backend/internal/services/stats/overview_test.go`, `backend/internal/services/project/list_cache.go`, `backend/internal/services/project/list_cache_test.go`, `backend/internal/services/prepublish/drafts.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/publication_flow_test.go`, `backend/internal/services/publish/queue_test.go`, `backend/internal/services/platform_account/account_cache.go`, `backend/internal/services/platform_account/account_cache_test.go`, `backend/internal/services/browser_session/complete.go`, `backend/internal/services/browser_session/service_test.go`, `backend/internal/services/readmodel/queue.go` | | Read/write splitting | Done | Supports optional `DB_READER_*` read-replica connection, `DefaultRouter`, and signed sticky writer; project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension are wired to strong/eventual/writer routing; dashboard, publish, and collab-service consistency-level inventories are complete, with collab-service online path kept writer-only; writer/reader pools are in self-hosted Kubernetes, and managed overlay provides a `postgres-reader` ExternalName entry point; `DB_READER_MAX_REPLICA_LAG` configures the replica lag threshold, eventual/analytics reads automatically fall back to writer when over threshold or lag is unknown, and `mpp_db_replica_lag_seconds` and `mpp_db_replica_healthy` metrics are exposed | None | `backend/internal/db/db.go`, `backend/internal/db/router.go`, `backend/internal/db/replica_lag.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/prepublish/service.go`, `backend/internal/services/mediaasset/service.go`, `backend/internal/services/browser_session/service.go`, `backend/internal/services/extension/service.go`, `backend/internal/app/runtime.go`, `deploy/kubernetes/data-services/self-hosted/postgres.yaml`, `deploy/kubernetes/data-services/self-hosted/pgbouncer.yaml`, `deploy/kubernetes/data-services/managed/services.yaml`, `script/kubernetes/validation/data_services.rb` | | Event-table partitioning and archiving | Done | `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and terminal `remote_browser_sessions` have default retention periods; the `archive` worker can batch-export JSONL to R2/S3 and delete old hot-table rows after successful upload; PostgreSQL schema initialization now creates monthly `created_at` partitions for `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and `remote_browser_sessions`, with partition-compatible `(id, created_at)` primary keys and rolling partition creation; the archive worker exports whole cold monthly partitions as JSONL to R2/S3, then detaches and drops the partition after successful upload; PostgreSQL browser-session active-row fallback uses a scoped advisory transaction lock because partitioned unique constraints must include the partition key; the archive recovery procedure defines inspection, staging restore, optional hot-table reinsertion, and audit checks | None | `backend/internal/db/monthly_partitions.go`, `backend/internal/db/db.go`, `backend/internal/models/models.go`, `backend/internal/db/db_test.go`, `backend/internal/services/browser_session/start.go`, `backend/internal/services/browser_session/cleanup.go`, `backend/internal/services/archive/worker.go`, `backend/internal/services/archive/partitions.go`, `backend/internal/services/archive/worker_test.go`, `backend/internal/services/archive/partitions_test.go`, Phase 4 archive recovery procedure in this document | | Collaboration batch governance | In progress | `collab_document_states`, `collab_document_update_batches`, and compaction/retention foundations exist; PostgreSQL schema initialization creates a 16-way `document_id` hash-partitioned `collab_document_update_batches` target table and migrates existing regular-table rows into it | Cold archiving is not implemented | `backend/internal/db/hash_partitions.go`, `backend/internal/db/db.go`, `backend/internal/models/collab.go`, `backend/internal/db/db_test.go`, `collab-service/src/persistence/document-persistence.ts` | | Outbox/CDC/event stream | In progress | The publishing queue path has a transactional Outbox: `EnqueuePublishProject` writes `outbox_events` in the same transaction and dispatches immediately after commit; publish worker starts an outbox dispatcher and supports retries for failed/stale processing records; Asynq continues to serve as the task-execution queue, and `PublishEvent` continues to serve as publishing audit | Currently covers only `publish.job_requested`; general business-event outbox, Debezium, and Redpanda/Kafka CDC are not implemented | `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/models/models.go` | -| Citus target state | Not started | Confirmed `workspace_id` as the most suitable distribution-column direction | Citus distributed tables, reference tables, and colocation are not implemented | Phase 5/6 in this document | +| Citus target state | In progress | Confirmed `workspace_id` as the most suitable distribution-column direction; project-domain tables now have explicit or stable derived tenant routing coverage | Citus distributed tables, reference tables, colocation, unique-constraint review, and worker payload routing are not implemented | Phase 5 checklist and `backend/internal/db/db_test.go` | ### 0.3 Phase Checklist @@ -127,7 +127,7 @@ Atomic commit guidance: - [x] Confirm Workspace as the long-term tenant boundary. - [x] Confirm `workspace_id` as the preferred Citus distribution column. -- [ ] Complete `workspace_id` or a stable derivation path for project-domain tables. +- [x] Complete `workspace_id` or a stable derivation path for project-domain tables. Verification entry point: `backend/internal/models/models.go`, `backend/internal/models/collab.go`, `backend/internal/db/monthly_partitions.go`, `backend/internal/db/hash_partitions.go`, `collab-service/src/persistence/document-persistence.ts`, `backend/internal/db/db_test.go`, `collab-service/src/persistence/document-persistence.test.ts`. - [ ] Design Citus distributed tables, reference tables, and colocated table groups. - [ ] Review unique constraints, foreign keys, and cross-tenant joins. - [ ] Add `workspace_id` to worker payloads. From dc81af6412bb3a210a71df462f4bcd4c90585a78 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 28 Jun 2026 22:29:07 +0800 Subject: [PATCH 14/14] feat(collab): update workspace_id reference in document queries --- collab-service/src/persistence/document-persistence.test.ts | 1 + collab-service/src/persistence/document-persistence.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/collab-service/src/persistence/document-persistence.test.ts b/collab-service/src/persistence/document-persistence.test.ts index f6f93db8..3e44c813 100644 --- a/collab-service/src/persistence/document-persistence.test.ts +++ b/collab-service/src/persistence/document-persistence.test.ts @@ -175,6 +175,7 @@ describe("PostgresDocumentPersistence", () => { expect(initialized).toBe(true); expect(database.calls[0]?.text).toContain("FROM projects"); + expect(database.calls[0]?.text).toContain("collab_documents.workspace_id"); expect(database.calls[0]?.values).toEqual([ "11111111-1111-4111-8111-111111111111", ]); diff --git a/collab-service/src/persistence/document-persistence.ts b/collab-service/src/persistence/document-persistence.ts index 8c55d012..b7862d22 100644 --- a/collab-service/src/persistence/document-persistence.ts +++ b/collab-service/src/persistence/document-persistence.ts @@ -129,7 +129,7 @@ export class PostgresDocumentPersistence implements DocumentPersistence { const result = await this.database.query( ` SELECT - projects.workspace_id, + collab_documents.workspace_id, projects.source_content, collab_documents.current_seq, EXISTS (