Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ yarn # Install yarn dependencies
- [Subscription](./docs/components/transport-types/subscription-transport.md)
- [Streaming](./docs/components/transport-types/streaming-transport.md)
- [Custom](./docs/components/transport-types/custom-transport.md)
- [Composite](./docs/components/transport-types/composite-transport.md)
- Guides
- [Porting a v2 EA to v3](./docs/guides/porting-a-v2-ea-to-v3.md)
- [Creating a new v3 EA](./docs/guides/creating-a-new-v3-ea.md)
Expand Down
50 changes: 50 additions & 0 deletions docs/components/transport-types/composite-transport.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Composite transport

Composite transport is a **framework feature** for multi-route endpoints: every registered child transport runs in parallel for the same endpoint, and successful cache writes are merged so only the “freshest” value wins. You enable it on the endpoint; you **do not** import or `new CompositeTransport(...)` in adapter code—that class is constructed internally when the conditions below are met.

Typical uses:

- Pair a low-latency stream (for example WebSocket) with a REST fallback so the cache still updates if the stream lags or drops.
- Run two data paths for the same feed and keep whichever provider reports a newer `providerIndicatedTimeUnixMs`.

## How to use it

1. Define the endpoint with **`transportRoutes`** (not a single `transport` field). Register **at least two** named child transports on a [`TransportRoutes`](../../../src/transports/index.ts) instance. Transport names must be lowercase letters only (see `TransportRoutes.register`).
2. Set **`enableCompositeTransport: true`** on the same [`AdapterEndpoint`](../../../src/adapter/endpoint.ts) params.
3. Turn the behavior on at runtime by setting adapter setting **`COMPOSITE_TRANSPORT`** to `true` (for example env `COMPOSITE_TRANSPORT=true`, or your adapter’s settings prefix).

If `enableCompositeTransport` is `true` but there are fewer than two routes, construction throws. If `enableCompositeTransport` is `true` but **`COMPOSITE_TRANSPORT`** is `false` (the default), the endpoint keeps **normal multi-transport routing** (`customRouter`, request `transport`, or `defaultTransport`) so operators can flip composite mode without redeploying.

When **both** flags are true, [`AdapterEndpoint.initialize`](../../../src/adapter/endpoint.ts) replaces the route map with a single internal route whose transport is a `CompositeTransport` built from your previous route entries. From then on the framework treats the endpoint as having one logical transport that fans out to all children.

## Example

```typescript
import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter'
import { TransportRoutes } from '@chainlink/external-adapter-framework/transports'

// wsTransport and restTransport are normal transports you already defined
export const endpoint = new AdapterEndpoint({
name: 'example',
inputParameters,
enableCompositeTransport: true,
transportRoutes: new TransportRoutes<EndpointTypes>()
.register('ws', wsTransport)
.register('rest', restTransport),
})
```

Deploy or configure with **`COMPOSITE_TRANSPORT=true`** when you want parallel execution and merged caching for that endpoint.

## How it works (internals)

The framework’s `CompositeTransport` (see [`composite.ts`](../../../src/transports/composite.ts)) wires each child with a [`CompareResponseCache`](../../../src/cache/response-cache/compare.ts) instead of the raw endpoint cache: reads go through to the real cache, while writes are accepted only when the pending payload is newer than both the last value seen for that key on that child path and the value already stored, using **`timestamps.providerIndicatedTimeUnixMs`** (missing timestamps are treated as `0`). **`registerRequest`** and **`backgroundExecute`** are invoked on **every** child in parallel. There is no `foregroundExecute` on the composite itself; behavior comes entirely from the children.

Child names are the keys you passed to `register`; each child’s `initialize` receives that string as its `transportName`.

## Notes

- **Timestamps** — Children should populate `providerIndicatedTimeUnixMs` when they have a meaningful provider clock; otherwise merge order may not match business intent.
- **Concurrency** — Delivery order across children is not guaranteed; the merge rule is strictly “larger `providerIndicatedTimeUnixMs` wins.”
- **TTL** — TTL behavior flows through the compare cache with the composite’s transport name; see `CompareResponseCache.writeTTL` if you depend on per-transport TTL semantics.
- **Errors** — Children still own parsing and errors; the composite only arbitrates successful cache updates between children.
1 change: 1 addition & 0 deletions docs/components/transports.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The v3 framework provides transports to fetch data from a Provider using the com
- [HTTP Transport](./transport-types/http-transport.md)
- [Websocket Transport](./transport-types/websocket-transport.md)
- [SSE Transport](./transport-types/sse-transport.md)
- [Composite Transport](./transport-types/composite-transport.md)
- [Custom Transport](./transport-types/custom-transport.md)

### Abstract Transports
Expand Down
1 change: 1 addition & 0 deletions docs/reference-tables/ea-settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
| CACHE_REDIS_URL | string | undefined | The URL of the Redis server. Format: [redis[s]:]//[[user][:password@]][host][:port][/db-number]?db=db-number[&password=bar[&option=value]]] | - Value must be a valid URL | |
| CACHE_TYPE | enum | local | The type of cache to use throughout the EA | | |
| CENSOR_SENSITIVE_LOGS | boolean | false | Controls whether the logging of sensitive information is enabled or disabled | | |
| COMPOSITE_TRANSPORT | boolean | false | Whether to use enableCompositeTransport parameter in AdapterEndpoint | | |
| CORRELATION_ID_ENABLED | boolean | true | Flag to enable correlation IDs for sent requests in logging | | |
| DEBUG | boolean | false | Toggles debug mode | | |
| DEBUG_ENDPOINTS | boolean | false | Whether to enable debug enpoints (/debug/\*) for this adapter. Enabling them might consume more resources. | | |
Expand Down
22 changes: 19 additions & 3 deletions src/adapter/endpoint.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ResponseCache } from '../cache/response'
import { SimpleResponseCache } from '../cache/response'
import { AdapterSettings } from '../config'
import { TransportRoutes } from '../transports'
import { CompositeTransport, TransportRoutes } from '../transports'
import {
AdapterRequest,
AdapterRequestData,
Expand Down Expand Up @@ -46,6 +46,7 @@ export class AdapterEndpoint<T extends EndpointGenerics> implements AdapterEndpo
settings: T['Settings'],
) => string
defaultTransport?: string
enableCompositeTransport?: boolean

constructor(params: AdapterEndpointParams<T>) {
this.name = params.name
Expand All @@ -55,6 +56,13 @@ export class AdapterEndpoint<T extends EndpointGenerics> implements AdapterEndpo
this.transportRoutes = params.transportRoutes
this.customRouter = params.customRouter
this.defaultTransport = params.defaultTransport
this.enableCompositeTransport = params.enableCompositeTransport
if (params.enableCompositeTransport && this.transportRoutes.routeNames().length < 2) {
throw new AdapterError({
statusCode: 400,
message: `Composite transport requires at least 2 transports`,
})
}
} else {
this.transportRoutes = new TransportRoutes<T>().register(
DEFAULT_TRANSPORT_NAME,
Expand Down Expand Up @@ -83,7 +91,7 @@ export class AdapterEndpoint<T extends EndpointGenerics> implements AdapterEndpo
adapterSettings: T['Settings'],
): Promise<void> {
this.adapterName = adapterName
const responseCache = new ResponseCache({
const responseCache = new SimpleResponseCache({
dependencies,
adapterSettings: adapterSettings as AdapterSettings,
adapterName,
Expand All @@ -96,6 +104,14 @@ export class AdapterEndpoint<T extends EndpointGenerics> implements AdapterEndpo
responseCache,
}

if (this.enableCompositeTransport && adapterSettings.COMPOSITE_TRANSPORT) {
logger.debug(`Enabling composite transport for endpoint "${this.name}"...`)
this.transportRoutes = new TransportRoutes<T>().register(
DEFAULT_TRANSPORT_NAME,
new CompositeTransport(Object.fromEntries(this.transportRoutes.entries())),
)
}

logger.debug(`Initializing transports for endpoint "${this.name}"...`)
for (const [transportName, transport] of this.transportRoutes.entries()) {
await transport.initialize(transportDependencies, adapterSettings, this.name, transportName)
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ type MultiTransportAdapterEndpointParams<T extends EndpointGenerics> = {

/** If no value is returned from the custom router or the default (transport param), which transport to use */
defaultTransport?: string

/** If true, roll all transportRoutes under a new CompositeTransport */
enableCompositeTransport?: boolean
}

/**
Expand Down
170 changes: 170 additions & 0 deletions src/cache/response-cache/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { AdapterDependencies } from '../../adapter'
import { AdapterSettings } from '../../config'
import {
AdapterResponse,
makeLogger,
ResponseGenerics,
TimestampedAdapterResponse,
TimestampedProviderResult,
censor,
censorLogs,
TimestampedProviderErrorResponse,
} from '../../util'
import {
InputParameters,
InputParametersDefinition,
TypeFromDefinition,
} from '../../validation/input-params'
import { Cache, calculateAdapterName, calculateCacheKey, calculateFeedId } from '../'
import CensorList from '../../util/censor/censor-list'
import { validator } from '../../validation/utils'

const logger = makeLogger('ResponseCache')

export abstract class ResponseCache<
T extends { Parameters: InputParametersDefinition; Response: ResponseGenerics },
> {
cache: Cache<AdapterResponse<T['Response']>>
inputParameters: InputParameters<T['Parameters']>
adapterName: string
endpointName: string
adapterSettings: AdapterSettings
dependencies: AdapterDependencies

constructor({
inputParameters,
adapterName,
endpointName,
adapterSettings,
dependencies,
}: {
dependencies: AdapterDependencies
adapterSettings: AdapterSettings
adapterName: string
endpointName: string
inputParameters: InputParameters<T['Parameters']>
}) {
this.dependencies = dependencies
this.cache = dependencies.cache as Cache<AdapterResponse<T['Response']>>
this.inputParameters = inputParameters
this.adapterName = adapterName
this.endpointName = endpointName
this.adapterSettings = adapterSettings
}

/**
* Sets responses in the adapter cache (adding necessary metadata and defaults)
*
* @param transportName - transport name
* @param results - the entries to write to the cache
*/
abstract write(transportName: string, results: TimestampedProviderResult<T>[]): Promise<void>

/**
* Sets responses with metadata in the adapter cache
*
* @param entries - the entries to write to the cache
*/
abstract writeEntries(
entries: {
key: string
value: AdapterResponse<T['Response']>
}[],
): Promise<void>

/**
* Sets a new TTL value for already cached responses in the adapter cache
*
* @param transportName - transport name
* @param params - set of parameters that uniquely relate to the response
* @param ttl - a new time in milliseconds until the response expires
*/
async writeTTL(
transportName: string,
params: TypeFromDefinition<T['Parameters']>[],
ttl: number,
): Promise<void> {
for (const param of params) {
const key = this.getCacheKey(transportName, param)
this.cache.setTTL(key, ttl)
}
}

async get(key: string) {
return this.cache.get(key)
}

protected generateCacheEntry(
transportNameForMeta: string,
transportNameForCache: string,
r: TimestampedProviderResult<T>,
) {
const censorList = CensorList.getAll()
const { data, result, errorMessage } = r.response
if (!errorMessage && data === undefined) {
logger.warn('The "data" property of the response is undefined.')
} else if (!errorMessage && result === undefined) {
logger.warn('The "result" property of the response is undefined.')
}
let censoredResponse
if (!censorList.length) {
censoredResponse = r.response
} else {
try {
censoredResponse = censor(r.response, censorList, true) as TimestampedAdapterResponse<
T['Response']
>
} catch (error) {
censorLogs(() => logger.error(`Error censoring response: ${error}`))
censoredResponse = {
statusCode: 502,
errorMessage: 'Response could not be censored due to an error',
timestamps: r.response.timestamps,
}
}
}

const response: AdapterResponse<T['Response']> = {
...censoredResponse,
statusCode: (censoredResponse as TimestampedProviderErrorResponse).statusCode || 200,
}

if (this.adapterSettings.METRICS_ENABLED && this.adapterSettings.EXPERIMENTAL_METRICS_ENABLED) {
response.meta = {
adapterName: calculateAdapterName(this.adapterName, r.params),
transportName: transportNameForMeta,
metrics: {
feedId: calculateFeedId(
{
adapterSettings: this.adapterSettings,
},
r.params,
),
},
}
}

if (response.timestamps?.providerIndicatedTimeUnixMs !== undefined) {
const timestampValidator = validator.responseTimestamp()
const error = timestampValidator.fn(response.timestamps?.providerIndicatedTimeUnixMs)
if (error) {
censorLogs(() => logger.warn(`Provider indicated time is invalid: ${error}`))
}
}

return {
key: this.getCacheKey(transportNameForCache, r.params),
value: response,
} as const
}

getCacheKey(transportName: string, params: TypeFromDefinition<T['Parameters']>) {
return calculateCacheKey({
transportName,
data: params,
adapterName: this.adapterName,
endpointName: this.endpointName,
adapterSettings: this.adapterSettings,
})
}
}
Loading
Loading