Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 52 additions & 23 deletions internal/command/deploy/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,35 @@ const (
STOPPED_MACHINES_POOL_SIZE = 30
)

// Helper functions for status logging to eliminate duplication
func logMachineActionRunning(sl statuslogger.StatusLine, action, machineID string) {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("%s machine %s", action, machineID))
}

func logMachineActionSuccess(sl statuslogger.StatusLine, action, machineID string) {
sl.LogStatus(statuslogger.StatusSuccess, fmt.Sprintf("%s machine %s", action, machineID))
}

func logMachineActionFailure(sl statuslogger.StatusLine, err error) {
sl.LogStatus(statuslogger.StatusFailure, err.Error())
}

func logLeaseActionRunning(sl statuslogger.StatusLine, action, machineID string) {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("%s lease for %s", action, machineID))
}

func logLeaseActionSuccess(sl statuslogger.StatusLine, action, machineID string) {
sl.LogStatus(statuslogger.StatusSuccess, fmt.Sprintf("%s lease for %s", action, machineID))
}

func logLeaseActionFailure(sl statuslogger.StatusLine, action, machineID string, err error) {
sl.LogStatus(statuslogger.StatusFailure, fmt.Sprintf("Failed to %s lease for %s: %v", action, machineID, err))
}

func logMachineStatus(sl statuslogger.StatusLine, status, machineID string) {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Machine %s %s", machineID, status))
}

type MachineLogger struct {
store map[string]statuslogger.StatusLine
sl statuslogger.StatusLogger
Expand Down Expand Up @@ -375,7 +404,7 @@ func (md *machineDeployment) updateProcessGroup(ctx context.Context, machineTupl
// this shouldn't happen, we ensure that the machine is in the map but just in case
if !ok {
err := fmt.Errorf("no health checks stored for machine")
sl.LogStatus(statuslogger.StatusFailure, err.Error())
logMachineActionFailure(sl, err)
span.RecordError(err)

return fmt.Errorf("failed to update machine %s: %w", machineID, err)
Expand All @@ -384,7 +413,7 @@ func (md *machineDeployment) updateProcessGroup(ctx context.Context, machineTupl

err := md.updateMachineWChecks(gCtx, oldMachine, newMachine, sl, md.io, machineCheckResult)
if err != nil {
sl.LogStatus(statuslogger.StatusFailure, err.Error())
logMachineActionFailure(sl, err)
span.RecordError(err)

return fmt.Errorf("failed to update machine %s: %w", oldMachine.ID, err)
Expand Down Expand Up @@ -437,19 +466,19 @@ func (md *machineDeployment) acquireLeases(ctx context.Context, machineTuples []
return nil
}

sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Acquiring lease for %s", machine.ID))
logLeaseActionRunning(sl, "Acquiring", machine.ID)

lease, err := md.acquireMachineLease(ctx, machine.ID)
if err != nil {
sl.LogStatus(statuslogger.StatusFailure, fmt.Sprintf("Failed to acquire lease for %s: %v", machine.ID, err))
logLeaseActionFailure(sl, "acquire", machine.ID, err)

return err
}

machine.LeaseNonce = lease.Data.Nonce
lm := mach.NewLeasableMachine(md.flapsClient, md.io, md.app.Name, machine, false)
lm.StartBackgroundLeaseRefresh(ctx, md.leaseTimeout, md.leaseDelayBetween)
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Acquired lease for %s", machine.ID))
logLeaseActionSuccess(sl, "Acquired", machine.ID)

return nil
})
Expand Down Expand Up @@ -487,21 +516,21 @@ func (md *machineDeployment) releaseLeases(ctx context.Context, machineTuples []

sl := machToLogger.getLoggerFromID(machine.ID)

sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Clearing lease for %s", machine.ID))
logLeaseActionRunning(sl, "Clearing", machine.ID)
if machine.LeaseNonce == "" {
sl.LogStatus(statuslogger.StatusSuccess, fmt.Sprintf("Cleared lease for %s", machine.ID))
logLeaseActionSuccess(sl, "Cleared", machine.ID)

return nil
}
err := md.clearMachineLease(ctx, machine.ID, machine.LeaseNonce)
if err != nil {
sl.LogStatus(statuslogger.StatusFailure, fmt.Sprintf("Failed to clear lease for %s: %v", machine.ID, err))
logLeaseActionFailure(sl, "clear", machine.ID, err)

return err
}
machine.LeaseNonce = ""

sl.LogStatus(statuslogger.StatusSuccess, fmt.Sprintf("Cleared lease for %s", machine.ID))
logLeaseActionSuccess(sl, "Cleared", machine.ID)

return nil
})
Expand Down Expand Up @@ -592,7 +621,7 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin
span.SetAttributes(attribute.Bool("should_start", shouldStart))

if !healthcheckResult.machineChecksPassed || !healthcheckResult.smokeChecksPassed {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Waiting for machine %s to reach a good state", machine.ID))
logMachineStatus(sl, "to reach a good state", machine.ID)
_, err := waitForMachineState(ctx, lm, []string{"stopped", "started", "suspended"}, md.waitTimeout, sl)
if err != nil {
span.RecordError(err)
Expand All @@ -602,15 +631,15 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin
}

if !shouldStart {
sl.LogStatus(statuslogger.StatusSuccess, fmt.Sprintf("Machine %s is now in a good state", machine.ID))
logMachineActionSuccess(sl, "is now in a good state", machine.ID)

return nil
}

md.warnAboutIncorrectListenAddress(ctx, lm)

if !healthcheckResult.smokeChecksPassed {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Running smoke checks on machine %s", machine.ID))
logMachineActionRunning(sl, "Running smoke checks on", machine.ID)
err = md.doSmokeChecks(ctx, lm, false)
if err != nil {
span.RecordError(err)
Expand All @@ -621,7 +650,7 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin
}

if !healthcheckResult.machineChecksPassed {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Running machine checks on machine %s", machine.ID))
logMachineActionRunning(sl, "Running machine checks on", machine.ID)
err = md.runTestMachines(ctx, machine, sl)
if err != nil {
err := &unrecoverableError{err: err}
Expand All @@ -633,7 +662,7 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin
}

if !healthcheckResult.regularChecksPassed {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Checking health of machine %s", machine.ID))
logMachineActionRunning(sl, "Checking health of", machine.ID)
err = lm.WaitForHealthchecksToPass(ctx, md.waitTimeout)
if err != nil {
err := &unrecoverableError{err: err}
Expand All @@ -644,7 +673,7 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin
healthcheckResult.regularChecksPassed = true
}

sl.LogStatus(statuslogger.StatusSuccess, fmt.Sprintf("Machine %s is now in a good state", machine.ID))
logMachineActionSuccess(sl, "is now in a good state", machine.ID)

return nil
}
Expand All @@ -657,45 +686,45 @@ func (md *machineDeployment) updateOrCreateMachine(ctx context.Context, oldMachi
span.AddEvent("Old machine exists")
if newMachine == nil {
span.AddEvent("Destroying old machine")
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Destroying machine %s", oldMachine.ID))
logMachineActionRunning(sl, "Destroying", oldMachine.ID)

err := md.destroyMachine(ctx, oldMachine.ID, oldMachine.LeaseNonce)
span.RecordError(err)

sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Destroyed machine %s", oldMachine.ID))
logMachineActionRunning(sl, "Destroyed", oldMachine.ID)

return nil, nil, err
} else {
span.AddEvent("Updating old machine")
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Updating machine config for %s", oldMachine.ID))
logMachineActionRunning(sl, "Updating config for", oldMachine.ID)
machine, err := md.updateMachineConfig(ctx, oldMachine, newMachine.Config, sl, newMachine.State == "replacing")
if err != nil {
span.RecordError(err)

return oldMachine, nil, err
}
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Updated machine config for %s", oldMachine.ID))
logMachineActionRunning(sl, "Updated config for", oldMachine.ID)

return machine, nil, nil
}
} else if newMachine != nil {
span.AddEvent("Creating a new machine")
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Creating machine for %s", newMachine.ID))
logMachineActionRunning(sl, "Creating", newMachine.ID)
machine, err := md.createMachine(ctx, newMachine.Config, newMachine.Region)
if err != nil {
span.RecordError(err)

return nil, nil, err
}

sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Acquiring lease for %s", newMachine.ID))
logLeaseActionRunning(sl, "Acquiring", newMachine.ID)
lease, err := md.acquireMachineLease(ctx, machine.ID)
if err != nil {
span.RecordError(err)

return nil, nil, err
}
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Acquired lease for %s", newMachine.ID))
logLeaseActionSuccess(sl, "Acquired", newMachine.ID)

return machine, lease, nil
} else {
Expand Down Expand Up @@ -760,7 +789,7 @@ func waitForMachineState(ctx context.Context, lm mach.LeasableMachine, possibleS
if successfulState != "" {
return
}
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Machine %s reached %s state", lm.Machine().ID, state))
logMachineStatus(sl, fmt.Sprintf("reached %s state", state), lm.Machine().ID)

if err != nil {
waitErr = err
Expand Down