From b8b3705c7833d77b0ab6aa7397fb38a88caa1f6a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 23 May 2026 13:48:48 +0000 Subject: [PATCH 1/5] feat(infra): add queryBatched with structural filter keying Agent-Logs-Url: https://github.com/effect-app/libs/sessions/63b21bcb-5670-44d4-ae74-85cf6f7aebf0 Co-authored-by: patroza <42661+patroza@users.noreply.github.com> --- .changeset/cuddly-lions-wait.md | 5 + .../src/Model/Repository/internal/internal.ts | 239 +++++++++++++++--- .../infra/src/Model/Repository/service.ts | 2 + packages/infra/test/query.test.ts | 40 +++ 4 files changed, 256 insertions(+), 30 deletions(-) create mode 100644 .changeset/cuddly-lions-wait.md diff --git a/.changeset/cuddly-lions-wait.md b/.changeset/cuddly-lions-wait.md new file mode 100644 index 000000000..ae2cb1626 --- /dev/null +++ b/.changeset/cuddly-lions-wait.md @@ -0,0 +1,5 @@ +--- +"@effect-app/infra": patch +--- + +Add `Repository.queryBatched` with projection-aware batching and structural filter-key comparison. diff --git a/packages/infra/src/Model/Repository/internal/internal.ts b/packages/infra/src/Model/Repository/internal/internal.ts index 6ba7a9345..77895eb62 100644 --- a/packages/infra/src/Model/Repository/internal/internal.ts +++ b/packages/infra/src/Model/Repository/internal/internal.ts @@ -15,6 +15,8 @@ import * as Equivalence from "effect/Equivalence" import { flow, pipe } from "effect/Function" import * as Pipeable from "effect/Pipeable" import * as PubSub from "effect/PubSub" +import * as Request from "effect/Request" +import * as RequestResolver from "effect/RequestResolver" import * as Result from "effect/Result" import * as SchemaAST from "effect/SchemaAST" import * as Unify from "effect/Unify" @@ -322,6 +324,169 @@ export function makeRepoInternal< ) ) + type SelectItem = NonNullable["select"]>[number] + type PlainSelectItem = keyof Encoded | { key: string; subKeys: readonly string[] } + interface QueryBatchRequest extends Request.Request { + readonly _tag: "RepositoryQueryBatch" + readonly key: string + readonly baseArgs: Omit, "select"> + readonly fixedSelect: readonly SelectItem[] + readonly plainSelect: readonly PlainSelectItem[] | undefined + readonly resolve: (rows: readonly PM[]) => Effect.Effect + } + const QueryBatchRequest = Request.tagged("RepositoryQueryBatch") + + const splitSelect = (select: FilterArgs["select"]) => { + const plain: PlainSelectItem[] = [] + const fixed: SelectItem[] = [] + if (select) { + for (const item of select) { + if (typeof item === "string" || (typeof item === "object" && item !== null && "subKeys" in item)) { + plain.push(item) + } else { + fixed.push(item) + } + } + } + return { + plain: Array.isArrayNonEmpty(plain) ? plain : undefined, + fixed + } + } + + const mergePlainSelect = ( + plainSelects: readonly (readonly PlainSelectItem[] | undefined)[] + ): readonly PlainSelectItem[] | undefined => { + if (plainSelects.some((_) => _ === undefined)) { + return undefined + } + const all = plainSelects.flatMap((_) => _ ?? []) + if (!Array.isArrayNonEmpty(all)) { + return undefined + } + const keys = new Map() + const subKeys = new Map>() + for (const item of all) { + if (typeof item === "string") { + keys.set(item, item) + subKeys.delete(item) + } else if (!keys.has(item.key) || typeof keys.get(item.key) !== "string") { + const set = subKeys.get(item.key) ?? new Set() + for (const subKey of item.subKeys) { + set.add(subKey) + } + subKeys.set(item.key, set) + keys.set(item.key, { key: item.key, subKeys: [...set] }) + } + } + return [...keys.values()] + } + + const makeBatchKey = (options: { + readonly baseArgs: Omit, "select"> + readonly mode: "collect" | "project" | "transform" | "aggregate" | undefined + readonly ttype: "one" | "many" | "count" | undefined + readonly fixedSelect: readonly SelectItem[] + }) => { + const canonicalize = (value: unknown): unknown => { + if (globalThis.Array.isArray(value)) { + return value.map(canonicalize) + } + if (typeof value === "object" && value !== null) { + const record = value as Record + return Object.fromEntries( + Object + .entries(record) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([key, val]) => [key, canonicalize(val)]) + ) + } + return value + } + return JSON.stringify(canonicalize({ + mode: options.mode, + ttype: options.ttype, + filter: options.baseArgs.filter, + order: options.baseArgs.order, + limit: options.baseArgs.limit, + skip: options.baseArgs.skip, + fixedSelect: options.fixedSelect + })) + } + + const queryBatchResolver = RequestResolver.makeGrouped({ + key: ({ request }) => request.key, + resolver: Effect.fnUntraced(function*(entries) { + const first = entries[0]!.request + const mergedPlainSelect = mergePlainSelect(entries.map((_) => _.request.plainSelect)) + const mergedSelect = mergedPlainSelect + ? [...first.fixedSelect, ...mergedPlainSelect] + : undefined + const rows = yield* filter({ + ...first.baseArgs, + select: mergedSelect + }) + for (const entry of entries) { + const exit = yield* Effect.result(entry.request.resolve(rows)) + entry.completeUnsafe(exit) + } + }) + }) + + const runQueryFromRows = ( + a: ReturnType>, + rows: readonly PM[] + ) => { + const eff = a.mode === "project" + ? flow( + S.decodeEffectConcurrently(S.Array(a.schema ?? schema)), + provideRctx, + Effect.withSpan("parseMany", { + attributes: { "app.entity": name, "app.query.mode": "project" } + }) + )(rows as readonly Encoded[]) + : a.mode === "collect" + ? flow( + S.decodeEffectConcurrently(S.Array(a.schema)), + Effect.map(Array.getSomes), + provideRctx, + Effect.withSpan("parseMany", { + attributes: { "app.entity": name, "app.query.mode": "collect" } + }) + )(rows as readonly Encoded[]) + : Unify.unify( + a.schema + // TODO: partial may not match? + ? parseMany2(rows, a.schema) + : parseMany(rows) + ) + return pipe( + a.ttype === "one" + ? Effect.flatMap( + eff, + flow( + Array.head, + Option.match({ + onNone: () => Effect.fail(new NotFoundError({ id: "query", /* TODO */ type: name })), + onSome: Effect.succeed + }) + ) + ) + : a.ttype === "count" + ? Effect + .map(eff, (_) => NonNegativeInt(_.length)) + .pipe(Effect.catchTag("SchemaError", (e) => Effect.die(e))) + : eff, + Effect.tap((r) => + Effect.annotateCurrentSpan({ + "app.query.ttype": a.ttype, + "app.query.mode": a.mode, + "db.response.returned_rows": Array.isArray(r) ? r.length : 1 + }) + ) + ) + } + // TODO: For raw we should use S.from, and drop the R... const query: { ( @@ -383,38 +548,10 @@ export function makeRepoInternal< ) : Effect.flatMap( filter(a), - (_) => - Unify.unify( - a.schema - // TODO: partial may not match? - ? parseMany2(_ as any, a.schema as any) - : parseMany(_ as any) - ) + (_) => runQueryFromRows(a, _) ) return pipe( - a.ttype === "one" - ? Effect.flatMap( - eff, - flow( - Array.head, - Option.match({ - onNone: () => Effect.fail(new NotFoundError({ id: "query", /* TODO */ type: name })), - onSome: Effect.succeed - }) - ) - ) - : a.ttype === "count" - ? Effect - .map(eff, (_) => NonNegativeInt(_.length)) - .pipe(Effect.catchTag("SchemaError", (e) => Effect.die(e))) - : eff, - Effect.tap((r) => - Effect.annotateCurrentSpan({ - "app.query.ttype": a.ttype, - "app.query.mode": a.mode, - "db.response.returned_rows": Array.isArray(r) ? r.length : 1 - }) - ), + eff, Effect.withSpan("Repository.query", { kind: "client", attributes: { "app.entity": name } @@ -422,6 +559,44 @@ export function makeRepoInternal< ) }) as any + const queryBatched: typeof query = (( + q: Q.QAll + ) => { + const a = Q.toFilter(q, schema) + if (a.mode === "aggregate") { + return query(q) + } + const { plain, fixed } = splitSelect(a.select) + const baseArgs: Omit, "select"> = { + t: a.t, + filter: a.filter, + order: a.order, + limit: a.limit, + skip: a.skip + } + const key = makeBatchKey({ + baseArgs, + mode: a.mode, + ttype: a.ttype, + fixedSelect: fixed + }) + return Effect.request( + QueryBatchRequest({ + key, + baseArgs, + fixedSelect: fixed, + plainSelect: plain, + resolve: (rows) => runQueryFromRows(a, rows) + }), + queryBatchResolver + ).pipe( + Effect.withSpan("Repository.queryBatched", { + kind: "client", + attributes: { "app.entity": name } + }, { captureStackTrace: false }) + ) + }) as typeof query + const validateSample = Effect.fn("Repository.validateSample", { attributes: { "app.entity": name } })( function*(options?: { percentage?: number @@ -517,6 +692,10 @@ export function makeRepoInternal< // eslint-disable-next-line prefer-rest-params return query(typeof q === "function" ? Pipeable.pipeArguments(Q.make(), arguments) : q) as any }, + queryBatched(q: any) { + // eslint-disable-next-line prefer-rest-params + return queryBatched(typeof q === "function" ? Pipeable.pipeArguments(Q.make(), arguments) : q) as any + }, /** * @internal */ diff --git a/packages/infra/src/Model/Repository/service.ts b/packages/infra/src/Model/Repository/service.ts index 96c81d2d4..8511e5972 100644 --- a/packages/infra/src/Model/Repository/service.ts +++ b/packages/infra/src/Model/Repository/service.ts @@ -546,6 +546,8 @@ export interface Repository< > } + readonly queryBatched: this["query"] + /** @deprecated use query */ readonly mapped: Mapped diff --git a/packages/infra/test/query.test.ts b/packages/infra/test/query.test.ts index 5b7c8108d..3e339b834 100644 --- a/packages/infra/test/query.test.ts +++ b/packages/infra/test/query.test.ts @@ -1959,3 +1959,43 @@ it("memFilter: aggregate with nested path grouping", () => { expect(result.find((r: any) => r.city === "NYC")!.count).toBe(2) expect(result.find((r: any) => r.city === "LA")!.count).toBe(1) }) + +class CartLine extends S.Class("CartLine")({ + id: S.String, + cartId: S.String, + sku: S.String, + qty: S.Number +}) {} + +it("queryBatched supports structural in-filter arrays across different projections", () => + Effect + .gen(function*() { + const repo = yield* makeRepo("CartLine", CartLine, {}) + yield* repo.saveAndPublish([ + new CartLine({ id: "1", cartId: "cart-1", sku: "a", qty: 1 }), + new CartLine({ id: "2", cartId: "cart-1", sku: "b", qty: 2 }), + new CartLine({ id: "3", cartId: "cart-2", sku: "c", qty: 3 }), + new CartLine({ id: "4", cartId: "cart-3", sku: "d", qty: 4 }) + ]) + + const cartIds = ["cart-1", "cart-2"] as const + const [identityProjection, quantityProjection] = yield* Effect.all([ + repo.queryBatched( + where("cartId", "in", cartIds), + project(S.Struct({ id: S.String, cartId: S.String })) + ), + repo.queryBatched( + where("cartId", "in", cartIds), + project(S.Struct({ id: S.String, qty: S.Number })) + ) + ]) + + expect(identityProjection).toHaveLength(3) + expect(quantityProjection).toHaveLength(3) + expect(identityProjection.map((_) => _.id).toSorted()).toEqual(["1", "2", "3"]) + expect(quantityProjection.map((_) => _.qty).toSorted()).toEqual([1, 2, 3]) + }) + .pipe( + setupRequestContextFromCurrent(), + Effect.provide(TestStoreLive) + )) From d7d9244226f97c01ab6fc8a01abb7b11f03d3666 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 23 May 2026 13:50:50 +0000 Subject: [PATCH 2/5] fix(infra): finalize queryBatched typing and structural keying Agent-Logs-Url: https://github.com/effect-app/libs/sessions/63b21bcb-5670-44d4-ae74-85cf6f7aebf0 Co-authored-by: patroza <42661+patroza@users.noreply.github.com> --- .../src/Model/Repository/internal/internal.ts | 49 ++++++++++--------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/packages/infra/src/Model/Repository/internal/internal.ts b/packages/infra/src/Model/Repository/internal/internal.ts index 77895eb62..6efc27081 100644 --- a/packages/infra/src/Model/Repository/internal/internal.ts +++ b/packages/infra/src/Model/Repository/internal/internal.ts @@ -324,8 +324,8 @@ export function makeRepoInternal< ) ) - type SelectItem = NonNullable["select"]>[number] - type PlainSelectItem = keyof Encoded | { key: string; subKeys: readonly string[] } + type SelectItem = NonNullable["select"]>[number] + type PlainSelectItem = string | { key: string; subKeys: readonly string[] } interface QueryBatchRequest extends Request.Request { readonly _tag: "RepositoryQueryBatch" readonly key: string @@ -336,7 +336,7 @@ export function makeRepoInternal< } const QueryBatchRequest = Request.tagged("RepositoryQueryBatch") - const splitSelect = (select: FilterArgs["select"]) => { + const splitSelect = (select: FilterArgs["select"]) => { const plain: PlainSelectItem[] = [] const fixed: SelectItem[] = [] if (select) { @@ -416,18 +416,19 @@ export function makeRepoInternal< const queryBatchResolver = RequestResolver.makeGrouped({ key: ({ request }) => request.key, - resolver: Effect.fnUntraced(function*(entries) { - const first = entries[0]!.request + resolver: Effect.fnUntraced(function*(entries, _key) { + const first = entries[0].request const mergedPlainSelect = mergePlainSelect(entries.map((_) => _.request.plainSelect)) const mergedSelect = mergedPlainSelect ? [...first.fixedSelect, ...mergedPlainSelect] - : undefined + : first.fixedSelect + const mutableSelect = [...mergedSelect] const rows = yield* filter({ ...first.baseArgs, - select: mergedSelect + select: Array.isArrayNonEmpty(mutableSelect) ? mutableSelect : undefined }) for (const entry of entries) { - const exit = yield* Effect.result(entry.request.resolve(rows)) + const exit = yield* Effect.exit(entry.request.resolve(rows)) entry.completeUnsafe(exit) } }) @@ -580,21 +581,23 @@ export function makeRepoInternal< ttype: a.ttype, fixedSelect: fixed }) - return Effect.request( - QueryBatchRequest({ - key, - baseArgs, - fixedSelect: fixed, - plainSelect: plain, - resolve: (rows) => runQueryFromRows(a, rows) - }), - queryBatchResolver - ).pipe( - Effect.withSpan("Repository.queryBatched", { - kind: "client", - attributes: { "app.entity": name } - }, { captureStackTrace: false }) - ) + return Effect + .request( + QueryBatchRequest({ + key, + baseArgs, + fixedSelect: fixed, + plainSelect: plain, + resolve: (rows) => runQueryFromRows(a, rows) as Effect.Effect + }), + queryBatchResolver + ) + .pipe( + Effect.withSpan("Repository.queryBatched", { + kind: "client", + attributes: { "app.entity": name } + }, { captureStackTrace: false }) + ) }) as typeof query const validateSample = Effect.fn("Repository.validateSample", { attributes: { "app.entity": name } })( From a5a3d4199d8eb9b145f7278e41d50aa006ae5830 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 23 May 2026 13:52:11 +0000 Subject: [PATCH 3/5] refactor(infra): address review comments in query batching Agent-Logs-Url: https://github.com/effect-app/libs/sessions/63b21bcb-5670-44d4-ae74-85cf6f7aebf0 Co-authored-by: patroza <42661+patroza@users.noreply.github.com> --- packages/infra/src/Model/Repository/internal/internal.ts | 8 ++++---- packages/infra/test/query.test.ts | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/infra/src/Model/Repository/internal/internal.ts b/packages/infra/src/Model/Repository/internal/internal.ts index 6efc27081..78c858f27 100644 --- a/packages/infra/src/Model/Repository/internal/internal.ts +++ b/packages/infra/src/Model/Repository/internal/internal.ts @@ -416,20 +416,20 @@ export function makeRepoInternal< const queryBatchResolver = RequestResolver.makeGrouped({ key: ({ request }) => request.key, - resolver: Effect.fnUntraced(function*(entries, _key) { + resolver: Effect.fnUntraced(function*(entries) { const first = entries[0].request const mergedPlainSelect = mergePlainSelect(entries.map((_) => _.request.plainSelect)) const mergedSelect = mergedPlainSelect ? [...first.fixedSelect, ...mergedPlainSelect] : first.fixedSelect - const mutableSelect = [...mergedSelect] + const select = [...mergedSelect] const rows = yield* filter({ ...first.baseArgs, - select: Array.isArrayNonEmpty(mutableSelect) ? mutableSelect : undefined + select: Array.isArrayNonEmpty(select) ? select : undefined }) for (const entry of entries) { const exit = yield* Effect.exit(entry.request.resolve(rows)) - entry.completeUnsafe(exit) + yield* Request.complete(exit)(entry) } }) }) diff --git a/packages/infra/test/query.test.ts b/packages/infra/test/query.test.ts index 3e339b834..20a506247 100644 --- a/packages/infra/test/query.test.ts +++ b/packages/infra/test/query.test.ts @@ -1978,7 +1978,7 @@ it("queryBatched supports structural in-filter arrays across different projectio new CartLine({ id: "4", cartId: "cart-3", sku: "d", qty: 4 }) ]) - const cartIds = ["cart-1", "cart-2"] as const + const cartIds = ["cart-1", "cart-2"] const [identityProjection, quantityProjection] = yield* Effect.all([ repo.queryBatched( where("cartId", "in", cartIds), From 82b403f1c6c3b8302ac82a3b27b1f01246e2c5ad Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 23 May 2026 13:53:08 +0000 Subject: [PATCH 4/5] chore(infra): finalize structural keying cleanup Agent-Logs-Url: https://github.com/effect-app/libs/sessions/63b21bcb-5670-44d4-ae74-85cf6f7aebf0 Co-authored-by: patroza <42661+patroza@users.noreply.github.com> --- packages/infra/src/Model/Repository/internal/internal.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/infra/src/Model/Repository/internal/internal.ts b/packages/infra/src/Model/Repository/internal/internal.ts index 78c858f27..d3635390e 100644 --- a/packages/infra/src/Model/Repository/internal/internal.ts +++ b/packages/infra/src/Model/Repository/internal/internal.ts @@ -389,7 +389,7 @@ export function makeRepoInternal< readonly fixedSelect: readonly SelectItem[] }) => { const canonicalize = (value: unknown): unknown => { - if (globalThis.Array.isArray(value)) { + if (Array.isArray(value)) { return value.map(canonicalize) } if (typeof value === "object" && value !== null) { @@ -468,7 +468,7 @@ export function makeRepoInternal< flow( Array.head, Option.match({ - onNone: () => Effect.fail(new NotFoundError({ id: "query", /* TODO */ type: name })), + onNone: () => Effect.fail(new NotFoundError({ id: "query", type: name })), onSome: Effect.succeed }) ) From 20cce98d3c8d382032415289078b9d75a3f6f4f9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 23 May 2026 13:54:18 +0000 Subject: [PATCH 5/5] test(infra): strengthen queryBatched assertions and key canonicalization Agent-Logs-Url: https://github.com/effect-app/libs/sessions/63b21bcb-5670-44d4-ae74-85cf6f7aebf0 Co-authored-by: patroza <42661+patroza@users.noreply.github.com> --- .../src/Model/Repository/internal/internal.ts | 36 +++++++++---------- packages/infra/test/query.test.ts | 2 ++ 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/packages/infra/src/Model/Repository/internal/internal.ts b/packages/infra/src/Model/Repository/internal/internal.ts index d3635390e..9c3976340 100644 --- a/packages/infra/src/Model/Repository/internal/internal.ts +++ b/packages/infra/src/Model/Repository/internal/internal.ts @@ -382,28 +382,29 @@ export function makeRepoInternal< return [...keys.values()] } + const canonicalizeBatchKey = (value: unknown): unknown => { + if (Array.isArray(value)) { + return value.map(canonicalizeBatchKey) + } + if (typeof value === "object" && value !== null) { + const record = value as Record + return Object.fromEntries( + Object + .entries(record) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([key, val]) => [key, canonicalizeBatchKey(val)]) + ) + } + return value + } + const makeBatchKey = (options: { readonly baseArgs: Omit, "select"> readonly mode: "collect" | "project" | "transform" | "aggregate" | undefined readonly ttype: "one" | "many" | "count" | undefined readonly fixedSelect: readonly SelectItem[] - }) => { - const canonicalize = (value: unknown): unknown => { - if (Array.isArray(value)) { - return value.map(canonicalize) - } - if (typeof value === "object" && value !== null) { - const record = value as Record - return Object.fromEntries( - Object - .entries(record) - .sort(([a], [b]) => a.localeCompare(b)) - .map(([key, val]) => [key, canonicalize(val)]) - ) - } - return value - } - return JSON.stringify(canonicalize({ + }) => + JSON.stringify(canonicalizeBatchKey({ mode: options.mode, ttype: options.ttype, filter: options.baseArgs.filter, @@ -412,7 +413,6 @@ export function makeRepoInternal< skip: options.baseArgs.skip, fixedSelect: options.fixedSelect })) - } const queryBatchResolver = RequestResolver.makeGrouped({ key: ({ request }) => request.key, diff --git a/packages/infra/test/query.test.ts b/packages/infra/test/query.test.ts index 20a506247..259eacb9f 100644 --- a/packages/infra/test/query.test.ts +++ b/packages/infra/test/query.test.ts @@ -1993,7 +1993,9 @@ it("queryBatched supports structural in-filter arrays across different projectio expect(identityProjection).toHaveLength(3) expect(quantityProjection).toHaveLength(3) expect(identityProjection.map((_) => _.id).toSorted()).toEqual(["1", "2", "3"]) + expect(identityProjection.map((_) => _.cartId).toSorted()).toEqual(["cart-1", "cart-1", "cart-2"]) expect(quantityProjection.map((_) => _.qty).toSorted()).toEqual([1, 2, 3]) + expect(quantityProjection.map((_) => _.id).toSorted()).toEqual(["1", "2", "3"]) }) .pipe( setupRequestContextFromCurrent(),