From dac7352fadbef4da4c3a64087d2fff0ec8715847 Mon Sep 17 00:00:00 2001 From: RJWadley Date: Thu, 18 Jun 2026 13:23:30 -0600 Subject: [PATCH 1/2] sanity: use cdn client --- sanity/live.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sanity/live.tsx b/sanity/live.tsx index 19d31beb..07318bb3 100644 --- a/sanity/live.tsx +++ b/sanity/live.tsx @@ -20,7 +20,7 @@ import { isNextProductionBuild, } from "./liveEnvironment" -const libraryClient = client.withConfig({ useCdn: false }) +const libraryClient = client.withConfig({ useCdn: true }) /** * Use defineLive to keep Sanity fetches tagged with Content Lake sync tags. From 8957cefead367f45f563a570a30ce5e18edf488e Mon Sep 17 00:00:00 2001 From: RJWadley Date: Mon, 22 Jun 2026 11:14:40 -0600 Subject: [PATCH 2/2] sanity: scope live revalidation state --- sanity/liveProxy.ts | 79 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 68 insertions(+), 11 deletions(-) diff --git a/sanity/liveProxy.ts b/sanity/liveProxy.ts index 10ed19cf..1ba6aa8d 100644 --- a/sanity/liveProxy.ts +++ b/sanity/liveProxy.ts @@ -12,6 +12,7 @@ import * as z from "zod" import { getLiveProxySupport, getLiveProxyUnsupportedMessage, + isLocalSiteURL, } from "./liveEnvironment" import { stringifyLiveProxyEvent } from "./liveProxyEvents" @@ -26,14 +27,27 @@ const STARTUP_SAFETY_WINDOW_MS = 60_000 const LIVE_STATE_ID = "_reform.liveState" const LIVE_STATE_TYPE = "reformLiveState" const WATERMARK_WRITE_MAX_ATTEMPTS = 5 +const MAX_LIVE_STATE_ENTRIES = 50 const HEARTBEAT_INTERVAL_MS = 25_000 const MAX_CONNECTION_MS = 280_000 const RECONNECT_LEAD_TIME_MS = 5_000 let messageQueue = Promise.resolve() let sanitySubscription: LiveSubscription | null = null +let liveRequestOrigin: string | null = null type LiveSubscription = { unsubscribe: () => void } +type LiveStateEntry = { + _key: string + key: string + processedAt: string + processedThroughUpdatedAt: string + reason: string +} +type LiveState = { + _rev?: string + states?: LiveStateEntry[] +} const postPayloadSchema = z.union([ z @@ -62,12 +76,26 @@ const latestPublishedUpdatedAtQuery = defineQuery(` const liveStateQuery = defineQuery(` *[_id == $id][0] { _rev, - processedThroughUpdatedAt + states[] { + _key, + key, + processedAt, + processedThroughUpdatedAt, + reason + } } `) +function getLiveStateKey() { + if (isLocalSiteURL(siteURL)) return null + + const { deploymentId } = getDeploymentVersionMetadata() + return deploymentId +} + async function revalidate(payload: { broad: true } | { tags: string[] }) { - const response = await fetch(`${siteURL}/api/live`, { + const revalidationURL = new URL("/api/live", liveRequestOrigin ?? siteURL) + const response = await fetch(revalidationURL, { method: "POST", headers: { "Content-Type": "application/json", @@ -78,6 +106,7 @@ async function revalidate(payload: { broad: true } | { tags: string[] }) { if (!response.ok) { console.error("Sanity live revalidation failed", { + url: revalidationURL.toString(), status: response.status, body: await response.text(), }) @@ -121,19 +150,27 @@ async function getLatestPublishedUpdatedAt() { } async function getLiveState() { - return await stateClient.fetch(liveStateQuery, { + return await stateClient.fetch(liveStateQuery, { id: LIVE_STATE_ID, }) } +function getLiveStateEntry(state: LiveState | null, key: string) { + return state?.states?.find((entry) => entry.key === key) +} + async function writeProcessedWatermark({ processedThroughUpdatedAt, reason, + stateKey, }: { processedThroughUpdatedAt: string reason: string + stateKey: string }) { - const nextState = { + const nextStateEntry = { + _key: stateKey, + key: stateKey, processedThroughUpdatedAt, processedAt: new Date().toISOString(), reason, @@ -141,23 +178,29 @@ async function writeProcessedWatermark({ for (let attempt = 0; attempt < WATERMARK_WRITE_MAX_ATTEMPTS; attempt++) { const state = await getLiveState() + const stateEntry = getLiveStateEntry(state, stateKey) if ( - state?.processedThroughUpdatedAt && + stateEntry?.processedThroughUpdatedAt && !isNewerTimestamp( processedThroughUpdatedAt, - state.processedThroughUpdatedAt ?? undefined, + stateEntry.processedThroughUpdatedAt ?? undefined, ) ) { return } + const states = [ + nextStateEntry, + ...(state?.states ?? []).filter((entry) => entry.key !== stateKey), + ].slice(0, MAX_LIVE_STATE_ENTRIES) + try { if (state?._rev) { await stateClient .patch(LIVE_STATE_ID) .ifRevisionId(state._rev) - .set(nextState) + .set({ states }) .commit() return } @@ -165,7 +208,7 @@ async function writeProcessedWatermark({ await stateClient.createIfNotExists({ _id: LIVE_STATE_ID, _type: LIVE_STATE_TYPE, - ...nextState, + states, }) return } catch (error) { @@ -176,11 +219,12 @@ async function writeProcessedWatermark({ } const latestState = await getLiveState() + const latestStateEntry = getLiveStateEntry(latestState, stateKey) if ( - latestState?.processedThroughUpdatedAt && + latestStateEntry?.processedThroughUpdatedAt && !isNewerTimestamp( processedThroughUpdatedAt, - latestState.processedThroughUpdatedAt ?? undefined, + latestStateEntry.processedThroughUpdatedAt ?? undefined, ) ) { return @@ -192,18 +236,28 @@ async function writeProcessedWatermark({ } async function updateProcessedWatermark(reason: string) { + const stateKey = getLiveStateKey() + if (!stateKey) return + const latestPublishedUpdatedAt = await getLatestPublishedUpdatedAt() if (!latestPublishedUpdatedAt) return await writeProcessedWatermark({ processedThroughUpdatedAt: latestPublishedUpdatedAt, reason, + stateKey, }).catch((error) => { console.error("Unable to write Sanity live state watermark", error) }) } async function runStartupCatchup(workerStartedAt: Date) { + const stateKey = getLiveStateKey() + if (!stateKey) { + await revalidate({ broad: true }) + return + } + const [latestPublishedResult, stateResult] = await Promise.allSettled([ getLatestPublishedUpdatedAt(), getLiveState(), @@ -228,12 +282,13 @@ async function runStartupCatchup(workerStartedAt: Date) { } if (latestPublishedUpdatedAt) { + const stateEntry = getLiveStateEntry(state, stateKey) const latestPublishedMs = Date.parse(latestPublishedUpdatedAt) const isNearStartup = latestPublishedMs >= workerStartedAt.getTime() - STARTUP_SAFETY_WINDOW_MS const isNewerThanWatermark = isNewerTimestamp( latestPublishedUpdatedAt, - state?.processedThroughUpdatedAt ?? undefined, + stateEntry?.processedThroughUpdatedAt ?? undefined, ) shouldBroadRevalidate = @@ -312,6 +367,8 @@ export async function POST(request: Request) { } export async function GET(request: Request) { + liveRequestOrigin = new URL(request.url).origin + const { allowProxy, canUseLiveProxy } = getLiveProxySupport({ allowProxy: libraryConfig.allowProxy, currentSiteURL: siteURL,