Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions src/content/docs/integrations/amazon/kinesis.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
---
title: Kinesis
---

[Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/) is a
managed streaming data service on AWS.

Tenzir can receive records from Kinesis streams with

<Op>from_amazon_kinesis</Op> and send records to Kinesis streams with
<Op>to_amazon_kinesis</Op>.

When Tenzir reads from Kinesis, it emits one event per Kinesis record. The event
uses the `tenzir.amazon_kinesis` schema and contains the raw record payload in
the `message` field as a `blob`, together with metadata such as the stream,
shard, sequence number, partition key, arrival time, and lag.

Tenzir does not parse or decompress Kinesis payloads automatically. Convert the
`message` blob explicitly in TQL when the stream contains structured data:

```tql
from_amazon_kinesis "security-events", start="trim_horizon"
this = string(message).parse_json()
```

The source operator lists shards once during startup. If the stream gains new
shards while a pipeline is running, restart the pipeline to discover them.
Snapshots store per-shard sequence numbers and resume with at-least-once
semantics.

## Configuration

Follow the [Amazon integration configuration](/integrations/amazon) to
authenticate with your AWS credentials.

Alternatively, use the `aws_iam` parameter to provide explicit credentials:

```tql
from_amazon_kinesis "security-events", aws_iam={
region: "us-east-1",
access_key_id: secret("aws-key"),
secret_access_key: secret("aws-secret")
}
```

You can also use `aws_iam` to assume an IAM role:

```tql
from_amazon_kinesis "security-events", aws_iam={
region: "eu-west-1",
assume_role: "arn:aws:iam::123456789012:role/my-kinesis-role",
session_name: "tenzir-session"
}
```

Set `endpoint` to use a Kinesis-compatible endpoint, such as LocalStack:

```tql
from_amazon_kinesis "security-events",
aws_region="us-east-1",
endpoint="http://127.0.0.1:4566"
```

When `endpoint` is omitted, Tenzir checks `AWS_ENDPOINT_URL_KINESIS` first, then
`AWS_ENDPOINT_URL`, then uses the default AWS SDK endpoint for the region.

Tenzir needs these Kinesis permissions:

| Operator | Required permissions |
| ---------------------------- | ---------------------------------------------------------------------- |
| <Op>from_amazon_kinesis</Op> | `kinesis:ListShards`, `kinesis:GetShardIterator`, `kinesis:GetRecords` |
| <Op>to_amazon_kinesis</Op> | `kinesis:PutRecords` |

## Examples

### Send events to a stream

```tql
subscribe "alerts"
to_amazon_kinesis "security-events"
```

This sends one NDJSON record per input event by using the default
`message=this.print_ndjson()` expression.

### Send a custom payload

```tql
from {payload: "security event detected", tenant: "acme"}
to_amazon_kinesis "security-events",
message=payload,
partition_key=tenant
```

If you omit `partition_key`, Tenzir generates a random UUID per event.

### Receive and parse JSON records

```tql
from_amazon_kinesis "security-events",
start="trim_horizon",
count=100,
exit=true
this = string(message).parse_json()
```

### Read from a timestamp

```tql
from_amazon_kinesis "security-events", start=2026-01-01T00:00:00Z
```

## See Also

- <Op>from_amazon_kinesis</Op>
- <Op>to_amazon_kinesis</Op>
- <Guide>collecting/read-from-message-brokers</Guide>
- <Guide>routing/send-to-destinations</Guide>
- <Integration>amazon/sqs</Integration>
- <Integration>kafka</Integration>
32 changes: 28 additions & 4 deletions src/content/docs/reference/operators.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ operators:
description: 'Receives UDP datagrams and outputs structured events.'
example: 'accept_udp "0.0.0.0:8090"'
path: 'reference/operators/accept_udp'
- name: 'from_amazon_kinesis'
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep from_amazon_kinesis alphabetized in operator index

The insertion order violates the documented index rule in .agents/references/reference-index-pages.md (“Order entries alphabetically within their category section”): from_amazon_kinesis should sort before from_amqp, not after it. This leaves the operators index inconsistent (including the corresponding card section), which makes future index maintenance error-prone and can cause ordering checks/review drift.

Useful? React with 👍 / 👎.

description: 'Receives records from an Amazon Kinesis data stream.'
example: 'from_amazon_kinesis "security-events"'
path: 'reference/operators/from_amazon_kinesis'
- name: 'from_amqp'
description: 'Receives messages from an AMQP queue.'
example: 'from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events"'
Expand Down Expand Up @@ -615,6 +619,14 @@ operators:
description: 'Listens for incoming TCP connections and sends events to all connected clients.'
example: 'serve_tcp "0.0.0.0:8090" { write_json }'
path: 'reference/operators/serve_tcp'
- name: 'to_amazon_kinesis'
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep to_amazon_kinesis alphabetized in operator index

This entry is also out of alphabetical order per .agents/references/reference-index-pages.md: to_amazon_kinesis should appear before to_amqp. Keeping it here makes the index ordering inconsistent (and the related card list inconsistent too), which increases maintenance friction and invites follow-up churn whenever the operator index is updated.

Useful? React with 👍 / 👎.

description: 'Sends records to an Amazon Kinesis data stream.'
example: 'to_amazon_kinesis "security-events"'
path: 'reference/operators/to_amazon_kinesis'
- name: 'to_amazon_security_lake'
description: 'Sends OCSF events to Amazon Security Lake.'
example: 'to_amazon_security_lake "s3://…"'
path: 'reference/operators/to_amazon_security_lake'
- name: 'to_amqp'
description: 'Sends messages to an AMQP exchange.'
example: 'to_amqp "amqp://admin:pass@0.0.0.1:5672/vhost"'
Expand All @@ -635,10 +647,6 @@ operators:
description: 'Executes YARA rules on byte streams.'
example: 'yara "/path/to/rules"'
path: 'reference/operators/yara'
- name: 'to_amazon_security_lake'
description: 'Sends OCSF events to Amazon Security Lake.'
example: 'to_amazon_security_lake "s3://…"'
path: 'reference/operators/to_amazon_security_lake'
- name: 'to_azure_blob_storage'
description: 'Writes events to one or multiple blobs in Azure Blob Storage.'
example: 'to_azure_blob_storage "abfs://container/data/{uuid}.json" { write_ndjson }'
Expand Down Expand Up @@ -1414,6 +1422,14 @@ accept_zmq "tcp://0.0.0.0:5555", prefix="alerts/" { read_json }

</ReferenceCard>

<ReferenceCard title="from_amazon_kinesis" description="Receives records from an Amazon Kinesis data stream." href="/reference/operators/from_amazon_kinesis">

```tql
from_amazon_kinesis "security-events"
```

</ReferenceCard>

<ReferenceCard title="from_amqp" description="Receives messages from an AMQP queue." href="/reference/operators/from_amqp">

```tql
Expand Down Expand Up @@ -1854,6 +1870,14 @@ serve_zmq "tcp://0.0.0.0:5555", encoding="json", prefix=kind + "/"

</ReferenceCard>

<ReferenceCard title="to_amazon_kinesis" description="Sends records to an Amazon Kinesis data stream." href="/reference/operators/to_amazon_kinesis">

```tql
to_amazon_kinesis "security-events"
```

</ReferenceCard>

<ReferenceCard title="to_amazon_security_lake" description="Sends OCSF events to Amazon Security Lake." href="/reference/operators/to_amazon_security_lake">

```tql
Expand Down
166 changes: 166 additions & 0 deletions src/content/docs/reference/operators/from_amazon_kinesis.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
---
title: from_amazon_kinesis
category: Inputs/Events
example: 'from_amazon_kinesis "security-events", start="trim_horizon"'
Comment on lines +2 to +4
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Register new Kinesis operators in reference index

Additions of from_amazon_kinesis and to_amazon_kinesis are incomplete without updating src/content/docs/reference/operators.mdx (both the frontmatter operators: list and the ReferenceCard sections), so these operators will not appear in the main Operators catalog that users browse from /reference/operators. This makes the new docs effectively undiscoverable unless someone already knows the direct URL.

Useful? React with 👍 / 👎.

---

import AWSIAMOptions from '@partials/operators/AWSIAMOptions.mdx';

Receives records from an [Amazon Kinesis Data Streams][kinesis] stream.

[kinesis]: https://aws.amazon.com/kinesis/data-streams/

```tql
from_amazon_kinesis stream:string, [start=string|time, count=int, exit=bool,
records_per_call=int, poll_idle=duration,
aws_region=string, aws_iam=record, endpoint=string]
```

## Description

The `from_amazon_kinesis` operator reads records from the existing shards of a
Kinesis data stream and emits one event per Kinesis record.

The emitted events use the `tenzir.amazon_kinesis` schema with these fields:

| Field | Type | Description |
| ---------------------- | -------- | ------------------------------------------------ |
| `message` | `blob` | The raw Kinesis record payload. |
| `stream` | `string` | The stream name. |
| `shard_id` | `string` | The shard that contained the record. |
| `sequence_number` | `string` | The Kinesis sequence number. |
| `partition_key` | `string` | The record partition key. |
| `arrival_time` | `time` | The approximate time when Kinesis received it. |
| `encryption_type` | `string` | The server-side encryption type, when available. |
| `millis_behind_latest` | `int` | The shard lag reported by Kinesis. |

The `arrival_time` and `encryption_type` fields are optional because Kinesis
only returns them when they are present on the record.

The operator lists shards once during startup. If the stream gains new shards
while the pipeline is running, restart the pipeline to discover them. Pipeline
snapshots store the next sequence number per shard, and restarts resume with
`AFTER_SEQUENCE_NUMBER`. This gives at-least-once restart behavior.

The operator requires these AWS permissions:

- `kinesis:ListShards`
- `kinesis:GetShardIterator`
- `kinesis:GetRecords`

### `stream: string`

The name of the Kinesis data stream to receive records from.

### `start = string | time (optional)`

The position for the initial shard iterator when no snapshot is available.

The value can be one of:

- `"latest"`: start after the latest record
- `"trim_horizon"`: start at the oldest available record
- `<timestamp>`: start at the given timestamp

Defaults to `"latest"`.

### `count = int (optional)`

Exit successfully after emitting `count` records.

The value must be greater than zero.

### `exit = bool (optional)`

Exit successfully after the operator is caught up on all shards.

Without this option, the operator waits for new records after consuming the
currently available records.

Defaults to `false`.

### `records_per_call = int (optional)`

The maximum number of records to fetch in one `GetRecords` request.

The value must be between `1` and `10000`.

Defaults to `1000`.

### `poll_idle = duration (optional)`

How long to wait after a `GetRecords` request returns no records.

The value must be non-negative and less than `5min`.

Defaults to `1s`.

### `aws_region = string (optional)`

The AWS region for reading from the stream.

If omitted, the operator uses the region from `aws_iam` when present. Otherwise,
it uses the default AWS SDK region resolution.

### `endpoint = string (optional)`

A custom Kinesis endpoint URL.

If omitted, the operator uses `AWS_ENDPOINT_URL_KINESIS` when set, then
`AWS_ENDPOINT_URL`, then the default AWS SDK endpoint for the region.

<AWSIAMOptions />

## Examples

### Read from the latest position

```tql
from_amazon_kinesis "security-events"
```

### Read from the oldest available record

```tql
from_amazon_kinesis "security-events", start="trim_horizon"
```

### Parse JSON payloads

```tql
from_amazon_kinesis "security-events", start="trim_horizon"
this = string(message).parse_json()
```

### Read a bounded number of records

```tql
from_amazon_kinesis "security-events",
start="trim_horizon",
count=100,
exit=true
```

### Use explicit credentials

```tql
from_amazon_kinesis "security-events", aws_iam={
region: "us-east-1",
access_key_id: secret("aws-key"),
secret_access_key: secret("aws-secret")
}
```

### Use a LocalStack endpoint

```tql
from_amazon_kinesis "security-events",
aws_region="us-east-1",
endpoint="http://127.0.0.1:4566"
```

## See Also

- <Op>to_amazon_kinesis</Op>
- <Guide>collecting/read-from-message-brokers</Guide>
- <Integration>amazon/kinesis</Integration>
Loading
Loading