Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion listener/src/database/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS scheduled_notifications (
id INTEGER PRIMARY KEY AUTOINCREMENT,

-- Notification content and metadata
payload TEXT NOT NULL, -- JSON payload of the notification
payload TEXT NOT NULL, -- JSON payload of the notification (compressed when large)
notification_type VARCHAR(50) NOT NULL, -- Type: 'discord', 'email', 'webhook', etc.
target_recipient TEXT NOT NULL, -- User ID, webhook URL, or recipient identifier

Expand Down
5 changes: 5 additions & 0 deletions listener/src/services/scheduled-notification-repository.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Database } from '../database/database';
import logger from '../utils/logger';
import { compressPayload, decompressPayload } from '../utils/payload-compression';
import {
ScheduledNotification,
ScheduledNotificationRow,
Expand Down Expand Up @@ -31,7 +32,10 @@ export class ScheduledNotificationRepository {
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`;

const serializedPayload = compressPayload(input.payload);

const params = [
serializedPayload,
payloadJson,
payloadHash,
input.notificationType,
Expand Down Expand Up @@ -495,6 +499,7 @@ export class ScheduledNotificationRepository {
private rowToModel(row: ScheduledNotificationRow): ScheduledNotification {
return {
id: row.id,
payload: decompressPayload(row.payload),
payload: row.payload,
payloadHash: row.payload_hash,
notificationType: row.notification_type as any,
Expand Down
58 changes: 58 additions & 0 deletions listener/src/utils/payload-compression.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import {
compressPayload,
decompressPayload,
COMPRESSION_PREFIX,
DEFAULT_COMPRESSION_THRESHOLD_BYTES,
} from './payload-compression';

describe('payload compression utilities', () => {
it('compresses and restores a large payload without data loss', () => {
const payload = {
event: 'large-notification',
message: 'x'.repeat(15000),
details: Array.from({ length: 200 }, (_, index) => ({
id: index,
label: `entry-${index}`,
text: 'compressed-payload'.repeat(20),
})),
};

const serialized = JSON.stringify(payload);
const compressed = compressPayload(payload);
const restored = decompressPayload(compressed);

expect(compressed).toContain(COMPRESSION_PREFIX);
expect(Buffer.byteLength(compressed, 'utf8')).toBeLessThan(Buffer.byteLength(serialized, 'utf8'));
expect(restored).toEqual(serialized);
});

it('passes a small payload through without compression', () => {
const payload = { message: 'small payload' };
const serialized = JSON.stringify(payload);

const compressed = compressPayload(payload, { thresholdBytes: DEFAULT_COMPRESSION_THRESHOLD_BYTES + 10000 });

expect(compressed).toEqual(serialized);
expect(decompressPayload(compressed)).toEqual(serialized);
});

it('falls back safely for legacy uncompressed payloads', () => {
const legacyPayload = JSON.stringify({ message: 'legacy payload' });

expect(decompressPayload(legacyPayload)).toEqual(legacyPayload);
});

it('handles invalid or corrupted compressed input without throwing', () => {
const corruptedPayload = `${COMPRESSION_PREFIX}not-valid-base64`;

expect(() => decompressPayload(corruptedPayload)).not.toThrow();
expect(decompressPayload(corruptedPayload)).toEqual(corruptedPayload);
});

it('passes unsupported payload formats through safely', () => {
const unsupportedPayload = 42 as unknown;

expect(compressPayload(unsupportedPayload)).toBe('42');
expect(decompressPayload(unsupportedPayload)).toBe('42');
});
});
102 changes: 102 additions & 0 deletions listener/src/utils/payload-compression.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { gzipSync, gunzipSync } from 'zlib';
import logger from './logger';

export const COMPRESSION_PREFIX = '__COMPRESSED__:';
export const DEFAULT_COMPRESSION_THRESHOLD_BYTES = 10 * 1024;

interface CompressionOptions {
thresholdBytes?: number;
}

interface CompressionMetrics {
originalSizeBytes: number;
compressedSizeBytes: number;
reductionPercent: number;
wasCompressed: boolean;
}

function toSerializableString(payload: unknown): string {
if (typeof payload === 'string') {
return payload;
}

if (payload === null || payload === undefined) {
return JSON.stringify(payload);
}

if (typeof payload === 'object' || typeof payload === 'number' || typeof payload === 'boolean') {
return JSON.stringify(payload);
}

return String(payload);
}

function calculateMetrics(originalSizeBytes: number, compressedSizeBytes: number): CompressionMetrics {
const reductionPercent = originalSizeBytes > 0
? Math.max(0, Math.round(((originalSizeBytes - compressedSizeBytes) / originalSizeBytes) * 100))
: 0;

return {
originalSizeBytes,
compressedSizeBytes,
reductionPercent,
wasCompressed: compressedSizeBytes < originalSizeBytes,
};
}

export function compressPayload(payload: unknown, options: CompressionOptions = {}): string {
const threshold = options.thresholdBytes ?? DEFAULT_COMPRESSION_THRESHOLD_BYTES;
const serialized = toSerializableString(payload);
const originalSize = Buffer.byteLength(serialized, 'utf8');

if (originalSize < threshold) {
return serialized;
}

try {
const compressedBuffer = gzipSync(serialized);
const compressedSize = compressedBuffer.length;
const metrics = calculateMetrics(originalSize, compressedSize);

logger.info('Payload compressed', {
originalSizeBytes: metrics.originalSizeBytes,
compressedSizeBytes: metrics.compressedSizeBytes,
reductionPercent: metrics.reductionPercent,
});

return `${COMPRESSION_PREFIX}${compressedBuffer.toString('base64')}`;
} catch (error) {
logger.warn('Payload compression failed, using original payload', { error });
return serialized;
}
}

export function decompressPayload(compressedPayload: unknown): string {
if (typeof compressedPayload !== 'string') {
return toSerializableString(compressedPayload);
}

if (!compressedPayload.startsWith(COMPRESSION_PREFIX)) {
return compressedPayload;
}

const base64Payload = compressedPayload.slice(COMPRESSION_PREFIX.length);

try {
const buffer = Buffer.from(base64Payload, 'base64');
const decompressed = gunzipSync(buffer);
return decompressed.toString('utf8');
} catch (error) {
logger.warn('Payload decompression failed, returning original value', { error, payloadPreview: compressedPayload.slice(0, 80) });
return compressedPayload;
}
}

export function logCompressionMetrics(metrics: CompressionMetrics): void {
logger.info('Payload compression metrics', {
originalSizeBytes: metrics.originalSizeBytes,
compressedSizeBytes: metrics.compressedSizeBytes,
reductionPercent: metrics.reductionPercent,
wasCompressed: metrics.wasCompressed,
});
}
Loading