From 5487c5a1dee67a2e77cf6a6213e7532dcc5e47ea Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 24 May 2026 06:22:50 +0000 Subject: [PATCH 1/7] test(infra): add rpc leak repro with post-user resolver cache Agent-Logs-Url: https://github.com/effect-app/libs/sessions/dc9105b2-25f5-44ac-b906-d4a24e5ec664 Co-authored-by: patroza <42661+patroza@users.noreply.github.com> --- .../test/rpc-context-map-streaming.test.ts | 159 +++++++++++++++++- 1 file changed, 155 insertions(+), 4 deletions(-) diff --git a/packages/infra/test/rpc-context-map-streaming.test.ts b/packages/infra/test/rpc-context-map-streaming.test.ts index 50b5af561..4c2f851d7 100644 --- a/packages/infra/test/rpc-context-map-streaming.test.ts +++ b/packages/infra/test/rpc-context-map-streaming.test.ts @@ -38,13 +38,16 @@ import { NodeHttpServer } from "@effect/platform-node" import { expect, it } from "@effect/vitest" import { ApiClientFactory, makeRpcClient } from "effect-app/client" -import { HttpRouter, HttpServer } from "effect-app/http" +import { HttpMiddleware, HttpRouter, HttpServer } from "effect-app/http" import { DefaultGenericMiddlewares } from "effect-app/middleware" import { MiddlewareMaker } from "effect-app/rpc" import * as S from "effect-app/Schema" import * as Effect from "effect/Effect" +import * as Exit from "effect/Exit" import * as Layer from "effect/Layer" import * as Option from "effect/Option" +import * as Request from "effect/Request" +import * as RequestResolver from "effect/RequestResolver" import * as Stream from "effect/Stream" import { FetchHttpClient } from "effect/unstable/http" import { RpcSerialization } from "effect/unstable/rpc" @@ -52,7 +55,12 @@ import { createServer } from "http" import { RequestContextMiddleware } from "../src/api/internal/RequestContextMiddleware.js" import { makeRouter } from "../src/api/routing.js" import { DefaultGenericMiddlewaresLive } from "../src/api/routing/middleware.js" -import { getContextMap } from "../src/Store/ContextMapContainer.js" +import { makeRepo } from "../src/Model/Repository.js" +import { RepositoryRegistryLive } from "../src/Model/Repository/Registry.js" +import { LocaleRef } from "../src/RequestContext.js" +import { ContextMapContainer, getContextMap, withRequestResolverCache } from "../src/Store/ContextMapContainer.js" +import { MemoryStoreLive, storeId } from "../src/Store/Memory.js" +import { makeContextMap } from "../src/Store/service.js" import { AllowAnonymous, AllowAnonymousLive, RequestContextMap, RequireRoles, RequireRolesLive, SomeElseMiddleware, SomeElseMiddlewareLive, SomeService, Test, TestLive } from "./fixtures.js" // --------------------------------------------------------------------------- @@ -111,7 +119,81 @@ class ReadEtagOnce extends Req.Query()("ReadEtagOnce", {}, { success: S.String }) {} -const Rsc = { StreamEtag, StreamWithEtag, ReadEtagOnce } +class LeakUser extends S.Class("LeakUser")({ + id: S.String, + name: S.String +}) {} + +class LeakLike extends S.Class("LeakLike")({ + likeUserId: S.String +}) {} + +class LeakPost extends S.Class("LeakPost")({ + id: S.String, + authorUserId: S.String, + publisherUserId: S.String, + likes: S.Array(LeakLike) +}) {} + +class LeakProbePosts extends Req.Query()("LeakProbePosts", {}, { + allowAnonymous: true, + success: S.Number +}) {} + +const LEAK_USER_COUNT = 100 +const LEAK_REQUEST_COUNT = 100 +const LEAK_LIKES_PER_POST = 10 + +const leakUsers = Array.from({ length: LEAK_USER_COUNT }, (_, i) => + new LeakUser({ + id: `u-${i}`, + name: `User ${i}` + }) +) +const leakPosts = Array.from({ length: LEAK_USER_COUNT }, (_, i) => + new LeakPost({ + id: `p-${i}`, + authorUserId: `u-${i}`, + publisherUserId: `u-${(i + 1) % LEAK_USER_COUNT}`, + likes: Array.from({ length: LEAK_LIKES_PER_POST }, (_, j) => + new LeakLike({ + likeUserId: `u-${(i + j) % LEAK_USER_COUNT}` + })) + }) +) +const leakUsersById = new Map(leakUsers.map((_) => [_.id, _] as const)) +const leakStats = { + resolverBatches: 0, + resolverRequestedUsers: 0 +} + +interface GetLeakUserRequest extends Request.Request { + readonly _tag: "GetLeakUser" + readonly userId: string +} + +const GetLeakUser = Request.tagged("GetLeakUser") + +const leakUserResolver = RequestResolver + .make((entries) => { + leakStats.resolverBatches += 1 + leakStats.resolverRequestedUsers += entries.length + return Effect.forEach(entries, (entry) => { + const user = leakUsersById.get(entry.request.userId) + if (user === undefined) { + return Request.complete(Exit.fail(new Error(`Missing leak user ${entry.request.userId}`)))(entry) + } + return Request.complete(Exit.succeed(user))(entry) + }, { discard: true }) + }) + .pipe(RequestResolver.batchN(20)) + +const leakUserResolverWithRequestCache = withRequestResolverCache(leakUserResolver, { + capacity: 10_000, + strategy: "fifo" +}).pipe(Effect.orDie) + +const Rsc = { StreamEtag, StreamWithEtag, ReadEtagOnce, LeakProbePosts } // Distinct constants so an assertion failure points squarely at "the etag // the handler wrote was no longer there when later chunks ran". @@ -161,7 +243,34 @@ const router = Router(Rsc)({ ) }) .pipe(Stream.unwrap), - ReadEtagOnce: () => getContextMap.pipe(Effect.orDie, Effect.map((m) => m.get(SHARED_KEY) ?? MISSING)) + ReadEtagOnce: () => getContextMap.pipe(Effect.orDie, Effect.map((m) => m.get(SHARED_KEY) ?? MISSING)), + LeakProbePosts: () => + Effect + .gen(function*() { + const userRepo = yield* makeRepo("LeakProbeUser", LeakUser, { + makeInitial: Effect.succeed(leakUsers) + }) + const postRepo = yield* makeRepo("LeakProbePost", LeakPost, { + makeInitial: Effect.succeed(leakPosts) + }) + const resolver = yield* leakUserResolverWithRequestCache + const posts = yield* postRepo.all + const allUsers = yield* userRepo.all + if (allUsers.length !== LEAK_USER_COUNT) { + return yield* Effect.die(new Error(`Expected ${LEAK_USER_COUNT} users, got ${allUsers.length}`)) + } + const userRefs = posts.flatMap((post) => [ + post.authorUserId, + post.publisherUserId, + ...post.likes.map((like) => like.likeUserId) + ]) + const resolved = yield* Effect.forEach( + userRefs, + (userId) => Effect.request(GetLeakUser({ userId }), resolver), + { concurrency: "unbounded" } + ) + return resolved.length + }) }) } }) @@ -178,12 +287,34 @@ const RpcRouterLayer = matchAll({ router }) const NodeServerLayer = NodeHttpServer.layer(() => createServer(), { port: 0 }) const RequestContextMiddlewareLayer = HttpRouter.middleware(RequestContextMiddleware()).layer +const LeakyRequestContextMiddlewareLayer = HttpRouter.middleware( + HttpMiddleware.make((app) => + app.pipe( + Effect.provide( + Layer.mergeAll( + Layer.succeed(ContextMapContainer, ContextMapContainer.of(makeContextMap())), + Layer.succeed(LocaleRef, "en"), + Layer.succeed(storeId, "primary") + ) + ) + )) +).layer const ServerLayer = HttpRouter .serve( RpcRouterLayer.pipe(Layer.provide(RequestContextMiddlewareLayer)) ) .pipe( + Layer.provide(Layer.merge(MemoryStoreLive, RepositoryRegistryLive)), + Layer.provide(NodeServerLayer), + Layer.provide(RpcSerialization.layerNdjson) + ) +const LeakyServerLayer = HttpRouter + .serve( + RpcRouterLayer.pipe(Layer.provide(LeakyRequestContextMiddlewareLayer)) + ) + .pipe( + Layer.provide(Layer.merge(MemoryStoreLive, RepositoryRegistryLive)), Layer.provide(NodeServerLayer), Layer.provide(RpcSerialization.layerNdjson) ) @@ -204,6 +335,7 @@ const ClientLayer = Layer .pipe(Layer.provide(NodeServerLayer)) const TestLayer = Layer.mergeAll(ServerLayer, ClientLayer) +const LeakyTestLayer = Layer.mergeAll(LeakyServerLayer, ClientLayer) // --------------------------------------------------------------------------- // Test @@ -260,3 +392,22 @@ it.live( }, Effect.provide(TestLayer)), { timeout: 10_000 } ) + +it.live( + "leak repro: 100 rpc requests with leaky ContextMap keep resolver cache users/fibers across requests", + Effect.fnUntraced(function*() { + leakStats.resolverBatches = 0 + leakStats.resolverRequestedUsers = 0 + const client = yield* ApiClientFactory.makeFor(Layer.empty)(Rsc) + const expectedPerRequestResolves = LEAK_USER_COUNT * (2 + LEAK_LIKES_PER_POST) + const first = yield* client.LeakProbePosts.handler() + expect(first).toBe(expectedPerRequestResolves) + yield* Effect.forEach( + Array.from({ length: LEAK_REQUEST_COUNT - 1 }, () => undefined), + () => client.LeakProbePosts.handler(), + { discard: true } + ) + expect(leakStats.resolverRequestedUsers).toBe(LEAK_USER_COUNT) + }, Effect.provide(LeakyTestLayer)), + { timeout: 30_000 } +) From 60c6174dc08c2d9cf4b098541fe9527798ef020e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 24 May 2026 06:24:50 +0000 Subject: [PATCH 2/7] test(infra): add leaky middleware rpc resolver cache repro Agent-Logs-Url: https://github.com/effect-app/libs/sessions/dc9105b2-25f5-44ac-b906-d4a24e5ec664 Co-authored-by: patroza <42661+patroza@users.noreply.github.com> --- .changeset/rpc-leak-repro-post-user.md | 5 +++ .../test/rpc-context-map-streaming.test.ts | 43 ++++++++++--------- 2 files changed, 27 insertions(+), 21 deletions(-) create mode 100644 .changeset/rpc-leak-repro-post-user.md diff --git a/.changeset/rpc-leak-repro-post-user.md b/.changeset/rpc-leak-repro-post-user.md new file mode 100644 index 000000000..c7d8b081d --- /dev/null +++ b/.changeset/rpc-leak-repro-post-user.md @@ -0,0 +1,5 @@ +--- +"@effect-app/infra": patch +--- + +Extend RPC ContextMap streaming coverage with a Post/User leak-repro scenario using `withRequestResolverCache`, 100 posts with relational user references, and 100 repeated requests under a deliberately leaky request context setup. diff --git a/packages/infra/test/rpc-context-map-streaming.test.ts b/packages/infra/test/rpc-context-map-streaming.test.ts index 4c2f851d7..6329bb8dd 100644 --- a/packages/infra/test/rpc-context-map-streaming.test.ts +++ b/packages/infra/test/rpc-context-map-streaming.test.ts @@ -60,7 +60,7 @@ import { RepositoryRegistryLive } from "../src/Model/Repository/Registry.js" import { LocaleRef } from "../src/RequestContext.js" import { ContextMapContainer, getContextMap, withRequestResolverCache } from "../src/Store/ContextMapContainer.js" import { MemoryStoreLive, storeId } from "../src/Store/Memory.js" -import { makeContextMap } from "../src/Store/service.js" +import { ContextMap, makeContextMap } from "../src/Store/service.js" import { AllowAnonymous, AllowAnonymousLive, RequestContextMap, RequireRoles, RequireRolesLive, SomeElseMiddleware, SomeElseMiddlewareLive, SomeService, Test, TestLive } from "./fixtures.js" // --------------------------------------------------------------------------- @@ -148,8 +148,7 @@ const leakUsers = Array.from({ length: LEAK_USER_COUNT }, (_, i) => new LeakUser({ id: `u-${i}`, name: `User ${i}` - }) -) + })) const leakPosts = Array.from({ length: LEAK_USER_COUNT }, (_, i) => new LeakPost({ id: `p-${i}`, @@ -159,8 +158,7 @@ const leakPosts = Array.from({ length: LEAK_USER_COUNT }, (_, i) => new LeakLike({ likeUserId: `u-${(i + j) % LEAK_USER_COUNT}` })) - }) -) + })) const leakUsersById = new Map(leakUsers.map((_) => [_.id, _] as const)) const leakStats = { resolverBatches: 0, @@ -175,13 +173,13 @@ interface GetLeakUserRequest extends Request.Request { const GetLeakUser = Request.tagged("GetLeakUser") const leakUserResolver = RequestResolver - .make((entries) => { + .make((entries: ReadonlyArray>) => { leakStats.resolverBatches += 1 leakStats.resolverRequestedUsers += entries.length return Effect.forEach(entries, (entry) => { const user = leakUsersById.get(entry.request.userId) if (user === undefined) { - return Request.complete(Exit.fail(new Error(`Missing leak user ${entry.request.userId}`)))(entry) + return Request.complete(Exit.die(new Error(`Missing leak user ${entry.request.userId}`)))(entry) } return Request.complete(Exit.succeed(user))(entry) }, { discard: true }) @@ -191,7 +189,8 @@ const leakUserResolver = RequestResolver const leakUserResolverWithRequestCache = withRequestResolverCache(leakUserResolver, { capacity: 10_000, strategy: "fifo" -}).pipe(Effect.orDie) +}) + .pipe(Effect.orDie) const Rsc = { StreamEtag, StreamWithEtag, ReadEtagOnce, LeakProbePosts } @@ -266,11 +265,12 @@ const router = Router(Rsc)({ ]) const resolved = yield* Effect.forEach( userRefs, - (userId) => Effect.request(GetLeakUser({ userId }), resolver), + (userId) => Effect.request(GetLeakUser({ userId }), resolver).pipe(Effect.orDie), { concurrency: "unbounded" } ) return resolved.length }) + .pipe(Effect.provide(Layer.merge(MemoryStoreLive, RepositoryRegistryLive))) }) } }) @@ -287,25 +287,27 @@ const RpcRouterLayer = matchAll({ router }) const NodeServerLayer = NodeHttpServer.layer(() => createServer(), { port: 0 }) const RequestContextMiddlewareLayer = HttpRouter.middleware(RequestContextMiddleware()).layer -const LeakyRequestContextMiddlewareLayer = HttpRouter.middleware( - HttpMiddleware.make((app) => - app.pipe( - Effect.provide( - Layer.mergeAll( - Layer.succeed(ContextMapContainer, ContextMapContainer.of(makeContextMap())), - Layer.succeed(LocaleRef, "en"), - Layer.succeed(storeId, "primary") +const LeakyRequestContextMiddlewareLayer = HttpRouter + .middleware( + HttpMiddleware.make((app) => + app.pipe( + Effect.provide( + Layer.mergeAll( + Layer.succeed(ContextMapContainer, ContextMapContainer.of(ContextMap.of(makeContextMap()))), + Layer.succeed(LocaleRef, "en"), + Layer.succeed(storeId, S.NonEmptyString255("primary")) + ) ) ) - )) -).layer + ) + ) + .layer const ServerLayer = HttpRouter .serve( RpcRouterLayer.pipe(Layer.provide(RequestContextMiddlewareLayer)) ) .pipe( - Layer.provide(Layer.merge(MemoryStoreLive, RepositoryRegistryLive)), Layer.provide(NodeServerLayer), Layer.provide(RpcSerialization.layerNdjson) ) @@ -314,7 +316,6 @@ const LeakyServerLayer = HttpRouter RpcRouterLayer.pipe(Layer.provide(LeakyRequestContextMiddlewareLayer)) ) .pipe( - Layer.provide(Layer.merge(MemoryStoreLive, RepositoryRegistryLive)), Layer.provide(NodeServerLayer), Layer.provide(RpcSerialization.layerNdjson) ) From 7cb94a2b2e7496ad527c5d3270a9d0a5b31b3d4a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 24 May 2026 06:28:12 +0000 Subject: [PATCH 3/7] test(infra): refine leak repro assertions and constants Agent-Logs-Url: https://github.com/effect-app/libs/sessions/dc9105b2-25f5-44ac-b906-d4a24e5ec664 Co-authored-by: patroza <42661+patroza@users.noreply.github.com> --- packages/infra/test/rpc-context-map-streaming.test.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/infra/test/rpc-context-map-streaming.test.ts b/packages/infra/test/rpc-context-map-streaming.test.ts index 6329bb8dd..6384e7797 100644 --- a/packages/infra/test/rpc-context-map-streaming.test.ts +++ b/packages/infra/test/rpc-context-map-streaming.test.ts @@ -143,6 +143,7 @@ class LeakProbePosts extends Req.Query()("LeakProbePosts", {}, { const LEAK_USER_COUNT = 100 const LEAK_REQUEST_COUNT = 100 const LEAK_LIKES_PER_POST = 10 +const LEAK_USER_REFS_PER_POST = 2 const leakUsers = Array.from({ length: LEAK_USER_COUNT }, (_, i) => new LeakUser({ @@ -266,7 +267,7 @@ const router = Router(Rsc)({ const resolved = yield* Effect.forEach( userRefs, (userId) => Effect.request(GetLeakUser({ userId }), resolver).pipe(Effect.orDie), - { concurrency: "unbounded" } + { concurrency: 1 } ) return resolved.length }) @@ -400,15 +401,17 @@ it.live( leakStats.resolverBatches = 0 leakStats.resolverRequestedUsers = 0 const client = yield* ApiClientFactory.makeFor(Layer.empty)(Rsc) - const expectedPerRequestResolves = LEAK_USER_COUNT * (2 + LEAK_LIKES_PER_POST) + const expectedPerRequestResolves = LEAK_USER_COUNT * (LEAK_USER_REFS_PER_POST + LEAK_LIKES_PER_POST) const first = yield* client.LeakProbePosts.handler() expect(first).toBe(expectedPerRequestResolves) + const resolverUsersAfterFirstRequest = leakStats.resolverRequestedUsers + expect(resolverUsersAfterFirstRequest).toBe(LEAK_USER_COUNT) yield* Effect.forEach( Array.from({ length: LEAK_REQUEST_COUNT - 1 }, () => undefined), () => client.LeakProbePosts.handler(), { discard: true } ) - expect(leakStats.resolverRequestedUsers).toBe(LEAK_USER_COUNT) + expect(leakStats.resolverRequestedUsers).toBe(resolverUsersAfterFirstRequest) }, Effect.provide(LeakyTestLayer)), { timeout: 30_000 } ) From da0c2e25fe12737661a95c8ed288bc671cb75f94 Mon Sep 17 00:00:00 2001 From: Patrick Roza Date: Sun, 24 May 2026 08:52:13 +0200 Subject: [PATCH 4/7] real resolver --- .../test/rpc-context-map-streaming.test.ts | 123 ++++++++---------- 1 file changed, 56 insertions(+), 67 deletions(-) diff --git a/packages/infra/test/rpc-context-map-streaming.test.ts b/packages/infra/test/rpc-context-map-streaming.test.ts index 6384e7797..31f336b30 100644 --- a/packages/infra/test/rpc-context-map-streaming.test.ts +++ b/packages/infra/test/rpc-context-map-streaming.test.ts @@ -37,8 +37,9 @@ */ import { NodeHttpServer } from "@effect/platform-node" import { expect, it } from "@effect/vitest" +import { SchemaGetter } from "effect" import { ApiClientFactory, makeRpcClient } from "effect-app/client" -import { HttpMiddleware, HttpRouter, HttpServer } from "effect-app/http" +import { HttpRouter, HttpServer } from "effect-app/http" import { DefaultGenericMiddlewares } from "effect-app/middleware" import { MiddlewareMaker } from "effect-app/rpc" import * as S from "effect-app/Schema" @@ -57,10 +58,8 @@ import { makeRouter } from "../src/api/routing.js" import { DefaultGenericMiddlewaresLive } from "../src/api/routing/middleware.js" import { makeRepo } from "../src/Model/Repository.js" import { RepositoryRegistryLive } from "../src/Model/Repository/Registry.js" -import { LocaleRef } from "../src/RequestContext.js" -import { ContextMapContainer, getContextMap, withRequestResolverCache } from "../src/Store/ContextMapContainer.js" -import { MemoryStoreLive, storeId } from "../src/Store/Memory.js" -import { ContextMap, makeContextMap } from "../src/Store/service.js" +import { getContextMap, withRequestResolverCache } from "../src/Store/ContextMapContainer.js" +import { MemoryStoreLive } from "../src/Store/Memory.js" import { AllowAnonymous, AllowAnonymousLive, RequestContextMap, RequireRoles, RequireRolesLive, SomeElseMiddleware, SomeElseMiddlewareLive, SomeService, Test, TestLive } from "./fixtures.js" // --------------------------------------------------------------------------- @@ -124,14 +123,56 @@ class LeakUser extends S.Class("LeakUser")({ name: S.String }) {} +const leakStats = { + resolverBatches: 0, + resolverRequestedUsers: 0 +} + +const leakUserResolver = RequestResolver + .make((entries: ReadonlyArray>) => { + leakStats.resolverBatches += 1 + leakStats.resolverRequestedUsers += entries.length + return Effect.forEach(entries, (entry) => { + const user = leakUsersById.get(entry.request.userId) + if (user === undefined) { + return Request.complete(Exit.die(new Error(`Missing leak user ${entry.request.userId}`)))(entry) + } + return Request.complete(Exit.succeed(user))(entry) + }, { discard: true }) + }) + .pipe(RequestResolver.batchN(20)) + +const leakUserResolverWithRequestCache = withRequestResolverCache(leakUserResolver, { + capacity: 10_000, + strategy: "fifo" +}) + .pipe(Effect.orDie) + +interface GetLeakUserRequest extends Request.Request { + readonly _tag: "GetLeakUser" + readonly userId: string +} + +const GetLeakUser = Request.tagged("GetLeakUser") + +const UserFromId = S.String.pipe(S.decodeTo( + LeakUser, + { + decode: SchemaGetter.transformOrFail((userId) => + Effect.request(GetLeakUser({ userId }), leakUserResolverWithRequestCache).pipe(Effect.orDie) + ), + encode: SchemaGetter.transformOrFail((user) => Effect.succeed(user.id)) + } +)) + class LeakLike extends S.Class("LeakLike")({ - likeUserId: S.String + likeUserId: UserFromId }) {} class LeakPost extends S.Class("LeakPost")({ id: S.String, - authorUserId: S.String, - publisherUserId: S.String, + authorUserId: UserFromId, + publisherUserId: UserFromId, likes: S.Array(LeakLike) }) {} @@ -151,47 +192,16 @@ const leakUsers = Array.from({ length: LEAK_USER_COUNT }, (_, i) => name: `User ${i}` })) const leakPosts = Array.from({ length: LEAK_USER_COUNT }, (_, i) => - new LeakPost({ + LeakPost.make({ id: `p-${i}`, - authorUserId: `u-${i}`, - publisherUserId: `u-${(i + 1) % LEAK_USER_COUNT}`, + authorUserId: leakUsers[i]!, + publisherUserId: leakUsers[(i + 1) % LEAK_USER_COUNT]!, likes: Array.from({ length: LEAK_LIKES_PER_POST }, (_, j) => - new LeakLike({ - likeUserId: `u-${(i + j) % LEAK_USER_COUNT}` + LeakLike.make({ + likeUserId: leakUsers[(i + j) % LEAK_USER_COUNT]! })) })) const leakUsersById = new Map(leakUsers.map((_) => [_.id, _] as const)) -const leakStats = { - resolverBatches: 0, - resolverRequestedUsers: 0 -} - -interface GetLeakUserRequest extends Request.Request { - readonly _tag: "GetLeakUser" - readonly userId: string -} - -const GetLeakUser = Request.tagged("GetLeakUser") - -const leakUserResolver = RequestResolver - .make((entries: ReadonlyArray>) => { - leakStats.resolverBatches += 1 - leakStats.resolverRequestedUsers += entries.length - return Effect.forEach(entries, (entry) => { - const user = leakUsersById.get(entry.request.userId) - if (user === undefined) { - return Request.complete(Exit.die(new Error(`Missing leak user ${entry.request.userId}`)))(entry) - } - return Request.complete(Exit.succeed(user))(entry) - }, { discard: true }) - }) - .pipe(RequestResolver.batchN(20)) - -const leakUserResolverWithRequestCache = withRequestResolverCache(leakUserResolver, { - capacity: 10_000, - strategy: "fifo" -}) - .pipe(Effect.orDie) const Rsc = { StreamEtag, StreamWithEtag, ReadEtagOnce, LeakProbePosts } @@ -253,22 +263,16 @@ const router = Router(Rsc)({ const postRepo = yield* makeRepo("LeakProbePost", LeakPost, { makeInitial: Effect.succeed(leakPosts) }) - const resolver = yield* leakUserResolverWithRequestCache const posts = yield* postRepo.all const allUsers = yield* userRepo.all if (allUsers.length !== LEAK_USER_COUNT) { return yield* Effect.die(new Error(`Expected ${LEAK_USER_COUNT} users, got ${allUsers.length}`)) } - const userRefs = posts.flatMap((post) => [ + const resolved = posts.flatMap((post) => [ post.authorUserId, post.publisherUserId, ...post.likes.map((like) => like.likeUserId) ]) - const resolved = yield* Effect.forEach( - userRefs, - (userId) => Effect.request(GetLeakUser({ userId }), resolver).pipe(Effect.orDie), - { concurrency: 1 } - ) return resolved.length }) .pipe(Effect.provide(Layer.merge(MemoryStoreLive, RepositoryRegistryLive))) @@ -288,21 +292,6 @@ const RpcRouterLayer = matchAll({ router }) const NodeServerLayer = NodeHttpServer.layer(() => createServer(), { port: 0 }) const RequestContextMiddlewareLayer = HttpRouter.middleware(RequestContextMiddleware()).layer -const LeakyRequestContextMiddlewareLayer = HttpRouter - .middleware( - HttpMiddleware.make((app) => - app.pipe( - Effect.provide( - Layer.mergeAll( - Layer.succeed(ContextMapContainer, ContextMapContainer.of(ContextMap.of(makeContextMap()))), - Layer.succeed(LocaleRef, "en"), - Layer.succeed(storeId, S.NonEmptyString255("primary")) - ) - ) - ) - ) - ) - .layer const ServerLayer = HttpRouter .serve( @@ -314,7 +303,7 @@ const ServerLayer = HttpRouter ) const LeakyServerLayer = HttpRouter .serve( - RpcRouterLayer.pipe(Layer.provide(LeakyRequestContextMiddlewareLayer)) + RpcRouterLayer.pipe(Layer.provide(RequestContextMiddlewareLayer)) ) .pipe( Layer.provide(NodeServerLayer), From e9f46a3b59a10c51ba8eaa06a29dc30fe0cb92cf Mon Sep 17 00:00:00 2001 From: Patrick Roza Date: Sun, 24 May 2026 08:59:27 +0200 Subject: [PATCH 5/7] better test --- .../test/rpc-context-map-streaming.test.ts | 90 +++++++++++-------- 1 file changed, 54 insertions(+), 36 deletions(-) diff --git a/packages/infra/test/rpc-context-map-streaming.test.ts b/packages/infra/test/rpc-context-map-streaming.test.ts index 31f336b30..6d4a1932e 100644 --- a/packages/infra/test/rpc-context-map-streaming.test.ts +++ b/packages/infra/test/rpc-context-map-streaming.test.ts @@ -123,21 +123,25 @@ class LeakUser extends S.Class("LeakUser")({ name: S.String }) {} -const leakStats = { - resolverBatches: 0, - resolverRequestedUsers: 0 -} +// WeakRefs to every LeakUser the resolver hands out this session. The resolver +// returns a fresh clone (not the base from leakUsersById) so the only strong +// references to these instances live inside per-request caches / ContextMap. +// If a request's ContextMap (and the request-scoped resolver cache hanging off +// it) is properly released when the request ends, every clone becomes eligible +// for GC. If anything retains the ContextMap across requests, the clones — and +// their ~100kb name buffers — survive. +const resolvedUserRefs: Array> = [] const leakUserResolver = RequestResolver .make((entries: ReadonlyArray>) => { - leakStats.resolverBatches += 1 - leakStats.resolverRequestedUsers += entries.length return Effect.forEach(entries, (entry) => { - const user = leakUsersById.get(entry.request.userId) - if (user === undefined) { + const base = leakUsersById.get(entry.request.userId) + if (base === undefined) { return Request.complete(Exit.die(new Error(`Missing leak user ${entry.request.userId}`)))(entry) } - return Request.complete(Exit.succeed(user))(entry) + const clone = new LeakUser({ id: base.id, name: base.name }) + resolvedUserRefs.push(new WeakRef(clone)) + return Request.complete(Exit.succeed(clone))(entry) }, { discard: true }) }) .pipe(RequestResolver.batchN(20)) @@ -181,24 +185,31 @@ class LeakProbePosts extends Req.Query()("LeakProbePosts", {}, { success: S.Number }) {} -const LEAK_USER_COUNT = 100 +const LEAK_USER_COUNT = 10 +const LEAK_POST_COUNT = 50 const LEAK_REQUEST_COUNT = 100 -const LEAK_LIKES_PER_POST = 10 -const LEAK_USER_REFS_PER_POST = 2 +const LEAK_LIKES_PER_POST = 8 + +// ~100kb name buffer so each retained User clone visibly blows up RSS. +const HUGE_NAME = "x".repeat(100_000) const leakUsers = Array.from({ length: LEAK_USER_COUNT }, (_, i) => new LeakUser({ id: `u-${i}`, - name: `User ${i}` + name: `User ${i} ${HUGE_NAME}` })) -const leakPosts = Array.from({ length: LEAK_USER_COUNT }, (_, i) => +// Each post picks distinct users across author / publisher / likes so a single +// request decodes a varied mix rather than the same user repeatedly. With a +// 10-user pool and 8 likes per post the indices below give 10 distinct users +// per post (author + publisher + 8 likes). +const leakPosts = Array.from({ length: LEAK_POST_COUNT }, (_, i) => LeakPost.make({ id: `p-${i}`, - authorUserId: leakUsers[i]!, - publisherUserId: leakUsers[(i + 1) % LEAK_USER_COUNT]!, + authorUserId: leakUsers[i % LEAK_USER_COUNT]!, + publisherUserId: leakUsers[(i * 3 + 1) % LEAK_USER_COUNT]!, likes: Array.from({ length: LEAK_LIKES_PER_POST }, (_, j) => LeakLike.make({ - likeUserId: leakUsers[(i + j) % LEAK_USER_COUNT]! + likeUserId: leakUsers[(i + j * 2 + 2) % LEAK_USER_COUNT]! })) })) const leakUsersById = new Map(leakUsers.map((_) => [_.id, _] as const)) @@ -257,23 +268,18 @@ const router = Router(Rsc)({ LeakProbePosts: () => Effect .gen(function*() { - const userRepo = yield* makeRepo("LeakProbeUser", LeakUser, { - makeInitial: Effect.succeed(leakUsers) - }) const postRepo = yield* makeRepo("LeakProbePost", LeakPost, { makeInitial: Effect.succeed(leakPosts) }) const posts = yield* postRepo.all - const allUsers = yield* userRepo.all - if (allUsers.length !== LEAK_USER_COUNT) { - return yield* Effect.die(new Error(`Expected ${LEAK_USER_COUNT} users, got ${allUsers.length}`)) - } - const resolved = posts.flatMap((post) => [ + // Touch every user reference so `UserFromId` decode (→ resolver + // → cache) actually runs and produces the clones we WeakRef-track. + const refs = posts.flatMap((post) => [ post.authorUserId, post.publisherUserId, ...post.likes.map((like) => like.likeUserId) ]) - return resolved.length + return refs.length }) .pipe(Effect.provide(Layer.merge(MemoryStoreLive, RepositoryRegistryLive))) }) @@ -385,22 +391,34 @@ it.live( ) it.live( - "leak repro: 100 rpc requests with leaky ContextMap keep resolver cache users/fibers across requests", + "resolver-produced User clones are GC-eligible after their requests complete", Effect.fnUntraced(function*() { - leakStats.resolverBatches = 0 - leakStats.resolverRequestedUsers = 0 + if (typeof globalThis.gc !== "function") { + return yield* Effect.die( + new Error("run vitest with --expose-gc (NODE_OPTIONS=--expose-gc) to enable the WeakRef leak probe") + ) + } + resolvedUserRefs.length = 0 const client = yield* ApiClientFactory.makeFor(Layer.empty)(Rsc) - const expectedPerRequestResolves = LEAK_USER_COUNT * (LEAK_USER_REFS_PER_POST + LEAK_LIKES_PER_POST) - const first = yield* client.LeakProbePosts.handler() - expect(first).toBe(expectedPerRequestResolves) - const resolverUsersAfterFirstRequest = leakStats.resolverRequestedUsers - expect(resolverUsersAfterFirstRequest).toBe(LEAK_USER_COUNT) yield* Effect.forEach( - Array.from({ length: LEAK_REQUEST_COUNT - 1 }, () => undefined), + Array.from({ length: LEAK_REQUEST_COUNT }, () => undefined), () => client.LeakProbePosts.handler(), { discard: true } ) - expect(leakStats.resolverRequestedUsers).toBe(resolverUsersAfterFirstRequest) + // Let request finalizers and any pending microtasks drain before forcing GC. + yield* Effect.sleep("200 millis") + globalThis.gc() + yield* Effect.sleep("50 millis") + globalThis.gc() + const totalProduced = resolvedUserRefs.length + const alive = resolvedUserRefs.filter((ref) => ref.deref() !== undefined).length + // Sanity: the resolver actually ran (otherwise the probe proves nothing). + expect(totalProduced).toBeGreaterThan(0) + // If a leaky ContextMap (or anything else) retains the per-request resolver + // cache across requests, the cached User clones — each ~100kb — survive GC + // and `alive` grows with the number of requests. Post-fix every clone must + // be collectable once its request scope closes. + expect(alive).toBe(0) }, Effect.provide(LeakyTestLayer)), { timeout: 30_000 } ) From 826399e23ce6e7b67d7ae435d57bf1a0db98d8b5 Mon Sep 17 00:00:00 2001 From: Patrick Roza Date: Sun, 24 May 2026 09:02:33 +0200 Subject: [PATCH 6/7] extract test --- .../infra/test/rpc-context-map-leak.test.ts | 300 ++++++++++++++++++ .../test/rpc-context-map-streaming.test.ts | 167 +--------- 2 files changed, 303 insertions(+), 164 deletions(-) create mode 100644 packages/infra/test/rpc-context-map-leak.test.ts diff --git a/packages/infra/test/rpc-context-map-leak.test.ts b/packages/infra/test/rpc-context-map-leak.test.ts new file mode 100644 index 000000000..e1f683ec0 --- /dev/null +++ b/packages/infra/test/rpc-context-map-leak.test.ts @@ -0,0 +1,300 @@ +/** + * Per-request ContextMap retention probe. + * + * Background + * ---------- + * `RequestContextMiddleware` provisions a per-request `ContextMapContainer`. + * The container backs `withRequestResolverCache`, so any User objects a + * request-scoped `RequestResolver` produces live inside the ContextMap for + * the duration of that request. + * + * If the ContextMap (or anything hanging off it) is retained across + * requests, the cached User objects — and any large fields they carry — + * never become GC-eligible and memory grows with request count. + * + * Reproduction strategy + * --------------------- + * - Define a small pool of `LeakUser` objects, each with a ~100kb `name` + * buffer so a leak shows up as obvious RSS growth. + * - Wire a `RequestResolver` that returns a FRESH `LeakUser` clone per + * resolve (not the base from `leakUsersById`) and records a `WeakRef` to + * every clone in module-scope. + * - The base users in `leakUsersById` are strongly held forever; the + * resolver-produced clones are only reachable through the per-request + * cache. If that cache is released when the request ends, the clones are + * GC-eligible — `WeakRef.deref()` returns `undefined` after `gc()`. + * - Fire `LEAK_REQUEST_COUNT` rpc requests, each decoding posts whose + * `UserFromId` fields drive the resolver. Then force GC and assert that + * zero clones survive. + * + * Run with `NODE_OPTIONS=--expose-gc` so `globalThis.gc` is available. + */ +import { NodeHttpServer } from "@effect/platform-node" +import { expect, it } from "@effect/vitest" +import { SchemaGetter } from "effect" +import { ApiClientFactory, makeRpcClient } from "effect-app/client" +import { HttpRouter, HttpServer } from "effect-app/http" +import { DefaultGenericMiddlewares } from "effect-app/middleware" +import { MiddlewareMaker } from "effect-app/rpc" +import * as S from "effect-app/Schema" +import * as Effect from "effect/Effect" +import * as Exit from "effect/Exit" +import * as Layer from "effect/Layer" +import * as Option from "effect/Option" +import * as Request from "effect/Request" +import * as RequestResolver from "effect/RequestResolver" +import { FetchHttpClient } from "effect/unstable/http" +import { RpcSerialization } from "effect/unstable/rpc" +import { createServer } from "http" +import { RequestContextMiddleware } from "../src/api/internal/RequestContextMiddleware.js" +import { makeRouter } from "../src/api/routing.js" +import { DefaultGenericMiddlewaresLive } from "../src/api/routing/middleware.js" +import { makeRepo } from "../src/Model/Repository.js" +import { RepositoryRegistryLive } from "../src/Model/Repository/Registry.js" +import { withRequestResolverCache } from "../src/Store/ContextMapContainer.js" +import { MemoryStoreLive } from "../src/Store/Memory.js" +import { + AllowAnonymous, + AllowAnonymousLive, + RequestContextMap, + RequireRoles, + RequireRolesLive, + SomeElseMiddleware, + SomeElseMiddlewareLive, + SomeService, + Test, + TestLive +} from "./fixtures.js" + +// --------------------------------------------------------------------------- +// Middleware — mirrors the wiring used by rpc-context-map-streaming. +// --------------------------------------------------------------------------- + +class AppMiddleware extends MiddlewareMaker + .Tag()("AppMiddleware", RequestContextMap) + .middleware(RequireRoles, Test) + .middleware(AllowAnonymous) + .middleware(SomeElseMiddleware) + .middleware(...DefaultGenericMiddlewares) +{ + static Default = this.layer.pipe( + Layer.provide( + [ + RequireRolesLive.pipe(Layer.provide(SomeService.Default)), + AllowAnonymousLive, + TestLive, + SomeElseMiddlewareLive, + DefaultGenericMiddlewaresLive + ] as const + ) + ) +} + +const { Router, matchAll } = makeRouter(AppMiddleware.Default) + +const { TaggedRequestFor } = makeRpcClient(AppMiddleware) +const Req = TaggedRequestFor("CtxMapLeak") + +// --------------------------------------------------------------------------- +// Schema + resolver — produces a fresh clone per resolve so the only strong +// reference to each instance lives inside the per-request resolver cache. +// --------------------------------------------------------------------------- + +class LeakUser extends S.Class("LeakUser")({ + id: S.String, + name: S.String +}) {} + +// WeakRefs to every LeakUser the resolver hands out this session. The resolver +// returns a fresh clone (not the base from leakUsersById) so the only strong +// references to these instances live inside per-request caches / ContextMap. +// If a request's ContextMap (and the request-scoped resolver cache hanging off +// it) is properly released when the request ends, every clone becomes eligible +// for GC. If anything retains the ContextMap across requests, the clones — and +// their ~100kb name buffers — survive. +const resolvedUserRefs: Array> = [] + +const leakUserResolver = RequestResolver + .make((entries: ReadonlyArray>) => { + return Effect.forEach(entries, (entry) => { + const base = leakUsersById.get(entry.request.userId) + if (base === undefined) { + return Request.complete(Exit.die(new Error(`Missing leak user ${entry.request.userId}`)))(entry) + } + const clone = new LeakUser({ id: base.id, name: base.name }) + resolvedUserRefs.push(new WeakRef(clone)) + return Request.complete(Exit.succeed(clone))(entry) + }, { discard: true }) + }) + .pipe(RequestResolver.batchN(20)) + +const leakUserResolverWithRequestCache = withRequestResolverCache(leakUserResolver, { + capacity: 10_000, + strategy: "fifo" +}) + .pipe(Effect.orDie) + +interface GetLeakUserRequest extends Request.Request { + readonly _tag: "GetLeakUser" + readonly userId: string +} + +const GetLeakUser = Request.tagged("GetLeakUser") + +const UserFromId = S.String.pipe(S.decodeTo( + LeakUser, + { + decode: SchemaGetter.transformOrFail((userId) => + Effect.request(GetLeakUser({ userId }), leakUserResolverWithRequestCache).pipe(Effect.orDie) + ), + encode: SchemaGetter.transformOrFail((user) => Effect.succeed(user.id)) + } +)) + +class LeakLike extends S.Class("LeakLike")({ + likeUserId: UserFromId +}) {} + +class LeakPost extends S.Class("LeakPost")({ + id: S.String, + authorUserId: UserFromId, + publisherUserId: UserFromId, + likes: S.Array(LeakLike) +}) {} + +class LeakProbePosts extends Req.Query()("LeakProbePosts", {}, { + allowAnonymous: true, + success: S.Number +}) {} + +// --------------------------------------------------------------------------- +// Fixture data. +// --------------------------------------------------------------------------- + +const LEAK_USER_COUNT = 10 +const LEAK_POST_COUNT = 50 +const LEAK_REQUEST_COUNT = 100 +const LEAK_LIKES_PER_POST = 8 + +// ~100kb name buffer so each retained User clone visibly blows up RSS. +const HUGE_NAME = "x".repeat(100_000) + +const leakUsers = Array.from({ length: LEAK_USER_COUNT }, (_, i) => + new LeakUser({ + id: `u-${i}`, + name: `User ${i} ${HUGE_NAME}` + })) +// Each post picks distinct users across author / publisher / likes so a single +// request decodes a varied mix rather than the same user repeatedly. With a +// 10-user pool and 8 likes per post the indices below give 10 distinct users +// per post (author + publisher + 8 likes). +const leakPosts = Array.from({ length: LEAK_POST_COUNT }, (_, i) => + LeakPost.make({ + id: `p-${i}`, + authorUserId: leakUsers[i % LEAK_USER_COUNT]!, + publisherUserId: leakUsers[(i * 3 + 1) % LEAK_USER_COUNT]!, + likes: Array.from({ length: LEAK_LIKES_PER_POST }, (_, j) => + LeakLike.make({ + likeUserId: leakUsers[(i + j * 2 + 2) % LEAK_USER_COUNT]! + })) + })) +const leakUsersById = new Map(leakUsers.map((_) => [_.id, _] as const)) + +const Rsc = { LeakProbePosts } + +const router = Router(Rsc)({ + *effect(match) { + return match({ + LeakProbePosts: () => + Effect + .gen(function*() { + const postRepo = yield* makeRepo("LeakProbePost", LeakPost, { + makeInitial: Effect.succeed(leakPosts) + }) + const posts = yield* postRepo.all + // Touch every user reference so `UserFromId` decode (→ resolver + // → cache) actually runs and produces the clones we WeakRef-track. + const refs = posts.flatMap((post) => [ + post.authorUserId, + post.publisherUserId, + ...post.likes.map((like) => like.likeUserId) + ]) + return refs.length + }) + .pipe(Effect.provide(Layer.merge(MemoryStoreLive, RepositoryRegistryLive))) + }) + } +}) + +const RpcRouterLayer = matchAll({ router }) + +// --------------------------------------------------------------------------- +// HTTP wiring. +// --------------------------------------------------------------------------- + +const NodeServerLayer = NodeHttpServer.layer(() => createServer(), { port: 0 }) + +const RequestContextMiddlewareLayer = HttpRouter.middleware(RequestContextMiddleware()).layer + +const ServerLayer = HttpRouter + .serve( + RpcRouterLayer.pipe(Layer.provide(RequestContextMiddlewareLayer)) + ) + .pipe( + Layer.provide(NodeServerLayer), + Layer.provide(RpcSerialization.layerNdjson) + ) + +const ClientLayer = Layer + .unwrap( + Effect.gen(function*() { + const server = yield* HttpServer.HttpServer + const addr = server.address + if (addr._tag !== "TcpAddress") return yield* Effect.die(new Error("expected TcpAddress")) + const host = addr.hostname === "0.0.0.0" ? "127.0.0.1" : addr.hostname + const url = `http://${host}:${addr.port}` + return ApiClientFactory + .layer({ url, headers: Option.none() }) + .pipe(Layer.provide(FetchHttpClient.layer)) + }) + ) + .pipe(Layer.provide(NodeServerLayer)) + +const TestLayer = Layer.mergeAll(ServerLayer, ClientLayer) + +// --------------------------------------------------------------------------- +// Test +// --------------------------------------------------------------------------- + +it.live( + "resolver-produced User clones are GC-eligible after their requests complete", + Effect.fnUntraced(function*() { + if (typeof globalThis.gc !== "function") { + return yield* Effect.die( + new Error("run vitest with --expose-gc (NODE_OPTIONS=--expose-gc) to enable the WeakRef leak probe") + ) + } + resolvedUserRefs.length = 0 + const client = yield* ApiClientFactory.makeFor(Layer.empty)(Rsc) + yield* Effect.forEach( + Array.from({ length: LEAK_REQUEST_COUNT }, () => undefined), + () => client.LeakProbePosts.handler(), + { discard: true } + ) + // Let request finalizers and any pending microtasks drain before forcing GC. + yield* Effect.sleep("200 millis") + globalThis.gc() + yield* Effect.sleep("50 millis") + globalThis.gc() + const totalProduced = resolvedUserRefs.length + const alive = resolvedUserRefs.filter((ref) => ref.deref() !== undefined).length + // Sanity: the resolver actually ran (otherwise the probe proves nothing). + expect(totalProduced).toBeGreaterThan(0) + // If a leaky ContextMap (or anything else) retains the per-request resolver + // cache across requests, the cached User clones — each ~100kb — survive GC + // and `alive` grows with the number of requests. Post-fix every clone must + // be collectable once its request scope closes. + expect(alive).toBe(0) + }, Effect.provide(TestLayer)), + { timeout: 30_000 } +) diff --git a/packages/infra/test/rpc-context-map-streaming.test.ts b/packages/infra/test/rpc-context-map-streaming.test.ts index 6d4a1932e..2a526b2df 100644 --- a/packages/infra/test/rpc-context-map-streaming.test.ts +++ b/packages/infra/test/rpc-context-map-streaming.test.ts @@ -37,18 +37,14 @@ */ import { NodeHttpServer } from "@effect/platform-node" import { expect, it } from "@effect/vitest" -import { SchemaGetter } from "effect" import { ApiClientFactory, makeRpcClient } from "effect-app/client" import { HttpRouter, HttpServer } from "effect-app/http" import { DefaultGenericMiddlewares } from "effect-app/middleware" import { MiddlewareMaker } from "effect-app/rpc" import * as S from "effect-app/Schema" import * as Effect from "effect/Effect" -import * as Exit from "effect/Exit" import * as Layer from "effect/Layer" import * as Option from "effect/Option" -import * as Request from "effect/Request" -import * as RequestResolver from "effect/RequestResolver" import * as Stream from "effect/Stream" import { FetchHttpClient } from "effect/unstable/http" import { RpcSerialization } from "effect/unstable/rpc" @@ -56,10 +52,7 @@ import { createServer } from "http" import { RequestContextMiddleware } from "../src/api/internal/RequestContextMiddleware.js" import { makeRouter } from "../src/api/routing.js" import { DefaultGenericMiddlewaresLive } from "../src/api/routing/middleware.js" -import { makeRepo } from "../src/Model/Repository.js" -import { RepositoryRegistryLive } from "../src/Model/Repository/Registry.js" -import { getContextMap, withRequestResolverCache } from "../src/Store/ContextMapContainer.js" -import { MemoryStoreLive } from "../src/Store/Memory.js" +import { getContextMap } from "../src/Store/ContextMapContainer.js" import { AllowAnonymous, AllowAnonymousLive, RequestContextMap, RequireRoles, RequireRolesLive, SomeElseMiddleware, SomeElseMiddlewareLive, SomeService, Test, TestLive } from "./fixtures.js" // --------------------------------------------------------------------------- @@ -118,103 +111,7 @@ class ReadEtagOnce extends Req.Query()("ReadEtagOnce", {}, { success: S.String }) {} -class LeakUser extends S.Class("LeakUser")({ - id: S.String, - name: S.String -}) {} - -// WeakRefs to every LeakUser the resolver hands out this session. The resolver -// returns a fresh clone (not the base from leakUsersById) so the only strong -// references to these instances live inside per-request caches / ContextMap. -// If a request's ContextMap (and the request-scoped resolver cache hanging off -// it) is properly released when the request ends, every clone becomes eligible -// for GC. If anything retains the ContextMap across requests, the clones — and -// their ~100kb name buffers — survive. -const resolvedUserRefs: Array> = [] - -const leakUserResolver = RequestResolver - .make((entries: ReadonlyArray>) => { - return Effect.forEach(entries, (entry) => { - const base = leakUsersById.get(entry.request.userId) - if (base === undefined) { - return Request.complete(Exit.die(new Error(`Missing leak user ${entry.request.userId}`)))(entry) - } - const clone = new LeakUser({ id: base.id, name: base.name }) - resolvedUserRefs.push(new WeakRef(clone)) - return Request.complete(Exit.succeed(clone))(entry) - }, { discard: true }) - }) - .pipe(RequestResolver.batchN(20)) - -const leakUserResolverWithRequestCache = withRequestResolverCache(leakUserResolver, { - capacity: 10_000, - strategy: "fifo" -}) - .pipe(Effect.orDie) - -interface GetLeakUserRequest extends Request.Request { - readonly _tag: "GetLeakUser" - readonly userId: string -} - -const GetLeakUser = Request.tagged("GetLeakUser") - -const UserFromId = S.String.pipe(S.decodeTo( - LeakUser, - { - decode: SchemaGetter.transformOrFail((userId) => - Effect.request(GetLeakUser({ userId }), leakUserResolverWithRequestCache).pipe(Effect.orDie) - ), - encode: SchemaGetter.transformOrFail((user) => Effect.succeed(user.id)) - } -)) - -class LeakLike extends S.Class("LeakLike")({ - likeUserId: UserFromId -}) {} - -class LeakPost extends S.Class("LeakPost")({ - id: S.String, - authorUserId: UserFromId, - publisherUserId: UserFromId, - likes: S.Array(LeakLike) -}) {} - -class LeakProbePosts extends Req.Query()("LeakProbePosts", {}, { - allowAnonymous: true, - success: S.Number -}) {} - -const LEAK_USER_COUNT = 10 -const LEAK_POST_COUNT = 50 -const LEAK_REQUEST_COUNT = 100 -const LEAK_LIKES_PER_POST = 8 - -// ~100kb name buffer so each retained User clone visibly blows up RSS. -const HUGE_NAME = "x".repeat(100_000) - -const leakUsers = Array.from({ length: LEAK_USER_COUNT }, (_, i) => - new LeakUser({ - id: `u-${i}`, - name: `User ${i} ${HUGE_NAME}` - })) -// Each post picks distinct users across author / publisher / likes so a single -// request decodes a varied mix rather than the same user repeatedly. With a -// 10-user pool and 8 likes per post the indices below give 10 distinct users -// per post (author + publisher + 8 likes). -const leakPosts = Array.from({ length: LEAK_POST_COUNT }, (_, i) => - LeakPost.make({ - id: `p-${i}`, - authorUserId: leakUsers[i % LEAK_USER_COUNT]!, - publisherUserId: leakUsers[(i * 3 + 1) % LEAK_USER_COUNT]!, - likes: Array.from({ length: LEAK_LIKES_PER_POST }, (_, j) => - LeakLike.make({ - likeUserId: leakUsers[(i + j * 2 + 2) % LEAK_USER_COUNT]! - })) - })) -const leakUsersById = new Map(leakUsers.map((_) => [_.id, _] as const)) - -const Rsc = { StreamEtag, StreamWithEtag, ReadEtagOnce, LeakProbePosts } +const Rsc = { StreamEtag, StreamWithEtag, ReadEtagOnce } // Distinct constants so an assertion failure points squarely at "the etag // the handler wrote was no longer there when later chunks ran". @@ -264,24 +161,7 @@ const router = Router(Rsc)({ ) }) .pipe(Stream.unwrap), - ReadEtagOnce: () => getContextMap.pipe(Effect.orDie, Effect.map((m) => m.get(SHARED_KEY) ?? MISSING)), - LeakProbePosts: () => - Effect - .gen(function*() { - const postRepo = yield* makeRepo("LeakProbePost", LeakPost, { - makeInitial: Effect.succeed(leakPosts) - }) - const posts = yield* postRepo.all - // Touch every user reference so `UserFromId` decode (→ resolver - // → cache) actually runs and produces the clones we WeakRef-track. - const refs = posts.flatMap((post) => [ - post.authorUserId, - post.publisherUserId, - ...post.likes.map((like) => like.likeUserId) - ]) - return refs.length - }) - .pipe(Effect.provide(Layer.merge(MemoryStoreLive, RepositoryRegistryLive))) + ReadEtagOnce: () => getContextMap.pipe(Effect.orDie, Effect.map((m) => m.get(SHARED_KEY) ?? MISSING)) }) } }) @@ -307,14 +187,6 @@ const ServerLayer = HttpRouter Layer.provide(NodeServerLayer), Layer.provide(RpcSerialization.layerNdjson) ) -const LeakyServerLayer = HttpRouter - .serve( - RpcRouterLayer.pipe(Layer.provide(RequestContextMiddlewareLayer)) - ) - .pipe( - Layer.provide(NodeServerLayer), - Layer.provide(RpcSerialization.layerNdjson) - ) const ClientLayer = Layer .unwrap( @@ -332,7 +204,6 @@ const ClientLayer = Layer .pipe(Layer.provide(NodeServerLayer)) const TestLayer = Layer.mergeAll(ServerLayer, ClientLayer) -const LeakyTestLayer = Layer.mergeAll(LeakyServerLayer, ClientLayer) // --------------------------------------------------------------------------- // Test @@ -390,35 +261,3 @@ it.live( { timeout: 10_000 } ) -it.live( - "resolver-produced User clones are GC-eligible after their requests complete", - Effect.fnUntraced(function*() { - if (typeof globalThis.gc !== "function") { - return yield* Effect.die( - new Error("run vitest with --expose-gc (NODE_OPTIONS=--expose-gc) to enable the WeakRef leak probe") - ) - } - resolvedUserRefs.length = 0 - const client = yield* ApiClientFactory.makeFor(Layer.empty)(Rsc) - yield* Effect.forEach( - Array.from({ length: LEAK_REQUEST_COUNT }, () => undefined), - () => client.LeakProbePosts.handler(), - { discard: true } - ) - // Let request finalizers and any pending microtasks drain before forcing GC. - yield* Effect.sleep("200 millis") - globalThis.gc() - yield* Effect.sleep("50 millis") - globalThis.gc() - const totalProduced = resolvedUserRefs.length - const alive = resolvedUserRefs.filter((ref) => ref.deref() !== undefined).length - // Sanity: the resolver actually ran (otherwise the probe proves nothing). - expect(totalProduced).toBeGreaterThan(0) - // If a leaky ContextMap (or anything else) retains the per-request resolver - // cache across requests, the cached User clones — each ~100kb — survive GC - // and `alive` grows with the number of requests. Post-fix every clone must - // be collectable once its request scope closes. - expect(alive).toBe(0) - }, Effect.provide(LeakyTestLayer)), - { timeout: 30_000 } -) From d363114b9c2c1c31d3208eeeccace1453190c464 Mon Sep 17 00:00:00 2001 From: Patrick Roza Date: Sun, 24 May 2026 09:58:16 +0200 Subject: [PATCH 7/7] native --- packages/infra/src/api/setupRequest.ts | 2 + .../test/rpc-context-map-leak-native.test.ts | 227 ++++++++++++++++++ 2 files changed, 229 insertions(+) create mode 100644 packages/infra/test/rpc-context-map-leak-native.test.ts diff --git a/packages/infra/src/api/setupRequest.ts b/packages/infra/src/api/setupRequest.ts index 963cbab58..30e15d8ad 100644 --- a/packages/infra/src/api/setupRequest.ts +++ b/packages/infra/src/api/setupRequest.ts @@ -1,3 +1,4 @@ +import { Console } from "effect" import * as Effect from "effect-app/Effect" import * as Layer from "effect-app/Layer" import * as Option from "effect-app/Option" @@ -72,6 +73,7 @@ export const provideOnRequestScope = // value (e.g. ContextMap) across every request handled by that server. const memoMap = yield* Layer.makeMemoMap const ctx = yield* Layer.buildWithMemoMap(layer, memoMap, requestScope) + yield* Effect.addFinalizer(() => Console.log("request scope finalized")) return yield* Effect.provide(self, ctx) }) diff --git a/packages/infra/test/rpc-context-map-leak-native.test.ts b/packages/infra/test/rpc-context-map-leak-native.test.ts new file mode 100644 index 000000000..4c1c12270 --- /dev/null +++ b/packages/infra/test/rpc-context-map-leak-native.test.ts @@ -0,0 +1,227 @@ +/** + * Native effect Rpc Server/Client variant of the ContextMap leak probe. + * + * Mirrors rpc-context-map-leak.test.ts but skips effect-app's MiddlewareMaker + * / makeRouter / makeRpcClient / ApiClientFactory wrappers and wires the + * request through native `RpcServer.layerHttp` + `RpcClient.layerProtocolHttp` + * instead. The only effect-app piece kept is `RequestContextMiddleware()` — + * applied directly as an `HttpRouter.middleware` — because that's the code + * path being probed. + * + * Run with `NODE_OPTIONS=--expose-gc` so `globalThis.gc` is available. + */ +import { NodeHttpServer } from "@effect/platform-node" +import { expect, it } from "@effect/vitest" +import { Schema, SchemaGetter } from "effect" +import * as Effect from "effect/Effect" +import * as Exit from "effect/Exit" +import * as Layer from "effect/Layer" +import * as Request from "effect/Request" +import * as RequestResolver from "effect/RequestResolver" +import * as FetchHttpClient from "effect/unstable/http/FetchHttpClient" +import * as HttpRouter from "effect/unstable/http/HttpRouter" +import * as HttpServer from "effect/unstable/http/HttpServer" +import { Rpc, RpcClient, RpcGroup, RpcSerialization, RpcServer } from "effect/unstable/rpc" +import { createServer } from "http" +import { RequestContextMiddleware } from "../src/api/internal/RequestContextMiddleware.js" +import { makeRepo } from "../src/Model/Repository.js" +import { RepositoryRegistryLive } from "../src/Model/Repository/Registry.js" +import { withRequestResolverCache } from "../src/Store/ContextMapContainer.js" +import { MemoryStoreLive } from "../src/Store/Memory.js" + +// --------------------------------------------------------------------------- +// Schema + resolver — identical shape to rpc-context-map-leak.test.ts. The +// resolver returns a FRESH `LeakUser` clone per resolve so the only strong +// reference to each instance lives inside the per-request resolver cache. +// --------------------------------------------------------------------------- + +class LeakUser extends Schema.Class("LeakUser")({ + id: Schema.String, + name: Schema.String +}) {} + +const resolvedUserRefs: Array> = [] + +interface GetLeakUserRequest extends Request.Request { + readonly _tag: "GetLeakUser" + readonly userId: string +} + +const GetLeakUser = Request.tagged("GetLeakUser") + +const leakUserResolver = RequestResolver + .make((entries: ReadonlyArray>) => { + return Effect.forEach(entries, (entry) => { + const base = leakUsersById.get(entry.request.userId) + if (base === undefined) { + return Request.complete(Exit.die(new Error(`Missing leak user ${entry.request.userId}`)))(entry) + } + const clone = new LeakUser({ id: base.id, name: base.name }) + resolvedUserRefs.push(new WeakRef(clone)) + return Request.complete(Exit.succeed(clone))(entry) + }, { discard: true }) + }) + .pipe(RequestResolver.batchN(20)) + +const leakUserResolverWithRequestCache = withRequestResolverCache(leakUserResolver, { + capacity: 10_000, + strategy: "fifo" +}) + .pipe(Effect.orDie) + +const UserFromId = Schema.String.pipe(Schema.decodeTo( + LeakUser, + { + decode: SchemaGetter.transformOrFail((userId) => + Effect.request(GetLeakUser({ userId }), leakUserResolverWithRequestCache).pipe(Effect.orDie) + ), + encode: SchemaGetter.transformOrFail((user) => Effect.succeed(user.id)) + } +)) + +class LeakLike extends Schema.Class("LeakLike")({ + likeUserId: UserFromId +}) {} + +class LeakPost extends Schema.Class("LeakPost")({ + id: Schema.String, + authorUserId: UserFromId, + publisherUserId: UserFromId, + likes: Schema.Array(LeakLike) +}) {} + +// --------------------------------------------------------------------------- +// Fixture data. +// --------------------------------------------------------------------------- + +const LEAK_USER_COUNT = 10 +const LEAK_POST_COUNT = 50 +const LEAK_REQUEST_COUNT = 100 +const LEAK_LIKES_PER_POST = 8 + +const HUGE_NAME = "x".repeat(100_000) + +const leakUsers = Array.from({ length: LEAK_USER_COUNT }, (_, i) => + new LeakUser({ + id: `u-${i}`, + name: `User ${i} ${HUGE_NAME}` + })) +const leakPosts = Array.from({ length: LEAK_POST_COUNT }, (_, i) => + LeakPost.make({ + id: `p-${i}`, + authorUserId: leakUsers[i % LEAK_USER_COUNT]!, + publisherUserId: leakUsers[(i * 3 + 1) % LEAK_USER_COUNT]!, + likes: Array.from({ length: LEAK_LIKES_PER_POST }, (_, j) => + LeakLike.make({ + likeUserId: leakUsers[(i + j * 2 + 2) % LEAK_USER_COUNT]! + })) + })) +const leakUsersById = new Map(leakUsers.map((_) => [_.id, _] as const)) + +// --------------------------------------------------------------------------- +// Native Rpc group + handler. +// --------------------------------------------------------------------------- + +const LeakProbePosts = Rpc.make("LeakProbePosts", { success: Schema.Number }) + +const LeakGroup = RpcGroup.make(LeakProbePosts) + +const HandlersLayer = LeakGroup.toLayer({ + LeakProbePosts: () => + Effect + .gen(function*() { + const postRepo = yield* makeRepo("LeakProbePost", LeakPost, { + makeInitial: Effect.succeed(leakPosts) + }) + const posts = yield* postRepo.all + // Touch every user reference so `UserFromId` decode (→ resolver → + // cache) runs and produces the clones tracked by resolvedUserRefs. + const refs = posts.flatMap((post) => [ + post.authorUserId, + post.publisherUserId, + ...post.likes.map((like) => like.likeUserId) + ]) + return refs.length + }) + .pipe(Effect.provide(Layer.merge(MemoryStoreLive, RepositoryRegistryLive))) +}) + +// --------------------------------------------------------------------------- +// HTTP wiring — NodeHttpServer + RpcServer.layerHttp + RequestContextMiddleware. +// --------------------------------------------------------------------------- + +const NodeServerLayer = NodeHttpServer.layer(() => createServer(), { port: 0 }) + +const RpcServerLayer = RpcServer + .layerHttp({ + group: LeakGroup, + path: "/rpc", + protocol: "http" + }) + .pipe(Layer.provide(HandlersLayer)) + +const RequestContextMiddlewareLayer = HttpRouter.middleware(RequestContextMiddleware()).layer + +const ServerLayer = HttpRouter + .serve( + RpcServerLayer.pipe(Layer.provide(RequestContextMiddlewareLayer)) + ) + .pipe( + Layer.provide(NodeServerLayer), + Layer.provide(RpcSerialization.layerNdjson) + ) + +const ClientLayer = Layer + .unwrap( + Effect.gen(function*() { + const server = yield* HttpServer.HttpServer + const addr = server.address + if (addr._tag !== "TcpAddress") return yield* Effect.die(new Error("expected TcpAddress")) + const host = addr.hostname === "0.0.0.0" ? "127.0.0.1" : addr.hostname + const url = `http://${host}:${addr.port}/rpc` + return RpcClient.layerProtocolHttp({ url }).pipe( + Layer.provideMerge(FetchHttpClient.layer), + Layer.provideMerge(RpcSerialization.layerNdjson) + ) + }) + ) + .pipe(Layer.provide(NodeServerLayer)) + +const TestLayer = Layer.mergeAll(ServerLayer, ClientLayer) + +// --------------------------------------------------------------------------- +// Test +// --------------------------------------------------------------------------- + +it.live( + "native Rpc: resolver-produced User clones are GC-eligible after their requests complete", + Effect.fnUntraced(function*() { + if (typeof globalThis.gc !== "function") { + return yield* Effect.die( + new Error("run vitest with --expose-gc (NODE_OPTIONS=--expose-gc) to enable the WeakRef leak probe") + ) + } + resolvedUserRefs.length = 0 + const client = yield* RpcClient.make(LeakGroup) + yield* Effect.forEach( + Array.from({ length: LEAK_REQUEST_COUNT }, () => undefined), + () => client.LeakProbePosts(), + { discard: true } + ) + // Let request finalizers and any pending microtasks drain before forcing GC. + yield* Effect.sleep("200 millis") + globalThis.gc() + yield* Effect.sleep("50 millis") + globalThis.gc() + const totalProduced = resolvedUserRefs.length + const alive = resolvedUserRefs.filter((ref) => ref.deref() !== undefined).length + // Sanity: the resolver actually ran (otherwise the probe proves nothing). + expect(totalProduced).toBeGreaterThan(0) + // If a leaky ContextMap (or anything else) retains the per-request resolver + // cache across requests, the cached User clones — each ~100kb — survive GC + // and `alive` grows with the number of requests. Post-fix every clone must + // be collectable once its request scope closes. + expect(alive).toBe(0) + }, Effect.provide(TestLayer)), + { timeout: 30_000 } +)