Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 71 additions & 19 deletions apps/worker/src/scheduler/scheduled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,7 @@ async function hasActiveWebhookChannels(db: D1Database): Promise<boolean> {
}

const listDueMonitorsStatementByDb = new WeakMap<D1Database, D1PreparedStatement>();
const hasSchedulableMonitorsStatementByDb = new WeakMap<D1Database, D1PreparedStatement>();
const persistStatementTemplatesByDb = new WeakMap<D1Database, PersistStatementTemplates>();
const hasActiveWebhookChannelsStatementByDb = new WeakMap<D1Database, D1PreparedStatement>();
const activeWebhookPresenceCacheByDb = new WeakMap<
Expand Down Expand Up @@ -911,6 +912,15 @@ const LIST_DUE_MONITORS_SQL = `
ORDER BY m.id
`;

const HAS_SCHEDULABLE_MONITORS_SQL = `
SELECT 1 AS present
FROM monitors m
LEFT JOIN monitor_state s ON s.monitor_id = m.id
WHERE m.is_active = 1
AND (s.status IS NULL OR s.status != 'paused')
LIMIT 1
`;

const PERSIST_STATEMENTS_SQL = {
openOutageIfMissing: `
INSERT INTO outages (monitor_id, started_at, ended_at, initial_error, last_error)
Expand Down Expand Up @@ -963,6 +973,11 @@ export type CompletedDueMonitor = {
maintenanceSuppressed: boolean;
};

type InitializedNotifications = {
module: typeof import('./notifications') | null;
notify: NotifyContext | null;
};

function toHttpMethod(
value: string | null,
): 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE' | 'HEAD' | null {
Expand Down Expand Up @@ -1009,6 +1024,17 @@ async function listDueMonitors(db: D1Database, checkedAt: number): Promise<DueMo
return results ?? [];
}

async function hasSchedulableMonitors(db: D1Database): Promise<boolean> {
const cached = hasSchedulableMonitorsStatementByDb.get(db);
const statement = cached ?? db.prepare(HAS_SCHEDULABLE_MONITORS_SQL);
if (!cached) {
hasSchedulableMonitorsStatementByDb.set(db, statement);
}

const row = await statement.first<{ present: number }>();
return row !== null;
}

export async function listMonitorRowsByIds(
db: D1Database,
ids: number[],
Expand Down Expand Up @@ -1840,6 +1866,33 @@ export async function runScheduledTick(env: Env, ctx: ExecutionContext): Promise
return refreshPromise.then(queueShardedPublicSnapshotWork);
};

const initializeNotifications = async (): Promise<InitializedNotifications> => {
const hasWebhookNotifications = await hasActiveWebhookChannels(env.DB);
if (!hasWebhookNotifications) {
return { module: null, notify: null };
}

const notificationsModule = await import('./notifications');
const notify = await notificationsModule.createNotifyContext(env, ctx);
if (notify) {
await notificationsModule.emitMaintenanceWindowNotifications(env, notify, now);
}
return { module: notificationsModule, notify };
};

const queueIdleWork = async (): Promise<void> => {
if (shouldLogScheduledRefresh(env)) {
console.log('scheduled: idle no runnable monitors');
}
await initializeNotifications();
ctx.waitUntil(queueHomepageRefresh());
};

if (!(await hasSchedulableMonitors(env.DB))) {
await queueIdleWork();
return;
}

const acquired = await acquireLease(env.DB, LOCK_NAME, now, LOCK_LEASE_SECONDS);
if (!acquired) {
return;
Expand All @@ -1857,34 +1910,33 @@ export async function runScheduledTick(env: Env, ctx: ExecutionContext): Promise
try {
const useRuntimeFragmentPipeline = shouldUseScheduledRuntimeFragmentPipeline(env);

const [settings, due, hasWebhookNotifications] = await Promise.all([
readSettings(env.DB),
listDueMonitors(env.DB, checkedAt),
hasActiveWebhookChannels(env.DB),
]);
const setupDurMs = performance.now() - totalStart;
const due = await listDueMonitors(env.DB, checkedAt);

let notificationsModule: typeof import('./notifications') | null = null;
let notify: NotifyContext | null = null;
if (hasWebhookNotifications) {
notificationsModule = await import('./notifications');
notify = await notificationsModule.createNotifyContext(env, ctx);
if (notify) {
await notificationsModule.emitMaintenanceWindowNotifications(env, notify, now);
if (due.length === 0) {
const hasRunnableMonitor = await hasSchedulableMonitors(env.DB);
if (!hasRunnableMonitor) {
await queueIdleWork();
return;
}
await initializeNotifications();
schedulerLease.assertHeld('queueing homepage refresh');
ctx.waitUntil(queueHomepageRefresh());
return;
}

const [settings, notifications] = await Promise.all([
readSettings(env.DB),
initializeNotifications(),
]);
const setupDurMs = performance.now() - totalStart;
const notificationsModule = notifications.module;
const notify = notifications.notify;

const stateMachineConfig = {
failuresToDownFromUp: settings.state_failures_to_down_from_up,
successesToUpFromDown: settings.state_successes_to_up_from_down,
};

if (due.length === 0) {
schedulerLease.assertHeld('queueing homepage refresh');
ctx.waitUntil(queueHomepageRefresh());
return;
}

// Maintenance suppression is monitor-scoped.
const dueMonitorIds = due.map((m) => m.id);
const suppressedMonitorIds =
Expand Down
128 changes: 126 additions & 2 deletions apps/worker/test/scheduled.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type CreateEnvOptions = {
startedWindows?: unknown[];
endedWindows?: unknown[];
windowMonitorLinks?: unknown[];
schedulableMonitorPresent?: boolean | boolean[];
onRun?: (normalizedSql: string, args: unknown[]) => void;
};

Expand All @@ -78,8 +79,19 @@ function createEnv(options: CreateEnvOptions = {}): Env {
startedWindows = [],
endedWindows = [],
windowMonitorLinks = [],
schedulableMonitorPresent = true,
onRun,
} = options;
const schedulableMonitorResults = Array.isArray(schedulableMonitorPresent)
? [...schedulableMonitorPresent]
: null;
const schedulableMonitorFallback = Array.isArray(schedulableMonitorPresent)
? (schedulableMonitorPresent.at(-1) ?? false)
: schedulableMonitorPresent;
const readSchedulableMonitorPresent = () =>
schedulableMonitorResults
? (schedulableMonitorResults.shift() ?? schedulableMonitorFallback)
: schedulableMonitorFallback;

const handlers: FakeD1QueryHandler[] = [
{
Expand All @@ -91,6 +103,12 @@ function createEnv(options: CreateEnvOptions = {}): Env {
match: 'from notification_channels',
all: () => channels,
},
{
match: (normalizedSql) =>
normalizedSql.includes('select 1 as present') &&
normalizedSql.includes('from monitors m'),
first: () => (readSchedulableMonitorPresent() ? { present: 1 } : null),
},
{
match: 'from monitors m',
all: () =>
Expand Down Expand Up @@ -313,15 +331,15 @@ describe('scheduler/scheduled regression', () => {
await expect(listMonitorRowsByIds(env.DB, [0, -1])).resolves.toEqual([]);
});

it('returns without background work when no monitors are due', async () => {
it('queues homepage refresh when monitors are runnable but none are due', async () => {
const env = createEnv({ dueRows: [] });
const waitUntil = vi.fn();
const expectedNow = Math.floor(Date.now() / 1000);

await runScheduledTick(env, { waitUntil } as unknown as ExecutionContext);

expect(acquireLease).toHaveBeenCalledWith(env.DB, 'scheduler:tick', expectedNow, 135);
expect(readSettings).toHaveBeenCalledTimes(1);
expect(readSettings).not.toHaveBeenCalled();
expect(waitUntil).toHaveBeenCalledTimes(1);
await Promise.all(waitUntil.mock.calls.map((call) => call[0] as Promise<unknown>));
expect(refreshPublicHomepageSnapshotIfNeeded).toHaveBeenCalledWith({
Expand All @@ -339,6 +357,112 @@ describe('scheduler/scheduled regression', () => {
});
});

it('skips monitor scheduling but keeps idle public refresh and maintenance notifications', async () => {
const now = Math.floor(Date.now() / 1000);
const env = createEnv({
dueRows: [],
schedulableMonitorPresent: false,
channels: [
{
id: 1,
name: 'webhook',
config_json: JSON.stringify({
url: 'https://hooks.example.com/uptimer',
method: 'POST',
payload_type: 'json',
}),
created_at: now - 3600,
},
],
startedWindows: [
{
id: 1,
title: 'Deploy',
message: null,
starts_at: now - 60,
ends_at: now + 60,
created_at: now - 3600,
},
],
windowMonitorLinks: [{ maintenance_window_id: 1, monitor_id: 301 }],
});
const waitUntil = vi.fn();

await runScheduledTick(env, { waitUntil } as unknown as ExecutionContext);

expect(acquireLease).not.toHaveBeenCalled();
expect(readSettings).not.toHaveBeenCalled();
expect(waitUntil).toHaveBeenCalledTimes(2);
await Promise.all(waitUntil.mock.calls.map((call) => call[0] as Promise<unknown>));
expect(refreshPublicHomepageSnapshotIfNeeded).toHaveBeenCalledWith({
db: env.DB,
now,
compute: expect.any(Function),
seedDataSnapshot: true,
});
expect(computePublicHomepagePayload).not.toHaveBeenCalled();
expect(dispatchWebhookToChannels).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'maintenance.started',
eventKey: `maintenance:1:started:${now - 60}`,
channels: [expect.objectContaining({ id: 1 })],
}),
);
});

it('keeps idle public refresh and maintenance notifications after a post-lease pause race', async () => {
const now = Math.floor(Date.now() / 1000);
const env = createEnv({
dueRows: [],
schedulableMonitorPresent: [true, false],
channels: [
{
id: 1,
name: 'webhook',
config_json: JSON.stringify({
url: 'https://hooks.example.com/uptimer',
method: 'POST',
payload_type: 'json',
}),
created_at: now - 3600,
},
],
startedWindows: [
{
id: 1,
title: 'Deploy',
message: null,
starts_at: now - 60,
ends_at: now + 60,
created_at: now - 3600,
},
],
windowMonitorLinks: [{ maintenance_window_id: 1, monitor_id: 301 }],
});
const waitUntil = vi.fn();

await runScheduledTick(env, { waitUntil } as unknown as ExecutionContext);

expect(acquireLease).toHaveBeenCalledTimes(1);
expect(releaseLease).toHaveBeenCalledTimes(1);
expect(readSettings).not.toHaveBeenCalled();
expect(waitUntil).toHaveBeenCalledTimes(2);
await Promise.all(waitUntil.mock.calls.map((call) => call[0] as Promise<unknown>));
expect(refreshPublicHomepageSnapshotIfNeeded).toHaveBeenCalledWith({
db: env.DB,
now,
compute: expect.any(Function),
seedDataSnapshot: true,
});
expect(dispatchWebhookToChannels).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'maintenance.started',
eventKey: `maintenance:1:started:${now - 60}`,
channels: [expect.objectContaining({ id: 1 })],
}),
);
});

it('self-invokes homepage refresh via service binding when SELF is configured', async () => {
const env = createEnv({ dueRows: [] }) as unknown as Env;
env.ADMIN_TOKEN = 'test-admin-token';
Expand Down
Loading