diff --git a/.yarnrc.yml b/.yarnrc.yml index 20364fd..20ec602 100644 --- a/.yarnrc.yml +++ b/.yarnrc.yml @@ -1,11 +1,11 @@ nodeLinker: node-modules -# @rwdocs/{core,viewer} 0.1.27 was just published; preapprove it past yarn's +# @rwdocs/{core,viewer} 0.1.28 was just published; preapprove it past yarn's # npmMinimalAgeGate (24h supply-chain quarantine). These are first-party -# packages we publish. Removable once 0.1.27 is >24h old. +# packages we publish. Removable once 0.1.28 is >24h old. npmPreapprovedPackages: - - "@rwdocs/core@^0.1.27" - - "@rwdocs/core-darwin-arm64@^0.1.27" - - "@rwdocs/core-linux-x64-gnu@^0.1.27" - - "@rwdocs/core-linux-x64-musl@^0.1.27" - - "@rwdocs/viewer@^0.1.27" + - "@rwdocs/core@^0.1.28" + - "@rwdocs/core-darwin-arm64@^0.1.28" + - "@rwdocs/core-linux-x64-gnu@^0.1.28" + - "@rwdocs/core-linux-x64-musl@^0.1.28" + - "@rwdocs/viewer@^0.1.28" diff --git a/plugins/rw-backend/migrations/20260624000000_create_site_index_tables.js b/plugins/rw-backend/migrations/20260624000000_create_site_index_tables.js new file mode 100644 index 0000000..2e66c49 --- /dev/null +++ b/plugins/rw-backend/migrations/20260624000000_create_site_index_tables.js @@ -0,0 +1,42 @@ +// @ts-check +/** @param {import('knex').Knex} knex */ +exports.up = async function up(knex) { + await knex.schema.createTable('section_ownership', table => { + table.text('site_ref').notNullable(); + table.text('section_ref').notNullable(); + table.text('entity_ref').notNullable(); + table.text('entity_owner_ref').nullable(); + table.primary(['site_ref', 'section_ref']); + table.index(['entity_owner_ref'], 'section_ownership_owner_idx'); + }); + await knex.schema.createTable('sections', table => { + table.text('site_ref').notNullable(); + table.text('section_ref').notNullable(); + table.text('section_path').notNullable(); + table.text('parent_section_ref').nullable(); + table.primary(['site_ref', 'section_ref']); + }); + await knex.schema.createTable('pages', table => { + table.text('site_ref').notNullable(); + table.text('section_ref').notNullable(); + table.text('subpath').notNullable(); + table.text('title').notNullable(); + table.primary(['site_ref', 'section_ref', 'subpath']); + }); + await knex.schema.createTable('site_refresh', table => { + table.text('site_ref').primary(); + table.dateTime('next_update_at').notNullable(); + table.dateTime('last_built_at').nullable(); + table.text('result_hash').nullable(); + table.text('errors').nullable(); + table.dateTime('last_discovery_at').notNullable(); + table.index(['next_update_at'], 'site_refresh_next_update_idx'); + }); +}; +/** @param {import('knex').Knex} knex */ +exports.down = async function down(knex) { + await knex.schema.dropTableIfExists('site_refresh'); + await knex.schema.dropTableIfExists('pages'); + await knex.schema.dropTableIfExists('sections'); + await knex.schema.dropTableIfExists('section_ownership'); +}; diff --git a/plugins/rw-backend/package.json b/plugins/rw-backend/package.json index bbe6515..2712b4e 100644 --- a/plugins/rw-backend/package.json +++ b/plugins/rw-backend/package.json @@ -53,10 +53,11 @@ "@backstage/plugin-permission-node": "^0.11.1", "@backstage/types": "^1.2.2", "@rwdocs/backstage-plugin-rw-common": "workspace:^", - "@rwdocs/core": "^0.1.27", + "@rwdocs/core": "^0.1.28", "express": "^4.21.0", "express-promise-router": "^4.1.0", "luxon": "^3.7.2", + "p-limit": "^3.1.0", "uuid": "^14.0.0", "zod": "^3.25.76 || ^4.0.0" }, diff --git a/plugins/rw-backend/src/plugin.ts b/plugins/rw-backend/src/plugin.ts index 8416d56..0d56db5 100644 --- a/plugins/rw-backend/src/plugin.ts +++ b/plugins/rw-backend/src/plugin.ts @@ -2,7 +2,14 @@ import { coreServices, createBackendPlugin, resolvePackagePath, + readSchedulerServiceTaskScheduleDefinitionFromConfig, } from "@backstage/backend-plugin-api"; +import { SectionOwnershipStore } from "./siteIndex/SectionOwnershipStore"; +import { RegistryStore } from "./siteIndex/RegistryStore"; +import { SiteRefreshStore } from "./siteIndex/SiteRefreshStore"; +import { runScan } from "./siteIndex/runScan"; +import { runWorker } from "./siteIndex/runWorker"; +import { makeSiteFactory } from "./siteIndex/schedule"; import { readDurationFromConfig } from "@backstage/config"; import { readRwSiteConfig, @@ -88,6 +95,40 @@ export const rwPlugin = createBackendPlugin({ } const store = new CommentStore(client); + const sectionOwnershipStore = new SectionOwnershipStore(client); + const registryStore = new RegistryStore(client); + const siteRefreshStore = new SiteRefreshStore(client); + const makeSite = makeSiteFactory(siteConfig); + + const scanSchedule = config.has("rw.siteIndex.schedule") + ? readSchedulerServiceTaskScheduleDefinitionFromConfig( + config.getConfig("rw.siteIndex.schedule"), + ) + : { frequency: { minutes: 15 }, timeout: { minutes: 10 }, initialDelay: { seconds: 30 } }; + const workerSchedule = config.has("rw.siteIndex.worker") + ? readSchedulerServiceTaskScheduleDefinitionFromConfig( + config.getConfig("rw.siteIndex.worker"), + ) + : { frequency: { seconds: 30 }, timeout: { minutes: 5 }, initialDelay: { seconds: 10 } }; + + await scheduler.scheduleTask({ + id: "rw-site-index-scan", + scope: "global", + ...scanSchedule, + fn: async () => + runScan({ catalog, auth, logger, siteConfig, sectionOwnershipStore, siteRefreshStore }), + }); + await scheduler.scheduleTask({ + id: "rw-site-index-worker", + scope: "local", + ...workerSchedule, + fn: async () => runWorker({ logger, siteRefreshStore, registryStore, makeSite }), + }); + logger.info( + `Scheduled site index rebuild: scan ${JSON.stringify(scanSchedule.frequency)} (global), ` + + `worker ${JSON.stringify(workerSchedule.frequency)} (local)`, + ); + const commentsEnabled = config.getOptionalBoolean("rw.comments.enabled") ?? true; permissionsRegistry.addResourceType({ diff --git a/plugins/rw-backend/src/siteIndex/RegistryStore.test.ts b/plugins/rw-backend/src/siteIndex/RegistryStore.test.ts new file mode 100644 index 0000000..5dd097c --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/RegistryStore.test.ts @@ -0,0 +1,60 @@ +import type { Knex } from "knex"; +import { createTestDb } from "./__testUtils__/testDb"; +import { RegistryStore } from "./RegistryStore"; + +describe("RegistryStore", () => { + let knex: Knex; + beforeEach(async () => (knex = await createTestDb())); + afterEach(async () => knex.destroy()); + + it("swapSite replaces sections and pages for the site atomically", async () => { + const store = new RegistryStore(knex); + await store.swapSite( + "component:default/a", + [ + { + site_ref: "component:default/a", + section_ref: "s1", + section_path: "", + parent_section_ref: null, + }, + ], + [{ site_ref: "component:default/a", section_ref: "s1", subpath: "", title: "Home" }], + ); + // seed site b before second swap on a + await store.swapSite( + "component:default/b", + [ + { + site_ref: "component:default/b", + section_ref: "sb1", + section_path: "b", + parent_section_ref: null, + }, + ], + [{ site_ref: "component:default/b", section_ref: "sb1", subpath: "bp", title: "B Home" }], + ); + await store.swapSite( + "component:default/a", + [ + { + site_ref: "component:default/a", + section_ref: "s2", + section_path: "x", + parent_section_ref: "s1", + }, + ], + [{ site_ref: "component:default/a", section_ref: "s2", subpath: "p", title: "P" }], + ); + + expect( + await knex("sections").where({ site_ref: "component:default/a" }).pluck("section_ref"), + ).toEqual(["s2"]); + expect( + await knex("pages").where({ site_ref: "component:default/a" }).pluck("section_ref"), + ).toEqual(["s2"]); + // site b must be untouched by the second swap on a + expect(await knex("sections").where({ site_ref: "component:default/b" })).toHaveLength(1); + expect(await knex("pages").where({ site_ref: "component:default/b" })).toHaveLength(1); + }); +}); diff --git a/plugins/rw-backend/src/siteIndex/RegistryStore.ts b/plugins/rw-backend/src/siteIndex/RegistryStore.ts new file mode 100644 index 0000000..5f8dd25 --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/RegistryStore.ts @@ -0,0 +1,16 @@ +import type { Knex } from "knex"; +import type { SectionRow, PageRow } from "./types"; + +export class RegistryStore { + constructor(private readonly knex: Knex) {} + + /** Replace the site's `sections` and `pages` rows in one transaction. */ + async swapSite(siteRef: string, sections: SectionRow[], pages: PageRow[]): Promise { + await this.knex.transaction(async (tx) => { + await tx("sections").where({ site_ref: siteRef }).del(); + await tx("pages").where({ site_ref: siteRef }).del(); + if (sections.length) await tx.batchInsert("sections", sections, 500); + if (pages.length) await tx.batchInsert("pages", pages, 500); + }); + } +} diff --git a/plugins/rw-backend/src/siteIndex/SectionOwnershipStore.test.ts b/plugins/rw-backend/src/siteIndex/SectionOwnershipStore.test.ts new file mode 100644 index 0000000..258efc9 --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/SectionOwnershipStore.test.ts @@ -0,0 +1,47 @@ +import type { Knex } from "knex"; +import { createTestDb } from "./__testUtils__/testDb"; +import { SectionOwnershipStore } from "./SectionOwnershipStore"; + +describe("SectionOwnershipStore", () => { + let knex: Knex; + beforeEach(async () => (knex = await createTestDb())); + afterEach(async () => knex.destroy()); + + it("swapSite replaces only the given site's links", async () => { + const store = new SectionOwnershipStore(knex); + await store.swapSite("component:default/a", [ + { + site_ref: "component:default/a", + section_ref: "s1", + entity_ref: "e1", + entity_owner_ref: "g1", + }, + ]); + await store.swapSite("component:default/b", [ + { + site_ref: "component:default/b", + section_ref: "s2", + entity_ref: "e2", + entity_owner_ref: null, + }, + ]); + // re-swap a with new links + await store.swapSite("component:default/a", [ + { + site_ref: "component:default/a", + section_ref: "s3", + entity_ref: "e3", + entity_owner_ref: "g3", + }, + ]); + + const rows = await knex("section_ownership").orderBy(["site_ref", "section_ref"]); + expect(rows.map((r) => `${r.site_ref}:${r.section_ref}`)).toEqual([ + "component:default/a:s3", + "component:default/b:s2", + ]); + const rowA = rows.find((r) => r.site_ref === "component:default/a" && r.section_ref === "s3"); + expect(rowA?.entity_ref).toBe("e3"); + expect(rowA?.entity_owner_ref).toBe("g3"); + }); +}); diff --git a/plugins/rw-backend/src/siteIndex/SectionOwnershipStore.ts b/plugins/rw-backend/src/siteIndex/SectionOwnershipStore.ts new file mode 100644 index 0000000..bd845cd --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/SectionOwnershipStore.ts @@ -0,0 +1,19 @@ +import type { Knex } from "knex"; +import type { SectionOwnershipRow } from "./types"; + +const TABLE = "section_ownership"; + +export class SectionOwnershipStore { + constructor(private readonly knex: Knex) {} + + /** Replace all links for `siteRef`. Pass `executor` to join a per-site transaction. */ + async swapSite( + siteRef: string, + links: SectionOwnershipRow[], + executor?: Knex | Knex.Transaction, + ): Promise { + const exec = executor ?? this.knex; + await exec(TABLE).where({ site_ref: siteRef }).del(); + if (links.length) await exec.batchInsert(TABLE, links, 500); + } +} diff --git a/plugins/rw-backend/src/siteIndex/SiteRefreshStore.test.ts b/plugins/rw-backend/src/siteIndex/SiteRefreshStore.test.ts new file mode 100644 index 0000000..f5e75fb --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/SiteRefreshStore.test.ts @@ -0,0 +1,73 @@ +import type { Knex } from "knex"; +import { createTestDb } from "./__testUtils__/testDb"; +import { SiteRefreshStore } from "./SiteRefreshStore"; + +describe("SiteRefreshStore", () => { + let knex: Knex; + let store: SiteRefreshStore; + beforeEach(async () => { + knex = await createTestDb(); + store = new SiteRefreshStore(knex); + }); + afterEach(async () => knex.destroy()); + + it("upsertSite inserts new rows as due now and keeps existing schedule on re-scan", async () => { + const t1 = new Date("2026-06-24T00:00:00Z"); + await store.upsertSite("s", t1); + let row = await knex("site_refresh").where({ site_ref: "s" }).first(); + expect(new Date(row.next_update_at).getTime()).toBe(t1.getTime()); + + // simulate a completed build pushing next_update_at far out + const future = new Date("2026-06-24T01:00:00Z"); + await store.completeSuccess("s", "h", future, t1); + + const t2 = new Date("2026-06-24T00:30:00Z"); + await store.upsertSite("s", t2); + row = await knex("site_refresh").where({ site_ref: "s" }).first(); + // next_update_at preserved (not reset to t2); last_discovery_at advanced + expect(new Date(row.next_update_at).getTime()).toBe(future.getTime()); + expect(new Date(row.last_discovery_at).getTime()).toBe(t2.getTime()); + }); + + it("pruneMissing deletes rows not seen this scan", async () => { + await store.upsertSite("old", new Date("2026-06-24T00:00:00Z")); + await store.upsertSite("new", new Date("2026-06-24T02:00:00Z")); + const deleted = await store.pruneMissing(new Date("2026-06-24T01:00:00Z")); + expect(deleted).toBe(1); + expect(await knex("site_refresh").pluck("site_ref")).toEqual(["new"]); + }); + + it("claimDue returns due rows oldest-first and bumps next_update_at to the lease", async () => { + await store.upsertSite("a", new Date("2026-06-24T00:00:00Z")); + await store.upsertSite("b", new Date("2026-06-24T00:00:01Z")); + const lease = new Date("2026-06-24T03:00:00Z"); + const claimed = await store.claimDue(new Date("2026-06-24T01:00:00Z"), 10, lease); + expect(claimed.map((c) => c.siteRef)).toEqual(["a", "b"]); + // both leased out → no longer due before lease + const again = await store.claimDue(new Date("2026-06-24T02:00:00Z"), 10, lease); + expect(again).toEqual([]); + }); + + it("completeSuccess sets last_built_at/result_hash/next_update_at and clears errors", async () => { + await store.upsertSite("a", new Date("2026-06-24T00:00:00Z")); + await store.recordError("a", "boom"); + const next = new Date("2026-06-24T04:00:00Z"); + const now = new Date("2026-06-24T00:05:00Z"); + await store.completeSuccess("a", "hash1", next, now); + const row = await knex("site_refresh").where({ site_ref: "a" }).first(); + expect(row.result_hash).toBe("hash1"); + expect(row.errors).toBeNull(); + expect(new Date(row.last_built_at).getTime()).toBe(now.getTime()); + expect(new Date(row.next_update_at).getTime()).toBe(next.getTime()); + }); + + it("allBuilt is false until every row has last_built_at", async () => { + await store.upsertSite("a", new Date("2026-06-24T00:00:00Z")); + await store.upsertSite("b", new Date("2026-06-24T00:00:00Z")); + expect(await store.allBuilt()).toBe(false); + await store.completeSuccess("a", "h", new Date(), new Date()); + expect(await store.allBuilt()).toBe(false); + await store.completeSuccess("b", "h", new Date(), new Date()); + expect(await store.allBuilt()).toBe(true); + }); +}); diff --git a/plugins/rw-backend/src/siteIndex/SiteRefreshStore.ts b/plugins/rw-backend/src/siteIndex/SiteRefreshStore.ts new file mode 100644 index 0000000..6f0146b --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/SiteRefreshStore.ts @@ -0,0 +1,89 @@ +import type { Knex } from "knex"; +import type { ClaimedSite } from "./types"; + +const TABLE = "site_refresh"; +// SQLite does not implement FOR UPDATE SKIP LOCKED; only take row locks on clients that support it. +const LOCKING_CLIENTS = ["pg", "mysql", "mysql2"]; + +export class SiteRefreshStore { + private readonly useLocking: boolean; + constructor(private readonly knex: Knex) { + this.useLocking = LOCKING_CLIENTS.includes(String(knex.client.config.client)); + } + + /** Insert a new queue row (due now) or, on conflict, only advance last_discovery_at. */ + async upsertSite( + siteRef: string, + scanStart: Date, + executor?: Knex | Knex.Transaction, + ): Promise { + const exec = executor ?? this.knex; + await exec(TABLE) + .insert({ + site_ref: siteRef, + next_update_at: scanStart, + last_built_at: null, + result_hash: null, + errors: null, + last_discovery_at: scanStart, + }) + .onConflict("site_ref") + .merge(["last_discovery_at"]); + } + + /** Delete queue rows not seen in the current (completed) scan. Returns count. */ + async pruneMissing(scanStart: Date): Promise { + return this.knex(TABLE).where("last_discovery_at", "<", scanStart).del(); + } + + /** Claim up to `batch` due rows, bumping their lease, in one transaction. */ + async claimDue(now: Date, batch: number, leaseUntil: Date): Promise { + return this.knex.transaction(async (tx) => { + const q = tx(TABLE) + .where("next_update_at", "<=", now) + .orderBy("next_update_at", "asc") + .limit(batch) + .select("site_ref", "result_hash"); + if (this.useLocking) q.forUpdate().skipLocked(); + const rows = await q; + if (rows.length) { + await tx(TABLE) + .whereIn( + "site_ref", + rows.map((r) => r.site_ref), + ) + .update({ next_update_at: leaseUntil }); + } + return rows.map((r) => ({ siteRef: r.site_ref, resultHash: r.result_hash ?? null })); + }); + } + + async completeSuccess( + siteRef: string, + resultHash: string, + nextUpdateAt: Date, + now: Date, + ): Promise { + await this.knex(TABLE).where({ site_ref: siteRef }).update({ + last_built_at: now, + result_hash: resultHash, + errors: null, + next_update_at: nextUpdateAt, + }); + } + + async recordError(siteRef: string, message: string): Promise { + await this.knex(TABLE).where({ site_ref: siteRef }).update({ errors: message }); + } + + /** True iff every queue row has been built at least once. */ + async allBuilt(): Promise { + const [{ count }] = await this.knex(TABLE).whereNull("last_built_at").count({ count: "*" }); + return Number(count) === 0; + } + + /** Run fn in a single DB transaction (lets a caller atomically span multiple stores). */ + async transaction(fn: (tx: Knex.Transaction) => Promise): Promise { + return this.knex.transaction(fn); + } +} diff --git a/plugins/rw-backend/src/siteIndex/__testUtils__/testDb.test.ts b/plugins/rw-backend/src/siteIndex/__testUtils__/testDb.test.ts new file mode 100644 index 0000000..9fe4671 --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/__testUtils__/testDb.test.ts @@ -0,0 +1,14 @@ +import { createTestDb } from "./testDb"; + +describe("ownership migrations", () => { + it("creates the four ownership tables", async () => { + const knex = await createTestDb(); + try { + for (const t of ["section_ownership", "sections", "pages", "site_refresh"]) { + expect(await knex.schema.hasTable(t)).toBe(true); + } + } finally { + await knex.destroy(); + } + }); +}); diff --git a/plugins/rw-backend/src/siteIndex/__testUtils__/testDb.ts b/plugins/rw-backend/src/siteIndex/__testUtils__/testDb.ts new file mode 100644 index 0000000..2540e4c --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/__testUtils__/testDb.ts @@ -0,0 +1,16 @@ +import { TestDatabases } from "@backstage/backend-test-utils"; +import { resolvePackagePath } from "@backstage/backend-plugin-api"; +import type { Knex } from "knex"; + +// Created once at module scope (TestDatabases registers an afterAll hook, which +// must not run inside a test/beforeEach). Each createTestDb() call gets a fresh DB. +const databases = TestDatabases.create({ ids: ["SQLITE_3"] }); + +/** Fresh in-memory SQLite DB with all rw-backend migrations applied. */ +export async function createTestDb(): Promise { + const knex = await databases.init("SQLITE_3"); + await knex.migrate.latest({ + directory: resolvePackagePath("@rwdocs/backstage-plugin-rw-backend", "migrations"), + }); + return knex; +} diff --git a/plugins/rw-backend/src/siteIndex/registryHash.test.ts b/plugins/rw-backend/src/siteIndex/registryHash.test.ts new file mode 100644 index 0000000..dcef7d0 --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/registryHash.test.ts @@ -0,0 +1,14 @@ +import { registryHash } from "./registryHash"; + +describe("registryHash", () => { + it("is stable for identical input and changes when content changes", () => { + const sections = [ + { site_ref: "a", section_ref: "s1", section_path: "", parent_section_ref: null }, + ]; + const pages = [{ site_ref: "a", section_ref: "s1", subpath: "", title: "Home" }]; + const h1 = registryHash(sections, pages); + expect(registryHash(sections, pages)).toBe(h1); + expect(registryHash(sections, [{ ...pages[0], title: "Changed" }])).not.toBe(h1); + expect(registryHash([{ ...sections[0], section_path: "changed" }], pages)).not.toBe(h1); + }); +}); diff --git a/plugins/rw-backend/src/siteIndex/registryHash.ts b/plugins/rw-backend/src/siteIndex/registryHash.ts new file mode 100644 index 0000000..4e14bc9 --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/registryHash.ts @@ -0,0 +1,7 @@ +import { createHash } from "crypto"; +import type { SectionRow, PageRow } from "./types"; + +/** Deterministic hash of a site's registries. Caller must pass sorted arrays. */ +export function registryHash(sections: SectionRow[], pages: PageRow[]): string { + return createHash("sha256").update(JSON.stringify({ sections, pages })).digest("hex"); +} diff --git a/plugins/rw-backend/src/siteIndex/runScan.test.ts b/plugins/rw-backend/src/siteIndex/runScan.test.ts new file mode 100644 index 0000000..76ee394 --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/runScan.test.ts @@ -0,0 +1,134 @@ +import type { Knex } from "knex"; +import { createTestDb } from "./__testUtils__/testDb"; +import { SectionOwnershipStore } from "./SectionOwnershipStore"; +import { SiteRefreshStore } from "./SiteRefreshStore"; +import { runScan } from "./runScan"; + +const RW = "rwdocs.org/ref"; + +function ent(name: string, ref: string, owner?: string) { + return { + kind: "Component", + metadata: { namespace: "default", name, annotations: { [RW]: ref } }, + relations: owner ? [{ type: "ownedBy", targetRef: owner }] : [], + } as any; +} + +function catalogReturning(items: any[]) { + return { queryEntities: async () => ({ items, pageInfo: {}, totalItems: items.length }) }; +} + +const deps = (knex: Knex, catalog: any) => ({ + catalog, + auth: { getOwnServiceCredentials: async () => ({}) } as any, + logger: { warn() {}, info() {} } as any, + siteConfig: { s3: { bucket: "b" } } as any, + sectionOwnershipStore: new SectionOwnershipStore(knex), + siteRefreshStore: new SiteRefreshStore(knex), +}); + +describe("runScan", () => { + let knex: Knex; + beforeEach(async () => (knex = await createTestDb())); + afterEach(async () => knex.destroy()); + + it("writes section_ownership links and seeds site_refresh", async () => { + // entity arch claims section s1 of site docs; entity docs self-hosts its root + const catalog = catalogReturning([ + ent("arch", "component:default/docs#s1", "group:default/team-a"), + ent("docs", ".", "group:default/owners"), + ]); + await runScan(deps(knex, catalog)); + + const links = await knex("section_ownership").orderBy("section_ref"); + expect(links).toEqual([ + { + site_ref: "component:default/docs", + section_ref: "component:default/docs", + entity_ref: "component:default/docs", + entity_owner_ref: "group:default/owners", + }, + { + site_ref: "component:default/docs", + section_ref: "s1", + entity_ref: "component:default/arch", + entity_owner_ref: "group:default/team-a", + }, + ]); + expect(await knex("site_refresh").pluck("site_ref")).toEqual(["component:default/docs"]); + }); + + it("prunes a vanished site's queue row but leaves section_ownership orphans", async () => { + await runScan(deps(knex, catalogReturning([ent("docs", ".", "group:default/o")]))); + // second scan: docs is gone, only other appears + await runScan(deps(knex, catalogReturning([ent("other", ".", "group:default/o")]))); + + expect(await knex("site_refresh").pluck("site_ref")).toEqual(["component:default/other"]); + // orphan link for the vanished site remains + expect(await knex("section_ownership").pluck("site_ref")).toContain("component:default/docs"); + }); + + it("does NOT prune when iteration fails", async () => { + await runScan(deps(knex, catalogReturning([ent("docs", ".", "group:default/o")]))); + const failing = { + queryEntities: async () => { + throw new Error("catalog down"); + }, + }; + await runScan(deps(knex, failing)); + // queue row survives the failed scan + expect(await knex("site_refresh").pluck("site_ref")).toEqual(["component:default/docs"]); + }); + + it("deduplicates section links (last-claim-wins) when two entities share the same section", async () => { + // Both arch-a and arch-b claim section s1 of the same site; arch-b appears last → wins. + const catalog = catalogReturning([ + ent("arch-a", "component:default/docs#s1", "group:default/team-a"), + ent("arch-b", "component:default/docs#s1", "group:default/team-b"), + ]); + await runScan(deps(knex, catalog)); + + const links = await knex("section_ownership").where({ section_ref: "s1" }); + expect(links).toHaveLength(1); + expect(links[0].entity_ref).toBe("component:default/arch-b"); + // site_refresh row must exist (no PK crash) + expect(await knex("site_refresh").pluck("site_ref")).toEqual(["component:default/docs"]); + }); + + it("does NOT prune a site whose per-site write failed", async () => { + // Seed a site in the queue via a clean first scan. + await runScan(deps(knex, catalogReturning([ent("docs", ".", "group:default/o")]))); + expect(await knex("site_refresh").pluck("site_ref")).toEqual(["component:default/docs"]); + + // Second scan: docs is still in the catalog but swapSite throws for it. + const realSectionOwnershipStore = new SectionOwnershipStore(knex); + const faultySectionOwnershipStore = { + ...realSectionOwnershipStore, + swapSite: async () => { + throw new Error("db write error"); + }, + }; + // Use a spy to confirm pruneMissing is never called. + const realSiteRefreshStore = new SiteRefreshStore(knex); + let pruneCallCount = 0; + const spySiteRefreshStore = { + ...realSiteRefreshStore, + transaction: realSiteRefreshStore.transaction.bind(realSiteRefreshStore), + pruneMissing: async (d: Date) => { + pruneCallCount++; + return realSiteRefreshStore.pruneMissing(d); + }, + upsertSite: realSiteRefreshStore.upsertSite.bind(realSiteRefreshStore), + }; + + await runScan({ + ...deps(knex, catalogReturning([ent("docs", ".", "group:default/o")])), + sectionOwnershipStore: faultySectionOwnershipStore as any, + siteRefreshStore: spySiteRefreshStore as any, + }); + + expect(pruneCallCount).toBe(0); + // The site_refresh row must still exist (not pruned). + expect(await knex("site_refresh").pluck("site_ref")).toEqual(["component:default/docs"]); + }); +}); diff --git a/plugins/rw-backend/src/siteIndex/runScan.ts b/plugins/rw-backend/src/siteIndex/runScan.ts new file mode 100644 index 0000000..add6f3b --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/runScan.ts @@ -0,0 +1,121 @@ +import { + stringifyEntityRef, + getCompoundEntityRef, + parseEntityRef, + RELATION_OWNED_BY, + type Entity, +} from "@backstage/catalog-model"; +import type { AuthService, LoggerService } from "@backstage/backend-plugin-api"; +import type { CatalogService } from "@backstage/plugin-catalog-node"; +import { + iterateAnnotatedEntities, + parseAnnotation, + toEntityPath, + RW_ANNOTATION, + type RwSiteConfig, +} from "@rwdocs/backstage-plugin-rw-common"; +import type { SectionOwnershipStore } from "./SectionOwnershipStore"; +import type { SiteRefreshStore } from "./SiteRefreshStore"; +import type { SectionOwnershipRow } from "./types"; + +function ownerOf(entity: Entity): string | null { + const rel = (entity.relations ?? []).find((r) => r.type === RELATION_OWNED_BY); + return rel?.targetRef ?? null; +} + +export async function runScan(deps: { + catalog: Pick; + auth: AuthService; + logger: LoggerService; + siteConfig: RwSiteConfig; + sectionOwnershipStore: SectionOwnershipStore; + siteRefreshStore: SiteRefreshStore; + now?: () => Date; +}): Promise { + const { catalog, auth, logger, siteConfig, sectionOwnershipStore, siteRefreshStore } = deps; + const now = deps.now ?? (() => new Date()); + const credentials = await auth.getOwnServiceCredentials(); + + // projectDir mode serves exactly one site; constrain discovery to it. + const onlySiteRef = + siteConfig.projectDir && siteConfig.entity + ? stringifyEntityRef(parseEntityRef(siteConfig.entity)) + : undefined; + + const scanStart = now(); + // Dedup per-site links by section_ref (last-claim-wins) to avoid a PK violation on swap. + const linksBySite = new Map>(); + let completed = false; + let writeFailed = false; + + try { + for await (const { entity } of iterateAnnotatedEntities(catalog, credentials)) { + const selfRef = stringifyEntityRef(getCompoundEntityRef(entity)); + const parsed = parseAnnotation( + entity.metadata?.annotations?.[RW_ANNOTATION], + toEntityPath(selfRef), + ); + if (!parsed) continue; + const siteRef = parsed.entityRef; + if (onlySiteRef && siteRef !== onlySiteRef) continue; + + let link: SectionOwnershipRow | undefined; + if (parsed.sectionRef) { + link = { + site_ref: siteRef, + section_ref: parsed.sectionRef, + entity_ref: selfRef, + entity_owner_ref: ownerOf(entity), + }; + } else if (parsed.entityRef === selfRef) { + // Self-host root claim: no explicit section ref, so use the site ref itself as the + // section_ref sentinel. At read time, ownership resolution falls back to this sentinel + // when no more-specific section link is found. + link = { + site_ref: siteRef, + section_ref: siteRef, + entity_ref: selfRef, + entity_owner_ref: ownerOf(entity), + }; + } + // section-less claim on another site: ignored (matches legacy behavior) + if (!link) continue; + + const inner = linksBySite.get(siteRef) ?? new Map(); + inner.set(link.section_ref, link); // last-claim-wins dedup by section_ref + linksBySite.set(siteRef, inner); + } + completed = true; + } catch (err) { + logger.warn(`Site index scan iteration failed; skipping prune: ${err}`); + } + + // Per site: swap links + upsert queue row atomically in one transaction. + for (const [siteRef, inner] of linksBySite) { + try { + await siteRefreshStore.transaction(async (tx) => { + await sectionOwnershipStore.swapSite(siteRef, [...inner.values()], tx); + await siteRefreshStore.upsertSite(siteRef, scanStart, tx); + }); + } catch (err) { + logger.warn(`Site index scan failed for site ${siteRef}: ${err}`); + // A per-site write failure leaves last_discovery_at un-updated, so + // pruneMissing would incorrectly delete that still-present site. Mark the scan + // as dirty so we skip the prune entirely. + writeFailed = true; + } + } + + // Prune only after a clean, complete iteration and all per-site writes succeeded — + // a partial/failed scan must not delete still-present sites from the queue. + let pruned = 0; + if (completed && !writeFailed) { + pruned = await siteRefreshStore.pruneMissing(scanStart); + } + + logger.info( + `Site index scan: ${linksBySite.size} site(s) discovered${ + completed && !writeFailed ? `, ${pruned} stale pruned` : ", prune skipped (incomplete scan)" + }`, + ); +} diff --git a/plugins/rw-backend/src/siteIndex/runWorker.test.ts b/plugins/rw-backend/src/siteIndex/runWorker.test.ts new file mode 100644 index 0000000..fc12231 --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/runWorker.test.ts @@ -0,0 +1,91 @@ +import type { Knex } from "knex"; +import { createTestDb } from "./__testUtils__/testDb"; +import { SiteRefreshStore } from "./SiteRefreshStore"; +import { RegistryStore } from "./RegistryStore"; +import { runWorker } from "./runWorker"; + +function fakeSite() { + return { + listSections: async () => [{ sectionRef: "component:default/docs", path: "", ancestors: [] }], + listPages: async () => [{ sectionRef: "component:default/docs", subpath: "", title: "Home" }], + }; +} + +const deps = (knex: Knex, makeSite: any, now?: () => Date) => ({ + logger: { warn() {}, info() {}, debug() {} } as any, + siteRefreshStore: new SiteRefreshStore(knex), + registryStore: new RegistryStore(knex), + makeSite, + now, + rng: () => 0.5, +}); + +describe("runWorker", () => { + let knex: Knex; + beforeEach(async () => (knex = await createTestDb())); + afterEach(async () => knex.destroy()); + + it("builds a due site: writes sections/pages and marks built", async () => { + const store = new SiteRefreshStore(knex); + await store.upsertSite("component:default/docs", new Date("2026-06-24T00:00:00Z")); + await runWorker( + deps( + knex, + () => fakeSite(), + () => new Date("2026-06-24T00:01:00Z"), + ), + ); + + expect(await knex("sections").pluck("section_ref")).toEqual(["component:default/docs"]); + expect(await knex("pages").pluck("title")).toEqual(["Home"]); + const row = await knex("site_refresh").where({ site_ref: "component:default/docs" }).first(); + expect(row.last_built_at).not.toBeNull(); + expect(row.result_hash).not.toBeNull(); + }); + + it("records an error and does not throw when site load fails", async () => { + const store = new SiteRefreshStore(knex); + await store.upsertSite("component:default/docs", new Date("2026-06-24T00:00:00Z")); + const makeSite = () => ({ + listSections: async () => { + throw new Error("s3 down"); + }, + listPages: async () => [], + }); + await runWorker(deps(knex, makeSite)); + const row = await knex("site_refresh").where({ site_ref: "component:default/docs" }).first(); + expect(row.errors).toContain("s3 down"); + expect(row.last_built_at).toBeNull(); + }); + + it("short-circuits the registry swap when result_hash is unchanged", async () => { + const store = new SiteRefreshStore(knex); + await store.upsertSite("component:default/docs", new Date("2026-06-24T00:00:00Z")); + await runWorker( + deps( + knex, + () => fakeSite(), + () => new Date("2026-06-24T00:01:00Z"), + ), + ); + // make the site due again + await knex("site_refresh").update({ next_update_at: new Date("2026-06-24T00:00:00Z") }); + let swaps = 0; + const registryStore = new RegistryStore(knex); + const orig = registryStore.swapSite.bind(registryStore); + registryStore.swapSite = async (...a: Parameters) => { + swaps++; + return orig(...a); + }; + await runWorker({ + ...deps( + knex, + () => fakeSite(), + () => new Date("2026-06-25T00:00:00Z"), + ), + registryStore, + rng: () => 0.5, + }); + expect(swaps).toBe(0); // unchanged content → no swap + }); +}); diff --git a/plugins/rw-backend/src/siteIndex/runWorker.ts b/plugins/rw-backend/src/siteIndex/runWorker.ts new file mode 100644 index 0000000..e46ff1e --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/runWorker.ts @@ -0,0 +1,96 @@ +import pLimit from "p-limit"; +import type { LoggerService } from "@backstage/backend-plugin-api"; +import { toEntityPath } from "@rwdocs/backstage-plugin-rw-common"; +import type { RwSite } from "@rwdocs/core"; +import type { SiteRefreshStore } from "./SiteRefreshStore"; +import type { RegistryStore } from "./RegistryStore"; +import type { SectionRow, PageRow } from "./types"; +import { registryHash } from "./registryHash"; +import { jitteredNextUpdate, BATCH_SIZE, CONCURRENCY, LEASE_MS, INTERVAL_MS } from "./schedule"; + +function sortSections(rows: SectionRow[]): SectionRow[] { + return [...rows].sort((a, b) => a.section_ref.localeCompare(b.section_ref)); +} +function sortPages(rows: PageRow[]): PageRow[] { + return [...rows].sort( + (a, b) => a.section_ref.localeCompare(b.section_ref) || a.subpath.localeCompare(b.subpath), + ); +} + +export async function runWorker(deps: { + logger: LoggerService; + siteRefreshStore: SiteRefreshStore; + registryStore: RegistryStore; + makeSite: (entityPath: string) => Pick; + now?: () => Date; + rng?: () => number; +}): Promise { + const { logger, siteRefreshStore, registryStore, makeSite } = deps; + const now = deps.now ?? (() => new Date()); + + const claimNow = now(); + const claimed = await siteRefreshStore.claimDue( + claimNow, + BATCH_SIZE, + new Date(claimNow.getTime() + LEASE_MS), + ); + if (!claimed.length) return; + logger.info(`Site index worker: rebuilding ${claimed.length} site(s)`); + + const limit = pLimit(CONCURRENCY); + await Promise.all( + claimed.map(({ siteRef, resultHash }) => + limit(async () => { + try { + const site = makeSite(toEntityPath(siteRef)); + const [rawSections, rawPages] = await Promise.all([ + site.listSections(), + site.listPages(), + ]); + const sections = sortSections( + rawSections.map((s) => ({ + site_ref: siteRef, + section_ref: s.sectionRef, + section_path: s.path, + parent_section_ref: s.ancestors[0] ?? null, // ancestors is nearest-first; [0] is immediate parent + })), + ); + const pages = sortPages( + rawPages.map((p) => ({ + site_ref: siteRef, + section_ref: p.sectionRef, + subpath: p.subpath, + title: p.title, + })), + ); + const hash = registryHash(sections, pages); + const changed = hash !== resultHash; + if (changed) { + await registryStore.swapSite(siteRef, sections, pages); + } + const completedAt = now(); + await siteRefreshStore.completeSuccess( + siteRef, + hash, + jitteredNextUpdate(completedAt, INTERVAL_MS, deps.rng), + completedAt, + ); + logger.debug( + `Rebuilt ${siteRef}: ${sections.length} sections, ${pages.length} pages${ + changed ? "" : " (unchanged)" + }`, + ); + } catch (err) { + logger.warn(`Site index rebuild failed for site ${siteRef}: ${err}`); + // recordError is best-effort: if it also throws (e.g. DB down), swallow it so one + // site's failure never rejects Promise.all and aborts the rest of the batch. + try { + await siteRefreshStore.recordError(siteRef, String(err)); + } catch (recordErr) { + logger.warn(`Failed to record rebuild error for site ${siteRef}: ${recordErr}`); + } + } + }), + ), + ); +} diff --git a/plugins/rw-backend/src/siteIndex/schedule.test.ts b/plugins/rw-backend/src/siteIndex/schedule.test.ts new file mode 100644 index 0000000..2c838ec --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/schedule.test.ts @@ -0,0 +1,11 @@ +import { jitteredNextUpdate, INTERVAL_MS } from "./schedule"; + +describe("jitteredNextUpdate", () => { + it("stays within [0.5x, 1.5x] of the interval", () => { + const now = new Date("2026-06-24T00:00:00Z"); + const lo = jitteredNextUpdate(now, INTERVAL_MS, () => 0).getTime() - now.getTime(); + const hi = jitteredNextUpdate(now, INTERVAL_MS, () => 1).getTime() - now.getTime(); + expect(lo).toBe(INTERVAL_MS * 0.5); + expect(hi).toBe(INTERVAL_MS * 1.5); + }); +}); diff --git a/plugins/rw-backend/src/siteIndex/schedule.ts b/plugins/rw-backend/src/siteIndex/schedule.ts new file mode 100644 index 0000000..8bf3459 --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/schedule.ts @@ -0,0 +1,27 @@ +import { createSite, type RwSite } from "@rwdocs/core"; +import type { RwSiteConfig } from "@rwdocs/backstage-plugin-rw-common"; + +export const INTERVAL_MS = 15 * 60_000; +export const LEASE_MS = 5 * 60_000; +export const BATCH_SIZE = 10; +export const CONCURRENCY = 4; + +/** now + intervalMs * (0.5 + rng()); rng defaults to Math.random (range [0.5x, 1.5x]). */ +export function jitteredNextUpdate( + now: Date, + intervalMs: number = INTERVAL_MS, + rng: () => number = Math.random, +): Date { + return new Date(now.getTime() + intervalMs * (0.5 + rng())); +} + +/** Build an `RwSite` for an entity path, branching projectDir vs S3 (matches the collator). */ +export function makeSiteFactory(siteConfig: RwSiteConfig): (entityPath: string) => RwSite { + return (entityPath: string) => + siteConfig.projectDir + ? createSite({ projectDir: siteConfig.projectDir, diagrams: siteConfig.diagrams }) + : createSite({ + s3: { ...siteConfig.s3!, entity: entityPath }, + diagrams: siteConfig.diagrams, + }); +} diff --git a/plugins/rw-backend/src/siteIndex/types.ts b/plugins/rw-backend/src/siteIndex/types.ts new file mode 100644 index 0000000..a718e67 --- /dev/null +++ b/plugins/rw-backend/src/siteIndex/types.ts @@ -0,0 +1,29 @@ +/** A sparse section→entity claim link (written by the scan). */ +export interface SectionOwnershipRow { + site_ref: string; + section_ref: string; + entity_ref: string; + entity_owner_ref: string | null; +} + +/** A section registry row (written by the worker from listSections). */ +export interface SectionRow { + site_ref: string; + section_ref: string; + section_path: string; + parent_section_ref: string | null; +} + +/** A page registry row (written by the worker from listPages). */ +export interface PageRow { + site_ref: string; + section_ref: string; + subpath: string; + title: string; +} + +/** A row claimed off the queue for processing. */ +export interface ClaimedSite { + siteRef: string; + resultHash: string | null; +} diff --git a/plugins/rw-common/config.d.ts b/plugins/rw-common/config.d.ts index f0e703b..7a1a2da 100644 --- a/plugins/rw-common/config.d.ts +++ b/plugins/rw-common/config.d.ts @@ -64,5 +64,15 @@ export interface Config { */ enabled?: boolean; }; + /** + * Site index rebuild (catalog scan + per-site worker queue). + */ + siteIndex?: { + /** Schedule for the catalog scan (producer). Parsed by + * readSchedulerServiceTaskScheduleDefinitionFromConfig. Accepts HumanDuration / cron. */ + schedule?: object; + /** Schedule for the worker tick (queue drain). Same parser as `schedule`. */ + worker?: object; + }; }; } diff --git a/plugins/rw-common/package.json b/plugins/rw-common/package.json index cf30b90..35d5e60 100644 --- a/plugins/rw-common/package.json +++ b/plugins/rw-common/package.json @@ -47,6 +47,7 @@ "postpack": "backstage-cli package postpack" }, "dependencies": { + "@backstage/catalog-client": "^1.9.3", "@backstage/catalog-model": "^1.7.6", "@backstage/config": "^1.3.6", "@backstage/errors": "^1.2.7", diff --git a/plugins/rw-common/src/index.ts b/plugins/rw-common/src/index.ts index 7dcc9e8..b04b246 100644 --- a/plugins/rw-common/src/index.ts +++ b/plugins/rw-common/src/index.ts @@ -4,3 +4,4 @@ export type { ParsedAnnotation } from "./parseAnnotation"; export { readRwSiteConfig } from "./config"; export type { RwSiteConfig, S3Config, RwDiagramsConfig } from "./config"; export * from "./permissions"; +export { iterateAnnotatedEntities, RW_ANNOTATION } from "./iterateAnnotatedEntities"; diff --git a/plugins/rw-common/src/iterateAnnotatedEntities.test.ts b/plugins/rw-common/src/iterateAnnotatedEntities.test.ts new file mode 100644 index 0000000..ec2971f --- /dev/null +++ b/plugins/rw-common/src/iterateAnnotatedEntities.test.ts @@ -0,0 +1,30 @@ +import { iterateAnnotatedEntities, RW_ANNOTATION } from "./iterateAnnotatedEntities"; + +function entity(name: string) { + return { kind: "Component", metadata: { name, annotations: { [RW_ANNOTATION]: "." } } } as any; +} + +describe("iterateAnnotatedEntities", () => { + it("pages through the catalog via cursor", async () => { + const calls: { req: any; opts: any }[] = []; + const catalog = { + queryEntities: async (req: any, opts: any) => { + calls.push({ req, opts }); + if (!("cursor" in req)) { + return { items: [entity("a")], pageInfo: { nextCursor: "c1" }, totalItems: 2 }; + } + return { items: [entity("b")], pageInfo: {}, totalItems: 2 }; + }, + }; + const seen: string[] = []; + for await (const { entity: e } of iterateAnnotatedEntities(catalog as any, {} as any)) { + seen.push(e.metadata.name); + } + expect(seen).toEqual(["a", "b"]); + expect(calls[0].req.filter).toEqual({ + [`metadata.annotations.${RW_ANNOTATION}`]: expect.anything(), + }); + expect(calls[1].req).toEqual({ cursor: "c1" }); + expect(calls[0].opts).toEqual({ credentials: {} }); + }); +}); diff --git a/plugins/rw-common/src/iterateAnnotatedEntities.ts b/plugins/rw-common/src/iterateAnnotatedEntities.ts new file mode 100644 index 0000000..3354c64 --- /dev/null +++ b/plugins/rw-common/src/iterateAnnotatedEntities.ts @@ -0,0 +1,23 @@ +import { CATALOG_FILTER_EXISTS } from "@backstage/catalog-client"; +import type { Entity } from "@backstage/catalog-model"; +import type { BackstageCredentials } from "@backstage/backend-plugin-api"; +import type { CatalogService } from "@backstage/plugin-catalog-node"; + +export const RW_ANNOTATION = "rwdocs.org/ref"; + +export async function* iterateAnnotatedEntities( + catalog: Pick, + credentials: BackstageCredentials, +): AsyncGenerator<{ entity: Entity }> { + let cursor: string | undefined; + do { + const response = await catalog.queryEntities( + cursor + ? { cursor } + : { filter: { [`metadata.annotations.${RW_ANNOTATION}`]: CATALOG_FILTER_EXISTS } }, + { credentials }, + ); + for (const entity of response.items) yield { entity }; + cursor = response.pageInfo.nextCursor; + } while (cursor); +} diff --git a/plugins/search-backend-module-rw/package.json b/plugins/search-backend-module-rw/package.json index c8a6ec0..8136148 100644 --- a/plugins/search-backend-module-rw/package.json +++ b/plugins/search-backend-module-rw/package.json @@ -49,7 +49,7 @@ "@backstage/plugin-permission-common": "^0.9.6", "@backstage/plugin-search-common": "^1.2.16", "@rwdocs/backstage-plugin-rw-common": "workspace:^", - "@rwdocs/core": "^0.1.27" + "@rwdocs/core": "^0.1.28" }, "peerDependencies": { "@backstage/backend-plugin-api": "^1.0.0", diff --git a/plugins/search-backend-module-rw/src/collator/RwDocsCollatorFactory.ts b/plugins/search-backend-module-rw/src/collator/RwDocsCollatorFactory.ts index ebed71d..db193b5 100644 --- a/plugins/search-backend-module-rw/src/collator/RwDocsCollatorFactory.ts +++ b/plugins/search-backend-module-rw/src/collator/RwDocsCollatorFactory.ts @@ -5,18 +5,18 @@ import type { CatalogService } from "@backstage/plugin-catalog-node"; import type { DocumentCollatorFactory, IndexableDocument } from "@backstage/plugin-search-common"; import type { Permission } from "@backstage/plugin-permission-common"; import { catalogEntityReadPermission } from "@backstage/plugin-catalog-common/alpha"; -import { CATALOG_FILTER_EXISTS } from "@backstage/catalog-client"; import { stringifyEntityRef } from "@backstage/catalog-model"; import type { Entity } from "@backstage/catalog-model"; import { createSite, type RwSite, type NavItemResponse } from "@rwdocs/core"; import { + iterateAnnotatedEntities, + RW_ANNOTATION, parseAnnotation, toEntityPath, readRwSiteConfig, type RwSiteConfig, } from "@rwdocs/backstage-plugin-rw-common"; -const RW_ANNOTATION = "rwdocs.org/ref"; const DEFAULT_LOCATION_TEMPLATE = "/catalog/:namespace/:kind/:name/docs/:path"; function applyLocationTemplate( @@ -92,34 +92,18 @@ export class RwDocsCollatorFactory implements DocumentCollatorFactory { : undefined; let docCount = 0; - let cursor: string | undefined; - - do { - const response = await this.catalog.queryEntities( - cursor - ? { cursor } - : { - filter: { - [`metadata.annotations.${RW_ANNOTATION}`]: CATALOG_FILTER_EXISTS, - }, - }, - { credentials }, - ); - - for (const entity of response.items) { - try { - for await (const doc of this.indexEntity(entity, localEntityPath)) { - docCount++; - yield doc; - } - } catch (err) { - const ref = stringifyEntityRef(entity); - this.logger.warn(`Failed to index entity ${ref}: ${err}`); + + for await (const { entity } of iterateAnnotatedEntities(this.catalog, credentials)) { + try { + for await (const doc of this.indexEntity(entity, localEntityPath)) { + docCount++; + yield doc; } + } catch (err) { + const ref = stringifyEntityRef(entity); + this.logger.warn(`Failed to index entity ${ref}: ${err}`); } - - cursor = response.pageInfo.nextCursor; - } while (cursor); + } this.logger.info(`RW docs indexing complete: ${docCount} documents indexed`); } diff --git a/yarn.lock b/yarn.lock index 66cea2c..c05b6f8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7160,7 +7160,7 @@ __metadata: "@backstage/plugin-permission-node": "npm:^0.11.1" "@backstage/types": "npm:^1.2.2" "@rwdocs/backstage-plugin-rw-common": "workspace:^" - "@rwdocs/core": "npm:^0.1.27" + "@rwdocs/core": "npm:^0.1.28" "@types/express": "npm:^4.17.0" "@types/jest": "npm:^30.0.0" "@types/supertest": "npm:^7.2.0" @@ -7168,6 +7168,7 @@ __metadata: express-promise-router: "npm:^4.1.0" jest: "npm:^30.2.0" luxon: "npm:^3.7.2" + p-limit: "npm:^3.1.0" prettier: "npm:^3.4.2" supertest: "npm:^7.2.2" typescript: "npm:^5.7.0" @@ -7182,6 +7183,7 @@ __metadata: version: 0.0.0-use.local resolution: "@rwdocs/backstage-plugin-rw-common@workspace:plugins/rw-common" dependencies: + "@backstage/catalog-client": "npm:^1.9.3" "@backstage/catalog-model": "npm:^1.7.6" "@backstage/cli": "npm:^0.36.0" "@backstage/config": "npm:^1.3.6" @@ -7251,7 +7253,7 @@ __metadata: "@backstage/plugin-search-backend-node": "npm:^1.3.10" "@backstage/plugin-search-common": "npm:^1.2.16" "@rwdocs/backstage-plugin-rw-common": "workspace:^" - "@rwdocs/core": "npm:^0.1.27" + "@rwdocs/core": "npm:^0.1.28" "@types/jest": "npm:^30.0.0" jest: "npm:^30.2.0" prettier: "npm:^3.4.2" @@ -7263,34 +7265,34 @@ __metadata: languageName: unknown linkType: soft -"@rwdocs/core-darwin-arm64@npm:0.1.27": - version: 0.1.27 - resolution: "@rwdocs/core-darwin-arm64@npm:0.1.27" +"@rwdocs/core-darwin-arm64@npm:0.1.28": + version: 0.1.28 + resolution: "@rwdocs/core-darwin-arm64@npm:0.1.28" conditions: os=darwin & cpu=arm64 languageName: node linkType: hard -"@rwdocs/core-linux-x64-gnu@npm:0.1.27": - version: 0.1.27 - resolution: "@rwdocs/core-linux-x64-gnu@npm:0.1.27" +"@rwdocs/core-linux-x64-gnu@npm:0.1.28": + version: 0.1.28 + resolution: "@rwdocs/core-linux-x64-gnu@npm:0.1.28" conditions: os=linux & cpu=x64 & libc=glibc languageName: node linkType: hard -"@rwdocs/core-linux-x64-musl@npm:0.1.27": - version: 0.1.27 - resolution: "@rwdocs/core-linux-x64-musl@npm:0.1.27" +"@rwdocs/core-linux-x64-musl@npm:0.1.28": + version: 0.1.28 + resolution: "@rwdocs/core-linux-x64-musl@npm:0.1.28" conditions: os=linux & cpu=x64 & libc=musl languageName: node linkType: hard -"@rwdocs/core@npm:^0.1.27": - version: 0.1.27 - resolution: "@rwdocs/core@npm:0.1.27" +"@rwdocs/core@npm:^0.1.28": + version: 0.1.28 + resolution: "@rwdocs/core@npm:0.1.28" dependencies: - "@rwdocs/core-darwin-arm64": "npm:0.1.27" - "@rwdocs/core-linux-x64-gnu": "npm:0.1.27" - "@rwdocs/core-linux-x64-musl": "npm:0.1.27" + "@rwdocs/core-darwin-arm64": "npm:0.1.28" + "@rwdocs/core-linux-x64-gnu": "npm:0.1.28" + "@rwdocs/core-linux-x64-musl": "npm:0.1.28" dependenciesMeta: "@rwdocs/core-darwin-arm64": optional: true @@ -7298,7 +7300,7 @@ __metadata: optional: true "@rwdocs/core-linux-x64-musl": optional: true - checksum: 10c0/9ba097c776841bd80be3a8056fdcf08ecdcf01570ac21009872c83057a28b4bbacc83cb30df28ecc7f9c34971fba0cb5c4a90da890efb403a94c1be553c1d73e + checksum: 10c0/c4fc52bdc6ca6554c9eadb218a8365df0098f2bdd63903a3ce2f92b4491bfd6fad76b34309bfda8db044f6a47601249700b1f07652c576d18edb5bd87e8043fb languageName: node linkType: hard