Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pulsar-functions/instance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,15 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<environmentVariables>
<TEST_JAVA_INSTANCE_PARSE_ENV_VAR>some-configuration</TEST_JAVA_INSTANCE_PARSE_ENV_VAR>
</environmentVariables>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,32 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable

private final Function.FunctionDetails.ComponentType componentType;

private static Map<String, Object> buildSecretsMap(InstanceConfig config) {
if (!StringUtils.isEmpty(config.getFunctionDetails().getSecretsMap())) {
return new Gson().fromJson(config.getFunctionDetails().getSecretsMap(),
new TypeToken<Map<String, Object>>() {
}.getType());
} else {
return new HashMap<>();
}
}

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder)
throws PulsarClientException {
this(config, logger, client, secretsProvider, collectorRegistry, metricsLabels, buildSecretsMap(config),
componentType, statsManager, stateManager, pulsarAdmin, clientBuilder);
}

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels, Map<String, Object> secretsMap,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder)
throws PulsarClientException {
this.config = config;
this.logger = logger;
this.clientBuilder = clientBuilder;
Expand Down Expand Up @@ -186,13 +206,7 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
}.getType());
}
this.secretsProvider = secretsProvider;
if (!StringUtils.isEmpty(config.getFunctionDetails().getSecretsMap())) {
secretsMap = new Gson().fromJson(config.getFunctionDetails().getSecretsMap(),
new TypeToken<Map<String, Object>>() {
}.getType());
} else {
secretsMap = new HashMap<>();
}
this.secretsMap = secretsMap == null ? new HashMap<>() : secretsMap;

this.metricsLabels = metricsLabels;
String prefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class InstanceConfig {
private int metricsPort;
private List<String> additionalJavaRuntimeArguments = Collections.emptyList();
private boolean ignoreUnknownConfigFields;
private boolean mergeSecretsIntoConfigMap;

/**
* Get the string representation of {@link #getInstanceId()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.BeanDeserializer;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
Expand Down Expand Up @@ -143,6 +145,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private Sink sink;

private final SecretsProvider secretsProvider;
private Map<String, Object> secretsMap;

private FunctionCollectorRegistry collectorRegistry;
private final String[] metricsLabels;
Expand Down Expand Up @@ -255,6 +258,8 @@ private synchronized void setup() throws Exception {
// start the state table
setupStateStore();

setupSecretsMap();

ContextImpl contextImpl = setupContext();

// start the output producer
Expand Down Expand Up @@ -283,7 +288,7 @@ ContextImpl setupContext() throws PulsarClientException {
Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
collectorRegistry, metricsLabels, secretsMap, this.componentType, this.stats, stateManager,
pulsarAdmin, clientBuilder);
}

Expand Down Expand Up @@ -388,6 +393,15 @@ private StateStoreProvider getStateStoreProvider() throws Exception {
}
}

private void setupSecretsMap() {
String secrets = instanceConfig.getFunctionDetails().getSecretsMap();
if (!StringUtils.isEmpty(secrets)) {
secretsMap = new Gson().fromJson(secrets, new TypeToken<Map<String, Object>>() {}.getType());
} else {
secretsMap = new HashMap<>();
}
}

@VisibleForTesting
void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {
if (result.getUserException() != null) {
Expand Down Expand Up @@ -862,11 +876,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
Thread.currentThread().setContextClassLoader(this.componentClassLoader);
}
try {
if (sourceSpec.getConfigs().isEmpty()) {
this.source.open(new HashMap<>(), contextImpl);
} else {
this.source.open(parseComponentConfig(sourceSpec.getConfigs()), contextImpl);
}
this.source.open(augmentAndFilterConnectorConfig(sourceSpec.getConfigs()), contextImpl);
if (this.source instanceof PulsarSource) {
contextImpl.setInputConsumers(((PulsarSource) this.source).getInputConsumers());
}
Expand All @@ -877,31 +887,66 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
private Map<String, Object> parseComponentConfig(String connectorConfigs) throws IOException {
return parseComponentConfig(connectorConfigs, instanceConfig, componentClassLoader, componentType);

/**
* Merge all key value pairs from the secrets map into the config map. If a key already exists in the
* config map, the value from the config map is used to ensure backwards compatibility.
* @param secretsProvider - the secrets provider that will convert secret's values into config values.
* @param secrets - the map of secrets
* @param configs - the connector configuration map, which will be mutated.
*/
private static void mergeSecretsIntoConfigs(SecretsProvider secretsProvider,
Map<String, Object> secrets,
Map<String, Object> configs) {
for (Map.Entry<String, Object> entry : secrets.entrySet()) {
Object oldValue = configs.putIfAbsent(entry.getKey(),
secretsProvider.provideSecret(entry.getKey(), entry.getValue()));
if (oldValue != null) {
log.warn("Key collision for config {}. Secrets and config provided a key. Using config's value.",
entry.getKey());
}
}
}

static Map<String, Object> convertComponentConfig(String connectorConfigs) throws IOException {
if (StringUtils.isEmpty(connectorConfigs)) {
return new HashMap<>();
}
return ObjectMapperFactory.getMapper().reader().forType(new TypeReference<Map<String, Object>>() {})
.readValue(connectorConfigs);
}

private Map<String, Object> augmentAndFilterConnectorConfig(String connectorConfigs) throws IOException {
return augmentAndFilterConnectorConfig(connectorConfigs, secretsMap, instanceConfig, secretsProvider,
componentClassLoader, componentType);
}

static Map<String, Object> parseComponentConfig(String connectorConfigs,
InstanceConfig instanceConfig,
ClassLoader componentClassLoader,
org.apache.pulsar.functions.proto.Function
static Map<String, Object> augmentAndFilterConnectorConfig(String connectorConfigs,
Map<String, Object> secretsMap,
InstanceConfig instanceConfig,
SecretsProvider secretsProvider,
ClassLoader componentClassLoader,
org.apache.pulsar.functions.proto.Function
.FunctionDetails.ComponentType componentType)
throws IOException {
final Map<String, Object> config = ObjectMapperFactory
.getMapper()
.reader()
.forType(new TypeReference<Map<String, Object>>() {})
.readValue(connectorConfigs);
final Map<String, Object> config = convertComponentConfig(connectorConfigs);
if (componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK
&& componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
return config;
}

if (instanceConfig.isMergeSecretsIntoConfigMap()) {
mergeSecretsIntoConfigs(secretsProvider, secretsMap, config);
}

if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) {
final String configClassName;
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
} else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
} else {
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
} else {
return config;
}
if (configClassName != null) {

Expand Down Expand Up @@ -1014,19 +1059,11 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
Thread.currentThread().setContextClassLoader(this.componentClassLoader);
}
try {
if (sinkSpec.getConfigs().isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("Opening Sink with empty hashmap with contextImpl: {} ", contextImpl.toString());
}
this.sink.open(new HashMap<>(), contextImpl);
} else {
if (log.isDebugEnabled()) {
log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec,
contextImpl.toString());
}
final Map<String, Object> config = parseComponentConfig(sinkSpec.getConfigs());
this.sink.open(config, contextImpl);
if (log.isDebugEnabled()) {
log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec.getConfigs(),
contextImpl.toString());
}
this.sink.open(augmentAndFilterConnectorConfig(sinkSpec.getConfigs()), contextImpl);
} catch (Exception e) {
log.error("Sink open produced uncaught exception: ", e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
Expand All @@ -47,6 +48,7 @@
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
import org.jetbrains.annotations.NotNull;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -191,9 +193,11 @@ public void testStatsManagerNull() throws Exception {

@Test
public void testSinkConfigParsingPreservesOriginalType() throws Exception {
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
"{\"ttl\": 9223372036854775807}",
new HashMap<>(),
new InstanceConfig(),
new EnvironmentBasedSecretsProvider(),
null,
FunctionDetails.ComponentType.SINK
);
Expand All @@ -203,16 +207,63 @@ public void testSinkConfigParsingPreservesOriginalType() throws Exception {

@Test
public void testSourceConfigParsingPreservesOriginalType() throws Exception {
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
"{\"ttl\": 9223372036854775807}",
new HashMap<>(),
new InstanceConfig(),
null,
null,
FunctionDetails.ComponentType.SOURCE
);
Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
}

@DataProvider(name = "componentAndSecrets")
public Object[][] componentAndSecrets() {
return new Object[][]{
// Schema: component type, whether to map in secrets
{ FunctionDetails.ComponentType.SINK, true },
{ FunctionDetails.ComponentType.SINK, false },
{ FunctionDetails.ComponentType.SOURCE, true },
{ FunctionDetails.ComponentType.SOURCE, false },
{ FunctionDetails.ComponentType.FUNCTION, true },
{ FunctionDetails.ComponentType.FUNCTION, false },
{ FunctionDetails.ComponentType.UNKNOWN, true },
{ FunctionDetails.ComponentType.UNKNOWN, false },
};
}

@Test(dataProvider = "componentAndSecrets")
public void testParsingEnvironmentVariables(FunctionDetails.ComponentType componentType,
boolean mergeSecrets) throws Exception {
final Map<String, Object> secrets = new HashMap<>();
// The env var is set in the pom.xml file
secrets.put("TEST_JAVA_INSTANCE_PARSE_ENV_VAR", "ignored");
secrets.put("collision-key", "some-value");
final InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setMergeSecretsIntoConfigMap(mergeSecrets);
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
"{\"collision-key\": \"winner\"}",
secrets,
instanceConfig,
new EnvironmentBasedSecretsProvider(),
null,
componentType
);
String expectedSecretConfig;
if (mergeSecrets
&& (componentType == FunctionDetails.ComponentType.SOURCE
|| componentType == FunctionDetails.ComponentType.SINK)) {
expectedSecretConfig = "some-configuration";
} else {
expectedSecretConfig = null;
}
Assert.assertEquals(parsedConfig.get("TEST_JAVA_INSTANCE_PARSE_ENV_VAR"), expectedSecretConfig);
Assert.assertEquals(parsedConfig.get("unset-env-var"), null);
Assert.assertEquals(parsedConfig.get("collision-key"), "winner");
}


public static class ConnectorTestConfig1 {
public String field1;
Expand Down Expand Up @@ -243,9 +294,11 @@ public void testSinkConfigIgnoreUnknownFields(boolean ignoreUnknownConfigFields,
final InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);

final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
"{\"field1\": \"value\", \"field2\": \"value2\"}",
new HashMap<>(),
instanceConfig,
null,
narClassLoader,
type
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ public class JavaInstanceStarter implements AutoCloseable {
required = false)
public Boolean ignoreUnknownConfigFields = false;

@Parameter(names = "--merge_secrets_into_config_map", arity = 1,
description = "Whether to merge secrets into the connector's configuration. Only affects Sinks and Sources."

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this doesn't apply to Functions?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could apply to functions. I targeted Sources and Sinks because those are often cases where you are running a third party connector and in those cases, it is easier to merge the configs than updating the code. In the function runtime, it seems to be a well defined flow to get a secret from the function's context. That being said, we could also just fix each implementation.

I provide some direct examples of issues with connectors in this comment #20862 (comment).

+ " When true, the SecretsProvider will materialize secrets from the connector's secrets argument"
+ " and then the resulting values will be put into the connector's configuration."
+ " In the event of a key collision, the sink or source configuration will take precedence."
+ " Secrets are merged into config map before unknown fields are filtered out when"
+ " ignoreUnknownConfigFields is true. Defaults to false.",
required = false)
public boolean mergeSecretsIntoConfigMap = false;

private Server server;
private RuntimeSpawner runtimeSpawner;
Expand Down Expand Up @@ -187,6 +196,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL
instanceConfig.setMaxPendingAsyncRequests(maxPendingAsyncRequests);
instanceConfig.setExposePulsarAdminClientEnabled(exposePulsarAdminClientEnabled);
instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);
instanceConfig.setMergeSecretsIntoConfigMap(mergeSecretsIntoConfigMap);
Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
if (functionDetailsJsonString.charAt(0) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,9 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
if (instanceConfig.isIgnoreUnknownConfigFields()) {
args.add("--ignore_unknown_config_fields");
}

args.add("--merge_secrets_into_config_map");
args.add(Boolean.toString(instanceConfig.isMergeSecretsIntoConfigMap()));
}

// state storage configs
Expand Down
Loading