Skip to content

Commit 4cdc4c2

Browse files
committed
feat: add log-persistence end-to-end (proto + mapper + controller)
Proto: - Add LogPersistenceSpec message and log_url to RayClusterStatus (proto/api/v2/ray_cluster.proto) Mapper (go/components/jobs/client/k8sengine): - Load LogPersistenceConfig from controllers.rayCluster.logPersistence via FX - injectCollectorSidecar(): inject the KubeRay History Server collector into Ray pod templates with env-var S3 config (no ConfigMap), reuse an existing /tmp/ray volume when present, set ImagePullPolicy: IfNotPresent, add the postStart raylet-node-id hook - Wire sidecar injection into mapRayCluster() when logPersistence.enabled - buildLogURL() and MapLocalClusterStatusToGlobal(): compute the human-browsable log URL during local→global status translation (mapper owns the formula since it knows config + local cluster name + compute-cluster Ray namespace = RayLocalNamespace) Format: {LogUrlBaseUrl}/{Bucket}/{PathPrefix}/{cluster}_{rayLocalNamespace}/ Controller (go/components/ray/cluster): - applyRayClusterStatus(): copy clusterStatus.Ray.LogUrl onto rayCluster.Status.LogUrl so callers see the value the mapper computed - Controller intentionally does NOT own the log_url formula or any LogPersistenceConfig — single source of truth lives in the mapper Defaults (go/cmd/controllermgr/config/base.yaml): - controllers.rayCluster.logPersistence: enabled: true storageEndpoint: "k3d-michelangelo-sandbox-agent-0:30007" bucket: ray-history pathPrefix: log region: us-east-1 credentialsSecret: minio-credentials collectorImage: kuberay-collector:v0.1.0 logUrlBaseUrl: "http://localhost:9090/browser" - Sandbox MinIO defaults; production overlays flip these. Tests cover: mapper sidecar injection (env vars, lifecycle preservation, config knobs), controller log_url pass-through.
1 parent cd2cd0a commit 4cdc4c2

13 files changed

Lines changed: 598 additions & 29 deletions

File tree

go/cmd/controllermgr/config/base.yaml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ workflowClient:
1212
# taskList is a used as a fallback if project.Metatdata.Annotations["michelangelo/worker_queue"] is empty
1313
taskList: default # TODO(#934): MaCTL support for adding project.Metatdata.Annotations["michelangelo/worker_queue"] to project CRD
1414
executionUrlFormat: http://localhost:8088/namespaces/{{.Domain}}/workflows/{{.ExecutionID}}
15-
15+
1616
k8s:
1717
qps: 300
1818
burst: 600
@@ -23,6 +23,15 @@ controllers:
2323
rayCluster:
2424
k8sQps: 300
2525
k8sBurst: 600
26+
logPersistence:
27+
enabled: true
28+
storageEndpoint: "k3d-michelangelo-sandbox-agent-0:30007"
29+
bucket: ray-history
30+
pathPrefix: log
31+
region: us-east-1
32+
credentialsSecret: minio-credentials
33+
collectorImage: kuberay-collector:v0.1.0
34+
logUrlBaseUrl: "http://localhost:9090/browser"
2635

2736
minio:
2837
awsRegion: us-east-1

go/components/jobs/client/client_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func TestDeleteJob(t *testing.T) {
188188
k8sc := Client{
189189
factory: f,
190190
helper: NewHelper(),
191-
mapper: k8sengine.NewMapper().Mapper,
191+
mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
192192
}
193193

194194
// test
@@ -255,7 +255,7 @@ func TestDeletePromConfigMap(t *testing.T) {
255255
k8sc := Client{
256256
factory: f,
257257
helper: NewHelper(),
258-
mapper: k8sengine.NewMapper().Mapper,
258+
mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
259259
}
260260

261261
// test
@@ -322,7 +322,7 @@ func TestDeleteSecret(t *testing.T) {
322322
k8sc := Client{
323323
factory: f,
324324
helper: NewHelper(),
325-
mapper: k8sengine.NewMapper().Mapper,
325+
mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
326326
}
327327

328328
// test
@@ -419,7 +419,7 @@ func TestCreatePromConfigMap(t *testing.T) {
419419
return NewClient(Params{
420420
Factory: f,
421421
Logger: zaptest.NewLogger(t),
422-
Mapper: k8sengine.NewMapper().Mapper,
422+
Mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
423423
Helper: NewHelper(),
424424
})
425425
},
@@ -440,7 +440,7 @@ func TestCreatePromConfigMap(t *testing.T) {
440440
Factory: f,
441441
Logger: zaptest.NewLogger(t),
442442
Helper: NewHelper(),
443-
Mapper: k8sengine.NewMapper().Mapper,
443+
Mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
444444
})
445445
},
446446
wantError: true,
@@ -476,7 +476,7 @@ func TestCreatePromConfigMap(t *testing.T) {
476476
Factory: f,
477477
Logger: zaptest.NewLogger(t),
478478
Helper: NewHelper(),
479-
Mapper: k8sengine.NewMapper().Mapper,
479+
Mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
480480
})
481481
},
482482
// file path will be created as temp file in test body
@@ -570,7 +570,7 @@ func TestCreateSecret(t *testing.T) {
570570
factory: f,
571571
helper: NewHelper(),
572572
secretsProvider: provider,
573-
mapper: k8sengine.NewMapper().Mapper,
573+
mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
574574
}
575575

576576
// test
@@ -639,7 +639,7 @@ func TestGetJobStatus(t *testing.T) {
639639
gctrl := gomock.NewController(t)
640640
defer gctrl.Finish()
641641
f := computemocks.NewMockFactory(gctrl)
642-
cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper().Mapper, logger: zaptest.NewLogger(t)}
642+
cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper, logger: zaptest.NewLogger(t)}
643643
if tt.setupFactory != nil {
644644
tt.setupFactory(f)
645645
}
@@ -661,7 +661,7 @@ func TestGetJobStatus(t *testing.T) {
661661
return
662662
}
663663
// When not using GetJobStatus path, validate mapper-based conversion on local RayJob object
664-
js, err := k8sengine.NewMapper().Mapper.MapLocalJobStatusToGlobal(tt.jobObject)
664+
js, err := k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper.MapLocalJobStatusToGlobal(tt.jobObject)
665665
require.NoError(t, err)
666666
if js != nil && js.Ray != nil {
667667
assert.Equal(t, tt.expectStatus, js.Ray.JobStatus)
@@ -753,7 +753,7 @@ func TestCreateJob(t *testing.T) {
753753
k8sc := Client{
754754
factory: f,
755755
helper: NewHelper(),
756-
mapper: k8sengine.NewMapper().Mapper,
756+
mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
757757
}
758758

759759
// test
@@ -772,7 +772,7 @@ func TestCreateJob(t *testing.T) {
772772
func TestGetJobClusterStatus_ClientSetError(t *testing.T) {
773773
g := gomock.NewController(t)
774774
f := computemocks.NewMockFactory(g)
775-
cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper().Mapper}
775+
cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper}
776776
jobCluster := &v2pb.RayCluster{ObjectMeta: metav1.ObjectMeta{Name: "rc1"}}
777777
cluster := &v2pb.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "c1"}}
778778

go/components/jobs/client/k8sengine/BUILD.bazel

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,29 @@ go_library(
1515
"//proto/api/v2:go_default_library",
1616
"@com_github_ray_project_kuberay_ray_operator//apis/ray/v1:go_default_library",
1717
"@io_k8s_api//core/v1:go_default_library",
18+
"@io_k8s_apimachinery//pkg/api/resource:go_default_library",
1819
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
1920
"@io_k8s_apimachinery//pkg/runtime:go_default_library",
2021
"@io_k8s_utils//ptr:go_default_library",
22+
"@org_uber_go_config//:go_default_library",
2123
"@org_uber_go_fx//:go_default_library",
2224
],
2325
)
2426

2527
go_test(
2628
name = "go_default_test",
27-
srcs = ["mapper_test.go"],
29+
srcs = [
30+
"mapper_test.go",
31+
"ray_collector_test.go",
32+
],
2833
embed = [":go_default_library"],
2934
deps = [
3035
"//proto/api/v2:go_default_library",
3136
"@com_github_ray_project_kuberay_ray_operator//apis/ray/v1:go_default_library",
3237
"@com_github_stretchr_testify//assert:go_default_library",
3338
"@com_github_stretchr_testify//require:go_default_library",
3439
"@io_k8s_api//core/v1:go_default_library",
40+
"@io_k8s_apimachinery//pkg/api/resource:go_default_library",
3541
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
3642
"@io_k8s_apimachinery//pkg/runtime:go_default_library",
3743
],

go/components/jobs/client/k8sengine/mapper.go

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,20 @@ package k8sengine
22

33
import (
44
"fmt"
5+
"strings"
56

67
"github.com/michelangelo-ai/michelangelo/go/components/jobs/common/types"
78
v2pb "github.com/michelangelo-ai/michelangelo/proto-go/api/v2"
89
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
10+
"go.uber.org/config"
911
"go.uber.org/fx"
1012
"k8s.io/apimachinery/pkg/runtime"
1113
)
1214

1315
// Mapper helps to map global to local crds and vice versa
14-
type Mapper struct{}
16+
type Mapper struct {
17+
LogPersistence LogPersistenceConfig
18+
}
1519

1620
// MapperResult has Mapper result
1721
type MapperResult struct {
@@ -22,10 +26,23 @@ type MapperResult struct {
2226

2327
const _mapperName = "k8sengineMapper"
2428

29+
const logPersistenceConfigKey = "controllers.rayCluster.logPersistence"
30+
31+
// NewLogPersistenceConfig loads LogPersistenceConfig from YAML config provider.
32+
func NewLogPersistenceConfig(provider config.Provider) (LogPersistenceConfig, error) {
33+
conf := LogPersistenceConfig{}
34+
err := provider.Get(logPersistenceConfigKey).Populate(&conf)
35+
if err != nil {
36+
// Config is optional — return zero-value (disabled) if not present
37+
return LogPersistenceConfig{}, nil
38+
}
39+
return conf, nil
40+
}
41+
2542
// NewMapper constructs the Mapper
26-
func NewMapper() MapperResult {
43+
func NewMapper(logPersistence LogPersistenceConfig) MapperResult {
2744
return MapperResult{
28-
Mapper: Mapper{},
45+
Mapper: Mapper{LogPersistence: logPersistence},
2946
}
3047
}
3148

@@ -86,6 +103,12 @@ func (m Mapper) GetLocalName(obj runtime.Object) (namespace, name string) {
86103

87104
// MapLocalClusterStatusToGlobal converts a local (Kubernetes) cluster status object
88105
// to the global Michelangelo ClusterStatus representation.
106+
//
107+
// Also computes log_url here (vs in the controller) because the mapper owns
108+
// every piece of knowledge required: the LogPersistenceConfig, the local
109+
// cluster name (off the local object), and the compute-cluster Ray namespace
110+
// (RayLocalNamespace). The controller just copies clusterStatus.Ray.LogUrl
111+
// onto its RayClusterStatus during applyRayClusterStatus.
89112
func (m Mapper) MapLocalClusterStatusToGlobal(localClusterObject runtime.Object) (*types.JobClusterStatus, error) {
90113
if localClusterObject == nil {
91114
return nil, fmt.Errorf("localClusterObject cannot be nil")
@@ -94,6 +117,7 @@ func (m Mapper) MapLocalClusterStatusToGlobal(localClusterObject runtime.Object)
94117
switch obj := localClusterObject.(type) {
95118
case *rayv1.RayCluster:
96119
v2Status := convertRayV1ClusterStatusToV2(obj)
120+
v2Status.LogUrl = m.buildLogURL(obj.GetName())
97121
reason := obj.Status.Reason
98122
return &types.JobClusterStatus{
99123
Ray: v2Status,
@@ -104,6 +128,29 @@ func (m Mapper) MapLocalClusterStatusToGlobal(localClusterObject runtime.Object)
104128
}
105129
}
106130

131+
// buildLogURL returns the human-browsable log URL for a Ray cluster, or "" when
132+
// log persistence is disabled / no base URL is configured. The URL points at
133+
// where the collector sidecar wrote the cluster's logs in object storage.
134+
//
135+
// {LogUrlBaseUrl}/{Bucket}/{PathPrefix}/{cluster}_{rayLocalNamespace}/
136+
//
137+
// rayLocalNamespace is RayLocalNamespace ("default") — the namespace where Ray
138+
// pods actually run on the compute cluster, not the control-plane namespace
139+
// of the v2 RayCluster CR. The bucket layout is created by the collector and
140+
// keyed off the compute-cluster namespace, so the browse URL must match.
141+
func (m Mapper) buildLogURL(clusterName string) string {
142+
if !m.LogPersistence.Enabled || m.LogPersistence.LogUrlBaseUrl == "" {
143+
return ""
144+
}
145+
return fmt.Sprintf("%s/%s/%s/%s_%s/",
146+
strings.TrimRight(m.LogPersistence.LogUrlBaseUrl, "/"),
147+
m.LogPersistence.Bucket,
148+
strings.Trim(m.LogPersistence.PathPrefix, "/"),
149+
clusterName,
150+
RayLocalNamespace,
151+
)
152+
}
153+
107154
// MapLocalJobStatusToGlobal converts a local (Kubernetes) job status object
108155
// to the global Michelangelo JobStatus representation.
109156
func (m Mapper) MapLocalJobStatusToGlobal(localJobObject runtime.Object) (*types.JobStatus, error) {

0 commit comments

Comments
 (0)