Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/docs/kafka-connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
| iceberg.tables | Comma-separated list of destination tables |
| iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` |
| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.topic-to-table-map | Comma-separated list of `topic:table` pairs that statically route records from each topic to the given table identifier |
| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified |
| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
| iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables |
Expand Down Expand Up @@ -364,6 +365,46 @@ See above for creating two tables.
}
```

### Topic mapped to table identifier, static routing

This example routes records to tables using the static topic-to-table mappings in the connector config. Here, the `events_list` topic is mapped to the `db.event_get_log` table and the `events_create` topic is mapped to the `db.event_add_log` table.

#### Create two destination tables

```sql
CREATE TABLE db.event_get_log (
id STRING,
type STRING,
ts TIMESTAMP,
payload STRING)
PARTITIONED BY (hours(ts));

CREATE TABLE db.event_add_log (
id STRING,
type STRING,
ts TIMESTAMP,
payload STRING)
PARTITIONED BY (hours(ts));
```

#### Connector config

```json
{
"name": "events-sink",
"config": {
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "events_list,events_create",
"iceberg.tables.topic-to-table-map": "events_list:db.event_get_log,events_create:db.event_add_log",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse name>"
}
}
```

## SMTs for the Apache Iceberg Sink Connector

This project contains some SMTs that could be useful when transforming Kafka data for use by
Expand Down Expand Up @@ -530,3 +571,4 @@ easy processing of arrays by downstream clients.
Value document will convert the array into a struct of structs in the similar way as done by BSON serialization.
The main struct contains fields named _0, _1, _2 etc. where the name represents the index of the element in the array.
Every element is then passed as the value for the given field.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String WRITE_PROP_PREFIX = "iceberg.tables.write-props.";

// Name of the catalog property.
private static final String CATALOG_NAME_PROP = "iceberg.catalog";
// Comma-separated list of "topic:table" pairs used to route records by their source topic.
private static final String TOPIC_TO_TABLE_MAP_PROP = "iceberg.tables.topic-to-table-map";
// Comma-separated list of destination tables.
private static final String TABLES_PROP = "iceberg.tables";
// Toggles dynamic routing, where the destination table is taken from the route field.
private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled";
// Name of the record field used to route records to tables.
private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field";
Expand Down Expand Up @@ -115,6 +116,12 @@ public static String version() {

private static ConfigDef newConfigDef() {
ConfigDef configDef = new ConfigDef();
configDef.define(
TOPIC_TO_TABLE_MAP_PROP,
ConfigDef.Type.LIST,
null,
Importance.LOW,
"Comma-delimited list of topic to table mappings");
configDef.define(
TABLES_PROP,
ConfigDef.Type.LIST,
Expand Down Expand Up @@ -327,6 +334,18 @@ public String catalogName() {
return getString(CATALOG_NAME_PROP);
}

/**
 * Parses the {@code iceberg.tables.topic-to-table-map} property into a topic-to-table lookup.
 *
 * <p>Each list entry is expected to have the form {@code topic:table}. Whitespace around the
 * entry, the topic, and the table is trimmed. Entries that do not split into exactly two parts
 * on {@code ':'} are skipped with a warning.
 *
 * @return a new mutable map from topic name to table identifier string; empty (never null) when
 *     the property is not configured or contains no valid entries
 */
public Map<String, String> topicToTableMap() {
  Map<String, String> topicToTableMap = Maps.newHashMap();
  // The property is defined with a null default, so the list is null when it is not configured;
  // iterating it directly would throw a NullPointerException.
  List<String> mappings = getList(TOPIC_TO_TABLE_MAP_PROP);
  if (mappings != null) {
    for (String mapping : mappings) {
      List<String> parts = Splitter.on(':').splitToList(mapping.trim());
      if (parts.size() == 2) {
        topicToTableMap.put(parts.get(0).trim(), parts.get(1).trim());
      } else {
        // Surface misconfiguration instead of silently dropping the entry.
        LOG.warn("Ignoring malformed topic-to-table mapping, expected 'topic:table': {}", mapping);
      }
    }
  }
  LOG.debug("Config: topicToTableMap: {}", topicToTableMap);
  return topicToTableMap;
}

/**
 * Reads the {@code iceberg.tables} property.
 *
 * @return the configured list of destination tables, exactly as returned by the config lookup
 */
public List<String> tables() {
  List<String> configuredTables = getList(TABLES_PROP);
  return configuredTables;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,26 @@ private void save(SinkRecord record) {

if (config.dynamicTablesEnabled()) {
routeRecordDynamically(record);
} else if (config.topicToTableMap().size() > 0) {
routeRecordByMap(record);
} else {
routeRecordStatically(record);
}
}

/**
 * Routes a record using the configured topic-to-table map.
 *
 * <p>If the record's topic has a mapped table, the record is written to that table's writer.
 * Otherwise the record is handed to {@link #routeRecordStatically(SinkRecord)}.
 *
 * @param record the sink record to route
 */
private void routeRecordByMap(SinkRecord record) {
  String mappedTable = config.topicToTableMap().get(record.topic());

  if (mappedTable != null) {
    writerForTable(mappedTable, record, false).write(record);
  } else {
    // No mapping for this topic: fall back to static routing.
    routeRecordStatically(record);
  }
}

private void routeRecordStatically(SinkRecord record) {
String routeField = config.tablesRouteField();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class TestSinkWriter {
optional(2, "data", Types.StringType.get()),
optional(3, "date", Types.StringType.get()));
private static final String ROUTE_FIELD = "fld";
private static final String TOPIC_NAME = "topic";

@BeforeEach
public void before() {
Expand Down Expand Up @@ -235,6 +236,21 @@ public void testDynamicNoRoute() {
assertThat(writerResults).hasSize(0);
}

/**
 * Verifies that a record whose topic appears in the topic-to-table map is written to the mapped
 * table.
 */
@Test
public void testTopicToTableMapRoute() {
  IcebergSinkConfig config = mock(IcebergSinkConfig.class);
  // Map the topic used by sinkWriterTest's record to the test table.
  when(config.topicToTableMap())
      .thenReturn(ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER.toString()));
  when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));

  Map<String, Object> value = ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER.toString());

  List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
  // hasSize matches the sibling tests and reports the list contents on failure.
  assertThat(writerResults).hasSize(1);
  IcebergWriterResult writerResult = writerResults.get(0);
  assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
}

private List<IcebergWriterResult> sinkWriterTest(
Map<String, Object> value, IcebergSinkConfig config) {
IcebergWriterResult writeResult =
Expand All @@ -255,7 +271,7 @@ private List<IcebergWriterResult> sinkWriterTest(
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
SinkRecord rec =
new SinkRecord(
"topic",
TOPIC_NAME,
1,
null,
"key",
Expand All @@ -268,7 +284,7 @@ private List<IcebergWriterResult> sinkWriterTest(

SinkWriterResult result = sinkWriter.completeWrite();

Offset offset = result.sourceOffsets().get(new TopicPartition("topic", 1));
Offset offset = result.sourceOffsets().get(new TopicPartition(TOPIC_NAME, 1));
assertThat(offset).isNotNull();
assertThat(offset.offset()).isEqualTo(101L); // should be 1 more than current offset
assertThat(offset.timestamp()).isEqualTo(now.atOffset(ZoneOffset.UTC));
Expand Down
Loading