From fb250679be6987bd3324065423e9bddbbf5f964b Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Thu, 27 Jul 2023 15:31:51 -0500 Subject: [PATCH 1/5] [fix][io] Improve loading sensitive fields for many connectors --- pulsar-io/aerospike/pom.xml | 6 ++ .../io/aerospike/AerospikeAbstractSink.java | 2 +- .../io/aerospike/AerospikeSinkConfig.java | 33 +++++++++ pulsar-io/canal/pom.xml | 5 ++ .../pulsar/io/canal/CanalAbstractSource.java | 2 +- .../pulsar/io/canal/CanalSourceConfig.java | 20 ++++++ pulsar-io/influxdb/pom.xml | 5 ++ .../influxdb/InfluxDBGenericRecordSink.java | 2 +- .../pulsar/io/influxdb/v2/InfluxDBSink.java | 2 +- .../io/influxdb/v2/InfluxDBSinkConfig.java | 20 ++++++ .../influxdb/v2/InfluxDBSinkConfigTest.java | 36 ++++++++-- pulsar-io/jdbc/core/pom.xml | 6 ++ .../pulsar/io/jdbc/JdbcAbstractSink.java | 2 +- .../apache/pulsar/io/jdbc/JdbcSinkConfig.java | 20 ++++++ pulsar-io/rabbitmq/pom.xml | 5 ++ .../pulsar/io/rabbitmq/RabbitMQSink.java | 2 +- .../io/rabbitmq/RabbitMQSinkConfig.java | 20 ++++++ .../pulsar/io/rabbitmq/RabbitMQSource.java | 2 +- .../io/rabbitmq/RabbitMQSourceConfig.java | 20 ++++++ .../rabbitmq/sink/RabbitMQSinkConfigTest.java | 63 +++++++++++++++++ .../source/RabbitMQSourceConfigTest.java | 69 +++++++++++++++++++ pulsar-io/redis/pom.xml | 5 ++ .../pulsar/io/redis/sink/RedisSink.java | 2 +- .../pulsar/io/redis/sink/RedisSinkConfig.java | 20 ++++++ .../io/redis/sink/RedisSinkConfigTest.java | 45 ++++++++++++ pulsar-io/solr/pom.xml | 5 ++ .../pulsar/io/solr/SolrAbstractSink.java | 2 +- .../apache/pulsar/io/solr/SolrSinkConfig.java | 20 ++++++ .../pulsar/io/solr/SolrSinkConfigTest.java | 40 +++++++++++ 29 files changed, 468 insertions(+), 13 deletions(-) diff --git a/pulsar-io/aerospike/pom.xml b/pulsar-io/aerospike/pom.xml index 7f604add58819..ed46f8fc17062 100644 --- a/pulsar-io/aerospike/pom.xml +++ b/pulsar-io/aerospike/pom.xml @@ -38,6 +38,12 @@ ${project.version} + + ${project.groupId} + pulsar-io-common + ${project.version} + + com.fasterxml.jackson.core jackson-databind diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java index a0cc2809239ed..ffb0177b7a269 100644 --- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java +++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java @@ -58,7 +58,7 @@ public abstract class AerospikeAbstractSink implements Sink { @Override public void open(Map config, SinkContext sinkContext) throws Exception { - aerospikeSinkConfig = AerospikeSinkConfig.load(config); + aerospikeSinkConfig = AerospikeSinkConfig.load(config, sinkContext); if (aerospikeSinkConfig.getSeedHosts() == null || aerospikeSinkConfig.getKeyspace() == null || aerospikeSinkConfig.getColumnName() == null) { diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java index af9374919dfa5..b1a62aded0220 100644 --- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java +++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.aerospike; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import java.io.File; @@ -26,6 +27,9 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.core.annotations.FieldDoc; @Data @Accessors(chain = true) @@ -38,7 +42,19 @@ public class AerospikeSinkConfig implements Serializable { private String columnName; // Optional + @FieldDoc( + required = false, + defaultValue = "", + sensitive = true, + help = "The username for authentication." + ) private String userName; + @FieldDoc( + required = false, + defaultValue = "", + sensitive = true, + help = "The password for authentication." + ) private String password; private String keySet; private int maxConcurrentRequests = 100; @@ -46,13 +62,30 @@ public class AerospikeSinkConfig implements Serializable { private int retries = 1; + /** + * @deprecated Use {@link #load(String, SinkContext)} instead. + */ + @Deprecated public static AerospikeSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), AerospikeSinkConfig.class); } + public static AerospikeSinkConfig load(String yamlFile, SinkContext context) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context); + } + + /** + * @deprecated Use {@link #load(Map, SinkContext)} instead. + */ + @Deprecated public static AerospikeSinkConfig load(Map map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(mapper.writeValueAsString(map), AerospikeSinkConfig.class); } + + public static AerospikeSinkConfig load(Map map, SinkContext context) { + return IOConfigUtils.loadWithSecrets(map, AerospikeSinkConfig.class, context); + } } \ No newline at end of file diff --git a/pulsar-io/canal/pom.xml b/pulsar-io/canal/pom.xml index ecab67eba26b6..dceed23900c0f 100644 --- a/pulsar-io/canal/pom.xml +++ b/pulsar-io/canal/pom.xml @@ -42,6 +42,11 @@ pulsar-io-core ${project.version} + + ${project.groupId} + pulsar-io-common + ${project.version} + com.fasterxml.jackson.core jackson-databind diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java index 06c8788d5aea1..7d0cd0305a49e 100644 --- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java +++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java @@ -57,7 +57,7 @@ public abstract class CanalAbstractSource extends PushSource { @Override public void open(Map config, SourceContext sourceContext) throws Exception { - canalSourceConfig = CanalSourceConfig.load(config); + canalSourceConfig = CanalSourceConfig.load(config, sourceContext); if (canalSourceConfig.getCluster()) { connector = CanalConnectors.newClusterConnector(canalSourceConfig.getZkServers(), canalSourceConfig.getDestination(), canalSourceConfig.getUsername(), diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java index a0408e60e5f76..49db4b76b6147 100644 --- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java +++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.canal; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import java.io.File; @@ -26,6 +27,8 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @@ -80,14 +83,31 @@ public class CanalSourceConfig implements Serializable{ help = "The batch size to fetch from canal.") private int batchSize = 1000; + + /** + * @deprecated Use {@link #load(String, SourceContext)} instead. + */ + @Deprecated public static CanalSourceConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), CanalSourceConfig.class); } + public static CanalSourceConfig load(String yamlFile, SourceContext context) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context); + } + /** + * @deprecated Use {@link #load(Map, SourceContext)} instead. + */ + @Deprecated public static CanalSourceConfig load(Map map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(mapper.writeValueAsString(map), CanalSourceConfig.class); } + + public static CanalSourceConfig load(Map map, SourceContext context) { + return IOConfigUtils.loadWithSecrets(map, CanalSourceConfig.class, context); + } } diff --git a/pulsar-io/influxdb/pom.xml b/pulsar-io/influxdb/pom.xml index cad7956ccfcce..44a16ae3f6501 100644 --- a/pulsar-io/influxdb/pom.xml +++ b/pulsar-io/influxdb/pom.xml @@ -37,6 +37,11 @@ pulsar-io-core ${project.version} + + ${project.groupId} + pulsar-io-common + ${project.version} + ${project.groupId} pulsar-functions-instance diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java index 5b51461fc7b8e..3dd788a25265b 100644 --- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java +++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java @@ -46,7 +46,7 @@ public class InfluxDBGenericRecordSink implements Sink { @Override public void open(Map map, SinkContext sinkContext) throws Exception { try { - val configV2 = InfluxDBSinkConfig.load(map); + val configV2 = InfluxDBSinkConfig.load(map, sinkContext); configV2.validate(); sink = new InfluxDBSink(); } catch (Exception e) { diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java index 08f1ab2339992..0aa43570596af 100644 --- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java +++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java @@ -49,7 +49,7 @@ public class InfluxDBSink extends BatchSink { @Override public void open(Map config, SinkContext sinkContext) throws Exception { - InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config); + InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config, sinkContext); influxDBSinkConfig.validate(); super.init(influxDBSinkConfig.getBatchTimeMs(), influxDBSinkConfig.getBatchSize()); diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java index 899b00c002155..f870249021fe7 100644 --- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java +++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.influxdb.v2; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.google.common.base.Preconditions; @@ -27,6 +28,8 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; /** @@ -98,16 +101,33 @@ public class InfluxDBSinkConfig implements Serializable { ) private int batchSize = 200; + /** + * @deprecated Use {@link #load(String, SinkContext)} instead. + */ + @Deprecated public static InfluxDBSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class); } + public static InfluxDBSinkConfig load(String yamlFile, SinkContext context) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context); + } + + /** + * @deprecated Use {@link #load(Map, SinkContext)} instead. + */ + @Deprecated public static InfluxDBSinkConfig load(Map map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(mapper.writeValueAsString(map), InfluxDBSinkConfig.class); } + public static InfluxDBSinkConfig load(Map map, SinkContext context) { + return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, context); + } + public void validate() { Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not set."); Preconditions.checkNotNull(token, "token property not set."); diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java index df1f7fd29a637..1c43c0a9a4650 100644 --- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java +++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java @@ -18,12 +18,15 @@ */ package org.apache.pulsar.io.influxdb.v2; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import java.io.File; import java.util.HashMap; import java.util.Map; +import org.apache.pulsar.io.core.SinkContext; import org.testng.annotations.Test; public class InfluxDBSinkConfigTest { @@ -36,7 +39,20 @@ public final void testLoadFromYaml() throws Exception { assertNotNull(config); config.validate(); - verifyValues(config); + verifyValues(config, "xxxx"); + } + + @Test + public final void testLoadFromYamlAndContext() throws Exception { + File yamlFile = getFile("sinkConfig-v2.yaml"); + String path = yamlFile.getAbsolutePath(); + SinkContext context = mock(SinkContext.class); + when(context.getSecret("token")).thenReturn("a-super-secret-token"); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(path, context); + + assertNotNull(config); + config.validate(); + verifyValues(config, "a-super-secret-token"); } private Map buildValidConfigMap() { @@ -61,9 +77,21 @@ public final void testLoadFromMap() throws Exception { InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map); assertNotNull(config); config.validate(); - verifyValues(config); + verifyValues(config, "xxxx"); } + @Test + public final void testLoadFromMapAndContext() throws Exception { + Map map = buildValidConfigMap(); + SinkContext context = mock(SinkContext.class); + when(context.getSecret("token")).thenReturn("a-super-secret-token"); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, context); + assertNotNull(config); + config.validate(); + verifyValues(config, "a-super-secret-token"); + } + + @Test(expectedExceptions = NullPointerException.class, expectedExceptionsMessageRegExp = "influxdbUrl property not set.") public void testRequiredConfigMissing() throws Exception { @@ -82,9 +110,9 @@ public void testBatchConfig() throws Exception { config.validate(); } - private void verifyValues(InfluxDBSinkConfig config) { + private void verifyValues(InfluxDBSinkConfig config, String expectedToken) { assertEquals("http://localhost:9999", config.getInfluxdbUrl()); - assertEquals("xxxx", config.getToken()); + assertEquals(expectedToken, config.getToken()); assertEquals("example-org", config.getOrganization()); assertEquals("example-bucket", config.getBucket()); assertEquals("ns", config.getPrecision()); diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml index 5c2e97c7fdc29..72f88c8b9f5b3 100644 --- a/pulsar-io/jdbc/core/pom.xml +++ b/pulsar-io/jdbc/core/pom.xml @@ -38,6 +38,12 @@ ${project.version} + + ${project.groupId} + pulsar-io-common + ${project.version} + + org.apache.avro avro diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index 4586fcebcf167..ca33b3cfdaba9 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -76,7 +76,7 @@ public abstract class JdbcAbstractSink implements Sink { @Override public void open(Map config, SinkContext sinkContext) throws Exception { - jdbcSinkConfig = JdbcSinkConfig.load(config); + jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext); jdbcSinkConfig.validate(); jdbcUrl = jdbcSinkConfig.getJdbcUrl(); diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java index f798d94f7c35e..d4d636b2341ee 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.jdbc; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import java.io.File; @@ -26,6 +27,8 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @Data @@ -140,16 +143,33 @@ public enum NullValueAction { } + /** + * @deprecated Use {@link #load(String, SinkContext)} instead. + */ + @Deprecated public static JdbcSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class); } + public static JdbcSinkConfig load(String yamlFile, SinkContext context) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context); + } + + /** + * @deprecated Use {@link #load(Map, SinkContext)} instead. + */ + @Deprecated public static JdbcSinkConfig load(Map map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(mapper.writeValueAsString(map), JdbcSinkConfig.class); } + public static JdbcSinkConfig load(Map map, SinkContext context) { + return IOConfigUtils.loadWithSecrets(map, JdbcSinkConfig.class, context); + } + public void validate() { if (timeoutMs <= 0 && batchSize <= 0) { throw new IllegalArgumentException("timeoutMs or batchSize must be set to a positive value."); diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml index 3cdcb681b3fa8..ac0e7e1b596e0 100644 --- a/pulsar-io/rabbitmq/pom.xml +++ b/pulsar-io/rabbitmq/pom.xml @@ -37,6 +37,11 @@ pulsar-io-core ${project.version} + + ${project.groupId} + pulsar-io-common + ${project.version} + ${project.groupId} pulsar-functions-instance diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java index f317a35734e69..89192c42346e8 100644 --- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java +++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java @@ -53,7 +53,7 @@ public class RabbitMQSink implements Sink { @Override public void open(Map config, SinkContext sinkContext) throws Exception { - rabbitMQSinkConfig = RabbitMQSinkConfig.load(config); + rabbitMQSinkConfig = RabbitMQSinkConfig.load(config, sinkContext); rabbitMQSinkConfig.validate(); ConnectionFactory connectionFactory = rabbitMQSinkConfig.createConnectionFactory(); diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java index c1f8d6b8ad3d3..3f343e41854e3 100644 --- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java +++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.rabbitmq; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.google.common.base.Preconditions; @@ -28,6 +29,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @Data @@ -55,16 +58,33 @@ public class RabbitMQSinkConfig extends RabbitMQAbstractConfig implements Serial help = "The exchange type to publish the messages on") private String exchangeType = "topic"; + /** + * @deprecated Use {@link #load(String, SinkContext)} instead. + */ + @Deprecated public static RabbitMQSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), RabbitMQSinkConfig.class); } + public static RabbitMQSinkConfig load(String yamlFile, SinkContext context) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context); + } + + /** + * @deprecated Use {@link #load(Map, SinkContext)} instead. + */ + @Deprecated public static RabbitMQSinkConfig load(Map map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(mapper.writeValueAsString(map), RabbitMQSinkConfig.class); } + public static RabbitMQSinkConfig load(Map map, SinkContext context) { + return IOConfigUtils.loadWithSecrets(map, RabbitMQSinkConfig.class, context); + } + @Override public void validate() { super.validate(); diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java index d15108c4d8288..b0b7ef31b08de 100644 --- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java +++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java @@ -54,7 +54,7 @@ public class RabbitMQSource extends PushSource { @Override public void open(Map config, SourceContext sourceContext) throws Exception { - rabbitMQSourceConfig = RabbitMQSourceConfig.load(config); + rabbitMQSourceConfig = RabbitMQSourceConfig.load(config, sourceContext); rabbitMQSourceConfig.validate(); ConnectionFactory connectionFactory = rabbitMQSourceConfig.createConnectionFactory(); diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java index f24018e70da13..f71d487ab34a1 100644 --- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java +++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.rabbitmq; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.google.common.base.Preconditions; @@ -28,6 +29,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @Data @@ -61,16 +64,33 @@ public class RabbitMQSourceConfig extends RabbitMQAbstractConfig implements Seri help = "Set true if the queue should be declared passively - ie to preserve durability/timeout settings") private boolean passive = false; + /** + * @deprecated Use {@link #load(String, SourceContext)} instead. + */ + @Deprecated public static RabbitMQSourceConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), RabbitMQSourceConfig.class); } + public static RabbitMQSourceConfig load(String yamlFile, SourceContext context) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context); + } + + /** + * @deprecated Use {@link #load(Map, SourceContext)} instead. + */ + @Deprecated public static RabbitMQSourceConfig load(Map map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(mapper.writeValueAsString(map), RabbitMQSourceConfig.class); } + public static RabbitMQSourceConfig load(Map map, SourceContext context) { + return IOConfigUtils.loadWithSecrets(map, RabbitMQSourceConfig.class, context); + } + @Override public void validate() { super.validate(); diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java index 3d4fd6f46e16f..99eaefa83d375 100644 --- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java +++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.rabbitmq.sink; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig; import org.testng.annotations.Test; @@ -26,6 +27,8 @@ import java.util.HashMap; import java.util.Map; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -54,6 +57,30 @@ public final void loadFromYamlFileTest() throws IOException { assertEquals(config.getExchangeType(), "test-exchange-type"); } + @Test + public final void loadFromYamlFileAndContextTest() throws IOException { + File yamlFile = getFile("sinkConfig.yaml"); + String path = yamlFile.getAbsolutePath(); + SinkContext context = mock(SinkContext.class); + when(context.getSecret("username")).thenReturn("my-secret-name"); + when(context.getSecret("password")).thenReturn("my-secret-pass"); + RabbitMQSinkConfig config = RabbitMQSinkConfig.load(path, context); + assertNotNull(config); + assertEquals(config.getHost(), "localhost"); + assertEquals(config.getPort(), Integer.parseInt("5673")); + assertEquals(config.getVirtualHost(), "/"); + assertEquals(config.getUsername(), "my-secret-name"); + assertEquals(config.getPassword(), "my-secret-pass"); + assertEquals(config.getConnectionName(), "test-connection"); + assertEquals(config.getRequestedChannelMax(), Integer.parseInt("0")); + assertEquals(config.getRequestedFrameMax(), Integer.parseInt("0")); + assertEquals(config.getConnectionTimeout(), Integer.parseInt("60000")); + assertEquals(config.getHandshakeTimeout(), Integer.parseInt("10000")); + assertEquals(config.getRequestedHeartbeat(), Integer.parseInt("60")); + assertEquals(config.getExchangeName(), "test-exchange"); + assertEquals(config.getExchangeType(), "test-exchange-type"); + } + @Test public final void loadFromMapTest() throws IOException { Map map = new HashMap<>(); @@ -88,6 +115,42 @@ public final void loadFromMapTest() throws IOException { assertEquals(config.getExchangeType(), "test-exchange-type"); } + @Test + public final void loadFromMapAndContextTest() throws IOException { + Map map = new HashMap<>(); + map.put("host", "localhost"); + map.put("port", "5673"); + map.put("virtualHost", "/"); + map.put("connectionName", "test-connection"); + map.put("requestedChannelMax", "0"); + map.put("requestedFrameMax", "0"); + map.put("connectionTimeout", "60000"); + map.put("handshakeTimeout", "10000"); + map.put("requestedHeartbeat", "60"); + map.put("exchangeName", "test-exchange"); + map.put("exchangeType", "test-exchange-type"); + + SinkContext context = mock(SinkContext.class); + when(context.getSecret("username")).thenReturn("my-secret-name"); + when(context.getSecret("password")).thenReturn("my-secret-pass"); + RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, context); + + assertNotNull(config); + assertEquals(config.getHost(), "localhost"); + assertEquals(config.getPort(), Integer.parseInt("5673")); + assertEquals(config.getVirtualHost(), "/"); + assertEquals(config.getUsername(), "my-secret-name"); + assertEquals(config.getPassword(), "my-secret-pass"); + assertEquals(config.getConnectionName(), "test-connection"); + assertEquals(config.getRequestedChannelMax(), Integer.parseInt("0")); + assertEquals(config.getRequestedFrameMax(), Integer.parseInt("0")); + assertEquals(config.getConnectionTimeout(), Integer.parseInt("60000")); + assertEquals(config.getHandshakeTimeout(), Integer.parseInt("10000")); + assertEquals(config.getRequestedHeartbeat(), Integer.parseInt("60")); + assertEquals(config.getExchangeName(), "test-exchange"); + assertEquals(config.getExchangeType(), "test-exchange-type"); + } + @Test public final void validValidateTest() throws IOException { Map map = new HashMap<>(); diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java index c33e0070c6fd0..624b8024b5514 100644 --- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java +++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.rabbitmq.source; +import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig; import org.testng.annotations.Test; @@ -26,6 +27,8 @@ import java.util.HashMap; import java.util.Map; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -57,6 +60,32 @@ public final void loadFromYamlFileTest() throws IOException { assertFalse(config.isPassive()); } + @Test + public final void loadFromYamlFileAndContextTest() throws IOException { + File yamlFile = getFile("sourceConfig.yaml"); + String path = yamlFile.getAbsolutePath(); + SourceContext context = mock(SourceContext.class); + when(context.getSecret("username")).thenReturn("my-secret-name"); + when(context.getSecret("password")).thenReturn("my-secret-pass"); + RabbitMQSourceConfig config = RabbitMQSourceConfig.load(path, context); + assertNotNull(config); + assertEquals(config.getHost(), "localhost"); + assertEquals(config.getPort(), Integer.parseInt("5672")); + assertEquals(config.getVirtualHost(), "/"); + assertEquals(config.getUsername(), "my-secret-name"); + assertEquals(config.getPassword(), "my-secret-pass"); + assertEquals(config.getQueueName(), "test-queue"); + assertEquals(config.getConnectionName(), "test-connection"); + assertEquals(config.getRequestedChannelMax(), Integer.parseInt("0")); + assertEquals(config.getRequestedFrameMax(), Integer.parseInt("0")); + assertEquals(config.getConnectionTimeout(), Integer.parseInt("60000")); + assertEquals(config.getHandshakeTimeout(), Integer.parseInt("10000")); + assertEquals(config.getRequestedHeartbeat(), Integer.parseInt("60")); + assertEquals(config.getPrefetchCount(), Integer.parseInt("0")); + assertFalse(config.isPrefetchGlobal()); + assertFalse(config.isPassive()); + } + @Test public final void loadFromMapTest() throws IOException { Map map = new HashMap<>(); @@ -96,6 +125,46 @@ public final void loadFromMapTest() throws IOException { assertEquals(true, config.isPassive()); } + @Test + public final void loadFromMapAndContextTest() throws IOException { + Map map = new HashMap<>(); + map.put("host", "localhost"); + map.put("port", "5672"); + map.put("virtualHost", "/"); + map.put("queueName", "test-queue"); + map.put("connectionName", "test-connection"); + map.put("requestedChannelMax", "0"); + map.put("requestedFrameMax", "0"); + map.put("connectionTimeout", "60000"); + map.put("handshakeTimeout", "10000"); + map.put("requestedHeartbeat", "60"); + map.put("prefetchCount", "0"); + map.put("prefetchGlobal", "false"); + map.put("passive", "true"); + SourceContext context = mock(SourceContext.class); + when(context.getSecret("username")).thenReturn("my-secret-name"); + when(context.getSecret("password")).thenReturn("my-secret-pass"); + + RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, context); + assertNotNull(config); + assertEquals(config.getHost(), "localhost"); + assertEquals(config.getPort(), Integer.parseInt("5672")); + assertEquals(config.getVirtualHost(), "/"); + assertEquals(config.getUsername(), "my-secret-name"); + assertEquals(config.getPassword(), "my-secret-pass"); + assertEquals(config.getQueueName(), "test-queue"); + assertEquals(config.getConnectionName(), "test-connection"); + assertEquals(config.getRequestedChannelMax(), Integer.parseInt("0")); + assertEquals(config.getRequestedFrameMax(), Integer.parseInt("0")); + assertEquals(config.getConnectionTimeout(), Integer.parseInt("60000")); + assertEquals(config.getHandshakeTimeout(), Integer.parseInt("10000")); + assertEquals(config.getRequestedHeartbeat(), Integer.parseInt("60")); + assertEquals(config.getPrefetchCount(), Integer.parseInt("0")); + assertEquals(config.isPrefetchGlobal(), false); + assertEquals(config.isPrefetchGlobal(), false); + assertEquals(config.isPassive(), true); + } + @Test public final void validValidateTest() throws IOException { Map map = new HashMap<>(); diff --git a/pulsar-io/redis/pom.xml b/pulsar-io/redis/pom.xml index 351ae205e667f..ddff706524752 100644 --- a/pulsar-io/redis/pom.xml +++ b/pulsar-io/redis/pom.xml @@ -37,6 +37,11 @@ pulsar-io-core ${project.version} + + ${project.groupId} + pulsar-io-common + ${project.version} + ${project.groupId} pulsar-functions-instance diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java index bff0a5c2da592..ebd6e9dbab272 100644 --- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java +++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java @@ -68,7 +68,7 @@ public class RedisSink implements Sink { public void open(Map config, SinkContext sinkContext) throws Exception { log.info("Open Redis Sink"); - redisSinkConfig = RedisSinkConfig.load(config); + redisSinkConfig = RedisSinkConfig.load(config, sinkContext); redisSinkConfig.validate(); redisSession = RedisSession.create(redisSinkConfig); diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java index a9db66812a475..c935b4a7c6870 100644 --- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java +++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.redis.sink; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.google.common.base.Preconditions; @@ -28,6 +29,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; import org.apache.pulsar.io.redis.RedisAbstractConfig; @@ -57,16 +60,33 @@ public class RedisSinkConfig extends RedisAbstractConfig implements Serializable ) private int batchSize = 200; + /** + * @deprecated Use {@link #load(String, SinkContext)} instead. + */ + @Deprecated public static RedisSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), RedisSinkConfig.class); } + public static RedisSinkConfig load(String yamlFile, SinkContext context) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context); + } + + /** + * @deprecated Use {@link #load(Map, SinkContext)} instead. + */ + @Deprecated public static RedisSinkConfig load(Map map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(mapper.writeValueAsString(map), RedisSinkConfig.class); } + public static RedisSinkConfig load(Map map, SinkContext context) { + return IOConfigUtils.loadWithSecrets(map, RedisSinkConfig.class, context); + } + @Override public void validate() { super.validate(); diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java index 1316d0994a1cd..45c13a09ee072 100644 --- a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java +++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.redis.sink; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.redis.RedisAbstractConfig; import org.testng.annotations.Test; @@ -26,6 +27,8 @@ import java.util.HashMap; import java.util.Map; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -50,6 +53,24 @@ public final void loadFromYamlFileTest() throws IOException { assertEquals(config.getConnectTimeout(), Long.parseLong("3000")); } + @Test + public final void loadFromYamlFileAndContextTest() throws IOException { + File yamlFile = getFile("sinkConfig.yaml"); + String path = yamlFile.getAbsolutePath(); + SinkContext context = mock(SinkContext.class); + when(context.getSecret("redisPassword")).thenReturn("my-secret-pass"); + RedisSinkConfig config = RedisSinkConfig.load(path, context); + assertNotNull(config); + assertEquals(config.getRedisHosts(), "localhost:6379"); + assertEquals(config.getRedisPassword(), "my-secret-pass"); + assertEquals(config.getRedisDatabase(), Integer.parseInt("1")); + assertEquals(config.getClientMode(), "Standalone"); + assertEquals(config.getOperationTimeout(), Long.parseLong("2000")); + assertEquals(config.getBatchSize(), Integer.parseInt("100")); + assertEquals(config.getBatchTimeMs(), Long.parseLong("1000")); + assertEquals(config.getConnectTimeout(), Long.parseLong("3000")); + } + @Test public final void loadFromMapTest() throws IOException { Map map = new HashMap(); @@ -74,6 +95,30 @@ public final void loadFromMapTest() throws IOException { assertEquals(config.getConnectTimeout(), Long.parseLong("3000")); } + @Test + public final void loadFromMapAndContextTest() throws IOException { + Map map = new HashMap(); + map.put("redisHosts", "localhost:6379"); + map.put("redisDatabase", "1"); + map.put("clientMode", "Standalone"); + map.put("operationTimeout", "2000"); + map.put("batchSize", "100"); + map.put("batchTimeMs", "1000"); + map.put("connectTimeout", "3000"); + SinkContext context = mock(SinkContext.class); + when(context.getSecret("redisPassword")).thenReturn("my-secret-pass"); + RedisSinkConfig config = RedisSinkConfig.load(map, context); + assertNotNull(config); + assertEquals(config.getRedisHosts(), "localhost:6379"); + assertEquals(config.getRedisPassword(), "my-secret-pass"); + assertEquals(config.getRedisDatabase(), Integer.parseInt("1")); + assertEquals(config.getClientMode(), "Standalone"); + assertEquals(config.getOperationTimeout(), Long.parseLong("2000")); + assertEquals(config.getBatchSize(), Integer.parseInt("100")); + assertEquals(config.getBatchTimeMs(), Long.parseLong("1000")); + assertEquals(config.getConnectTimeout(), Long.parseLong("3000")); + } + @Test public final void validValidateTest() throws IOException { Map map = new HashMap(); diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml index 741c6d135c509..cb3fde4a7f7fd 100644 --- a/pulsar-io/solr/pom.xml +++ b/pulsar-io/solr/pom.xml @@ -41,6 +41,11 @@ pulsar-io-core ${project.parent.version} + + ${project.groupId} + pulsar-io-common + ${project.version} + ${project.groupId} pulsar-functions-instance diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java index de9cdb4a9d82a..202c782c14c49 100644 --- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java +++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java @@ -48,7 +48,7 @@ public abstract class SolrAbstractSink implements Sink { @Override public void open(Map config, SinkContext sinkContext) throws Exception { - solrSinkConfig = SolrSinkConfig.load(config); + solrSinkConfig = SolrSinkConfig.load(config, sinkContext); solrSinkConfig.validate(); enableBasicAuth = !Strings.isNullOrEmpty(solrSinkConfig.getUsername()); diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java index 02733d230bdcb..72a190e7d3f5c 100644 --- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java +++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.solr; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.google.common.base.Preconditions; @@ -27,6 +28,8 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; /** @@ -79,16 +82,33 @@ public class SolrSinkConfig implements Serializable { help = "The password to use for basic authentication") private String password; + /** + * @deprecated Use {@link #load(String, SinkContext)} instead. + */ + @Deprecated public static SolrSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), SolrSinkConfig.class); } + public static SolrSinkConfig load(String yamlFile, SinkContext context) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context); + } + + /** + * @deprecated Use {@link #load(Map, SinkContext)} instead. + */ + @Deprecated public static SolrSinkConfig load(Map map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(mapper.writeValueAsString(map), SolrSinkConfig.class); } + public static SolrSinkConfig load(Map map, SinkContext context) { + return IOConfigUtils.loadWithSecrets(map, SolrSinkConfig.class, context); + } + public void validate() { Preconditions.checkNotNull(solrUrl, "solrUrl property not set."); Preconditions.checkNotNull(solrMode, "solrMode property not set."); diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java index 42d2121dbfcbd..e88fb9e3abd3b 100644 --- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java +++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.io.solr; import com.google.common.collect.Lists; +import org.apache.pulsar.io.core.SinkContext; import org.testng.annotations.Test; import java.io.File; @@ -29,6 +30,8 @@ import java.util.Map; import java.util.Optional; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -51,6 +54,23 @@ public final void loadFromYamlFileTest() throws IOException { assertEquals(config.getPassword(), "fake@123"); } + @Test + public final void loadFromYamlFileAndContextTest() throws IOException { + File yamlFile = getFile("sinkConfig.yaml"); + String path = yamlFile.getAbsolutePath(); + SinkContext context = mock(SinkContext.class); + when(context.getSecret("username")).thenReturn("my-secret-name"); + when(context.getSecret("password")).thenReturn("my-secret-pass"); + SolrSinkConfig config = SolrSinkConfig.load(path, context); + assertNotNull(config); + assertEquals(config.getSolrUrl(), "localhost:2181,localhost:2182/chroot"); + assertEquals(config.getSolrMode(), "SolrCloud"); + assertEquals(config.getSolrCollection(), "techproducts"); + assertEquals(config.getSolrCommitWithinMs(), Integer.parseInt("100")); + assertEquals(config.getUsername(), "my-secret-name"); + assertEquals(config.getPassword(), "my-secret-pass"); + } + @Test public final void loadFromMapTest() throws IOException { Map map = new HashMap<>(); @@ -71,6 +91,26 @@ public final void loadFromMapTest() throws IOException { assertEquals(config.getPassword(), "fake@123"); } + @Test + public final void loadFromMapAndContextTest() throws IOException { + Map map = new HashMap<>(); + map.put("solrUrl", "localhost:2181,localhost:2182/chroot"); + map.put("solrMode", "SolrCloud"); + map.put("solrCollection", "techproducts"); + map.put("solrCommitWithinMs", "100"); + SinkContext context = mock(SinkContext.class); + when(context.getSecret("username")).thenReturn("my-secret-name"); + when(context.getSecret("password")).thenReturn("my-secret-pass"); + SolrSinkConfig config = SolrSinkConfig.load(map, context); + assertNotNull(config); + assertEquals(config.getSolrUrl(), "localhost:2181,localhost:2182/chroot"); + assertEquals(config.getSolrMode(), "SolrCloud"); + assertEquals(config.getSolrCollection(), "techproducts"); + assertEquals(config.getSolrCommitWithinMs(), Integer.parseInt("100")); + assertEquals(config.getUsername(), "my-secret-name"); + assertEquals(config.getPassword(), "my-secret-pass"); + } + @Test public final void validValidateTest() throws IOException { Map map = new HashMap<>(); From 067f1c0bab64567b942a88297693c79c7b83d5db Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 28 Jul 2023 12:08:56 -0500 Subject: [PATCH 2/5] Fix defaultValue annotation for correct deserialization --- .../org/apache/pulsar/io/redis/RedisAbstractConfig.java | 2 +- .../org/apache/pulsar/io/redis/sink/RedisSinkConfig.java | 4 ++-- .../org/apache/pulsar/io/redis/sink/RedisSinkTest.java | 8 +++++++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java index 978e7de31a51c..6e1a09e18fa6a 100644 --- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java +++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java @@ -88,7 +88,7 @@ public class RedisAbstractConfig implements Serializable { @FieldDoc( required = false, - defaultValue = "10000L", + defaultValue = "10000", help = "The amount of time in milliseconds to wait before timing out when connecting") private long connectTimeout = 10000L; diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java index c935b4a7c6870..aacee8994ac01 100644 --- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java +++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java @@ -43,13 +43,13 @@ public class RedisSinkConfig extends RedisAbstractConfig implements Serializable @FieldDoc( required = false, - defaultValue = "10000L", + defaultValue = "10000", help = "The amount of time in milliseconds before an operation is marked as timed out") private long operationTimeout = 10000L; @FieldDoc( required = false, - defaultValue = "1000L", + defaultValue = "1000", help = "The Redis operation time in milliseconds") private long batchTimeMs = 1000L; diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java index 214151345b42c..ee77dba3e29f8 100644 --- a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java +++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java @@ -18,9 +18,12 @@ */ package org.apache.pulsar.io.redis.sink; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.SinkRecord; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.redis.EmbeddedRedisUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -65,8 +68,11 @@ public void TestOpenAndWriteSink() throws Exception { // prepare a foo Record Record record = build("fakeTopic", "fakeKey", "fakeValue"); + SinkContext context = mock(SinkContext.class); + when(context.getSecret("redisPassword")).thenReturn(""); + // open should success - sink.open(configs, null); + sink.open(configs, context); // write should success. sink.write(record); From 5fe7790f7592138cdaa7c8b7bb7db469c34f382e Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 28 Jul 2023 14:40:56 -0500 Subject: [PATCH 3/5] Fix more default values --- .../org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java | 2 +- .../org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java index 9b7d8e1ce905d..e8a1495e44f45 100644 --- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java +++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java @@ -94,7 +94,7 @@ public class InfluxDBSinkConfig implements Serializable { @FieldDoc( required = false, - defaultValue = "1000L", + defaultValue = "1000", help = "The InfluxDB operation time in milliseconds") private long batchTimeMs = 1000L; diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java index f870249021fe7..cb949a22d4909 100644 --- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java +++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java @@ -90,7 +90,7 @@ public class InfluxDBSinkConfig implements Serializable { @FieldDoc( required = false, - defaultValue = "1000L", + defaultValue = "1000", help = "The InfluxDB operation time in milliseconds") private long batchTimeMs = 1000; From e22258672431eb0524c0a0e49c8ab921a318b23c Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 28 Jul 2023 14:41:36 -0500 Subject: [PATCH 4/5] Fix secret loading for v1.InfluxDBSink --- .../io/influxdb/v1/InfluxDBSinkConfig.java | 21 ++++++ .../influxdb/v1/InfluxDBSinkConfigTest.java | 75 ++++++++++++++++++- 2 files changed, 94 insertions(+), 2 deletions(-) diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java index e8a1495e44f45..c80a9c31bdac8 100644 --- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java +++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.influxdb.v1; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.google.common.base.Preconditions; @@ -27,6 +28,8 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; /** @@ -105,16 +108,34 @@ public class InfluxDBSinkConfig implements Serializable { ) private int batchSize = 200; + + /** + * @deprecated Use {@link #load(String, SinkContext)} instead. + */ + @Deprecated public static InfluxDBSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class); } + public static InfluxDBSinkConfig load(String yamlFile, SinkContext context) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context); + } + + /** + * @deprecated Use {@link #load(Map, SinkContext)} instead. + */ + @Deprecated public static InfluxDBSinkConfig load(Map map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(mapper.writeValueAsString(map), InfluxDBSinkConfig.class); } + public static InfluxDBSinkConfig load(Map map, SinkContext context) { + return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, context); + } + public void validate() { Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not set."); Preconditions.checkNotNull(database, "database property not set."); diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java index 4493dcfb24854..40914e4b4c9ef 100644 --- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java +++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.influxdb.v1; +import org.apache.pulsar.io.core.SinkContext; import org.influxdb.InfluxDB; import org.testng.annotations.Test; @@ -26,7 +27,10 @@ import java.util.HashMap; import java.util.Map; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; /** @@ -49,6 +53,28 @@ public final void loadFromYamlFileTest() throws IOException { assertEquals(Integer.parseInt("100"), config.getBatchSize()); } + @Test + public final void loadFromYamlFileAndContextTest() throws IOException { + File yamlFile = getFile("sinkConfig-v1.yaml"); + String path = yamlFile.getAbsolutePath(); + SinkContext context = mock(SinkContext.class); + when(context.getSecret("username")).thenReturn("secret-username"); + when(context.getSecret("password")).thenReturn("secret-password"); + + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(path, context); + assertNotNull(config); + assertEquals(config.getInfluxdbUrl(), "http://localhost:8086"); + assertEquals(config.getDatabase(), "test_db"); + assertEquals(config.getConsistencyLevel(), "ONE"); + assertEquals(config.getLogLevel(), "NONE"); + assertEquals(config.getRetentionPolicy(), "autogen"); + assertFalse(config.isGzipEnable()); + assertEquals(config.getBatchTimeMs(), Long.parseLong("1000")); + assertEquals(config.getBatchSize(), Integer.parseInt("100")); + assertEquals(config.getUsername(), "secret-username"); + assertEquals(config.getPassword(), "secret-password"); + } + @Test public final void loadFromMapTest() throws IOException { Map map = new HashMap<>(); @@ -73,6 +99,36 @@ public final void loadFromMapTest() throws IOException { assertEquals(Integer.parseInt("100"), config.getBatchSize()); } + @Test + public final void loadFromMapAndContextTest() throws IOException { + Map map = new HashMap<>(); + map.put("influxdbUrl", "http://localhost:8086"); + map.put("database", "test_db"); + map.put("consistencyLevel", "ONE"); + map.put("logLevel", "NONE"); + map.put("retentionPolicy", "autogen"); + map.put("gzipEnable", "false"); + map.put("batchTimeMs", "1000"); + map.put("batchSize", "100"); + + SinkContext context = mock(SinkContext.class); + when(context.getSecret("username")).thenReturn("secret-username"); + when(context.getSecret("password")).thenReturn("secret-password"); + + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, context); + assertNotNull(config); + assertEquals(config.getInfluxdbUrl(), "http://localhost:8086"); + assertEquals(config.getDatabase(), "test_db"); + assertEquals(config.getConsistencyLevel(), "ONE"); + assertEquals(config.getLogLevel(), "NONE"); + assertEquals(config.getRetentionPolicy(), "autogen"); + assertFalse(config.isGzipEnable()); + assertEquals(config.getBatchTimeMs(), Long.parseLong("1000")); + assertEquals(config.getBatchSize(), Integer.parseInt("100")); + assertEquals(config.getUsername(), "secret-username"); + assertEquals(config.getPassword(), "secret-password"); + } + @Test public final void validValidateTest() throws IOException { Map map = new HashMap<>(); @@ -89,6 +145,21 @@ public final void validValidateTest() throws IOException { config.validate(); } + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "influxdbUrl cannot be null") + public final void missingInfluxdbUrlTest() throws IOException { + Map map = new HashMap<>(); + map.put("database", "test_db"); + map.put("consistencyLevel", "ONE"); + map.put("logLevel", "NONE"); + map.put("retentionPolicy", "autogen"); + map.put("gzipEnable", "false"); + map.put("batchTimeMs", "1000"); + map.put("batchSize", "100"); + + InfluxDBSinkConfig.load(map, null); + } + @Test(expectedExceptions = NullPointerException.class, expectedExceptionsMessageRegExp = "influxdbUrl property not set.") public final void missingInfluxdbUrlValidateTest() throws IOException { @@ -118,7 +189,7 @@ public final void invalidBatchSizeTest() throws IOException { map.put("batchTimeMs", "1000"); map.put("batchSize", "-100"); - InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, null); config.validate(); } @@ -135,7 +206,7 @@ public final void invalidConsistencyLevelTest() throws IOException { map.put("batchTimeMs", "1000"); map.put("batchSize", "100"); - InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, null); config.validate(); InfluxDB.ConsistencyLevel.valueOf(config.getConsistencyLevel().toUpperCase()); From dcb25f66003aa049e10a0498846041817a4c5a6f Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 15 Aug 2023 11:09:58 -0500 Subject: [PATCH 5/5] Apply defaultValue before failing for missing a required field --- .../org/apache/pulsar/io/common/IOConfigUtils.java | 6 +++--- .../org/apache/pulsar/io/common/IOConfigUtilsTest.java | 10 ++++++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java index d15986a897caa..77bd74f785117 100644 --- a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java +++ b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java @@ -77,13 +77,13 @@ private static T loadWithSecrets(Map map, Class clazz, } } configs.computeIfAbsent(field.getName(), key -> { - if (fieldDoc.required()) { - throw new IllegalArgumentException(field.getName() + " cannot be null"); - } String value = fieldDoc.defaultValue(); if (value != null && !value.isEmpty()) { return value; } + if (fieldDoc.required()) { + throw new IllegalArgumentException(field.getName() + " cannot be null"); + } return null; }); } diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java index 52afac1a5ac0c..f06dbc2cffc30 100644 --- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java +++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java @@ -54,6 +54,14 @@ static class TestDefaultConfig { ) protected String testRequired; + @FieldDoc( + required = true, + defaultValue = "defaultStr", + sensitive = true, + help = "requiredWithDefault" + ) + protected String requiredWithDefault; + @FieldDoc( required = false, defaultValue = "defaultStr", @@ -300,6 +308,8 @@ public void testDefaultValue() { TestDefaultConfig testDefaultConfig = IOConfigUtils.loadWithSecrets(configMap, TestDefaultConfig.class, new TestSinkContext()); Assert.assertEquals(testDefaultConfig.getDefaultStr(), "defaultStr"); + Assert.assertEquals(testDefaultConfig.getTestRequired(), "test"); + Assert.assertEquals(testDefaultConfig.getRequiredWithDefault(), "defaultStr"); Assert.assertEquals(testDefaultConfig.isDefaultBool(), true); Assert.assertEquals(testDefaultConfig.getDefaultInt(), 100); Assert.assertEquals(testDefaultConfig.getDefaultLong(), 100);