Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 157 additions & 84 deletions src/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,98 +21,171 @@

# Kafka

[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. IoTDB can subscribe to Kafka data through a Kafka Consumer and write the data to IoTDB by using the Session API.

## 1. Coding Example
This document introduces a simple data ingestion flow: an application writes messages to a Kafka topic, the Kafka Consumer consumes the messages and parses them into IoTDB time series data, and then writes the data to IoTDB.

### 1.1 kafka Producer Producing Data Java Code Example
## 1. Environment Preparation

Before you start, make sure that the following environment is available:

- JDK 8 or later
- Maven 3.6 or later
- Apache Kafka. For installation and startup, refer to the [Kafka official documentation](https://kafka.apache.org/documentation/)
- An IoTDB service is running

The default addresses used in the following examples are:

| Service | Address |
| --- | --- |
| Kafka | `127.0.0.1:9092` |
| IoTDB | `127.0.0.1:6667` |
| IoTDB username | `root` |
| IoTDB password | `root` |

## 2. Add Dependencies

Add the Kafka and IoTDB Session dependencies to your Maven `pom.xml`. It is recommended that the IoTDB dependency version matches your deployed IoTDB version.

```xml
<dependencies>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
</dependencies>
```

For the complete example project, see [iotdb-extras/examples/kafka](https://github.com/apache/iotdb-extras/tree/master/examples/kafka).

## 3. Kafka Message Format

The sample program uses a string format to transfer one IoTDB data record:

```text
device,timestamp,measurements,types,values
```

The fields are described as follows:

| Field | Description | Example |
| --- | --- | --- |
| `device` | IoTDB device path | `root.kafka.d0` |
| `timestamp` | Timestamp in milliseconds | `1716180000000` |
| `measurements` | Measurement names, separated by `:` when there are multiple values | `temperature:status` |
| `types` | Data types, separated by `:` when there are multiple values | `DOUBLE:BOOLEAN` |
| `values` | Data values, separated by `:` when there are multiple values | `36.5:true` |

Single-measurement example:

```text
root.kafka.d0,1716180000000,temperature,DOUBLE,36.5
```

Multiple-measurement example:

```text
root.kafka.d0,1716180000000,temperature:status,DOUBLE:BOOLEAN,36.5:true
```

## 4. Produce Data to Kafka

The following code shows the key logic for writing one IoTDB data record to the `Kafka-Test` topic:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(
new ProducerRecord<>(
"Kafka-Test", "key", "root.kafka," + System.currentTimeMillis() + ",value,INT32,100"));
producer.close();
String value = "root.kafka.d0,"
+ System.currentTimeMillis()
+ ",temperature:status,DOUBLE:BOOLEAN,36.5:true";

producer.send(new ProducerRecord<>("Kafka-Test", "iotdb", value));
```

### 1.2 kafka Consumer Receiving Data Java Code Example
## 5. Consume Kafka Data and Write to IoTDB

After the Kafka Consumer reads a message from the topic, it parses the device, timestamp, measurements, types, and values, and writes the data to IoTDB through SessionPool.

```java
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
props.put("auto.offset.reset", "earliest");
props.put("group.id", "Kafka-Test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Collections.singleton("Kafka-Test"));
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
```

### 1.3 Example of Java Code Stored in IoTDB Server
String[] fields = record.value().split(",");
String device = fields[0];
long time = Long.parseLong(fields[1]);
List<String> measurements = Arrays.asList(fields[2].split(":"));

String[] typeNames = fields[3].split(":");
String[] valueTexts = fields[4].split(":");

List<TSDataType> types = new ArrayList<>();
List<Object> values = new ArrayList<>();

for (int i = 0; i < typeNames.length; i++) {
TSDataType type = TSDataType.valueOf(typeNames[i]);
types.add(type);

switch (type) {
case INT32:
values.add(Integer.parseInt(valueTexts[i]));
break;
case INT64:
values.add(Long.parseLong(valueTexts[i]));
break;
case FLOAT:
values.add(Float.parseFloat(valueTexts[i]));
break;
case DOUBLE:
values.add(Double.parseDouble(valueTexts[i]));
break;
case BOOLEAN:
values.add(Boolean.parseBoolean(valueTexts[i]));
break;
case TEXT:
values.add(valueTexts[i]);
break;
default:
throw new IllegalArgumentException("Unsupported data type: " + type);
}
}

pool.insertRecord(device, time, measurements, types, values);
```

The IoTDB `SessionPool` can be created as follows:

```java
SessionPool pool =
new SessionPool.Builder()
.host("127.0.0.1")
.port(6667)
.user("root")
.password("root")
.maxSize(3)
.build();
List<String> datas = new ArrayList<>(records.count());
for (ConsumerRecord<String, String> record : records) {
datas.add(record.value());
}
int size = datas.size();
List<String> deviceIds = new ArrayList<>(size);
List<Long> times = new ArrayList<>(size);
List<List<String>> measurementsList = new ArrayList<>(size);
List<List<TSDataType>> typesList = new ArrayList<>(size);
List<List<Object>> valuesList = new ArrayList<>(size);
for (String data : datas) {
String[] dataArray = data.split(",");
String device = dataArray[0];
long time = Long.parseLong(dataArray[1]);
List<String> measurements = Arrays.asList(dataArray[2].split(":"));
List<TSDataType> types = new ArrayList<>();
for (String type : dataArray[3].split(":")) {
types.add(TSDataType.valueOf(type));
}
List<Object> values = new ArrayList<>();
String[] valuesStr = dataArray[4].split(":");
for (int i = 0; i < valuesStr.length; i++) {
switch (types.get(i)) {
case INT64:
values.add(Long.parseLong(valuesStr[i]));
break;
case DOUBLE:
values.add(Double.parseDouble(valuesStr[i]));
break;
case INT32:
values.add(Integer.parseInt(valuesStr[i]));
break;
case TEXT:
values.add(valuesStr[i]);
break;
case FLOAT:
values.add(Float.parseFloat(valuesStr[i]));
break;
case BOOLEAN:
values.add(Boolean.parseBoolean(valuesStr[i]));
break;
}
}
deviceIds.add(device);
times.add(time);
measurementsList.add(measurements);
typesList.add(types);
valuesList.add(values);
}
pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
```
SessionPool pool = new SessionPool.Builder()
.host("127.0.0.1")
.port(6667)
.user("root")
.password("root")
.maxSize(3)
.build();
```

## 6. Query the Result

Connect to the IoTDB CLI:

```bash
./sbin/start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root
```

Run the query:

```sql
SELECT * FROM root.kafka.**;
```

The query result shows the time series data written by the Kafka Consumer, for example:

```text
+-----------------------------+-------------------------+--------------------+
| Time|root.kafka.d0.temperature|root.kafka.d0.status|
+-----------------------------+-------------------------+--------------------+
|2024-05-20T10:00:00.000+08:00| 36.5| true|
+-----------------------------+-------------------------+--------------------+
```

Loading
Loading