Skip to content

Run 39636 controller core logic#77

Open
shaked-bouktus wants to merge 11 commits into
mainfrom
RUN-39636-controller-core-logicc
Open

Run 39636 controller core logic#77
shaked-bouktus wants to merge 11 commits into
mainfrom
RUN-39636-controller-core-logicc

Conversation

@shaked-bouktus

@shaked-bouktus shaked-bouktus commented May 28, 2026

Copy link
Copy Markdown
Contributor

What does this PR do?

Karta controller core logic

Related issue(s)

#67
#68
#69
#70

Summary by CodeRabbit

  • New Features

    • Karta operator with startup, health/readiness endpoints, metrics/probe and leader-election flags.
    • Validation of Karta resources, computed Ready condition from Validated+CRD existence, and automatic stamping/removal of root group/version/kind labels.
    • CRD events enqueue affected Karta reconciles.
  • API

    • Exported condition types and label key constants added.
  • Tests

    • Extensive reconciliation, conditions, labels, CRD handling, and regression tests.
  • Chores

    • New operator module and dependency manifest added.

@shaked-bouktus shaked-bouktus changed the title Run 39636 controller core logicc Run 39636 controller core logic May 28, 2026
@shaked-bouktus

Copy link
Copy Markdown
Contributor Author

@CodeRabbit full review

@coderabbitai

coderabbitai Bot commented May 28, 2026

Copy link
Copy Markdown
Contributor
✅ Actions performed

Full review triggered.

@coderabbitai

coderabbitai Bot commented May 28, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: 91061e29-f5d0-4e5c-b5de-f0a64b276fd0

📥 Commits

Reviewing files that changed from the base of the PR and between ddbc6ea and ec2f553.

📒 Files selected for processing (1)
  • operator/go.mod
🚧 Files skipped from review as they are similar to previous changes (1)
  • operator/go.mod

Walkthrough

Adds a Karta operator: API condition/label contracts, step-chain primitives and status helpers, a Reconciler with CRD-watch mapping, ordered reconcile steps (validate, CRD existence, Ready derivation, conditional status patch, label stamping/removal), manager bootstrap, tests, and module manifest.

Changes

Karta Operator

Layer / File(s) Summary
API Contracts and Constants
pkg/api/runai/v1alpha1/conditions.go, pkg/api/runai/v1alpha1/labels.go
Defines exported ConditionType and condition constants (Validated, CRDExists, Ready) and operator-owned label key constants (LabelRootGroup, LabelRootVersion, LabelRootKind).
Step Chain and Status Management
operator/internal/flow.go, operator/internal/flow_test.go, operator/internal/status.go, operator/internal/status_test.go
Adds StepFn/StepResult reconciliation primitives, status upsert/derivation helpers, patch-if-changed logic, and tests ensuring correct transition/LastTransitionTime behavior.
Reconciler Registration and CRD Watch
operator/internal/operator.go, operator/internal/operator_test.go, operator/internal/eventfilter.go, operator/internal/eventfilter_test.go
Introduces Reconciler, SetupWithManager wiring to watch Karta and CustomResourceDefinition, maps CRD events to affected Kartas via MapCRDToKartaEvent, and provides lifecycle and predicate tests.
Core Reconciliation Logic
operator/internal/reconciler.go, operator/internal/reconciler_test.go
Implements ordered reconcile steps: validate Karta, check referenced root CRD existence, derive Ready from Validated and CRDExists, conditionally patch status, and ensure/remove root GVK-derived index labels on the Karta; includes comprehensive tests for transitions, idempotency, label stamping, foreign-condition preservation, and regression cases.
Operator Bootstrap and Test Infrastructure
operator/cmd/main.go, operator/internal/suite_test.go, operator/go.mod
Adds operator main() that builds a controller-runtime manager (scheme registration, metrics/health endpoints, leader election flags), registers the reconciler and readiness/health checks, and provides test suite helpers (buildScheme, newKarta, newValidKarta, newCRD, label helpers) plus the operator Go module manifest.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~65 minutes

Poem

A rabbit watches controllers start and hum,
It hops through steps where Karta truths become,
Labels are stamped and conditions gently set,
CRDs checked, statuses patched — no race to fret,
Hooray for reconciles and tidy operator drums 🐇✨

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 61.29% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title 'Run 39636 controller core logic' is overly vague and contains a ticket reference instead of describing the actual changes—it fails to clearly convey that the PR adds a Karta operator/controller with reconciliation logic, event filtering, status management, and condition handling. Revise the title to be more descriptive of the actual implementation, such as 'Add Karta operator controller with reconciliation and status management' or 'Implement controller core logic for Karta resource management'.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch RUN-39636-controller-core-logicc

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (1)
operator/internal/status.go (1)

128-137: ⚡ Quick win

Make statusChanged order-insensitive by comparing KartaStatus.Conditions via Condition.Type.

KartaStatus.Conditions is CRD-modeled as a map keyed by type (+listType=map, +listMapKey=type, patchMergeKey:"type"), so slice reordering shouldn’t be treated as a meaningful change; comparing by index can trigger unnecessary status patches. Update statusChanged to compare conditions by Type (and the existing Type/Status/Reason/Message fields).

Suggested refactor
func statusChanged(original, current *kartav1alpha1.KartaStatus) bool {
 	if len(original.Conditions) != len(current.Conditions) {
 		return true
 	}
-	for i := range original.Conditions {
-		o, c := original.Conditions[i], current.Conditions[i]
+	byType := func(in []metav1.Condition) map[string]metav1.Condition {
+		out := make(map[string]metav1.Condition, len(in))
+		for _, c := range in {
+			out[c.Type] = c
+		}
+		return out
+	}
+	origByType := byType(original.Conditions)
+	currByType := byType(current.Conditions)
+	for t, o := range origByType {
+		c, ok := currByType[t]
+		if !ok {
+			return true
+		}
 		if o.Type != c.Type || o.Status != c.Status || o.Reason != c.Reason || o.Message != c.Message {
 			return true
 		}
 	}
 	return false
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@operator/internal/status.go` around lines 128 - 137, The statusChanged
function currently compares KartaStatus.Conditions by index which treats
reordering as a change; update statusChanged to be order-insensitive by building
a lookup map keyed by Condition.Type (e.g., map[string]v1.Condition or the
concrete condition type) from one of the slices (original or current), then
iterate the other slice and for each condition.Type look up the counterpart and
compare Type, Status, Reason and Message, returning true on any mismatch; also
ensure you handle missing entries (treat as change) and compare lengths via map
sizes to detect added/removed conditions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@operator/go.mod`:
- Line 78: Remove the local replace directive "replace github.com/run-ai/karta
=> ../" from the module manifest and restore a versioned dependency declaration;
instead configure local monorepo wiring via a go.work file or CI/local-only
overrides so the published module remains reproducible. Specifically, delete the
replace line in operator/go.mod and create/use a go.work that references the
local module during development (or configure your build/CI to inject
replacements), ensuring the module requires a proper semantic version of
github.com/run-ai/karta for consumers.

In `@operator/internal/reconciler.go`:
- Around line 75-80: The current error handling after calling r.crdExistsForGVK
treats any lookup error as a successful reconcile by calling
setCRDExists(&karta.Status, metav1.ConditionFalse) and returning Continue(),
which suppresses retries; instead, when r.crdExistsForGVK returns an error you
should not flip the status to False—log the error via logger.Error but propagate
the error (or return a requeue result) so the controller will retry; update the
branch in reconciler.go to avoid calling setCRDExists on transient List errors
and return the error (or ctrl.Result{Requeue: true}) rather than Continue() so
failed CRD lookups trigger a retry.
- Around line 104-107: When rootGVK(karta) returns nil (gvk == nil), remove any
stale operator-owned labels "karta/group", "karta/version", and "karta/kind"
from the object before returning Continue(); locate the gvk check in
reconciler.go and add logic to mutate the object's labels to delete those keys
and persist the change (e.g., using the reconciler's client Update or Patch),
handling update errors appropriately, then return Continue().
- Around line 172-174: crdMatchesGVK currently returns true if a version name
matches gvk.Version even when that CRD version is not served; update
crdMatchesGVK to also require the version’s served flag be true by checking
v.Served (or v.served) in the loop alongside v.Name == gvk.Version so only
served API versions count toward a match; make sure to reference the function
crdMatchesGVK and the loop over crd.Spec.Versions (v.Name and v.Served) when
applying the change.

---

Nitpick comments:
In `@operator/internal/status.go`:
- Around line 128-137: The statusChanged function currently compares
KartaStatus.Conditions by index which treats reordering as a change; update
statusChanged to be order-insensitive by building a lookup map keyed by
Condition.Type (e.g., map[string]v1.Condition or the concrete condition type)
from one of the slices (original or current), then iterate the other slice and
for each condition.Type look up the counterpart and compare Type, Status, Reason
and Message, returning true on any mismatch; also ensure you handle missing
entries (treat as change) and compare lengths via map sizes to detect
added/removed conditions.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: 2f3aad05-4068-4146-898b-1fcab74fcf2f

📥 Commits

Reviewing files that changed from the base of the PR and between c8d543d and dda5c5b.

⛔ Files ignored due to path filters (1)
  • operator/go.sum is excluded by !**/*.sum
📒 Files selected for processing (15)
  • operator/cmd/main.go
  • operator/go.mod
  • operator/internal/eventfilter.go
  • operator/internal/eventfilter_test.go
  • operator/internal/flow.go
  • operator/internal/flow_test.go
  • operator/internal/operator.go
  • operator/internal/operator_test.go
  • operator/internal/reconciler.go
  • operator/internal/reconciler_test.go
  • operator/internal/status.go
  • operator/internal/status_test.go
  • operator/internal/suite_test.go
  • pkg/api/runai/v1alpha1/conditions.go
  • pkg/api/runai/v1alpha1/labels.go

Comment thread operator/go.mod
sigs.k8s.io/yaml v1.6.0 // indirect
)

replace github.com/run-ai/karta => ../

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify whether workspace-based wiring already exists and where local replace directives are committed.
set -euo pipefail

echo "== go.work presence =="
fd -HI '^go\.work$' .

echo
echo "== replace directives across repo =="
rg -n '^\s*replace\s+github\.com/run-ai/karta\s*=>\s*\.\./\s*$|^\s*replace\s+' --glob '**/go.mod'

echo
echo "== module roots and go directives =="
fd -HI '^go\.mod$' . -x sh -c 'echo "--- {}"; rg -n "^(module|go)\s+" "{}"'

Repository: run-ai/karta

Length of output: 347


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== operator/go.mod: relevant sections =="
nl -ba operator/go.mod | sed -n '1,120p'

echo
echo "== other go.mod files with replace directives =="
rg -n '^\s*replace\s+' --glob '**/go.mod' . || true

Repository: run-ai/karta

Length of output: 158


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== operator/go.mod (lines 1-120) =="
cat -n operator/go.mod | sed -n '1,120p'

echo
echo "== operator/go.mod (lines 60-90) =="
cat -n operator/go.mod | sed -n '60,90p'

echo
echo "== other go.mod files with replace directives =="
rg -n '^\s*replace\s+' --glob '**/go.mod' . || true

echo
echo "== other occurrences of github.com/run-ai/karta in operator/go.mod =="
rg -n 'github\.com/run-ai/karta' operator/go.mod

Repository: run-ai/karta

Length of output: 5142


Avoid committing a local replace path in operator/go.mod

operator/go.mod hard-codes replace github.com/run-ai/karta => ../ and the repo has no go.work, so this relative path override becomes part of the published module and can break external consumers/builds where ../ doesn’t exist. Use go.work (or CI/local-only overrides) for monorepo wiring, and keep module manifests versioned/reproducible.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@operator/go.mod` at line 78, Remove the local replace directive "replace
github.com/run-ai/karta => ../" from the module manifest and restore a versioned
dependency declaration; instead configure local monorepo wiring via a go.work
file or CI/local-only overrides so the published module remains reproducible.
Specifically, delete the replace line in operator/go.mod and create/use a
go.work that references the local module during development (or configure your
build/CI to inject replacements), ensuring the module requires a proper semantic
version of github.com/run-ai/karta for consumers.

Comment thread operator/internal/reconciler.go
Comment thread operator/internal/reconciler.go
Comment thread operator/internal/reconciler.go

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@operator/internal/status.go`:
- Around line 112-121: The current code uses r.Status().Patch with
client.RawPatch(types.MergePatchType, ...) and a full conditions array
(karta.Status.Conditions) which causes JSON MergePatch to replace the array
wholesale; replace this with a conflict-aware status update: fetch the latest
Karta (get), merge conditions by their "type" key into karta.Status.Conditions
(preserving other writers' per-type entries), then call r.Status().Update(ctx,
karta); wrap in a retry on conflict (detect apierrors.IsConflict and
requeue/loop to refetch+merge+update) so concurrent writers surface conflicts
instead of clobbering; stop using client.RawPatch(types.MergePatchType, ...) for
conditions.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: 844a0515-373f-4282-95fb-38bb7411c948

📥 Commits

Reviewing files that changed from the base of the PR and between c8d543d and dda5c5b.

⛔ Files ignored due to path filters (1)
  • operator/go.sum is excluded by !**/*.sum
📒 Files selected for processing (15)
  • operator/cmd/main.go
  • operator/go.mod
  • operator/internal/eventfilter.go
  • operator/internal/eventfilter_test.go
  • operator/internal/flow.go
  • operator/internal/flow_test.go
  • operator/internal/operator.go
  • operator/internal/operator_test.go
  • operator/internal/reconciler.go
  • operator/internal/reconciler_test.go
  • operator/internal/status.go
  • operator/internal/status_test.go
  • operator/internal/suite_test.go
  • pkg/api/runai/v1alpha1/conditions.go
  • pkg/api/runai/v1alpha1/labels.go

Comment thread operator/internal/status.go Outdated
StopWithError on crd list failure.
checking if CRD served.
Removing Karta gvk labels when GVK is nil.
@coderabbitai

coderabbitai Bot commented May 28, 2026

Copy link
Copy Markdown
Contributor

Actionable comments posted: 0

@shaked-bouktus

Copy link
Copy Markdown
Contributor Author

@CodeRabbit full review

@coderabbitai

coderabbitai Bot commented May 28, 2026

Copy link
Copy Markdown
Contributor
✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
operator/internal/status.go (1)

111-115: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Add optimistic locking to the status patch to prevent concurrent condition loss.

Line 114 uses client.MergeFrom(base) without optimistic lock, so concurrent status writers can still overwrite status.conditions updates instead of surfacing a conflict.

Suggested patch
 import (
 	"context"
 	"fmt"
 
 	kartav1alpha1 "github.com/run-ai/karta/pkg/api/runai/v1alpha1"
 
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@
-	if err := r.Status().Patch(ctx, karta, client.MergeFrom(base)); err != nil {
+	patch := client.MergeFromWithOptions(base, client.MergeFromWithOptimisticLock{})
+	if err := r.Status().Patch(ctx, karta, patch); err != nil {
+		if apierrors.IsConflict(err) {
+			return fmt.Errorf("patch status conflict: %w", err)
+		}
 		return fmt.Errorf("patch status: %w", err)
 	}
#!/bin/bash
set -euo pipefail

# Verify status patch callsites and whether optimistic lock is used.
rg -n -C3 'Status\(\)\.Patch\(' operator/internal
rg -n -C3 'MergeFromWithOptimisticLock|MergeFrom\(' operator/internal/status.go
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@operator/internal/status.go` around lines 111 - 115, The status patch
currently uses client.MergeFrom(base) in the r.Status().Patch call (after the
equality.Semantic.DeepEqual check between base.Status and karta.Status), which
can allow concurrent writes to overwrite conditions; change the patch to use
optimistic locking by replacing client.MergeFrom(base) with
client.MergeFromWithOptimisticLock(base) so conflicts are surfaced, and ensure
the controller-runtime client import is available and any error handling remains
the same for the r.Status().Patch(ctx, karta,
client.MergeFromWithOptimisticLock(base)) call.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In `@operator/internal/status.go`:
- Around line 111-115: The status patch currently uses client.MergeFrom(base) in
the r.Status().Patch call (after the equality.Semantic.DeepEqual check between
base.Status and karta.Status), which can allow concurrent writes to overwrite
conditions; change the patch to use optimistic locking by replacing
client.MergeFrom(base) with client.MergeFromWithOptimisticLock(base) so
conflicts are surfaced, and ensure the controller-runtime client import is
available and any error handling remains the same for the r.Status().Patch(ctx,
karta, client.MergeFromWithOptimisticLock(base)) call.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: 9c286e69-8073-4e16-a185-7c32d84b4cb7

📥 Commits

Reviewing files that changed from the base of the PR and between 1ace886 and b86dc45.

📒 Files selected for processing (3)
  • operator/internal/reconciler.go
  • operator/internal/status.go
  • operator/internal/status_test.go
💤 Files with no reviewable changes (1)
  • operator/internal/status_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • operator/internal/reconciler.go

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
operator/internal/status.go (1)

114-115: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Use optimistic-locking for status patches to avoid lost concurrent updates.

At Line 114, client.MergeFrom(base) does not enforce resourceVersion preconditions, so concurrent status writers can still overwrite each other. Use optimistic locking so conflicts surface and requeue.

🔧 Proposed fix
-	if err := r.Status().Patch(ctx, karta, client.MergeFrom(base)); err != nil {
+	patch := client.MergeFromWithOptions(base, client.MergeFromWithOptimisticLock())
+	if err := r.Status().Patch(ctx, karta, patch); err != nil {
 		return fmt.Errorf("patch status: %w", err)
 	}
#!/bin/bash
set -euo pipefail

# Verify current status patch strategy and whether optimistic lock is used.
rg -n --type=go 'Status\(\)\.Patch\(' operator/internal
rg -n --type=go 'MergeFromWithOptimisticLock|MergeFromWithOptions|MergeFrom\(' operator/internal/status.go -C2
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@operator/internal/status.go` around lines 114 - 115, The current status patch
uses client.MergeFrom(base) which does not enforce resourceVersion preconditions
and can silently overwrite concurrent updates; change the patch to use the
controller-runtime optimistic-locking variant (e.g.,
client.MergeFromWithOptimisticLock or client.MergeFromWithOptions with a
Preconditions/ResourceVersion set from base) when calling r.Status().Patch so
conflicts surface as errors and the reconcile requeues; update the call site
that currently uses client.MergeFrom(base) to the optimistic-locking variant and
handle the resulting conflict (requeue) error path.
🧹 Nitpick comments (2)
operator/internal/operator_test.go (1)

72-72: 💤 Low value

Consider removing the "Q4" comment prefix.

The comment includes "Q4" which appears to be a reference to a planning phase or issue tracker. Consider simplifying to just describe the test's purpose.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@operator/internal/operator_test.go` at line 72, Remove the "Q4:"
planning/issue-tracker prefix from the test comment so it reads simply "verify
GenerationChangedPredicate semantics on CRDs." — update the comment near the
test that references GenerationChangedPredicate in
operator/internal/operator_test.go to be concise and descriptive without the
"Q4" tag.
operator/internal/status_test.go (1)

71-71: ⚖️ Poor tradeoff

Consider avoiding time.Sleep in timestamp comparison tests.

The tests use time.Sleep(2 * time.Millisecond) before comparing LastTransitionTime values. This can be flaky in CI environments with low time resolution or under heavy load. Consider using a time-mocking approach (e.g., injecting a clock interface) or increasing the sleep duration significantly (e.g., 100ms+) to reduce flakiness risk.

Also applies to: 81-81

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@operator/internal/status_test.go` at line 71, Tests in
operator/internal/status_test.go use time.Sleep(2 * time.Millisecond) before
comparing LastTransitionTime which is flaky; replace the sleeps by making the
test deterministic — either inject a clock interface (replace real time.Now
usage in the code under test with a Clock and in tests use a fake/static Clock)
or explicitly set LastTransitionTime values in the test before comparisons;
update the tests that reference LastTransitionTime (the assertions around
LastTransitionTime at the locations where time.Sleep is currently used) to
compare against the injected/static times or use assert/require.WithinDuration
with a larger tolerance (e.g., 100ms) if changing the code to accept a Clock is
not feasible.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@operator/internal/reconciler.go`:
- Around line 119-129: The current MergePatch in stepEnsureLabels builds
`merged` from all existing labels (`current`) then marshals it, which will
clobber concurrent non-operator labels; instead, construct a new map (e.g.,
`operatorLabels`) containing only operator-owned label keys (those matching the
`karta/` prefix) copied from `current`, apply/override those keys from
`desired`, and marshal only `metadata.labels: operatorLabels` for the Patch;
finally call `r.Patch(..., client.RawPatch(types.MergePatchType, patchBytes))`
with that trimmed patch so non-operator labels are not touched.

---

Duplicate comments:
In `@operator/internal/status.go`:
- Around line 114-115: The current status patch uses client.MergeFrom(base)
which does not enforce resourceVersion preconditions and can silently overwrite
concurrent updates; change the patch to use the controller-runtime
optimistic-locking variant (e.g., client.MergeFromWithOptimisticLock or
client.MergeFromWithOptions with a Preconditions/ResourceVersion set from base)
when calling r.Status().Patch so conflicts surface as errors and the reconcile
requeues; update the call site that currently uses client.MergeFrom(base) to the
optimistic-locking variant and handle the resulting conflict (requeue) error
path.

---

Nitpick comments:
In `@operator/internal/operator_test.go`:
- Line 72: Remove the "Q4:" planning/issue-tracker prefix from the test comment
so it reads simply "verify GenerationChangedPredicate semantics on CRDs." —
update the comment near the test that references GenerationChangedPredicate in
operator/internal/operator_test.go to be concise and descriptive without the
"Q4" tag.

In `@operator/internal/status_test.go`:
- Line 71: Tests in operator/internal/status_test.go use time.Sleep(2 *
time.Millisecond) before comparing LastTransitionTime which is flaky; replace
the sleeps by making the test deterministic — either inject a clock interface
(replace real time.Now usage in the code under test with a Clock and in tests
use a fake/static Clock) or explicitly set LastTransitionTime values in the test
before comparisons; update the tests that reference LastTransitionTime (the
assertions around LastTransitionTime at the locations where time.Sleep is
currently used) to compare against the injected/static times or use
assert/require.WithinDuration with a larger tolerance (e.g., 100ms) if changing
the code to accept a Clock is not feasible.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: 209e8d33-51a5-4c7f-a66a-269047af0dce

📥 Commits

Reviewing files that changed from the base of the PR and between c8d543d and b86dc45.

⛔ Files ignored due to path filters (1)
  • operator/go.sum is excluded by !**/*.sum
📒 Files selected for processing (15)
  • operator/cmd/main.go
  • operator/go.mod
  • operator/internal/eventfilter.go
  • operator/internal/eventfilter_test.go
  • operator/internal/flow.go
  • operator/internal/flow_test.go
  • operator/internal/operator.go
  • operator/internal/operator_test.go
  • operator/internal/reconciler.go
  • operator/internal/reconciler_test.go
  • operator/internal/status.go
  • operator/internal/status_test.go
  • operator/internal/suite_test.go
  • pkg/api/runai/v1alpha1/conditions.go
  • pkg/api/runai/v1alpha1/labels.go

Comment thread operator/internal/reconciler.go Outdated
@coderabbitai

coderabbitai Bot commented May 28, 2026

Copy link
Copy Markdown
Contributor

Actionable comments posted: 0

Comment thread operator/cmd/main.go
flag.StringVar(&leaderElectionID, "leader-election-id", "karta-operator.run.ai",
"Name of the resource used for leader election.")

zapOpts := zap.Options{Development: false}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the debug flag?

Comment thread operator/cmd/main.go
import (
"flag"
"fmt"
"github.com/run-ai/karta/operator/internal"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semi-random location for this comment - why is the package name internal?
Since 100% of the code lives in internal this means that it's not go-gettable, none can import this.
For example: Error types to assert, Utility functions, Consts, Shared status updates etc...

Unless there is a specific reason to make it internal, rename the top level to pkg or similar.

Comment thread operator/cmd/main.go
}

func main() {
if err := run(); err != nil {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is run() needed? We can directly write in main. Bloats the code

Comment thread operator/internal/flow.go
// StopWithError signals that the step failed. The manager will requeue with
// exponential back-off.
func StopWithError(err error) StepResult {
return StepResult{result: ctrl.Result{Requeue: true}, err: err, continueReconcile: false}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StopWithError sets both Requeue:true and a non-nil error, . When err != nil it ignores Result and requeues via the rate limiter anyway.

Comment thread operator/internal/flow.go
}

// shortCircuit returns true when the step chain should stop.
func shortCircuit(r StepResult) bool {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be instance method

Suggested change
func shortCircuit(r StepResult) bool {
func (r StepResult) ShortCircuitReconcileFlow() bool {
return !r.continueReconcile
}

logger := log.FromContext(ctx).WithValues("karta", karta.Name)
base := karta.DeepCopy()

steps := []StepFn{

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO the step chain is over-engineered.
Wraps a fixed 5-step linear pipeline that never branches dynamically.

  • The function signature becomes bloated a superset of everything other "steps" need
  • Not clear what each step does.
  • stepPatchStatusWith generator callback confuses

Some of those steps doesn't even require a function (such as stepDeriveReady).
Those are very small functions, with very explicit code that are not re-used anywhere else.

TLDR:

  • Remove the StepFn concept
  • Inline the methods, it's easier to read.

Comment thread operator/internal/flow.go
Comment on lines +39 to +41
func Stop() StepResult {
return StepResult{continueReconcile: false}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing call's Stop() do we need it?

})
}

func setCRDExists(status *kartav1alpha1.KartaStatus, s metav1.ConditionStatus) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We pass everywhere setCRDExists(&...) instead of doing reference everywhere, why not pass it like this?
i.e setCRDExists(&karta.Status, metav1.ConditionFalse)

Suggested change
func setCRDExists(status *kartav1alpha1.KartaStatus, s metav1.ConditionStatus) {
func setCRDExists(status kartav1alpha1.KartaStatus, s metav1.ConditionStatus) {

Comment on lines +15 to +27
const (
// LabelRootGroup is the API group of the root component
// (e.g. "ray.io", "kubeflow.org").
LabelRootGroup = "karta/group"

// LabelRootVersion is the API version of the root component
// (e.g. "v1", "v1alpha1").
LabelRootVersion = "karta/version"

// LabelRootKind is the kind of the root component
// (e.g. "RayCluster", "PyTorchJob").
LabelRootKind = "karta/kind"
)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the architectural justification for maintaining three separate labels if they are exclusively used within the same list/get API calls? To reduce metadata overhead and simplify our query logic, I highly recommend consolidating them into a single karta/gvk label.

Comment thread operator/go.mod Outdated
Co-authored-by: Yuval Grauer <108901541+yuval-gr@users.noreply.github.com>
Signed-off-by: shaked-bouktus <157353909+shaked-bouktus@users.noreply.github.com>
const (
// LabelRootGroup is the API group of the root component
// (e.g. "ray.io", "kubeflow.org").
LabelRootGroup = "karta/group"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

labels should be called

run.ai/karta-group 
run.ai/karta-version
run.ai/karta-kind

// stepValidateKarta runs the Karta spec validator and writes Validated.
func (r *Reconciler) stepValidateKarta(_ context.Context, logger logr.Logger, karta *kartav1alpha1.Karta) StepResult {
if err := kartav1alpha1.NewKartaValidator(karta).Validate(); err != nil {
logger.V(1).Info("Karta spec validation failed", "error", err.Error())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass err directly - logr extracts the message and you keep the error type for structured logging. its also what every controller-runtime example does

return nil
}

func buildCondition(t kartav1alpha1.ConditionType, status metav1.ConditionStatus, reason, message string) metav1.Condition {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would add

cond.ObservedGeneration = karta.Generation

without it a Validated=False from gen 3 looks the same as gen 12 and you cant tell if its stale
this convention is being used in cert-manager / cluster-api etc..

)

func setValidated(status *kartav1alpha1.KartaStatus, s metav1.ConditionStatus) {
upsertConditions(&status.Conditions, map[kartav1alpha1.ConditionType]metav1.Condition{

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not using apimeta.SetStatusCondition
it handles LastTransitionTime + ObservedGeneration for free and preserves foreign conditions

// stepPatchStatusWith returns a step that flushes status to the cluster,
// closing over the whole-object snapshot taken at the start of reconcile so
// that only the fields that actually changed are sent in the patch body.
func stepPatchStatusWith(r *Reconciler, base *kartav1alpha1.Karta) StepFn {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

closure is unnecessary

Comment thread operator/cmd/main.go
// CRs by validating their spec and verifying that the referenced
// CustomResourceDefinition is present in the cluster.
package main

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no EventRecorder. today if validation fails the user has to dig into status.conditions , with events kubectl describe karta foo shows it at the bottom which is where everyone looks first. kubebuilder template ships it by default

// controller-runtime metrics and logs.
ControllerName = "karta-operator"

rateLimiterBaseDelay = 500 * time.Millisecond

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets put this as an env so we will be able to control it

setConditions(status, allTrue)
before := transitionTimes(status.Conditions)

time.Sleep(2 * time.Millisecond)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont like time sleep might be flaky lets use eventually instead

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants