diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml index c0d68cad72e27..a2f4d2fd50b0f 100644 --- a/pulsar-functions/instance/pom.xml +++ b/pulsar-functions/instance/pom.xml @@ -269,6 +269,15 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + some-configuration + + + diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index c3f36f754daca..691e547256a7e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -862,11 +862,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception { Thread.currentThread().setContextClassLoader(this.componentClassLoader); } try { - if (sourceSpec.getConfigs().isEmpty()) { - this.source.open(new HashMap<>(), contextImpl); - } else { - this.source.open(parseComponentConfig(sourceSpec.getConfigs()), contextImpl); - } + this.source.open(augmentAndFilterConnectorConfig(sourceSpec.getConfigs()), contextImpl); if (this.source instanceof PulsarSource) { contextImpl.setInputConsumers(((PulsarSource) this.source).getInputConsumers()); } @@ -877,31 +873,60 @@ private void setupInput(ContextImpl contextImpl) throws Exception { Thread.currentThread().setContextClassLoader(this.instanceClassLoader); } } - private Map parseComponentConfig(String connectorConfigs) throws IOException { - return parseComponentConfig(connectorConfigs, instanceConfig, componentClassLoader, componentType); + + /** + * Recursively interpolate configured secrets into the config map by calling + * {@link SecretsProvider#interpolateSecretForValue(String)}. + * @param secretsProvider - the secrets provider that will convert secret's values into config values. + * @param configs - the connector configuration map, which will be mutated. + */ + private static void interpolateSecretsIntoConfigs(SecretsProvider secretsProvider, + Map configs) { + for (Map.Entry entry : configs.entrySet()) { + Object value = entry.getValue(); + if (value instanceof String) { + String updatedValue = secretsProvider.interpolateSecretForValue((String) value); + if (updatedValue != null) { + entry.setValue(updatedValue); + } + } else if (value instanceof Map) { + interpolateSecretsIntoConfigs(secretsProvider, (Map) value); + } + } + } + + private Map augmentAndFilterConnectorConfig(String connectorConfigs) throws IOException { + return augmentAndFilterConnectorConfig(connectorConfigs, instanceConfig, secretsProvider, + componentClassLoader, componentType); } - static Map parseComponentConfig(String connectorConfigs, - InstanceConfig instanceConfig, - ClassLoader componentClassLoader, - org.apache.pulsar.functions.proto.Function + static Map augmentAndFilterConnectorConfig(String connectorConfigs, + InstanceConfig instanceConfig, + SecretsProvider secretsProvider, + ClassLoader componentClassLoader, + org.apache.pulsar.functions.proto.Function .FunctionDetails.ComponentType componentType) throws IOException { - final Map config = ObjectMapperFactory + final Map config = connectorConfigs.isEmpty() ? new HashMap<>() : ObjectMapperFactory .getMapper() .reader() .forType(new TypeReference>() {}) .readValue(connectorConfigs); + if (componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK + && componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) { + return config; + } + + interpolateSecretsIntoConfigs(secretsProvider, config); + if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) { final String configClassName; if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) { configClassName = ConnectorUtils .getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass(); - } else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) { + } else { configClassName = ConnectorUtils .getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass(); - } else { - return config; } if (configClassName != null) { @@ -1014,19 +1039,11 @@ private void setupOutput(ContextImpl contextImpl) throws Exception { Thread.currentThread().setContextClassLoader(this.componentClassLoader); } try { - if (sinkSpec.getConfigs().isEmpty()) { - if (log.isDebugEnabled()) { - log.debug("Opening Sink with empty hashmap with contextImpl: {} ", contextImpl.toString()); - } - this.sink.open(new HashMap<>(), contextImpl); - } else { - if (log.isDebugEnabled()) { - log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec, - contextImpl.toString()); - } - final Map config = parseComponentConfig(sinkSpec.getConfigs()); - this.sink.open(config, contextImpl); + if (log.isDebugEnabled()) { + log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec.getConfigs(), + contextImpl.toString()); } + this.sink.open(augmentAndFilterConnectorConfig(sinkSpec.getConfigs()), contextImpl); } catch (Exception e) { log.error("Sink open produced uncaught exception: ", e); throw e; diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 5fea8bcc9fde9..0ba1d24ba74fe 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; import org.jetbrains.annotations.NotNull; import org.testng.Assert; import org.testng.annotations.DataProvider; @@ -191,9 +192,10 @@ public void testStatsManagerNull() throws Exception { @Test public void testSinkConfigParsingPreservesOriginalType() throws Exception { - final Map parsedConfig = JavaInstanceRunnable.parseComponentConfig( + final Map parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig( "{\"ttl\": 9223372036854775807}", new InstanceConfig(), + new EnvironmentBasedSecretsProvider(), null, FunctionDetails.ComponentType.SINK ); @@ -203,9 +205,10 @@ public void testSinkConfigParsingPreservesOriginalType() throws Exception { @Test public void testSourceConfigParsingPreservesOriginalType() throws Exception { - final Map parsedConfig = JavaInstanceRunnable.parseComponentConfig( + final Map parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig( "{\"ttl\": 9223372036854775807}", new InstanceConfig(), + new EnvironmentBasedSecretsProvider(), null, FunctionDetails.ComponentType.SOURCE ); @@ -213,6 +216,58 @@ public void testSourceConfigParsingPreservesOriginalType() throws Exception { Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE); } + @DataProvider(name = "component") + public Object[][] component() { + return new Object[][]{ + // Schema: component type, whether to map in secrets + { FunctionDetails.ComponentType.SINK }, + { FunctionDetails.ComponentType.SOURCE }, + { FunctionDetails.ComponentType.FUNCTION }, + { FunctionDetails.ComponentType.UNKNOWN }, + }; + } + + @Test(dataProvider = "component") + public void testEmptyStringInput(FunctionDetails.ComponentType componentType) throws Exception { + final Map parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig( + "", + new InstanceConfig(), + new EnvironmentBasedSecretsProvider(), + null, + componentType + ); + Assert.assertEquals(parsedConfig.size(), 0); + } + + // Environment variables are set in the pom.xml file + @Test(dataProvider = "component") + public void testInterpolatingEnvironmentVariables(FunctionDetails.ComponentType componentType) throws Exception { + final Map parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig( + """ + { + "key": { + "key1": "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}", + "key2": "${unset-env-var}" + }, + "key3": "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}" + } + """, + new InstanceConfig(), + new EnvironmentBasedSecretsProvider(), + null, + componentType + ); + if ((componentType == FunctionDetails.ComponentType.SOURCE + || componentType == FunctionDetails.ComponentType.SINK)) { + Assert.assertEquals(((Map) parsedConfig.get("key")).get("key1"), "some-configuration"); + Assert.assertEquals(((Map) parsedConfig.get("key")).get("key2"), "${unset-env-var}"); + Assert.assertEquals(parsedConfig.get("key3"), "some-configuration"); + } else { + Assert.assertEquals(((Map) parsedConfig.get("key")).get("key1"), "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}"); + Assert.assertEquals(((Map) parsedConfig.get("key")).get("key2"), "${unset-env-var}"); + Assert.assertEquals(parsedConfig.get("key3"), "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}"); + } + } public static class ConnectorTestConfig1 { public String field1; @@ -243,9 +298,10 @@ public void testSinkConfigIgnoreUnknownFields(boolean ignoreUnknownConfigFields, final InstanceConfig instanceConfig = new InstanceConfig(); instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields); - final Map parsedConfig = JavaInstanceRunnable.parseComponentConfig( + final Map parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig( "{\"field1\": \"value\", \"field2\": \"value2\"}", instanceConfig, + new EnvironmentBasedSecretsProvider(), narClassLoader, type ); diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java index 92db72a21eb82..fbc5c4e169a48 100644 --- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java +++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java @@ -18,12 +18,20 @@ */ package org.apache.pulsar.functions.secretsprovider; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + /** * This defines a very simple Secrets Provider that looks up environment variable * thats named the same as secretName and fetches it. */ public class EnvironmentBasedSecretsProvider implements SecretsProvider { + /** + * Pattern to match ${secretName} in the value. + */ + private static final Pattern interpolationPattern = Pattern.compile("\\$\\{(.+?)}"); + /** * Fetches a secret. * @@ -33,4 +41,15 @@ public class EnvironmentBasedSecretsProvider implements SecretsProvider { public String provideSecret(String secretName, Object pathToSecret) { return System.getenv(secretName); } + + @Override + public String interpolateSecretForValue(String value) { + Matcher m = interpolationPattern.matcher(value); + if (m.matches()) { + String secretName = m.group(1); + // If the secret doesn't exist, we return null and don't override the current value. + return provideSecret(secretName, null); + } + return null; + } } \ No newline at end of file diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java index ac479d1cb1452..5cd4261f0a223 100644 --- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java +++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java @@ -39,4 +39,16 @@ default void init(Map config) {} * @return The actual secret */ String provideSecret(String secretName, Object pathToSecret); + + /** + * If the passed value is formatted as a reference to a secret, as defined by the implementation, return the + * referenced secret. If the value is not formatted as a secret reference or the referenced secret does not exist, + * return null. + * + * @param value a config value that may be formatted as a reference to a secret + * @return the materialized secret. Otherwise, null. + */ + default String interpolateSecretForValue(String value) { + return null; + } } \ No newline at end of file