diff --git a/src/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md b/src/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md index aab8d3d21..a09545083 100644 --- a/src/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md +++ b/src/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md @@ -21,98 +21,171 @@ # Kafka -[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. +[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. IoTDB can subscribe to Kafka data through a Kafka Consumer and write the data to IoTDB by using the Session API. -## 1. Coding Example +This document introduces a simple data ingestion flow: an application writes messages to a Kafka topic, the Kafka Consumer consumes the messages and parses them into IoTDB time series data, and then writes the data to IoTDB. -### 1.1 kafka Producer Producing Data Java Code Example +## 1. Environment Preparation + +Before you start, make sure that the following environment is available: + +- JDK 8 or later +- Maven 3.6 or later +- Apache Kafka. For installation and startup, refer to the [Kafka official documentation](https://kafka.apache.org/documentation/) +- An IoTDB service is running + +The default addresses used in the following examples are: + +| Service | Address | +| --- | --- | +| Kafka | `127.0.0.1:9092` | +| IoTDB | `127.0.0.1:6667` | +| IoTDB username | `root` | +| IoTDB password | `root` | + +## 2. Add Dependencies + +Add the Kafka and IoTDB Session dependencies to your Maven `pom.xml`. It is recommended that the IoTDB dependency version matches your deployed IoTDB version. 
+ +```xml +<dependencies> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-session</artifactId> + <version>2.0.4</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>3.7.0</version> + </dependency> +</dependencies> +``` + +For the complete example project, see [iotdb-extras/examples/kafka](https://github.com/apache/iotdb-extras/tree/master/examples/kafka). + +## 3. Kafka Message Format + +The sample program uses a string format to transfer one IoTDB data record: + +```text +device,timestamp,measurements,types,values +``` + +The fields are described as follows: + +| Field | Description | Example | +| --- | --- | --- | +| `device` | IoTDB device path | `root.kafka.d0` | +| `timestamp` | Timestamp in milliseconds | `1716180000000` | +| `measurements` | Measurement names, separated by `:` when there are multiple values | `temperature:status` | +| `types` | Data types, separated by `:` when there are multiple values | `DOUBLE:BOOLEAN` | +| `values` | Data values, separated by `:` when there are multiple values | `36.5:true` | + +Single-measurement example: + +```text +root.kafka.d0,1716180000000,temperature,DOUBLE,36.5 +``` + +Multiple-measurement example: + +```text +root.kafka.d0,1716180000000,temperature:status,DOUBLE:BOOLEAN,36.5:true +``` + +## 4. Produce Data to Kafka + +The following code shows the key logic for writing one IoTDB data record to the `Kafka-Test` topic: ```java - Properties props = new Properties(); - props.put("bootstrap.servers", "127.0.0.1:9092"); - props.put("key.serializer", StringSerializer.class); - props.put("value.serializer", StringSerializer.class); - KafkaProducer<String, String> producer = new KafkaProducer<>(props); - producer.send( - new ProducerRecord<>( - "Kafka-Test", "key", "root.kafka," + System.currentTimeMillis() + ",value,INT32,100")); - producer.close(); +String value = "root.kafka.d0," + + System.currentTimeMillis() + + ",temperature:status,DOUBLE:BOOLEAN,36.5:true"; + +producer.send(new ProducerRecord<>("Kafka-Test", "iotdb", value)); ``` -### 1.2 kafka Consumer Receiving Data Java Code Example +## 5. 
Consume Kafka Data and Write to IoTDB + +After the Kafka Consumer reads a message from the topic, it parses the device, timestamp, measurements, types, and values, and writes the data to IoTDB through SessionPool. ```java - Properties props = new Properties(); - props.put("bootstrap.servers", "127.0.0.1:9092"); - props.put("key.deserializer", StringDeserializer.class); - props.put("value.deserializer", StringDeserializer.class); - props.put("auto.offset.reset", "earliest"); - props.put("group.id", "Kafka-Test"); - KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); - kafkaConsumer.subscribe(Collections.singleton("Kafka-Test")); - ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1)); - ``` - -### 1.3 Example of Java Code Stored in IoTDB Server +String[] fields = record.value().split(","); +String device = fields[0]; +long time = Long.parseLong(fields[1]); +List<String> measurements = Arrays.asList(fields[2].split(":")); + +String[] typeNames = fields[3].split(":"); +String[] valueTexts = fields[4].split(":"); + +List<TSDataType> types = new ArrayList<>(); +List<Object> values = new ArrayList<>(); + +for (int i = 0; i < typeNames.length; i++) { + TSDataType type = TSDataType.valueOf(typeNames[i]); + types.add(type); + + switch (type) { + case INT32: + values.add(Integer.parseInt(valueTexts[i])); + break; + case INT64: + values.add(Long.parseLong(valueTexts[i])); + break; + case FLOAT: + values.add(Float.parseFloat(valueTexts[i])); + break; + case DOUBLE: + values.add(Double.parseDouble(valueTexts[i])); + break; + case BOOLEAN: + values.add(Boolean.parseBoolean(valueTexts[i])); + break; + case TEXT: + values.add(valueTexts[i]); + break; + default: + throw new IllegalArgumentException("Unsupported data type: " + type); + } +} + +pool.insertRecord(device, time, measurements, types, values); +``` + +The IoTDB `SessionPool` can be created as follows: ```java - SessionPool pool = - new SessionPool.Builder() - .host("127.0.0.1") - .port(6667) - .user("root") - .password("root") - 
.maxSize(3) - .build(); - List<String> datas = new ArrayList<>(records.count()); - for (ConsumerRecord<String, String> record : records) { - datas.add(record.value()); - } - int size = datas.size(); - List<String> deviceIds = new ArrayList<>(size); - List<Long> times = new ArrayList<>(size); - List<List<String>> measurementsList = new ArrayList<>(size); - List<List<TSDataType>> typesList = new ArrayList<>(size); - List<List<Object>> valuesList = new ArrayList<>(size); - for (String data : datas) { - String[] dataArray = data.split(","); - String device = dataArray[0]; - long time = Long.parseLong(dataArray[1]); - List<String> measurements = Arrays.asList(dataArray[2].split(":")); - List<TSDataType> types = new ArrayList<>(); - for (String type : dataArray[3].split(":")) { - types.add(TSDataType.valueOf(type)); - } - List<Object> values = new ArrayList<>(); - String[] valuesStr = dataArray[4].split(":"); - for (int i = 0; i < valuesStr.length; i++) { - switch (types.get(i)) { - case INT64: - values.add(Long.parseLong(valuesStr[i])); - break; - case DOUBLE: - values.add(Double.parseDouble(valuesStr[i])); - break; - case INT32: - values.add(Integer.parseInt(valuesStr[i])); - break; - case TEXT: - values.add(valuesStr[i]); - break; - case FLOAT: - values.add(Float.parseFloat(valuesStr[i])); - break; - case BOOLEAN: - values.add(Boolean.parseBoolean(valuesStr[i])); - break; - } - } - deviceIds.add(device); - times.add(time); - measurementsList.add(measurements); - typesList.add(types); - valuesList.add(values); - } - pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); - ``` +SessionPool pool = new SessionPool.Builder() + .host("127.0.0.1") + .port(6667) + .user("root") + .password("root") + .maxSize(3) + .build(); +``` + +## 6. 
Query the Result + +Connect to the IoTDB CLI: + +```bash +./sbin/start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root +``` + +Run the query: + +```sql +SELECT * FROM root.kafka.**; +``` + +The query result shows the time series data written by the Kafka Consumer, for example: + +```text ++-----------------------------+-------------------------+--------------------+ +| Time|root.kafka.d0.temperature|root.kafka.d0.status| ++-----------------------------+-------------------------+--------------------+ +|2024-05-20T10:00:00.000+08:00| 36.5| true| ++-----------------------------+-------------------------+--------------------+ +``` diff --git a/src/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md b/src/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md index aab8d3d21..a09545083 100644 --- a/src/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md +++ b/src/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md @@ -21,98 +21,171 @@ # Kafka -[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. +[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. IoTDB can subscribe to Kafka data through a Kafka Consumer and write the data to IoTDB by using the Session API. -## 1. Coding Example +This document introduces a simple data ingestion flow: an application writes messages to a Kafka topic, the Kafka Consumer consumes the messages and parses them into IoTDB time series data, and then writes the data to IoTDB. -### 1.1 kafka Producer Producing Data Java Code Example +## 1. 
Environment Preparation + +Before you start, make sure that the following environment is available: + +- JDK 8 or later +- Maven 3.6 or later +- Apache Kafka. For installation and startup, refer to the [Kafka official documentation](https://kafka.apache.org/documentation/) +- An IoTDB service is running + +The default addresses used in the following examples are: + +| Service | Address | +| --- | --- | +| Kafka | `127.0.0.1:9092` | +| IoTDB | `127.0.0.1:6667` | +| IoTDB username | `root` | +| IoTDB password | `root` | + +## 2. Add Dependencies + +Add the Kafka and IoTDB Session dependencies to your Maven `pom.xml`. It is recommended that the IoTDB dependency version matches your deployed IoTDB version. + +```xml +<dependencies> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-session</artifactId> + <version>2.0.4</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>3.7.0</version> + </dependency> +</dependencies> +``` + +For the complete example project, see [iotdb-extras/examples/kafka](https://github.com/apache/iotdb-extras/tree/master/examples/kafka). + +## 3. Kafka Message Format + +The sample program uses a string format to transfer one IoTDB data record: + +```text +device,timestamp,measurements,types,values +``` + +The fields are described as follows: + +| Field | Description | Example | +| --- | --- | --- | +| `device` | IoTDB device path | `root.kafka.d0` | +| `timestamp` | Timestamp in milliseconds | `1716180000000` | +| `measurements` | Measurement names, separated by `:` when there are multiple values | `temperature:status` | +| `types` | Data types, separated by `:` when there are multiple values | `DOUBLE:BOOLEAN` | +| `values` | Data values, separated by `:` when there are multiple values | `36.5:true` | + +Single-measurement example: + +```text +root.kafka.d0,1716180000000,temperature,DOUBLE,36.5 +``` + +Multiple-measurement example: + +```text +root.kafka.d0,1716180000000,temperature:status,DOUBLE:BOOLEAN,36.5:true +``` + +## 4. 
Produce Data to Kafka + +The following code shows the key logic for writing one IoTDB data record to the `Kafka-Test` topic: ```java - Properties props = new Properties(); - props.put("bootstrap.servers", "127.0.0.1:9092"); - props.put("key.serializer", StringSerializer.class); - props.put("value.serializer", StringSerializer.class); - KafkaProducer<String, String> producer = new KafkaProducer<>(props); - producer.send( - new ProducerRecord<>( - "Kafka-Test", "key", "root.kafka," + System.currentTimeMillis() + ",value,INT32,100")); - producer.close(); +String value = "root.kafka.d0," + + System.currentTimeMillis() + + ",temperature:status,DOUBLE:BOOLEAN,36.5:true"; + +producer.send(new ProducerRecord<>("Kafka-Test", "iotdb", value)); ``` -### 1.2 kafka Consumer Receiving Data Java Code Example +## 5. Consume Kafka Data and Write to IoTDB + +After the Kafka Consumer reads a message from the topic, it parses the device, timestamp, measurements, types, and values, and writes the data to IoTDB through SessionPool. 
```java - Properties props = new Properties(); - props.put("bootstrap.servers", "127.0.0.1:9092"); - props.put("key.deserializer", StringDeserializer.class); - props.put("value.deserializer", StringDeserializer.class); - props.put("auto.offset.reset", "earliest"); - props.put("group.id", "Kafka-Test"); - KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); - kafkaConsumer.subscribe(Collections.singleton("Kafka-Test")); - ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1)); - ``` - -### 1.3 Example of Java Code Stored in IoTDB Server +String[] fields = record.value().split(","); +String device = fields[0]; +long time = Long.parseLong(fields[1]); +List<String> measurements = Arrays.asList(fields[2].split(":")); + +String[] typeNames = fields[3].split(":"); +String[] valueTexts = fields[4].split(":"); + +List<TSDataType> types = new ArrayList<>(); +List<Object> values = new ArrayList<>(); + +for (int i = 0; i < typeNames.length; i++) { + TSDataType type = TSDataType.valueOf(typeNames[i]); + types.add(type); + + switch (type) { + case INT32: + values.add(Integer.parseInt(valueTexts[i])); + break; + case INT64: + values.add(Long.parseLong(valueTexts[i])); + break; + case FLOAT: + values.add(Float.parseFloat(valueTexts[i])); + break; + case DOUBLE: + values.add(Double.parseDouble(valueTexts[i])); + break; + case BOOLEAN: + values.add(Boolean.parseBoolean(valueTexts[i])); + break; + case TEXT: + values.add(valueTexts[i]); + break; + default: + throw new IllegalArgumentException("Unsupported data type: " + type); + } +} + +pool.insertRecord(device, time, measurements, types, values); +``` + +The IoTDB `SessionPool` can be created as follows: ```java - SessionPool pool = - new SessionPool.Builder() - .host("127.0.0.1") - .port(6667) - .user("root") - .password("root") - .maxSize(3) - .build(); - List<String> datas = new ArrayList<>(records.count()); - for (ConsumerRecord<String, String> record : records) { - datas.add(record.value()); - } - int size = datas.size(); - List<String> deviceIds = new 
ArrayList<>(size); - List<Long> times = new ArrayList<>(size); - List<List<String>> measurementsList = new ArrayList<>(size); - List<List<TSDataType>> typesList = new ArrayList<>(size); - List<List<Object>> valuesList = new ArrayList<>(size); - for (String data : datas) { - String[] dataArray = data.split(","); - String device = dataArray[0]; - long time = Long.parseLong(dataArray[1]); - List<String> measurements = Arrays.asList(dataArray[2].split(":")); - List<TSDataType> types = new ArrayList<>(); - for (String type : dataArray[3].split(":")) { - types.add(TSDataType.valueOf(type)); - } - List<Object> values = new ArrayList<>(); - String[] valuesStr = dataArray[4].split(":"); - for (int i = 0; i < valuesStr.length; i++) { - switch (types.get(i)) { - case INT64: - values.add(Long.parseLong(valuesStr[i])); - break; - case DOUBLE: - values.add(Double.parseDouble(valuesStr[i])); - break; - case INT32: - values.add(Integer.parseInt(valuesStr[i])); - break; - case TEXT: - values.add(valuesStr[i]); - break; - case FLOAT: - values.add(Float.parseFloat(valuesStr[i])); - break; - case BOOLEAN: - values.add(Boolean.parseBoolean(valuesStr[i])); - break; - } - } - deviceIds.add(device); - times.add(time); - measurementsList.add(measurements); - typesList.add(types); - valuesList.add(values); - } - pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); - ``` +SessionPool pool = new SessionPool.Builder() + .host("127.0.0.1") + .port(6667) + .user("root") + .password("root") + .maxSize(3) + .build(); +``` + +## 6. 
Query the Result + +Connect to the IoTDB CLI: + +```bash +./sbin/start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root +``` + +Run the query: + +```sql +SELECT * FROM root.kafka.**; +``` + +The query result shows the time series data written by the Kafka Consumer, for example: + +```text ++-----------------------------+-------------------------+--------------------+ +| Time|root.kafka.d0.temperature|root.kafka.d0.status| ++-----------------------------+-------------------------+--------------------+ +|2024-05-20T10:00:00.000+08:00| 36.5| true| ++-----------------------------+-------------------------+--------------------+ +``` diff --git a/src/zh/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md b/src/zh/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md index 214cc2c74..9b81a9d28 100644 --- a/src/zh/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md +++ b/src/zh/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md @@ -21,98 +21,171 @@ # Kafka -[Apache Kafka](https://kafka.apache.org/) 是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。 +[Apache Kafka](https://kafka.apache.org/) 是一个开源的分布式事件流平台,常用于构建高性能数据管道、流式分析和数据集成系统。IoTDB 可以通过 Kafka Consumer 订阅 Kafka 中的数据,并使用 Session API 将数据写入 IoTDB。 -## 1. 示例代码 +本文介绍一个简单的数据写入流程:应用程序向 Kafka Topic 写入消息,Kafka Consumer 消费消息并解析为 IoTDB 时序数据,最后写入 IoTDB。 -### 1.1 kafka 生产者生产数据 Java 代码示例 +## 1. 环境准备 + +使用前请确保已准备以下环境: + +- JDK 8 或以上版本 +- Maven 3.6 或以上版本 +- Apache Kafka,安装与启动方式请参考 [Kafka 官方文档](https://kafka.apache.org/documentation/) +- IoTDB 服务已启动 + +以下示例中使用的默认地址如下: + +| 服务 | 地址 | +| --- | --- | +| Kafka | `127.0.0.1:9092` | +| IoTDB | `127.0.0.1:6667` | +| IoTDB 用户名 | `root` | +| IoTDB 密码 | `root` | + +## 2. 
添加项目依赖 + +在 Maven 项目的 `pom.xml` 中添加 Kafka 与 IoTDB Session 相关依赖。IoTDB 依赖版本建议与实际部署的 IoTDB 版本保持一致。 + +```xml +<dependencies> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-session</artifactId> + <version>2.0.4</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>3.7.0</version> + </dependency> +</dependencies> +``` + +完整示例工程可参考:[iotdb-extras/examples/kafka](https://github.com/apache/iotdb-extras/tree/master/examples/kafka)。 + +## 3. Kafka 消息格式 + +示例程序使用字符串格式传输一条 IoTDB 数据记录: + +```text +device,timestamp,measurements,types,values +``` + +字段说明如下: + +| 字段 | 说明 | 示例 | +| --- | --- | --- | +| `device` | IoTDB 设备路径 | `root.kafka.d0` | +| `timestamp` | 时间戳,单位为毫秒 | `1716180000000` | +| `measurements` | 测点名称,多个测点使用 `:` 分隔 | `temperature:status` | +| `types` | 数据类型,多个类型使用 `:` 分隔 | `DOUBLE:BOOLEAN` | +| `values` | 数据值,多个值使用 `:` 分隔 | `36.5:true` | + +单测点消息示例: + +```text +root.kafka.d0,1716180000000,temperature,DOUBLE,36.5 +``` + +多测点消息示例: + +```text +root.kafka.d0,1716180000000,temperature:status,DOUBLE:BOOLEAN,36.5:true +``` + +## 4. 生产数据到 Kafka + +以下代码展示向 `Kafka-Test` Topic 写入一条 IoTDB 数据记录的关键逻辑: ```java - Properties props = new Properties(); - props.put("bootstrap.servers", "127.0.0.1:9092"); - props.put("key.serializer", StringSerializer.class); - props.put("value.serializer", StringSerializer.class); - KafkaProducer<String, String> producer = new KafkaProducer<>(props); - producer.send( - new ProducerRecord<>( - "Kafka-Test", "key", "root.kafka," + System.currentTimeMillis() + ",value,INT32,100")); - producer.close(); +String value = "root.kafka.d0," + + System.currentTimeMillis() + + ",temperature:status,DOUBLE:BOOLEAN,36.5:true"; + +producer.send(new ProducerRecord<>("Kafka-Test", "iotdb", value)); ``` -### 1.2 kafka 消费者接收数据 Java 代码示例 +## 5. 
消费 Kafka 数据并写入 IoTDB + +Kafka Consumer 从 Topic 中读取消息后,解析设备、时间戳、测点、类型和值,并调用 IoTDB SessionPool 写入数据。 ```java - Properties props = new Properties(); - props.put("bootstrap.servers", "127.0.0.1:9092"); - props.put("key.deserializer", StringDeserializer.class); - props.put("value.deserializer", StringDeserializer.class); - props.put("auto.offset.reset", "earliest"); - props.put("group.id", "Kafka-Test"); - KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); - kafkaConsumer.subscribe(Collections.singleton("Kafka-Test")); - ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1)); - ``` - -### 1.3 存入 IoTDB 服务器的 Java 代码示例 +String[] fields = record.value().split(","); +String device = fields[0]; +long time = Long.parseLong(fields[1]); +List<String> measurements = Arrays.asList(fields[2].split(":")); + +String[] typeNames = fields[3].split(":"); +String[] valueTexts = fields[4].split(":"); + +List<TSDataType> types = new ArrayList<>(); +List<Object> values = new ArrayList<>(); + +for (int i = 0; i < typeNames.length; i++) { + TSDataType type = TSDataType.valueOf(typeNames[i]); + types.add(type); + + switch (type) { + case INT32: + values.add(Integer.parseInt(valueTexts[i])); + break; + case INT64: + values.add(Long.parseLong(valueTexts[i])); + break; + case FLOAT: + values.add(Float.parseFloat(valueTexts[i])); + break; + case DOUBLE: + values.add(Double.parseDouble(valueTexts[i])); + break; + case BOOLEAN: + values.add(Boolean.parseBoolean(valueTexts[i])); + break; + case TEXT: + values.add(valueTexts[i]); + break; + default: + throw new IllegalArgumentException("Unsupported data type: " + type); + } +} + +pool.insertRecord(device, time, measurements, types, values); +``` + +其中,IoTDB SessionPool 可按如下方式创建: ```java - SessionPool pool = - new SessionPool.Builder() - .host("127.0.0.1") - .port(6667) - .user("root") - .password("root") - .maxSize(3) - .build(); - List<String> datas = new ArrayList<>(records.count()); - for (ConsumerRecord<String, String> record : records) { - datas.add(record.value()); - } - int 
size = datas.size(); - List<String> deviceIds = new ArrayList<>(size); - List<Long> times = new ArrayList<>(size); - List<List<String>> measurementsList = new ArrayList<>(size); - List<List<TSDataType>> typesList = new ArrayList<>(size); - List<List<Object>> valuesList = new ArrayList<>(size); - for (String data : datas) { - String[] dataArray = data.split(","); - String device = dataArray[0]; - long time = Long.parseLong(dataArray[1]); - List<String> measurements = Arrays.asList(dataArray[2].split(":")); - List<TSDataType> types = new ArrayList<>(); - for (String type : dataArray[3].split(":")) { - types.add(TSDataType.valueOf(type)); - } - List<Object> values = new ArrayList<>(); - String[] valuesStr = dataArray[4].split(":"); - for (int i = 0; i < valuesStr.length; i++) { - switch (types.get(i)) { - case INT64: - values.add(Long.parseLong(valuesStr[i])); - break; - case DOUBLE: - values.add(Double.parseDouble(valuesStr[i])); - break; - case INT32: - values.add(Integer.parseInt(valuesStr[i])); - break; - case TEXT: - values.add(valuesStr[i]); - break; - case FLOAT: - values.add(Float.parseFloat(valuesStr[i])); - break; - case BOOLEAN: - values.add(Boolean.parseBoolean(valuesStr[i])); - break; - } - } - deviceIds.add(device); - times.add(time); - measurementsList.add(measurements); - typesList.add(types); - valuesList.add(values); - } - pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); - ``` +SessionPool pool = new SessionPool.Builder() + .host("127.0.0.1") + .port(6667) + .user("root") + .password("root") + .maxSize(3) + .build(); +``` + +## 6. 
查询写入结果 + +连接 IoTDB CLI: + +```bash +./sbin/start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root +``` + +执行查询: + +```sql +SELECT * FROM root.kafka.**; +``` + +查询结果中可以看到由 Kafka Consumer 写入的时间序列数据,例如: + +```text ++-----------------------------+-------------------------+--------------------+ +| Time|root.kafka.d0.temperature|root.kafka.d0.status| ++-----------------------------+-------------------------+--------------------+ +|2024-05-20T10:00:00.000+08:00| 36.5| true| ++-----------------------------+-------------------------+--------------------+ +``` diff --git a/src/zh/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md b/src/zh/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md index 214cc2c74..9b81a9d28 100644 --- a/src/zh/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md +++ b/src/zh/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md @@ -21,98 +21,171 @@ # Kafka -[Apache Kafka](https://kafka.apache.org/) 是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。 +[Apache Kafka](https://kafka.apache.org/) 是一个开源的分布式事件流平台,常用于构建高性能数据管道、流式分析和数据集成系统。IoTDB 可以通过 Kafka Consumer 订阅 Kafka 中的数据,并使用 Session API 将数据写入 IoTDB。 -## 1. 示例代码 +本文介绍一个简单的数据写入流程:应用程序向 Kafka Topic 写入消息,Kafka Consumer 消费消息并解析为 IoTDB 时序数据,最后写入 IoTDB。 -### 1.1 kafka 生产者生产数据 Java 代码示例 +## 1. 环境准备 + +使用前请确保已准备以下环境: + +- JDK 8 或以上版本 +- Maven 3.6 或以上版本 +- Apache Kafka,安装与启动方式请参考 [Kafka 官方文档](https://kafka.apache.org/documentation/) +- IoTDB 服务已启动 + +以下示例中使用的默认地址如下: + +| 服务 | 地址 | +| --- | --- | +| Kafka | `127.0.0.1:9092` | +| IoTDB | `127.0.0.1:6667` | +| IoTDB 用户名 | `root` | +| IoTDB 密码 | `root` | + +## 2. 添加项目依赖 + +在 Maven 项目的 `pom.xml` 中添加 Kafka 与 IoTDB Session 相关依赖。IoTDB 依赖版本建议与实际部署的 IoTDB 版本保持一致。 + +```xml +<dependencies> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-session</artifactId> + <version>2.0.4</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>3.7.0</version> + </dependency> +</dependencies> +``` + +完整示例工程可参考:[iotdb-extras/examples/kafka](https://github.com/apache/iotdb-extras/tree/master/examples/kafka)。 + +## 3. 
Kafka 消息格式 + +示例程序使用字符串格式传输一条 IoTDB 数据记录: + +```text +device,timestamp,measurements,types,values +``` + +字段说明如下: + +| 字段 | 说明 | 示例 | +| --- | --- | --- | +| `device` | IoTDB 设备路径 | `root.kafka.d0` | +| `timestamp` | 时间戳,单位为毫秒 | `1716180000000` | +| `measurements` | 测点名称,多个测点使用 `:` 分隔 | `temperature:status` | +| `types` | 数据类型,多个类型使用 `:` 分隔 | `DOUBLE:BOOLEAN` | +| `values` | 数据值,多个值使用 `:` 分隔 | `36.5:true` | + +单测点消息示例: + +```text +root.kafka.d0,1716180000000,temperature,DOUBLE,36.5 +``` + +多测点消息示例: + +```text +root.kafka.d0,1716180000000,temperature:status,DOUBLE:BOOLEAN,36.5:true +``` + +## 4. 生产数据到 Kafka + +以下代码展示向 `Kafka-Test` Topic 写入一条 IoTDB 数据记录的关键逻辑: ```java - Properties props = new Properties(); - props.put("bootstrap.servers", "127.0.0.1:9092"); - props.put("key.serializer", StringSerializer.class); - props.put("value.serializer", StringSerializer.class); - KafkaProducer<String, String> producer = new KafkaProducer<>(props); - producer.send( - new ProducerRecord<>( - "Kafka-Test", "key", "root.kafka," + System.currentTimeMillis() + ",value,INT32,100")); - producer.close(); +String value = "root.kafka.d0," + + System.currentTimeMillis() + + ",temperature:status,DOUBLE:BOOLEAN,36.5:true"; + +producer.send(new ProducerRecord<>("Kafka-Test", "iotdb", value)); ``` -### 1.2 kafka 消费者接收数据 Java 代码示例 +## 5. 
消费 Kafka 数据并写入 IoTDB + +Kafka Consumer 从 Topic 中读取消息后,解析设备、时间戳、测点、类型和值,并调用 IoTDB SessionPool 写入数据。 ```java - Properties props = new Properties(); - props.put("bootstrap.servers", "127.0.0.1:9092"); - props.put("key.deserializer", StringDeserializer.class); - props.put("value.deserializer", StringDeserializer.class); - props.put("auto.offset.reset", "earliest"); - props.put("group.id", "Kafka-Test"); - KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); - kafkaConsumer.subscribe(Collections.singleton("Kafka-Test")); - ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1)); - ``` - -### 1.3 存入 IoTDB 服务器的 Java 代码示例 +String[] fields = record.value().split(","); +String device = fields[0]; +long time = Long.parseLong(fields[1]); +List<String> measurements = Arrays.asList(fields[2].split(":")); + +String[] typeNames = fields[3].split(":"); +String[] valueTexts = fields[4].split(":"); + +List<TSDataType> types = new ArrayList<>(); +List<Object> values = new ArrayList<>(); + +for (int i = 0; i < typeNames.length; i++) { + TSDataType type = TSDataType.valueOf(typeNames[i]); + types.add(type); + + switch (type) { + case INT32: + values.add(Integer.parseInt(valueTexts[i])); + break; + case INT64: + values.add(Long.parseLong(valueTexts[i])); + break; + case FLOAT: + values.add(Float.parseFloat(valueTexts[i])); + break; + case DOUBLE: + values.add(Double.parseDouble(valueTexts[i])); + break; + case BOOLEAN: + values.add(Boolean.parseBoolean(valueTexts[i])); + break; + case TEXT: + values.add(valueTexts[i]); + break; + default: + throw new IllegalArgumentException("Unsupported data type: " + type); + } +} + +pool.insertRecord(device, time, measurements, types, values); +``` + +其中,IoTDB SessionPool 可按如下方式创建: ```java - SessionPool pool = - new SessionPool.Builder() - .host("127.0.0.1") - .port(6667) - .user("root") - .password("root") - .maxSize(3) - .build(); - List<String> datas = new ArrayList<>(records.count()); - for (ConsumerRecord<String, String> record : records) { - datas.add(record.value()); - } - int
size = datas.size(); - List<String> deviceIds = new ArrayList<>(size); - List<Long> times = new ArrayList<>(size); - List<List<String>> measurementsList = new ArrayList<>(size); - List<List<TSDataType>> typesList = new ArrayList<>(size); - List<List<Object>> valuesList = new ArrayList<>(size); - for (String data : datas) { - String[] dataArray = data.split(","); - String device = dataArray[0]; - long time = Long.parseLong(dataArray[1]); - List<String> measurements = Arrays.asList(dataArray[2].split(":")); - List<TSDataType> types = new ArrayList<>(); - for (String type : dataArray[3].split(":")) { - types.add(TSDataType.valueOf(type)); - } - List<Object> values = new ArrayList<>(); - String[] valuesStr = dataArray[4].split(":"); - for (int i = 0; i < valuesStr.length; i++) { - switch (types.get(i)) { - case INT64: - values.add(Long.parseLong(valuesStr[i])); - break; - case DOUBLE: - values.add(Double.parseDouble(valuesStr[i])); - break; - case INT32: - values.add(Integer.parseInt(valuesStr[i])); - break; - case TEXT: - values.add(valuesStr[i]); - break; - case FLOAT: - values.add(Float.parseFloat(valuesStr[i])); - break; - case BOOLEAN: - values.add(Boolean.parseBoolean(valuesStr[i])); - break; - } - } - deviceIds.add(device); - times.add(time); - measurementsList.add(measurements); - typesList.add(types); - valuesList.add(values); - } - pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); - ``` +SessionPool pool = new SessionPool.Builder() + .host("127.0.0.1") + .port(6667) + .user("root") + .password("root") + .maxSize(3) + .build(); +``` + +## 6. 
查询写入结果 + +连接 IoTDB CLI: + +```bash +./sbin/start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root +``` + +执行查询: + +```sql +SELECT * FROM root.kafka.**; +``` + +查询结果中可以看到由 Kafka Consumer 写入的时间序列数据,例如: + +```text ++-----------------------------+-------------------------+--------------------+ +| Time|root.kafka.d0.temperature|root.kafka.d0.status| ++-----------------------------+-------------------------+--------------------+ +|2024-05-20T10:00:00.000+08:00| 36.5| true| ++-----------------------------+-------------------------+--------------------+ +```
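
Note on the message format: the consumer snippet added in this patch splits each message on `,` and `:` and assumes exactly five fields with matching numbers of measurements, types, and values. The sketch below illustrates that parsing step with explicit shape checks. It is not part of the patch. `KafkaMessageParser`, `ParsedRecord`, and the local `DataType` enum are hypothetical names, and `DataType` is only a stand-in for IoTDB's `TSDataType` so that the example runs without the IoTDB dependency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; not part of the patch. */
class KafkaMessageParser {

  /** Local stand-in for IoTDB's TSDataType (assumption: keeps the sketch dependency-free). */
  enum DataType { INT32, INT64, FLOAT, DOUBLE, BOOLEAN, TEXT }

  /** Hypothetical holder for one parsed record. */
  static final class ParsedRecord {
    final String device;
    final long time;
    final List<String> measurements;
    final List<DataType> types;
    final List<Object> values;

    ParsedRecord(String device, long time, List<String> measurements,
                 List<DataType> types, List<Object> values) {
      this.device = device;
      this.time = time;
      this.measurements = measurements;
      this.types = types;
      this.values = values;
    }
  }

  /** Parses "device,timestamp,measurements,types,values" and validates its shape. */
  static ParsedRecord parse(String message) {
    // Limit -1 keeps trailing empty fields, so extra trailing commas are
    // detected by the count check instead of being silently dropped.
    String[] fields = message.split(",", -1);
    if (fields.length != 5) {
      throw new IllegalArgumentException("Expected 5 fields but got " + fields.length);
    }
    String[] names = fields[2].split(":");
    String[] typeNames = fields[3].split(":");
    String[] valueTexts = fields[4].split(":");
    if (names.length != typeNames.length || names.length != valueTexts.length) {
      throw new IllegalArgumentException("measurements, types, and values differ in length");
    }
    List<DataType> types = new ArrayList<>();
    List<Object> values = new ArrayList<>();
    for (int i = 0; i < names.length; i++) {
      DataType type = DataType.valueOf(typeNames[i]);
      types.add(type);
      values.add(convert(type, valueTexts[i]));
    }
    return new ParsedRecord(
        fields[0], Long.parseLong(fields[1]), Arrays.asList(names), types, values);
  }

  /** Converts one value string to the Java object matching its declared type. */
  static Object convert(DataType type, String text) {
    switch (type) {
      case INT32:
        return Integer.parseInt(text);
      case INT64:
        return Long.parseLong(text);
      case FLOAT:
        return Float.parseFloat(text);
      case DOUBLE:
        return Double.parseDouble(text);
      case BOOLEAN:
        return Boolean.parseBoolean(text);
      case TEXT:
        return text;
      default:
        throw new IllegalArgumentException("Unsupported data type: " + type);
    }
  }

  public static void main(String[] args) {
    ParsedRecord r = parse(
        "root.kafka.d0,1716180000000,temperature:status,DOUBLE:BOOLEAN,36.5:true");
    System.out.println(r.device + " " + r.time + " " + r.measurements + " " + r.values);
  }
}
```

With this format, a `TEXT` value that itself contains `,` or `:` cannot be represented; in the sketch such a message fails the shape checks rather than being written with shifted fields.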