Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cuddly-lions-wait.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-app/infra": patch
---

Add `Repository.queryBatched` with projection-aware batching and structural filter-key comparison.
242 changes: 212 additions & 30 deletions packages/infra/src/Model/Repository/internal/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import * as Equivalence from "effect/Equivalence"
import { flow, pipe } from "effect/Function"
import * as Pipeable from "effect/Pipeable"
import * as PubSub from "effect/PubSub"
import * as Request from "effect/Request"
import * as RequestResolver from "effect/RequestResolver"
import * as Result from "effect/Result"
import * as SchemaAST from "effect/SchemaAST"
import * as Unify from "effect/Unify"
Expand Down Expand Up @@ -322,6 +324,170 @@ export function makeRepoInternal<
)
)

type SelectItem = NonNullable<FilterArgs<Encoded, keyof Encoded>["select"]>[number]
type PlainSelectItem = string | { key: string; subKeys: readonly string[] }
interface QueryBatchRequest extends Request.Request<unknown, unknown> {
readonly _tag: "RepositoryQueryBatch"
readonly key: string
readonly baseArgs: Omit<FilterArgs<Encoded>, "select">
readonly fixedSelect: readonly SelectItem[]
readonly plainSelect: readonly PlainSelectItem[] | undefined
readonly resolve: (rows: readonly PM[]) => Effect.Effect<unknown, unknown, never>
}
const QueryBatchRequest = Request.tagged<QueryBatchRequest>("RepositoryQueryBatch")

const splitSelect = (select: FilterArgs<Encoded, keyof Encoded>["select"]) => {
const plain: PlainSelectItem[] = []
const fixed: SelectItem[] = []
if (select) {
for (const item of select) {
if (typeof item === "string" || (typeof item === "object" && item !== null && "subKeys" in item)) {
plain.push(item)
} else {
fixed.push(item)
}
}
}
return {
plain: Array.isArrayNonEmpty(plain) ? plain : undefined,
fixed
}
}

const mergePlainSelect = (
plainSelects: readonly (readonly PlainSelectItem[] | undefined)[]
): readonly PlainSelectItem[] | undefined => {
if (plainSelects.some((_) => _ === undefined)) {
return undefined
}
const all = plainSelects.flatMap((_) => _ ?? [])
if (!Array.isArrayNonEmpty(all)) {
return undefined
}
const keys = new Map<string, PlainSelectItem>()
const subKeys = new Map<string, Set<string>>()
for (const item of all) {
if (typeof item === "string") {
keys.set(item, item)
subKeys.delete(item)
} else if (!keys.has(item.key) || typeof keys.get(item.key) !== "string") {
const set = subKeys.get(item.key) ?? new Set<string>()
for (const subKey of item.subKeys) {
set.add(subKey)
}
subKeys.set(item.key, set)
keys.set(item.key, { key: item.key, subKeys: [...set] })
}
}
return [...keys.values()]
}

const canonicalizeBatchKey = (value: unknown): unknown => {
if (Array.isArray(value)) {
return value.map(canonicalizeBatchKey)
}
if (typeof value === "object" && value !== null) {
const record = value as Record<string, unknown>
return Object.fromEntries(
Object
.entries(record)
.sort(([a], [b]) => a.localeCompare(b))
.map(([key, val]) => [key, canonicalizeBatchKey(val)])
)
}
return value
}

const makeBatchKey = (options: {
readonly baseArgs: Omit<FilterArgs<Encoded>, "select">
readonly mode: "collect" | "project" | "transform" | "aggregate" | undefined
readonly ttype: "one" | "many" | "count" | undefined
readonly fixedSelect: readonly SelectItem[]
}) =>
JSON.stringify(canonicalizeBatchKey({
mode: options.mode,
ttype: options.ttype,
filter: options.baseArgs.filter,
order: options.baseArgs.order,
limit: options.baseArgs.limit,
skip: options.baseArgs.skip,
fixedSelect: options.fixedSelect
}))

const queryBatchResolver = RequestResolver.makeGrouped<QueryBatchRequest, string>({
key: ({ request }) => request.key,
resolver: Effect.fnUntraced(function*(entries) {
const first = entries[0].request
const mergedPlainSelect = mergePlainSelect(entries.map((_) => _.request.plainSelect))
const mergedSelect = mergedPlainSelect
? [...first.fixedSelect, ...mergedPlainSelect]
: first.fixedSelect
const select = [...mergedSelect]
const rows = yield* filter({
...first.baseArgs,
select: Array.isArrayNonEmpty(select) ? select : undefined
})
for (const entry of entries) {
const exit = yield* Effect.exit(entry.request.resolve(rows))
yield* Request.complete(exit)(entry)
}
})
})

const runQueryFromRows = <A, R, EncodedRefined extends Encoded = Encoded>(
a: ReturnType<typeof Q.toFilter<Encoded, A, R, EncodedRefined>>,
rows: readonly PM[]
) => {
const eff = a.mode === "project"
? flow(
S.decodeEffectConcurrently(S.Array(a.schema ?? schema)),
provideRctx,
Effect.withSpan("parseMany", {
attributes: { "app.entity": name, "app.query.mode": "project" }
})
)(rows as readonly Encoded[])
: a.mode === "collect"
? flow(
S.decodeEffectConcurrently(S.Array(a.schema)),
Effect.map(Array.getSomes),
provideRctx,
Effect.withSpan("parseMany", {
attributes: { "app.entity": name, "app.query.mode": "collect" }
})
)(rows as readonly Encoded[])
: Unify.unify(
a.schema
// TODO: partial may not match?
? parseMany2(rows, a.schema)
: parseMany(rows)
)
return pipe(
a.ttype === "one"
? Effect.flatMap(
eff,
flow(
Array.head,
Option.match({
onNone: () => Effect.fail(new NotFoundError({ id: "query", type: name })),
onSome: Effect.succeed
})
)
)
: a.ttype === "count"
? Effect
.map(eff, (_) => NonNegativeInt(_.length))
.pipe(Effect.catchTag("SchemaError", (e) => Effect.die(e)))
: eff,
Effect.tap((r) =>
Effect.annotateCurrentSpan({
"app.query.ttype": a.ttype,
"app.query.mode": a.mode,
"db.response.returned_rows": Array.isArray(r) ? r.length : 1
})
)
)
}

// TODO: For raw we should use S.from, and drop the R...
const query: {
<A, R, From extends FieldValues>(
Expand Down Expand Up @@ -383,45 +549,57 @@ export function makeRepoInternal<
)
: Effect.flatMap(
filter(a),
(_) =>
Unify.unify(
a.schema
// TODO: partial may not match?
? parseMany2(_ as any, a.schema as any)
: parseMany(_ as any)
)
(_) => runQueryFromRows(a, _)
)
return pipe(
a.ttype === "one"
? Effect.flatMap(
eff,
flow(
Array.head,
Option.match({
onNone: () => Effect.fail(new NotFoundError({ id: "query", /* TODO */ type: name })),
onSome: Effect.succeed
})
)
)
: a.ttype === "count"
? Effect
.map(eff, (_) => NonNegativeInt(_.length))
.pipe(Effect.catchTag("SchemaError", (e) => Effect.die(e)))
: eff,
Effect.tap((r) =>
Effect.annotateCurrentSpan({
"app.query.ttype": a.ttype,
"app.query.mode": a.mode,
"db.response.returned_rows": Array.isArray(r) ? r.length : 1
})
),
eff,
Effect.withSpan("Repository.query", {
kind: "client",
attributes: { "app.entity": name }
}, { captureStackTrace: false })
)
}) as any

const queryBatched: typeof query = (<A, R, EncodedRefined extends Encoded = Encoded>(
q: Q.QAll<Encoded, EncodedRefined, A, R>
) => {
const a = Q.toFilter(q, schema)
if (a.mode === "aggregate") {
return query(q)
}
const { plain, fixed } = splitSelect(a.select)
const baseArgs: Omit<FilterArgs<Encoded>, "select"> = {
t: a.t,
filter: a.filter,
order: a.order,
limit: a.limit,
skip: a.skip
}
const key = makeBatchKey({
baseArgs,
mode: a.mode,
ttype: a.ttype,
fixedSelect: fixed
})
return Effect
.request(
QueryBatchRequest({
key,
baseArgs,
fixedSelect: fixed,
plainSelect: plain,
resolve: (rows) => runQueryFromRows(a, rows) as Effect.Effect<unknown, unknown, never>
}),
queryBatchResolver
)
.pipe(
Effect.withSpan("Repository.queryBatched", {
kind: "client",
attributes: { "app.entity": name }
}, { captureStackTrace: false })
)
}) as typeof query

const validateSample = Effect.fn("Repository.validateSample", { attributes: { "app.entity": name } })(
function*(options?: {
percentage?: number
Expand Down Expand Up @@ -517,6 +695,10 @@ export function makeRepoInternal<
// eslint-disable-next-line prefer-rest-params
return query(typeof q === "function" ? Pipeable.pipeArguments(Q.make(), arguments) : q) as any
},
queryBatched(q: any) {
// eslint-disable-next-line prefer-rest-params
return queryBatched(typeof q === "function" ? Pipeable.pipeArguments(Q.make(), arguments) : q) as any
},
/**
* @internal
*/
Expand Down
2 changes: 2 additions & 0 deletions packages/infra/src/Model/Repository/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,8 @@ export interface Repository<
>
}

readonly queryBatched: this["query"]

/** @deprecated use query */
readonly mapped: Mapped<Encoded>

Expand Down
42 changes: 42 additions & 0 deletions packages/infra/test/query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1959,3 +1959,45 @@ it("memFilter: aggregate with nested path grouping", () => {
expect(result.find((r: any) => r.city === "NYC")!.count).toBe(2)
expect(result.find((r: any) => r.city === "LA")!.count).toBe(1)
})

class CartLine extends S.Class<CartLine>("CartLine")({
id: S.String,
cartId: S.String,
sku: S.String,
qty: S.Number
}) {}

it("queryBatched supports structural in-filter arrays across different projections", () =>
Effect
.gen(function*() {
const repo = yield* makeRepo("CartLine", CartLine, {})
yield* repo.saveAndPublish([
new CartLine({ id: "1", cartId: "cart-1", sku: "a", qty: 1 }),
new CartLine({ id: "2", cartId: "cart-1", sku: "b", qty: 2 }),
new CartLine({ id: "3", cartId: "cart-2", sku: "c", qty: 3 }),
new CartLine({ id: "4", cartId: "cart-3", sku: "d", qty: 4 })
])

const cartIds = ["cart-1", "cart-2"]
const [identityProjection, quantityProjection] = yield* Effect.all([
repo.queryBatched(
where("cartId", "in", cartIds),
project(S.Struct({ id: S.String, cartId: S.String }))
),
repo.queryBatched(
where("cartId", "in", cartIds),
project(S.Struct({ id: S.String, qty: S.Number }))
)
])

expect(identityProjection).toHaveLength(3)
expect(quantityProjection).toHaveLength(3)
expect(identityProjection.map((_) => _.id).toSorted()).toEqual(["1", "2", "3"])
expect(identityProjection.map((_) => _.cartId).toSorted()).toEqual(["cart-1", "cart-1", "cart-2"])
expect(quantityProjection.map((_) => _.qty).toSorted()).toEqual([1, 2, 3])
expect(quantityProjection.map((_) => _.id).toSorted()).toEqual(["1", "2", "3"])
})
.pipe(
setupRequestContextFromCurrent(),
Effect.provide(TestStoreLive)
))
Loading