@@ -23,6 +23,7 @@ export class PipelineOrchestrator {
2323 private activeTimers : NodeJS . Timeout [ ] = [ ] ;
2424 private readonly pendingPipelines = new Map < string , Promise < void > > ( ) ;
2525 private readonly pipelineMutex = new Map < string , Promise < void > > ( ) ;
26+ private readonly pipelineScheduled = new Set < string > ( ) ;
2627 private nodeProvider : ( ( ) => readonly ConcreteNode [ ] ) | undefined ;
2728
2829 constructor (
@@ -77,7 +78,7 @@ export class PipelineOrchestrator {
7778 nodes : readonly ConcreteNode [ ] ,
7879 targets : ReadonlySet < string > ,
7980 protectedIds : ReadonlySet < string > ,
80- ) => void ,
81+ ) => Promise < void > ,
8182 ) => {
8283 for ( const pipeline of pipelines ) {
8384 for ( const trigger of pipeline . triggers ) {
@@ -91,30 +92,62 @@ export class PipelineOrchestrator {
9192 trigger === 'nodes_aged_out'
9293 ) {
9394 this . eventBus . onConsolidationNeeded ( ( event ) => {
94- executeFn ( pipeline , event . nodes , event . targetNodeIds , new Set ( ) ) ;
95+ void executeFn (
96+ pipeline ,
97+ event . nodes ,
98+ event . targetNodeIds ,
99+ new Set ( ) ,
100+ ) ;
95101 } ) ;
96102 } else if ( trigger === 'new_message' || trigger === 'nodes_added' ) {
97103 this . eventBus . onChunkReceived ( ( event ) => {
98- executeFn ( pipeline , event . nodes , event . targetNodeIds , new Set ( ) ) ;
104+ void executeFn (
105+ pipeline ,
106+ event . nodes ,
107+ event . targetNodeIds ,
108+ new Set ( ) ,
109+ ) ;
99110 } ) ;
100111 }
101112 }
102113 }
103114 } ;
104115
105- bindTriggers ( this . pipelines , ( pipeline , nodes , targets , protectedIds ) => {
106- // Fetch the tail of the current chain for this pipeline, or start a new one
116+ const handleSyncExecution = async (
117+ pipeline : PipelineDef ,
118+ nodes : readonly ConcreteNode [ ] ,
119+ targets : ReadonlySet < string > ,
120+ protectedIds : ReadonlySet < string > ,
121+ ) => {
122+ if ( this . pipelineScheduled . has ( pipeline . name ) ) {
123+ debugLogger . log (
124+ `[Orchestrator] Pipeline ${ pipeline . name } already scheduled (sync), dropping.` ,
125+ ) ;
126+ return ;
127+ }
128+ this . pipelineScheduled . add ( pipeline . name ) ;
129+
107130 const existing =
108131 this . pipelineMutex . get ( pipeline . name ) || Promise . resolve ( ) ;
109132
110133 const nextPromise = ( async ( ) => {
111134 try {
112- // Wait for the previous run of THIS pipeline to complete
113135 await existing ;
136+ this . pipelineScheduled . delete ( pipeline . name ) ;
137+
138+ const latestNodes = this . nodeProvider ? this . nodeProvider ( ) : nodes ;
139+ const latestTargets = latestNodes . filter ( ( n ) => targets . has ( n . id ) ) ;
140+
141+ debugLogger . log (
142+ `[Orchestrator] Executing sync pipeline ${ pipeline . name } with ${ latestTargets . length } latest targets.` ,
143+ ) ;
114144
115- // We re-fetch the LATEST nodes from the environment's live buffer
116- // to ensure this sequential run isn't operating on stale data from the trigger event.
117- const latestNodes = this . nodeProvider ! ( ) ;
145+ if ( latestTargets . length === 0 ) {
146+ debugLogger . log (
147+ `[Orchestrator] No latest targets for sync pipeline ${ pipeline . name } , returning.` ,
148+ ) ;
149+ return ;
150+ }
118151
119152 await this . executePipelineAsync (
120153 pipeline ,
@@ -123,41 +156,87 @@ export class PipelineOrchestrator {
123156 new Set ( protectedIds ) ,
124157 ) ;
125158 } catch ( e ) {
126- debugLogger . error ( `Pipeline chain ${ pipeline . name } failed:` , e ) ;
159+ debugLogger . error ( `Sync pipeline chain ${ pipeline . name } failed:` , e ) ;
127160 }
128161 } ) ( ) ;
129162
130- // Update the chain tail
131163 this . pipelineMutex . set ( pipeline . name , nextPromise ) ;
132-
133164 const pipelineId = `${ pipeline . name } _${ Date . now ( ) } _${ Math . random ( ) . toString ( 36 ) . substr ( 2 , 9 ) } ` ;
134165 this . pendingPipelines . set ( pipelineId , nextPromise ) ;
135166 void nextPromise . finally ( ( ) => {
136167 this . pendingPipelines . delete ( pipelineId ) ;
137- // Only clear the mutex if we are still the tail of the chain
138168 if ( this . pipelineMutex . get ( pipeline . name ) === nextPromise ) {
139169 this . pipelineMutex . delete ( pipeline . name ) ;
140170 }
141171 } ) ;
142- } ) ;
172+ } ;
143173
144- bindTriggers ( this . asyncPipelines , ( pipeline , nodes , targetIds ) => {
145- const inboxSnapshot = new InboxSnapshotImpl (
146- this . env . inbox . getMessages ( ) || [ ] ,
147- ) ;
148- const targets = nodes . filter ( ( n ) => targetIds . has ( n . id ) ) ;
149- for ( const processor of pipeline . processors ) {
150- processor
151- . process ( {
152- targets,
153- inbox : inboxSnapshot ,
154- buffer : ContextWorkingBufferImpl . initialize ( nodes ) ,
155- } )
156- . catch ( ( e : unknown ) =>
157- debugLogger . error ( `AsyncProcessor ${ processor . name } failed:` , e ) ,
158- ) ;
174+ const handleAsyncExecution = async (
175+ pipeline : AsyncPipelineDef ,
176+ nodes : readonly ConcreteNode [ ] ,
177+ targets : ReadonlySet < string > ,
178+ ) => {
179+ if ( this . pipelineScheduled . has ( pipeline . name ) ) {
180+ debugLogger . log (
181+ `[Orchestrator] Pipeline ${ pipeline . name } already scheduled (async), dropping.` ,
182+ ) ;
183+ return ;
159184 }
160- } ) ;
185+ this . pipelineScheduled . add ( pipeline . name ) ;
186+
187+ const existing =
188+ this . pipelineMutex . get ( pipeline . name ) || Promise . resolve ( ) ;
189+
190+ const nextPromise = ( async ( ) => {
191+ try {
192+ await existing ;
193+ this . pipelineScheduled . delete ( pipeline . name ) ;
194+
195+ const latestNodes = this . nodeProvider ? this . nodeProvider ( ) : nodes ;
196+ const latestTargets = latestNodes . filter ( ( n ) => targets . has ( n . id ) ) ;
197+
198+ debugLogger . log (
199+ `[Orchestrator] Executing async pipeline ${ pipeline . name } with ${ latestTargets . length } latest targets.` ,
200+ ) ;
201+
202+ const inboxSnapshot = new InboxSnapshotImpl (
203+ this . env . inbox . getMessages ( ) || [ ] ,
204+ ) ;
205+
206+ for ( const processor of pipeline . processors ) {
207+ debugLogger . log (
208+ `[Orchestrator] Running async processor ${ processor . id } ` ,
209+ ) ;
210+ await processor . process ( {
211+ targets : latestTargets ,
212+ inbox : inboxSnapshot ,
213+ buffer : ContextWorkingBufferImpl . initialize ( latestNodes ) ,
214+ } ) ;
215+ }
216+ this . env . inbox . drainConsumed ( inboxSnapshot . getConsumedIds ( ) ) ;
217+ } catch ( e ) {
218+ debugLogger . error ( `Async pipeline chain ${ pipeline . name } failed:` , e ) ;
219+ }
220+ } ) ( ) ;
221+
222+ this . pipelineMutex . set ( pipeline . name , nextPromise ) ;
223+ const pipelineId = `${ pipeline . name } _${ Date . now ( ) } _${ Math . random ( ) . toString ( 36 ) . substr ( 2 , 9 ) } ` ;
224+ this . pendingPipelines . set ( pipelineId , nextPromise ) ;
225+ void nextPromise . finally ( ( ) => {
226+ this . pendingPipelines . delete ( pipelineId ) ;
227+ if ( this . pipelineMutex . get ( pipeline . name ) === nextPromise ) {
228+ this . pipelineMutex . delete ( pipeline . name ) ;
229+ }
230+ } ) ;
231+ } ;
232+
233+ bindTriggers ( this . pipelines , ( pipeline , nodes , targets , protectedIds ) =>
234+ handleSyncExecution ( pipeline , nodes , targets , protectedIds ) ,
235+ ) ;
236+
237+ bindTriggers ( this . asyncPipelines , ( pipeline , nodes , targets ) =>
238+ handleAsyncExecution ( pipeline , nodes , targets ) ,
239+ ) ;
161240 }
162241
163242 shutdown ( ) {
0 commit comments