diff --git a/pulsar-io/aerospike/pom.xml b/pulsar-io/aerospike/pom.xml
index 7f604add58819..ed46f8fc17062 100644
--- a/pulsar-io/aerospike/pom.xml
+++ b/pulsar-io/aerospike/pom.xml
@@ -38,6 +38,12 @@
${project.version}
+
+ ${project.groupId}
+ pulsar-io-common
+ ${project.version}
+
+
com.fasterxml.jackson.core
jackson-databind
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
index a0cc2809239ed..ffb0177b7a269 100644
--- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -58,7 +58,7 @@ public abstract class AerospikeAbstractSink implements Sink {
@Override
public void open(Map config, SinkContext sinkContext) throws Exception {
- aerospikeSinkConfig = AerospikeSinkConfig.load(config);
+ aerospikeSinkConfig = AerospikeSinkConfig.load(config, sinkContext);
if (aerospikeSinkConfig.getSeedHosts() == null
|| aerospikeSinkConfig.getKeyspace() == null
|| aerospikeSinkConfig.getColumnName() == null) {
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java
index af9374919dfa5..b1a62aded0220 100644
--- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.aerospike;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
@@ -26,6 +27,9 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@Accessors(chain = true)
@@ -38,7 +42,19 @@ public class AerospikeSinkConfig implements Serializable {
private String columnName;
// Optional
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ sensitive = true,
+ help = "The username for authentication."
+ )
private String userName;
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ sensitive = true,
+ help = "The password for authentication."
+ )
private String password;
private String keySet;
private int maxConcurrentRequests = 100;
@@ -46,13 +62,30 @@ public class AerospikeSinkConfig implements Serializable {
private int retries = 1;
+ /**
+ * @deprecated Use {@link #load(String, SinkContext)} instead.
+ */
+ @Deprecated
public static AerospikeSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), AerospikeSinkConfig.class);
}
+ public static AerospikeSinkConfig load(String yamlFile, SinkContext context) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return load(mapper.readValue(new File(yamlFile), new TypeReference
+
+ ${project.groupId}
+ pulsar-io-common
+ ${project.version}
+
com.fasterxml.jackson.core
jackson-databind
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
index 06c8788d5aea1..7d0cd0305a49e 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
@@ -57,7 +57,7 @@ public abstract class CanalAbstractSource extends PushSource {
@Override
public void open(Map config, SourceContext sourceContext) throws Exception {
- canalSourceConfig = CanalSourceConfig.load(config);
+ canalSourceConfig = CanalSourceConfig.load(config, sourceContext);
if (canalSourceConfig.getCluster()) {
connector = CanalConnectors.newClusterConnector(canalSourceConfig.getZkServers(),
canalSourceConfig.getDestination(), canalSourceConfig.getUsername(),
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
index a0408e60e5f76..49db4b76b6147 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.canal;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
@@ -26,6 +27,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@@ -80,14 +83,31 @@ public class CanalSourceConfig implements Serializable{
help = "The batch size to fetch from canal.")
private int batchSize = 1000;
+
+ /**
+ * @deprecated Use {@link #load(String, SourceContext)} instead.
+ */
+ @Deprecated
public static CanalSourceConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), CanalSourceConfig.class);
}
+ public static CanalSourceConfig load(String yamlFile, SourceContext context) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return load(mapper.readValue(new File(yamlFile), new TypeReference
+
+ ${project.groupId}
+ pulsar-io-common
+ ${project.version}
+
${project.groupId}
pulsar-functions-instance
diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java
index 5b51461fc7b8e..3dd788a25265b 100644
--- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java
+++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java
@@ -46,7 +46,7 @@ public class InfluxDBGenericRecordSink implements Sink {
@Override
public void open(Map map, SinkContext sinkContext) throws Exception {
try {
- val configV2 = InfluxDBSinkConfig.load(map);
+ val configV2 = InfluxDBSinkConfig.load(map, sinkContext);
configV2.validate();
sink = new InfluxDBSink();
} catch (Exception e) {
diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java
index 9b7d8e1ce905d..c80a9c31bdac8 100644
--- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java
+++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.influxdb.v1;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Preconditions;
@@ -27,6 +28,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
/**
@@ -94,7 +97,7 @@ public class InfluxDBSinkConfig implements Serializable {
@FieldDoc(
required = false,
- defaultValue = "1000L",
+ defaultValue = "1000",
help = "The InfluxDB operation time in milliseconds")
private long batchTimeMs = 1000L;
@@ -105,16 +108,34 @@ public class InfluxDBSinkConfig implements Serializable {
)
private int batchSize = 200;
+
+ /**
+ * @deprecated Use {@link #load(String, SinkContext)} instead.
+ */
+ @Deprecated
public static InfluxDBSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class);
}
+ public static InfluxDBSinkConfig load(String yamlFile, SinkContext context) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return load(mapper.readValue(new File(yamlFile), new TypeReference
+
+ ${project.groupId}
+ pulsar-io-common
+ ${project.version}
+
+
org.apache.avro
avro
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 4586fcebcf167..ca33b3cfdaba9 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -76,7 +76,7 @@ public abstract class JdbcAbstractSink implements Sink {
@Override
public void open(Map config, SinkContext sinkContext) throws Exception {
- jdbcSinkConfig = JdbcSinkConfig.load(config);
+ jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext);
jdbcSinkConfig.validate();
jdbcUrl = jdbcSinkConfig.getJdbcUrl();
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index f798d94f7c35e..d4d636b2341ee 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.jdbc;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
@@ -26,6 +27,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@@ -140,16 +143,33 @@ public enum NullValueAction {
}
+ /**
+ * @deprecated Use {@link #load(String, SinkContext)} instead.
+ */
+ @Deprecated
public static JdbcSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class);
}
+ public static JdbcSinkConfig load(String yamlFile, SinkContext context) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context);
+ }
+
+ /**
+ * @deprecated Use {@link #load(Map, SinkContext)} instead.
+ */
+ @Deprecated
public static JdbcSinkConfig load(Map map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), JdbcSinkConfig.class);
}
+ public static JdbcSinkConfig load(Map map, SinkContext context) {
+ return IOConfigUtils.loadWithSecrets(map, JdbcSinkConfig.class, context);
+ }
+
public void validate() {
if (timeoutMs <= 0 && batchSize <= 0) {
throw new IllegalArgumentException("timeoutMs or batchSize must be set to a positive value.");
diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index 3cdcb681b3fa8..ac0e7e1b596e0 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -37,6 +37,11 @@
pulsar-io-core
${project.version}
+
+ ${project.groupId}
+ pulsar-io-common
+ ${project.version}
+
${project.groupId}
pulsar-functions-instance
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
index f317a35734e69..89192c42346e8 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
@@ -53,7 +53,7 @@ public class RabbitMQSink implements Sink {
@Override
public void open(Map config, SinkContext sinkContext) throws Exception {
- rabbitMQSinkConfig = RabbitMQSinkConfig.load(config);
+ rabbitMQSinkConfig = RabbitMQSinkConfig.load(config, sinkContext);
rabbitMQSinkConfig.validate();
ConnectionFactory connectionFactory = rabbitMQSinkConfig.createConnectionFactory();
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
index c1f8d6b8ad3d3..3f343e41854e3 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.rabbitmq;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Preconditions;
@@ -28,6 +29,8 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@@ -55,16 +58,33 @@ public class RabbitMQSinkConfig extends RabbitMQAbstractConfig implements Serial
help = "The exchange type to publish the messages on")
private String exchangeType = "topic";
+ /**
+ * @deprecated Use {@link #load(String, SinkContext)} instead.
+ */
+ @Deprecated
public static RabbitMQSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), RabbitMQSinkConfig.class);
}
+ public static RabbitMQSinkConfig load(String yamlFile, SinkContext context) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context);
+ }
+
+ /**
+ * @deprecated Use {@link #load(Map, SinkContext)} instead.
+ */
+ @Deprecated
public static RabbitMQSinkConfig load(Map map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), RabbitMQSinkConfig.class);
}
+ public static RabbitMQSinkConfig load(Map map, SinkContext context) {
+ return IOConfigUtils.loadWithSecrets(map, RabbitMQSinkConfig.class, context);
+ }
+
@Override
public void validate() {
super.validate();
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index d15108c4d8288..b0b7ef31b08de 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -54,7 +54,7 @@ public class RabbitMQSource extends PushSource {
@Override
public void open(Map config, SourceContext sourceContext) throws Exception {
- rabbitMQSourceConfig = RabbitMQSourceConfig.load(config);
+ rabbitMQSourceConfig = RabbitMQSourceConfig.load(config, sourceContext);
rabbitMQSourceConfig.validate();
ConnectionFactory connectionFactory = rabbitMQSourceConfig.createConnectionFactory();
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
index f24018e70da13..f71d487ab34a1 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.rabbitmq;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Preconditions;
@@ -28,6 +29,8 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@@ -61,16 +64,33 @@ public class RabbitMQSourceConfig extends RabbitMQAbstractConfig implements Seri
help = "Set true if the queue should be declared passively - ie to preserve durability/timeout settings")
private boolean passive = false;
+ /**
+ * @deprecated Use {@link #load(String, SourceContext)} instead.
+ */
+ @Deprecated
public static RabbitMQSourceConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), RabbitMQSourceConfig.class);
}
+ public static RabbitMQSourceConfig load(String yamlFile, SourceContext context) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context);
+ }
+
+ /**
+ * @deprecated Use {@link #load(Map, SourceContext)} instead.
+ */
+ @Deprecated
public static RabbitMQSourceConfig load(Map map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), RabbitMQSourceConfig.class);
}
+ public static RabbitMQSourceConfig load(Map map, SourceContext context) {
+ return IOConfigUtils.loadWithSecrets(map, RabbitMQSourceConfig.class, context);
+ }
+
@Override
public void validate() {
super.validate();
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
index 3d4fd6f46e16f..99eaefa83d375 100644
--- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.rabbitmq.sink;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig;
import org.testng.annotations.Test;
@@ -26,6 +27,8 @@
import java.util.HashMap;
import java.util.Map;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -54,6 +57,30 @@ public final void loadFromYamlFileTest() throws IOException {
assertEquals(config.getExchangeType(), "test-exchange-type");
}
+ @Test
+ public final void loadFromYamlFileAndContextTest() throws IOException {
+ File yamlFile = getFile("sinkConfig.yaml");
+ String path = yamlFile.getAbsolutePath();
+ SinkContext context = mock(SinkContext.class);
+ when(context.getSecret("username")).thenReturn("my-secret-name");
+ when(context.getSecret("password")).thenReturn("my-secret-pass");
+ RabbitMQSinkConfig config = RabbitMQSinkConfig.load(path, context);
+ assertNotNull(config);
+ assertEquals(config.getHost(), "localhost");
+ assertEquals(config.getPort(), Integer.parseInt("5673"));
+ assertEquals(config.getVirtualHost(), "/");
+ assertEquals(config.getUsername(), "my-secret-name");
+ assertEquals(config.getPassword(), "my-secret-pass");
+ assertEquals(config.getConnectionName(), "test-connection");
+ assertEquals(config.getRequestedChannelMax(), Integer.parseInt("0"));
+ assertEquals(config.getRequestedFrameMax(), Integer.parseInt("0"));
+ assertEquals(config.getConnectionTimeout(), Integer.parseInt("60000"));
+ assertEquals(config.getHandshakeTimeout(), Integer.parseInt("10000"));
+ assertEquals(config.getRequestedHeartbeat(), Integer.parseInt("60"));
+ assertEquals(config.getExchangeName(), "test-exchange");
+ assertEquals(config.getExchangeType(), "test-exchange-type");
+ }
+
@Test
public final void loadFromMapTest() throws IOException {
Map map = new HashMap<>();
@@ -88,6 +115,42 @@ public final void loadFromMapTest() throws IOException {
assertEquals(config.getExchangeType(), "test-exchange-type");
}
+ @Test
+ public final void loadFromMapAndContextTest() throws IOException {
+ Map map = new HashMap<>();
+ map.put("host", "localhost");
+ map.put("port", "5673");
+ map.put("virtualHost", "/");
+ map.put("connectionName", "test-connection");
+ map.put("requestedChannelMax", "0");
+ map.put("requestedFrameMax", "0");
+ map.put("connectionTimeout", "60000");
+ map.put("handshakeTimeout", "10000");
+ map.put("requestedHeartbeat", "60");
+ map.put("exchangeName", "test-exchange");
+ map.put("exchangeType", "test-exchange-type");
+
+ SinkContext context = mock(SinkContext.class);
+ when(context.getSecret("username")).thenReturn("my-secret-name");
+ when(context.getSecret("password")).thenReturn("my-secret-pass");
+ RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, context);
+
+ assertNotNull(config);
+ assertEquals(config.getHost(), "localhost");
+ assertEquals(config.getPort(), Integer.parseInt("5673"));
+ assertEquals(config.getVirtualHost(), "/");
+ assertEquals(config.getUsername(), "my-secret-name");
+ assertEquals(config.getPassword(), "my-secret-pass");
+ assertEquals(config.getConnectionName(), "test-connection");
+ assertEquals(config.getRequestedChannelMax(), Integer.parseInt("0"));
+ assertEquals(config.getRequestedFrameMax(), Integer.parseInt("0"));
+ assertEquals(config.getConnectionTimeout(), Integer.parseInt("60000"));
+ assertEquals(config.getHandshakeTimeout(), Integer.parseInt("10000"));
+ assertEquals(config.getRequestedHeartbeat(), Integer.parseInt("60"));
+ assertEquals(config.getExchangeName(), "test-exchange");
+ assertEquals(config.getExchangeType(), "test-exchange-type");
+ }
+
@Test
public final void validValidateTest() throws IOException {
Map map = new HashMap<>();
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
index c33e0070c6fd0..624b8024b5514 100644
--- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.rabbitmq.source;
+import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig;
import org.testng.annotations.Test;
@@ -26,6 +27,8 @@
import java.util.HashMap;
import java.util.Map;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -57,6 +60,32 @@ public final void loadFromYamlFileTest() throws IOException {
assertFalse(config.isPassive());
}
+ @Test
+ public final void loadFromYamlFileAndContextTest() throws IOException {
+ File yamlFile = getFile("sourceConfig.yaml");
+ String path = yamlFile.getAbsolutePath();
+ SourceContext context = mock(SourceContext.class);
+ when(context.getSecret("username")).thenReturn("my-secret-name");
+ when(context.getSecret("password")).thenReturn("my-secret-pass");
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(path, context);
+ assertNotNull(config);
+ assertEquals(config.getHost(), "localhost");
+ assertEquals(config.getPort(), Integer.parseInt("5672"));
+ assertEquals(config.getVirtualHost(), "/");
+ assertEquals(config.getUsername(), "my-secret-name");
+ assertEquals(config.getPassword(), "my-secret-pass");
+ assertEquals(config.getQueueName(), "test-queue");
+ assertEquals(config.getConnectionName(), "test-connection");
+ assertEquals(config.getRequestedChannelMax(), Integer.parseInt("0"));
+ assertEquals(config.getRequestedFrameMax(), Integer.parseInt("0"));
+ assertEquals(config.getConnectionTimeout(), Integer.parseInt("60000"));
+ assertEquals(config.getHandshakeTimeout(), Integer.parseInt("10000"));
+ assertEquals(config.getRequestedHeartbeat(), Integer.parseInt("60"));
+ assertEquals(config.getPrefetchCount(), Integer.parseInt("0"));
+ assertFalse(config.isPrefetchGlobal());
+ assertFalse(config.isPassive());
+ }
+
@Test
public final void loadFromMapTest() throws IOException {
Map map = new HashMap<>();
@@ -96,6 +125,46 @@ public final void loadFromMapTest() throws IOException {
assertEquals(true, config.isPassive());
}
+ @Test
+ public final void loadFromMapAndContextTest() throws IOException {
+ Map map = new HashMap<>();
+ map.put("host", "localhost");
+ map.put("port", "5672");
+ map.put("virtualHost", "/");
+ map.put("queueName", "test-queue");
+ map.put("connectionName", "test-connection");
+ map.put("requestedChannelMax", "0");
+ map.put("requestedFrameMax", "0");
+ map.put("connectionTimeout", "60000");
+ map.put("handshakeTimeout", "10000");
+ map.put("requestedHeartbeat", "60");
+ map.put("prefetchCount", "0");
+ map.put("prefetchGlobal", "false");
+ map.put("passive", "true");
+ SourceContext context = mock(SourceContext.class);
+ when(context.getSecret("username")).thenReturn("my-secret-name");
+ when(context.getSecret("password")).thenReturn("my-secret-pass");
+
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, context);
+ assertNotNull(config);
+ assertEquals(config.getHost(), "localhost");
+ assertEquals(config.getPort(), Integer.parseInt("5672"));
+ assertEquals(config.getVirtualHost(), "/");
+ assertEquals(config.getUsername(), "my-secret-name");
+ assertEquals(config.getPassword(), "my-secret-pass");
+ assertEquals(config.getQueueName(), "test-queue");
+ assertEquals(config.getConnectionName(), "test-connection");
+ assertEquals(config.getRequestedChannelMax(), Integer.parseInt("0"));
+ assertEquals(config.getRequestedFrameMax(), Integer.parseInt("0"));
+ assertEquals(config.getConnectionTimeout(), Integer.parseInt("60000"));
+ assertEquals(config.getHandshakeTimeout(), Integer.parseInt("10000"));
+ assertEquals(config.getRequestedHeartbeat(), Integer.parseInt("60"));
+ assertEquals(config.getPrefetchCount(), Integer.parseInt("0"));
+ assertEquals(config.isPrefetchGlobal(), false);
+ assertEquals(config.isPrefetchGlobal(), false);
+ assertEquals(config.isPassive(), true);
+ }
+
@Test
public final void validValidateTest() throws IOException {
Map map = new HashMap<>();
diff --git a/pulsar-io/redis/pom.xml b/pulsar-io/redis/pom.xml
index 351ae205e667f..ddff706524752 100644
--- a/pulsar-io/redis/pom.xml
+++ b/pulsar-io/redis/pom.xml
@@ -37,6 +37,11 @@
pulsar-io-core
${project.version}
+
+ ${project.groupId}
+ pulsar-io-common
+ ${project.version}
+
${project.groupId}
pulsar-functions-instance
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
index 978e7de31a51c..6e1a09e18fa6a 100644
--- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
@@ -88,7 +88,7 @@ public class RedisAbstractConfig implements Serializable {
@FieldDoc(
required = false,
- defaultValue = "10000L",
+ defaultValue = "10000",
help = "The amount of time in milliseconds to wait before timing out when connecting")
private long connectTimeout = 10000L;
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
index bff0a5c2da592..ebd6e9dbab272 100644
--- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
@@ -68,7 +68,7 @@ public class RedisSink implements Sink {
public void open(Map config, SinkContext sinkContext) throws Exception {
log.info("Open Redis Sink");
- redisSinkConfig = RedisSinkConfig.load(config);
+ redisSinkConfig = RedisSinkConfig.load(config, sinkContext);
redisSinkConfig.validate();
redisSession = RedisSession.create(redisSinkConfig);
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
index a9db66812a475..aacee8994ac01 100644
--- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.redis.sink;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Preconditions;
@@ -28,6 +29,8 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import org.apache.pulsar.io.redis.RedisAbstractConfig;
@@ -40,13 +43,13 @@ public class RedisSinkConfig extends RedisAbstractConfig implements Serializable
@FieldDoc(
required = false,
- defaultValue = "10000L",
+ defaultValue = "10000",
help = "The amount of time in milliseconds before an operation is marked as timed out")
private long operationTimeout = 10000L;
@FieldDoc(
required = false,
- defaultValue = "1000L",
+ defaultValue = "1000",
help = "The Redis operation time in milliseconds")
private long batchTimeMs = 1000L;
@@ -57,16 +60,33 @@ public class RedisSinkConfig extends RedisAbstractConfig implements Serializable
)
private int batchSize = 200;
+ /**
+ * @deprecated Use {@link #load(String, SinkContext)} instead.
+ */
+ @Deprecated
public static RedisSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), RedisSinkConfig.class);
}
+ public static RedisSinkConfig load(String yamlFile, SinkContext context) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context);
+ }
+
+ /**
+ * @deprecated Use {@link #load(Map, SinkContext)} instead.
+ */
+ @Deprecated
public static RedisSinkConfig load(Map map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), RedisSinkConfig.class);
}
+ public static RedisSinkConfig load(Map map, SinkContext context) {
+ return IOConfigUtils.loadWithSecrets(map, RedisSinkConfig.class, context);
+ }
+
@Override
public void validate() {
super.validate();
diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
index 1316d0994a1cd..45c13a09ee072 100644
--- a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
+++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.redis.sink;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.redis.RedisAbstractConfig;
import org.testng.annotations.Test;
@@ -26,6 +27,8 @@
import java.util.HashMap;
import java.util.Map;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -50,6 +53,24 @@ public final void loadFromYamlFileTest() throws IOException {
assertEquals(config.getConnectTimeout(), Long.parseLong("3000"));
}
+ @Test
+ public final void loadFromYamlFileAndContextTest() throws IOException {
+ File yamlFile = getFile("sinkConfig.yaml");
+ String path = yamlFile.getAbsolutePath();
+ SinkContext context = mock(SinkContext.class);
+ when(context.getSecret("redisPassword")).thenReturn("my-secret-pass");
+ RedisSinkConfig config = RedisSinkConfig.load(path, context);
+ assertNotNull(config);
+ assertEquals(config.getRedisHosts(), "localhost:6379");
+ assertEquals(config.getRedisPassword(), "my-secret-pass");
+ assertEquals(config.getRedisDatabase(), Integer.parseInt("1"));
+ assertEquals(config.getClientMode(), "Standalone");
+ assertEquals(config.getOperationTimeout(), Long.parseLong("2000"));
+ assertEquals(config.getBatchSize(), Integer.parseInt("100"));
+ assertEquals(config.getBatchTimeMs(), Long.parseLong("1000"));
+ assertEquals(config.getConnectTimeout(), Long.parseLong("3000"));
+ }
+
@Test
public final void loadFromMapTest() throws IOException {
Map map = new HashMap();
@@ -74,6 +95,30 @@ public final void loadFromMapTest() throws IOException {
assertEquals(config.getConnectTimeout(), Long.parseLong("3000"));
}
+ @Test
+ public final void loadFromMapAndContextTest() throws IOException {
+ Map map = new HashMap();
+ map.put("redisHosts", "localhost:6379");
+ map.put("redisDatabase", "1");
+ map.put("clientMode", "Standalone");
+ map.put("operationTimeout", "2000");
+ map.put("batchSize", "100");
+ map.put("batchTimeMs", "1000");
+ map.put("connectTimeout", "3000");
+ SinkContext context = mock(SinkContext.class);
+ when(context.getSecret("redisPassword")).thenReturn("my-secret-pass");
+ RedisSinkConfig config = RedisSinkConfig.load(map, context);
+ assertNotNull(config);
+ assertEquals(config.getRedisHosts(), "localhost:6379");
+ assertEquals(config.getRedisPassword(), "my-secret-pass");
+ assertEquals(config.getRedisDatabase(), Integer.parseInt("1"));
+ assertEquals(config.getClientMode(), "Standalone");
+ assertEquals(config.getOperationTimeout(), Long.parseLong("2000"));
+ assertEquals(config.getBatchSize(), Integer.parseInt("100"));
+ assertEquals(config.getBatchTimeMs(), Long.parseLong("1000"));
+ assertEquals(config.getConnectTimeout(), Long.parseLong("3000"));
+ }
+
@Test
public final void validValidateTest() throws IOException {
Map map = new HashMap();
diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
index 214151345b42c..ee77dba3e29f8 100644
--- a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
+++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
@@ -18,9 +18,12 @@
*/
package org.apache.pulsar.io.redis.sink;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.SinkRecord;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.redis.EmbeddedRedisUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -65,8 +68,11 @@ public void TestOpenAndWriteSink() throws Exception {
// prepare a foo Record
Record record = build("fakeTopic", "fakeKey", "fakeValue");
+ SinkContext context = mock(SinkContext.class);
+ when(context.getSecret("redisPassword")).thenReturn("");
+
// open should success
- sink.open(configs, null);
+ sink.open(configs, context);
// write should success.
sink.write(record);
diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml
index 741c6d135c509..cb3fde4a7f7fd 100644
--- a/pulsar-io/solr/pom.xml
+++ b/pulsar-io/solr/pom.xml
@@ -41,6 +41,11 @@
pulsar-io-core
${project.parent.version}
+
+ ${project.groupId}
+ pulsar-io-common
+ ${project.version}
+
${project.groupId}
pulsar-functions-instance
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
index de9cdb4a9d82a..202c782c14c49 100644
--- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
+++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
@@ -48,7 +48,7 @@ public abstract class SolrAbstractSink implements Sink {
@Override
public void open(Map config, SinkContext sinkContext) throws Exception {
- solrSinkConfig = SolrSinkConfig.load(config);
+ solrSinkConfig = SolrSinkConfig.load(config, sinkContext);
solrSinkConfig.validate();
enableBasicAuth = !Strings.isNullOrEmpty(solrSinkConfig.getUsername());
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
index 02733d230bdcb..72a190e7d3f5c 100644
--- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
+++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.solr;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Preconditions;
@@ -27,6 +28,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
/**
@@ -79,16 +82,33 @@ public class SolrSinkConfig implements Serializable {
help = "The password to use for basic authentication")
private String password;
+ /**
+ * @deprecated Use {@link #load(String, SinkContext)} instead.
+ */
+ @Deprecated
public static SolrSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), SolrSinkConfig.class);
}
+ public static SolrSinkConfig load(String yamlFile, SinkContext context) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return load(mapper.readValue(new File(yamlFile), new TypeReference>() {}), context);
+ }
+
+ /**
+ * @deprecated Use {@link #load(Map, SinkContext)} instead.
+ */
+ @Deprecated
public static SolrSinkConfig load(Map map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), SolrSinkConfig.class);
}
+ public static SolrSinkConfig load(Map map, SinkContext context) {
+ return IOConfigUtils.loadWithSecrets(map, SolrSinkConfig.class, context);
+ }
+
public void validate() {
Preconditions.checkNotNull(solrUrl, "solrUrl property not set.");
Preconditions.checkNotNull(solrMode, "solrMode property not set.");
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
index 42d2121dbfcbd..e88fb9e3abd3b 100644
--- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
+++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.solr;
import com.google.common.collect.Lists;
+import org.apache.pulsar.io.core.SinkContext;
import org.testng.annotations.Test;
import java.io.File;
@@ -29,6 +30,8 @@
import java.util.Map;
import java.util.Optional;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -51,6 +54,23 @@ public final void loadFromYamlFileTest() throws IOException {
assertEquals(config.getPassword(), "fake@123");
}
+ @Test
+ public final void loadFromYamlFileAndContextTest() throws IOException {
+ File yamlFile = getFile("sinkConfig.yaml");
+ String path = yamlFile.getAbsolutePath();
+ SinkContext context = mock(SinkContext.class);
+ when(context.getSecret("username")).thenReturn("my-secret-name");
+ when(context.getSecret("password")).thenReturn("my-secret-pass");
+ SolrSinkConfig config = SolrSinkConfig.load(path, context);
+ assertNotNull(config);
+ assertEquals(config.getSolrUrl(), "localhost:2181,localhost:2182/chroot");
+ assertEquals(config.getSolrMode(), "SolrCloud");
+ assertEquals(config.getSolrCollection(), "techproducts");
+ assertEquals(config.getSolrCommitWithinMs(), Integer.parseInt("100"));
+ assertEquals(config.getUsername(), "my-secret-name");
+ assertEquals(config.getPassword(), "my-secret-pass");
+ }
+
@Test
public final void loadFromMapTest() throws IOException {
Map map = new HashMap<>();
@@ -71,6 +91,26 @@ public final void loadFromMapTest() throws IOException {
assertEquals(config.getPassword(), "fake@123");
}
+ @Test
+ public final void loadFromMapAndContextTest() throws IOException {
+ Map map = new HashMap<>();
+ map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
+ map.put("solrMode", "SolrCloud");
+ map.put("solrCollection", "techproducts");
+ map.put("solrCommitWithinMs", "100");
+ SinkContext context = mock(SinkContext.class);
+ when(context.getSecret("username")).thenReturn("my-secret-name");
+ when(context.getSecret("password")).thenReturn("my-secret-pass");
+ SolrSinkConfig config = SolrSinkConfig.load(map, context);
+ assertNotNull(config);
+ assertEquals(config.getSolrUrl(), "localhost:2181,localhost:2182/chroot");
+ assertEquals(config.getSolrMode(), "SolrCloud");
+ assertEquals(config.getSolrCollection(), "techproducts");
+ assertEquals(config.getSolrCommitWithinMs(), Integer.parseInt("100"));
+ assertEquals(config.getUsername(), "my-secret-name");
+ assertEquals(config.getPassword(), "my-secret-pass");
+ }
+
@Test
public final void validValidateTest() throws IOException {
Map map = new HashMap<>();