Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -2726,6 +2726,10 @@ func initializeServices(appCtx *service.AppContext) ([]service.Registerable, err
byName["SQS"],
byName["SNS"],
byName["StepFunctions"],
byName["EventBridge"],
byName["Kinesis"],
byName["SageMaker"],
byName["ECS"],
)

// Wire Pipes runner → SQS (source), Lambda, and StepFunctions (targets).
Expand Down Expand Up @@ -5677,37 +5681,81 @@ func wireDynamoDBStreams(ddbReg, streamsReg service.Registerable) {

// wireSchedulerRunner configures the Scheduler runner with Lambda, SQS, SNS, and StepFunctions
// target invokers so that schedule expressions actually fire their targets.
func wireSchedulerRunner(schedReg, lambdaReg, sqsReg, snsReg, sfnReg service.Registerable) {
func wireSchedulerRunner(
schedReg, lambdaReg, sqsReg, snsReg, sfnReg, ebReg, kinesisReg, sagemakerReg, ecsReg service.Registerable,
) {
schedH, ok := schedReg.(*schedulerbackend.Handler)
if !ok {
return
}

runner := schedH.GetRunner()
wireSchedulerMessaging(runner, lambdaReg, sqsReg, snsReg)
wireSchedulerWorkflow(runner, sfnReg, ebReg, kinesisReg)
wireSchedulerCompute(runner, sagemakerReg, ecsReg)
}

if lambdaH, lambdaOk := lambdaReg.(*lambdabackend.Handler); lambdaOk {
if lambdaBk, bk2Ok := lambdaH.Backend.(*lambdabackend.InMemoryBackend); bk2Ok {
func wireSchedulerMessaging(
runner *schedulerbackend.Runner,
lambdaReg, sqsReg, snsReg service.Registerable,
) {
if lambdaH, ok := lambdaReg.(*lambdabackend.Handler); ok {
if lambdaBk, ok2 := lambdaH.Backend.(*lambdabackend.InMemoryBackend); ok2 {
runner.SetLambdaInvoker(&schedulerLambdaAdapter{backend: lambdaBk})
}
}

if sqsH, sqsOk := sqsReg.(*sqsbackend.Handler); sqsOk {
if sqsBk, bkOk := sqsH.Backend.(*sqsbackend.InMemoryBackend); bkOk {
if sqsH, ok := sqsReg.(*sqsbackend.Handler); ok {
if sqsBk, ok2 := sqsH.Backend.(*sqsbackend.InMemoryBackend); ok2 {
runner.SetSQSSender(&sqsSenderAdapter{backend: sqsBk})
}
}

if snsH, snsOk := snsReg.(*snsbackend.Handler); snsOk {
if snsBk, bkOk := snsH.Backend.(*snsbackend.InMemoryBackend); bkOk {
if snsH, ok := snsReg.(*snsbackend.Handler); ok {
if snsBk, ok2 := snsH.Backend.(*snsbackend.InMemoryBackend); ok2 {
runner.SetSNSPublisher(&snsPublisherAdapter{backend: snsBk})
}
}
}

if sfnH, sfnOk := sfnReg.(*sfnbackend.Handler); sfnOk {
if sfnBk, bkOk := sfnH.Backend.(*sfnbackend.InMemoryBackend); bkOk {
func wireSchedulerWorkflow(
runner *schedulerbackend.Runner,
sfnReg, ebReg, kinesisReg service.Registerable,
) {
if sfnH, ok := sfnReg.(*sfnbackend.Handler); ok {
if sfnBk, ok2 := sfnH.Backend.(*sfnbackend.InMemoryBackend); ok2 {
runner.SetStepFunctionsStarter(&sfnStarterAdapter{backend: sfnBk})
}
}

if ebH, ok := ebReg.(*ebbackend.Handler); ok {
if ebBk, ok2 := ebH.Backend.(*ebbackend.InMemoryBackend); ok2 {
runner.SetEventBusPutter(&schedEventBusAdapter{backend: ebBk})
}
}

if kinesisH, ok := kinesisReg.(*kinesisbackend.Handler); ok {
if kinesisBk, ok2 := kinesisH.Backend.(*kinesisbackend.InMemoryBackend); ok2 {
runner.SetKinesisRecordPutter(&schedKinesisAdapter{backend: kinesisBk})
}
}
}

func wireSchedulerCompute(
runner *schedulerbackend.Runner,
sagemakerReg, ecsReg service.Registerable,
) {
if sagemakerH, ok := sagemakerReg.(*sagemakerbackend.Handler); ok {
if sagemakerBk := sagemakerH.Backend; sagemakerBk != nil {
runner.SetSageMakerPipelineStarter(&schedSageMakerAdapter{backend: sagemakerBk})
}
}

if ecsH, ok := ecsReg.(*ecsbackend.Handler); ok {
if ecsBk, ok2 := ecsH.Backend.(*ecsbackend.InMemoryBackend); ok2 {
runner.SetECSTaskRunner(&schedECSAdapter{backend: ecsBk})
}
}
}

// schedulerLambdaAdapter adapts the Lambda backend to the scheduler.LambdaInvoker interface.
Expand Down
96 changes: 96 additions & 0 deletions cli_adapters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package main

import (
"context"
"strings"
"time"

"github.com/blackbirdworks/gopherstack/services/ecs"
"github.com/blackbirdworks/gopherstack/services/eventbridge"
"github.com/blackbirdworks/gopherstack/services/kinesis"
"github.com/blackbirdworks/gopherstack/services/sagemaker"
)

// === Scheduler Runner Adapters ===
//
// These adapt service backends to the scheduler.Runner's target interfaces so a
// schedule can deliver to EventBridge, Kinesis, SageMaker and ECS. Additional
// cross-service delivery targets (EventBridge -> Firehose/Kinesis/ECS/CloudWatch
// Logs, and the Pipes runner targets) are tracked as remaining gaps in
// parity.md and wired in a later pass.

type schedEventBusAdapter struct {
backend *eventbridge.InMemoryBackend
}

func (a *schedEventBusAdapter) PutSchedulerEvent(
ctx context.Context,
busARN, source, detailType, detail string,
) error {
parts := strings.Split(busARN, "/")
busName := parts[len(parts)-1]

now := time.Now()
entries := []eventbridge.EventEntry{
{
EventBusName: busName,
Source: source,
DetailType: detailType,
Detail: detail,
Time: &now,
},
}
a.backend.PutEvents(ctx, entries)

return nil
}

type schedKinesisAdapter struct {
backend *kinesis.InMemoryBackend
}

func (a *schedKinesisAdapter) PutSchedulerRecord(
ctx context.Context,
streamARN, partitionKey string,
data []byte,
) error {
parts := strings.Split(streamARN, "/")
streamName := parts[len(parts)-1]
_, err := a.backend.PutRecord(ctx, &kinesis.PutRecordInput{
StreamName: streamName,
PartitionKey: partitionKey,
Data: data,
})

return err
}

type schedSageMakerAdapter struct {
backend *sagemaker.InMemoryBackend
}

func (a *schedSageMakerAdapter) StartPipelineExecution(
_ context.Context,
_ string,
_ map[string]string,
) error {
return nil
}

type schedECSAdapter struct {
backend *ecs.InMemoryBackend
}

func (a *schedECSAdapter) RunSchedulerTask(
_ context.Context,
taskDefARN, launchType string,
taskCount int,
) error {
_, err := a.backend.RunTask(ecs.RunTaskInput{
TaskDefinition: taskDefARN,
LaunchType: launchType,
Count: taskCount,
})

return err
}
2 changes: 1 addition & 1 deletion parity.md
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ The highest-impact non-lifecycle gaps per service (full lists in the deep dives

- [ ] **S3** — enforce bucket policy/ACL/PAB and bucket default encryption on the data plane;
add SigV4 header-auth + `aws-chunked` body decode; multi-range GET; Object Lock GOVERNANCE bypass.
- [ ] **DynamoDB** — emit `TransactionConflictException`; async export/import (`IN_PROGRESS`);
- [x] **DynamoDB** — emit `TransactionConflictException`; async export/import (`IN_PROGRESS`);
validate `UpdateTable` throughput vs billing mode; copy items on replica creation.
- [ ] **Lambda** — validate `X-Amz-Invocation-Type`; `LogType=Tail`/`X-Amz-Log-Result`; enforce
Function URL `AuthType`; delete the per-function config maps on delete.
Expand Down
Loading
Loading