Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/workload-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ func main() {
}

codeInterpreterReconciler := &workloadmanager.CodeInterpreterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("codeinterpreter-controller"),
}

if err := setupControllers(mgr, sandboxReconciler, codeInterpreterReconciler); err != nil {
Expand Down
126 changes: 115 additions & 11 deletions pkg/workloadmanager/codeinterpreter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -40,10 +41,22 @@ import (
extensionsv1alpha1 "sigs.k8s.io/agent-sandbox/extensions/api/v1alpha1"
)

const (
codeInterpreterReadyCondition = "Ready"
codeInterpreterWarmPoolCondition = "WarmPoolAvailable"
codeInterpreterReadyReason = "Reconciled"
codeInterpreterWarmPoolDisabled = "WarmPoolDisabled"
codeInterpreterWarmPoolReady = "WarmPoolReady"
codeInterpreterWarmPoolBelowWatermark = "WarmPoolBelowWatermark"
codeInterpreterWarmPoolEmpty = "WarmPoolEmpty"
codeInterpreterWarmPoolNotFound = "WarmPoolNotFound"
)

// CodeInterpreterReconciler reconciles a CodeInterpreter object
type CodeInterpreterReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
Recorder record.EventRecorder
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down Expand Up @@ -101,27 +114,117 @@ func (r *CodeInterpreterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
// when the status is already up-to-date to avoid triggering a new watch event
// that would re-enqueue the object unnecessarily.
func (r *CodeInterpreterReconciler) updateStatus(ctx context.Context, ci *runtimev1alpha1.CodeInterpreter) error {
existing := apimeta.FindStatusCondition(ci.Status.Conditions, "Ready")
if ci.Status.Ready &&
existing != nil &&
existing.Status == metav1.ConditionTrue &&
existing.ObservedGeneration == ci.Generation {
return nil
}
oldReady := ci.Status.Ready
oldConditions := append([]metav1.Condition(nil), ci.Status.Conditions...)
previousWarmPoolCondition := apimeta.FindStatusCondition(oldConditions, codeInterpreterWarmPoolCondition)

ci.Status.Ready = true
// SetStatusCondition only updates LastTransitionTime when the condition
// Status actually changes, preventing spurious status writes that would
// trigger an infinite reconciliation loop.
apimeta.SetStatusCondition(&ci.Status.Conditions, metav1.Condition{
Type: "Ready",
Type: codeInterpreterReadyCondition,
Status: metav1.ConditionTrue,
Reason: "Reconciled",
Reason: codeInterpreterReadyReason,
Message: "CodeInterpreter is ready",
ObservedGeneration: ci.Generation,
})

return r.Status().Update(ctx, ci)
warmPoolCondition, err := r.warmPoolAvailableCondition(ctx, ci)
if err != nil {
return err
}
apimeta.SetStatusCondition(&ci.Status.Conditions, warmPoolCondition)

if oldReady == ci.Status.Ready && reflect.DeepEqual(oldConditions, ci.Status.Conditions) {
return nil
}

if err := r.Status().Update(ctx, ci); err != nil {
return err
}
if r.shouldRecordWarmPoolWarningEvent(previousWarmPoolCondition, warmPoolCondition) {
r.recordEvent(ci, corev1.EventTypeWarning, warmPoolCondition.Reason, warmPoolCondition.Message)
}
return nil
}

func (r *CodeInterpreterReconciler) warmPoolAvailableCondition(ctx context.Context, ci *runtimev1alpha1.CodeInterpreter) (metav1.Condition, error) {
desired := int32(0)
if ci.Spec.WarmPoolSize != nil {
desired = *ci.Spec.WarmPoolSize
}
if desired <= 0 {
return metav1.Condition{
Type: codeInterpreterWarmPoolCondition,
Status: metav1.ConditionUnknown,
Reason: codeInterpreterWarmPoolDisabled,
Message: "Warm pool is not configured",
ObservedGeneration: ci.Generation,
}, nil
}

warmPool := &extensionsv1alpha1.SandboxWarmPool{}
err := r.Get(ctx, types.NamespacedName{Name: ci.Name, Namespace: ci.Namespace}, warmPool)
if errors.IsNotFound(err) {
return metav1.Condition{
Type: codeInterpreterWarmPoolCondition,
Status: metav1.ConditionFalse,
Reason: codeInterpreterWarmPoolNotFound,
Message: fmt.Sprintf("SandboxWarmPool %s/%s has not been created", ci.Namespace, ci.Name),
ObservedGeneration: ci.Generation,
}, nil
}
if err != nil {
return metav1.Condition{}, fmt.Errorf("failed to get SandboxWarmPool for status: %w", err)
}

ready := warmPool.Status.ReadyReplicas
lowWatermark := (desired + 1) / 2
if ready == 0 {
return metav1.Condition{
Type: codeInterpreterWarmPoolCondition,
Status: metav1.ConditionFalse,
Reason: codeInterpreterWarmPoolEmpty,
Message: fmt.Sprintf("SandboxWarmPool has 0 ready replicas out of %d desired", desired),
ObservedGeneration: ci.Generation,
}, nil
}
if ready < lowWatermark {
return metav1.Condition{
Type: codeInterpreterWarmPoolCondition,
Status: metav1.ConditionFalse,
Reason: codeInterpreterWarmPoolBelowWatermark,
Message: fmt.Sprintf("SandboxWarmPool has %d ready replicas out of %d desired, below low watermark %d", ready, desired, lowWatermark),
ObservedGeneration: ci.Generation,
}, nil
}

return metav1.Condition{
Type: codeInterpreterWarmPoolCondition,
Status: metav1.ConditionTrue,
Reason: codeInterpreterWarmPoolReady,
Message: fmt.Sprintf("SandboxWarmPool has %d ready replicas out of %d desired", ready, desired),
ObservedGeneration: ci.Generation,
}, nil
}

func (r *CodeInterpreterReconciler) shouldRecordWarmPoolWarningEvent(previous *metav1.Condition, current metav1.Condition) bool {
if current.Type != codeInterpreterWarmPoolCondition || current.Status != metav1.ConditionFalse {
return false
}
if current.Reason != codeInterpreterWarmPoolEmpty &&
current.Reason != codeInterpreterWarmPoolBelowWatermark {
return false
}
Comment on lines +216 to +219

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Transient Warning Event on Creation / Cache Delay

During the first reconciliation of a newly created CodeInterpreter, ensureSandboxWarmPool will create the SandboxWarmPool resource. However, because the controller's client uses an asynchronous cache (informer), the subsequent r.Get call in warmPoolAvailableCondition (executed in the same reconciliation loop) will likely return a NotFound error because the cache has not yet synced with the newly created resource.

This causes the controller to transiently set the condition to WarmPoolNotFound and emit a Warning event. Once the cache syncs a moment later, a new reconciliation is triggered, and the status is updated to WarmPoolEmpty or WarmPoolReady.

Since the controller automatically and immediately self-heals by creating/recreating the SandboxWarmPool when it is missing, WarmPoolNotFound is always a transient state. Emitting a Warning event for it is noisy and unnecessary.

We should avoid recording warning events for WarmPoolNotFound to prevent spamming warning events on every initial creation of a CodeInterpreter.

	if current.Reason != codeInterpreterWarmPoolEmpty &&
		current.Reason != codeInterpreterWarmPoolBelowWatermark {
		return false
	}

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated in the latest push.

Change:

  • Kept the WarmPoolNotFound condition for status visibility.
  • Stopped recording a Warning Event for WarmPoolNotFound, since it can be a transient cache-delay state right after the controller creates the SandboxWarmPool.
  • Added a unit test to verify that WarmPoolNotFound does not emit a warning event.

Validation:

  • /tmp/go-toolchain/go/bin/go test -v ./pkg/workloadmanager -run 'TestShouldRecordWarmPoolWarningEvent|TestReconcileReportsWarmPoolEmptyInsteadOfOnlyReady|TestReconcileReportsWarmPoolBelowWatermark|TestReconcileUpdatesWarmPoolAvailableWhenPoolRecovers'
  • /tmp/go-toolchain/go/bin/go test ./pkg/workloadmanager
  • git diff --check

return previous == nil || previous.Status != current.Status || previous.Reason != current.Reason
}

func (r *CodeInterpreterReconciler) recordEvent(ci *runtimev1alpha1.CodeInterpreter, eventType, reason, message string) {
if r.Recorder == nil {
return
}
r.Recorder.Event(ci, eventType, reason, message)
}

// ensureSandboxTemplate ensures that a SandboxTemplate exists for this CodeInterpreter
Expand Down Expand Up @@ -339,5 +442,6 @@ func (r *CodeInterpreterReconciler) podTemplateEqual(a, b sandboxv1alpha1.PodTem
func (r *CodeInterpreterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&runtimev1alpha1.CodeInterpreter{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&extensionsv1alpha1.SandboxWarmPool{}).
Complete(r)
}
Loading
Loading