From b1d0ebedddedc9a321e7bb6c81924ef924a45c1c Mon Sep 17 00:00:00 2001 From: Igor Bolshakov Date: Sun, 2 Jun 2024 10:56:28 +0300 Subject: [PATCH 1/6] Kafka Connect: Add table to topics mapping property --- .../iceberg/connect/IcebergSinkConfig.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9fcb130e4094..50d7c61b540a 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -65,6 +65,8 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String WRITE_PROP_PREFIX = "iceberg.tables.write-props."; private static final String CATALOG_NAME_PROP = "iceberg.catalog"; + private static final String TOPIC_TO_TABLES_MAPPING_PROP = + "iceberg.tables.topic-to-table-mapping"; private static final String TABLES_PROP = "iceberg.tables"; private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled"; private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field"; @@ -115,6 +117,12 @@ public static String version() { private static ConfigDef newConfigDef() { ConfigDef configDef = new ConfigDef(); + configDef.define( + TOPIC_TO_TABLES_MAPPING_PROP, + Type.LIST, + null, + Importance.LOW, + "Comma-delimited list of topic to table mappings"); configDef.define( TABLES_PROP, ConfigDef.Type.LIST, @@ -327,6 +335,18 @@ public String catalogName() { return getString(CATALOG_NAME_PROP); } + public Map topicToTableMap() { + Map topicToTableMap = Maps.newHashMap(); + for (String topicToTable : getList(TOPIC_TO_TABLES_MAPPING_PROP)) { + String[] propsplit = topicToTable.trim().split(":"); + if (propsplit.length == 2) { + topicToTableMap.put(propsplit[0].trim(), propsplit[1].trim()); + } + } + LOG.debug("Config: topicToTableMap: {}", topicToTableMap); + return topicToTableMap; + } + public List tables() { return getList(TABLES_PROP); } From fa0dd89d4eee0beb7553cb34ba47e42ce8738462 Mon Sep 17 00:00:00 2001 From: Igor Bolshakov Date: Thu, 17 Oct 2024 23:43:54 +0300 Subject: [PATCH 2/6] Implement routing and tests --- .../iceberg/connect/data/SinkWriter.java | 15 +++++++++++++++ .../iceberg/connect/data/TestSinkWriter.java | 18 ++++++++++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index 48a01881935b..58d7b867a518 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -84,11 +84,26 @@ private void save(SinkRecord record) { if (config.dynamicTablesEnabled()) { routeRecordDynamically(record); + } else if (config.topicToTableMap().size() > 0) { + routeRecordByMap(record); } else { routeRecordStatically(record); } } + private void routeRecordByMap(SinkRecord record) { + Map topicToTableMap = config.topicToTableMap(); + String topicName = record.topic(); + String tableName = topicToTableMap.get(topicName); + + if (tableName == null) { + routeRecordStatically(record); + return; + } + + writerForTable(tableName, record, false).write(record); + } + private void routeRecordStatically(SinkRecord record) { String routeField = config.tablesRouteField(); diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java index 09f7a373d5f2..d5d0e310ff31 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java @@ -62,6 +62,7 @@ public class TestSinkWriter { optional(2, "data", Types.StringType.get()), optional(3, "date", Types.StringType.get())); private static final String ROUTE_FIELD = "fld"; + private static final String TOPIC_NAME = "topic"; @BeforeEach public void before() { @@ -235,6 +236,19 @@ public void testDynamicNoRoute() { assertThat(writerResults).hasSize(0); } + @Test + public void testTopicToTableMapRoute() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.topicToTableMap()).thenReturn(ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER)); + + Map value = ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER); + + List writerResults = sinkWriterTest(value, config); + assertThat(writerResults.size()).isEqualTo(1); + IcebergWriterResult writerResult = writerResults.get(0); + assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER); + } + private List sinkWriterTest( Map value, IcebergSinkConfig config) { IcebergWriterResult writeResult = @@ -255,7 +269,7 @@ private List sinkWriterTest( Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); SinkRecord rec = new SinkRecord( - "topic", + TOPIC_NAME, 1, null, "key", @@ -268,7 +282,7 @@ private List sinkWriterTest( SinkWriterResult result = sinkWriter.completeWrite(); - Offset offset = result.sourceOffsets().get(new TopicPartition("topic", 1)); + Offset offset = result.sourceOffsets().get(new TopicPartition(TOPIC_NAME, 1)); assertThat(offset).isNotNull(); assertThat(offset.offset()).isEqualTo(101L); // should be 1 more than current offset assertThat(offset.timestamp()).isEqualTo(now.atOffset(ZoneOffset.UTC)); From 7414e9735bdcb694fa451f5eb3d607d64fcd2196 Mon Sep 17 00:00:00 2001 From: Igor Bolshakov Date: Fri, 18 Oct 2024 00:25:54 +0300 Subject: [PATCH 3/6] Add readme --- docs/docs/kafka-connect.md | 42 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/docs/docs/kafka-connect.md b/docs/docs/kafka-connect.md index 9c7d3d83f2e4..8d1da650e317 100644 --- a/docs/docs/kafka-connect.md +++ b/docs/docs/kafka-connect.md @@ -63,6 +63,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later. | iceberg.tables | Comma-separated list of destination tables | | iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` | | iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables | +| iceberg.tables.topic-to-table-mapping | For topic to table mapping, statically map topic name to table identifier to route records | | iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified | | iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) | | iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables | @@ -364,6 +365,46 @@ See above for creating two tables. } ``` +### Topic mapped to table identifier, static routing + +This example writes to tables based on their mappings from connector config. For example, `events_list` is mapped to `db.event_get_log` table and `events_create` is mapped to `db.event_add_log`. + +#### Create two destination tables + +```sql +CREATE TABLE db.event_get_log ( + id STRING, + type STRING, + ts TIMESTAMP, + payload STRING) +PARTITIONED BY (hours(ts)); + +CREATE TABLE db.event_add_log ( + id STRING, + type STRING, + ts TIMESTAMP, + payload STRING) +PARTITIONED BY (hours(ts)); +``` + +#### Connector config + +```json +{ +"name": "events-sink", +"config": { + "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector", + "tasks.max": "2", + "topics": "events_list,events_create", + "iceberg.tables.topic-to-table-mapping": "event_list:db.event_get_log,events_create:db.event_add_log", + "iceberg.catalog.type": "rest", + "iceberg.catalog.uri": "https://localhost", + "iceberg.catalog.credential": "", + "iceberg.catalog.warehouse": "" + } +} +``` + ## SMTs for the Apache Iceberg Sink Connector This project contains some SMTs that could be useful when transforming Kafka data for use by @@ -530,3 +571,4 @@ easy processing of arrays by downstream clients. Value document will convert the array into a struct of structs in the similar way as done by BSON serialization. The main struct contains fields named _0, _1, _2 etc. where the name represents the index of the element in the array. Every element is then passed as the value for the given field. +======= From b0beb430086a4fbd725d99069eadc2f969299cd4 Mon Sep 17 00:00:00 2001 From: Igor Bolshakov Date: Sun, 20 Oct 2024 14:07:30 +0300 Subject: [PATCH 4/6] fix: build and tests fixes --- .../org/apache/iceberg/connect/IcebergSinkConfig.java | 8 ++++---- .../org/apache/iceberg/connect/data/TestSinkWriter.java | 6 ++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 50d7c61b540a..0c1d43dd50ae 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -119,7 +119,7 @@ private static ConfigDef newConfigDef() { ConfigDef configDef = new ConfigDef(); configDef.define( TOPIC_TO_TABLES_MAPPING_PROP, - Type.LIST, + ConfigDef.Type.LIST, null, Importance.LOW, "Comma-delimited list of topic to table mappings"); @@ -338,9 +338,9 @@ public String catalogName() { public Map topicToTableMap() { Map topicToTableMap = Maps.newHashMap(); for (String topicToTable : getList(TOPIC_TO_TABLES_MAPPING_PROP)) { - String[] propsplit = topicToTable.trim().split(":"); - if (propsplit.length == 2) { - topicToTableMap.put(propsplit[0].trim(), propsplit[1].trim()); + List propsplit = Splitter.on(':').splitToList(topicToTable.trim()); + if (propsplit.size() == 2) { + topicToTableMap.put(propsplit.get(0).trim(), propsplit.get(1).trim()); } } LOG.debug("Config: topicToTableMap: {}", topicToTableMap); diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java index d5d0e310ff31..28e1f99f0222 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java @@ -239,9 +239,11 @@ public void testDynamicNoRoute() { @Test public void testTopicToTableMapRoute() { IcebergSinkConfig config = mock(IcebergSinkConfig.class); - when(config.topicToTableMap()).thenReturn(ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER)); + when(config.topicToTableMap()) + .thenReturn(ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); - Map value = ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER); + Map value = ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER.toString()); List writerResults = sinkWriterTest(value, config); assertThat(writerResults.size()).isEqualTo(1); From 25613bab02636e64dca991776c328a44aca327a0 Mon Sep 17 00:00:00 2001 From: Igor Bolshakov Date: Sat, 26 Oct 2024 09:18:51 +0300 Subject: [PATCH 5/6] fix naming --- docs/docs/kafka-connect.md | 4 ++-- .../org/apache/iceberg/connect/IcebergSinkConfig.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/docs/kafka-connect.md b/docs/docs/kafka-connect.md index 8d1da650e317..7bb32b9993fe 100644 --- a/docs/docs/kafka-connect.md +++ b/docs/docs/kafka-connect.md @@ -63,7 +63,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later. | iceberg.tables | Comma-separated list of destination tables | | iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` | | iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables | -| iceberg.tables.topic-to-table-mapping | For topic to table mapping, statically map topic name to table identifier to route records | +| iceberg.tables.topic-to-table-map | For topic to table mapping, statically map topic name to table identifier to route records | | iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified | | iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) | | iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables | @@ -396,7 +396,7 @@ PARTITIONED BY (hours(ts)); "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector", "tasks.max": "2", "topics": "events_list,events_create", - "iceberg.tables.topic-to-table-mapping": "event_list:db.event_get_log,events_create:db.event_add_log", + "iceberg.tables.topic-to-table-map": "event_list:db.event_get_log,events_create:db.event_add_log", "iceberg.catalog.type": "rest", "iceberg.catalog.uri": "https://localhost", "iceberg.catalog.credential": "", diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 0c1d43dd50ae..fb943ed4a1be 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -65,8 +65,8 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String WRITE_PROP_PREFIX = "iceberg.tables.write-props."; private static final String CATALOG_NAME_PROP = "iceberg.catalog"; - private static final String TOPIC_TO_TABLES_MAPPING_PROP = - "iceberg.tables.topic-to-table-mapping"; + private static final String TOPIC_TO_TABLE_MAP_PROP = + "iceberg.tables.topic-to-table-map"; private static final String TABLES_PROP = "iceberg.tables"; private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled"; private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field"; @@ -118,7 +118,7 @@ public static String version() { private static ConfigDef newConfigDef() { ConfigDef configDef = new ConfigDef(); configDef.define( - TOPIC_TO_TABLES_MAPPING_PROP, + TOPIC_TO_TABLE_MAP_PROP, ConfigDef.Type.LIST, null, Importance.LOW, @@ -337,7 +337,7 @@ public String catalogName() { public Map topicToTableMap() { Map topicToTableMap = Maps.newHashMap(); - for (String topicToTable : getList(TOPIC_TO_TABLES_MAPPING_PROP)) { + for (String topicToTable : getList(TOPIC_TO_TABLE_MAP_PROP)) { List propsplit = Splitter.on(':').splitToList(topicToTable.trim()); if (propsplit.size() == 2) { topicToTableMap.put(propsplit.get(0).trim(), propsplit.get(1).trim()); From 77284496d0ace8f7d39c0318473743aac90ec96d Mon Sep 17 00:00:00 2001 From: Igor Bolshakov Date: Sun, 24 May 2026 13:07:10 +0300 Subject: [PATCH 6/6] fix linter --- .../java/org/apache/iceberg/connect/IcebergSinkConfig.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index fb943ed4a1be..b039cb95d825 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -65,8 +65,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String WRITE_PROP_PREFIX = "iceberg.tables.write-props."; private static final String CATALOG_NAME_PROP = "iceberg.catalog"; - private static final String TOPIC_TO_TABLE_MAP_PROP = - "iceberg.tables.topic-to-table-map"; + private static final String TOPIC_TO_TABLE_MAP_PROP = "iceberg.tables.topic-to-table-map"; private static final String TABLES_PROP = "iceberg.tables"; private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled"; private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field";