From 38d8ba42cc3b7816dfb830b90e6e36fcc3d85c8a Mon Sep 17 00:00:00 2001 From: Matthias Vallentin Date: Fri, 15 May 2026 21:22:17 +0200 Subject: [PATCH 1/4] Document Amazon Kinesis operators --- .../docs/integrations/amazon/kinesis.mdx | 120 +++++++++++++ .../operators/from_amazon_kinesis.mdx | 166 ++++++++++++++++++ .../reference/operators/to_amazon_kinesis.mdx | 155 ++++++++++++++++ src/sidebar.ts | 1 + 4 files changed, 442 insertions(+) create mode 100644 src/content/docs/integrations/amazon/kinesis.mdx create mode 100644 src/content/docs/reference/operators/from_amazon_kinesis.mdx create mode 100644 src/content/docs/reference/operators/to_amazon_kinesis.mdx diff --git a/src/content/docs/integrations/amazon/kinesis.mdx b/src/content/docs/integrations/amazon/kinesis.mdx new file mode 100644 index 000000000..0c4ecf0ef --- /dev/null +++ b/src/content/docs/integrations/amazon/kinesis.mdx @@ -0,0 +1,120 @@ +--- +title: Kinesis +--- + +[Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/) is a +managed streaming data service on AWS. + +Tenzir can receive records from Kinesis streams with + +from_amazon_kinesis and send records to Kinesis streams with +to_amazon_kinesis. + +When Tenzir reads from Kinesis, it emits one event per Kinesis record. The event +uses the `tenzir.amazon_kinesis` schema and contains the raw record payload in +the `message` field as a `blob`, together with metadata such as the stream, +shard, sequence number, partition key, arrival time, and lag. + +Tenzir does not parse or decompress Kinesis payloads automatically. Convert the +`message` blob explicitly in TQL when the stream contains structured data: + +```tql +from_amazon_kinesis "security-events", start="trim_horizon" +this = string(message).parse_json() +``` + +The source operator lists shards once during startup. If the stream gains new +shards while a pipeline is running, restart the pipeline to discover them. 
+Snapshots store per-shard sequence numbers and resume with at-least-once +semantics. + +## Configuration + +Follow the [Amazon integration configuration](/integrations/amazon) to +authenticate with your AWS credentials. + +Alternatively, use the `aws_iam` parameter to provide explicit credentials: + +```tql +from_amazon_kinesis "security-events", aws_iam={ + region: "us-east-1", + access_key_id: secret("aws-key"), + secret_access_key: secret("aws-secret") +} +``` + +You can also use `aws_iam` to assume an IAM role: + +```tql +from_amazon_kinesis "security-events", aws_iam={ + region: "eu-west-1", + assume_role: "arn:aws:iam::123456789012:role/my-kinesis-role", + session_name: "tenzir-session" +} +``` + +Set `endpoint` to use a Kinesis-compatible endpoint, such as LocalStack: + +```tql +from_amazon_kinesis "security-events", + aws_region="us-east-1", + endpoint="http://127.0.0.1:4566" +``` + +When `endpoint` is omitted, Tenzir checks `AWS_ENDPOINT_URL_KINESIS` first, then +`AWS_ENDPOINT_URL`, then uses the default AWS SDK endpoint for the region. + +Tenzir needs these Kinesis permissions: + +| Operator | Required permissions | +| ---------------------------- | ---------------------------------------------------------------------- | +| from_amazon_kinesis | `kinesis:ListShards`, `kinesis:GetShardIterator`, `kinesis:GetRecords` | +| to_amazon_kinesis | `kinesis:PutRecords` | + +## Examples + +### Send events to a stream + +```tql +subscribe "alerts" +to_amazon_kinesis "security-events" +``` + +This sends one NDJSON record per input event by using the default +`message=this.print_ndjson()` expression. + +### Send a custom payload + +```tql +from {payload: "security event detected", tenant: "acme"} +to_amazon_kinesis "security-events", + message=payload, + partition_key=tenant +``` + +If you omit `partition_key`, Tenzir generates a random UUID per event. 
+ +### Receive and parse JSON records + +```tql +from_amazon_kinesis "security-events", + start="trim_horizon", + count=100, + exit=true +this = string(message).parse_json() +``` + +### Read from a timestamp + +```tql +from_amazon_kinesis "security-events", start=2026-01-01T00:00:00Z +``` + +## See Also + +- from_amazon_kinesis +- to_amazon_kinesis +- collecting/read-from-message-brokers +- routing/send-to-destinations +- amazon/sqs +- kafka diff --git a/src/content/docs/reference/operators/from_amazon_kinesis.mdx b/src/content/docs/reference/operators/from_amazon_kinesis.mdx new file mode 100644 index 000000000..406ea6c5a --- /dev/null +++ b/src/content/docs/reference/operators/from_amazon_kinesis.mdx @@ -0,0 +1,166 @@ +--- +title: from_amazon_kinesis +category: Inputs/Events +example: 'from_amazon_kinesis "security-events", start="trim_horizon"' +--- + +import AWSIAMOptions from '@partials/operators/AWSIAMOptions.mdx'; + +Receives records from an [Amazon Kinesis Data Streams][kinesis] stream. + +[kinesis]: https://aws.amazon.com/kinesis/data-streams/ + +```tql +from_amazon_kinesis stream:string, [start=string|time, count=int, exit=bool, + records_per_call=int, poll_idle=duration, + aws_region=string, aws_iam=record, endpoint=string] +``` + +## Description + +The `from_amazon_kinesis` operator reads records from the existing shards of a +Kinesis data stream and emits one event per Kinesis record. + +The emitted events use the `tenzir.amazon_kinesis` schema with these fields: + +| Field | Type | Description | +| ---------------------- | -------- | ------------------------------------------------ | +| `message` | `blob` | The raw Kinesis record payload. | +| `stream` | `string` | The stream name. | +| `shard_id` | `string` | The shard that contained the record. | +| `sequence_number` | `string` | The Kinesis sequence number. | +| `partition_key` | `string` | The record partition key. | +| `arrival_time` | `time` | The approximate time when Kinesis received it. 
| +| `encryption_type` | `string` | The server-side encryption type, when available. | +| `millis_behind_latest` | `int` | The shard lag reported by Kinesis. | + +The `arrival_time` and `encryption_type` fields are optional because Kinesis +only returns them when they are present on the record. + +The operator lists shards once during startup. If the stream gains new shards +while the pipeline is running, restart the pipeline to discover them. Pipeline +snapshots store the next sequence number per shard, and restarts resume with +`AFTER_SEQUENCE_NUMBER`. This gives at-least-once restart behavior. + +The operator requires these AWS permissions: + +- `kinesis:ListShards` +- `kinesis:GetShardIterator` +- `kinesis:GetRecords` + +### `stream: string` + +The name of the Kinesis data stream to receive records from. + +### `start = string | time (optional)` + +The position for the initial shard iterator when no snapshot is available. + +The value can be one of: + +- `"latest"`: start after the latest record +- `"trim_horizon"`: start at the oldest available record +- a `time` value: start at the given timestamp + +Defaults to `"latest"`. + +### `count = int (optional)` + +Exit successfully after emitting `count` records. + +The value must be greater than zero. + +### `exit = bool (optional)` + +Exit successfully after the operator is caught up on all shards. + +Without this option, the operator waits for new records after consuming the +currently available records. + +Defaults to `false`. + +### `records_per_call = int (optional)` + +The maximum number of records to fetch in one `GetRecords` request. + +The value must be between `1` and `10000`. + +Defaults to `1000`. + +### `poll_idle = duration (optional)` + +How long to wait after a `GetRecords` request returns no records. + +The value must be non-negative. + +Defaults to `1s`. + +### `aws_region = string (optional)` + +The AWS region for reading from the stream. 
+ +If omitted, the operator uses the region from `aws_iam` when present. Otherwise, +it uses the default AWS SDK region resolution. + +### `endpoint = string (optional)` + +A custom Kinesis endpoint URL. + +If omitted, the operator uses `AWS_ENDPOINT_URL_KINESIS` when set, then +`AWS_ENDPOINT_URL`, then the default AWS SDK endpoint for the region. + + + +## Examples + +### Read from the latest position + +```tql +from_amazon_kinesis "security-events" +``` + +### Read from the oldest available record + +```tql +from_amazon_kinesis "security-events", start="trim_horizon" +``` + +### Parse JSON payloads + +```tql +from_amazon_kinesis "security-events", start="trim_horizon" +this = string(message).parse_json() +``` + +### Read a bounded number of records + +```tql +from_amazon_kinesis "security-events", + start="trim_horizon", + count=100, + exit=true +``` + +### Use explicit credentials + +```tql +from_amazon_kinesis "security-events", aws_iam={ + region: "us-east-1", + access_key_id: secret("aws-key"), + secret_access_key: secret("aws-secret") +} +``` + +### Use a LocalStack endpoint + +```tql +from_amazon_kinesis "security-events", + aws_region="us-east-1", + endpoint="http://127.0.0.1:4566" +``` + +## See Also + +- to_amazon_kinesis +- collecting/read-from-message-brokers +- amazon/kinesis diff --git a/src/content/docs/reference/operators/to_amazon_kinesis.mdx b/src/content/docs/reference/operators/to_amazon_kinesis.mdx new file mode 100644 index 000000000..ae83ad6a5 --- /dev/null +++ b/src/content/docs/reference/operators/to_amazon_kinesis.mdx @@ -0,0 +1,155 @@ +--- +title: to_amazon_kinesis +category: Outputs/Events +example: 'to_amazon_kinesis "security-events"' +--- + +import AWSIAMOptions from '@partials/operators/AWSIAMOptions.mdx'; + +Sends records to an [Amazon Kinesis Data Streams][kinesis] stream. 
+ +[kinesis]: https://aws.amazon.com/kinesis/data-streams/ + +```tql +to_amazon_kinesis stream:string, [message=blob|string, partition_key=string, + batch_size=int, batch_timeout=duration, parallel=int, + aws_region=string, aws_iam=record, endpoint=string] +``` + +## Description + +The `to_amazon_kinesis` operator sends records to a Kinesis data stream with +`PutRecords`. + +By default, `to_amazon_kinesis` serializes each input event as NDJSON with +`this.print_ndjson()`. Use the `message` option to send a specific string or +blob expression instead. If the expression evaluates to `null` or to a value that is neither a string nor a blob, +the operator skips that event and emits a warning. + +If you omit `partition_key`, the operator generates a random UUID for each +record. A partition key expression must produce non-empty strings up to 256 +bytes. Kinesis record payloads must be at most 1 MiB. Events that exceed these +limits are skipped with a warning. + +The operator retries failed `PutRecords` entries with exponential backoff before +emitting an error. + +The operator requires the `kinesis:PutRecords` AWS permission. + +### `stream: string` + +The name of the Kinesis data stream to send records to. + +### `message = blob | string (optional)` + +The expression that produces the Kinesis record payload for each event. + +Defaults to `this.print_ndjson()`. + +### `partition_key = string (optional)` + +The expression that produces the Kinesis partition key for each event. + +If omitted, the operator generates a random UUID per event. + +### `batch_size = int (optional)` + +The maximum number of records per `PutRecords` request. + +The value must be between `1` and `500`. + +Defaults to `500`. + +### `batch_timeout = duration (optional)` + +How long to wait before flushing a non-empty batch. + +The value must be positive. + +Defaults to `1s`. + +### `parallel = int (optional)` + +The number of parallel write slots. + +The value must be greater than zero. + +Defaults to `1`. 
+ +### `aws_region = string (optional)` + +The AWS region for writing to the stream. + +If omitted, the operator uses the region from `aws_iam` when present. Otherwise, +it uses the default AWS SDK region resolution. + +### `endpoint = string (optional)` + +A custom Kinesis endpoint URL. + +If omitted, the operator uses `AWS_ENDPOINT_URL_KINESIS` when set, then +`AWS_ENDPOINT_URL`, then the default AWS SDK endpoint for the region. + + + +## Examples + +### Send NDJSON events + +```tql +subscribe "alerts" +to_amazon_kinesis "security-events" +``` + +### Send a custom payload + +```tql +from {payload: "security event detected", tenant: "acme"} +to_amazon_kinesis "security-events", + message=payload, + partition_key=tenant +``` + +### Send JSON strings with an explicit partition key + +```tql +subscribe "alerts" +to_amazon_kinesis "security-events", + message=this.print_json(), + partition_key=string(src_ip) +``` + +### Configure batching + +```tql +subscribe "alerts" +to_amazon_kinesis "security-events", + batch_size=100, + batch_timeout=500ms +``` + +### Use explicit credentials + +```tql +subscribe "alerts" +to_amazon_kinesis "security-events", aws_iam={ + region: "us-east-1", + access_key_id: secret("aws-key"), + secret_access_key: secret("aws-secret") +} +``` + +### Use a LocalStack endpoint + +```tql +from {alert: "test"} +to_amazon_kinesis "security-events", + aws_region="us-east-1", + endpoint="http://127.0.0.1:4566" +``` + +## See Also + +- from_amazon_kinesis +- routing/send-to-destinations +- amazon/kinesis diff --git a/src/sidebar.ts b/src/sidebar.ts index 07bedb665..260aa0079 100644 --- a/src/sidebar.ts +++ b/src/sidebar.ts @@ -335,6 +335,7 @@ export const integrations = [ collapsed: true, items: [ "integrations/amazon", + "integrations/amazon/kinesis", "integrations/amazon/msk", "integrations/amazon/s3", "integrations/amazon/security-lake", From 91b0201a8f019aedd6d1b69d363792571b3f5a30 Mon Sep 17 00:00:00 2001 From: Matthias Vallentin Date: Fri, 15 May 
2026 21:42:06 +0200 Subject: [PATCH 2/4] List Amazon Kinesis operators --- src/content/docs/reference/operators.mdx | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/content/docs/reference/operators.mdx b/src/content/docs/reference/operators.mdx index 2450b3fb7..2fb0eda86 100644 --- a/src/content/docs/reference/operators.mdx +++ b/src/content/docs/reference/operators.mdx @@ -347,6 +347,10 @@ operators: description: 'Receives messages from an AMQP queue.' example: 'from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events"' path: 'reference/operators/from_amqp' + - name: 'from_amazon_kinesis' + description: 'Receives records from an Amazon Kinesis data stream.' + example: 'from_amazon_kinesis "security-events"' + path: 'reference/operators/from_amazon_kinesis' - name: 'from_azure_blob_storage' description: 'Reads one or multiple files from Azure Blob Storage.' example: 'from_azure_blob_storage "abfs://container/data/**.json"' @@ -619,6 +623,10 @@ operators: description: 'Sends messages to an AMQP exchange.' example: 'to_amqp "amqp://admin:pass@0.0.0.1:5672/vhost"' path: 'reference/operators/to_amqp' + - name: 'to_amazon_kinesis' + description: 'Sends records to an Amazon Kinesis data stream.' + example: 'to_amazon_kinesis "security-events"' + path: 'reference/operators/to_amazon_kinesis' - name: 'to_sqs' description: 'Sends messages to an Amazon SQS queue.' 
example: 'to_sqs "sqs://tenzir"' @@ -1422,6 +1430,14 @@ from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events" + + +```tql +from_amazon_kinesis "security-events" +``` + + + ```tql @@ -1870,6 +1886,14 @@ to_amqp "amqp://admin:pass@0.0.0.1:5672/vhost" + + +```tql +to_amazon_kinesis "security-events" +``` + + + ```tql From 9936532f72e5d282f329e8f3b1422c034aff5968 Mon Sep 17 00:00:00 2001 From: Matthias Vallentin Date: Fri, 15 May 2026 21:57:53 +0200 Subject: [PATCH 3/4] Update Kinesis operator limits --- src/content/docs/reference/operators.mdx | 32 +++++++++---------- .../operators/from_amazon_kinesis.mdx | 2 +- .../reference/operators/to_amazon_kinesis.mdx | 4 +-- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/content/docs/reference/operators.mdx b/src/content/docs/reference/operators.mdx index 2fb0eda86..b165cb566 100644 --- a/src/content/docs/reference/operators.mdx +++ b/src/content/docs/reference/operators.mdx @@ -343,14 +343,14 @@ operators: description: 'Receives UDP datagrams and outputs structured events.' example: 'accept_udp "0.0.0.0:8090"' path: 'reference/operators/accept_udp' - - name: 'from_amqp' - description: 'Receives messages from an AMQP queue.' - example: 'from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events"' - path: 'reference/operators/from_amqp' - name: 'from_amazon_kinesis' description: 'Receives records from an Amazon Kinesis data stream.' example: 'from_amazon_kinesis "security-events"' path: 'reference/operators/from_amazon_kinesis' + - name: 'from_amqp' + description: 'Receives messages from an AMQP queue.' + example: 'from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events"' + path: 'reference/operators/from_amqp' - name: 'from_azure_blob_storage' description: 'Reads one or multiple files from Azure Blob Storage.' 
example: 'from_azure_blob_storage "abfs://container/data/**.json"' @@ -619,14 +619,14 @@ operators: description: 'Listens for incoming TCP connections and sends events to all connected clients.' example: 'serve_tcp "0.0.0.0:8090" { write_json }' path: 'reference/operators/serve_tcp' - - name: 'to_amqp' - description: 'Sends messages to an AMQP exchange.' - example: 'to_amqp "amqp://admin:pass@0.0.0.1:5672/vhost"' - path: 'reference/operators/to_amqp' - name: 'to_amazon_kinesis' description: 'Sends records to an Amazon Kinesis data stream.' example: 'to_amazon_kinesis "security-events"' path: 'reference/operators/to_amazon_kinesis' + - name: 'to_amqp' + description: 'Sends messages to an AMQP exchange.' + example: 'to_amqp "amqp://admin:pass@0.0.0.1:5672/vhost"' + path: 'reference/operators/to_amqp' - name: 'to_sqs' description: 'Sends messages to an Amazon SQS queue.' example: 'to_sqs "sqs://tenzir"' @@ -1422,18 +1422,18 @@ accept_zmq "tcp://0.0.0.0:5555", prefix="alerts/" { read_json } - + ```tql -from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events" +from_amazon_kinesis "security-events" ``` - + ```tql -from_amazon_kinesis "security-events" +from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events" ``` @@ -1878,18 +1878,18 @@ to_amazon_security_lake "s3://…" - + ```tql -to_amqp "amqp://admin:pass@0.0.0.1:5672/vhost" +to_amazon_kinesis "security-events" ``` - + ```tql -to_amazon_kinesis "security-events" +to_amqp "amqp://admin:pass@0.0.0.1:5672/vhost" ``` diff --git a/src/content/docs/reference/operators/from_amazon_kinesis.mdx b/src/content/docs/reference/operators/from_amazon_kinesis.mdx index 406ea6c5a..55bef8de6 100644 --- a/src/content/docs/reference/operators/from_amazon_kinesis.mdx +++ b/src/content/docs/reference/operators/from_amazon_kinesis.mdx @@ -91,7 +91,7 @@ Defaults to `1000`. How long to wait after a `GetRecords` request returns no records. -The value must be non-negative. +The value must be non-negative and less than `5min`. 
Defaults to `1s`. diff --git a/src/content/docs/reference/operators/to_amazon_kinesis.mdx b/src/content/docs/reference/operators/to_amazon_kinesis.mdx index ae83ad6a5..62a9487f6 100644 --- a/src/content/docs/reference/operators/to_amazon_kinesis.mdx +++ b/src/content/docs/reference/operators/to_amazon_kinesis.mdx @@ -28,8 +28,8 @@ the operator skips that event and emits a warning. If you omit `partition_key`, the operator generates a random UUID for each record. A partition key expression must produce non-empty strings up to 256 -bytes. Kinesis record payloads must be at most 1 MiB. Events that exceed these -limits are skipped with a warning. +bytes. Kinesis record payloads plus their partition key must be at most 10 MiB. +Events that exceed these limits are skipped with a warning. The operator retries failed `PutRecords` entries with exponential backoff before emitting an error. From e06a2e2aa6585d0f0e7e7910f642baf8c0437c13 Mon Sep 17 00:00:00 2001 From: Matthias Vallentin Date: Fri, 15 May 2026 22:12:23 +0200 Subject: [PATCH 4/4] Order Kinesis operators in index --- src/content/docs/reference/operators.mdx | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/content/docs/reference/operators.mdx b/src/content/docs/reference/operators.mdx index b165cb566..bef226824 100644 --- a/src/content/docs/reference/operators.mdx +++ b/src/content/docs/reference/operators.mdx @@ -623,6 +623,10 @@ operators: description: 'Sends records to an Amazon Kinesis data stream.' example: 'to_amazon_kinesis "security-events"' path: 'reference/operators/to_amazon_kinesis' + - name: 'to_amazon_security_lake' + description: 'Sends OCSF events to Amazon Security Lake.' + example: 'to_amazon_security_lake "s3://…"' + path: 'reference/operators/to_amazon_security_lake' - name: 'to_amqp' description: 'Sends messages to an AMQP exchange.' 
example: 'to_amqp "amqp://admin:pass@0.0.0.1:5672/vhost"' @@ -643,10 +647,6 @@ operators: description: 'Executes YARA rules on byte streams.' example: 'yara "/path/to/rules"' path: 'reference/operators/yara' - - name: 'to_amazon_security_lake' - description: 'Sends OCSF events to Amazon Security Lake.' - example: 'to_amazon_security_lake "s3://…"' - path: 'reference/operators/to_amazon_security_lake' - name: 'to_azure_blob_storage' description: 'Writes events to one or multiple blobs in Azure Blob Storage.' example: 'to_azure_blob_storage "abfs://container/data/{uuid}.json" { write_ndjson }' @@ -1870,18 +1870,18 @@ serve_zmq "tcp://0.0.0.0:5555", encoding="json", prefix=kind + "/" - + ```tql -to_amazon_security_lake "s3://…" +to_amazon_kinesis "security-events" ``` - + ```tql -to_amazon_kinesis "security-events" +to_amazon_security_lake "s3://…" ```