Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions runs/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ var defaultConfig = &Config{
Port: 8090,
Host: "0.0.0.0",
},
WatchBufferSize: 100,
ActionsServiceURL: "http://localhost:8090",
StoragePrefix: "file:///tmp/flyte/data",
WatchBufferSize: 100,
ActionsServiceURL: "http://localhost:8090",
DataProxyServiceURL: "http://localhost:8088",
StoragePrefix: "file:///tmp/flyte/data",
SeedProjects: []string{"flytesnacks"},
Domains: []DomainConfig{
{ID: "development", Name: "Development"},
Expand Down Expand Up @@ -50,6 +51,9 @@ type Config struct {
// ActionsServiceURL is the URL of the actions service, used for enqueuing actions.
ActionsServiceURL string `json:"actionsServiceUrl" pflag:",URL of the actions service"`

// DataProxyServiceURL is the URL of the DataProxy service, used by RunLogsService to delegate log streaming.
DataProxyServiceURL string `json:"dataProxyServiceUrl" pflag:",URL of the DataProxy service"`

// StoragePrefix is the base URI for storing run data (inputs, outputs)
// e.g. "s3://my-bucket" or "gs://my-bucket" or "file:///tmp/flyte/data"
StoragePrefix string `json:"storagePrefix" pflag:",Base URI prefix for storing run inputs and outputs"`
Expand Down
97 changes: 0 additions & 97 deletions runs/service/k8s_log_streamer.go

This file was deleted.

144 changes: 0 additions & 144 deletions runs/service/k8s_log_streamer_test.go

This file was deleted.

93 changes: 25 additions & 68 deletions runs/service/run_logs_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,33 @@ package service

import (
"context"
"errors"
"fmt"

"database/sql"

"connectrpc.com/connect"
"golang.org/x/sync/semaphore"

"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/common"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/dataproxy"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/dataproxy/dataproxyconnect"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow"
"github.com/flyteorg/flyte/v2/runs/repository/interfaces"

"github.com/samber/lo"
)

// defaultMaxConcurrentStreams is the weight passed to semaphore.NewWeighted
// when a RunLogsService is constructed. TailLogs releases one unit of that
// semaphore when it returns.
// NOTE(review): presumably this caps the number of concurrent TailLogs
// streams; the matching acquire is not visible in this view, so confirm there.
const defaultMaxConcurrentStreams = 100

// LogStreamer abstracts log fetching from different backends.
//
// RunLogsService.TailLogs hands it the LogContext resolved for the requested
// action attempt together with the handler's response stream, and returns the
// resulting error to the caller unchanged.
type LogStreamer interface {
TailLogs(ctx context.Context, logContext *core.LogContext, stream *connect.ServerStream[workflow.TailLogsResponse]) error
}

// RunLogsService implements the RunLogsServiceHandler interface.
type RunLogsService struct {
repo interfaces.Repository
streamer LogStreamer
sem *semaphore.Weighted
dataProxyClient dataproxyconnect.DataProxyServiceClient
sem *semaphore.Weighted
}

// NewRunLogsService creates a new RunLogsService.
func NewRunLogsService(repo interfaces.Repository, streamer LogStreamer) *RunLogsService {
func NewRunLogsService(dataProxyClient dataproxyconnect.DataProxyServiceClient) *RunLogsService {
return &RunLogsService{
repo: repo,
streamer: streamer,
sem: semaphore.NewWeighted(defaultMaxConcurrentStreams),
dataProxyClient: dataProxyClient,
sem: semaphore.NewWeighted(defaultMaxConcurrentStreams),
}
}

// TailLogs streams pod logs for an action attempt.
// TailLogs streams pod logs for an action attempt by delegating to DataProxyService.
func (s *RunLogsService) TailLogs(ctx context.Context, req *connect.Request[workflow.TailLogsRequest], stream *connect.ServerStream[workflow.TailLogsResponse]) error {
msg := req.Msg
if msg.GetActionId() == nil {
Expand All @@ -52,56 +39,26 @@ func (s *RunLogsService) TailLogs(ctx context.Context, req *connect.Request[work
}
defer s.sem.Release(1)

logContext, err := getLogContextForAttempt(ctx, s.repo, msg.GetActionId(), msg.GetAttempt())
dpStream, err := s.dataProxyClient.TailLogs(ctx, connect.NewRequest(&dataproxy.TailLogsRequest{
ActionId: msg.GetActionId(),
Attempt: msg.GetAttempt(),
}))
if err != nil {
return err
}

return s.streamer.TailLogs(ctx, logContext, stream)
}

// getLogContextForAttempt fetches the latest event for the given attempt and
// extracts its LogContext. Uses a targeted DB query instead of scanning all events.
func getLogContextForAttempt(ctx context.Context, repo interfaces.Repository, actionID *common.ActionIdentifier, attempt uint32) (*core.LogContext, error) {
m, err := repo.ActionRepo().GetLatestEventByAttempt(ctx, actionID, attempt)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no event found for action %v attempt %d", actionID, attempt))
defer dpStream.Close()

for dpStream.Receive() {
dpResp := dpStream.Msg()
logs := make([]*workflow.TailLogsResponse_Logs, 0, len(dpResp.GetLogs()))
for _, l := range dpResp.GetLogs() {
logs = append(logs, &workflow.TailLogsResponse_Logs{
Lines: l.GetLines(),
})
}
if err := stream.Send(&workflow.TailLogsResponse{Logs: logs}); err != nil {
return err
}
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to get event for action %v attempt %d: %w", actionID, attempt, err))
}

event, err := m.ToActionEvent()
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to deserialize event: %w", err))
}

if event.GetLogContext() == nil {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no log context found for action %v attempt %d", actionID, attempt))
}

return event.GetLogContext(), nil
}

// getPrimaryPodAndContainer finds the primary pod and container from a LogContext.
func getPrimaryPodAndContainer(logContext *core.LogContext) (*core.PodLogContext, *core.ContainerContext, error) {
if logContext.GetPrimaryPodName() == "" {
return nil, nil, fmt.Errorf("primary pod name is empty in log context")
}

pod, found := lo.Find(logContext.GetPods(), func(pod *core.PodLogContext) bool {
return pod.GetPodName() == logContext.GetPrimaryPodName()
})
if !found {
return nil, nil, fmt.Errorf("primary pod %s not found in log context", logContext.GetPrimaryPodName())
}

container, found := lo.Find(pod.GetContainers(), func(c *core.ContainerContext) bool {
return c.GetContainerName() == pod.GetPrimaryContainerName()
})
if !found {
return nil, nil, fmt.Errorf("primary container %s not found in pod %s", pod.GetPrimaryContainerName(), pod.GetPodName())
}

return pod, container, nil
return dpStream.Err()
}
Loading
Loading