Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 58 additions & 14 deletions backend/internal/services/publish/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func (s *Service) lifecycle() PublicationLifecycle {
}

func (l PublicationLifecycle) MarkQueued(pub *models.ProjectPlatformPublication, queuedAt time.Time) error {
result := l.db.Model(&models.ProjectPlatformPublication{}).
Where("id = ? AND status = ?", pub.ID, pub.Status).
result := l.publicationQuery(pub).
Where("status = ?", pub.Status).
Updates(map[string]any{
"status": models.PublicationStatusQueued,
"error_message": "",
Expand All @@ -58,8 +58,8 @@ func (l PublicationLifecycle) MarkQueued(pub *models.ProjectPlatformPublication,
}

func (l PublicationLifecycle) MarkPublishing(pub *models.ProjectPlatformPublication, startedAt time.Time) error {
result := l.db.Model(&models.ProjectPlatformPublication{}).
Where("id = ? AND status = ?", pub.ID, pub.Status).
result := l.publicationQuery(pub).
Where("status = ?", pub.Status).
Updates(map[string]any{
"status": models.PublicationStatusPublishing,
"error_message": "",
Expand All @@ -77,12 +77,18 @@ func (l PublicationLifecycle) MarkPublishing(pub *models.ProjectPlatformPublicat
}

func (l PublicationLifecycle) MarkFailed(ctx context.Context, projectID uuid.UUID, platform string, message string) error {
return l.MarkFailedForWorkspace(ctx, uuid.Nil, projectID, platform, message)
}

func (l PublicationLifecycle) MarkFailedForWorkspace(ctx context.Context, workspaceID uuid.UUID, projectID uuid.UUID, platform string, message string) error {
db := l.db
if ctx != nil {
db = db.WithContext(ctx)
}
return db.Model(&models.ProjectPlatformPublication{}).
query := db.Model(&models.ProjectPlatformPublication{}).
Where("project_id = ? AND platform = ?", projectID, platform).
Scopes(workspaceScope(workspaceID))
return query.
Updates(map[string]any{
"status": models.PublicationStatusFailed,
"error_message": SanitizeUserFacingErrorMessage(message),
Expand All @@ -103,8 +109,12 @@ func (l PublicationLifecycle) CompletePublication(pub *models.ProjectPlatformPub
} else {
updates["retry_count"] = gorm.Expr("retry_count + ?", 1)
}
if err := l.db.Model(pub).Updates(updates).Error; err != nil {
return err
result := l.publicationQuery(pub).Updates(updates)
if result.Error != nil {
return result.Error
}
if result.RowsAffected == 0 {
return l.publicationStateChangedError(pub.ID)
}
pub.Status = completion.Status
pub.RemoteID = completion.RemoteID
Expand All @@ -113,7 +123,15 @@ func (l PublicationLifecycle) CompletePublication(pub *models.ProjectPlatformPub
return nil
}

func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedAt time.Time) (models.PublishAttempt, bool, error) {
func (l PublicationLifecycle) publicationQuery(pub *models.ProjectPlatformPublication) *gorm.DB {
query := l.db.Model(&models.ProjectPlatformPublication{}).Where("id = ?", pub.ID)
if pub.WorkspaceID != uuid.Nil {
query = query.Where("workspace_id = ?", pub.WorkspaceID)
}
return query
}

func (l PublicationLifecycle) StartPublishAttemptForWorkspace(workspaceID uuid.UUID, scheduleID uuid.UUID, startedAt time.Time) (models.PublishAttempt, bool, error) {
if scheduleID == uuid.Nil {
return models.PublishAttempt{}, false, nil
}
Expand All @@ -122,22 +140,23 @@ func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedA
}
var attempt models.PublishAttempt
if err := l.db.Transaction(func(tx *gorm.DB) error {
result := tx.Model(&models.ScheduledPublication{}).
scheduleQuery := tx.Model(&models.ScheduledPublication{}).
Where("id = ? AND status IN ?", scheduleID, []string{
models.ScheduledPublicationStatusScheduled,
models.ScheduledPublicationStatusFailed,
models.ScheduledPublicationStatusNeedsManualAction,
}).
Updates(map[string]any{
"status": models.ScheduledPublicationStatusRunning,
"last_error": "",
})
Scopes(workspaceScope(workspaceID))
result := scheduleQuery.Updates(map[string]any{
"status": models.ScheduledPublicationStatusRunning,
"last_error": "",
})
if result.Error != nil {
return result.Error
}
if result.RowsAffected == 0 {
var schedule models.ScheduledPublication
if err := tx.Select("status").First(&schedule, "id = ?", scheduleID).Error; err != nil {
if err := tx.Select("status").Scopes(workspaceScope(workspaceID)).First(&schedule, "id = ?", scheduleID).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errScheduledPublicationMissing
}
Expand Down Expand Up @@ -173,7 +192,15 @@ func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedA
return attempt, true, nil
}

func (l PublicationLifecycle) StartPublishAttempt(scheduleID uuid.UUID, startedAt time.Time) (models.PublishAttempt, bool, error) {
return l.StartPublishAttemptForWorkspace(uuid.Nil, scheduleID, startedAt)
}

func (l PublicationLifecycle) FinishPublishAttempt(attempt *models.PublishAttempt, completion publishAttemptCompletion) error {
return l.FinishPublishAttemptForWorkspace(uuid.Nil, attempt, completion)
}

func (l PublicationLifecycle) FinishPublishAttemptForWorkspace(workspaceID uuid.UUID, attempt *models.PublishAttempt, completion publishAttemptCompletion) error {
if attempt == nil {
return nil
}
Expand All @@ -199,6 +226,7 @@ func (l PublicationLifecycle) FinishPublishAttempt(attempt *models.PublishAttemp
}
return tx.Model(&models.ScheduledPublication{}).
Where("id = ?", attempt.ScheduledPublicationID).
Scopes(workspaceScope(workspaceID)).
Updates(map[string]any{
"status": scheduleStatus,
"last_error": completion.ErrorMessage,
Expand All @@ -207,13 +235,18 @@ func (l PublicationLifecycle) FinishPublishAttempt(attempt *models.PublishAttemp
}

func (l PublicationLifecycle) MarkPrepublishSyncing(projectID uuid.UUID, platforms []string) error {
return l.MarkPrepublishSyncingForWorkspace(uuid.Nil, projectID, platforms)
}

func (l PublicationLifecycle) MarkPrepublishSyncingForWorkspace(workspaceID uuid.UUID, projectID uuid.UUID, platforms []string) error {
if len(platforms) == 0 {
return nil
}
return l.db.Transaction(func(tx *gorm.DB) error {
var activeCount int64
if err := tx.Model(&models.ProjectPlatformPublication{}).
Where("project_id = ? AND platform IN ? AND status IN ?", projectID, platforms, activePublicationStatuses()).
Scopes(workspaceScope(workspaceID)).
Count(&activeCount).Error; err != nil {
return err
}
Expand All @@ -223,6 +256,7 @@ func (l PublicationLifecycle) MarkPrepublishSyncing(projectID uuid.UUID, platfor

if err := tx.Model(&models.ProjectPlatformPublication{}).
Where("project_id = ? AND platform IN ? AND status NOT IN ?", projectID, platforms, activePublicationStatuses()).
Scopes(workspaceScope(workspaceID)).
Updates(map[string]any{
"draft_status": models.PublicationDraftStatusSyncing,
"error_message": "",
Expand All @@ -233,6 +267,7 @@ func (l PublicationLifecycle) MarkPrepublishSyncing(projectID uuid.UUID, platfor

if err := tx.Model(&models.ProjectPlatformPublication{}).
Where("project_id = ? AND platform IN ? AND status IN ?", projectID, platforms, activePublicationStatuses()).
Scopes(workspaceScope(workspaceID)).
Count(&activeCount).Error; err != nil {
return err
}
Expand All @@ -251,6 +286,15 @@ func activePublicationStatuses() []string {
}
}

func workspaceScope(workspaceID uuid.UUID) func(*gorm.DB) *gorm.DB {
return func(db *gorm.DB) *gorm.DB {
if workspaceID == uuid.Nil {
return db
}
return db.Where("workspace_id = ?", workspaceID)
}
}

func (l PublicationLifecycle) publicationStateChangedError(publicationID uuid.UUID) error {
var pub models.ProjectPlatformPublication
if err := l.db.Select("status", "last_attempt_at").First(&pub, "id = ?", publicationID).Error; err != nil {
Expand Down
5 changes: 4 additions & 1 deletion backend/internal/services/publish/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ func (s *Service) dispatchClaimedOutboxEvent(ctx context.Context, event models.O
if err := json.Unmarshal(event.Payload, &job); err != nil {
return fmt.Errorf("decode publish outbox payload: %w", err)
}
if job.JobID == uuid.Nil || job.ProjectID == uuid.Nil || job.UserID == uuid.Nil || job.Platform == "" {
if err := s.ensurePublishJobWorkspaceID(ctx, &job); err != nil {
return fmt.Errorf("resolve publish outbox workspace for event %s: %w", event.ID, err)
}
if job.JobID == uuid.Nil || job.ProjectID == uuid.Nil || job.WorkspaceID == uuid.Nil || job.UserID == uuid.Nil || job.Platform == "" {
return fmt.Errorf("invalid publish outbox payload for event %s", event.ID)
}
return s.queue.Enqueue(ctx, job)
Expand Down
Loading
Loading