Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sanity/live.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
isNextProductionBuild,
} from "./liveEnvironment"

const libraryClient = client.withConfig({ useCdn: false })
const libraryClient = client.withConfig({ useCdn: true })

/**
* Use defineLive to keep Sanity fetches tagged with Content Lake sync tags.
Expand Down
79 changes: 68 additions & 11 deletions sanity/liveProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import * as z from "zod"
import {
getLiveProxySupport,
getLiveProxyUnsupportedMessage,
isLocalSiteURL,
} from "./liveEnvironment"
import { stringifyLiveProxyEvent } from "./liveProxyEvents"

Expand All @@ -26,14 +27,27 @@ const STARTUP_SAFETY_WINDOW_MS = 60_000
const LIVE_STATE_ID = "_reform.liveState"
const LIVE_STATE_TYPE = "reformLiveState"
const WATERMARK_WRITE_MAX_ATTEMPTS = 5
const MAX_LIVE_STATE_ENTRIES = 50
const HEARTBEAT_INTERVAL_MS = 25_000
const MAX_CONNECTION_MS = 280_000
const RECONNECT_LEAD_TIME_MS = 5_000

let messageQueue = Promise.resolve()
let sanitySubscription: LiveSubscription | null = null
let liveRequestOrigin: string | null = null

type LiveSubscription = { unsubscribe: () => void }
type LiveStateEntry = {
_key: string
key: string
processedAt: string
processedThroughUpdatedAt: string
reason: string
}
type LiveState = {
_rev?: string
states?: LiveStateEntry[]
}

const postPayloadSchema = z.union([
z
Expand Down Expand Up @@ -62,12 +76,26 @@ const latestPublishedUpdatedAtQuery = defineQuery(`
const liveStateQuery = defineQuery(`
*[_id == $id][0] {
_rev,
processedThroughUpdatedAt
states[] {
_key,
key,
processedAt,
processedThroughUpdatedAt,
reason
}
}
`)

function getLiveStateKey() {
if (isLocalSiteURL(siteURL)) return null

const { deploymentId } = getDeploymentVersionMetadata()
return deploymentId
}

async function revalidate(payload: { broad: true } | { tags: string[] }) {
const response = await fetch(`${siteURL}/api/live`, {
const revalidationURL = new URL("/api/live", liveRequestOrigin ?? siteURL)
const response = await fetch(revalidationURL, {
method: "POST",
headers: {
"Content-Type": "application/json",
Expand All @@ -78,6 +106,7 @@ async function revalidate(payload: { broad: true } | { tags: string[] }) {

if (!response.ok) {
console.error("Sanity live revalidation failed", {
url: revalidationURL.toString(),
status: response.status,
body: await response.text(),
})
Expand Down Expand Up @@ -121,51 +150,65 @@ async function getLatestPublishedUpdatedAt() {
}

async function getLiveState() {
return await stateClient.fetch(liveStateQuery, {
return await stateClient.fetch<LiveState | null>(liveStateQuery, {
id: LIVE_STATE_ID,
})
}

function getLiveStateEntry(state: LiveState | null, key: string) {
return state?.states?.find((entry) => entry.key === key)
}

async function writeProcessedWatermark({
processedThroughUpdatedAt,
reason,
stateKey,
}: {
processedThroughUpdatedAt: string
reason: string
stateKey: string
}) {
const nextState = {
const nextStateEntry = {
_key: stateKey,
key: stateKey,
processedThroughUpdatedAt,
processedAt: new Date().toISOString(),
reason,
}

for (let attempt = 0; attempt < WATERMARK_WRITE_MAX_ATTEMPTS; attempt++) {
const state = await getLiveState()
const stateEntry = getLiveStateEntry(state, stateKey)

if (
state?.processedThroughUpdatedAt &&
stateEntry?.processedThroughUpdatedAt &&
!isNewerTimestamp(
processedThroughUpdatedAt,
state.processedThroughUpdatedAt ?? undefined,
stateEntry.processedThroughUpdatedAt ?? undefined,
)
) {
return
}

const states = [
nextStateEntry,
...(state?.states ?? []).filter((entry) => entry.key !== stateKey),
].slice(0, MAX_LIVE_STATE_ENTRIES)

try {
if (state?._rev) {
await stateClient
.patch(LIVE_STATE_ID)
.ifRevisionId(state._rev)
.set(nextState)
.set({ states })
.commit()
return
}

await stateClient.createIfNotExists({
_id: LIVE_STATE_ID,
_type: LIVE_STATE_TYPE,
...nextState,
states,
})
return
} catch (error) {
Expand All @@ -176,11 +219,12 @@ async function writeProcessedWatermark({
}

const latestState = await getLiveState()
const latestStateEntry = getLiveStateEntry(latestState, stateKey)
if (
latestState?.processedThroughUpdatedAt &&
latestStateEntry?.processedThroughUpdatedAt &&
!isNewerTimestamp(
processedThroughUpdatedAt,
latestState.processedThroughUpdatedAt ?? undefined,
latestStateEntry.processedThroughUpdatedAt ?? undefined,
)
) {
return
Expand All @@ -192,18 +236,28 @@ async function writeProcessedWatermark({
}

async function updateProcessedWatermark(reason: string) {
const stateKey = getLiveStateKey()
if (!stateKey) return

const latestPublishedUpdatedAt = await getLatestPublishedUpdatedAt()
if (!latestPublishedUpdatedAt) return

await writeProcessedWatermark({
processedThroughUpdatedAt: latestPublishedUpdatedAt,
reason,
stateKey,
}).catch((error) => {
console.error("Unable to write Sanity live state watermark", error)
})
}

async function runStartupCatchup(workerStartedAt: Date) {
const stateKey = getLiveStateKey()
if (!stateKey) {
await revalidate({ broad: true })
return
}

const [latestPublishedResult, stateResult] = await Promise.allSettled([
getLatestPublishedUpdatedAt(),
getLiveState(),
Expand All @@ -228,12 +282,13 @@ async function runStartupCatchup(workerStartedAt: Date) {
}

if (latestPublishedUpdatedAt) {
const stateEntry = getLiveStateEntry(state, stateKey)
const latestPublishedMs = Date.parse(latestPublishedUpdatedAt)
const isNearStartup =
latestPublishedMs >= workerStartedAt.getTime() - STARTUP_SAFETY_WINDOW_MS
const isNewerThanWatermark = isNewerTimestamp(
latestPublishedUpdatedAt,
state?.processedThroughUpdatedAt ?? undefined,
stateEntry?.processedThroughUpdatedAt ?? undefined,
)

shouldBroadRevalidate =
Expand Down Expand Up @@ -312,6 +367,8 @@ export async function POST(request: Request) {
}

export async function GET(request: Request) {
liveRequestOrigin = new URL(request.url).origin

const { allowProxy, canUseLiveProxy } = getLiveProxySupport({
allowProxy: libraryConfig.allowProxy,
currentSiteURL: siteURL,
Expand Down