From aef4209a3d205bae7882058b09b4b35d7466b459 Mon Sep 17 00:00:00 2001 From: Michael Xiao Date: Thu, 7 May 2026 15:55:07 -0400 Subject: [PATCH 1/7] Create a base class for response cache --- src/adapter/endpoint.ts | 4 +- src/cache/response-cache/base.ts | 153 +++++++++++++++ src/cache/response-cache/simple.ts | 38 ++++ src/cache/response.ts | 181 +----------------- .../simple.test.ts} | 14 +- 5 files changed, 202 insertions(+), 188 deletions(-) create mode 100644 src/cache/response-cache/base.ts create mode 100644 src/cache/response-cache/simple.ts rename test/cache/{response-cache.test.ts => response-cache/simple.test.ts} (95%) diff --git a/src/adapter/endpoint.ts b/src/adapter/endpoint.ts index e64ca34a..282201b5 100644 --- a/src/adapter/endpoint.ts +++ b/src/adapter/endpoint.ts @@ -1,4 +1,4 @@ -import { ResponseCache } from '../cache/response' +import { SimpleResponseCache } from '../cache/response' import { AdapterSettings } from '../config' import { TransportRoutes } from '../transports' import { @@ -83,7 +83,7 @@ export class AdapterEndpoint implements AdapterEndpo adapterSettings: T['Settings'], ): Promise { this.adapterName = adapterName - const responseCache = new ResponseCache({ + const responseCache = new SimpleResponseCache({ dependencies, adapterSettings: adapterSettings as AdapterSettings, adapterName, diff --git a/src/cache/response-cache/base.ts b/src/cache/response-cache/base.ts new file mode 100644 index 00000000..512e39ba --- /dev/null +++ b/src/cache/response-cache/base.ts @@ -0,0 +1,153 @@ +import { AdapterDependencies } from '../../adapter' +import { AdapterSettings } from '../../config' +import { + AdapterResponse, + makeLogger, + ResponseGenerics, + TimestampedAdapterResponse, + TimestampedProviderResult, + censor, + censorLogs, + TimestampedProviderErrorResponse, +} from '../../util' +import { + InputParameters, + InputParametersDefinition, + TypeFromDefinition, +} from '../../validation/input-params' +import { Cache, calculateAdapterName, calculateCacheKey, calculateFeedId } from '../' +import CensorList from '../../util/censor/censor-list' +import { validator } from '../../validation/utils' + +const logger = makeLogger('ResponseCache') + +export abstract class ResponseCache< + T extends { Parameters: InputParametersDefinition; Response: ResponseGenerics }, +> { + cache: Cache> + inputParameters: InputParameters + adapterName: string + endpointName: string + adapterSettings: AdapterSettings + dependencies: AdapterDependencies + + constructor({ + inputParameters, + adapterName, + endpointName, + adapterSettings, + dependencies, + }: { + dependencies: AdapterDependencies + adapterSettings: AdapterSettings + adapterName: string + endpointName: string + inputParameters: InputParameters + }) { + this.dependencies = dependencies + this.cache = dependencies.cache as Cache> + this.inputParameters = inputParameters + this.adapterName = adapterName + this.endpointName = endpointName + this.adapterSettings = adapterSettings + } + + /** + * Sets responses in the adapter cache (adding necessary metadata and defaults) + * + * @param transportName - transport name + * @param results - the entries to write to the cache + */ + abstract write(transportName: string, results: TimestampedProviderResult[]): Promise + + /** + * Sets a new TTL value for already cached responses in the adapter cache + * + * @param transportName - transport name + * @param params - set of parameters that uniquely relate to the response + * @param ttl - a new time in milliseconds until the response expires + */ + async writeTTL( + transportName: string, + params: TypeFromDefinition[], + ttl: number, + ): Promise { + for (const param of params) { + const key = this.getCacheKey(transportName, param) + this.cache.setTTL(key, ttl) + } + } + + async get(key: string) { + return this.cache.get(key) + } + + protected generateCacheEntry(transportName: string, r: TimestampedProviderResult) { + const censorList = CensorList.getAll() + const { data, result, errorMessage } = r.response + if (!errorMessage && data === undefined) { + logger.warn('The "data" property of the response is undefined.') + } else if (!errorMessage && result === undefined) { + logger.warn('The "result" property of the response is undefined.') + } + let censoredResponse + if (!censorList.length) { + censoredResponse = r.response + } else { + try { + censoredResponse = censor(r.response, censorList, true) as TimestampedAdapterResponse< + T['Response'] + > + } catch (error) { + censorLogs(() => logger.error(`Error censoring response: ${error}`)) + censoredResponse = { + statusCode: 502, + errorMessage: 'Response could not be censored due to an error', + timestamps: r.response.timestamps, + } + } + } + + const response: AdapterResponse = { + ...censoredResponse, + statusCode: (censoredResponse as TimestampedProviderErrorResponse).statusCode || 200, + } + + if (this.adapterSettings.METRICS_ENABLED && this.adapterSettings.EXPERIMENTAL_METRICS_ENABLED) { + response.meta = { + adapterName: calculateAdapterName(this.adapterName, r.params), + metrics: { + feedId: calculateFeedId( + { + adapterSettings: this.adapterSettings, + }, + r.params, + ), + }, + } + } + + if (response.timestamps?.providerIndicatedTimeUnixMs !== undefined) { + const timestampValidator = validator.responseTimestamp() + const error = timestampValidator.fn(response.timestamps?.providerIndicatedTimeUnixMs) + if (error) { + censorLogs(() => logger.warn(`Provider indicated time is invalid: ${error}`)) + } + } + + return { + key: this.getCacheKey(transportName, r.params), + value: response, + } as const + } + + private getCacheKey(transportName: string, params: TypeFromDefinition) { + return calculateCacheKey({ + transportName, + data: params, + adapterName: this.adapterName, + endpointName: this.endpointName, + adapterSettings: this.adapterSettings, + }) + } +} diff --git a/src/cache/response-cache/simple.ts b/src/cache/response-cache/simple.ts new file mode 100644 index 00000000..d30ff2b3 --- /dev/null +++ b/src/cache/response-cache/simple.ts @@ -0,0 +1,38 @@ +import { ResponseCache } from './base' +import { AdapterResponse, ResponseGenerics, TimestampedProviderResult } from '../../util' +import { InputParametersDefinition } from '../../validation/input-params' +import * as cacheMetrics from '../metrics' + +/** + * Special type of cache to store responses for this adapter. + */ +export class SimpleResponseCache< + T extends { + Parameters: InputParametersDefinition + Response: ResponseGenerics + }, +> extends ResponseCache { + async write(transportName: string, results: TimestampedProviderResult[]): Promise { + const entries = results.map((r) => this.generateCacheEntry(transportName, r)) + + const ttl = this.adapterSettings.CACHE_MAX_AGE + await this.cache.setMany(entries, ttl) + + const now = Date.now() + for (const { key, value } of entries) { + // Only record metrics if feed Id is present, otherwise assuming value is not adapter response to record + const response = value as unknown as AdapterResponse + const feedId = response.meta?.metrics?.feedId + if (feedId) { + const providerTime = response.timestamps?.providerIndicatedTimeUnixMs + const timeDelta = providerTime ? now - providerTime : undefined + + // Record cache set count, max age, and staleness (set to 0 for cache set) + const label = cacheMetrics.cacheMetricsLabel(key, feedId, this.cache.type) + cacheMetrics.cacheSet(label, ttl, timeDelta) + } + } + + return + } +} diff --git a/src/cache/response.ts b/src/cache/response.ts index 2fe961e4..f0723547 100644 --- a/src/cache/response.ts +++ b/src/cache/response.ts @@ -1,179 +1,2 @@ -import { AdapterDependencies } from '../adapter' -import { AdapterSettings } from '../config' -import { - AdapterResponse, - ResponseGenerics, - TimestampedAdapterResponse, - TimestampedProviderErrorResponse, - TimestampedProviderResult, - censor, - censorLogs, - makeLogger, -} from '../util' -import CensorList from '../util/censor/censor-list' -import { - InputParameters, - InputParametersDefinition, - TypeFromDefinition, -} from '../validation/input-params' -import { validator } from '../validation/utils' -import { Cache, calculateCacheKey, calculateFeedId, calculateAdapterName } from './' -import * as cacheMetrics from './metrics' - -const logger = makeLogger('ResponseCache') - -/** - * Special type of cache to store responses for this adapter. - */ -export class ResponseCache< - T extends { - Parameters: InputParametersDefinition - Response: ResponseGenerics - }, -> { - cache: Cache> - inputParameters: InputParameters - adapterName: string - endpointName: string - adapterSettings: AdapterSettings - - constructor({ - inputParameters, - adapterName, - endpointName, - adapterSettings, - dependencies, - }: { - dependencies: AdapterDependencies - adapterSettings: AdapterSettings - adapterName: string - endpointName: string - inputParameters: InputParameters - }) { - this.cache = dependencies.cache as Cache> - this.inputParameters = inputParameters - this.adapterName = adapterName - this.endpointName = endpointName - this.adapterSettings = adapterSettings - } - - /** - * Sets responses in the adapter cache (adding necessary metadata and defaults) - * - * @param results - the entries to write to the cache - */ - async write(transportName: string, results: TimestampedProviderResult[]): Promise { - const censorList = CensorList.getAll() - const entries = results.map((r) => { - const { data, result, errorMessage } = r.response - if (!errorMessage && data === undefined) { - logger.warn('The "data" property of the response is undefined.') - } else if (!errorMessage && result === undefined) { - logger.warn('The "result" property of the response is undefined.') - } - let censoredResponse - if (!censorList.length) { - censoredResponse = r.response - } else { - try { - censoredResponse = censor(r.response, censorList, true) as TimestampedAdapterResponse< - T['Response'] - > - } catch (error) { - censorLogs(() => logger.error(`Error censoring response: ${error}`)) - censoredResponse = { - statusCode: 502, - errorMessage: 'Response could not be censored due to an error', - timestamps: r.response.timestamps, - } - } - } - - const response: AdapterResponse = { - ...censoredResponse, - statusCode: (censoredResponse as TimestampedProviderErrorResponse).statusCode || 200, - } - - if ( - this.adapterSettings.METRICS_ENABLED && - this.adapterSettings.EXPERIMENTAL_METRICS_ENABLED - ) { - response.meta = { - adapterName: calculateAdapterName(this.adapterName, r.params), - metrics: { - feedId: calculateFeedId( - { - adapterSettings: this.adapterSettings, - }, - r.params, - ), - }, - } - } - - if (response.timestamps?.providerIndicatedTimeUnixMs !== undefined) { - const timestampValidator = validator.responseTimestamp() - const error = timestampValidator.fn(response.timestamps?.providerIndicatedTimeUnixMs) - if (error) { - censorLogs(() => logger.warn(`Provider indicated time is invalid: ${error}`)) - } - } - - return { - key: calculateCacheKey({ - transportName, - data: r.params, - adapterName: this.adapterName, - endpointName: this.endpointName, - adapterSettings: this.adapterSettings, - }), - value: response, - } as const - }) - - const ttl = this.adapterSettings.CACHE_MAX_AGE - await this.cache.setMany(entries, ttl) - - const now = Date.now() - for (const { key, value } of entries) { - // Only record metrics if feed Id is present, otherwise assuming value is not adapter response to record - const response = value as unknown as AdapterResponse - const feedId = response.meta?.metrics?.feedId - if (feedId) { - const providerTime = response.timestamps?.providerIndicatedTimeUnixMs - const timeDelta = providerTime ? now - providerTime : undefined - - // Record cache set count, max age, and staleness (set to 0 for cache set) - const label = cacheMetrics.cacheMetricsLabel(key, feedId, this.cache.type) - cacheMetrics.cacheSet(label, ttl, timeDelta) - } - } - - return - } - - /** - * Sets a new TTL value for already cached responses in the adapter cache - * - * @param transportName - transport name - * @param params - set of parameters that uniquely relate to the response - * @param ttl - a new time in milliseconds until the response expires - */ - async writeTTL( - transportName: string, - params: TypeFromDefinition[], - ttl: number, - ): Promise { - for (const param of params) { - const key = calculateCacheKey({ - transportName: transportName, - data: param, - adapterName: this.adapterName, - endpointName: this.endpointName, - adapterSettings: this.adapterSettings, - }) - - this.cache.setTTL(key, ttl) - } - } -} +export { ResponseCache } from './response-cache/base' +export { SimpleResponseCache } from './response-cache/simple' diff --git a/test/cache/response-cache.test.ts b/test/cache/response-cache/simple.test.ts similarity index 95% rename from test/cache/response-cache.test.ts rename to test/cache/response-cache/simple.test.ts index b01ae581..9fda037e 100644 --- a/test/cache/response-cache.test.ts +++ b/test/cache/response-cache/simple.test.ts @@ -1,18 +1,18 @@ import { Clock as InstalledClock } from '@sinonjs/fake-timers' -import { installTimers } from '../helper' +import { installTimers } from '../../helper' import untypedTest, { TestFn } from 'ava' import { FastifyInstance } from 'fastify' -import { Adapter, AdapterEndpoint } from '../../src/adapter' -import { AdapterConfig, SettingsDefinitionFromConfig } from '../../src/config' -import { AdapterRequest } from '../../src/util' -import { TypeFromDefinition } from '../../src/validation/input-params' +import { Adapter, AdapterEndpoint } from '../../../src/adapter' +import { AdapterConfig, SettingsDefinitionFromConfig } from '../../../src/config' +import { AdapterRequest } from '../../../src/util' +import { TypeFromDefinition } from '../../../src/validation/input-params' import { NopTransport, TestAdapter, assertEqualResponses, runAllUntilTime, -} from '../../src/util/testing-utils' -import { cacheTestInputParameters, CacheTestTransportTypes } from './helper' +} from '../../../src/util/testing-utils' +import { cacheTestInputParameters, CacheTestTransportTypes } from '../helper' const test = untypedTest as TestFn<{ clock: InstalledClock From c5e27eda6d187bd9c0c23f3a0ac265f0a2c1c64b Mon Sep 17 00:00:00 2001 From: Michael Xiao Date: Thu, 7 May 2026 23:30:58 -0400 Subject: [PATCH 2/7] compare cache & composite transport --- src/cache/response-cache/base.ts | 23 +++- src/cache/response-cache/compare.ts | 88 ++++++++++++++++ src/cache/response-cache/simple.ts | 14 ++- src/cache/response.ts | 1 + src/transports/composite.ts | 69 ++++++++++++ src/transports/index.ts | 1 + src/util/types.ts | 2 + test/cache/response-cache/compare.test.ts | 104 +++++++++++++++++++ test/metrics/metrics.test.ts | 1 + test/transports/composite.test.ts | 121 ++++++++++++++++++++++ 10 files changed, 418 insertions(+), 6 deletions(-) create mode 100644 src/cache/response-cache/compare.ts create mode 100644 src/transports/composite.ts create mode 100644 test/cache/response-cache/compare.test.ts create mode 100644 test/transports/composite.test.ts diff --git a/src/cache/response-cache/base.ts b/src/cache/response-cache/base.ts index 512e39ba..68d903aa 100644 --- a/src/cache/response-cache/base.ts +++ b/src/cache/response-cache/base.ts @@ -60,6 +60,18 @@ export abstract class ResponseCache< */ abstract write(transportName: string, results: TimestampedProviderResult[]): Promise + /** + * Sets responses with metadata in the adapter cache + * + * @param entries - the entries to write to the cache + */ + abstract writeEntries( + entries: { + key: string + value: AdapterResponse + }[], + ): Promise + /** * Sets a new TTL value for already cached responses in the adapter cache * @@ -82,7 +94,11 @@ export abstract class ResponseCache< return this.cache.get(key) } - protected generateCacheEntry(transportName: string, r: TimestampedProviderResult) { + protected generateCacheEntry( + transportNameForMeta: string, + transportNameForCache: string, + r: TimestampedProviderResult, + ) { const censorList = CensorList.getAll() const { data, result, errorMessage } = r.response if (!errorMessage && data === undefined) { @@ -116,6 +132,7 @@ export abstract class ResponseCache< if (this.adapterSettings.METRICS_ENABLED && this.adapterSettings.EXPERIMENTAL_METRICS_ENABLED) { response.meta = { adapterName: calculateAdapterName(this.adapterName, r.params), + transportName: transportNameForMeta, metrics: { feedId: calculateFeedId( { @@ -136,12 +153,12 @@ export abstract class ResponseCache< } return { - key: this.getCacheKey(transportName, r.params), + key: this.getCacheKey(transportNameForCache, r.params), value: response, } as const } - private getCacheKey(transportName: string, params: TypeFromDefinition) { + getCacheKey(transportName: string, params: TypeFromDefinition) { return calculateCacheKey({ transportName, data: params, diff --git a/src/cache/response-cache/compare.ts b/src/cache/response-cache/compare.ts new file mode 100644 index 00000000..84b618c8 --- /dev/null +++ b/src/cache/response-cache/compare.ts @@ -0,0 +1,88 @@ +import { ResponseCache } from './base' +import { AdapterResponse, ResponseGenerics, TimestampedProviderResult } from '../../util' +import { InputParametersDefinition, TypeFromDefinition } from '../../validation/input-params' + +/** + * Compares with existing cache entries before deciding to write or not + */ +export class CompareResponseCache< + T extends { + Parameters: InputParametersDefinition + Response: ResponseGenerics + }, +> extends ResponseCache { + readonly transportName: string + // The actual cache where responses are written to + responseCache: ResponseCache + // A local map to keep track of the most recent entries written to the responseCache + // We compare with this first before comparing with value in cache + // so that we can reduce cache reads + localCache: Map> + // True if next should replace current in cache + shouldUpdate: ( + next: AdapterResponse, + current?: AdapterResponse, + ) => boolean + + constructor( + transportName: string, + responseCache: ResponseCache, + shouldUpdate: ( + next: AdapterResponse, + current?: AdapterResponse, + ) => boolean, + ) { + super({ + inputParameters: responseCache.inputParameters, + adapterName: responseCache.adapterName, + endpointName: responseCache.endpointName, + adapterSettings: responseCache.adapterSettings, + dependencies: responseCache.dependencies, + }) + this.transportName = transportName + this.responseCache = responseCache + this.localCache = new Map() + this.shouldUpdate = shouldUpdate + } + + async write(transportName: string, results: TimestampedProviderResult[]): Promise { + const entries: { + key: string + value: AdapterResponse + }[] = [] + + for (const result of results) { + const { key, value } = this.generateCacheEntry(transportName, this.transportName, result) + if (!this.shouldUpdate(value, this.localCache.get(key))) { + continue + } + const entryInCache = await this.get(key) + if (!this.shouldUpdate(value, entryInCache)) { + continue + } + entries.push({ key, value }) + } + + await this.responseCache.writeEntries(entries) + + entries.forEach(({ key, value }) => { + this.localCache.set(key, value) + }) + } + + async writeEntries() { + throw new Error('Use write instead for CompareResponseCache') + } + + override async writeTTL( + _: string, + params: TypeFromDefinition[], + ttl: number, + ): Promise { + await this.responseCache.writeTTL(this.transportName, params, ttl) + } + + override async get(key: string) { + return this.responseCache.get(key) + } +} diff --git a/src/cache/response-cache/simple.ts b/src/cache/response-cache/simple.ts index d30ff2b3..64e3249e 100644 --- a/src/cache/response-cache/simple.ts +++ b/src/cache/response-cache/simple.ts @@ -12,9 +12,12 @@ export class SimpleResponseCache< Response: ResponseGenerics }, > extends ResponseCache { - async write(transportName: string, results: TimestampedProviderResult[]): Promise { - const entries = results.map((r) => this.generateCacheEntry(transportName, r)) - + async writeEntries( + entries: { + key: string + value: AdapterResponse + }[], + ): Promise { const ttl = this.adapterSettings.CACHE_MAX_AGE await this.cache.setMany(entries, ttl) @@ -35,4 +38,9 @@ export class SimpleResponseCache< return } + + async write(transportName: string, results: TimestampedProviderResult[]): Promise { + const entries = results.map((r) => this.generateCacheEntry(transportName, transportName, r)) + await this.writeEntries(entries) + } } diff --git a/src/cache/response.ts b/src/cache/response.ts index f0723547..e7294569 100644 --- a/src/cache/response.ts +++ b/src/cache/response.ts @@ -1,2 +1,3 @@ export { ResponseCache } from './response-cache/base' export { SimpleResponseCache } from './response-cache/simple' +export { CompareResponseCache } from './response-cache/compare' diff --git a/src/transports/composite.ts b/src/transports/composite.ts new file mode 100644 index 00000000..1d269657 --- /dev/null +++ b/src/transports/composite.ts @@ -0,0 +1,69 @@ +import { EndpointContext } from '../adapter' +import { CompareResponseCache } from '../cache/response-cache/compare' +import { ResponseCache } from '../cache/response' +import { AdapterRequest, AdapterResponse } from '../util/types' +import { TypeFromDefinition } from '../validation/input-params' +import type { Transport, TransportDependencies, TransportGenerics } from '.' + +export type CompositeTransportConfig = { + transports: Record> + + /** + * @param next - the next response to be written to the cache + * @param current - the current response in the cache + * @returns true if next should replace current in cache + */ + shouldUpdate: ( + next: AdapterResponse, + current?: AdapterResponse, + ) => boolean +} + +// Send requests to multiple transports and merge responses into a single cache according to shouldUpdate +export class CompositeTransport implements Transport { + name!: string + responseCache!: ResponseCache + private transports: Transport[] = [] + + constructor(private readonly config: CompositeTransportConfig) {} + + async initialize( + dependencies: TransportDependencies, + adapterSettings: T['Settings'], + endpointName: string, + transportName: string, + ): Promise { + this.name = transportName + this.responseCache = dependencies.responseCache + + const compareCache = new CompareResponseCache( + transportName, + this.responseCache, + this.config.shouldUpdate, + ) + + await Promise.all( + Object.entries(this.config.transports).map(([name, transport]) => + transport.initialize( + { ...dependencies, responseCache: compareCache }, + adapterSettings, + endpointName, + name, + ), + ), + ) + + this.transports = Object.values(this.config.transports) + } + + async registerRequest( + req: AdapterRequest>, + adapterSettings: T['Settings'], + ): Promise { + await Promise.all(this.transports.map((t) => t.registerRequest?.(req, adapterSettings))) + } + + async backgroundExecute(context: EndpointContext): Promise { + await Promise.all(this.transports.map((t) => t.backgroundExecute?.(context))) + } +} diff --git a/src/transports/index.ts b/src/transports/index.ts index ffb8a6e4..4ef80e99 100644 --- a/src/transports/index.ts +++ b/src/transports/index.ts @@ -7,6 +7,7 @@ import { InputParametersDefinition, TypeFromDefinition } from '../validation/inp export * from './http' export * from './sse' export * from './websocket' +export * from './composite' /** * Helper struct type that will be used to pass types to the generic parameters of a Transport. diff --git a/src/util/types.ts b/src/util/types.ts index 4caf9e49..c0d5085e 100644 --- a/src/util/types.ts +++ b/src/util/types.ts @@ -89,6 +89,8 @@ export interface AdapterRequestMeta { export interface AdapterResponseMeta extends AdapterRequestMeta { /** Name of the adapter */ adapterName: string + /** Name of the transport */ + transportName: string } /** diff --git a/test/cache/response-cache/compare.test.ts b/test/cache/response-cache/compare.test.ts new file mode 100644 index 00000000..161ab252 --- /dev/null +++ b/test/cache/response-cache/compare.test.ts @@ -0,0 +1,104 @@ +import test from 'ava' +import { LocalCache } from '../../../src/cache/local' +import { CompareResponseCache } from '../../../src/cache/response-cache/compare' +import { SimpleResponseCache } from '../../../src/cache/response-cache/simple' +import { AdapterConfig } from '../../../src/config' +import { LoggerFactoryProvider } from '../../../src/util/logger' +import { InputParameters } from '../../../src/validation' +import { cacheTestInputParameters, CacheTestTransportTypes } from '../helper' +import { AdapterDependencies } from '../../../src/adapter' +import { metrics } from '../../../src/metrics' + +test.before(() => { + LoggerFactoryProvider.set() + metrics.initialize() +}) + +const buildSimpleCache = () => { + const config = new AdapterConfig({}) + config.initialize() + config.settings.METRICS_ENABLED = true + config.settings.EXPERIMENTAL_METRICS_ENABLED = true + config.validate() + + return new SimpleResponseCache({ + dependencies: { cache: new LocalCache(100) } as unknown as AdapterDependencies, + adapterSettings: config.settings, + adapterName: 'TEST', + endpointName: 'test', + inputParameters: new InputParameters(cacheTestInputParameters.definition), + }) +} + +const providerResult = (params: { base: string; factor: number }, result: number) => ({ + params, + response: { + data: null, + result, + timestamps: { + providerDataRequestedUnixMs: 0, + providerDataReceivedUnixMs: 0, + providerIndicatedTimeUnixMs: undefined, + }, + }, +}) + +test('writes under CompareResponseCache transportName', async (t) => { + const compareCache = new CompareResponseCache('merged', buildSimpleCache(), () => true) + + const params = { base: 'ETH', factor: 1 } + + await compareCache.write('ws', [providerResult(params, 42)]) + + t.is(await compareCache.get(compareCache.getCacheKey('ws', params)), undefined) + + const entry = await compareCache.get(compareCache.getCacheKey('merged', params)) + t.is(entry?.result, 42) + t.is(entry?.meta?.transportName, 'ws') +}) + +test('second write override first write', async (t) => { + const compareCache = new CompareResponseCache('merged', buildSimpleCache(), () => true) + + const params = { base: 'ETH', factor: 1 } + + await compareCache.write('ws', [providerResult(params, 1), providerResult(params, 2)]) + + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 2) +}) + +test('shouldUpdate can block write when new value is not fresher than cache', async (t) => { + const compareCache = new CompareResponseCache( + 'merged', + buildSimpleCache(), + (next, current) => (next?.result || 0) > (current?.result || 0), + ) + + const params = { base: 'ETH', factor: 1 } + + await compareCache.write('merged', [providerResult(params, 50)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 50) + + await compareCache.write('merged', [providerResult(params, 25)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 50) + t.is(compareCache.localCache.size, 1) +}) + +test('shouldUpdate can block write without old value in localCache', async (t) => { + const simpleCache = buildSimpleCache() + + const compareCache = new CompareResponseCache( + 'merged', + simpleCache, + (next, current) => (next?.result || 0) > (current?.result || 0), + ) + + const params = { base: 'ETH', factor: 1 } + + await simpleCache.write('merged', [providerResult(params, 100)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 100) + + await compareCache.write('merged', [providerResult(params, 25)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 100) + t.is(compareCache.localCache.size, 0) +}) diff --git a/test/metrics/metrics.test.ts b/test/metrics/metrics.test.ts index ef3d7399..6b9730ca 100644 --- a/test/metrics/metrics.test.ts +++ b/test/metrics/metrics.test.ts @@ -332,6 +332,7 @@ test.serial('validate response.meta has the correct properties', async (t) => { t.deepEqual(response.meta, { adapterName: 'TEST', metrics: { feedId: '{"from":"eth","to":"usd"}' }, + transportName: 'default_single_transport', }) }) diff --git a/test/transports/composite.test.ts b/test/transports/composite.test.ts new file mode 100644 index 00000000..37129712 --- /dev/null +++ b/test/transports/composite.test.ts @@ -0,0 +1,121 @@ +import { installTimers } from '../helper' +import untypedTest, { TestFn } from 'ava' +import axios, { AxiosResponse } from 'axios' +import MockAdapter from 'axios-mock-adapter' +import { FastifyInstance } from 'fastify' +import { Adapter, AdapterEndpoint } from '../../src/adapter' +import { CompositeTransport } from '../../src/transports/composite' +import { HttpTransport } from '../../src/transports' +import { AdapterRequest } from '../../src/util/types' +import { TestAdapter } from '../../src/util/testing-utils' +import { TypeFromDefinition } from '../../src/validation/input-params' +import { cacheTestInputParameters, CacheTestTransportTypes } from '../cache/helper' + +const test = untypedTest as TestFn<{ + clock: ReturnType + testAdapter: TestAdapter + api: FastifyInstance | undefined +}> + +process.env['CACHE_POLLING_MAX_RETRIES'] = '20' +process.env['CACHE_POLLING_SLEEP_MS'] = '10' +process.env['RETRY'] = '0' +process.env['BACKGROUND_EXECUTE_MS_HTTP'] = '1' +process.env['API_TIMEOUT'] = '0' + +const WS_PROVIDER = 'http://ea-composite-ws.test' +const REST_PROVIDER = 'http://ea-composite-rest.test' + +const axiosMock = new MockAdapter(axios) + +type CacheTestHttpTypes = CacheTestTransportTypes & { + Provider: { + RequestBody: unknown + ResponseBody: { result: number } + } +} + +/** HTTP transport that counts `registerRequest` (subscription adds) like the old stub. */ +class CountingCacheHttpTransport extends HttpTransport { + registerRequestCalls = 0 + + constructor(logicalName: string, baseURL: string) { + super({ + prepareRequests: (params) => ({ + params, + request: { + baseURL, + url: '/price', + method: 'GET', + }, + }), + parseResponse: (params, res: AxiosResponse<{ result: number }>) => + params.map((p) => ({ + params: p, + response: { + data: null, + result: res.data.result, + }, + })), + }) + this.name = logicalName + } + + override async registerRequest( + req: AdapterRequest>, + settings: CacheTestHttpTypes['Settings'], + ): Promise { + this.registerRequestCalls++ + return super.registerRequest(req, settings) + } +} + +test.before((t) => { + t.context.clock = installTimers() +}) + +test.afterEach(async (t) => { + axiosMock.resetHandlers() + t.context.clock.reset() + await t.context.testAdapter?.api.close() +}) + +test.serial( + 'composite transport merges child writes using shouldUpdate when run under an adapter', + async (t) => { + axiosMock.onGet(`${WS_PROVIDER}/price`).reply(200, { result: 10 }) + axiosMock.onGet(`${REST_PROVIDER}/price`).reply(200, { result: 100 }) + + const ws = new CountingCacheHttpTransport('ws', WS_PROVIDER) + const rest = new CountingCacheHttpTransport('rest', REST_PROVIDER) + + const composite = new CompositeTransport({ + transports: { ws: ws, rest: rest }, + shouldUpdate: (next, current) => (next?.result ?? 0) > (current?.result ?? 0), + }) + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + endpoints: [ + new AdapterEndpoint({ + name: 'test', + inputParameters: cacheTestInputParameters, + transport: composite, + }), + ], + }) + + const testAdapter = await TestAdapter.start(adapter, t.context) + + t.is(ws.name, 'ws') + t.is(rest.name, 'rest') + + const res = await testAdapter.request({ base: 'ETH', factor: 5 }) + + t.is(res.statusCode, 200) + t.is(res.json().result, 100) + t.is(ws.registerRequestCalls, 1) + t.is(rest.registerRequestCalls, 1) + }, +) From 7ef7a7e5f45cb36da37f05f8dc46a3b9f8d7ada6 Mon Sep 17 00:00:00 2001 From: Michael Xiao Date: Fri, 8 May 2026 09:30:19 -0400 Subject: [PATCH 3/7] docs --- README.md | 1 + .../transport-types/composite-transport.md | 55 +++++++++++++++++++ docs/components/transports.md | 1 + 3 files changed, 57 insertions(+) create mode 100644 docs/components/transport-types/composite-transport.md diff --git a/README.md b/README.md index 7e32839c..57d122a8 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ yarn # Install yarn dependencies - [Subscription](./docs/components/transport-types/subscription-transport.md) - [Streaming](./docs/components/transport-types/streaming-transport.md) - [Custom](./docs/components/transport-types/custom-transport.md) + - [Composite](./docs/components/transport-types/composite-transport.md) - Guides - [Porting a v2 EA to v3](./docs/guides/porting-a-v2-ea-to-v3.md) - [Creating a new v3 EA](./docs/guides/creating-a-new-v3-ea.md) diff --git a/docs/components/transport-types/composite-transport.md b/docs/components/transport-types/composite-transport.md new file mode 100644 index 00000000..55790a7e --- /dev/null +++ b/docs/components/transport-types/composite-transport.md @@ -0,0 +1,55 @@ +# Composite transport + +`CompositeTransport` runs several child transports in parallel for the same endpoint and merges their writes into a single response cache. You choose when a newer value from any child should replace what is already cached by implementing `shouldUpdate`. + +Typical uses: + +- Combine a low-latency channel (for example WebSocket) with a REST fallback so the cache still updates if the stream lags or drops. +- Prefer one provider’s quote over another’s when both are active, using freshness, spread, or custom rules in `shouldUpdate`. + +## How it works + +1. **Initialization** — Each child transport is initialized with the same adapter dependencies, except `responseCache` is replaced by a `[CompareResponseCache](../../../src/cache/response-cache/compare.ts)` wrapper. That wrapper forwards reads to the real endpoint cache but filters writes: a write is applied only when `shouldUpdate(next, current)` is true for the pending value versus the last locally seen value for that cache key, and again versus the value already in the shared cache (so concurrent children do not blindly overwrite each other). +2. **Subscriptions** — `registerRequest` is invoked on every child in parallel, so each transport can register the request in its own subscription set or equivalent. +3. **Background execution** — `backgroundExecute` is invoked on every child in parallel. All children share the same merged cache policy via `shouldUpdate`. + +Child transport names come from the keys of the `transports` object you pass in (for example `ws` and `rest`). Those names are passed to each child’s `initialize` as its `transportName`. + +## Configuration + +`CompositeTransport` is constructed with a `CompositeTransportConfig`: + +| Field | Description | +| -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `transports` | Record of named child `Transport` instances. All children must use the same `TransportGenerics` as the composite. | +| `shouldUpdate` | `(next, current?) => boolean`. Return `true` if `next` should replace `current` in the cache. `current` is `undefined` when there is no prior value for that key. | + +The composite implements `Transport` but does not define `foregroundExecute`; behavior depends entirely on the children. + +## Example + +Two HTTP-style transports (here standing in for WS vs REST) both poll the same symbols. The cache keeps whichever result has the higher `result` field: + +```typescript +import { CompositeTransport, HttpTransport } from '@chainlink/external-adapter-framework/transports' + +const ws = new HttpTransport({ + /* ... */ +}) +const rest = new HttpTransport({ + /* ... */ +}) + +const transport = new CompositeTransport({ + transports: { ws, rest }, + shouldUpdate: (next, current) => (next?.result ?? 0) > (current?.result ?? 0), +}) +``` + +Use the composite as the endpoint’s single `transport` in `AdapterEndpoint` (see `[test/transports/composite.test.ts](../../../test/transports/composite.test.ts)` for a full adapter-level example). + +## Notes + +- **Ordering** — Children run concurrently; which response arrives first is not guaranteed. `shouldUpdate` should encode your merge policy (for example “newer timestamp wins” or “always prefer stream unless stale”). +- **TTL** — TTL writes are forwarded to the underlying cache with the composite’s transport name; see `CompareResponseCache.writeTTL` if you rely on per-transport TTL behavior. +- **Errors** — Child transports still own parsing and error handling; the composite only decides whether successful cache entries from a child replace existing ones. diff --git a/docs/components/transports.md b/docs/components/transports.md index d236d8a2..d8599068 100644 --- a/docs/components/transports.md +++ b/docs/components/transports.md @@ -13,6 +13,7 @@ The v3 framework provides transports to fetch data from a Provider using the com - [HTTP Transport](./transport-types/http-transport.md) - [Websocket Transport](./transport-types/websocket-transport.md) - [SSE Transport](./transport-types/sse-transport.md) +- [Composite Transport](./transport-types/composite-transport.md) - [Custom Transport](./transport-types/custom-transport.md) ### Abstract Transports From 585ac8fd3bb9f2b18a2bf2d79e7dfdf6f338a688 Mon Sep 17 00:00:00 2001 From: Michael Xiao Date: Tue, 12 May 2026 09:49:18 -0400 Subject: [PATCH 4/7] Comments --- src/transports/composite.ts | 22 ++++- test/transports/composite.test.ts | 159 ++++++++++++++++++++++++------ 2 files changed, 150 insertions(+), 31 deletions(-) diff --git a/src/transports/composite.ts b/src/transports/composite.ts index 1d269657..d5228847 100644 --- a/src/transports/composite.ts +++ b/src/transports/composite.ts @@ -1,10 +1,13 @@ import { EndpointContext } from '../adapter' import { CompareResponseCache } from '../cache/response-cache/compare' import { ResponseCache } from '../cache/response' +import { makeLogger } from '../util' import { AdapterRequest, AdapterResponse } from '../util/types' import { TypeFromDefinition } from '../validation/input-params' import type { Transport, TransportDependencies, TransportGenerics } from '.' +const logger = makeLogger('CompositeTransport') + export type CompositeTransportConfig = { transports: Record> @@ -60,10 +63,25 @@ export class CompositeTransport implements Transpor req: AdapterRequest>, adapterSettings: T['Settings'], ): Promise { - await Promise.all(this.transports.map((t) => t.registerRequest?.(req, adapterSettings))) + const results = await Promise.allSettled( + this.transports.map((t) => t.registerRequest?.(req, adapterSettings)), + ) + results + .filter((r) => r.status === 'rejected') + .forEach((r) => { + logger.error(`Transport registerRequest failed: ${r.reason}`) + }) } async backgroundExecute(context: EndpointContext): Promise { - await Promise.all(this.transports.map((t) => t.backgroundExecute?.(context))) + const results = await Promise.allSettled( + this.transports.map((t) => t.backgroundExecute?.(context)), + ) + + results + .filter((r) => r.status === 'rejected') + .forEach((r) => { + logger.error(`Transport backgroundExecute failed: ${r.reason}`) + }) } } diff --git a/test/transports/composite.test.ts b/test/transports/composite.test.ts index 37129712..47681605 100644 --- a/test/transports/composite.test.ts +++ b/test/transports/composite.test.ts @@ -3,9 +3,15 @@ import untypedTest, { TestFn } from 'ava' import axios, { AxiosResponse } from 'axios' import MockAdapter from 'axios-mock-adapter' import { FastifyInstance } from 'fastify' -import { Adapter, AdapterEndpoint } from '../../src/adapter' +import { Adapter, AdapterEndpoint, EndpointContext } from '../../src/adapter' import { CompositeTransport } from '../../src/transports/composite' -import { HttpTransport } from '../../src/transports' +import { + HttpTransport, + Transport, + TransportDependencies, + TransportGenerics, +} from '../../src/transports' +import { ResponseCache } from '../../src/cache/response' import { AdapterRequest } from '../../src/util/types' import { TestAdapter } from '../../src/util/testing-utils' import { TypeFromDefinition } from '../../src/validation/input-params' @@ -15,6 +21,8 @@ const test = untypedTest as TestFn<{ clock: ReturnType testAdapter: TestAdapter api: FastifyInstance | undefined + ws: CountingCacheHttpTransport + rest: CountingCacheHttpTransport }> process.env['CACHE_POLLING_MAX_RETRIES'] = '20' @@ -35,20 +43,21 @@ type CacheTestHttpTypes = CacheTestTransportTypes & { } } -/** HTTP transport that counts `registerRequest` (subscription adds) like the old stub. */ class CountingCacheHttpTransport extends HttpTransport { registerRequestCalls = 0 constructor(logicalName: string, baseURL: string) { super({ - prepareRequests: (params) => ({ - params, - request: { - baseURL, - url: '/price', - method: 'GET', - }, - }), + prepareRequests: (params) => + params.map((p) => ({ + params: [p], + request: { + baseURL, + url: '/price', + method: 'GET', + params: { base: p.base, factor: p.factor }, + }, + })), parseResponse: (params, res: AxiosResponse<{ result: number }>) => params.map((p) => ({ params: p, @@ -70,32 +79,121 @@ class CountingCacheHttpTransport extends HttpTransport { } } -test.before((t) => { +test.before(async (t) => { t.context.clock = installTimers() + + const ws = new CountingCacheHttpTransport('ws', WS_PROVIDER) + const rest = new CountingCacheHttpTransport('rest', REST_PROVIDER) + t.context.ws = ws + t.context.rest = rest + + const composite = new CompositeTransport({ + transports: { ws, rest }, + shouldUpdate: (next, current) => !current || (next?.result ?? 0) > (current?.result ?? 0), + }) + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + endpoints: [ + new AdapterEndpoint({ + name: 'test', + inputParameters: cacheTestInputParameters, + transport: composite, + }), + ], + }) + + await TestAdapter.start(adapter, t.context) }) -test.afterEach(async (t) => { - axiosMock.resetHandlers() - t.context.clock.reset() +test.after(async (t) => { await t.context.testAdapter?.api.close() }) +test.afterEach((t) => { + t.context.ws.registerRequestCalls = 0 + t.context.rest.registerRequestCalls = 0 +}) + +test.serial( + 'composite transport returns value from working transport when one transport fails to produce a value', + async (t) => { + axiosMock.onGet(`${WS_PROVIDER}/price`, { params: { base: 'ETH', factor: 5 } }).reply(500) + axiosMock + .onGet(`${REST_PROVIDER}/price`, { params: { base: 'ETH', factor: 5 } }) + .reply(200, { result: 42 }) + + const res = await t.context.testAdapter.request({ base: 'ETH', factor: 5 }) + + t.is(res.statusCode, 200) + t.is(res.json().result, 42) + t.is(t.context.ws.registerRequestCalls, 1) + t.is(t.context.rest.registerRequestCalls, 1) + }, +) + test.serial( 'composite transport merges child writes using shouldUpdate when run under an adapter', async (t) => { - axiosMock.onGet(`${WS_PROVIDER}/price`).reply(200, { result: 10 }) - axiosMock.onGet(`${REST_PROVIDER}/price`).reply(200, { result: 100 }) + axiosMock + .onGet(`${WS_PROVIDER}/price`, { params: { base: 'BTC', factor: 3 } }) + .reply(200, { result: 10 }) + axiosMock + .onGet(`${REST_PROVIDER}/price`, { params: { base: 'BTC', factor: 3 } }) + .reply(200, { result: 100 }) + + t.is(t.context.ws.name, 'ws') + t.is(t.context.rest.name, 'rest') + + const res = await t.context.testAdapter.request({ base: 'BTC', factor: 3 }) - const ws = new CountingCacheHttpTransport('ws', WS_PROVIDER) - const rest = new CountingCacheHttpTransport('rest', REST_PROVIDER) + t.is(res.statusCode, 200) + t.is(res.json().result, 100) + t.is(t.context.ws.registerRequestCalls, 1) + t.is(t.context.rest.registerRequestCalls, 1) + }, +) + +class ThrowingTransport implements Transport { + name!: string + responseCache!: ResponseCache + + async initialize( + dependencies: TransportDependencies, + _adapterSettings: T['Settings'], + _endpointName: string, + transportName: string, + ): Promise { + this.name = transportName + this.responseCache = dependencies.responseCache + } + + async registerRequest( + _req: AdapterRequest>, + _adapterSettings: T['Settings'], + ): Promise { + throw new Error('ThrowingTransport.registerRequest intentional error') + } + + async backgroundExecute(_context: EndpointContext): Promise { + throw new Error('ThrowingTransport.backgroundExecute intentional error') + } +} + +test.serial( + 'composite transport returns value from working transport when the other transport throws in registerRequest and backgroundExecute', + async (t) => { + const workingTransport = new CountingCacheHttpTransport('working', WS_PROVIDER) + const throwingTransport = new ThrowingTransport() const composite = new CompositeTransport({ - transports: { ws: ws, rest: rest }, - shouldUpdate: (next, current) => (next?.result ?? 0) > (current?.result ?? 0), + transports: { working: workingTransport, throwing: throwingTransport }, + shouldUpdate: (next, current) => !current || (next?.result ?? 0) > (current?.result ?? 0), }) const adapter = new Adapter({ - name: 'TEST', + name: 'TEST_THROWING', defaultEndpoint: 'test', endpoints: [ new AdapterEndpoint({ @@ -106,16 +204,19 @@ test.serial( ], }) - const testAdapter = await TestAdapter.start(adapter, t.context) + const localContext = { clock: t.context.clock } as typeof t.context + const localAdapter = await TestAdapter.start(adapter, localContext) - t.is(ws.name, 'ws') - t.is(rest.name, 'rest') + axiosMock + .onGet(`${WS_PROVIDER}/price`, { params: { base: 'LINK', factor: 2 } }) + .reply(200, { result: 77 }) - const res = await testAdapter.request({ base: 'ETH', factor: 5 }) + const res = await localAdapter.request({ base: 'LINK', factor: 2 }) t.is(res.statusCode, 200) - t.is(res.json().result, 100) - t.is(ws.registerRequestCalls, 1) - t.is(rest.registerRequestCalls, 1) + t.is(res.json().result, 77) + t.is(workingTransport.registerRequestCalls, 1) + + await localAdapter.api.close() }, ) From b9dffd26ef660089ded780f113e0dc6cda98d0e2 Mon Sep 17 00:00:00 2001 From: Michael Xiao Date: Wed, 13 May 2026 16:45:49 -0400 Subject: [PATCH 5/7] Implement writeEntries --- src/cache/response-cache/compare.ts | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/cache/response-cache/compare.ts b/src/cache/response-cache/compare.ts index 84b618c8..e329ebaf 100644 --- a/src/cache/response-cache/compare.ts +++ b/src/cache/response-cache/compare.ts @@ -46,13 +46,23 @@ export class CompareResponseCache< } async write(transportName: string, results: TimestampedProviderResult[]): Promise { - const entries: { + await this.writeEntries( + results.map((result) => this.generateCacheEntry(transportName, this.transportName, result)), + ) + } + + async writeEntries( + entries: { + key: string + value: AdapterResponse + }[], + ) { + const filteredEntries: { key: string value: AdapterResponse }[] = [] - for (const result of results) { - const { key, value } = this.generateCacheEntry(transportName, this.transportName, result) + for (const { key, value } of entries) { if (!this.shouldUpdate(value, this.localCache.get(key))) { continue } @@ -60,20 +70,16 @@ export class CompareResponseCache< if (!this.shouldUpdate(value, entryInCache)) { continue } - entries.push({ key, value }) + filteredEntries.push({ key, value }) } - await this.responseCache.writeEntries(entries) + await this.responseCache.writeEntries(filteredEntries) - entries.forEach(({ key, value }) => { + filteredEntries.forEach(({ key, value }) => { this.localCache.set(key, value) }) } - async writeEntries() { - throw new Error('Use write instead for CompareResponseCache') - } - override async writeTTL( _: string, params: TypeFromDefinition[], From 9028a592e54f9a3a29b36eafc201d9841145942d Mon Sep 17 00:00:00 2001 From: Michael Xiao Date: Wed, 13 May 2026 18:56:22 -0400 Subject: [PATCH 6/7] Prevent sharing transport instance --- src/transports/abstract/subscription.ts | 5 ++ test/transports/routing.test.ts | 89 +++++++++++++++++++++---- 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/src/transports/abstract/subscription.ts b/src/transports/abstract/subscription.ts index ec563bbe..31385adf 100644 --- a/src/transports/abstract/subscription.ts +++ b/src/transports/abstract/subscription.ts @@ -19,6 +19,7 @@ export abstract class SubscriptionTransport impleme subscriptionSet!: SubscriptionSet> subscriptionTtl!: number name!: string + initialized = false async initialize( dependencies: TransportDependencies, @@ -26,6 +27,10 @@ export abstract class SubscriptionTransport impleme endpointName: string, name: string, ): Promise { + if (this.initialized) { + throw new Error(`Transport ${name} has already been initialized`) + } + this.initialized = true this.responseCache = dependencies.responseCache this.subscriptionSet = dependencies.subscriptionSetFactory.buildSet(endpointName, name) this.subscriptionTtl = this.getSubscriptionTtlFromConfig(adapterSettings) // Will be implemented by subclasses diff --git a/test/transports/routing.test.ts b/test/transports/routing.test.ts index 99f10478..31428910 100644 --- a/test/transports/routing.test.ts +++ b/test/transports/routing.test.ts @@ -269,12 +269,17 @@ class MockSseTransport extends SseTransport { } } -const transports = new TransportRoutes() - .register('websocket', new MockWebSocketTransport()) - .register('batch', new MockHttpTransport()) - .register('sse', new MockSseTransport()) +function createTransportRoutes(): TransportRoutes { + return new TransportRoutes() + .register('websocket', new MockWebSocketTransport()) + .register('batch', new MockHttpTransport()) + .register('sse', new MockSseTransport()) +} + +let transports: TransportRoutes test.beforeEach(async (t) => { + transports = createTransportRoutes() const sampleEndpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price @@ -401,11 +406,67 @@ test.serial('endpoint routing can route to SSE transport', async (t) => { t.assert(internalTransport.registerRequestCalls > 0) }) +test.serial( + 'single endpoint cannot register the same transport instance under two route names', + async (t) => { + const sharedWs = new MockWebSocketTransport() + + const adapter = new Adapter({ + name: 'SHARED', + defaultEndpoint: 'price', + config: new AdapterConfig(settings, {}), + endpoints: [ + new AdapterEndpoint({ + inputParameters, + name: 'price', + transportRoutes: new TransportRoutes() + .register('primary', sharedWs) + .register('mirror', sharedWs), + }), + ], + }) + + await t.throwsAsync(async () => adapter.initialize(), { + message: 'Transport mirror has already been initialized', + }) + }, +) + +test.serial( + 'two endpoints on the same adapter cannot share the same transport instances', + async (t) => { + const sharedRoutes = createTransportRoutes() + + const adapter = new Adapter({ + name: 'SHAREDEP', + defaultEndpoint: 'price', + config: new AdapterConfig(settings), + endpoints: [ + new AdapterEndpoint({ + inputParameters, + name: 'price', + transportRoutes: sharedRoutes, + }), + new AdapterEndpoint({ + inputParameters, + name: 'quote', + transportRoutes: sharedRoutes, + }), + ], + }) + + await t.throwsAsync(async () => adapter.initialize(), { + message: 'Transport websocket has already been initialized', + }) + }, +) + test.serial('custom router is applied to get valid transport to route to', async (t) => { + const testTransports = createTransportRoutes() const endpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price - transportRoutes: transports, + transportRoutes: testTransports, customRouter: () => 'batch', }) @@ -459,15 +520,16 @@ test.serial('custom router is applied to get valid transport to route to', async }) t.is(error.statusCode, 504) - const internalTransport = transports.get('batch') as unknown as MockHttpTransport + const internalTransport = testTransports.get('batch') as unknown as MockHttpTransport t.assert(internalTransport.registerRequestCalls > 0) }) test.serial('custom router returns invalid transport and request fails', async (t) => { + const testTransports = createTransportRoutes() const endpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price - transportRoutes: transports, + transportRoutes: testTransports, customRouter: () => 'qweqwe', }) @@ -527,10 +589,11 @@ test.serial('custom router returns invalid transport and request fails', async ( }) test.serial('missing transport in input params with no default fails request', async (t) => { + const testTransports = createTransportRoutes() const endpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price - transportRoutes: transports, + transportRoutes: testTransports, }) const customConfig = new AdapterConfig(settings, { @@ -588,10 +651,11 @@ test.serial('missing transport in input params with no default fails request', a }) test.serial('missing transport in input params with default succeeds', async (t) => { + const testTransports = createTransportRoutes() const endpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price - transportRoutes: transports, + transportRoutes: testTransports, defaultTransport: 'batch', }) @@ -644,7 +708,7 @@ test.serial('missing transport in input params with default succeeds', async (t) }) t.is(error.statusCode, 504) - const internalTransport = transports.get('batch') as unknown as MockHttpTransport + const internalTransport = testTransports.get('batch') as unknown as MockHttpTransport t.assert(internalTransport.registerRequestCalls > 0) }) @@ -789,10 +853,11 @@ test.serial('invalid transport override is skipped', async (t) => { test.serial( 'transport and transport override are ignored when custom router returns a value', async (t) => { + const testTransports = createTransportRoutes() const endpoint = new AdapterEndpoint({ inputParameters, name: 'price', // /price - transportRoutes: transports, + transportRoutes: testTransports, customRouter: () => 'batch', }) @@ -851,7 +916,7 @@ test.serial( }) t.is(error.statusCode, 504) - const internalTransport = transports.get('batch') as unknown as MockHttpTransport + const internalTransport = testTransports.get('batch') as unknown as MockHttpTransport t.assert(internalTransport.registerRequestCalls > 0) }, ) From eac7f22c72feff4b4b61d85f0fa776573ff79af5 Mon Sep 17 00:00:00 2001 From: Michael Xiao Date: Fri, 15 May 2026 18:41:17 -0400 Subject: [PATCH 7/7] Hide composite implementation --- .../transport-types/composite-transport.md | 67 +++++------ docs/reference-tables/ea-settings.md | 1 + src/adapter/endpoint.ts | 18 ++- src/adapter/types.ts | 3 + src/config/index.ts | 5 + src/transports/composite.ts | 33 ++---- test/transports/composite.test.ts | 111 ++++++++++++++---- 7 files changed, 156 insertions(+), 82 deletions(-) diff --git a/docs/components/transport-types/composite-transport.md b/docs/components/transport-types/composite-transport.md index 55790a7e..4031acec 100644 --- a/docs/components/transport-types/composite-transport.md +++ b/docs/components/transport-types/composite-transport.md @@ -1,55 +1,50 @@ # Composite transport -`CompositeTransport` runs several child transports in parallel for the same endpoint and merges their writes into a single response cache. You choose when a newer value from any child should replace what is already cached by implementing `shouldUpdate`. +Composite transport is a **framework feature** for multi-route endpoints: every registered child transport runs in parallel for the same endpoint, and successful cache writes are merged so only the “freshest” value wins. You enable it on the endpoint; you **do not** import or `new CompositeTransport(...)` in adapter code—that class is constructed internally when the conditions below are met. Typical uses: -- Combine a low-latency channel (for example WebSocket) with a REST fallback so the cache still updates if the stream lags or drops. -- Prefer one provider’s quote over another’s when both are active, using freshness, spread, or custom rules in `shouldUpdate`. +- Pair a low-latency stream (for example WebSocket) with a REST fallback so the cache still updates if the stream lags or drops. +- Run two data paths for the same feed and keep whichever provider reports a newer `providerIndicatedTimeUnixMs`. -## How it works +## How to use it -1. **Initialization** — Each child transport is initialized with the same adapter dependencies, except `responseCache` is replaced by a `[CompareResponseCache](../../../src/cache/response-cache/compare.ts)` wrapper. That wrapper forwards reads to the real endpoint cache but filters writes: a write is applied only when `shouldUpdate(next, current)` is true for the pending value versus the last locally seen value for that cache key, and again versus the value already in the shared cache (so concurrent children do not blindly overwrite each other). -2. **Subscriptions** — `registerRequest` is invoked on every child in parallel, so each transport can register the request in its own subscription set or equivalent. -3. **Background execution** — `backgroundExecute` is invoked on every child in parallel. All children share the same merged cache policy via `shouldUpdate`. +1. Define the endpoint with **`transportRoutes`** (not a single `transport` field). Register **at least two** named child transports on a [`TransportRoutes`](../../../src/transports/index.ts) instance. Transport names must be lowercase letters only (see `TransportRoutes.register`). +2. Set **`enableCompositeTransport: true`** on the same [`AdapterEndpoint`](../../../src/adapter/endpoint.ts) params. +3. Turn the behavior on at runtime by setting adapter setting **`COMPOSITE_TRANSPORT`** to `true` (for example env `COMPOSITE_TRANSPORT=true`, or your adapter’s settings prefix). -Child transport names come from the keys of the `transports` object you pass in (for example `ws` and `rest`). Those names are passed to each child’s `initialize` as its `transportName`. +If `enableCompositeTransport` is `true` but there are fewer than two routes, construction throws. If `enableCompositeTransport` is `true` but **`COMPOSITE_TRANSPORT`** is `false` (the default), the endpoint keeps **normal multi-transport routing** (`customRouter`, request `transport`, or `defaultTransport`) so operators can flip composite mode without redeploying. -## Configuration - -`CompositeTransport` is constructed with a `CompositeTransportConfig`: - -| Field | Description | -| -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `transports` | Record of named child `Transport` instances. All children must use the same `TransportGenerics` as the composite. | -| `shouldUpdate` | `(next, current?) => boolean`. Return `true` if `next` should replace `current` in the cache. `current` is `undefined` when there is no prior value for that key. | - -The composite implements `Transport` but does not define `foregroundExecute`; behavior depends entirely on the children. +When **both** flags are true, [`AdapterEndpoint.initialize`](../../../src/adapter/endpoint.ts) replaces the route map with a single internal route whose transport is a `CompositeTransport` built from your previous route entries. From then on the framework treats the endpoint as having one logical transport that fans out to all children. ## Example -Two HTTP-style transports (here standing in for WS vs REST) both poll the same symbols. The cache keeps whichever result has the higher `result` field: - ```typescript -import { CompositeTransport, HttpTransport } from '@chainlink/external-adapter-framework/transports' - -const ws = new HttpTransport({ - /* ... */ -}) -const rest = new HttpTransport({ - /* ... */ -}) - -const transport = new CompositeTransport({ - transports: { ws, rest }, - shouldUpdate: (next, current) => (next?.result ?? 0) > (current?.result ?? 0), +import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter' +import { TransportRoutes } from '@chainlink/external-adapter-framework/transports' + +// wsTransport and restTransport are normal transports you already defined +export const endpoint = new AdapterEndpoint({ + name: 'example', + inputParameters, + enableCompositeTransport: true, + transportRoutes: new TransportRoutes() + .register('ws', wsTransport) + .register('rest', restTransport), }) ``` -Use the composite as the endpoint’s single `transport` in `AdapterEndpoint` (see `[test/transports/composite.test.ts](../../../test/transports/composite.test.ts)` for a full adapter-level example). +Deploy or configure with **`COMPOSITE_TRANSPORT=true`** when you want parallel execution and merged caching for that endpoint. + +## How it works (internals) + +The framework’s `CompositeTransport` (see [`composite.ts`](../../../src/transports/composite.ts)) wires each child with a [`CompareResponseCache`](../../../src/cache/response-cache/compare.ts) instead of the raw endpoint cache: reads go through to the real cache, while writes are accepted only when the pending payload is newer than both the last value seen for that key on that child path and the value already stored, using **`timestamps.providerIndicatedTimeUnixMs`** (missing timestamps are treated as `0`). **`registerRequest`** and **`backgroundExecute`** are invoked on **every** child in parallel. There is no `foregroundExecute` on the composite itself; behavior comes entirely from the children. + +Child names are the keys you passed to `register`; each child’s `initialize` receives that string as its `transportName`. ## Notes -- **Ordering** — Children run concurrently; which response arrives first is not guaranteed. `shouldUpdate` should encode your merge policy (for example “newer timestamp wins” or “always prefer stream unless stale”). -- **TTL** — TTL writes are forwarded to the underlying cache with the composite’s transport name; see `CompareResponseCache.writeTTL` if you rely on per-transport TTL behavior. -- **Errors** — Child transports still own parsing and error handling; the composite only decides whether successful cache entries from a child replace existing ones. +- **Timestamps** — Children should populate `providerIndicatedTimeUnixMs` when they have a meaningful provider clock; otherwise merge order may not match business intent. +- **Concurrency** — Delivery order across children is not guaranteed; the merge rule is strictly “larger `providerIndicatedTimeUnixMs` wins.” +- **TTL** — TTL behavior flows through the compare cache with the composite’s transport name; see `CompareResponseCache.writeTTL` if you depend on per-transport TTL semantics. +- **Errors** — Children still own parsing and errors; the composite only arbitrates successful cache updates between children. diff --git a/docs/reference-tables/ea-settings.md b/docs/reference-tables/ea-settings.md index 93ae62bc..562c86ee 100644 --- a/docs/reference-tables/ea-settings.md +++ b/docs/reference-tables/ea-settings.md @@ -27,6 +27,7 @@ | CACHE_REDIS_URL | string | undefined | The URL of the Redis server. Format: [redis[s]:]//[[user][:password@]][host][:port][/db-number]?db=db-number[&password=bar[&option=value]]] | - Value must be a valid URL | | | CACHE_TYPE | enum | local | The type of cache to use throughout the EA | | | | CENSOR_SENSITIVE_LOGS | boolean | false | Controls whether the logging of sensitive information is enabled or disabled | | | +| COMPOSITE_TRANSPORT | boolean | false | Whether to use enableCompositeTransport parameter in AdapterEndpoint | | | | CORRELATION_ID_ENABLED | boolean | true | Flag to enable correlation IDs for sent requests in logging | | | | DEBUG | boolean | false | Toggles debug mode | | | | DEBUG_ENDPOINTS | boolean | false | Whether to enable debug enpoints (/debug/\*) for this adapter. Enabling them might consume more resources. | | | diff --git a/src/adapter/endpoint.ts b/src/adapter/endpoint.ts index 282201b5..25669ecd 100644 --- a/src/adapter/endpoint.ts +++ b/src/adapter/endpoint.ts @@ -1,6 +1,6 @@ import { SimpleResponseCache } from '../cache/response' import { AdapterSettings } from '../config' -import { TransportRoutes } from '../transports' +import { CompositeTransport, TransportRoutes } from '../transports' import { AdapterRequest, AdapterRequestData, @@ -46,6 +46,7 @@ export class AdapterEndpoint implements AdapterEndpo settings: T['Settings'], ) => string defaultTransport?: string + enableCompositeTransport?: boolean constructor(params: AdapterEndpointParams) { this.name = params.name @@ -55,6 +56,13 @@ export class AdapterEndpoint implements AdapterEndpo this.transportRoutes = params.transportRoutes this.customRouter = params.customRouter this.defaultTransport = params.defaultTransport + this.enableCompositeTransport = params.enableCompositeTransport + if (params.enableCompositeTransport && this.transportRoutes.routeNames().length < 2) { + throw new AdapterError({ + statusCode: 400, + message: `Composite transport requires at least 2 transports`, + }) + } } else { this.transportRoutes = new TransportRoutes().register( DEFAULT_TRANSPORT_NAME, @@ -96,6 +104,14 @@ export class AdapterEndpoint implements AdapterEndpo responseCache, } + if (this.enableCompositeTransport && adapterSettings.COMPOSITE_TRANSPORT) { + logger.debug(`Enabling composite transport for endpoint "${this.name}"...`) + this.transportRoutes = new TransportRoutes().register( + DEFAULT_TRANSPORT_NAME, + new CompositeTransport(Object.fromEntries(this.transportRoutes.entries())), + ) + } + logger.debug(`Initializing transports for endpoint "${this.name}"...`) for (const [transportName, transport] of this.transportRoutes.entries()) { await transport.initialize(transportDependencies, adapterSettings, this.name, transportName) diff --git a/src/adapter/types.ts b/src/adapter/types.ts index 8d52246c..c33e6222 100644 --- a/src/adapter/types.ts +++ b/src/adapter/types.ts @@ -183,6 +183,9 @@ type MultiTransportAdapterEndpointParams = { /** If no value is returned from the custom router or the default (transport param), which transport to use */ defaultTransport?: string + + /** If true, roll all transportRoutes under a new CompositeTransport */ + enableCompositeTransport?: boolean } /** diff --git a/src/config/index.ts b/src/config/index.ts index 71006470..a97236f5 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -389,6 +389,11 @@ export const BaseSettingsDefinition = { 'Whether to enable debug enpoints (/debug/*) for this adapter. Enabling them might consume more resources.', default: false, }, + COMPOSITE_TRANSPORT: { + type: 'boolean', + description: 'Whether to use enableCompositeTransport parameter in AdapterEndpoint', + default: false, + }, } as const satisfies SettingsDefinitionMap export const buildAdapterSettings = < diff --git a/src/transports/composite.ts b/src/transports/composite.ts index d5228847..32d50348 100644 --- a/src/transports/composite.ts +++ b/src/transports/composite.ts @@ -2,33 +2,18 @@ import { EndpointContext } from '../adapter' import { CompareResponseCache } from '../cache/response-cache/compare' import { ResponseCache } from '../cache/response' import { makeLogger } from '../util' -import { AdapterRequest, AdapterResponse } from '../util/types' +import { AdapterRequest } from '../util/types' import { TypeFromDefinition } from '../validation/input-params' import type { Transport, TransportDependencies, TransportGenerics } from '.' const logger = makeLogger('CompositeTransport') -export type CompositeTransportConfig = { - transports: Record> - - /** - * @param next - the next response to be written to the cache - * @param current - the current response in the cache - * @returns true if next should replace current in cache - */ - shouldUpdate: ( - next: AdapterResponse, - current?: AdapterResponse, - ) => boolean -} - -// Send requests to multiple transports and merge responses into a single cache according to shouldUpdate +// Send requests to multiple transports and merge responses into a single cache according to bigger providerIndicatedTimeUnixMs export class CompositeTransport implements Transport { name!: string responseCache!: ResponseCache - private transports: Transport[] = [] - constructor(private readonly config: CompositeTransportConfig) {} + constructor(private readonly transports: Record>) {} async initialize( dependencies: TransportDependencies, @@ -42,11 +27,13 @@ export class CompositeTransport implements Transpor const compareCache = new CompareResponseCache( transportName, this.responseCache, - this.config.shouldUpdate, + (next, current) => + (next.timestamps?.providerIndicatedTimeUnixMs ?? 0) > + (current?.timestamps?.providerIndicatedTimeUnixMs ?? 0), ) await Promise.all( - Object.entries(this.config.transports).map(([name, transport]) => + Object.entries(this.transports).map(([name, transport]) => transport.initialize( { ...dependencies, responseCache: compareCache }, adapterSettings, @@ -55,8 +42,6 @@ export class CompositeTransport implements Transpor ), ), ) - - this.transports = Object.values(this.config.transports) } async registerRequest( @@ -64,7 +49,7 @@ export class CompositeTransport implements Transpor adapterSettings: T['Settings'], ): Promise { const results = await Promise.allSettled( - this.transports.map((t) => t.registerRequest?.(req, adapterSettings)), + Object.values(this.transports).map((t) => t.registerRequest?.(req, adapterSettings)), ) results .filter((r) => r.status === 'rejected') @@ -75,7 +60,7 @@ export class CompositeTransport implements Transpor async backgroundExecute(context: EndpointContext): Promise { const results = await Promise.allSettled( - this.transports.map((t) => t.backgroundExecute?.(context)), + Object.values(this.transports).map((t) => t.backgroundExecute?.(context)), ) results diff --git a/test/transports/composite.test.ts b/test/transports/composite.test.ts index 47681605..de97258f 100644 --- a/test/transports/composite.test.ts +++ b/test/transports/composite.test.ts @@ -4,12 +4,14 @@ import axios, { AxiosResponse } from 'axios' import MockAdapter from 'axios-mock-adapter' import { FastifyInstance } from 'fastify' import { Adapter, AdapterEndpoint, EndpointContext } from '../../src/adapter' -import { CompositeTransport } from '../../src/transports/composite' +import { AdapterError } from '../../src/validation/error' +import { AdapterConfig } from '../../src/config' import { HttpTransport, Transport, TransportDependencies, TransportGenerics, + TransportRoutes, } from '../../src/transports' import { ResponseCache } from '../../src/cache/response' import { AdapterRequest } from '../../src/util/types' @@ -39,7 +41,7 @@ const axiosMock = new MockAdapter(axios) type CacheTestHttpTypes = CacheTestTransportTypes & { Provider: { RequestBody: unknown - ResponseBody: { result: number } + ResponseBody: { result: number; ts?: number } } } @@ -58,12 +60,17 @@ class CountingCacheHttpTransport extends HttpTransport { params: { base: p.base, factor: p.factor }, }, })), - parseResponse: (params, res: AxiosResponse<{ result: number }>) => + parseResponse: (params, res: AxiosResponse<{ result: number; ts?: number }>) => params.map((p) => ({ params: p, response: { data: null, result: res.data.result, + timestamps: { + providerDataRequestedUnixMs: 0, + providerDataReceivedUnixMs: 0, + providerIndicatedTimeUnixMs: res.data.ts ?? 1, + }, }, })), }) @@ -87,11 +94,6 @@ test.before(async (t) => { t.context.ws = ws t.context.rest = rest - const composite = new CompositeTransport({ - transports: { ws, rest }, - shouldUpdate: (next, current) => !current || (next?.result ?? 0) > (current?.result ?? 0), - }) - const adapter = new Adapter({ name: 'TEST', defaultEndpoint: 'test', @@ -99,9 +101,13 @@ test.before(async (t) => { new AdapterEndpoint({ name: 'test', inputParameters: cacheTestInputParameters, - transport: composite, + enableCompositeTransport: true, + transportRoutes: new TransportRoutes() + .register('ws', ws) + .register('rest', rest), }), ], + config: new AdapterConfig({}, { envDefaultOverrides: { COMPOSITE_TRANSPORT: true } }), }) await TestAdapter.start(adapter, t.context) @@ -122,7 +128,7 @@ test.serial( axiosMock.onGet(`${WS_PROVIDER}/price`, { params: { base: 'ETH', factor: 5 } }).reply(500) axiosMock .onGet(`${REST_PROVIDER}/price`, { params: { base: 'ETH', factor: 5 } }) - .reply(200, { result: 42 }) + .reply(200, { result: 42, ts: 100 }) const res = await t.context.testAdapter.request({ base: 'ETH', factor: 5 }) @@ -134,19 +140,19 @@ test.serial( ) test.serial( - 'composite transport merges child writes using shouldUpdate when run under an adapter', + 'composite transport merges child writes by providerIndicatedTimeUnixMs when run under an adapter', async (t) => { axiosMock .onGet(`${WS_PROVIDER}/price`, { params: { base: 'BTC', factor: 3 } }) - .reply(200, { result: 10 }) + .reply(200, { result: 10, ts: 1000 }) axiosMock .onGet(`${REST_PROVIDER}/price`, { params: { base: 'BTC', factor: 3 } }) - .reply(200, { result: 100 }) + .reply(200, { result: 100, ts: 2000 }) t.is(t.context.ws.name, 'ws') t.is(t.context.rest.name, 'rest') - const res = await t.context.testAdapter.request({ base: 'BTC', factor: 3 }) + const res = await t.context.testAdapter.request({ base: 'BTC', factor: 3, transport: 'rest' }) t.is(res.statusCode, 200) t.is(res.json().result, 100) @@ -187,11 +193,6 @@ test.serial( const workingTransport = new CountingCacheHttpTransport('working', WS_PROVIDER) const throwingTransport = new ThrowingTransport() - const composite = new CompositeTransport({ - transports: { working: workingTransport, throwing: throwingTransport }, - shouldUpdate: (next, current) => !current || (next?.result ?? 0) > (current?.result ?? 0), - }) - const adapter = new Adapter({ name: 'TEST_THROWING', defaultEndpoint: 'test', @@ -199,9 +200,13 @@ test.serial( new AdapterEndpoint({ name: 'test', inputParameters: cacheTestInputParameters, - transport: composite, + enableCompositeTransport: true, + transportRoutes: new TransportRoutes() + .register('working', workingTransport) + .register('throwing', throwingTransport), }), ], + config: new AdapterConfig({}, { envDefaultOverrides: { COMPOSITE_TRANSPORT: true } }), }) const localContext = { clock: t.context.clock } as typeof t.context @@ -209,7 +214,7 @@ test.serial( axiosMock .onGet(`${WS_PROVIDER}/price`, { params: { base: 'LINK', factor: 2 } }) - .reply(200, { result: 77 }) + .reply(200, { result: 77, ts: 100 }) const res = await localAdapter.request({ base: 'LINK', factor: 2 }) @@ -220,3 +225,67 @@ test.serial( await localAdapter.api.close() }, ) + +test.serial( + 'enableCompositeTransport does not use composite routing when COMPOSITE_TRANSPORT is false', + async (t) => { + const ws = new CountingCacheHttpTransport('ws', WS_PROVIDER) + const rest = new CountingCacheHttpTransport('rest', REST_PROVIDER) + + const adapter = new Adapter({ + name: 'TEST_COMPOSITE_OFF', + defaultEndpoint: 'test', + endpoints: [ + new AdapterEndpoint({ + name: 'test', + inputParameters: cacheTestInputParameters, + enableCompositeTransport: true, + transportRoutes: new TransportRoutes() + .register('ws', ws) + .register('rest', rest), + }), + ], + config: new AdapterConfig({}, { envDefaultOverrides: { COMPOSITE_TRANSPORT: false } }), + }) + + const localContext = { clock: t.context.clock } as typeof t.context + const localAdapter = await TestAdapter.start(adapter, localContext) + + axiosMock + .onGet(`${REST_PROVIDER}/price`, { params: { base: 'SOL', factor: 7 } }) + .reply(200, { result: 99, ts: 50 }) + + const res = await localAdapter.request({ base: 'SOL', factor: 7, transport: 'rest' }) + + t.is(res.statusCode, 200) + t.is(res.json().result, 99) + t.is(ws.registerRequestCalls, 0) + t.is(rest.registerRequestCalls, 1) + + await localAdapter.api.close() + }, +) + +test.serial( + 'AdapterEndpoint throws when enableCompositeTransport is true with only one transport', + (t) => { + const onlyTransport = new CountingCacheHttpTransport('only', WS_PROVIDER) + + const error = t.throws( + () => + new AdapterEndpoint({ + name: 'test', + inputParameters: cacheTestInputParameters, + enableCompositeTransport: true, + transportRoutes: new TransportRoutes().register( + 'only', + onlyTransport, + ), + }), + { instanceOf: AdapterError }, + ) + + t.is(error?.message, 'Composite transport requires at least 2 transports') + t.is(error?.statusCode, 400) + }, +)