Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flypg/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (l *Launcher) LaunchMachinesPostgres(ctx context.Context, config *CreateClu
waitTimeout = time.Hour
}

err = mach.WaitForStartOrStop(ctx, app.Name, machine, "start", waitTimeout)
err = mach.WaitForStartOrStop(ctx, app.Name, machine, "start", waitTimeout, "")
if err != nil {
return err
}
Expand Down
18 changes: 15 additions & 3 deletions internal/command/deploy/machines_deploymachinesapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,22 @@ func (md *machineDeployment) waitForMachine(ctx context.Context, e *machineUpdat
}

if !md.skipHealthChecks {
if err := lm.WaitForState(ctx, fly.MachineStateStarted, md.waitTimeout, machine.WithJustCreated()); err != nil {
waitOpts := []machine.WaitOption{machine.WithJustCreated()}
if e.previousInstanceID != "" {
waitOpts = append(waitOpts, machine.WithVersion(lm.Machine().InstanceID))
}
if err := lm.WaitForState(ctx, fly.MachineStateStarted, md.waitTimeout, waitOpts...); err != nil {
err = suggestChangeWaitTimeout(err, "wait-timeout")

return err
}

if e.previousInstanceID != "" {
if _, err := machine.VerifyUpdateApplied(ctx, md.app.Name, lm.Machine().ID, lm.Machine().InstanceID, e.previousInstanceID); err != nil {
return err
}
}

if err := md.runTestMachines(ctx, e.leasableMachine.Machine(), sl); err != nil {
return err
}
Expand Down Expand Up @@ -492,8 +502,9 @@ func (md *machineDeployment) deployMachinesApp(ctx context.Context) error {
}

type machineUpdateEntry struct {
leasableMachine machine.LeasableMachine
launchInput *fly.LaunchMachineInput
leasableMachine machine.LeasableMachine
launchInput *fly.LaunchMachineInput
previousInstanceID string
}

type machineUpdateEntries []*machineUpdateEntry
Expand Down Expand Up @@ -1049,6 +1060,7 @@ func (md *machineDeployment) updateMachineByReplace(ctx context.Context, e *mach

// updateMachineInPlace applies the entry's launch input to its existing
// machine via Update and returns whatever error Update reports.
func (md *machineDeployment) updateMachineInPlace(ctx context.Context, e *machineUpdateEntry) error {
	// Remember the InstanceID the machine had before the update is issued;
	// waitForMachine uses a non-empty previousInstanceID to wait on the new
	// version and to verify that the update was applied.
	e.previousInstanceID = e.leasableMachine.Machine().InstanceID

	return e.leasableMachine.Update(ctx, *e.launchInput)
}
Expand Down
16 changes: 13 additions & 3 deletions internal/command/deploy/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,12 +593,22 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin

if !healthcheckResult.machineChecksPassed || !healthcheckResult.smokeChecksPassed {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Waiting for machine %s to reach a good state", machine.ID))
_, err := waitForMachineState(ctx, lm, []string{"stopped", "started", "suspended"}, md.waitTimeout, sl)
var waitOpts []mach.WaitOption
if oldMachine != nil {
waitOpts = append(waitOpts, mach.WithVersion(machine.InstanceID))
}
_, err := waitForMachineState(ctx, lm, []string{"stopped", "started", "suspended"}, md.waitTimeout, sl, waitOpts...)
if err != nil {
span.RecordError(err)

return err
}

if oldMachine != nil {
if _, err := mach.VerifyUpdateApplied(ctx, md.app.Name, machine.ID, machine.InstanceID, oldMachine.InstanceID); err != nil {
return err
}
}
}

if !shouldStart {
Expand Down Expand Up @@ -733,7 +743,7 @@ func (md *machineDeployment) clearMachineLease(ctx context.Context, machID, leas
}

// returns when the machine is in one of the possible states, or after passing the timeout threshold
func waitForMachineState(ctx context.Context, lm mach.LeasableMachine, possibleStates []string, timeout time.Duration, sl statuslogger.StatusLine) (string, error) {
func waitForMachineState(ctx context.Context, lm mach.LeasableMachine, possibleStates []string, timeout time.Duration, sl statuslogger.StatusLine, waitOpts ...mach.WaitOption) (string, error) {
ctx, span := tracing.GetTracer().Start(ctx, "wait_for_machine_state", trace.WithAttributes(
attribute.StringSlice("possible_states", possibleStates),
))
Expand All @@ -750,7 +760,7 @@ func waitForMachineState(ctx context.Context, lm mach.LeasableMachine, possibleS

for _, state := range possibleStates {
go func() {
err := lm.WaitForState(ctx, state, timeout)
err := lm.WaitForState(ctx, state, timeout, waitOpts...)
mutex.Lock()
defer func() {
numCompleted += 1
Expand Down
6 changes: 3 additions & 3 deletions internal/command/deploy/strategy_bluegreen.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (bg *blueGreen) CreateGreenMachines(ctx context.Context) error {
lock.Lock()
defer lock.Unlock()

bg.greenMachines = append(bg.greenMachines, &machineUpdateEntry{greenMachine, launchInput})
bg.greenMachines = append(bg.greenMachines, &machineUpdateEntry{leasableMachine: greenMachine, launchInput: launchInput})

fmt.Fprintf(bg.io.ErrOut, " Created machine %s\n", bg.colorize.Bold(greenMachine.FormattedMachineId()))

Expand Down Expand Up @@ -270,7 +270,7 @@ func (bg *blueGreen) WaitForGreenMachinesToBeStarted(ctx context.Context) error
}

go func(lm machine.LeasableMachine) {
err := machine.WaitForStartOrStop(ctx, bg.app.Name, lm.Machine(), "start", bg.timeout)
err := machine.WaitForStartOrStop(ctx, bg.app.Name, lm.Machine(), "start", bg.timeout, "")
if err != nil {
errChan <- err

Expand Down Expand Up @@ -588,7 +588,7 @@ func (bg *blueGreen) WaitForBlueMachinesToBeStopped(ctx context.Context) error {
id := gm.leasableMachine.FormattedMachineId()

go func(lm machine.LeasableMachine) {
err := machine.WaitForStartOrStop(ctx, bg.app.Name, lm.Machine(), "stop", bg.timeout)
err := machine.WaitForStartOrStop(ctx, bg.app.Name, lm.Machine(), "stop", bg.timeout, "")
if err != nil {
errChan <- fmt.Errorf("failed to stop machine %s: %v", lm.FormattedMachineId(), err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion internal/command/machine/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func runMachineClone(ctx context.Context) (err error) {
fmt.Fprintf(out, " Waiting for Machine %s to start...\n", colorize.Bold(launchedMachine.ID))

// wait for a machine to be started
err = mach.WaitForStartOrStop(ctx, appName, launchedMachine, "start", time.Minute*5)
err = mach.WaitForStartOrStop(ctx, appName, launchedMachine, "start", time.Minute*5, "")
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/command/machine/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func runMachineRun(ctx context.Context) error {

s.Start()
// wait for machine to be started
err = mach.WaitForStartOrStop(ctx, app.Name, machine, "start", time.Minute*5)
err = mach.WaitForStartOrStop(ctx, app.Name, machine, "start", time.Minute*5, "")
s.Stop()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/command/postgres/barman.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func runBarmanCreate(ctx context.Context) error {

waitTimeout := time.Minute * 5

err = mach.WaitForStartOrStop(ctx, appName, machine, "start", waitTimeout)
err = mach.WaitForStartOrStop(ctx, appName, machine, "start", waitTimeout, "")
if err != nil {
return err
}
Expand Down
16 changes: 15 additions & 1 deletion internal/machine/leasable_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
// WaitOptions collects the optional settings that WaitOption functions apply
// before a wait is performed.
type WaitOptions struct {
allowInfinite bool
// NOTE(review): presumably set by WithJustCreated; its body is not fully
// visible here, confirm.
justCreated bool
// version is set by WithVersion. When it is non-empty, WaitForState adds
// flaps.WithWaitVersion(version) to the options of the flaps wait call.
version string
}

// WaitOption mutates a WaitOptions value; see WithVersion for an example.
type WaitOption func(*WaitOptions)
Expand All @@ -41,6 +42,12 @@ func WithJustCreated() WaitOption {
}
}

// WithVersion returns a WaitOption that stores version in the WaitOptions it
// is applied to. An empty version leaves the wait unconstrained, because
// WaitForState only forwards a non-empty version to flaps.WithWaitVersion.
func WithVersion(version string) WaitOption {
	return func(o *WaitOptions) { o.version = version }
}

type LeasableMachine interface {
Machine() *fly.Machine
HasLease() bool
Expand Down Expand Up @@ -240,8 +247,15 @@ func (lm *leasableMachine) WaitForState(ctx context.Context, desiredState string
if lm.showLogs {
lm.logStatusWaiting(ctx, desiredState)
}
flapsOpts := []flaps.WaitOption{
flaps.WithWaitStates(desiredState),
flaps.WithWaitTimeout(timeout),
}
if options.version != "" {
flapsOpts = append(flapsOpts, flaps.WithWaitVersion(options.version))
}
for {
err := lm.flapsClient.Wait(waitCtx, lm.appName, lm.Machine().ID, flaps.WithWaitStates(desiredState), flaps.WithWaitTimeout(timeout))
err := lm.flapsClient.Wait(waitCtx, lm.appName, lm.Machine().ID, flapsOpts...)
notFoundResponse := false
if err != nil {
var flapsErr *flaps.FlapsError
Expand Down
2 changes: 1 addition & 1 deletion internal/machine/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func Restart(ctx context.Context, appName string, m *fly.Machine, input *fly.Res
return fmt.Errorf("could not stop machine %s: %w", input.ID, err)
}

if err := WaitForStartOrStop(ctx, appName, &fly.Machine{ID: input.ID}, "start", time.Minute*5); err != nil {
if err := WaitForStartOrStop(ctx, appName, &fly.Machine{ID: input.ID}, "start", time.Minute*5, ""); err != nil {
return err
}

Expand Down
36 changes: 30 additions & 6 deletions internal/machine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,21 @@ func Update(ctx context.Context, appName string, m *fly.Machine, input *fly.Laun
waitTimeout = time.Duration(input.Timeout) * time.Second
}

state, err := WaitForState(ctx, appName, updatedMachine, "settled", waitTimeout)
if err != nil {
if err := WaitForStartOrStop(ctx, appName, updatedMachine, "settled", waitTimeout, updatedMachine.InstanceID); err != nil {
return fmt.Errorf("error while waiting for machine to update: %w", err)
}

if state == "failed" {
return fmt.Errorf("machine %s update failed: machine entered %q state", m.ID, state)
settledMachine, err := VerifyUpdateApplied(ctx, appName, m.ID, updatedMachine.InstanceID, m.InstanceID)
if err != nil {
return err
}

if state == "started" && !input.SkipHealthChecks {
if err := watch.MachinesChecks(ctx, appName, []*fly.Machine{updatedMachine}); err != nil {
if settledMachine.State == "failed" {
return fmt.Errorf("machine %s update failed: machine entered %q state", m.ID, settledMachine.State)
}

if settledMachine.State == "started" && !input.SkipHealthChecks {
if err := watch.MachinesChecks(ctx, appName, []*fly.Machine{settledMachine}); err != nil {
return fmt.Errorf("failed to wait for health checks to pass: %w", err)
}
}
Expand All @@ -118,6 +122,26 @@ func Update(ctx context.Context, appName string, m *fly.Machine, input *fly.Laun
return nil
}

// VerifyUpdateApplied re-reads the machine through the flaps client stored in
// ctx and checks which instance it currently reports.
//
// It returns the fetched machine when its InstanceID equals targetInstanceID.
// Otherwise it returns a nil machine and an error whose message tells apart a
// machine that is still on previousInstanceID from one that reports any other
// InstanceID. A failure of the fetch itself is returned wrapped.
func VerifyUpdateApplied(ctx context.Context, appName, machineID, targetInstanceID, previousInstanceID string) (*fly.Machine, error) {
	got, err := flapsutil.ClientFromContext(ctx).Get(ctx, appName, machineID)
	if err != nil {
		return nil, fmt.Errorf("failed to get machine: %w", err)
	}

	// The target comparison runs first, so it wins if both IDs are equal.
	if got.InstanceID == targetInstanceID {
		return got, nil
	}

	if got.InstanceID == previousInstanceID {
		return nil, fmt.Errorf("machine %s update failed: machine remains on previous version %s (state: %s)", machineID, got.InstanceID, got.State)
	}

	return nil, fmt.Errorf("machine %s update failed: unexpected version %s (state: %s)", machineID, got.InstanceID, got.State)
}

type invalidConfigReason string

const (
Expand Down
29 changes: 9 additions & 20 deletions internal/machine/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,7 @@ import (
"github.com/superfly/flyctl/internal/flyerr"
)

// WaitForState waits, via WaitForStartOrStop, for machine to reach
// desiredState within timeout and reports the state it ended up in.
//
// For every desiredState other than "settled" the returned state is
// desiredState itself. For "settled" the machine is fetched again after the
// wait and its reported State is returned. On any error the returned state is
// the empty string.
func WaitForState(ctx context.Context, appName string, machine *fly.Machine, desiredState string, timeout time.Duration) (string, error) {
	client := flapsutil.ClientFromContext(ctx)

	err := WaitForStartOrStop(ctx, appName, machine, desiredState, timeout)
	if err != nil {
		return "", err
	}

	if desiredState != "settled" {
		return desiredState, nil
	}

	// "settled" does not name a single state, so ask the API where the
	// machine actually ended up.
	current, err := client.Get(ctx, appName, machine.ID)
	if err != nil {
		return "", fmt.Errorf("failed to get machine after waiting for it to settle: %w", err)
	}

	return current.State, nil
}

func WaitForStartOrStop(ctx context.Context, appName string, machine *fly.Machine, action string, timeout time.Duration) error {
func WaitForStartOrStop(ctx context.Context, appName string, machine *fly.Machine, action string, timeout time.Duration, version string) error {
flapsClient := flapsutil.ClientFromContext(ctx)

waitCtx, cancel := context.WithTimeout(ctx, timeout)
Expand All @@ -57,8 +39,15 @@ func WaitForStartOrStop(ctx context.Context, appName string, machine *fly.Machin
Factor: 2,
Jitter: false,
}
flapsOpts := []flaps.WaitOption{
flaps.WithWaitStates(waitOnAction),
flaps.WithWaitTimeout(60 * time.Second),
}
if version != "" {
flapsOpts = append(flapsOpts, flaps.WithWaitVersion(version))
}
for {
err := flapsClient.Wait(waitCtx, appName, machine.ID, flaps.WithWaitStates(waitOnAction), flaps.WithWaitTimeout(60*time.Second))
err := flapsClient.Wait(waitCtx, appName, machine.ID, flapsOpts...)
if err == nil {
return nil
}
Expand Down
Loading