I'm creating a client and workers as follows. Note that there is a main queue that a scheduler uses to receive jobs (and then allocate resources), and a cleanup queue that gets a job added after the resources are allocated. For example: "This job was just scheduled for a duration of an hour, so make sure to run the cancel/cleanup worker in an hour."
```go
// NewQueue starts a new queue with a river client
func NewQueue(ctx context.Context) (*Queue, error) {
	dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		return nil, err
	}
	// The default strategy now mirrors what fluence with Kubernetes does
	// This can eventually be customizable. We provide the pool to the
	// strategy because it also manages the provisional queue.
	strategy := strategies.EasyBackfill{}
	workers := river.NewWorkers()
	// Each strategy has its own worker type
	strategy.AddWorkers(workers)
	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		// Change the verbosity of the logger here
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			// Default queue handles job allocation
			river.QueueDefault: {MaxWorkers: queueMaxWorkers},
			// Cleanup queue is only for cancel
			"cancel_queue": {MaxWorkers: queueMaxWorkers},
		},
		Workers: workers,
	})
	if err != nil {
		return nil, err
	}
	// Create the queue and setup events for it
	err = riverClient.Start(ctx)
	if err != nil {
		return nil, err
	}
	queue := Queue{
		riverClient:      riverClient,
		Pool:             dbPool,
		Strategy:         strategy,
		Context:          ctx,
		ReservationDepth: depth,
	}
	queue.setupEvents()
	return &queue, nil
}
```
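River clients only fetch jobs from the queues named in `Config.Queues`; a job inserted with `InsertOpts.Queue` set to any other name is stored but is not worked by this client. The config above lists `cancel_queue`, while `SubmitCleanup` further down inserts with `Queue: "cleanup_queue"`. A minimal, stdlib-only sketch of keeping the names in one place, assuming hypothetical constants `QueueDefault`/`QueueCleanup` and a stand-in `queueConfig` map (not River's `QueueConfig` type):

```go
package main

import "fmt"

// Hypothetical constants: a single source of truth for queue names, so the
// keys in the client's Queues config and InsertOpts.Queue cannot drift apart.
const (
	QueueDefault = "default" // same string as river.QueueDefault
	QueueCleanup = "cleanup_queue"
)

// queueConfig is a stand-in for the shape of river.Config.Queues
// (queue name -> max workers); it is not River's actual type.
type queueConfig map[string]int

// isWorked reports whether a client configured with cfg would fetch
// jobs from the named queue.
func isWorked(cfg queueConfig, queue string) bool {
	_, ok := cfg[queue]
	return ok
}

func main() {
	cfg := queueConfig{QueueDefault: 100, QueueCleanup: 100}
	fmt.Println(isWorked(cfg, QueueCleanup))   // true
	fmt.Println(isWorked(cfg, "cancel_queue")) // false
}
```

In the real code, the same constant would serve both as the key in `Queues` and as the value of `InsertOpts.Queue`.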
Note that the strategy's `AddWorkers` registers both worker types:
```go
// AddWorkers adds the workers for the queue strategy
// job worker: a queue to submit jobs to fluxion
// cleanup worker: a queue to cleanup
func (EasyBackfill) AddWorkers(workers *river.Workers) {
	river.AddWorker(workers, &work.JobWorker{})
	river.AddWorker(workers, &work.CleanupWorker{})
}
```
In the `work` package, a job is defined as needing a spec with resources:
```go
// The job worker submits jobs to fluxion with match allocate
// or match allocate else reserve, depending on the reservation depth
type JobWorker struct {
	river.WorkerDefaults[JobArgs]
}

func (args JobArgs) Kind() string { return "job" }

type JobArgs struct {
	// Submit Args
	Jobspec   string `json:"jobspec"`
	Podspec   string `json:"podspec"`
	GroupName string `json:"groupName"`
	GroupSize int32  `json:"groupSize"`

	// If true, we are allowed to ask Fluxion for a reservation
	Reservation bool `json:"reservation"`

	// Attributes updated to send back via events
	Nodes   string `json:"nodes"`
	FluxJob int64  `json:"jobid"`
	PodId   string `json:"podid"`
}

// Work performs a call to the scheduler, Fluxion
func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
	// Assume work is done here to ask the scheduler, and get an allocation
	...
	// Kick off a cleaning job for when everything should be cancelled
	// This is a duration of 10 seconds for now
	return SubmitCleanup(ctx, pool, 10, int64(fluxID), true, []string{})
}
```
Note that the context passed to `SubmitCleanup` above is the worker's own context, so we should be able to get the "in worker" client from it. Here that is:
```go
// The cleanup worker cleans up a reservation (issuing cancel)
type CleanupWorker struct {
	river.WorkerDefaults[CleanupArgs]
}

func (args CleanupArgs) Kind() string { return "cleanup" }

// SubmitCleanup submits a cleanup job N seconds into the future
func SubmitCleanup(
	ctx context.Context,
	pool *pgxpool.Pool,
	seconds int32,
	fluxID int64,
	inKubernetes bool,
	tags []string,
) error {
	klog.Infof("SUBMIT CLEANUP starting for %d", fluxID)
	client, err := river.ClientFromContextSafely[pgx.Tx](ctx)
	if err != nil {
		return fmt.Errorf("error getting client from context: %w", err)
	}
	tx, err := pool.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)

	// Create scheduledAt time - N seconds from now
	now := time.Now()
	scheduledAt := now.Add(time.Second * time.Duration(seconds))
	insertOpts := river.InsertOpts{
		MaxAttempts: defaults.MaxAttempts,
		Tags:        tags,
		Queue:       "cleanup_queue",
		ScheduledAt: scheduledAt,
	}
	_, err = client.InsertTx(ctx, tx, CleanupArgs{FluxID: fluxID, Kubernetes: inKubernetes}, &insertOpts)
	if err != nil {
		return err
	}
	if err := tx.Commit(ctx); err != nil {
		return err
	}
	klog.Infof("SUBMIT CLEANUP ending for %d", fluxID)
	return nil
}
```
The problem is that I can see that function run to completion (the "SUBMIT CLEANUP ending" log line prints), but I never see any output from the cleanup job, whose first statement is a print. Is it the case that a worker of type A cannot enqueue work for a worker of type B (registered on the same client) because it uses a different queue? Or is there something off about the context I'm using, or is there another step needed to register the job type again? Thanks for the help!