Skip to content

Commit 292d38b

Browse files
committed
feat: add LogPersistenceSpec proto and KubeRay collector sidecar injection
Proto:
- Add LogPersistenceSpec message (storage backend, bucket, path prefix, region, credentials secret, collector image)
- Add log_persistence to RayClusterSpec
- Add log_url to RayClusterStatus

Mapper:
- Load LogPersistenceConfig from the controllers.rayCluster.logPersistence YAML config block (FX-injected)
- Implement InjectCollectorSidecar(), which injects the KubeRay History Server collector sidecar into Ray pod templates (head + workers) using the env-var S3 config pattern (no ConfigMap mount)
- Reuse an existing /tmp/ray volume when present so the collector and Ray container share log data
- Set ImagePullPolicy: IfNotPresent on the collector container
- Add a lifecycle PostStart hook to extract the raylet node ID
- Wire sidecar injection into mapRayCluster() when the config is enabled

Tests:
- Cover the env-var pattern (no ConfigMap mount), head vs. worker env-var differences, port and resources, and lifecycle preservation
1 parent cd2cd0a commit 292d38b

8 files changed

Lines changed: 530 additions & 22 deletions

File tree

go/components/jobs/client/client_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func TestDeleteJob(t *testing.T) {
188188
k8sc := Client{
189189
factory: f,
190190
helper: NewHelper(),
191-
mapper: k8sengine.NewMapper().Mapper,
191+
mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
192192
}
193193

194194
// test
@@ -255,7 +255,7 @@ func TestDeletePromConfigMap(t *testing.T) {
255255
k8sc := Client{
256256
factory: f,
257257
helper: NewHelper(),
258-
mapper: k8sengine.NewMapper().Mapper,
258+
mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
259259
}
260260

261261
// test
@@ -322,7 +322,7 @@ func TestDeleteSecret(t *testing.T) {
322322
k8sc := Client{
323323
factory: f,
324324
helper: NewHelper(),
325-
mapper: k8sengine.NewMapper().Mapper,
325+
mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
326326
}
327327

328328
// test
@@ -419,7 +419,7 @@ func TestCreatePromConfigMap(t *testing.T) {
419419
return NewClient(Params{
420420
Factory: f,
421421
Logger: zaptest.NewLogger(t),
422-
Mapper: k8sengine.NewMapper().Mapper,
422+
Mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
423423
Helper: NewHelper(),
424424
})
425425
},
@@ -440,7 +440,7 @@ func TestCreatePromConfigMap(t *testing.T) {
440440
Factory: f,
441441
Logger: zaptest.NewLogger(t),
442442
Helper: NewHelper(),
443-
Mapper: k8sengine.NewMapper().Mapper,
443+
Mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
444444
})
445445
},
446446
wantError: true,
@@ -476,7 +476,7 @@ func TestCreatePromConfigMap(t *testing.T) {
476476
Factory: f,
477477
Logger: zaptest.NewLogger(t),
478478
Helper: NewHelper(),
479-
Mapper: k8sengine.NewMapper().Mapper,
479+
Mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
480480
})
481481
},
482482
// file path will be created as temp file in test body
@@ -570,7 +570,7 @@ func TestCreateSecret(t *testing.T) {
570570
factory: f,
571571
helper: NewHelper(),
572572
secretsProvider: provider,
573-
mapper: k8sengine.NewMapper().Mapper,
573+
mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
574574
}
575575

576576
// test
@@ -639,7 +639,7 @@ func TestGetJobStatus(t *testing.T) {
639639
gctrl := gomock.NewController(t)
640640
defer gctrl.Finish()
641641
f := computemocks.NewMockFactory(gctrl)
642-
cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper().Mapper, logger: zaptest.NewLogger(t)}
642+
cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper, logger: zaptest.NewLogger(t)}
643643
if tt.setupFactory != nil {
644644
tt.setupFactory(f)
645645
}
@@ -661,7 +661,7 @@ func TestGetJobStatus(t *testing.T) {
661661
return
662662
}
663663
// When not using GetJobStatus path, validate mapper-based conversion on local RayJob object
664-
js, err := k8sengine.NewMapper().Mapper.MapLocalJobStatusToGlobal(tt.jobObject)
664+
js, err := k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper.MapLocalJobStatusToGlobal(tt.jobObject)
665665
require.NoError(t, err)
666666
if js != nil && js.Ray != nil {
667667
assert.Equal(t, tt.expectStatus, js.Ray.JobStatus)
@@ -753,7 +753,7 @@ func TestCreateJob(t *testing.T) {
753753
k8sc := Client{
754754
factory: f,
755755
helper: NewHelper(),
756-
mapper: k8sengine.NewMapper().Mapper,
756+
mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
757757
}
758758

759759
// test
@@ -772,7 +772,7 @@ func TestCreateJob(t *testing.T) {
772772
func TestGetJobClusterStatus_ClientSetError(t *testing.T) {
773773
g := gomock.NewController(t)
774774
f := computemocks.NewMockFactory(g)
775-
cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper().Mapper}
775+
cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper}
776776
jobCluster := &v2pb.RayCluster{ObjectMeta: metav1.ObjectMeta{Name: "rc1"}}
777777
cluster := &v2pb.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "c1"}}
778778

go/components/jobs/client/k8sengine/BUILD.bazel

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,29 @@ go_library(
1515
"//proto/api/v2:go_default_library",
1616
"@com_github_ray_project_kuberay_ray_operator//apis/ray/v1:go_default_library",
1717
"@io_k8s_api//core/v1:go_default_library",
18+
"@io_k8s_apimachinery//pkg/api/resource:go_default_library",
1819
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
1920
"@io_k8s_apimachinery//pkg/runtime:go_default_library",
2021
"@io_k8s_utils//ptr:go_default_library",
22+
"@org_uber_go_config//:go_default_library",
2123
"@org_uber_go_fx//:go_default_library",
2224
],
2325
)
2426

2527
go_test(
2628
name = "go_default_test",
27-
srcs = ["mapper_test.go"],
29+
srcs = [
30+
"mapper_test.go",
31+
"ray_collector_test.go",
32+
],
2833
embed = [":go_default_library"],
2934
deps = [
3035
"//proto/api/v2:go_default_library",
3136
"@com_github_ray_project_kuberay_ray_operator//apis/ray/v1:go_default_library",
3237
"@com_github_stretchr_testify//assert:go_default_library",
3338
"@com_github_stretchr_testify//require:go_default_library",
3439
"@io_k8s_api//core/v1:go_default_library",
40+
"@io_k8s_apimachinery//pkg/api/resource:go_default_library",
3541
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
3642
"@io_k8s_apimachinery//pkg/runtime:go_default_library",
3743
],

go/components/jobs/client/k8sengine/mapper.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@ import (
66
"github.com/michelangelo-ai/michelangelo/go/components/jobs/common/types"
77
v2pb "github.com/michelangelo-ai/michelangelo/proto-go/api/v2"
88
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
9+
"go.uber.org/config"
910
"go.uber.org/fx"
1011
"k8s.io/apimachinery/pkg/runtime"
1112
)
1213

1314
// Mapper helps to map global to local crds and vice versa
14-
type Mapper struct{}
15+
type Mapper struct {
16+
LogPersistence LogPersistenceConfig
17+
}
1518

1619
// MapperResult has Mapper result
1720
type MapperResult struct {
@@ -22,10 +25,23 @@ type MapperResult struct {
2225

2326
const _mapperName = "k8sengineMapper"
2427

28+
const logPersistenceConfigKey = "controllers.rayCluster.logPersistence"
29+
30+
// NewLogPersistenceConfig loads LogPersistenceConfig from YAML config provider.
31+
func NewLogPersistenceConfig(provider config.Provider) (LogPersistenceConfig, error) {
32+
conf := LogPersistenceConfig{}
33+
err := provider.Get(logPersistenceConfigKey).Populate(&conf)
34+
if err != nil {
35+
// Config is optional — return zero-value (disabled) if not present
36+
return LogPersistenceConfig{}, nil
37+
}
38+
return conf, nil
39+
}
40+
2541
// NewMapper constructs the Mapper
26-
func NewMapper() MapperResult {
42+
func NewMapper(logPersistence LogPersistenceConfig) MapperResult {
2743
return MapperResult{
28-
Mapper: Mapper{},
44+
Mapper: Mapper{LogPersistence: logPersistence},
2945
}
3046
}
3147

0 commit comments

Comments
 (0)