diff --git a/docs/docs/kafka-connect.md b/docs/docs/kafka-connect.md index 9c7d3d83f2e4..7bb32b9993fe 100644 --- a/docs/docs/kafka-connect.md +++ b/docs/docs/kafka-connect.md @@ -63,6 +63,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later. | iceberg.tables | Comma-separated list of destination tables | | iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` | | iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables | +| iceberg.tables.topic-to-table-map | For topic to table mapping, statically map topic name to table identifier to route records | | iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified | | iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) | | iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables | @@ -364,6 +365,46 @@ See above for creating two tables. } ``` +### Topic mapped to table identifier, static routing + +This example writes to tables based on their mappings from connector config. For example, `events_list` is mapped to `db.event_get_log` table and `events_create` is mapped to `db.event_add_log`. + +#### Create two destination tables + +```sql +CREATE TABLE db.event_get_log ( + id STRING, + type STRING, + ts TIMESTAMP, + payload STRING) +PARTITIONED BY (hours(ts)); + +CREATE TABLE db.event_add_log ( + id STRING, + type STRING, + ts TIMESTAMP, + payload STRING) +PARTITIONED BY (hours(ts)); +``` + +#### Connector config + +```json +{ +"name": "events-sink", +"config": { + "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector", + "tasks.max": "2", + "topics": "events_list,events_create", + "iceberg.tables.topic-to-table-map": "event_list:db.event_get_log,events_create:db.event_add_log", + "iceberg.catalog.type": "rest", + "iceberg.catalog.uri": "https://localhost", + "iceberg.catalog.credential": "", + "iceberg.catalog.warehouse": "" + } +} +``` + ## SMTs for the Apache Iceberg Sink Connector This project contains some SMTs that could be useful when transforming Kafka data for use by @@ -530,3 +571,4 @@ easy processing of arrays by downstream clients. Value document will convert the array into a struct of structs in the similar way as done by BSON serialization. The main struct contains fields named _0, _1, _2 etc. where the name represents the index of the element in the array. Every element is then passed as the value for the given field. +======= diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9fcb130e4094..b039cb95d825 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -65,6 +65,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String WRITE_PROP_PREFIX = "iceberg.tables.write-props."; private static final String CATALOG_NAME_PROP = "iceberg.catalog"; + private static final String TOPIC_TO_TABLE_MAP_PROP = "iceberg.tables.topic-to-table-map"; private static final String TABLES_PROP = "iceberg.tables"; private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled"; private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field"; @@ -115,6 +116,12 @@ public static String version() { private static ConfigDef newConfigDef() { ConfigDef configDef = new ConfigDef(); + configDef.define( + TOPIC_TO_TABLE_MAP_PROP, + ConfigDef.Type.LIST, + null, + Importance.LOW, + "Comma-delimited list of topic to table mappings"); configDef.define( TABLES_PROP, ConfigDef.Type.LIST, @@ -327,6 +334,18 @@ public String catalogName() { return getString(CATALOG_NAME_PROP); } + public Map topicToTableMap() { + Map topicToTableMap = Maps.newHashMap(); + for (String topicToTable : getList(TOPIC_TO_TABLE_MAP_PROP)) { + List propsplit = Splitter.on(':').splitToList(topicToTable.trim()); + if (propsplit.size() == 2) { + topicToTableMap.put(propsplit.get(0).trim(), propsplit.get(1).trim()); + } + } + LOG.debug("Config: topicToTableMap: {}", topicToTableMap); + return topicToTableMap; + } + public List tables() { return getList(TABLES_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index 48a01881935b..58d7b867a518 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -84,11 +84,26 @@ private void save(SinkRecord record) { if (config.dynamicTablesEnabled()) { routeRecordDynamically(record); + } else if (config.topicToTableMap().size() > 0) { + routeRecordByMap(record); } else { routeRecordStatically(record); } } + private void routeRecordByMap(SinkRecord record) { + Map topicToTableMap = config.topicToTableMap(); + String topicName = record.topic(); + String tableName = topicToTableMap.get(topicName); + + if (tableName == null) { + routeRecordStatically(record); + return; + } + + writerForTable(tableName, record, false).write(record); + } + private void routeRecordStatically(SinkRecord record) { String routeField = config.tablesRouteField(); diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java index 09f7a373d5f2..28e1f99f0222 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java @@ -62,6 +62,7 @@ public class TestSinkWriter { optional(2, "data", Types.StringType.get()), optional(3, "date", Types.StringType.get())); private static final String ROUTE_FIELD = "fld"; + private static final String TOPIC_NAME = "topic"; @BeforeEach public void before() { @@ -235,6 +236,21 @@ public void testDynamicNoRoute() { assertThat(writerResults).hasSize(0); } + @Test + public void testTopicToTableMapRoute() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.topicToTableMap()) + .thenReturn(ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + + Map value = ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER.toString()); + + List writerResults = sinkWriterTest(value, config); + assertThat(writerResults.size()).isEqualTo(1); + IcebergWriterResult writerResult = writerResults.get(0); + assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER); + } + private List sinkWriterTest( Map value, IcebergSinkConfig config) { IcebergWriterResult writeResult = @@ -255,7 +271,7 @@ private List sinkWriterTest( Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); SinkRecord rec = new SinkRecord( - "topic", + TOPIC_NAME, 1, null, "key", @@ -268,7 +284,7 @@ private List sinkWriterTest( SinkWriterResult result = sinkWriter.completeWrite(); - Offset offset = result.sourceOffsets().get(new TopicPartition("topic", 1)); + Offset offset = result.sourceOffsets().get(new TopicPartition(TOPIC_NAME, 1)); assertThat(offset).isNotNull(); assertThat(offset.offset()).isEqualTo(101L); // should be 1 more than current offset assertThat(offset.timestamp()).isEqualTo(now.atOffset(ZoneOffset.UTC));