diff --git a/flypg/launcher.go b/flypg/launcher.go index f136586959..ce93cdf0a0 100644 --- a/flypg/launcher.go +++ b/flypg/launcher.go @@ -265,7 +265,7 @@ func (l *Launcher) LaunchMachinesPostgres(ctx context.Context, config *CreateClu waitTimeout = time.Hour } - err = mach.WaitForStartOrStop(ctx, app.Name, machine, "start", waitTimeout) + err = mach.WaitForStartOrStop(ctx, app.Name, machine, "start", waitTimeout, "") if err != nil { return err } diff --git a/internal/command/deploy/machines_deploymachinesapp.go b/internal/command/deploy/machines_deploymachinesapp.go index 3cd6e42c8f..2cfc911dff 100644 --- a/internal/command/deploy/machines_deploymachinesapp.go +++ b/internal/command/deploy/machines_deploymachinesapp.go @@ -194,12 +194,22 @@ func (md *machineDeployment) waitForMachine(ctx context.Context, e *machineUpdat } if !md.skipHealthChecks { - if err := lm.WaitForState(ctx, fly.MachineStateStarted, md.waitTimeout, machine.WithJustCreated()); err != nil { + waitOpts := []machine.WaitOption{machine.WithJustCreated()} + if e.previousInstanceID != "" { + waitOpts = append(waitOpts, machine.WithVersion(lm.Machine().InstanceID)) + } + if err := lm.WaitForState(ctx, fly.MachineStateStarted, md.waitTimeout, waitOpts...); err != nil { err = suggestChangeWaitTimeout(err, "wait-timeout") return err } + if e.previousInstanceID != "" { + if _, err := machine.VerifyUpdateApplied(ctx, md.app.Name, lm.Machine().ID, lm.Machine().InstanceID, e.previousInstanceID); err != nil { + return err + } + } + if err := md.runTestMachines(ctx, e.leasableMachine.Machine(), sl); err != nil { return err } @@ -492,8 +502,9 @@ func (md *machineDeployment) deployMachinesApp(ctx context.Context) error { } type machineUpdateEntry struct { - leasableMachine machine.LeasableMachine - launchInput *fly.LaunchMachineInput + leasableMachine machine.LeasableMachine + launchInput *fly.LaunchMachineInput + previousInstanceID string } type machineUpdateEntries []*machineUpdateEntry @@ -1049,6 +1060,7 @@ func (md *machineDeployment) updateMachineByReplace(ctx context.Context, e *mach func (md *machineDeployment) updateMachineInPlace(ctx context.Context, e *machineUpdateEntry) error { lm := e.leasableMachine + e.previousInstanceID = lm.Machine().InstanceID return lm.Update(ctx, *e.launchInput) } diff --git a/internal/command/deploy/plan.go b/internal/command/deploy/plan.go index 9e99de1cbe..bd82156fd3 100644 --- a/internal/command/deploy/plan.go +++ b/internal/command/deploy/plan.go @@ -593,12 +593,22 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin if !healthcheckResult.machineChecksPassed || !healthcheckResult.smokeChecksPassed { sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Waiting for machine %s to reach a good state", machine.ID)) - _, err := waitForMachineState(ctx, lm, []string{"stopped", "started", "suspended"}, md.waitTimeout, sl) + var waitOpts []mach.WaitOption + if oldMachine != nil { + waitOpts = append(waitOpts, mach.WithVersion(machine.InstanceID)) + } + _, err := waitForMachineState(ctx, lm, []string{"stopped", "started", "suspended"}, md.waitTimeout, sl, waitOpts...) if err != nil { span.RecordError(err) return err } + + if oldMachine != nil { + if _, err := mach.VerifyUpdateApplied(ctx, md.app.Name, machine.ID, machine.InstanceID, oldMachine.InstanceID); err != nil { + return err + } + } } if !shouldStart { @@ -733,7 +743,7 @@ func (md *machineDeployment) clearMachineLease(ctx context.Context, machID, leas } // returns when the machine is in one of the possible states, or after passing the timeout threshold -func waitForMachineState(ctx context.Context, lm mach.LeasableMachine, possibleStates []string, timeout time.Duration, sl statuslogger.StatusLine) (string, error) { +func waitForMachineState(ctx context.Context, lm mach.LeasableMachine, possibleStates []string, timeout time.Duration, sl statuslogger.StatusLine, waitOpts ...mach.WaitOption) (string, error) { ctx, span := tracing.GetTracer().Start(ctx, "wait_for_machine_state", trace.WithAttributes( attribute.StringSlice("possible_states", possibleStates), )) @@ -750,7 +760,7 @@ func waitForMachineState(ctx context.Context, lm mach.LeasableMachine, possibleS for _, state := range possibleStates { go func() { - err := lm.WaitForState(ctx, state, timeout) + err := lm.WaitForState(ctx, state, timeout, waitOpts...) mutex.Lock() defer func() { numCompleted += 1 diff --git a/internal/command/deploy/strategy_bluegreen.go b/internal/command/deploy/strategy_bluegreen.go index de57f5b7ee..c08acca8a3 100644 --- a/internal/command/deploy/strategy_bluegreen.go +++ b/internal/command/deploy/strategy_bluegreen.go @@ -189,7 +189,7 @@ func (bg *blueGreen) CreateGreenMachines(ctx context.Context) error { lock.Lock() defer lock.Unlock() - bg.greenMachines = append(bg.greenMachines, &machineUpdateEntry{greenMachine, launchInput}) + bg.greenMachines = append(bg.greenMachines, &machineUpdateEntry{leasableMachine: greenMachine, launchInput: launchInput}) fmt.Fprintf(bg.io.ErrOut, " Created machine %s\n", bg.colorize.Bold(greenMachine.FormattedMachineId())) @@ -270,7 +270,7 @@ func (bg *blueGreen) WaitForGreenMachinesToBeStarted(ctx context.Context) error } go func(lm machine.LeasableMachine) { - err := machine.WaitForStartOrStop(ctx, bg.app.Name, lm.Machine(), "start", bg.timeout) + err := machine.WaitForStartOrStop(ctx, bg.app.Name, lm.Machine(), "start", bg.timeout, "") if err != nil { errChan <- err @@ -588,7 +588,7 @@ func (bg *blueGreen) WaitForBlueMachinesToBeStopped(ctx context.Context) error { id := gm.leasableMachine.FormattedMachineId() go func(lm machine.LeasableMachine) { - err := machine.WaitForStartOrStop(ctx, bg.app.Name, lm.Machine(), "stop", bg.timeout) + err := machine.WaitForStartOrStop(ctx, bg.app.Name, lm.Machine(), "stop", bg.timeout, "") if err != nil { errChan <- fmt.Errorf("failed to stop machine %s: %v", lm.FormattedMachineId(), err) } else { diff --git a/internal/command/machine/clone.go b/internal/command/machine/clone.go index 72de357514..7aac3436ed 100644 --- a/internal/command/machine/clone.go +++ b/internal/command/machine/clone.go @@ -286,7 +286,7 @@ func runMachineClone(ctx context.Context) (err error) { fmt.Fprintf(out, " Waiting for Machine %s to start...\n", colorize.Bold(launchedMachine.ID)) // wait for a machine to be started - err = mach.WaitForStartOrStop(ctx, appName, launchedMachine, "start", time.Minute*5) + err = mach.WaitForStartOrStop(ctx, appName, launchedMachine, "start", time.Minute*5, "") if err != nil { return err } diff --git a/internal/command/machine/run.go b/internal/command/machine/run.go index c8716064f3..57d2359561 100644 --- a/internal/command/machine/run.go +++ b/internal/command/machine/run.go @@ -470,7 +470,7 @@ func runMachineRun(ctx context.Context) error { s.Start() // wait for machine to be started - err = mach.WaitForStartOrStop(ctx, app.Name, machine, "start", time.Minute*5) + err = mach.WaitForStartOrStop(ctx, app.Name, machine, "start", time.Minute*5, "") s.Stop() if err != nil { return err diff --git a/internal/command/postgres/barman.go b/internal/command/postgres/barman.go index a7943d4924..d9dac16e5a 100644 --- a/internal/command/postgres/barman.go +++ b/internal/command/postgres/barman.go @@ -263,7 +263,7 @@ func runBarmanCreate(ctx context.Context) error { waitTimeout := time.Minute * 5 - err = mach.WaitForStartOrStop(ctx, appName, machine, "start", waitTimeout) + err = mach.WaitForStartOrStop(ctx, appName, machine, "start", waitTimeout, "") if err != nil { return err } diff --git a/internal/machine/leasable_machine.go b/internal/machine/leasable_machine.go index ca3ec71f49..2d964e1d22 100644 --- a/internal/machine/leasable_machine.go +++ b/internal/machine/leasable_machine.go @@ -25,6 +25,7 @@ import ( type WaitOptions struct { allowInfinite bool justCreated bool + version string } type WaitOption func(*WaitOptions) @@ -41,6 +42,12 @@ func WithJustCreated() WaitOption { } } +func WithVersion(version string) WaitOption { + return func(opts *WaitOptions) { + opts.version = version + } +} + type LeasableMachine interface { Machine() *fly.Machine HasLease() bool @@ -240,8 +247,15 @@ func (lm *leasableMachine) WaitForState(ctx context.Context, desiredState string if lm.showLogs { lm.logStatusWaiting(ctx, desiredState) } + flapsOpts := []flaps.WaitOption{ + flaps.WithWaitStates(desiredState), + flaps.WithWaitTimeout(timeout), + } + if options.version != "" { + flapsOpts = append(flapsOpts, flaps.WithWaitVersion(options.version)) + } for { - err := lm.flapsClient.Wait(waitCtx, lm.appName, lm.Machine().ID, flaps.WithWaitStates(desiredState), flaps.WithWaitTimeout(timeout)) + err := lm.flapsClient.Wait(waitCtx, lm.appName, lm.Machine().ID, flapsOpts...) notFoundResponse := false if err != nil { var flapsErr *flaps.FlapsError diff --git a/internal/machine/restart.go b/internal/machine/restart.go index 4ec46269fc..559a7cca84 100644 --- a/internal/machine/restart.go +++ b/internal/machine/restart.go @@ -24,7 +24,7 @@ func Restart(ctx context.Context, appName string, m *fly.Machine, input *fly.Res return fmt.Errorf("could not stop machine %s: %w", input.ID, err) } - if err := WaitForStartOrStop(ctx, appName, &fly.Machine{ID: input.ID}, "start", time.Minute*5); err != nil { + if err := WaitForStartOrStop(ctx, appName, &fly.Machine{ID: input.ID}, "start", time.Minute*5, ""); err != nil { return err } diff --git a/internal/machine/update.go b/internal/machine/update.go index aabbf55cbc..e89f44e665 100644 --- a/internal/machine/update.go +++ b/internal/machine/update.go @@ -98,17 +98,21 @@ func Update(ctx context.Context, appName string, m *fly.Machine, input *fly.Laun waitTimeout = time.Duration(input.Timeout) * time.Second } - state, err := WaitForState(ctx, appName, updatedMachine, "settled", waitTimeout) - if err != nil { + if err := WaitForStartOrStop(ctx, appName, updatedMachine, "settled", waitTimeout, updatedMachine.InstanceID); err != nil { return fmt.Errorf("error while waiting for machine to update: %w", err) } - if state == "failed" { - return fmt.Errorf("machine %s update failed: machine entered %q state", m.ID, state) + settledMachine, err := VerifyUpdateApplied(ctx, appName, m.ID, updatedMachine.InstanceID, m.InstanceID) + if err != nil { + return err } - if state == "started" && !input.SkipHealthChecks { - if err := watch.MachinesChecks(ctx, appName, []*fly.Machine{updatedMachine}); err != nil { + if settledMachine.State == "failed" { + return fmt.Errorf("machine %s update failed: machine entered %q state", m.ID, settledMachine.State) + } + + if settledMachine.State == "started" && !input.SkipHealthChecks { + if err := watch.MachinesChecks(ctx, appName, []*fly.Machine{settledMachine}); err != nil { return fmt.Errorf("failed to wait for health checks to pass: %w", err) } } @@ -118,6 +122,26 @@ func Update(ctx context.Context, appName string, m *fly.Machine, input *fly.Laun return nil } +// VerifyUpdateApplied fetches the machine and returns it if its InstanceID +// matches targetInstanceID. Otherwise it returns an error distinguishing +// whether the machine remains on previousInstanceID or some unexpected +// version. +func VerifyUpdateApplied(ctx context.Context, appName, machineID, targetInstanceID, previousInstanceID string) (*fly.Machine, error) { + flapsClient := flapsutil.ClientFromContext(ctx) + current, err := flapsClient.Get(ctx, appName, machineID) + if err != nil { + return nil, fmt.Errorf("failed to get machine: %w", err) + } + switch current.InstanceID { + case targetInstanceID: + return current, nil + case previousInstanceID: + return nil, fmt.Errorf("machine %s update failed: machine remains on previous version %s (state: %s)", machineID, current.InstanceID, current.State) + default: + return nil, fmt.Errorf("machine %s update failed: unexpected version %s (state: %s)", machineID, current.InstanceID, current.State) + } +} + type invalidConfigReason string const ( diff --git a/internal/machine/wait.go b/internal/machine/wait.go index 696ee3ee6d..c2cf8df390 100644 --- a/internal/machine/wait.go +++ b/internal/machine/wait.go @@ -15,25 +15,7 @@ import ( "github.com/superfly/flyctl/internal/flyerr" ) -func WaitForState(ctx context.Context, appName string, machine *fly.Machine, desiredState string, timeout time.Duration) (string, error) { - flapsClient := flapsutil.ClientFromContext(ctx) - - if err := WaitForStartOrStop(ctx, appName, machine, desiredState, timeout); err != nil { - return "", err - } - if desiredState == "settled" { - m, err := flapsClient.Get(ctx, appName, machine.ID) - if err != nil { - return "", fmt.Errorf("failed to get machine after waiting for it to settle: %w", err) - } - - return m.State, nil - } - - return desiredState, nil -} - -func WaitForStartOrStop(ctx context.Context, appName string, machine *fly.Machine, action string, timeout time.Duration) error { +func WaitForStartOrStop(ctx context.Context, appName string, machine *fly.Machine, action string, timeout time.Duration, version string) error { flapsClient := flapsutil.ClientFromContext(ctx) waitCtx, cancel := context.WithTimeout(ctx, timeout) @@ -57,8 +39,15 @@ func WaitForStartOrStop(ctx context.Context, appName string, machine *fly.Machin Factor: 2, Jitter: false, } + flapsOpts := []flaps.WaitOption{ + flaps.WithWaitStates(waitOnAction), + flaps.WithWaitTimeout(60 * time.Second), + } + if version != "" { + flapsOpts = append(flapsOpts, flaps.WithWaitVersion(version)) + } for { - err := flapsClient.Wait(waitCtx, appName, machine.ID, flaps.WithWaitStates(waitOnAction), flaps.WithWaitTimeout(60*time.Second)) + err := flapsClient.Wait(waitCtx, appName, machine.ID, flapsOpts...) if err == nil { return nil }