diff --git a/src/content/docs/integrations/amazon/kinesis.mdx b/src/content/docs/integrations/amazon/kinesis.mdx new file mode 100644 index 000000000..0c4ecf0ef --- /dev/null +++ b/src/content/docs/integrations/amazon/kinesis.mdx @@ -0,0 +1,120 @@ +--- +title: Kinesis +--- + +[Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/) is a +managed streaming data service on AWS. + +Tenzir can receive records from Kinesis streams with + +from_amazon_kinesis and send records to Kinesis streams with +to_amazon_kinesis. + +When Tenzir reads from Kinesis, it emits one event per Kinesis record. The event +uses the `tenzir.amazon_kinesis` schema and contains the raw record payload in +the `message` field as a `blob`, together with metadata such as the stream, +shard, sequence number, partition key, arrival time, and lag. + +Tenzir does not parse or decompress Kinesis payloads automatically. Convert the +`message` blob explicitly in TQL when the stream contains structured data: + +```tql +from_amazon_kinesis "security-events", start="trim_horizon" +this = string(message).parse_json() +``` + +The source operator lists shards once during startup. If the stream gains new +shards while a pipeline is running, restart the pipeline to discover them. +Snapshots store per-shard sequence numbers and resume with at-least-once +semantics. + +## Configuration + +Follow the [Amazon integration configuration](/integrations/amazon) to +authenticate with your AWS credentials. + +Alternatively, use the `aws_iam` parameter to provide explicit credentials: + +```tql +from_amazon_kinesis "security-events", aws_iam={ + region: "us-east-1", + access_key_id: secret("aws-key"), + secret_access_key: secret("aws-secret") +} +``` + +You can also use `aws_iam` to assume an IAM role: + +```tql +from_amazon_kinesis "security-events", aws_iam={ + region: "eu-west-1", + assume_role: "arn:aws:iam::123456789012:role/my-kinesis-role", + session_name: "tenzir-session" +} +``` + +Set `endpoint` to use a Kinesis-compatible endpoint, such as LocalStack: + +```tql +from_amazon_kinesis "security-events", + aws_region="us-east-1", + endpoint="http://127.0.0.1:4566" +``` + +When `endpoint` is omitted, Tenzir checks `AWS_ENDPOINT_URL_KINESIS` first, then +`AWS_ENDPOINT_URL`, then uses the default AWS SDK endpoint for the region. + +Tenzir needs these Kinesis permissions: + +| Operator | Required permissions | +| ---------------------------- | ---------------------------------------------------------------------- | +| from_amazon_kinesis | `kinesis:ListShards`, `kinesis:GetShardIterator`, `kinesis:GetRecords` | +| to_amazon_kinesis | `kinesis:PutRecords` | + +## Examples + +### Send events to a stream + +```tql +subscribe "alerts" +to_amazon_kinesis "security-events" +``` + +This sends one NDJSON record per input event by using the default +`message=this.print_ndjson()` expression. + +### Send a custom payload + +```tql +from {payload: "security event detected", tenant: "acme"} +to_amazon_kinesis "security-events", + message=payload, + partition_key=tenant +``` + +If you omit `partition_key`, Tenzir generates a random UUID per event. + +### Receive and parse JSON records + +```tql +from_amazon_kinesis "security-events", + start="trim_horizon", + count=100, + exit=true +this = string(message).parse_json() +``` + +### Read from a timestamp + +```tql +from_amazon_kinesis "security-events", start=2026-01-01T00:00:00Z +``` + +## See Also + +- from_amazon_kinesis +- to_amazon_kinesis +- collecting/read-from-message-brokers +- routing/send-to-destinations +- amazon/sqs +- kafka diff --git a/src/content/docs/reference/operators.mdx b/src/content/docs/reference/operators.mdx index 2450b3fb7..bef226824 100644 --- a/src/content/docs/reference/operators.mdx +++ b/src/content/docs/reference/operators.mdx @@ -343,6 +343,10 @@ operators: description: 'Receives UDP datagrams and outputs structured events.' example: 'accept_udp "0.0.0.0:8090"' path: 'reference/operators/accept_udp' + - name: 'from_amazon_kinesis' + description: 'Receives records from an Amazon Kinesis data stream.' + example: 'from_amazon_kinesis "security-events"' + path: 'reference/operators/from_amazon_kinesis' - name: 'from_amqp' description: 'Receives messages from an AMQP queue.' example: 'from_amqp "amqp://admin:pass@0.0.0.1:5672/vhost", queue="events"' @@ -615,6 +619,14 @@ operators: description: 'Listens for incoming TCP connections and sends events to all connected clients.' example: 'serve_tcp "0.0.0.0:8090" { write_json }' path: 'reference/operators/serve_tcp' + - name: 'to_amazon_kinesis' + description: 'Sends records to an Amazon Kinesis data stream.' + example: 'to_amazon_kinesis "security-events"' + path: 'reference/operators/to_amazon_kinesis' + - name: 'to_amazon_security_lake' + description: 'Sends OCSF events to Amazon Security Lake.' + example: 'to_amazon_security_lake "s3://…"' + path: 'reference/operators/to_amazon_security_lake' - name: 'to_amqp' description: 'Sends messages to an AMQP exchange.' example: 'to_amqp "amqp://admin:pass@0.0.0.1:5672/vhost"' @@ -635,10 +647,6 @@ operators: description: 'Executes YARA rules on byte streams.' example: 'yara "/path/to/rules"' path: 'reference/operators/yara' - - name: 'to_amazon_security_lake' - description: 'Sends OCSF events to Amazon Security Lake.' - example: 'to_amazon_security_lake "s3://…"' - path: 'reference/operators/to_amazon_security_lake' - name: 'to_azure_blob_storage' description: 'Writes events to one or multiple blobs in Azure Blob Storage.' example: 'to_azure_blob_storage "abfs://container/data/{uuid}.json" { write_ndjson }' @@ -1414,6 +1422,14 @@ accept_zmq "tcp://0.0.0.0:5555", prefix="alerts/" { read_json } + + +```tql +from_amazon_kinesis "security-events" +``` + + + ```tql @@ -1854,6 +1870,14 @@ serve_zmq "tcp://0.0.0.0:5555", encoding="json", prefix=kind + "/" + + +```tql +to_amazon_kinesis "security-events" +``` + + + ```tql diff --git a/src/content/docs/reference/operators/from_amazon_kinesis.mdx b/src/content/docs/reference/operators/from_amazon_kinesis.mdx new file mode 100644 index 000000000..55bef8de6 --- /dev/null +++ b/src/content/docs/reference/operators/from_amazon_kinesis.mdx @@ -0,0 +1,166 @@ +--- +title: from_amazon_kinesis +category: Inputs/Events +example: 'from_amazon_kinesis "security-events", start="trim_horizon"' +--- + +import AWSIAMOptions from '@partials/operators/AWSIAMOptions.mdx'; + +Receives records from an [Amazon Kinesis Data Streams][kinesis] stream. + +[kinesis]: https://aws.amazon.com/kinesis/data-streams/ + +```tql +from_amazon_kinesis stream:string, [start=string|time, count=int, exit=bool, + records_per_call=int, poll_idle=duration, + aws_region=string, aws_iam=record, endpoint=string] +``` + +## Description + +The `from_amazon_kinesis` operator reads records from the existing shards of a +Kinesis data stream and emits one event per Kinesis record. + +The emitted events use the `tenzir.amazon_kinesis` schema with these fields: + +| Field | Type | Description | +| ---------------------- | -------- | ------------------------------------------------ | +| `message` | `blob` | The raw Kinesis record payload. | +| `stream` | `string` | The stream name. | +| `shard_id` | `string` | The shard that contained the record. | +| `sequence_number` | `string` | The Kinesis sequence number. | +| `partition_key` | `string` | The record partition key. | +| `arrival_time` | `time` | The approximate time when Kinesis received it. | +| `encryption_type` | `string` | The server-side encryption type, when available. | +| `millis_behind_latest` | `int` | The shard lag reported by Kinesis. | + +The `arrival_time` and `encryption_type` fields are optional because Kinesis +only returns them when they are present on the record. + +The operator lists shards once during startup. If the stream gains new shards +while the pipeline is running, restart the pipeline to discover them. Pipeline +snapshots store the next sequence number per shard, and restarts resume with +`AFTER_SEQUENCE_NUMBER`. This gives at-least-once restart behavior. + +The operator requires these AWS permissions: + +- `kinesis:ListShards` +- `kinesis:GetShardIterator` +- `kinesis:GetRecords` + +### `stream: string` + +The name of the Kinesis data stream to receive records from. + +### `start = string | time (optional)` + +The position for the initial shard iterator when no snapshot is available. + +The value can be one of: + +- `"latest"`: start after the latest record +- `"trim_horizon"`: start at the oldest available record +- ``: start at the given timestamp + +Defaults to `"latest"`. + +### `count = int (optional)` + +Exit successfully after emitting `count` records. + +The value must be greater than zero. + +### `exit = bool (optional)` + +Exit successfully after the operator is caught up on all shards. + +Without this option, the operator waits for new records after consuming the +currently available records. + +Defaults to `false`. + +### `records_per_call = int (optional)` + +The maximum number of records to fetch in one `GetRecords` request. + +The value must be between `1` and `10000`. + +Defaults to `1000`. + +### `poll_idle = duration (optional)` + +How long to wait after a `GetRecords` request returns no records. + +The value must be non-negative and less than `5min`. + +Defaults to `1s`. + +### `aws_region = string (optional)` + +The AWS region for reading from the stream. + +If omitted, the operator uses the region from `aws_iam` when present. Otherwise, +it uses the default AWS SDK region resolution. + +### `endpoint = string (optional)` + +A custom Kinesis endpoint URL. + +If omitted, the operator uses `AWS_ENDPOINT_URL_KINESIS` when set, then +`AWS_ENDPOINT_URL`, then the default AWS SDK endpoint for the region. + + + +## Examples + +### Read from the latest position + +```tql +from_amazon_kinesis "security-events" +``` + +### Read from the oldest available record + +```tql +from_amazon_kinesis "security-events", start="trim_horizon" +``` + +### Parse JSON payloads + +```tql +from_amazon_kinesis "security-events", start="trim_horizon" +this = string(message).parse_json() +``` + +### Read a bounded number of records + +```tql +from_amazon_kinesis "security-events", + start="trim_horizon", + count=100, + exit=true +``` + +### Use explicit credentials + +```tql +from_amazon_kinesis "security-events", aws_iam={ + region: "us-east-1", + access_key_id: secret("aws-key"), + secret_access_key: secret("aws-secret") +} +``` + +### Use a LocalStack endpoint + +```tql +from_amazon_kinesis "security-events", + aws_region="us-east-1", + endpoint="http://127.0.0.1:4566" +``` + +## See Also + +- to_amazon_kinesis +- collecting/read-from-message-brokers +- amazon/kinesis diff --git a/src/content/docs/reference/operators/to_amazon_kinesis.mdx b/src/content/docs/reference/operators/to_amazon_kinesis.mdx new file mode 100644 index 000000000..62a9487f6 --- /dev/null +++ b/src/content/docs/reference/operators/to_amazon_kinesis.mdx @@ -0,0 +1,155 @@ +--- +title: to_amazon_kinesis +category: Outputs/Events +example: 'to_amazon_kinesis "security-events"' +--- + +import AWSIAMOptions from '@partials/operators/AWSIAMOptions.mdx'; + +Sends records to an [Amazon Kinesis Data Streams][kinesis] stream. + +[kinesis]: https://aws.amazon.com/kinesis/data-streams/ + +```tql +to_amazon_kinesis stream:string, [message=blob|string, partition_key=string, + batch_size=int, batch_timeout=duration, parallel=int, + aws_region=string, aws_iam=record, endpoint=string] +``` + +## Description + +The `to_amazon_kinesis` operator sends records to a Kinesis data stream with +`PutRecords`. + +By default, `to_amazon_kinesis` serializes each input event as NDJSON with +`this.print_ndjson()`. Use the `message` option to send a specific string or +blob expression instead. If the expression evaluates to `null` or another type, +the operator skips that event and emits a warning. + +If you omit `partition_key`, the operator generates a random UUID for each +record. A partition key expression must produce non-empty strings up to 256 +bytes. Kinesis record payloads plus their partition key must be at most 10 MiB. +Events that exceed these limits are skipped with a warning. + +The operator retries failed `PutRecords` entries with exponential backoff before +emitting an error. + +The operator requires the `kinesis:PutRecords` AWS permission. + +### `stream: string` + +The name of the Kinesis data stream to send records to. + +### `message = blob | string (optional)` + +The expression that produces the Kinesis record payload for each event. + +Defaults to `this.print_ndjson()`. + +### `partition_key = string (optional)` + +The expression that produces the Kinesis partition key for each event. + +If omitted, the operator generates a random UUID per event. + +### `batch_size = int (optional)` + +The maximum number of records per `PutRecords` request. + +The value must be between `1` and `500`. + +Defaults to `500`. + +### `batch_timeout = duration (optional)` + +How long to wait before flushing a non-empty batch. + +The value must be positive. + +Defaults to `1s`. + +### `parallel = int (optional)` + +The number of parallel write slots. + +The value must be greater than zero. + +Defaults to `1`. + +### `aws_region = string (optional)` + +The AWS region for writing to the stream. + +If omitted, the operator uses the region from `aws_iam` when present. Otherwise, +it uses the default AWS SDK region resolution. + +### `endpoint = string (optional)` + +A custom Kinesis endpoint URL. + +If omitted, the operator uses `AWS_ENDPOINT_URL_KINESIS` when set, then +`AWS_ENDPOINT_URL`, then the default AWS SDK endpoint for the region. + + + +## Examples + +### Send NDJSON events + +```tql +subscribe "alerts" +to_amazon_kinesis "security-events" +``` + +### Send a custom payload + +```tql +from {payload: "security event detected", tenant: "acme"} +to_amazon_kinesis "security-events", + message=payload, + partition_key=tenant +``` + +### Send JSON strings with an explicit partition key + +```tql +subscribe "alerts" +to_amazon_kinesis "security-events", + message=this.print_json(), + partition_key=string(src_ip) +``` + +### Configure batching + +```tql +subscribe "alerts" +to_amazon_kinesis "security-events", + batch_size=100, + batch_timeout=500ms +``` + +### Use explicit credentials + +```tql +subscribe "alerts" +to_amazon_kinesis "security-events", aws_iam={ + region: "us-east-1", + access_key_id: secret("aws-key"), + secret_access_key: secret("aws-secret") +} +``` + +### Use a LocalStack endpoint + +```tql +from {alert: "test"} +to_amazon_kinesis "security-events", + aws_region="us-east-1", + endpoint="http://127.0.0.1:4566" +``` + +## See Also + +- from_amazon_kinesis +- routing/send-to-destinations +- amazon/kinesis diff --git a/src/sidebar.ts b/src/sidebar.ts index 07bedb665..260aa0079 100644 --- a/src/sidebar.ts +++ b/src/sidebar.ts @@ -335,6 +335,7 @@ export const integrations = [ collapsed: true, items: [ "integrations/amazon", + "integrations/amazon/kinesis", "integrations/amazon/msk", "integrations/amazon/s3", "integrations/amazon/security-lake",