From e869b06af78c6a6be3a1293bc15a148e69688440 Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 20 May 2026 02:31:06 +0100 Subject: [PATCH 01/10] Declare activeFlushPromise to track in-flight flushes --- frontend/src/lib/telemetry-sink.js | 1 + 1 file changed, 1 insertion(+) diff --git a/frontend/src/lib/telemetry-sink.js b/frontend/src/lib/telemetry-sink.js index 37233612..02101ad8 100644 --- a/frontend/src/lib/telemetry-sink.js +++ b/frontend/src/lib/telemetry-sink.js @@ -14,6 +14,7 @@ const DEFAULT_CONFIG = { let sinkConfig = { ...DEFAULT_CONFIG }; let eventQueue = []; let flushTimer = null; +let activeFlushPromise = null; export function configureSink(config) { sinkConfig = { From c6fe5590bb66503a6c329ba8e7e845394f2123fb Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 20 May 2026 02:31:15 +0100 Subject: [PATCH 02/10] Reset activeFlushPromise in resetSink --- frontend/src/lib/telemetry-sink.js | 1 + 1 file changed, 1 insertion(+) diff --git a/frontend/src/lib/telemetry-sink.js b/frontend/src/lib/telemetry-sink.js index 02101ad8..d7cf1be1 100644 --- a/frontend/src/lib/telemetry-sink.js +++ b/frontend/src/lib/telemetry-sink.js @@ -166,5 +166,6 @@ export function clearQueue() { export function resetSink() { stopFlushTimer(); eventQueue = []; + activeFlushPromise = null; sinkConfig = { ...DEFAULT_CONFIG }; } From 59350f29ce3ca05df14501de81691eb3854bdff2 Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 20 May 2026 02:31:24 +0100 Subject: [PATCH 03/10] Serialize concurrent flushes by chaining onto activeFlushPromise --- frontend/src/lib/telemetry-sink.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/frontend/src/lib/telemetry-sink.js b/frontend/src/lib/telemetry-sink.js index d7cf1be1..e41dc992 100644 --- a/frontend/src/lib/telemetry-sink.js +++ b/frontend/src/lib/telemetry-sink.js @@ -112,6 +112,10 @@ async function sendBatch(events, attempt = 1) { } export async function flush() { + if (activeFlushPromise) { + return activeFlushPromise.then(() => flush()).catch(() => flush()); + } + if (!isSinkEnabled() || eventQueue.length === 0) { return { success: true, count: 0 }; } From 685d6342e4b21adf7cbd8815063c6c23be80accf Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 20 May 2026 02:31:33 +0100 Subject: [PATCH 04/10] Track active flush promise and clear it when complete --- frontend/src/lib/telemetry-sink.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/frontend/src/lib/telemetry-sink.js b/frontend/src/lib/telemetry-sink.js index e41dc992..3cce5479 100644 --- a/frontend/src/lib/telemetry-sink.js +++ b/frontend/src/lib/telemetry-sink.js @@ -121,7 +121,11 @@ export async function flush() { } const batch = eventQueue.splice(0, sinkConfig.batchSize); - return sendBatch(batch); + activeFlushPromise = sendBatch(batch).then((result) => { + activeFlushPromise = null; + return result; + }); + return activeFlushPromise; } export async function sendSnapshot() { From 7875b46b19aa94d28a1802d3e2a7575160980b45 Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 20 May 2026 02:31:42 +0100 Subject: [PATCH 05/10] Restore queued events when flush fails to preserve queue integrity --- frontend/src/lib/telemetry-sink.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/frontend/src/lib/telemetry-sink.js b/frontend/src/lib/telemetry-sink.js index 3cce5479..017be9ff 100644 --- a/frontend/src/lib/telemetry-sink.js +++ b/frontend/src/lib/telemetry-sink.js @@ -123,6 +123,9 @@ export async function flush() { const batch = eventQueue.splice(0, sinkConfig.batchSize); activeFlushPromise = sendBatch(batch).then((result) => { activeFlushPromise = null; + if (!result.success && isSinkEnabled()) { + eventQueue.unshift(...batch); + } return result; }); return activeFlushPromise; From 101fa0b8c31fafb58d50a8ff99156d38df66fb30 Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 20 May 2026 02:31:51 +0100 Subject: [PATCH 06/10] Add robust catch block for unexpected flush rejections --- frontend/src/lib/telemetry-sink.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/frontend/src/lib/telemetry-sink.js b/frontend/src/lib/telemetry-sink.js index 017be9ff..cd75d516 100644 --- a/frontend/src/lib/telemetry-sink.js +++ b/frontend/src/lib/telemetry-sink.js @@ -127,6 +127,12 @@ export async function flush() { eventQueue.unshift(...batch); } return result; + }).catch((error) => { + activeFlushPromise = null; + if (isSinkEnabled()) { + eventQueue.unshift(...batch); + } + throw error; }); return activeFlushPromise; } From 14ff1fd709cd65b87756ef7f58174bd595945028 Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 20 May 2026 02:32:36 +0100 Subject: [PATCH 07/10] Add unit test for serialized overlapping flush operations --- frontend/src/test/telemetry-sink.test.js | 33 ++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/frontend/src/test/telemetry-sink.test.js b/frontend/src/test/telemetry-sink.test.js index 67cf3eb7..1fa93c92 100644 --- a/frontend/src/test/telemetry-sink.test.js +++ b/frontend/src/test/telemetry-sink.test.js @@ -195,11 +195,40 @@ describe('telemetry-sink', () => { // Advance timers to allow all retries (10ms * 1 + 10ms * 2 = 30ms total) await vi.advanceTimersByTimeAsync(30); - const result = await flushPromise; - expect(result.success).toBe(false); expect(result.error).toBe('Network error'); }); + + it('serializes overlapping flush operations', async () => { + let resolveFirstFetch; + const firstFetchPromise = new Promise((resolve) => { + resolveFirstFetch = () => resolve({ ok: true, status: 200 }); + }); + + global.fetch + .mockReturnValueOnce(firstFetchPromise) + .mockResolvedValueOnce({ ok: true, status: 200 }); + + configureSink({ + enabled: true, + endpoint: 'https://test.com', + batchSize: 1, + }); + + queueEvent('event1', {}); + queueEvent('event2', {}); + + const promise1 = flush(); + const promise2 = flush(); + + expect(global.fetch).toHaveBeenCalledTimes(1); + + resolveFirstFetch(); + await promise1; + await promise2; + + expect(global.fetch).toHaveBeenCalledTimes(2); + }); }); describe('clearQueue', () => { From bbfe8273d33b22729398324fd3ca9aadc84a0aec Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 20 May 2026 02:32:46 +0100 Subject: [PATCH 08/10] Add unit test for queue integrity and event ordering during failed concurrent flushes --- frontend/src/test/telemetry-sink.test.js | 31 ++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/frontend/src/test/telemetry-sink.test.js b/frontend/src/test/telemetry-sink.test.js index 1fa93c92..f15ad205 100644 --- a/frontend/src/test/telemetry-sink.test.js +++ b/frontend/src/test/telemetry-sink.test.js @@ -229,6 +229,37 @@ describe('telemetry-sink', () => { expect(global.fetch).toHaveBeenCalledTimes(2); }); + + it('preserves queue integrity and order when flush fails', async () => { + global.fetch + .mockRejectedValueOnce(new Error('Network error 1')) + .mockRejectedValueOnce(new Error('Network error 2')) + .mockResolvedValueOnce({ ok: true, status: 200 }); + + configureSink({ + enabled: true, + endpoint: 'https://test.com', + batchSize: 1, + retryAttempts: 2, + retryDelayMs: 10, + }); + + queueEvent('event1', {}); + + const promise1 = flush(); + queueEvent('event2', {}); + const promise2 = flush(); + + await vi.advanceTimersByTimeAsync(30); + + const result1 = await promise1; + expect(result1.success).toBe(false); + + const result2 = await promise2; + expect(result2.success).toBe(true); + + expect(getQueueLength()).toBe(1); + }); }); describe('clearQueue', () => { From 103b19e119cd78be45cb42026714aaa40b8a7ef9 Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 20 May 2026 02:32:57 +0100 Subject: [PATCH 09/10] Add unit test for sink reset during active concurrent flushes --- frontend/src/test/telemetry-sink.test.js | 35 ++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/frontend/src/test/telemetry-sink.test.js b/frontend/src/test/telemetry-sink.test.js index f15ad205..f5f44675 100644 --- a/frontend/src/test/telemetry-sink.test.js +++ b/frontend/src/test/telemetry-sink.test.js @@ -260,6 +260,41 @@ describe('telemetry-sink', () => { expect(getQueueLength()).toBe(1); }); + + it('handles sink reset/disable during failed flush to not prepend', async () => { + let rejectFirstFetch; + const firstFetchPromise = new Promise((_, reject) => { + rejectFirstFetch = () => reject(new Error('Fetch failed')); + }); + + global.fetch.mockReturnValueOnce(firstFetchPromise); + + configureSink({ + enabled: true, + endpoint: 'https://test.com', + batchSize: 1, + retryAttempts: 1, + }); + + queueEvent('event1', {}); + queueEvent('event2', {}); + + const promise1 = flush(); + const promise2 = flush(); + + resetSink(); + + rejectFirstFetch(); + + const result1 = await promise1; + expect(result1.success).toBe(false); + + const result2 = await promise2; + expect(result2.success).toBe(true); + expect(result2.count).toBe(0); + + expect(getQueueLength()).toBe(0); + }); }); describe('clearQueue', () => { From 23959ebf9e34e72ed8cc56817edc1b47466ea32d Mon Sep 17 00:00:00 2001 From: 0xMosas Date: Wed, 20 May 2026 02:36:22 +0100 Subject: [PATCH 10/10] Optimize concurrent flush unit tests and verify all test assertions --- frontend/src/test/telemetry-sink.test.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/frontend/src/test/telemetry-sink.test.js b/frontend/src/test/telemetry-sink.test.js index f5f44675..65cefead 100644 --- a/frontend/src/test/telemetry-sink.test.js +++ b/frontend/src/test/telemetry-sink.test.js @@ -195,6 +195,7 @@ describe('telemetry-sink', () => { // Advance timers to allow all retries (10ms * 1 + 10ms * 2 = 30ms total) await vi.advanceTimersByTimeAsync(30); + const result = await flushPromise; expect(result.success).toBe(false); expect(result.error).toBe('Network error'); }); @@ -239,7 +240,7 @@ describe('telemetry-sink', () => { configureSink({ enabled: true, endpoint: 'https://test.com', - batchSize: 1, + batchSize: 10, retryAttempts: 2, retryDelayMs: 10, }); @@ -257,8 +258,9 @@ describe('telemetry-sink', () => { const result2 = await promise2; expect(result2.success).toBe(true); + expect(result2.count).toBe(2); - expect(getQueueLength()).toBe(1); + expect(getQueueLength()).toBe(0); }); it('handles sink reset/disable during failed flush to not prepend', async () => { @@ -272,7 +274,7 @@ describe('telemetry-sink', () => { configureSink({ enabled: true, endpoint: 'https://test.com', - batchSize: 1, + batchSize: 10, retryAttempts: 1, });