
Commit b616436

dragod812 and claude committed
Add log persistence proto field and collector sidecar injection
- Add string log_url = 10 to the RayClusterStatus proto for reporting the S3 path where persisted logs are stored
- Add LogPersistenceConfig to the Mapper struct, loaded via FX from the mapper.logPersistence YAML config
- Implement InjectCollectorSidecar(), which injects the KubeRay History Server collector sidecar into Ray pod templates (head + workers)
- Wire sidecar injection into mapRayCluster() when the config is enabled
- Update NewMapper() to accept a LogPersistenceConfig parameter

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent cd2cd0a commit b616436
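The commit message notes that LogPersistenceConfig is loaded via FX from the mapper.logPersistence YAML key. As a rough sketch of how the two new constructors could fit together in an FX graph (the module grouping below is illustrative and not part of this diff):

```go
package k8sengine

import "go.uber.org/fx"

// Illustrative FX wiring: a config.Provider already in the graph feeds
// NewLogPersistenceConfig, and its LogPersistenceConfig result is
// injected into NewMapper.
var Module = fx.Options(
	fx.Provide(NewLogPersistenceConfig), // config.Provider -> (LogPersistenceConfig, error)
	fx.Provide(NewMapper),               // LogPersistenceConfig -> MapperResult
)
```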

7 files changed: 391 additions & 21 deletions


go/components/jobs/client/client_test.go

Lines changed: 11 additions & 11 deletions
@@ -188,7 +188,7 @@ func TestDeleteJob(t *testing.T) {
 		k8sc := Client{
 			factory: f,
 			helper:  NewHelper(),
-			mapper:  k8sengine.NewMapper().Mapper,
+			mapper:  k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
 		}

 		// test
@@ -255,7 +255,7 @@ func TestDeletePromConfigMap(t *testing.T) {
 		k8sc := Client{
 			factory: f,
 			helper:  NewHelper(),
-			mapper:  k8sengine.NewMapper().Mapper,
+			mapper:  k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
 		}

 		// test
@@ -322,7 +322,7 @@ func TestDeleteSecret(t *testing.T) {
 		k8sc := Client{
 			factory: f,
 			helper:  NewHelper(),
-			mapper:  k8sengine.NewMapper().Mapper,
+			mapper:  k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
 		}

 		// test
@@ -419,7 +419,7 @@ func TestCreatePromConfigMap(t *testing.T) {
 			return NewClient(Params{
 				Factory: f,
 				Logger:  zaptest.NewLogger(t),
-				Mapper:  k8sengine.NewMapper().Mapper,
+				Mapper:  k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
 				Helper:  NewHelper(),
 			})
 		},
@@ -440,7 +440,7 @@ func TestCreatePromConfigMap(t *testing.T) {
 				Factory: f,
 				Logger:  zaptest.NewLogger(t),
 				Helper:  NewHelper(),
-				Mapper:  k8sengine.NewMapper().Mapper,
+				Mapper:  k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
 			})
 		},
 		wantError: true,
@@ -476,7 +476,7 @@ func TestCreatePromConfigMap(t *testing.T) {
 				Factory: f,
 				Logger:  zaptest.NewLogger(t),
 				Helper:  NewHelper(),
-				Mapper:  k8sengine.NewMapper().Mapper,
+				Mapper:  k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
 			})
 		},
 		// file path will be created as temp file in test body
@@ -570,7 +570,7 @@ func TestCreateSecret(t *testing.T) {
 			factory:         f,
 			helper:          NewHelper(),
 			secretsProvider: provider,
-			mapper:          k8sengine.NewMapper().Mapper,
+			mapper:          k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
 		}

 		// test
@@ -639,7 +639,7 @@ func TestGetJobStatus(t *testing.T) {
 			gctrl := gomock.NewController(t)
 			defer gctrl.Finish()
 			f := computemocks.NewMockFactory(gctrl)
-			cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper().Mapper, logger: zaptest.NewLogger(t)}
+			cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper, logger: zaptest.NewLogger(t)}
 			if tt.setupFactory != nil {
 				tt.setupFactory(f)
 			}
@@ -661,7 +661,7 @@ func TestGetJobStatus(t *testing.T) {
 				return
 			}
 			// When not using GetJobStatus path, validate mapper-based conversion on local RayJob object
-			js, err := k8sengine.NewMapper().Mapper.MapLocalJobStatusToGlobal(tt.jobObject)
+			js, err := k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper.MapLocalJobStatusToGlobal(tt.jobObject)
 			require.NoError(t, err)
 			if js != nil && js.Ray != nil {
 				assert.Equal(t, tt.expectStatus, js.Ray.JobStatus)
@@ -753,7 +753,7 @@ func TestCreateJob(t *testing.T) {
 		k8sc := Client{
 			factory: f,
 			helper:  NewHelper(),
-			mapper:  k8sengine.NewMapper().Mapper,
+			mapper:  k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper,
 		}

 		// test
@@ -772,7 +772,7 @@
 func TestGetJobClusterStatus_ClientSetError(t *testing.T) {
 	g := gomock.NewController(t)
 	f := computemocks.NewMockFactory(g)
-	cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper().Mapper}
+	cl := &Client{factory: f, helper: NewHelper(), mapper: k8sengine.NewMapper(k8sengine.LogPersistenceConfig{}).Mapper}
 	jobCluster := &v2pb.RayCluster{ObjectMeta: metav1.ObjectMeta{Name: "rc1"}}
 	cluster := &v2pb.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "c1"}}

go/components/jobs/client/k8sengine/mapper.go

Lines changed: 19 additions & 3 deletions
@@ -6,12 +6,15 @@ import (
 	"github.com/michelangelo-ai/michelangelo/go/components/jobs/common/types"
 	v2pb "github.com/michelangelo-ai/michelangelo/proto-go/api/v2"
 	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+	"go.uber.org/config"
 	"go.uber.org/fx"
 	"k8s.io/apimachinery/pkg/runtime"
 )

 // Mapper helps to map global to local crds and vice versa
-type Mapper struct{}
+type Mapper struct {
+	LogPersistence LogPersistenceConfig
+}

 // MapperResult has Mapper result
 type MapperResult struct {
@@ -22,10 +25,23 @@ type MapperResult struct {

 const _mapperName = "k8sengineMapper"

+const logPersistenceConfigKey = "mapper.logPersistence"
+
+// NewLogPersistenceConfig loads LogPersistenceConfig from YAML config provider.
+func NewLogPersistenceConfig(provider config.Provider) (LogPersistenceConfig, error) {
+	conf := LogPersistenceConfig{}
+	err := provider.Get(logPersistenceConfigKey).Populate(&conf)
+	if err != nil {
+		// Config is optional — return zero-value (disabled) if not present
+		return LogPersistenceConfig{}, nil
+	}
+	return conf, nil
+}
+
 // NewMapper constructs the Mapper
-func NewMapper() MapperResult {
+func NewMapper(logPersistence LogPersistenceConfig) MapperResult {
 	return MapperResult{
-		Mapper: Mapper{},
+		Mapper: Mapper{LogPersistence: logPersistence},
 	}
 }
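For reference, a minimal sketch of the YAML shape that NewLogPersistenceConfig reads from mapper.logPersistence, driven through go.uber.org/config in a throwaway test; the endpoint, bucket, and image values are made up for illustration:

```go
package k8sengine

import (
	"strings"
	"testing"

	"github.com/stretchr/testify/require"
	"go.uber.org/config"
)

func TestNewLogPersistenceConfig_Sketch(t *testing.T) {
	// Keys mirror the yaml tags on LogPersistenceConfig; values are illustrative.
	yaml := `
mapper:
  logPersistence:
    enabled: true
    storageEndpoint: https://s3.example.internal
    bucket: ray-logs
    pathPrefix: clusters
    region: us-east-1
    credentialsSecret: ray-log-s3-creds
    collectorImage: example.registry/history-collector:latest
`
	provider, err := config.NewYAML(config.Source(strings.NewReader(yaml)))
	require.NoError(t, err)

	conf, err := NewLogPersistenceConfig(provider)
	require.NoError(t, err)
	require.True(t, conf.Enabled)
	require.Equal(t, "ray-logs", conf.Bucket)
	require.Equal(t, "us-east-1", conf.Region)
}
```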

go/components/jobs/client/k8sengine/ray.go

Lines changed: 139 additions & 0 deletions
@@ -6,11 +6,24 @@ import (
 	v2pb "github.com/michelangelo-ai/michelangelo/proto-go/api/v2"
 	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	k8sptr "k8s.io/utils/ptr"
 )

+// LogPersistenceConfig holds platform-level configuration for log persistence.
+// This is loaded from YAML config at the mapper level, not from per-job proto.
+type LogPersistenceConfig struct {
+	Enabled           bool   `yaml:"enabled"`
+	StorageEndpoint   string `yaml:"storageEndpoint"`
+	Bucket            string `yaml:"bucket"`
+	PathPrefix        string `yaml:"pathPrefix"`
+	Region            string `yaml:"region"`
+	CredentialsSecret string `yaml:"credentialsSecret"`
+	CollectorImage    string `yaml:"collectorImage"`
+}
+
 func (m Mapper) mapRay(rayJob *v2pb.RayJob, jobClusterObject runtime.Object, cluster *v2pb.Cluster) (runtime.Object, error) {
 	if jobClusterObject == nil {
 		return nil, fmt.Errorf("ray job requires associated RayCluster object")
@@ -52,6 +65,14 @@ func (m Mapper) mapRay(rayJob *v2pb.RayJob, jobClusterObject runtime.Object, clu
 func (m Mapper) mapRayCluster(rayCluster *v2pb.RayCluster) (runtime.Object, error) {
 	workerGroupSpecs := getWorkerGroupSpecs(rayCluster.GetName(), rayCluster.GetSpec().Workers)
 	headGroupSpec := getHeadGroupSpec(rayCluster.GetSpec().Head)
+
+	if m.LogPersistence.Enabled {
+		InjectCollectorSidecar(&headGroupSpec.Template, m.LogPersistence, rayCluster.GetName(), "Head")
+		for i := range workerGroupSpecs {
+			InjectCollectorSidecar(&workerGroupSpecs[i].Template, m.LogPersistence, rayCluster.GetName(), "Worker")
+		}
+	}
+
 	rayV1Cluster := &rayv1.RayCluster{
 		TypeMeta: metav1.TypeMeta{
 			Kind: RayClusterKind,
@@ -94,6 +115,124 @@ func getWorkerGroupSpecs(clusterName string, workers []*v2pb.RayWorkerSpec) []ra
 	return workerGroupSpecsJSON
 }

+const (
+	rayLogsVolumeName = "ray-logs"
+	rayLogsPath       = "/tmp/ray"
+	collectorPort     = 8084
+)
+
+// InjectCollectorSidecar injects a KubeRay History Server collector sidecar container
+// into the pod template. It adds a shared emptyDir volume for Ray logs, mounts it to
+// all existing containers, configures Ray event export env vars, adds a lifecycle hook
+// to write the node ID, and appends the collector sidecar container.
+func InjectCollectorSidecar(podTemplate *corev1.PodTemplateSpec, config LogPersistenceConfig, clusterName string, role string) {
+	// 1. Add ray-logs emptyDir volume
+	podTemplate.Spec.Volumes = append(podTemplate.Spec.Volumes, corev1.Volume{
+		Name: rayLogsVolumeName,
+		VolumeSource: corev1.VolumeSource{
+			EmptyDir: &corev1.EmptyDirVolumeSource{},
+		},
+	})
+
+	rayLogsVolumeMount := corev1.VolumeMount{
+		Name:      rayLogsVolumeName,
+		MountPath: rayLogsPath,
+	}
+
+	eventExportEnvVars := []corev1.EnvVar{
+		{
+			Name:  "RAY_enable_core_worker_ray_event_to_aggregator",
+			Value: "1",
+		},
+		{
+			Name:  "RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR",
+			Value: fmt.Sprintf("http://localhost:%d/events", collectorPort),
+		},
+	}
+
+	// 2-3, 5. Update all existing containers: add volume mount, env vars, lifecycle hook
+	for i := range podTemplate.Spec.Containers {
+		c := &podTemplate.Spec.Containers[i]
+		c.VolumeMounts = append(c.VolumeMounts, rayLogsVolumeMount)
+		c.Env = append(c.Env, eventExportEnvVars...)
+
+		// Add postStart lifecycle hook to write node ID
+		if c.Lifecycle == nil {
+			c.Lifecycle = &corev1.Lifecycle{}
+		}
+		c.Lifecycle.PostStart = &corev1.LifecycleHandler{
+			Exec: &corev1.ExecAction{
+				Command: []string{"/bin/sh", "-c", "echo $RAY_NODE_ID > /tmp/ray/init.log"},
+			},
+		}
+	}
+
+	// 4. Add collector sidecar container
+	collectorContainer := corev1.Container{
+		Name:  "collector",
+		Image: config.CollectorImage,
+		Args: []string{
+			fmt.Sprintf("--role=%s", role),
+			"--runtime-class-name=s3",
+			fmt.Sprintf("--ray-cluster-name=%s", clusterName),
+			fmt.Sprintf("--ray-root-dir=%s", rayLogsPath),
+			fmt.Sprintf("--events-port=%d", collectorPort),
+		},
+		Env: []corev1.EnvVar{
+			{
+				Name: "AWS_ACCESS_KEY_ID",
+				ValueFrom: &corev1.EnvVarSource{
+					SecretKeyRef: &corev1.SecretKeySelector{
+						LocalObjectReference: corev1.LocalObjectReference{
+							Name: config.CredentialsSecret,
+						},
+						Key: "AWS_ACCESS_KEY_ID",
+					},
+				},
+			},
+			{
+				Name: "AWS_SECRET_ACCESS_KEY",
+				ValueFrom: &corev1.EnvVarSource{
+					SecretKeyRef: &corev1.SecretKeySelector{
+						LocalObjectReference: corev1.LocalObjectReference{
+							Name: config.CredentialsSecret,
+						},
+						Key: "AWS_SECRET_ACCESS_KEY",
+					},
+				},
+			},
+			{
+				Name:  "S3_ENDPOINT_URL",
+				Value: config.StorageEndpoint,
+			},
+			{
+				Name:  "S3_BUCKET",
+				Value: config.Bucket,
+			},
+			{
+				Name:  "S3_REGION",
+				Value: config.Region,
+			},
+		},
+		Ports: []corev1.ContainerPort{
+			{
+				Name:          "events",
+				ContainerPort: int32(collectorPort),
+				Protocol:      corev1.ProtocolTCP,
+			},
+		},
+		Resources: corev1.ResourceRequirements{
+			Requests: corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("100m"),
+				corev1.ResourceMemory: resource.MustParse("128Mi"),
+			},
+		},
+		VolumeMounts: []corev1.VolumeMount{rayLogsVolumeMount},
+	}
+
+	podTemplate.Spec.Containers = append(podTemplate.Spec.Containers, collectorContainer)
+}
+
 // getRayClusterStateFromStatus maps KubeRay v1 cluster state to our internal v2pb.RayClusterState
 func getRayClusterStateFromKubeRayState(kubeRayState rayv1.ClusterState) v2pb.RayClusterState {
 	switch kubeRayState {
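To make the injected pod shape concrete, a sketch of exercising InjectCollectorSidecar against a bare pod template; the base container name and image are made up, and the assertions follow the constants and behavior added above:

```go
package k8sengine

import (
	"testing"

	"github.com/stretchr/testify/require"
	corev1 "k8s.io/api/core/v1"
)

func TestInjectCollectorSidecar_Sketch(t *testing.T) {
	tmpl := &corev1.PodTemplateSpec{
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{Name: "ray-head"}}, // hypothetical base container
		},
	}

	InjectCollectorSidecar(tmpl, LogPersistenceConfig{
		Enabled:        true,
		CollectorImage: "example.registry/history-collector:latest", // illustrative image
	}, "demo-cluster", "Head")

	// The collector sidecar is appended after the existing container.
	require.Len(t, tmpl.Spec.Containers, 2)
	require.Equal(t, "collector", tmpl.Spec.Containers[1].Name)

	// A shared ray-logs emptyDir volume is added to the pod.
	require.Len(t, tmpl.Spec.Volumes, 1)
	require.Equal(t, "ray-logs", tmpl.Spec.Volumes[0].Name)

	// The original container gains the log mount, the two event-export
	// env vars, and a postStart hook that records the Ray node ID.
	rayC := tmpl.Spec.Containers[0]
	require.Equal(t, "/tmp/ray", rayC.VolumeMounts[0].MountPath)
	require.Len(t, rayC.Env, 2)
	require.NotNil(t, rayC.Lifecycle.PostStart)
}
```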
