From b4557417717e585731cba339cd42e9b4dfce5453 Mon Sep 17 00:00:00 2001 From: Dawei Date: Sun, 8 Jun 2025 21:50:20 -0700 Subject: [PATCH 01/12] Refactor ServiceContext as generic --- __tests__/cancellation.test.ts | 35 +++---- __tests__/cleanup.test.ts | 2 +- __tests__/context.test.ts | 15 ++- __tests__/e2e.test.ts | 2 +- __tests__/invalid-request.test.ts | 26 ++--- __tests__/middleware.test.ts | 2 +- __tests__/typescript-stress.test.ts | 7 +- router/client.ts | 18 ++-- router/context.ts | 26 +---- router/index.ts | 6 +- router/procedures.ts | 108 +++++++++++++++++---- router/result.ts | 2 +- router/server.ts | 46 +++++---- router/services.ts | 141 +++++++++++++++++++++------- testUtil/fixtures/cleanup.ts | 6 +- testUtil/fixtures/services.ts | 33 +++++-- 16 files changed, 325 insertions(+), 150 deletions(-) diff --git a/__tests__/cancellation.test.ts b/__tests__/cancellation.test.ts index 789b08dd..4ee18beb 100644 --- a/__tests__/cancellation.test.ts +++ b/__tests__/cancellation.test.ts @@ -29,7 +29,8 @@ function makeMockHandler( ) { return vi.fn< Procedure< - Record, + object, + object, T, TObject, TObject | null, @@ -66,7 +67,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -110,7 +111,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -164,7 +165,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -214,7 +215,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -279,7 +280,7 @@ describe.each(testMatrix())( const serverTransport = getServerTransport(); const handler = makeMockHandler('rpc'); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -337,7 +338,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('stream'); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -411,7 +412,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('upload'); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -482,7 +483,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('subscription'); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -569,7 +570,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('rpc'); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -623,7 +624,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('stream'); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -683,7 +684,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('upload'); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -742,7 +743,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('subscription'); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -826,7 +827,7 @@ describe.each(testMatrix())( const rejectable = createRejectable(); const handler = makeMockHandler('rpc', () => rejectable.promise); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -879,7 +880,7 @@ describe.each(testMatrix())( const rejectable = createRejectable(); const handler = makeMockHandler('stream', () => rejectable.promise); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -941,7 +942,7 @@ describe.each(testMatrix())( const rejectable = createRejectable(); const handler = makeMockHandler('upload', () => rejectable.promise); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -1006,7 +1007,7 @@ describe.each(testMatrix())( () => rejectable.promise, ); const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), diff --git a/__tests__/cleanup.test.ts b/__tests__/cleanup.test.ts index ef9b60bc..8366ef61 100644 --- a/__tests__/cleanup.test.ts +++ b/__tests__/cleanup.test.ts @@ -509,7 +509,7 @@ describe('request finishing triggers signal onabort', async () => { const procedureName = procedureType; const services = { - [serviceName]: ServiceSchema.define({ + [serviceName]: ServiceSchema.defineWithContext()({ // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any [procedureType]: (Procedure[procedureType] as any)({ requestInit: Type.Object({}), diff --git a/__tests__/context.test.ts b/__tests__/context.test.ts index c33e69ad..66cc4e65 100644 --- a/__tests__/context.test.ts +++ b/__tests__/context.test.ts @@ -40,23 +40,30 @@ describe('should handle incompatabilities', async () => { // setup const clientTransport = getClientTransport('client'); const serverTransport = getServerTransport(); + + interface ExtendedContext { + testctx: string; + } + const extendedContext: ExtendedContext = { + testctx: Math.random().toString(), + }; + const services = { - testservice: ServiceSchema.define({ + testservice: ServiceSchema.defineWithContext()({ testrpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.String(), handler: async ({ ctx }) => { - return Ok((ctx as unknown as typeof extendedContext).testctx); + return Ok(ctx.testctx); }, }), }), }; - const extendedContext = { testctx: Math.random().toString() }; createServer(serverTransport, services, { extendedContext, }); - const client = createClient( + const client = createClient( clientTransport, serverTransport.clientId, ); diff --git a/__tests__/e2e.test.ts b/__tests__/e2e.test.ts index 33e875a6..20b3c8a0 100644 --- a/__tests__/e2e.test.ts +++ b/__tests__/e2e.test.ts @@ -949,7 +949,7 @@ describe.each(testMatrix())( }); const services = { - test: ServiceSchema.define({ + test: ServiceSchema.defineWithContext()({ getData: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({ diff --git a/__tests__/invalid-request.test.ts b/__tests__/invalid-request.test.ts index fd324e16..30815a44 100644 --- a/__tests__/invalid-request.test.ts +++ b/__tests__/invalid-request.test.ts @@ -49,7 +49,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -99,7 +99,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -148,7 +148,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -197,7 +197,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -247,7 +247,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -299,7 +299,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({ mustSendThings: Type.String() }), requestData: Type.Object({}), @@ -349,7 +349,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({ mustSendThings: Type.String() }), @@ -417,7 +417,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -484,7 +484,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -548,7 +548,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -619,7 +619,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -706,7 +706,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -804,7 +804,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.define({ + service: ServiceSchema.defineWithContext()({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), diff --git a/__tests__/middleware.test.ts b/__tests__/middleware.test.ts index e0ede7f5..ebdb7518 100644 --- a/__tests__/middleware.test.ts +++ b/__tests__/middleware.test.ts @@ -244,7 +244,7 @@ describe('middleware test', () => { readByMiddlewareSignal: boolean; }>(); - const AsyncStorageSchemas = ServiceSchema.define({ + const AsyncStorageSchemas = ServiceSchema.defineWithContext()({ gimmeStore: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), diff --git a/__tests__/typescript-stress.test.ts b/__tests__/typescript-stress.test.ts index 204bcb21..f96a294f 100644 --- a/__tests__/typescript-stress.test.ts +++ b/__tests__/typescript-stress.test.ts @@ -42,6 +42,9 @@ const responseError = Type.Union([ const fnBody = Procedure.rpc< Record, + { + db: string; + }, typeof requestData, typeof responseData, typeof responseError @@ -61,7 +64,7 @@ const fnBody = Procedure.rpc< // typescript is limited to max 50 constraints // see: https://github.com/microsoft/TypeScript/issues/33541 // we should be able to support more than that due to how we make services -const StupidlyLargeServiceSchema = ServiceSchema.define({ +const StupidlyLargeServiceSchema = ServiceSchema.defineWithContext()({ f1: fnBody, f2: fnBody, f3: fnBody, @@ -207,7 +210,7 @@ describe("ensure typescript doesn't give up trying to infer the types for large }); const services = { - test: ServiceSchema.define({ + test: ServiceSchema.defineWithContext()({ rpc: Procedure.rpc({ requestInit: Type.Object({ n: Type.Number() }), responseData: Type.Object({ n: Type.Number() }), diff --git a/router/client.ts b/router/client.ts index 6cded350..5bd2aad3 100644 --- a/router/client.ts +++ b/router/client.ts @@ -140,9 +140,12 @@ type ServiceClient = { * @template Srv - The type of the server. */ export type Client< - Services extends AnyServiceSchemaMap, - IS extends - InstantiatedServiceSchemaMap = InstantiatedServiceSchemaMap, + ServiceContext extends object, + Services extends AnyServiceSchemaMap, + IS extends InstantiatedServiceSchemaMap< + ServiceContext, + Services + > = InstantiatedServiceSchemaMap, > = { [SvcName in keyof IS]: ServiceClient; }; @@ -204,7 +207,10 @@ const defaultClientOptions: ClientOptions = { * @param {Partial} providedClientOptions - The options for the client. * @returns The client for the server. */ -export function createClient( +export function createClient< + ServiceSchemaMap extends AnyServiceSchemaMap, + ServiceContext extends object = object, +>( transport: ClientTransport, serverId: TransportClientId, providedClientOptions: Partial< @@ -212,7 +218,7 @@ export function createClient( handshakeOptions: ClientHandshakeOptions; } > = {}, -): Client { +): Client { if (providedClientOptions.handshakeOptions) { transport.extendHandshake(providedClientOptions.handshakeOptions); } @@ -256,7 +262,7 @@ export function createClient( procName, callOptions ? (callOptions as CallOptions).signal : undefined, ); - }, []) as Client; + }, []) as Client; } type AnyProcReturn = diff --git a/router/context.ts b/router/context.ts index b91552e0..5007a9c6 100644 --- a/router/context.ts +++ b/router/context.ts @@ -5,27 +5,6 @@ import { ErrResult } from './result'; import { CancelErrorSchema } from './errors'; import { Static } from '@sinclair/typebox'; -/** - * ServiceContext exist for the purpose of declaration merging - * to extend the context with additional properties. - * - * For example: - * - * ```ts - * declare module '@replit/river' { - * interface ServiceContext { - * db: Database; - * } - * } - * - * createServer(someTransport, myServices, { extendedContext: { db: myDb } }); - * ``` - * - * Once you do this, your {@link ProcedureHandlerContext} will have `db` property on it. - */ -/* eslint-disable-next-line @typescript-eslint/no-empty-interface */ -export interface ServiceContext {} - /** * The parsed metadata schema for a service. This is the * return value of the {@link ServerHandshakeOptions.validate} @@ -49,7 +28,10 @@ export interface ParsedMetadata extends Record {} * This is passed to every procedure handler and contains various context-level * information and utilities. This may be extended, see {@link ServiceContext} */ -export type ProcedureHandlerContext = ServiceContext & { +export type ProcedureHandlerContext< + State, + ServiceContext = object, +> = ServiceContext & { /** * State for this service as defined by the service definition. */ diff --git a/router/index.ts b/router/index.ts index f7661bc3..ee5dc119 100644 --- a/router/index.ts +++ b/router/index.ts @@ -49,11 +49,7 @@ export type { MiddlewareParam, MiddlewareContext, } from './server'; -export type { - ParsedMetadata, - ServiceContext, - ProcedureHandlerContext, -} from './context'; +export type { ParsedMetadata, ProcedureHandlerContext } from './context'; export { Ok, Err } from './result'; export type { Result, diff --git a/router/procedures.ts b/router/procedures.ts index b4656dd1..2cc43bdd 100644 --- a/router/procedures.ts +++ b/router/procedures.ts @@ -50,6 +50,7 @@ export type Cancellable = T | Static; * @template ResponseErr - The TypeBox schema of the error object. */ export interface RpcProcedure< + Context, State, RequestInit extends PayloadType, ResponseData extends PayloadType, @@ -61,7 +62,7 @@ export interface RpcProcedure< responseError: ResponseErr; description?: string; handler(param: { - ctx: ProcedureHandlerContext; + ctx: ProcedureHandlerContext; reqInit: Static; }): Promise, Cancellable>>>; } @@ -77,6 +78,7 @@ export interface RpcProcedure< * @template ResponseErr - The TypeBox schema of the error object. */ export interface UploadProcedure< + Context, State, RequestInit extends PayloadType, RequestData extends PayloadType, @@ -90,7 +92,7 @@ export interface UploadProcedure< responseError: ResponseErr; description?: string; handler(param: { - ctx: ProcedureHandlerContext; + ctx: ProcedureHandlerContext; reqInit: Static; reqReadable: Readable< Static, @@ -108,6 +110,7 @@ export interface UploadProcedure< * @template ResponseErr - The TypeBox schema of the error object. */ export interface SubscriptionProcedure< + Context, State, RequestInit extends PayloadType, ResponseData extends PayloadType, @@ -119,7 +122,7 @@ export interface SubscriptionProcedure< responseError: ResponseErr; description?: string; handler(param: { - ctx: ProcedureHandlerContext; + ctx: ProcedureHandlerContext; reqInit: Static; resWritable: Writable< Result, Cancellable>> @@ -138,6 +141,7 @@ export interface SubscriptionProcedure< * @template ResponseErr - The TypeBox schema of the error object. */ export interface StreamProcedure< + Context, State, RequestInit extends PayloadType, RequestData extends PayloadType, @@ -151,7 +155,7 @@ export interface StreamProcedure< responseError: ResponseErr; description?: string; handler(param: { - ctx: ProcedureHandlerContext; + ctx: ProcedureHandlerContext; reqInit: Static; reqReadable: Readable< Static, @@ -179,6 +183,7 @@ export interface StreamProcedure< * @template ResponseData - The TypeBox schema of the response object. */ export type Procedure< + Context, State, Ty extends ValidProcType, RequestInit extends PayloadType, @@ -188,6 +193,7 @@ export type Procedure< > = { type: Ty } & (RequestData extends PayloadType ? Ty extends 'upload' ? UploadProcedure< + Context, State, RequestInit, RequestData, @@ -196,6 +202,7 @@ export type Procedure< > : Ty extends 'stream' ? StreamProcedure< + Context, State, RequestInit, RequestData, @@ -204,9 +211,15 @@ export type Procedure< > : never : Ty extends 'rpc' - ? RpcProcedure + ? RpcProcedure : Ty extends 'subscription' - ? SubscriptionProcedure + ? SubscriptionProcedure< + Context, + State, + RequestInit, + ResponseData, + ResponseErr + > : never); /** @@ -215,7 +228,8 @@ export type Procedure< * @template State - The context state object. You can provide this to constrain * the type of procedures. */ -export type AnyProcedure = Procedure< +export type AnyProcedure = Procedure< + Context, State, ValidProcType, PayloadType, @@ -230,7 +244,10 @@ export type AnyProcedure = Procedure< * @template State - The context state object. You can provide this to constrain * the type of procedures. */ -export type ProcedureMap = Record>; +export type ProcedureMap = Record< + string, + AnyProcedure +>; // typescript is funky so with these upcoming procedure constructors, the overloads // which handle the `init` case _must_ come first, otherwise the `init` property @@ -241,6 +258,7 @@ export type ProcedureMap = Record>; */ // signature: default errors function rpc< + Context, State, RequestInit extends PayloadType, ResponseData extends PayloadType, @@ -249,11 +267,18 @@ function rpc< responseData: ResponseData; responseError?: never; description?: string; - handler: RpcProcedure['handler']; -}): Branded>; + handler: RpcProcedure< + Context, + State, + RequestInit, + ResponseData, + TNever + >['handler']; +}): Branded>; // signature: explicit errors function rpc< + Context, State, RequestInit extends PayloadType, ResponseData extends PayloadType, @@ -264,12 +289,15 @@ function rpc< responseError: ResponseErr; description?: string; handler: RpcProcedure< + Context, State, RequestInit, ResponseData, ResponseErr >['handler']; -}): Branded>; +}): Branded< + RpcProcedure +>; // implementation function rpc({ @@ -284,6 +312,7 @@ function rpc({ responseError?: ProcedureErrorSchemaType; description?: string; handler: RpcProcedure< + object, object, PayloadType, PayloadType, @@ -305,6 +334,7 @@ function rpc({ */ // signature: init with default errors function upload< + Context, State, RequestInit extends PayloadType, RequestData extends PayloadType, @@ -316,6 +346,7 @@ function upload< responseError?: never; description?: string; handler: UploadProcedure< + Context, State, RequestInit, RequestData, @@ -323,11 +354,19 @@ function upload< TNever >['handler']; }): Branded< - UploadProcedure + UploadProcedure< + Context, + State, + RequestInit, + RequestData, + ResponseData, + TNever + > >; // signature: init with explicit errors function upload< + Context, State, RequestInit extends PayloadType, RequestData extends PayloadType, @@ -340,6 +379,7 @@ function upload< responseError: ResponseErr; description?: string; handler: UploadProcedure< + Context, State, RequestInit, RequestData, @@ -347,7 +387,14 @@ function upload< ResponseErr >['handler']; }): Branded< - UploadProcedure + UploadProcedure< + Context, + State, + RequestInit, + RequestData, + ResponseData, + ResponseErr + > >; // implementation @@ -365,6 +412,7 @@ function upload({ responseError?: ProcedureErrorSchemaType; description?: string; handler: UploadProcedure< + object, object, PayloadType, PayloadType, @@ -388,6 +436,7 @@ function upload({ */ // signature: default errors function subscription< + Context, State, RequestInit extends PayloadType, ResponseData extends PayloadType, @@ -397,15 +446,19 @@ function subscription< responseError?: never; description?: string; handler: SubscriptionProcedure< + Context, State, RequestInit, ResponseData, TNever >['handler']; -}): Branded>; +}): Branded< + SubscriptionProcedure +>; // signature: explicit errors function subscription< + Context, State, RequestInit extends PayloadType, ResponseData extends PayloadType, @@ -416,13 +469,14 @@ function subscription< responseError: ResponseErr; description?: string; handler: SubscriptionProcedure< + Context, State, RequestInit, ResponseData, ResponseErr >['handler']; }): Branded< - SubscriptionProcedure + SubscriptionProcedure >; // implementation @@ -438,6 +492,7 @@ function subscription({ responseError?: ProcedureErrorSchemaType; description?: string; handler: SubscriptionProcedure< + object, object, PayloadType, PayloadType, @@ -459,6 +514,7 @@ function subscription({ */ // signature: with default errors function stream< + Context, State, RequestInit extends PayloadType, RequestData extends PayloadType, @@ -470,6 +526,7 @@ function stream< responseError?: never; description?: string; handler: StreamProcedure< + Context, State, RequestInit, RequestData, @@ -477,11 +534,19 @@ function stream< TNever >['handler']; }): Branded< - StreamProcedure + StreamProcedure< + Context, + State, + RequestInit, + RequestData, + ResponseData, + TNever + > >; // signature: explicit errors function stream< + Context, State, RequestInit extends PayloadType, RequestData extends PayloadType, @@ -494,6 +559,7 @@ function stream< responseError: ResponseErr; description?: string; handler: StreamProcedure< + Context, State, RequestInit, RequestData, @@ -501,7 +567,14 @@ function stream< ResponseErr >['handler']; }): Branded< - StreamProcedure + StreamProcedure< + Context, + State, + RequestInit, + RequestData, + ResponseData, + ResponseErr + > >; // implementation @@ -519,6 +592,7 @@ function stream({ responseError?: ProcedureErrorSchemaType; description?: string; handler: StreamProcedure< + object, object, PayloadType, PayloadType, diff --git a/router/result.ts b/router/result.ts index 11abd837..3399a579 100644 --- a/router/result.ts +++ b/router/result.ts @@ -99,7 +99,7 @@ export type ResponseData< ProcedureName extends keyof RiverClient[ServiceName], Procedure = RiverClient[ServiceName][ProcedureName], Fn extends (...args: never) => unknown = (...args: never) => unknown, -> = RiverClient extends Client +> = RiverClient extends Client ? Procedure extends object ? Procedure extends object & { rpc: infer RpcFn extends Fn } ? Awaited> diff --git a/router/server.ts b/router/server.ts index b579ed02..c241a68e 100644 --- a/router/server.ts +++ b/router/server.ts @@ -28,11 +28,7 @@ import { ProtocolVersion, TransportClientId, } from '../transport/message'; -import { - ServiceContext, - ProcedureHandlerContext, - ParsedMetadata, -} from './context'; +import { ProcedureHandlerContext, ParsedMetadata } from './context'; import { Logger } from '../logging/log'; import { Value } from '@sinclair/typebox/value'; import { Err, Result, Ok, ErrResult } from './result'; @@ -57,11 +53,14 @@ type StreamId = string; * Represents a server with a set of services. Use {@link createServer} to create it. * @template Services - The type of services provided by the server. */ -export interface Server { +export interface Server< + ServiceContext extends object, + Services extends AnyServiceSchemaMap, +> { /** * Services defined for this server. */ - services: InstantiatedServiceSchemaMap; + services: InstantiatedServiceSchemaMap; /** * A set of stream ids that are currently open. */ @@ -70,7 +69,7 @@ export interface Server { close: () => Promise; } -interface StreamInitProps { +interface StreamInitProps { // msg derived streamId: StreamId; procedureName: string; @@ -104,8 +103,10 @@ interface ProcStream { handleSessionDisconnect: () => void; } -class RiverServer - implements Server +class RiverServer< + ServiceContext extends object, + Services extends AnyServiceSchemaMap, +> implements Server { private transport: ServerTransport; private contextMap: Map; @@ -123,7 +124,7 @@ class RiverServer private maxCancelledStreamTombstonesPerSession: number; public streams: Map; - public services: InstantiatedServiceSchemaMap; + public services: InstantiatedServiceSchemaMap; private unregisterTransportListeners: () => void; @@ -138,11 +139,16 @@ class RiverServer const instances: Record = {}; this.middlewares = middlewares; - this.services = instances as InstantiatedServiceSchemaMap; + this.services = instances as InstantiatedServiceSchemaMap< + ServiceContext, + Services + >; this.contextMap = new Map(); + extendedContext = extendedContext ?? ({} as ServiceContext); + for (const [name, service] of Object.entries(services)) { - const instance = service.instantiate(extendedContext ?? {}); + const instance = service.instantiate(extendedContext); instances[name] = instance; this.contextMap.set(instance, { @@ -246,7 +252,10 @@ class RiverServer this.transport.addEventListener('transportStatus', handleTransportStatus); } - private createNewProcStream(span: Span, props: StreamInitProps) { + private createNewProcStream( + span: Span, + props: StreamInitProps, + ) { const { streamId, initialSession, @@ -700,7 +709,7 @@ class RiverServer private validateNewProcStream( initMessage: OpaqueTransportMessage, - ): StreamInitProps | null { + ): StreamInitProps | null { // lifetime safety: this is a sync function so this session cant transition // to another state before we finish const session = this.transport.sessions.get(initMessage.from); @@ -1039,7 +1048,10 @@ export type Middleware = (param: MiddlewareParam) => void; * @param extendedContext - An optional object containing additional context to be passed to all services. * @returns A promise that resolves to a server instance with the registered services. */ -export function createServer( +export function createServer< + ServiceContext extends object, + Services extends AnyServiceSchemaMap, +>( transport: ServerTransport, services: Services, providedServerOptions?: Partial<{ @@ -1055,7 +1067,7 @@ export function createServer( */ middlewares?: Array; }>, -): Server { +): Server { return new RiverServer( transport, services, diff --git a/router/services.ts b/router/services.ts index e575ef0b..a5663f17 100644 --- a/router/services.ts +++ b/router/services.ts @@ -6,7 +6,6 @@ import { AnyProcedure, PayloadType, } from './procedures'; -import { ServiceContext } from './context'; import { flattenErrorType, ProcedureErrorSchemaType, @@ -19,8 +18,9 @@ import { * You shouldn't construct these directly, use {@link ServiceSchema} instead. */ export interface Service< + Context, State extends object, - Procs extends ProcedureMap, + Procs extends ProcedureMap, > { readonly state: State; readonly procedures: Procs; @@ -30,17 +30,19 @@ export interface Service< /** * Represents any {@link Service} object. */ -export type AnyService = Service; +export type AnyService = Service; /** * Represents any {@link ServiceSchema} object. */ -export type AnyServiceSchema = ServiceSchema; +export type AnyServiceSchema = + ServiceSchema; /** * A dictionary of {@link ServiceSchema}s, where the key is the service name. */ -export type AnyServiceSchemaMap = Record; +export type AnyServiceSchemaMap = + Record>; // This has the secret sauce to keep go to definition working, the structure is // somewhat delicate, so be careful when modifying it. Would be nice to add a @@ -49,9 +51,12 @@ export type AnyServiceSchemaMap = Record; * Takes a {@link AnyServiceSchemaMap} and returns a dictionary of instantiated * services. */ -export type InstantiatedServiceSchemaMap = { - [K in keyof T]: T[K] extends ServiceSchema - ? Service +export type InstantiatedServiceSchemaMap< + ServiceContext extends object, + T extends AnyServiceSchemaMap, +> = { + [K in keyof T]: T[K] extends ServiceSchema + ? Service : never; }; @@ -123,7 +128,10 @@ export type ProcType< * A list of procedures where every procedure is "branded", as-in the procedure * was created via the {@link Procedure} constructors. */ -type BrandedProcedureMap = Record>>; +type BrandedProcedureMap = Record< + string, + Branded> +>; type MaybeDisposable = State & { [Symbol.asyncDispose]?: () => Promise; @@ -133,11 +141,14 @@ type MaybeDisposable = State & { /** * The configuration for a service. */ -export interface ServiceConfiguration { +export interface ServiceConfiguration< + Context extends object, + State extends object, +> { /** * A factory function for creating a fresh state. */ - initializeState: (extendedContext: ServiceContext) => MaybeDisposable; + initializeState: (extendedContext: Context) => MaybeDisposable; } // TODO remove once clients migrate to v2 @@ -169,6 +180,27 @@ export function Strict(schema: T): T { return JSON.parse(JSON.stringify(schema)) as T; } +interface DefineWithContext { + < + State extends object, + Procedures extends BrandedProcedureMap, + >( + config: ServiceConfiguration, + procedures: Procedures, + ): ServiceSchema< + Context, + State, + { [K in keyof Procedures]: Unbranded } + >; + >( + procedures: Procedures, + ): ServiceSchema< + Context, + object, + { [K in keyof Procedures]: Unbranded } + >; +} + // TODO remove once clients migrate to v2 /** * Same as {@link serializeSchema} but with a format that is compatible with @@ -260,14 +292,15 @@ export function serializeSchema( * When defining procedures, use the {@link Procedure} constructors to create them. */ export class ServiceSchema< + Context extends object, State extends object, - Procedures extends ProcedureMap, + Procedures extends ProcedureMap, > { /** * Factory function for creating a fresh state. */ protected readonly initializeState: ( - extendedContext: ServiceContext, + extendedContext: Context, ) => MaybeDisposable; /** @@ -280,7 +313,7 @@ export class ServiceSchema< * @param procedures - The procedures for this service. */ protected constructor( - config: ServiceConfiguration, + config: ServiceConfiguration, procedures: Procedures, ) { this.initializeState = config.initializeState; @@ -341,7 +374,9 @@ export class ServiceSchema< * Depending on your preferences, this may be a more appealing way to define * a schema versus using the {@link ServiceSchema.define} method. */ - static scaffold(config: ServiceConfiguration) { + static scaffold( + config: ServiceConfiguration, + ) { return new ServiceScaffold(config); } @@ -374,12 +409,14 @@ export class ServiceSchema< * ``` */ static define< + Context extends object, State extends object, - Procedures extends BrandedProcedureMap, + Procedures extends BrandedProcedureMap, >( - config: ServiceConfiguration, + config: ServiceConfiguration, procedures: Procedures, ): ServiceSchema< + Context, State, { [K in keyof Procedures]: Unbranded } >; @@ -405,21 +442,26 @@ export class ServiceSchema< * }), * }); */ - static define>>( + + static define< + Context extends object, + Procedures extends BrandedProcedureMap, + >( procedures: Procedures, ): ServiceSchema< - Record, + Context, + object, { [K in keyof Procedures]: Unbranded } >; // actual implementation static define( configOrProcedures: - | ServiceConfiguration - | BrandedProcedureMap, - maybeProcedures?: BrandedProcedureMap, - ): ServiceSchema { - let config: ServiceConfiguration; - let procedures: BrandedProcedureMap; + | ServiceConfiguration + | BrandedProcedureMap, + maybeProcedures?: BrandedProcedureMap, + ): ServiceSchema { + let config: ServiceConfiguration; + let procedures: BrandedProcedureMap; if ( 'initializeState' in configOrProcedures && @@ -429,16 +471,47 @@ export class ServiceSchema< throw new Error('Expected procedures to be defined'); } - config = configOrProcedures as ServiceConfiguration; + config = configOrProcedures as ServiceConfiguration; procedures = maybeProcedures; } else { config = { initializeState: () => ({}) }; - procedures = configOrProcedures as BrandedProcedureMap; + procedures = configOrProcedures as BrandedProcedureMap; } return new ServiceSchema(config, procedures); } + static defineWithContext< + Context extends object = object, + >(): DefineWithContext { + return function ( + configOrProcedures: + | ServiceConfiguration + | BrandedProcedureMap, + maybeProcedures?: BrandedProcedureMap, + ): ServiceSchema> { + let config: ServiceConfiguration; + let procedures: BrandedProcedureMap; + + if ( + 'initializeState' in configOrProcedures && + typeof configOrProcedures.initializeState === 'function' + ) { + if (!maybeProcedures) { + throw new Error('Expected procedures to be defined'); + } + + config = configOrProcedures as ServiceConfiguration; + procedures = maybeProcedures; + } else { + config = { initializeState: () => ({}) }; + procedures = configOrProcedures as BrandedProcedureMap; + } + + return new ServiceSchema(config, procedures); + }; + } + /** * Serializes this schema's procedures into a plain object that is JSON compatible. */ @@ -528,7 +601,7 @@ export class ServiceSchema< * You probably don't need this, usually the River server will handle this * for you. */ - instantiate(extendedContext: ServiceContext): Service { + instantiate(extendedContext: Context): Service { const state = this.initializeState(extendedContext); const dispose = async () => { await state[Symbol.asyncDispose]?.(); @@ -566,16 +639,16 @@ function getSerializedProcErrors( * @see {@link ServiceSchema.scaffold} */ // note that this isn't exported -class ServiceScaffold { +class ServiceScaffold { /** * The configuration for this service. */ - protected readonly config: ServiceConfiguration; + protected readonly config: ServiceConfiguration; /** * @param config - The configuration for this service. */ - constructor(config: ServiceConfiguration) { + constructor(config: ServiceConfiguration) { this.config = config; } @@ -599,7 +672,7 @@ class ServiceScaffold { * * @param procedures - The procedures for this service. */ - procedures>(procedures: T): T { + procedures>(procedures: T): T { return procedures; } @@ -621,9 +694,9 @@ class ServiceScaffold { * }); * ``` */ - finalize>( + finalize>( procedures: T, - ): ServiceSchema }> { + ): ServiceSchema }> { return ServiceSchema.define(this.config, procedures); } } diff --git a/testUtil/fixtures/cleanup.ts b/testUtil/fixtures/cleanup.ts index 68a7a03a..af08e79a 100644 --- a/testUtil/fixtures/cleanup.ts +++ b/testUtil/fixtures/cleanup.ts @@ -84,7 +84,9 @@ export async function ensureTransportBuffersAreEventuallyEmpty( ); } -export async function ensureServerIsClean(s: Server) { +export async function ensureServerIsClean( + s: Server, +) { return waitFor(() => expect( s.streams, @@ -111,7 +113,7 @@ export async function testFinishesCleanly({ }: Partial<{ clientTransports: Array>; serverTransport: ServerTransport; - server: Server; + server: Server; }>) { // pre-close invariants // invariant check servers first as heartbeats are authoritative on their side diff --git a/testUtil/fixtures/services.ts b/testUtil/fixtures/services.ts index 971ac964..94cf352a 100644 --- a/testUtil/fixtures/services.ts +++ b/testUtil/fixtures/services.ts @@ -127,7 +127,7 @@ export const TestServiceSchema = TestServiceScaffold.finalize({ ...testServiceProcedures, }); -export const OrderingServiceSchema = ServiceSchema.define( +export const OrderingServiceSchema = ServiceSchema.defineWithContext()( { initializeState: () => ({ msgs: [] as Array }) }, { add: Procedure.rpc({ @@ -150,7 +150,7 @@ export const OrderingServiceSchema = ServiceSchema.define( }, ); -export const BinaryFileServiceSchema = ServiceSchema.define({ +export const BinaryFileServiceSchema = ServiceSchema.defineWithContext()({ getFile: Procedure.rpc({ requestInit: Type.Object({ file: Type.String() }), responseData: Type.Object({ contents: Type.Uint8Array() }), @@ -165,7 +165,7 @@ export const BinaryFileServiceSchema = ServiceSchema.define({ export const DIV_BY_ZERO = 'DIV_BY_ZERO'; export const STREAM_ERROR = 'STREAM_ERROR'; -export const FallibleServiceSchema = ServiceSchema.define({ +export const FallibleServiceSchema = ServiceSchema.defineWithContext()({ divide: Procedure.rpc({ requestInit: Type.Object({ a: Type.Number(), b: Type.Number() }), responseData: Type.Object({ result: Type.Number() }), @@ -232,7 +232,7 @@ export const FallibleServiceSchema = ServiceSchema.define({ }), }); -export const SubscribableServiceSchema = ServiceSchema.define( +export const SubscribableServiceSchema = ServiceSchema.defineWithContext()( { initializeState: () => ({ count: new Observable(0) }) }, { add: Procedure.rpc({ @@ -259,7 +259,7 @@ export const SubscribableServiceSchema = ServiceSchema.define( }, ); -export const UploadableServiceSchema = ServiceSchema.define({ +export const UploadableServiceSchema = ServiceSchema.defineWithContext()({ addMultiple: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({ n: Type.Number() }), @@ -315,7 +315,26 @@ const RecursivePayload = Type.Recursive((This) => }), ); -export const NonObjectSchemas = ServiceSchema.define({ +ServiceSchema.defineWithContext<{ + db: string; +}>()( + { + initializeState: () => ({ a: 'test' }), + }, + { + add: Procedure.rpc({ + requestInit: Type.Number(), + responseData: Type.Number(), + async handler({ ctx, reqInit }) { + ctx.db; + + return Ok(reqInit + 1); + }, + }), + }, +); + +export const NonObjectSchemas = ServiceSchema.defineWithContext()({ add: Procedure.rpc({ requestInit: Type.Number(), responseData: Type.Number(), @@ -334,7 +353,7 @@ export const NonObjectSchemas = ServiceSchema.define({ }); export function SchemaWithDisposableState(dispose: () => void) { - return ServiceSchema.define( + return ServiceSchema.defineWithContext()( { initializeState: () => ({ [Symbol.dispose]: dispose }) }, { add: Procedure.rpc({ From 2eb13101617ced7e1a070e6448cfd4222e83d60e Mon Sep 17 00:00:00 2001 From: Dawei Date: Mon, 9 Jun 2025 18:24:19 -0700 Subject: [PATCH 02/12] Use builder pattern for service context --- __tests__/cancellation.test.ts | 34 +- __tests__/cleanup.test.ts | 13 +- __tests__/context.test.ts | 25 +- __tests__/e2e.test.ts | 4 +- __tests__/invalid-request.test.ts | 28 +- __tests__/middleware.test.ts | 4 +- __tests__/typescript-stress.test.ts | 6 +- router/client.ts | 28 +- router/context.ts | 7 +- router/index.ts | 2 +- router/server.ts | 45 +-- router/services.ts | 607 +++++++++++++--------------- testUtil/fixtures/services.ts | 44 +- 13 files changed, 397 insertions(+), 450 deletions(-) diff --git a/__tests__/cancellation.test.ts b/__tests__/cancellation.test.ts index 4ee18beb..aa87c950 100644 --- a/__tests__/cancellation.test.ts +++ b/__tests__/cancellation.test.ts @@ -4,9 +4,9 @@ import { Err, Ok, Procedure, - ServiceSchema, ValidProcType, createClient, + createServiceSchema, createServer, } from '../router'; import { testMatrix } from '../testUtil/fixtures/matrix'; @@ -67,7 +67,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -111,7 +111,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -165,7 +165,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -215,7 +215,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -280,7 +280,7 @@ describe.each(testMatrix())( const serverTransport = getServerTransport(); const handler = makeMockHandler('rpc'); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -338,7 +338,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('stream'); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -412,7 +412,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('upload'); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -483,7 +483,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('subscription'); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -570,7 +570,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('rpc'); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -624,7 +624,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('stream'); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -684,7 +684,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('upload'); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -743,7 +743,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('subscription'); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -827,7 +827,7 @@ describe.each(testMatrix())( const rejectable = createRejectable(); const handler = makeMockHandler('rpc', () => rejectable.promise); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -880,7 +880,7 @@ describe.each(testMatrix())( const rejectable = createRejectable(); const handler = makeMockHandler('stream', () => rejectable.promise); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -942,7 +942,7 @@ describe.each(testMatrix())( const rejectable = createRejectable(); const handler = makeMockHandler('upload', () => rejectable.promise); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -1007,7 +1007,7 @@ describe.each(testMatrix())( () => rejectable.promise, ); const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), diff --git a/__tests__/cleanup.test.ts b/__tests__/cleanup.test.ts index 8366ef61..190244b2 100644 --- a/__tests__/cleanup.test.ts +++ b/__tests__/cleanup.test.ts @@ -15,8 +15,8 @@ import { Ok, Procedure, ProcedureHandlerContext, - ServiceSchema, createClient, + createServiceSchema, createServer, } from '../router'; import { @@ -503,13 +503,14 @@ describe('request finishing triggers signal onabort', async () => { ] as const)('handler aborts $procedureType', async ({ procedureType }) => { const clientTransport = getClientTransport('client'); const serverTransport = getServerTransport(); - const handler = vi.fn<(ctx: ProcedureHandlerContext) => void>(); + const handler = + vi.fn<(ctx: ProcedureHandlerContext) => void>(); const serverId = serverTransport.clientId; const serviceName = 'service'; const procedureName = procedureType; const services = { - [serviceName]: ServiceSchema.defineWithContext()({ + [serviceName]: createServiceSchema().define({ // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any [procedureType]: (Procedure[procedureType] as any)({ requestInit: Type.Object({}), @@ -519,7 +520,11 @@ describe('request finishing triggers signal onabort', async () => { } : {}), responseData: Type.Object({}), - async handler({ ctx }: { ctx: ProcedureHandlerContext }) { + async handler({ + ctx, + }: { + ctx: ProcedureHandlerContext; + }) { handler(ctx); return new Promise(() => { diff --git a/__tests__/context.test.ts b/__tests__/context.test.ts index 66cc4e65..f121e462 100644 --- a/__tests__/context.test.ts +++ b/__tests__/context.test.ts @@ -5,14 +5,9 @@ import { } from '../testUtil/fixtures/cleanup'; import { testMatrix } from '../testUtil/fixtures/matrix'; import { TestSetupHelpers } from '../testUtil/fixtures/transports'; -import { - Ok, - Procedure, - ServiceSchema, - createClient, - createServer, -} from '../router'; +import { Ok, Procedure, createClient, createServer } from '../router'; import { Type } from '@sinclair/typebox'; +import { createServiceSchema } from '../router/services'; describe('should handle incompatabilities', async () => { const { addPostTestCleanup, postTestCleanup } = createPostTestCleanups(); @@ -48,8 +43,10 @@ describe('should handle incompatabilities', async () => { testctx: Math.random().toString(), }; + const serviceSchema = createServiceSchema(); + const services = { - testservice: ServiceSchema.defineWithContext()({ + testservice: serviceSchema.define({ testrpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.String(), @@ -63,7 +60,7 @@ describe('should handle incompatabilities', async () => { createServer(serverTransport, services, { extendedContext, }); - const client = createClient( + const client = createClient( clientTransport, serverTransport.clientId, ); @@ -81,9 +78,15 @@ describe('should handle incompatabilities', async () => { const clientTransport = getClientTransport('client'); const serverTransport = getServerTransport(); - const TestServiceScaffold = ServiceSchema.scaffold({ + interface ExtendedContext { + testctx: string; + } + + const serviceSchema = createServiceSchema(); + + const TestServiceScaffold = serviceSchema.scaffold({ initializeState: (ctx) => ({ - fromctx: (ctx as unknown as typeof extendedContext).testctx, + fromctx: ctx.testctx, }), }); const services = { diff --git a/__tests__/e2e.test.ts b/__tests__/e2e.test.ts index 20b3c8a0..bd4e60d7 100644 --- a/__tests__/e2e.test.ts +++ b/__tests__/e2e.test.ts @@ -31,7 +31,7 @@ import { testMatrix } from '../testUtil/fixtures/matrix'; import { Type } from '@sinclair/typebox'; import { Procedure, - ServiceSchema, + createServiceSchema, Ok, UNCAUGHT_ERROR_CODE, CANCEL_CODE, @@ -949,7 +949,7 @@ describe.each(testMatrix())( }); const services = { - test: ServiceSchema.defineWithContext()({ + test: createServiceSchema().define({ getData: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({ diff --git a/__tests__/invalid-request.test.ts b/__tests__/invalid-request.test.ts index 30815a44..e04406c7 100644 --- a/__tests__/invalid-request.test.ts +++ b/__tests__/invalid-request.test.ts @@ -5,8 +5,8 @@ import { Ok, OkResult, Procedure, - ServiceSchema, createClient, + createServiceSchema, createServer, } from '../router'; import { testMatrix } from '../testUtil/fixtures/matrix'; @@ -49,7 +49,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -99,7 +99,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -148,7 +148,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -197,7 +197,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -247,7 +247,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -299,7 +299,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({ mustSendThings: Type.String() }), requestData: Type.Object({}), @@ -349,7 +349,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({ mustSendThings: Type.String() }), @@ -417,7 +417,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -484,7 +484,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -548,7 +548,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -619,7 +619,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -706,7 +706,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -804,7 +804,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: ServiceSchema.defineWithContext()({ + service: createServiceSchema().define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), diff --git a/__tests__/middleware.test.ts b/__tests__/middleware.test.ts index ebdb7518..37334dab 100644 --- a/__tests__/middleware.test.ts +++ b/__tests__/middleware.test.ts @@ -12,7 +12,7 @@ import { createServer, Ok, Procedure, - ServiceSchema, + createServiceSchema, Middleware, } from '../router'; import { createMockTransportNetwork } from '../testUtil/fixtures/mockTransport'; @@ -244,7 +244,7 @@ describe('middleware test', () => { readByMiddlewareSignal: boolean; }>(); - const AsyncStorageSchemas = ServiceSchema.defineWithContext()({ + const AsyncStorageSchemas = createServiceSchema().define({ gimmeStore: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), diff --git a/__tests__/typescript-stress.test.ts b/__tests__/typescript-stress.test.ts index f96a294f..35423f10 100644 --- a/__tests__/typescript-stress.test.ts +++ b/__tests__/typescript-stress.test.ts @@ -1,6 +1,6 @@ import { describe, expect, test } from 'vitest'; import { Procedure } from '../router/procedures'; -import { ServiceSchema } from '../router/services'; +import { createServiceSchema } from '../router/services'; import { Type } from '@sinclair/typebox'; import { createServer } from '../router/server'; import { createClient } from '../router/client'; @@ -64,7 +64,7 @@ const fnBody = Procedure.rpc< // typescript is limited to max 50 constraints // see: https://github.com/microsoft/TypeScript/issues/33541 // we should be able to support more than that due to how we make services -const StupidlyLargeServiceSchema = ServiceSchema.defineWithContext()({ +const StupidlyLargeServiceSchema = createServiceSchema().define({ f1: fnBody, f2: fnBody, f3: fnBody, @@ -210,7 +210,7 @@ describe("ensure typescript doesn't give up trying to infer the types for large }); const services = { - test: ServiceSchema.defineWithContext()({ + test: createServiceSchema().define({ rpc: Procedure.rpc({ requestInit: Type.Object({ n: Type.Number() }), responseData: Type.Object({ n: Type.Number() }), diff --git a/router/client.ts b/router/client.ts index 5bd2aad3..804518c8 100644 --- a/router/client.ts +++ b/router/client.ts @@ -140,12 +140,20 @@ type ServiceClient = { * @template Srv - The type of the server. */ export type Client< - ServiceContext extends object, - Services extends AnyServiceSchemaMap, + // Context is a server-side implementation detail that doesn't affect the client interface + // eslint-disable-next-line @typescript-eslint/no-explicit-any + Services extends AnyServiceSchemaMap, IS extends InstantiatedServiceSchemaMap< - ServiceContext, + // Context is a server-side implementation detail that doesn't affect the client interface + // eslint-disable-next-line @typescript-eslint/no-explicit-any + any, Services - > = InstantiatedServiceSchemaMap, + > = InstantiatedServiceSchemaMap< + // Context is a server-side implementation detail that doesn't affect the client interface + // eslint-disable-next-line @typescript-eslint/no-explicit-any + any, + Services + >, > = { [SvcName in keyof IS]: ServiceClient; }; @@ -207,10 +215,10 @@ const defaultClientOptions: ClientOptions = { * @param {Partial} providedClientOptions - The options for the client. * @returns The client for the server. */ -export function createClient< - ServiceSchemaMap extends AnyServiceSchemaMap, - ServiceContext extends object = object, ->( +// We are using any here because the ServiceContext is a server-side implementation +// detail that doesn't affect the client interface +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function createClient>( transport: ClientTransport, serverId: TransportClientId, providedClientOptions: Partial< @@ -218,7 +226,7 @@ export function createClient< handshakeOptions: ClientHandshakeOptions; } > = {}, -): Client { +): Client { if (providedClientOptions.handshakeOptions) { transport.extendHandshake(providedClientOptions.handshakeOptions); } @@ -262,7 +270,7 @@ export function createClient< procName, callOptions ? (callOptions as CallOptions).signal : undefined, ); - }, []) as Client; + }, []) as Client; } type AnyProcReturn = diff --git a/router/context.ts b/router/context.ts index 5007a9c6..e97324af 100644 --- a/router/context.ts +++ b/router/context.ts @@ -26,12 +26,9 @@ export interface ParsedMetadata extends Record {} /** * This is passed to every procedure handler and contains various context-level - * information and utilities. This may be extended, see {@link ServiceContext} + * information and utilities. */ -export type ProcedureHandlerContext< - State, - ServiceContext = object, -> = ServiceContext & { +export type ProcedureHandlerContext = Context & { /** * State for this service as defined by the service definition. */ diff --git a/router/index.ts b/router/index.ts index ee5dc119..ece437cb 100644 --- a/router/index.ts +++ b/router/index.ts @@ -9,7 +9,7 @@ export type { ProcType, } from './services'; export { - ServiceSchema, + createServiceSchema, serializeSchema, SerializedServerSchema, SerializedServiceSchema, diff --git a/router/server.ts b/router/server.ts index c241a68e..786b34fc 100644 --- a/router/server.ts +++ b/router/server.ts @@ -54,13 +54,13 @@ type StreamId = string; * @template Services - The type of services provided by the server. */ export interface Server< - ServiceContext extends object, - Services extends AnyServiceSchemaMap, + Context extends object, + Services extends AnyServiceSchemaMap, > { /** * Services defined for this server. */ - services: InstantiatedServiceSchemaMap; + services: InstantiatedServiceSchemaMap; /** * A set of stream ids that are currently open. */ @@ -69,7 +69,7 @@ export interface Server< close: () => Promise; } -interface StreamInitProps { +interface StreamInitProps { // msg derived streamId: StreamId; procedureName: string; @@ -81,7 +81,7 @@ interface StreamInitProps { procClosesWithInit: boolean; // server level - serviceContext: ServiceContext & { state: object }; + serviceContext: Context & { state: object }; procedure: AnyProcedure; sessionMetadata: ParsedMetadata; @@ -104,12 +104,12 @@ interface ProcStream { } class RiverServer< - ServiceContext extends object, - Services extends AnyServiceSchemaMap, -> implements Server + Context extends object, + Services extends AnyServiceSchemaMap, +> implements Server { private transport: ServerTransport; - private contextMap: Map; + private contextMap: Map; private log?: Logger; private middlewares: Array; @@ -124,7 +124,7 @@ class RiverServer< private maxCancelledStreamTombstonesPerSession: number; public streams: Map; - public services: InstantiatedServiceSchemaMap; + public services: InstantiatedServiceSchemaMap; private unregisterTransportListeners: () => void; @@ -132,7 +132,7 @@ class RiverServer< transport: ServerTransport, services: Services, handshakeOptions?: ServerHandshakeOptions, - extendedContext?: ServiceContext, + extendedContext?: Context, maxCancelledStreamTombstonesPerSession = 200, middlewares: Array = [], ) { @@ -140,12 +140,12 @@ class RiverServer< this.middlewares = middlewares; this.services = instances as InstantiatedServiceSchemaMap< - ServiceContext, + Context, Services >; this.contextMap = new Map(); - extendedContext = extendedContext ?? ({} as ServiceContext); + extendedContext = extendedContext ?? ({} as Context); for (const [name, service] of Object.entries(services)) { const instance = service.instantiate(extendedContext); @@ -252,10 +252,7 @@ class RiverServer< this.transport.addEventListener('transportStatus', handleTransportStatus); } - private createNewProcStream( - span: Span, - props: StreamInitProps, - ) { + private createNewProcStream(span: Span, props: StreamInitProps) { const { streamId, initialSession, @@ -570,7 +567,7 @@ class RiverServer< closeReadable(); } - const handlerContextWithSpan: ProcedureHandlerContext = { + const handlerContextWithSpan: ProcedureHandlerContext = { ...serviceContext, from: from, sessionId, @@ -709,7 +706,7 @@ class RiverServer< private validateNewProcStream( initMessage: OpaqueTransportMessage, - ): StreamInitProps | null { + ): StreamInitProps | null { // lifetime safety: this is a sync function so this session cant transition // to another state before we finish const session = this.transport.sessions.get(initMessage.from); @@ -1021,7 +1018,7 @@ function getStreamCloseBackwardsCompat(protocolVersion: ProtocolVersion) { } export interface MiddlewareContext - extends Readonly, 'cancel'>> { + extends Readonly, 'cancel'>> { readonly streamId: StreamId; readonly procedureName: string; readonly serviceName: string; @@ -1049,14 +1046,14 @@ export type Middleware = (param: MiddlewareParam) => void; * @returns A promise that resolves to a server instance with the registered services. */ export function createServer< - ServiceContext extends object, - Services extends AnyServiceSchemaMap, + Context extends object, + Services extends AnyServiceSchemaMap, >( transport: ServerTransport, services: Services, providedServerOptions?: Partial<{ handshakeOptions?: ServerHandshakeOptions; - extendedContext?: ServiceContext; + extendedContext?: Context; /** * Maximum number of cancelled streams to keep track of to avoid * cascading stream errors. @@ -1067,7 +1064,7 @@ export function createServer< */ middlewares?: Array; }>, -): Server { +): Server { return new RiverServer( transport, services, diff --git a/router/services.ts b/router/services.ts index a5663f17..75eef424 100644 --- a/router/services.ts +++ b/router/services.ts @@ -35,14 +35,17 @@ export type AnyService = Service; /** * Represents any {@link ServiceSchema} object. */ -export type AnyServiceSchema = - ServiceSchema; +export type AnyServiceSchema = InstanceType< + ReturnType> +>; /** * A dictionary of {@link ServiceSchema}s, where the key is the service name. */ -export type AnyServiceSchemaMap = - Record>; +export type AnyServiceSchemaMap = Record< + string, + AnyServiceSchema +>; // This has the secret sauce to keep go to definition working, the structure is // somewhat delicate, so be careful when modifying it. Would be nice to add a @@ -52,11 +55,22 @@ export type AnyServiceSchemaMap = * services. */ export type InstantiatedServiceSchemaMap< - ServiceContext extends object, - T extends AnyServiceSchemaMap, + Context extends object, + T extends AnyServiceSchemaMap, > = { - [K in keyof T]: T[K] extends ServiceSchema - ? Service + [K in keyof T]: T[K] extends AnyServiceSchema + ? T[K] extends { + initializeState: (ctx: Context) => infer S; + procedures: infer P; + } + ? Service< + Context, + S extends object ? S : object, + P extends ProcedureMap + ? P + : ProcedureMap + > + : never : never; }; @@ -180,27 +194,6 @@ export function Strict(schema: T): T { return JSON.parse(JSON.stringify(schema)) as T; } -interface DefineWithContext { - < - State extends object, - Procedures extends BrandedProcedureMap, - >( - config: ServiceConfiguration, - procedures: Procedures, - ): ServiceSchema< - Context, - State, - { [K in keyof Procedures]: Unbranded } - >; - >( - procedures: Procedures, - ): ServiceSchema< - Context, - object, - { [K in keyof Procedures]: Unbranded } - >; -} - // TODO remove once clients migrate to v2 /** * Same as {@link serializeSchema} but with a format that is compatible with @@ -273,223 +266,187 @@ export function serializeSchema( return schema; } -/** - * The schema for a {@link Service}. This is used to define a service, specifically - * its initial state and procedures. - * - * There are two ways to define a service: - * 1. the {@link ServiceSchema.define} static method, which takes a configuration and - * a list of procedures directly. Use this to ergonomically define a service schema - * in one go. Good for smaller services, especially if they're stateless. - * 2. the {@link ServiceSchema.scaffold} static method, which creates a scaffold that - * can be used to define procedures separately from the configuration. Use this to - * better organize your service's definition, especially if it's a large service. - * You can also use it in a builder pattern to define the service in a more - * fluent way. - * - * See the static methods for more information and examples. - * - * When defining procedures, use the {@link Procedure} constructors to create them. - */ -export class ServiceSchema< - Context extends object, - State extends object, - Procedures extends ProcedureMap, -> { - /** - * Factory function for creating a fresh state. - */ - protected readonly initializeState: ( - extendedContext: Context, - ) => MaybeDisposable; - - /** - * The procedures for this service. - */ - readonly procedures: Procedures; - - /** - * @param config - The configuration for this service. - * @param procedures - The procedures for this service. - */ - protected constructor( - config: ServiceConfiguration, - procedures: Procedures, - ) { - this.initializeState = config.initializeState; - this.procedures = procedures; - } - - /** - * Creates a {@link ServiceScaffold}, which can be used to define procedures - * that can then be merged into a {@link ServiceSchema}, via the scaffold's - * `finalize` method. - * - * There are two patterns that work well with this method. The first is using - * it to separate the definition of procedures from the definition of the - * service's configuration: - * ```ts - * const MyServiceScaffold = ServiceSchema.scaffold({ - * initializeState: () => ({ count: 0 }), - * }); - * - * const incrementProcedures = MyServiceScaffold.procedures({ - * increment: Procedure.rpc({ - * requestInit: Type.Object({ amount: Type.Number() }), - * responseData: Type.Object({ current: Type.Number() }), - * async handler(ctx, init) { - * ctx.state.count += init.amount; - * return Ok({ current: ctx.state.count }); - * } - * }), - * }) - * - * const MyService = MyServiceScaffold.finalize({ - * ...incrementProcedures, - * // you can also directly define procedures here - * }); - * ``` - * This might be really handy if you have a very large service and you're - * wanting to split it over multiple files. You can define the scaffold - * in one file, and then import that scaffold in other files where you - * define procedures - and then finally import the scaffolds and your - * procedure objects in a final file where you finalize the scaffold into - * a service schema. - * - * The other way is to use it like in a builder pattern: - * ```ts - * const MyService = ServiceSchema - * .scaffold({ initializeState: () => ({ count: 0 }) }) - * .finalize({ - * increment: Procedure.rpc({ - * requestInit: Type.Object({ amount: Type.Number() }), - * responseData: Type.Object({ current: Type.Number() }), - * async handler(ctx, init) { - * ctx.state.count += init.amount; - * return Ok({ current: ctx.state.count }); - * } - * }), - * }) - * ``` - * Depending on your preferences, this may be a more appealing way to define - * a schema versus using the {@link ServiceSchema.define} method. - */ - static scaffold( - config: ServiceConfiguration, - ) { - return new ServiceScaffold(config); - } - +export function createServiceSchema() { /** - * Creates a new {@link ServiceSchema} with the given configuration and procedures. - * - * All procedures must be created with the {@link Procedure} constructors. + * The schema for a {@link Service}. This is used to define a service, specifically + * its initial state and procedures. * - * NOTE: There is an overload that lets you just provide the procedures alone if your - * service has no state. + * There are two ways to define a service: + * 1. the {@link ServiceSchema.define} static method, which takes a configuration and + * a list of procedures directly. Use this to ergonomically define a service schema + * in one go. Good for smaller services, especially if they're stateless. + * 2. the {@link ServiceSchema.scaffold} static method, which creates a scaffold that + * can be used to define procedures separately from the configuration. Use this to + * better organize your service's definition, especially if it's a large service. + * You can also use it in a builder pattern to define the service in a more + * fluent way. * - * @param config - The configuration for this service. - * @param procedures - The procedures for this service. + * See the static methods for more information and examples. * - * @example - * ``` - * const service = ServiceSchema.define( - * { initializeState: () => ({ count: 0 }) }, - * { - * increment: Procedure.rpc({ - * requestInit: Type.Object({ amount: Type.Number() }), - * responseData: Type.Object({ current: Type.Number() }), - * async handler(ctx, init) { - * ctx.state.count += init.amount; - * return Ok({ current: ctx.state.count }); - * } - * }), - * }, - * ); - * ``` + * When defining procedures, use the {@link Procedure} constructors to create them. */ - static define< - Context extends object, + return class ServiceSchema< State extends object, - Procedures extends BrandedProcedureMap, - >( - config: ServiceConfiguration, - procedures: Procedures, - ): ServiceSchema< - Context, - State, - { [K in keyof Procedures]: Unbranded } - >; - /** - * Creates a new {@link ServiceSchema} with the given procedures. - * - * All procedures must be created with the {@link Procedure} constructors. - * - * NOTE: There is an overload that lets you provide configuration as well, - * if your service has extra configuration like a state. - * - * @param procedures - The procedures for this service. - * - * @example - * ``` - * const service = ServiceSchema.define({ - * add: Procedure.rpc({ - * requestInit: Type.Object({ a: Type.Number(), b: Type.Number() }), - * responseData: Type.Object({ result: Type.Number() }), - * async handler(ctx, init) { - * return Ok({ result: init.a + init.b }); - * } - * }), - * }); - */ - - static define< - Context extends object, - Procedures extends BrandedProcedureMap, - >( - procedures: Procedures, - ): ServiceSchema< - Context, - object, - { [K in keyof Procedures]: Unbranded } - >; - // actual implementation - static define( - configOrProcedures: - | ServiceConfiguration - | BrandedProcedureMap, - maybeProcedures?: BrandedProcedureMap, - ): ServiceSchema { - let config: ServiceConfiguration; - let procedures: BrandedProcedureMap; - - if ( - 'initializeState' in configOrProcedures && - typeof configOrProcedures.initializeState === 'function' + Procedures extends ProcedureMap, + > { + /** + * Factory function for creating a fresh state. + */ + readonly initializeState: ( + extendedContext: Context, + ) => MaybeDisposable; + + /** + * The procedures for this service. + */ + readonly procedures: Procedures; + + /** + * @param config - The configuration for this service. + * @param procedures - The procedures for this service. + */ + constructor( + config: ServiceConfiguration, + procedures: Procedures, ) { - if (!maybeProcedures) { - throw new Error('Expected procedures to be defined'); - } - - config = configOrProcedures as ServiceConfiguration; - procedures = maybeProcedures; - } else { - config = { initializeState: () => ({}) }; - procedures = configOrProcedures as BrandedProcedureMap; + this.initializeState = config.initializeState; + this.procedures = procedures; } - return new ServiceSchema(config, procedures); - } + /** + * Creates a {@link ServiceScaffold}, which can be used to define procedures + * that can then be merged into a {@link ServiceSchema}, via the scaffold's + * `finalize` method. + * + * There are two patterns that work well with this method. The first is using + * it to separate the definition of procedures from the definition of the + * service's configuration: + * ```ts + * const MyServiceScaffold = ServiceSchema.scaffold({ + * initializeState: () => ({ count: 0 }), + * }); + * + * const incrementProcedures = MyServiceScaffold.procedures({ + * increment: Procedure.rpc({ + * requestInit: Type.Object({ amount: Type.Number() }), + * responseData: Type.Object({ current: Type.Number() }), + * async handler(ctx, init) { + * ctx.state.count += init.amount; + * return Ok({ current: ctx.state.count }); + * } + * }), + * }) + * + * const MyService = MyServiceScaffold.finalize({ + * ...incrementProcedures, + * // you can also directly define procedures here + * }); + * ``` + * This might be really handy if you have a very large service and you're + * wanting to split it over multiple files. You can define the scaffold + * in one file, and then import that scaffold in other files where you + * define procedures - and then finally import the scaffolds and your + * procedure objects in a final file where you finalize the scaffold into + * a service schema. + * + * The other way is to use it like in a builder pattern: + * ```ts + * const MyService = ServiceSchema + * .scaffold({ initializeState: () => ({ count: 0 }) }) + * .finalize({ + * increment: Procedure.rpc({ + * requestInit: Type.Object({ amount: Type.Number() }), + * responseData: Type.Object({ current: Type.Number() }), + * async handler(ctx, init) { + * ctx.state.count += init.amount; + * return Ok({ current: ctx.state.count }); + * } + * }), + * }) + * ``` + * Depending on your preferences, this may be a more appealing way to define + * a schema versus using the {@link ServiceSchema.define} method. + */ + static scaffold( + config: ServiceConfiguration, + ) { + return new ServiceScaffold(config); + } - static defineWithContext< - Context extends object = object, - >(): DefineWithContext { - return function ( + /** + * Creates a new {@link ServiceSchema} with the given configuration and procedures. + * + * All procedures must be created with the {@link Procedure} constructors. + * + * NOTE: There is an overload that lets you just provide the procedures alone if your + * service has no state. + * + * @param config - The configuration for this service. + * @param procedures - The procedures for this service. + * + * @example + * ``` + * const service = ServiceSchema.define( + * { initializeState: () => ({ count: 0 }) }, + * { + * increment: Procedure.rpc({ + * requestInit: Type.Object({ amount: Type.Number() }), + * responseData: Type.Object({ current: Type.Number() }), + * async handler(ctx, init) { + * ctx.state.count += init.amount; + * return Ok({ current: ctx.state.count }); + * } + * }), + * }, + * ); + * ``` + */ + static define< + State extends object, + Procedures extends BrandedProcedureMap, + >( + config: ServiceConfiguration, + procedures: Procedures, + ): ServiceSchema< + State, + { [K in keyof Procedures]: Unbranded } + >; + /** + * Creates a new {@link ServiceSchema} with the given procedures. + * + * All procedures must be created with the {@link Procedure} constructors. + * + * NOTE: There is an overload that lets you provide configuration as well, + * if your service has extra configuration like a state. + * + * @param procedures - The procedures for this service. + * + * @example + * ``` + * const service = ServiceSchema.define({ + * add: Procedure.rpc({ + * requestInit: Type.Object({ a: Type.Number(), b: Type.Number() }), + * responseData: Type.Object({ result: Type.Number() }), + * async handler(ctx, init) { + * return Ok({ result: init.a + init.b }); + * } + * }), + * }); + */ + + static define>( + procedures: Procedures, + ): ServiceSchema< + object, + { [K in keyof Procedures]: Unbranded } + >; + // actual implementation + static define( configOrProcedures: | ServiceConfiguration | BrandedProcedureMap, maybeProcedures?: BrandedProcedureMap, - ): ServiceSchema> { + ): ServiceSchema { let config: ServiceConfiguration; let procedures: BrandedProcedureMap; @@ -509,59 +466,75 @@ export class ServiceSchema< } return new ServiceSchema(config, procedures); - }; - } + } - /** - * Serializes this schema's procedures into a plain object that is JSON compatible. - */ - serialize(): SerializedServiceSchema { - return { - procedures: Object.fromEntries( - Object.entries(this.procedures).map(([procName, procDef]) => [ - procName, - { - init: Strict(procDef.requestInit), - output: Strict(procDef.responseData), - errors: getSerializedProcErrors(procDef), - // Only add `description` field if the type declares it. - ...('description' in procDef - ? { description: procDef.description } - : {}), - type: procDef.type, - // Only add the `input` field if the type declares it. - ...('requestData' in procDef - ? { - input: Strict(procDef.requestData), - } - : {}), - }, - ]), - ), - }; - } + /** + * Serializes this schema's procedures into a plain object that is JSON compatible. + */ + serialize(): SerializedServiceSchema { + return { + procedures: Object.fromEntries( + Object.entries(this.procedures).map(([procName, procDef]) => [ + procName, + { + init: Strict(procDef.requestInit), + output: Strict(procDef.responseData), + errors: getSerializedProcErrors(procDef), + // Only add `description` field if the type declares it. + ...('description' in procDef + ? { description: procDef.description } + : {}), + type: procDef.type, + // Only add the `input` field if the type declares it. + ...('requestData' in procDef + ? { + input: Strict(procDef.requestData), + } + : {}), + }, + ]), + ), + }; + } - // TODO remove once clients migrate to v2 - /** - * Same as {@link ServiceSchema.serialize}, but with a format that is compatible with - * protocol v1. This is useful to be able to continue to generate schemas for older - * clients as they are still supported. - */ - serializeV1Compat(): SerializedServiceSchemaProtocolv1 { - return { - procedures: Object.fromEntries( - Object.entries(this.procedures).map( - ([procName, procDef]): [ - string, - SerializedProcedureSchemaProtocolv1, - ] => { - if (procDef.type === 'rpc' || procDef.type === 'subscription') { + // TODO remove once clients migrate to v2 + /** + * Same as {@link ServiceSchema.serialize}, but with a format that is compatible with + * protocol v1. This is useful to be able to continue to generate schemas for older + * clients as they are still supported. + */ + serializeV1Compat(): SerializedServiceSchemaProtocolv1 { + return { + procedures: Object.fromEntries( + Object.entries(this.procedures).map( + ([procName, procDef]): [ + string, + SerializedProcedureSchemaProtocolv1, + ] => { + if (procDef.type === 'rpc' || procDef.type === 'subscription') { + return [ + procName, + { + // BACKWARDS COMPAT: map init to input for protocolv1 + // this is the only change needed to make it compatible. + input: Strict(procDef.requestInit), + output: Strict(procDef.responseData), + errors: getSerializedProcErrors(procDef), + // Only add `description` field if the type declares it. + ...('description' in procDef + ? { description: procDef.description } + : {}), + type: procDef.type, + }, + ]; + } + + // No backwards compatibility needed for upload and stream types, as having an `init` + // all the time is compatible with protocol v1. return [ procName, { - // BACKWARDS COMPAT: map init to input for protocolv1 - // this is the only change needed to make it compatible. - input: Strict(procDef.requestInit), + init: Strict(procDef.requestInit), output: Strict(procDef.responseData), errors: getSerializedProcErrors(procDef), // Only add `description` field if the type declares it. @@ -569,54 +542,38 @@ export class ServiceSchema< ? { description: procDef.description } : {}), type: procDef.type, + input: Strict(procDef.requestData), }, ]; - } - - // No backwards compatibility needed for upload and stream types, as having an `init` - // all the time is compatible with protocol v1. - return [ - procName, - { - init: Strict(procDef.requestInit), - output: Strict(procDef.responseData), - errors: getSerializedProcErrors(procDef), - // Only add `description` field if the type declares it. - ...('description' in procDef - ? { description: procDef.description } - : {}), - type: procDef.type, - input: Strict(procDef.requestData), - }, - ]; - }, + }, + ), ), - ), - }; - } + }; + } - /** - * Instantiates this schema into a {@link Service} object. - * - * You probably don't need this, usually the River server will handle this - * for you. - */ - instantiate(extendedContext: Context): Service { - const state = this.initializeState(extendedContext); - const dispose = async () => { - await state[Symbol.asyncDispose]?.(); - state[Symbol.dispose]?.(); - }; - - return Object.freeze({ - state, - procedures: this.procedures, - [Symbol.asyncDispose]: dispose, - }); - } + /** + * Instantiates this schema into a {@link Service} object. + * + * You probably don't need this, usually the River server will handle this + * for you. + */ + instantiate(extendedContext: Context): Service { + const state = this.initializeState(extendedContext); + const dispose = async () => { + await state[Symbol.asyncDispose]?.(); + state[Symbol.dispose]?.(); + }; + + return Object.freeze({ + state, + procedures: this.procedures, + [Symbol.asyncDispose]: dispose, + }); + } + }; } -function getSerializedProcErrors( +export function getSerializedProcErrors( procDef: AnyProcedure, ): ProcedureErrorSchemaType { if ( @@ -694,9 +651,7 @@ class ServiceScaffold { * }); * ``` */ - finalize>( - procedures: T, - ): ServiceSchema }> { - return ServiceSchema.define(this.config, procedures); + finalize>(procedures: T) { + return createServiceSchema().define(this.config, procedures); } } diff --git a/testUtil/fixtures/services.ts b/testUtil/fixtures/services.ts index 94cf352a..225cc0ab 100644 --- a/testUtil/fixtures/services.ts +++ b/testUtil/fixtures/services.ts @@ -1,5 +1,5 @@ import { Type } from '@sinclair/typebox'; -import { ServiceSchema } from '../../router/services'; +import { createServiceSchema } from '../../router/services'; import { Err, Ok, unwrapOrThrow } from '../../router/result'; import { Observable } from '../observable/observable'; import { Procedure } from '../../router'; @@ -10,7 +10,7 @@ export const EchoRequest = Type.Object({ }); export const EchoResponse = Type.Object({ response: Type.String() }); -const TestServiceScaffold = ServiceSchema.scaffold({ +const TestServiceScaffold = createServiceSchema().scaffold({ initializeState: () => ({ count: 0 }), }); @@ -127,8 +127,10 @@ export const TestServiceSchema = TestServiceScaffold.finalize({ ...testServiceProcedures, }); -export const OrderingServiceSchema = ServiceSchema.defineWithContext()( - { initializeState: () => ({ msgs: [] as Array }) }, +export const OrderingServiceSchema = createServiceSchema().define( + { + initializeState: () => ({ msgs: [] as Array }), + }, { add: Procedure.rpc({ requestInit: Type.Object({ n: Type.Number() }), @@ -139,7 +141,6 @@ export const OrderingServiceSchema = ServiceSchema.defineWithContext()( return Ok({ n }); }, }), - getAll: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({ msgs: Type.Array(Type.Number()) }), @@ -150,7 +151,7 @@ export const OrderingServiceSchema = ServiceSchema.defineWithContext()( }, ); -export const BinaryFileServiceSchema = ServiceSchema.defineWithContext()({ +export const BinaryFileServiceSchema = createServiceSchema().define({ getFile: Procedure.rpc({ requestInit: Type.Object({ file: Type.String() }), responseData: Type.Object({ contents: Type.Uint8Array() }), @@ -165,7 +166,7 @@ export const BinaryFileServiceSchema = ServiceSchema.defineWithContext()({ export const DIV_BY_ZERO = 'DIV_BY_ZERO'; export const STREAM_ERROR = 'STREAM_ERROR'; -export const FallibleServiceSchema = ServiceSchema.defineWithContext()({ +export const FallibleServiceSchema = createServiceSchema().define({ divide: Procedure.rpc({ requestInit: Type.Object({ a: Type.Number(), b: Type.Number() }), responseData: Type.Object({ result: Type.Number() }), @@ -232,7 +233,7 @@ export const FallibleServiceSchema = ServiceSchema.defineWithContext()({ }), }); -export const SubscribableServiceSchema = ServiceSchema.defineWithContext()( +export const SubscribableServiceSchema = createServiceSchema().define( { initializeState: () => ({ count: new Observable(0) }) }, { add: Procedure.rpc({ @@ -259,7 +260,7 @@ export const SubscribableServiceSchema = ServiceSchema.defineWithContext()( }, ); -export const UploadableServiceSchema = ServiceSchema.defineWithContext()({ +export const UploadableServiceSchema = createServiceSchema().define({ addMultiple: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({ n: Type.Number() }), @@ -315,26 +316,7 @@ const RecursivePayload = Type.Recursive((This) => }), ); -ServiceSchema.defineWithContext<{ - db: string; -}>()( - { - initializeState: () => ({ a: 'test' }), - }, - { - add: Procedure.rpc({ - requestInit: Type.Number(), - responseData: Type.Number(), - async handler({ ctx, reqInit }) { - ctx.db; - - return Ok(reqInit + 1); - }, - }), - }, -); - -export const NonObjectSchemas = ServiceSchema.defineWithContext()({ +export const NonObjectSchemas = createServiceSchema().define({ add: Procedure.rpc({ requestInit: Type.Number(), responseData: Type.Number(), @@ -353,7 +335,7 @@ export const NonObjectSchemas = ServiceSchema.defineWithContext()({ }); export function SchemaWithDisposableState(dispose: () => void) { - return ServiceSchema.defineWithContext()( + return createServiceSchema().define( { initializeState: () => ({ [Symbol.dispose]: dispose }) }, { add: Procedure.rpc({ @@ -370,7 +352,7 @@ export function SchemaWithDisposableState(dispose: () => void) { export function SchemaWithAsyncDisposableStateAndScaffold( dispose: () => Promise, ) { - const scaffold = ServiceSchema.scaffold({ + const scaffold = createServiceSchema().scaffold({ initializeState: () => ({ [Symbol.asyncDispose]: dispose }), }); From d27d38f6d64373bee886cfb325ba40241b18e384 Mon Sep 17 00:00:00 2001 From: Dawei Date: Mon, 9 Jun 2025 18:29:34 -0700 Subject: [PATCH 03/12] typo --- __tests__/context.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/__tests__/context.test.ts b/__tests__/context.test.ts index f121e462..98a7d030 100644 --- a/__tests__/context.test.ts +++ b/__tests__/context.test.ts @@ -43,10 +43,10 @@ describe('should handle incompatabilities', async () => { testctx: Math.random().toString(), }; - const serviceSchema = createServiceSchema(); + const ServiceSchema = createServiceSchema(); const services = { - testservice: serviceSchema.define({ + testservice: ServiceSchema.define({ testrpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.String(), @@ -82,9 +82,9 @@ describe('should handle incompatabilities', async () => { testctx: string; } - const serviceSchema = createServiceSchema(); + const ServiceSchema = createServiceSchema(); - const TestServiceScaffold = serviceSchema.scaffold({ + const TestServiceScaffold = ServiceSchema.scaffold({ initializeState: (ctx) => ({ fromctx: ctx.testctx, }), From 7e7c9c4800bd8fbda0e8a9cb274666cfade1045a Mon Sep 17 00:00:00 2001 From: Dawei Date: Mon, 9 Jun 2025 18:38:25 -0700 Subject: [PATCH 04/12] Use the same schema in test --- __tests__/cancellation.test.ts | 34 ++++++++++++++++--------------- __tests__/invalid-request.test.ts | 28 +++++++++++++------------ 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/__tests__/cancellation.test.ts b/__tests__/cancellation.test.ts index aa87c950..856dc8bd 100644 --- a/__tests__/cancellation.test.ts +++ b/__tests__/cancellation.test.ts @@ -40,6 +40,8 @@ function makeMockHandler( >(impl); } +const ServiceSchema = createServiceSchema(); + describe.each(testMatrix())( 'clean handler cancellation ($transport.name transport, $codec.name codec)', @@ -67,7 +69,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -111,7 +113,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -165,7 +167,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -215,7 +217,7 @@ describe.each(testMatrix())( const signalReceiver = vi.fn<(sig: AbortSignal) => void>(); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -280,7 +282,7 @@ describe.each(testMatrix())( const serverTransport = getServerTransport(); const handler = makeMockHandler('rpc'); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -338,7 +340,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('stream'); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -412,7 +414,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('upload'); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -483,7 +485,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('subscription'); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -570,7 +572,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('rpc'); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -624,7 +626,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('stream'); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -684,7 +686,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('upload'); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -743,7 +745,7 @@ describe.each(testMatrix())( const handler = makeMockHandler('subscription'); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -827,7 +829,7 @@ describe.each(testMatrix())( const rejectable = createRejectable(); const handler = makeMockHandler('rpc', () => rejectable.promise); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -880,7 +882,7 @@ describe.each(testMatrix())( const rejectable = createRejectable(); const handler = makeMockHandler('stream', () => rejectable.promise); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -942,7 +944,7 @@ describe.each(testMatrix())( const rejectable = createRejectable(); const handler = makeMockHandler('upload', () => rejectable.promise); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ upload: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -1007,7 +1009,7 @@ describe.each(testMatrix())( () => rejectable.promise, ); const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ subscribe: Procedure.subscription({ requestInit: Type.Object({}), responseData: Type.Object({}), diff --git a/__tests__/invalid-request.test.ts b/__tests__/invalid-request.test.ts index e04406c7..cc007759 100644 --- a/__tests__/invalid-request.test.ts +++ b/__tests__/invalid-request.test.ts @@ -22,6 +22,8 @@ import { TestSetupHelpers } from '../testUtil/fixtures/transports'; import { nanoid } from 'nanoid'; import { getClientSendFn } from '../testUtil'; +const ServiceSchema = createServiceSchema(); + describe('cancels invalid request', () => { const { transport, codec } = testMatrix()[0]; const opts = { codec: codec.codec }; @@ -49,7 +51,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -99,7 +101,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -148,7 +150,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -197,7 +199,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -247,7 +249,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -299,7 +301,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({ mustSendThings: Type.String() }), requestData: Type.Object({}), @@ -349,7 +351,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({ mustSendThings: Type.String() }), @@ -417,7 +419,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -484,7 +486,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -548,7 +550,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ rpc: Procedure.rpc({ requestInit: Type.Object({}), responseData: Type.Object({}), @@ -619,7 +621,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -706,7 +708,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), @@ -804,7 +806,7 @@ describe('cancels invalid request', () => { const serverId = serverTransport.clientId; const services = { - service: createServiceSchema().define({ + service: ServiceSchema.define({ stream: Procedure.stream({ requestInit: Type.Object({}), requestData: Type.Object({}), From 2e0789147bb2b8cfa41235b4a7561e3798d697dd Mon Sep 17 00:00:00 2001 From: Dawei Date: Mon, 9 Jun 2025 18:47:05 -0700 Subject: [PATCH 05/12] Update JSDoc --- router/services.ts | 53 ++++++++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/router/services.ts b/router/services.ts index 75eef424..69507b26 100644 --- a/router/services.ts +++ b/router/services.ts @@ -266,25 +266,42 @@ export function serializeSchema( return schema; } +/** + * Creates a ServiceSchema class that can be used to define services with their initial state and procedures. + * This is a factory function that returns a ServiceSchema class constructor bound to the specified Context type. + * + * @template Context - The context type that will be available to all procedures in services created with this schema. + * @returns A ServiceSchema class constructor with static methods for defining services. + * + * @example + * ```ts + * // Create a ServiceSchema class for your context type + * const ServiceSchema = createServiceSchema<{ userId: string }>(); + * + * // Define a simple stateless service + * const mathService = ServiceSchema.define({ + * add: Procedure.rpc({ + * requestInit: Type.Object({ a: Type.Number(), b: Type.Number() }), + * responseData: Type.Object({ result: Type.Number() }), + * async handler(ctx, init) { + * return Ok({ result: init.a + init.b }); + * } + * }), + * }); + * ``` + * + * There are two main ways to define services with the returned ServiceSchema class: + * + * 1. **ServiceSchema.define()** - Takes a configuration and procedures directly. + * Use this for smaller services or when you want to define everything in one place. + * + * 2. **ServiceSchema.scaffold()** - Creates a scaffold that can be used to define + * procedures separately from the configuration. Use this for larger services or + * when you want to organize procedures across multiple files. + * + * When defining procedures, always use the {@link Procedure} constructors to create them. + */ export function createServiceSchema() { - /** - * The schema for a {@link Service}. This is used to define a service, specifically - * its initial state and procedures. - * - * There are two ways to define a service: - * 1. the {@link ServiceSchema.define} static method, which takes a configuration and - * a list of procedures directly. Use this to ergonomically define a service schema - * in one go. Good for smaller services, especially if they're stateless. - * 2. the {@link ServiceSchema.scaffold} static method, which creates a scaffold that - * can be used to define procedures separately from the configuration. Use this to - * better organize your service's definition, especially if it's a large service. - * You can also use it in a builder pattern to define the service in a more - * fluent way. - * - * See the static methods for more information and examples. - * - * When defining procedures, use the {@link Procedure} constructors to create them. - */ return class ServiceSchema< State extends object, Procedures extends ProcedureMap, From acae62542c050a50c301afdec265e214813aa8e7 Mon Sep 17 00:00:00 2001 From: Dawei Date: Mon, 9 Jun 2025 18:48:56 -0700 Subject: [PATCH 06/12] use the same ServiceSchema --- testUtil/fixtures/services.ts | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/testUtil/fixtures/services.ts b/testUtil/fixtures/services.ts index 225cc0ab..783e3b50 100644 --- a/testUtil/fixtures/services.ts +++ b/testUtil/fixtures/services.ts @@ -4,13 +4,15 @@ import { Err, Ok, unwrapOrThrow } from '../../router/result'; import { Observable } from '../observable/observable'; import { Procedure } from '../../router'; +const ServiceSchema = createServiceSchema(); + export const EchoRequest = Type.Object({ msg: Type.String(), ignore: Type.Boolean(), }); export const EchoResponse = Type.Object({ response: Type.String() }); -const TestServiceScaffold = createServiceSchema().scaffold({ +const TestServiceScaffold = ServiceSchema.scaffold({ initializeState: () => ({ count: 0 }), }); @@ -127,7 +129,7 @@ export const TestServiceSchema = TestServiceScaffold.finalize({ ...testServiceProcedures, }); -export const OrderingServiceSchema = createServiceSchema().define( +export const OrderingServiceSchema = ServiceSchema.define( { initializeState: () => ({ msgs: [] as Array }), }, @@ -151,7 +153,7 @@ export const OrderingServiceSchema = createServiceSchema().define( }, ); -export const BinaryFileServiceSchema = createServiceSchema().define({ +export const BinaryFileServiceSchema = ServiceSchema.define({ getFile: Procedure.rpc({ requestInit: Type.Object({ file: Type.String() }), responseData: Type.Object({ contents: Type.Uint8Array() }), @@ -166,7 +168,7 @@ export const BinaryFileServiceSchema = createServiceSchema().define({ export const DIV_BY_ZERO = 'DIV_BY_ZERO'; export const STREAM_ERROR = 'STREAM_ERROR'; -export const FallibleServiceSchema = createServiceSchema().define({ +export const FallibleServiceSchema = ServiceSchema.define({ divide: Procedure.rpc({ requestInit: Type.Object({ a: Type.Number(), b: Type.Number() }), responseData: Type.Object({ result: Type.Number() }), @@ -233,7 +235,7 @@ export const FallibleServiceSchema = createServiceSchema().define({ }), }); -export const SubscribableServiceSchema = createServiceSchema().define( +export const SubscribableServiceSchema = ServiceSchema.define( { initializeState: () => ({ count: new Observable(0) }) }, { add: Procedure.rpc({ @@ -260,7 +262,7 @@ export const SubscribableServiceSchema = createServiceSchema().define( }, ); -export const UploadableServiceSchema = createServiceSchema().define({ +export const UploadableServiceSchema = ServiceSchema.define({ addMultiple: Procedure.upload({ requestInit: Type.Object({}), requestData: Type.Object({ n: Type.Number() }), @@ -316,7 +318,7 @@ const RecursivePayload = Type.Recursive((This) => }), ); -export const NonObjectSchemas = createServiceSchema().define({ +export const NonObjectSchemas = ServiceSchema.define({ add: Procedure.rpc({ requestInit: Type.Number(), responseData: Type.Number(), @@ -335,7 +337,7 @@ export const NonObjectSchemas = createServiceSchema().define({ }); export function SchemaWithDisposableState(dispose: () => void) { - return createServiceSchema().define( + return ServiceSchema.define( { initializeState: () => ({ [Symbol.dispose]: dispose }) }, { add: Procedure.rpc({ @@ -352,7 +354,7 @@ export function SchemaWithDisposableState(dispose: () => void) { export function SchemaWithAsyncDisposableStateAndScaffold( dispose: () => Promise, ) { - const scaffold = createServiceSchema().scaffold({ + const scaffold = ServiceSchema.scaffold({ initializeState: () => ({ [Symbol.asyncDispose]: dispose }), }); From 11a91fdd2e79ccdd921b1a02b2fb7ce6818060a4 Mon Sep 17 00:00:00 2001 From: Dawei Date: Mon, 9 Jun 2025 18:51:28 -0700 Subject: [PATCH 07/12] ... --- router/result.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/result.ts b/router/result.ts index 3399a579..11abd837 100644 --- a/router/result.ts +++ b/router/result.ts @@ -99,7 +99,7 @@ export type ResponseData< ProcedureName extends keyof RiverClient[ServiceName], Procedure = RiverClient[ServiceName][ProcedureName], Fn extends (...args: never) => unknown = (...args: never) => unknown, -> = RiverClient extends Client +> = RiverClient extends Client ? Procedure extends object ? Procedure extends object & { rpc: infer RpcFn extends Fn } ? Awaited> From 5c8ed91db7dc8f0f629bec989e0d96d284578c57 Mon Sep 17 00:00:00 2001 From: Dawei Date: Tue, 10 Jun 2025 14:03:48 -0700 Subject: [PATCH 08/12] pass context as param --- __tests__/typescript-stress.test.ts | 14 +++++++++---- router/services.ts | 13 ++++++++---- testUtil/fixtures/services.ts | 32 +++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 8 deletions(-) diff --git a/__tests__/typescript-stress.test.ts b/__tests__/typescript-stress.test.ts index 35423f10..41584ff2 100644 --- a/__tests__/typescript-stress.test.ts +++ b/__tests__/typescript-stress.test.ts @@ -12,7 +12,10 @@ import { ResultUnwrapOk, unwrapOrThrow, } from '../router/result'; -import { TestServiceSchema } from '../testUtil/fixtures/services'; +import { + testContext, + TestServiceWithContextSchema, +} from '../testUtil/fixtures/services'; import { readNextResult } from '../testUtil'; import { createClientHandshakeOptions, @@ -41,7 +44,7 @@ const responseError = Type.Union([ ]); const fnBody = Procedure.rpc< - Record, + typeof testContext, { db: string; }, @@ -64,7 +67,7 @@ const fnBody = Procedure.rpc< // typescript is limited to max 50 constraints // see: https://github.com/microsoft/TypeScript/issues/33541 // we should be able to support more than that due to how we make services -const StupidlyLargeServiceSchema = createServiceSchema().define({ +const StupidlyLargeServiceSchema = createServiceSchema(testContext).define({ f1: fnBody, f2: fnBody, f3: fnBody, @@ -185,13 +188,16 @@ describe("ensure typescript doesn't give up trying to infer the types for large x1: StupidlyLargeServiceSchema, y1: StupidlyLargeServiceSchema, z1: StupidlyLargeServiceSchema, - test: TestServiceSchema, + test: TestServiceWithContextSchema, }; const mockTransportNetwork = createMockTransportNetwork(); const server = createServer( mockTransportNetwork.getServerTransport(), services, + { + extendedContext: testContext, + }, ); const client = createClient( diff --git a/router/services.ts b/router/services.ts index 69507b26..1d866f3a 100644 --- a/router/services.ts +++ b/router/services.ts @@ -301,7 +301,9 @@ export function serializeSchema( * * When defining procedures, always use the {@link Procedure} constructors to create them. */ -export function createServiceSchema() { +export function createServiceSchema( + context = {} as Context, +) { return class ServiceSchema< State extends object, Procedures extends ProcedureMap, @@ -387,7 +389,7 @@ export function createServiceSchema() { static scaffold( config: ServiceConfiguration, ) { - return new ServiceScaffold(config); + return new ServiceScaffold(config, context); } /** @@ -619,11 +621,14 @@ class ServiceScaffold { */ protected readonly config: ServiceConfiguration; + protected readonly context: Context; + /** * @param config - The configuration for this service. */ - constructor(config: ServiceConfiguration) { + constructor(config: ServiceConfiguration, context: Context) { this.config = config; + this.context = context; } /** @@ -669,6 +674,6 @@ class ServiceScaffold { * ``` */ finalize>(procedures: T) { - return createServiceSchema().define(this.config, procedures); + return createServiceSchema(this.context).define(this.config, procedures); } } diff --git a/testUtil/fixtures/services.ts b/testUtil/fixtures/services.ts index 783e3b50..3c899ed1 100644 --- a/testUtil/fixtures/services.ts +++ b/testUtil/fixtures/services.ts @@ -129,6 +129,38 @@ export const TestServiceSchema = TestServiceScaffold.finalize({ ...testServiceProcedures, }); +export const testContext = { + logger: { + info: (message: string) => { + console.log(message); + }, + }, +}; + +const TestServiceWithContextScaffold = createServiceSchema( + testContext, +).scaffold({ + initializeState: () => ({ count: 0 }), +}); + +const testServiceWithContextProcedures = + TestServiceWithContextScaffold.procedures({ + add: Procedure.rpc({ + requestInit: Type.Object({ n: Type.Number() }), + responseData: Type.Object({ result: Type.Number() }), + async handler({ ctx, reqInit: { n } }) { + ctx.state.count += n; + + return Ok({ result: ctx.state.count }); + }, + }), + }); + +export const TestServiceWithContextSchema = + TestServiceWithContextScaffold.finalize({ + ...testServiceWithContextProcedures, + }); + export const OrderingServiceSchema = ServiceSchema.define( { initializeState: () => ({ msgs: [] as Array }), From 636707387f964caadd7c73ff7918c400c9a8e364 Mon Sep 17 00:00:00 2001 From: Dawei Date: Tue, 10 Jun 2025 14:06:39 -0700 Subject: [PATCH 09/12] Update test --- __tests__/context.test.ts | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/__tests__/context.test.ts b/__tests__/context.test.ts index 98a7d030..acc02ece 100644 --- a/__tests__/context.test.ts +++ b/__tests__/context.test.ts @@ -36,14 +36,11 @@ describe('should handle incompatabilities', async () => { const clientTransport = getClientTransport('client'); const serverTransport = getServerTransport(); - interface ExtendedContext { - testctx: string; - } - const extendedContext: ExtendedContext = { + const extendedContext = { testctx: Math.random().toString(), }; - const ServiceSchema = createServiceSchema(); + const ServiceSchema = createServiceSchema(extendedContext); const services = { testservice: ServiceSchema.define({ @@ -78,11 +75,9 @@ describe('should handle incompatabilities', async () => { const clientTransport = getClientTransport('client'); const serverTransport = getServerTransport(); - interface ExtendedContext { - testctx: string; - } + const extendedContext = { testctx: Math.random().toString() }; - const ServiceSchema = createServiceSchema(); + const ServiceSchema = createServiceSchema(extendedContext); const TestServiceScaffold = ServiceSchema.scaffold({ initializeState: (ctx) => ({ @@ -103,7 +98,6 @@ describe('should handle incompatabilities', async () => { }), }; - const extendedContext = { testctx: Math.random().toString() }; createServer(serverTransport, services, { extendedContext, }); From 3d56537829f5079b275af7b2f9bb7c52e8bf7241 Mon Sep 17 00:00:00 2001 From: Dawei Date: Tue, 10 Jun 2025 14:09:42 -0700 Subject: [PATCH 10/12] can access context in procedure --- __tests__/context.test.ts | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/__tests__/context.test.ts b/__tests__/context.test.ts index acc02ece..a754f5aa 100644 --- a/__tests__/context.test.ts +++ b/__tests__/context.test.ts @@ -113,4 +113,41 @@ describe('should handle incompatabilities', async () => { expect(res).toEqual({ ok: true, payload: extendedContext.testctx }); }); + + test('should be able to access context in procedures', async () => { + // setup + const clientTransport = getClientTransport('client'); + const serverTransport = getServerTransport(); + + const extendedContext = { testctx: Math.random().toString() }; + + const ServiceSchema = createServiceSchema(extendedContext); + + const services = { + testservice: ServiceSchema.define({ + testrpc: Procedure.rpc({ + requestInit: Type.Object({}), + responseData: Type.String(), + handler: async ({ ctx }) => { + return Ok(ctx.testctx); + }, + }), + }), + }; + + createServer(serverTransport, services, { + extendedContext, + }); + const client = createClient( + clientTransport, + serverTransport.clientId, + ); + addPostTestCleanup(async () => { + await cleanupTransports([clientTransport, serverTransport]); + }); + + const res = await client.testservice.testrpc.rpc({}); + + expect(res).toEqual({ ok: true, payload: extendedContext.testctx }); + }); }); From 71c1ff9273871aaa63ad1aab07fb05ebe057fd5d Mon Sep 17 00:00:00 2001 From: Dawei Date: Tue, 10 Jun 2025 16:10:59 -0700 Subject: [PATCH 11/12] Test accessing context in large schema --- __tests__/typescript-stress.test.ts | 39 +++++++++++++++++++++++++++-- testUtil/fixtures/services.ts | 1 + 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/__tests__/typescript-stress.test.ts b/__tests__/typescript-stress.test.ts index 41584ff2..3bf89639 100644 --- a/__tests__/typescript-stress.test.ts +++ b/__tests__/typescript-stress.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, test } from 'vitest'; +import { assert, describe, expect, test } from 'vitest'; import { Procedure } from '../router/procedures'; import { createServiceSchema } from '../router/services'; import { Type } from '@sinclair/typebox'; @@ -28,6 +28,7 @@ import { createMockTransportNetwork } from '../testUtil/fixtures/mockTransport'; const requestData = Type.Union([ Type.Object({ a: Type.Number() }), Type.Object({ c: Type.String() }), + Type.Object({ d: Type.Number() }), ]); const responseData = Type.Object({ b: Type.Union([Type.Number(), Type.String()]), @@ -55,9 +56,11 @@ const fnBody = Procedure.rpc< requestInit: requestData, responseData, responseError, - async handler({ reqInit }) { + async handler({ reqInit, ctx }) { if ('c' in reqInit) { return Ok({ b: reqInit.c }); + } else if ('d' in reqInit) { + return Ok({ b: ctx.add(reqInit.d, reqInit.d) }); } else { return Ok({ b: reqInit.a }); } @@ -213,6 +216,38 @@ describe("ensure typescript doesn't give up trying to infer the types for large expect(server).toBeTruthy(); expect(client).toBeTruthy(); }); + + test('service with context should be able to access context in procedures', async () => { + const services = { + a: StupidlyLargeServiceSchema, + b: StupidlyLargeServiceSchema, + }; + const mockTransportNetwork = createMockTransportNetwork(); + const server = createServer( + mockTransportNetwork.getServerTransport(), + services, + { + extendedContext: testContext, + }, + ); + + const client = createClient( + mockTransportNetwork.getClientTransport('client'), + 'SERVER', + { eagerlyConnect: false }, + ); + + const res = await client.a.f2.rpc({ d: 1 }); + assert(res.ok); + expect(res.payload.b).toBe(2); + + const res2 = await client.b.f11.rpc({ d: 10 }); + assert(res2.ok); + expect(res2.payload.b).toBe(20); + + expect(server).toBeTruthy(); + expect(client).toBeTruthy(); + }); }); const services = { diff --git a/testUtil/fixtures/services.ts b/testUtil/fixtures/services.ts index 3c899ed1..710b3a17 100644 --- a/testUtil/fixtures/services.ts +++ b/testUtil/fixtures/services.ts @@ -135,6 +135,7 @@ export const testContext = { console.log(message); }, }, + add: (a: number, b: number) => a + b, }; const TestServiceWithContextScaffold = createServiceSchema( From b25fee81f4743132d0f11e97f546f5cc6f4256da Mon Sep 17 00:00:00 2001 From: Dawei Date: Tue, 10 Jun 2025 17:36:31 -0700 Subject: [PATCH 12/12] set Context as generic param --- __tests__/context.test.ts | 6 +++--- __tests__/typescript-stress.test.ts | 4 +++- router/services.ts | 20 +++++++++++--------- testUtil/fixtures/services.ts | 6 +++--- 4 files changed, 20 insertions(+), 16 deletions(-) diff --git a/__tests__/context.test.ts b/__tests__/context.test.ts index a754f5aa..3d51dd6a 100644 --- a/__tests__/context.test.ts +++ b/__tests__/context.test.ts @@ -40,7 +40,7 @@ describe('should handle incompatabilities', async () => { testctx: Math.random().toString(), }; - const ServiceSchema = createServiceSchema(extendedContext); + const ServiceSchema = createServiceSchema(); const services = { testservice: ServiceSchema.define({ @@ -77,7 +77,7 @@ describe('should handle incompatabilities', async () => { const extendedContext = { testctx: Math.random().toString() }; - const ServiceSchema = createServiceSchema(extendedContext); + const ServiceSchema = createServiceSchema(); const TestServiceScaffold = ServiceSchema.scaffold({ initializeState: (ctx) => ({ @@ -121,7 +121,7 @@ describe('should handle incompatabilities', async () => { const extendedContext = { testctx: Math.random().toString() }; - const ServiceSchema = createServiceSchema(extendedContext); + const ServiceSchema = createServiceSchema(); const services = { testservice: ServiceSchema.define({ diff --git a/__tests__/typescript-stress.test.ts b/__tests__/typescript-stress.test.ts index 3bf89639..9752d5aa 100644 --- a/__tests__/typescript-stress.test.ts +++ b/__tests__/typescript-stress.test.ts @@ -70,7 +70,9 @@ const fnBody = Procedure.rpc< // typescript is limited to max 50 constraints // see: https://github.com/microsoft/TypeScript/issues/33541 // we should be able to support more than that due to how we make services -const StupidlyLargeServiceSchema = createServiceSchema(testContext).define({ +const StupidlyLargeServiceSchema = createServiceSchema< + typeof testContext +>().define({ f1: fnBody, f2: fnBody, f3: fnBody, diff --git a/router/services.ts b/router/services.ts index 1d866f3a..6c898ca9 100644 --- a/router/services.ts +++ b/router/services.ts @@ -287,6 +287,13 @@ export function serializeSchema( * return Ok({ result: init.a + init.b }); * } * }), + * getUserId: Procedure.rpc({ + * requestInit: Type.Object({}), + * responseData: Type.Object({ id: Type.String() }), + * async handler(ctx) { + * return Ok({ id: ctx.userId }); + * } + * }), * }); * ``` * @@ -301,9 +308,7 @@ export function serializeSchema( * * When defining procedures, always use the {@link Procedure} constructors to create them. */ -export function createServiceSchema( - context = {} as Context, -) { +export function createServiceSchema() { return class ServiceSchema< State extends object, Procedures extends ProcedureMap, @@ -389,7 +394,7 @@ export function createServiceSchema( static scaffold( config: ServiceConfiguration, ) { - return new ServiceScaffold(config, context); + return new ServiceScaffold(config); } /** @@ -621,14 +626,11 @@ class ServiceScaffold { */ protected readonly config: ServiceConfiguration; - protected readonly context: Context; - /** * @param config - The configuration for this service. */ - constructor(config: ServiceConfiguration, context: Context) { + constructor(config: ServiceConfiguration) { this.config = config; - this.context = context; } /** @@ -674,6 +676,6 @@ class ServiceScaffold { * ``` */ finalize>(procedures: T) { - return createServiceSchema(this.context).define(this.config, procedures); + return createServiceSchema().define(this.config, procedures); } } diff --git a/testUtil/fixtures/services.ts b/testUtil/fixtures/services.ts index 710b3a17..09a788b4 100644 --- a/testUtil/fixtures/services.ts +++ b/testUtil/fixtures/services.ts @@ -138,9 +138,9 @@ export const testContext = { add: (a: number, b: number) => a + b, }; -const TestServiceWithContextScaffold = createServiceSchema( - testContext, -).scaffold({ +const TestServiceWithContextScaffold = createServiceSchema< + typeof testContext +>().scaffold({ initializeState: () => ({ count: 0 }), });