Skip to content

Commit a03c264

Browse files
thomtrpcursoragent
andauthored
Fix unreliable SSE event stream updates during workflow form transitions (#20242)
Before - workflow run not up to date, needs refresh to see created company in some cases https://github.com/user-attachments/assets/28517e97-2404-4f75-8bce-cc33e3cbea20 After https://github.com/user-attachments/assets/60f930cb-1265-4c50-8ec5-aa4f978b1873 ## Summary - Split `SSEQuerySubscribeEffect`'s single debounced `updateQueryListeners` into separate `syncAdditions` (leading edge, 1s debounce) and `syncRemovals` (trailing edge, 200ms debounce) callbacks. This prevents query unregistrations during component mount/unmount transitions from creating gaps where events are missed, while keeping new registrations immediate. - Each sync path now updates `activeQueryListenersState` granularly (append-only for additions, filter-only for removals) instead of overwriting the entire state, eliminating a race condition where removals could mark unregistered queries as active. - Mount `WorkflowRunSSESubscribeEffect` inside `WorkflowEditActionFormFiller` so the workflow-run query subscription stays active during form steps. - Extract `buildSortedConnectionEdges` util that builds the resulting edge list of a cached record connection after new records are created. Position placeholders (`'first'` / `'last'`) bypass orderBy and are pinned to the front/back; sortable positions (numeric or undefined) are merged into existing edges and sorted by the connection's actual `orderBy`. This replaces the broken `length * position` insertion logic in `triggerCreateRecordsOptimisticEffect` that treated the sortable `position` field as a 0-1 ratio, causing new records from SSE to land at invisible indices in the cached list. Also fixes `totalCount` increment for batched creates, derives `pageInfo` cursors from the final array, and gracefully skips records whose `toReference` returns null. ## Test plan - [x] Run a workflow with a form step — verify the workflow status updates live after form submission (no stuck "running" state) - [x] Run the same workflow multiple times — verify company creation events appear live on the record index page for every run, not just the first - [x] Click the "+" button to create a record in first position — verify it appears immediately at the top - [x] Verify other SSE-backed live updates (record creation, deletion, updates) still work correctly --------- Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 6854dc5 commit a03c264

4 files changed

Lines changed: 235 additions & 151 deletions

File tree

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { type FieldFunctionOptions } from '@apollo/client/cache';
2+
3+
import { sortCachedObjectEdges } from '@/apollo/optimistic-effect/utils/sortCachedObjectEdges';
4+
import { type RecordGqlRefEdge } from '@/object-record/cache/types/RecordGqlRefEdge';
5+
import { type RecordGqlNode } from '@/object-record/graphql/types/RecordGqlNode';
6+
import { type RecordGqlOperationOrderBy } from 'twenty-shared/types';
7+
8+
type NewEntry = {
9+
edge: RecordGqlRefEdge;
10+
record: RecordGqlNode;
11+
};
12+
13+
type BuildSortedConnectionEdgesArgs = {
14+
currentEdges: readonly RecordGqlRefEdge[];
15+
newEntries: readonly NewEntry[];
16+
orderBy: RecordGqlOperationOrderBy | undefined;
17+
readField: FieldFunctionOptions['readField'];
18+
};
19+
20+
export const buildSortedConnectionEdges = ({
21+
currentEdges,
22+
newEntries,
23+
orderBy,
24+
readField,
25+
}: BuildSortedConnectionEdgesArgs): RecordGqlRefEdge[] => {
26+
const firstEdges: RecordGqlRefEdge[] = [];
27+
const lastEdges: RecordGqlRefEdge[] = [];
28+
const sortableEdges: RecordGqlRefEdge[] = [];
29+
30+
for (const { edge, record } of newEntries) {
31+
if (record.position === 'first') {
32+
firstEdges.push(edge);
33+
} else if (record.position === 'last') {
34+
lastEdges.push(edge);
35+
} else {
36+
sortableEdges.push(edge);
37+
}
38+
}
39+
40+
let middleEdges: RecordGqlRefEdge[];
41+
42+
if (Array.isArray(orderBy) && orderBy.length > 0) {
43+
middleEdges = sortCachedObjectEdges({
44+
edges: [...currentEdges, ...sortableEdges],
45+
orderBy,
46+
readCacheField: readField,
47+
});
48+
} else {
49+
middleEdges = [...sortableEdges, ...currentEdges];
50+
}
51+
52+
return [...firstEdges, ...middleEdges, ...lastEdges];
53+
};

packages/twenty-front/src/modules/apollo/optimistic-effect/utils/triggerCreateRecordsOptimisticEffect.ts

Lines changed: 66 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { type ApolloCache, type StoreObject } from '@apollo/client';
22
import { isNonEmptyString } from '@sniptt/guards';
33

4+
import { buildSortedConnectionEdges } from '@/apollo/optimistic-effect/utils/buildSortedConnectionEdges';
45
import { triggerUpdateRelationsOptimisticEffect } from '@/apollo/optimistic-effect/utils/triggerUpdateRelationsOptimisticEffect';
56
import { type EnrichedObjectMetadataItem } from '@/object-metadata/types/EnrichedObjectMetadataItem';
67
import { type RecordGqlRefEdge } from '@/object-record/cache/types/RecordGqlRefEdge';
@@ -119,120 +120,78 @@ export const triggerCreateRecordsOptimisticEffect = ({
119120
hasPreviousPage?: boolean;
120121
}>('pageInfo', rootQueryCachedObjectRecordConnection);
121122

122-
const nextRootQueryCachedRecordEdges = rootQueryCachedRecordEdges
123-
? [...rootQueryCachedRecordEdges]
124-
: [];
125-
126-
const nextQueryCachedPageInfo = isDefined(rootQueryCachedPageInfo)
127-
? { ...rootQueryCachedPageInfo }
128-
: {};
129-
130-
const hasAddedRecords = recordsToCreate
131-
.map((recordToCreate) => {
132-
if (isNonEmptyString(recordToCreate.id)) {
133-
if (
134-
isDefined(rootQueryFilter) &&
135-
shouldMatchRootQueryFilter === true
136-
) {
137-
const recordToCreateMatchesThisRootQueryFilter =
138-
isRecordMatchingFilter({
139-
record: recordToCreate,
140-
filter: rootQueryFilter,
141-
objectMetadataItem,
142-
});
143-
144-
if (!recordToCreateMatchesThisRootQueryFilter) {
145-
return false;
146-
}
147-
}
148-
149-
const recordToCreateReference = toReference(recordToCreate);
150-
151-
if (!recordToCreateReference) {
152-
throw new Error(
153-
`Failed to create reference for record with id: ${recordToCreate.id}`,
154-
);
155-
}
156-
157-
const recordAlreadyInCache = rootQueryCachedRecordEdges?.some(
158-
(cachedEdge) => {
159-
return (
160-
cache.identify(recordToCreateReference) ===
161-
cache.identify(cachedEdge.node)
162-
);
163-
},
164-
);
165-
166-
if (isDefined(recordToCreateReference) && !recordAlreadyInCache) {
167-
const cursor = encodeCursor(recordToCreate);
168-
169-
const edge = {
170-
__typename: getEdgeTypename(objectMetadataItem.nameSingular),
171-
node: recordToCreateReference,
172-
cursor,
173-
};
174-
175-
if (
176-
!isDefined(recordToCreate.position) ||
177-
recordToCreate.position === 'first'
178-
) {
179-
nextRootQueryCachedRecordEdges.unshift(edge);
180-
nextQueryCachedPageInfo.startCursor = cursor;
181-
} else if (recordToCreate.position === 'last') {
182-
nextRootQueryCachedRecordEdges.push(edge);
183-
nextQueryCachedPageInfo.endCursor = cursor;
184-
} else if (typeof recordToCreate.position === 'number') {
185-
let index = Math.round(
186-
nextRootQueryCachedRecordEdges.length *
187-
recordToCreate.position,
188-
);
189-
190-
if (recordToCreate.position < 0) {
191-
index = Math.max(
192-
0,
193-
nextRootQueryCachedRecordEdges.length +
194-
Math.round(recordToCreate.position),
195-
);
196-
} else if (recordToCreate.position > 1) {
197-
index = nextRootQueryCachedRecordEdges.length;
198-
}
199-
200-
index = Math.max(
201-
0,
202-
Math.min(index, nextRootQueryCachedRecordEdges.length),
203-
);
204-
205-
nextRootQueryCachedRecordEdges.splice(index, 0, edge);
206-
207-
if (index === 0) {
208-
nextQueryCachedPageInfo.startCursor = cursor;
209-
} else if (
210-
index ===
211-
nextRootQueryCachedRecordEdges.length - 1
212-
) {
213-
nextQueryCachedPageInfo.endCursor = cursor;
214-
}
215-
}
216-
217-
return true;
218-
}
219-
}
220-
221-
return false;
222-
})
223-
.some((hasAddedRecord) => hasAddedRecord);
224-
225-
if (!hasAddedRecords) {
123+
const newEntries = recordsToCreate.flatMap<{
124+
edge: RecordGqlRefEdge;
125+
record: RecordGqlNode;
126+
}>((recordToCreate) => {
127+
if (!isNonEmptyString(recordToCreate.id)) {
128+
return [];
129+
}
130+
131+
if (
132+
isDefined(rootQueryFilter) &&
133+
shouldMatchRootQueryFilter === true &&
134+
!isRecordMatchingFilter({
135+
record: recordToCreate,
136+
filter: rootQueryFilter,
137+
objectMetadataItem,
138+
})
139+
) {
140+
return [];
141+
}
142+
143+
const node = toReference(recordToCreate);
144+
145+
if (!isDefined(node)) {
146+
return [];
147+
}
148+
149+
const recordAlreadyInCache = rootQueryCachedRecordEdges?.some(
150+
(cachedEdge) =>
151+
cache.identify(node) === cache.identify(cachedEdge.node),
152+
);
153+
154+
if (recordAlreadyInCache === true) {
155+
return [];
156+
}
157+
158+
return [
159+
{
160+
edge: {
161+
__typename: getEdgeTypename(objectMetadataItem.nameSingular),
162+
node,
163+
cursor: encodeCursor(recordToCreate),
164+
},
165+
record: recordToCreate,
166+
},
167+
];
168+
});
169+
170+
if (newEntries.length === 0) {
226171
return rootQueryCachedObjectRecordConnection;
227172
}
228173

174+
const sortedEdges = buildSortedConnectionEdges({
175+
currentEdges: rootQueryCachedRecordEdges ?? [],
176+
newEntries,
177+
orderBy: rootQueryVariables?.orderBy,
178+
readField,
179+
});
180+
229181
return {
230182
...rootQueryCachedObjectRecordConnection,
231-
edges: nextRootQueryCachedRecordEdges,
183+
edges: sortedEdges,
232184
totalCount: isDefined(rootQueryCachedRecordTotalCount)
233-
? rootQueryCachedRecordTotalCount + 1
185+
? rootQueryCachedRecordTotalCount + newEntries.length
234186
: undefined,
235-
pageInfo: nextQueryCachedPageInfo,
187+
pageInfo: {
188+
...(rootQueryCachedPageInfo ?? {}),
189+
startCursor:
190+
sortedEdges[0]?.cursor ?? rootQueryCachedPageInfo?.startCursor,
191+
endCursor:
192+
sortedEdges[sortedEdges.length - 1]?.cursor ??
193+
rootQueryCachedPageInfo?.endCursor,
194+
},
236195
};
237196
},
238197
},

0 commit comments

Comments
 (0)