Generic Secret Injection for Connector Configuration #20862

@michaeljmarshall

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Provide a generic way to inject secrets into the config map for connectors without requiring a rewrite of each source/sink and without leaking them on the command line used to start the connector.

Context

The recent CVE-2023-37579 resulted in the potential to leak source/sink credentials because some credentials are stored in connector configuration instead of in the connector's secrets map.

The current way to configure secrets for connectors requires each source or sink to implement correct secret handling by either getting a secret from the SecretsProvider or by using custom annotations and this special configuration loader. This implementation assumes that users have a Configuration class that can be annotated, but that is not always the case because the connector framework passes a configuration map of Map<String, Object>. Note that the current mechanism is not well documented and is not used by all of the official Apache Pulsar connectors.

Solution

I propose that we materialize and merge all secrets from the secrets map into the config that is passed to the connector when we call Source#open or Sink#open. Materializing the secrets would look like:

    private void mergeSecretsIntoConfigs(Map<String, Object> secrets, Map<String, Object> configs) {
        for (Map.Entry<String, Object> entry : secrets.entrySet()) {
            Object oldValue = configs.put(entry.getKey(),
                    secretsProvider.provideSecret(entry.getKey(), entry.getValue()));
            if (oldValue != null) {
                log.warn("Config value for {} replaced by secret's configuration value.", entry.getKey());
            }
        }
    }

The key benefit of this solution is that it works for all sinks and sources, and it leverages the SecretsProvider interface to materialize the secrets.

This will benefit all deployment methods, but it is most helpful for the Kubernetes runtime.

Trade off

The one drawback to this solution is that it could theoretically break existing connector configuration. However, I think this is very unlikely because it only breaks when a configuration and a secret are passed with the same key.

I was able to resolve this trade off in two ways. First, I put this feature behind a feature flag. Second, I replaced put with putIfAbsent in the merge logic so that the existing configuration has precedence.
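To make the precedence rule concrete, here is a minimal, self-contained sketch of the flag-guarded merge using putIfAbsent. It is not the code from the PR: the class name, the `enabled` parameter, and the `SecretResolver` interface are hypothetical stand-ins (the latter only mirrors the `provideSecret(name, path)` call used in the snippet above), and the method returns a copy instead of mutating the input map.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; every name in this class is an assumption. */
final class SecretMergeSketch {

    /** Hypothetical stand-in for a secrets provider: (secret name, path to secret) -> value. */
    interface SecretResolver {
        String provideSecret(String secretName, Object pathToSecret);
    }

    /**
     * Returns a copy of configs with resolved secrets added.
     * Existing config keys win, because putIfAbsent never overwrites a non-null mapping.
     */
    static Map<String, Object> mergeSecretsIntoConfigs(boolean enabled,
                                                       Map<String, Object> secrets,
                                                       Map<String, Object> configs,
                                                       SecretResolver resolver) {
        Map<String, Object> merged = new HashMap<>(configs);
        if (!enabled) {
            // Feature flag off: the connector sees exactly the original config.
            return merged;
        }
        for (Map.Entry<String, Object> entry : secrets.entrySet()) {
            // The secret is resolved eagerly, but only stored if the key is not already configured.
            merged.putIfAbsent(entry.getKey(),
                    resolver.provideSecret(entry.getKey(), entry.getValue()));
        }
        return merged;
    }
}
```

With a config of `{username: "from-config"}` and secrets for both `username` and `password`, the merged map keeps the configured `username` and gains only the resolved `password`. With the flag off, neither secret is added.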

Alternatives

We could consider interpreting configuration values that start with a well-known prefix, like env:, as values that need to be read from the environment. The primary drawback to this solution is that there is not an easy way to configure the function at this point in the code, which means that

This solution would look something like adding this code block

    // Replace environment variable pointers with their environment variable values
    for (Map.Entry<String, Object> entry : config.entrySet()) {
        if (entry.getValue() instanceof String && ((String) entry.getValue()).toLowerCase().startsWith("env:")) {
            String envVariableName = ((String) entry.getValue()).substring("env:".length());
            String envVariableValue = System.getenv(envVariableName);
            entry.setValue(envVariableValue);
        }
    }

to this method

    static Map<String, Object> parseComponentConfig(String connectorConfigs,
                                                    InstanceConfig instanceConfig,
                                                    ClassLoader componentClassLoader,
                                                    org.apache.pulsar.functions.proto.Function
                                                            .FunctionDetails.ComponentType componentType)
            throws IOException {
        final Map<String, Object> config = ObjectMapperFactory
                .getMapper()
                .reader()
                .forType(new TypeReference<Map<String, Object>>() {})
                .readValue(connectorConfigs);
        if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) {
            final String configClassName;
            if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
                configClassName = ConnectorUtils
                        .getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
            } else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
                configClassName = ConnectorUtils
                        .getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
            } else {
                return config;
            }
            if (configClassName != null) {
                Class<?> configClass;
                try {
                    configClass = Class.forName(configClassName,
                            true, Thread.currentThread().getContextClassLoader());
                } catch (ClassNotFoundException e) {
                    throw new RuntimeException("Config class not found: " + configClassName, e);
                }
                final List<String> allFields = BeanPropertiesReader.getBeanProperties(configClass);
                for (String s : config.keySet()) {
                    if (!allFields.contains(s)) {
                        log.error("Field '{}' not defined in the {} configuration {}, the field will be ignored",
                                s,
                                componentType,
                                configClass);
                        config.remove(s);
                    }
                }
            }
        }
        return config;
    }
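For experimenting with the env: alternative outside of Pulsar, the substitution step can be sketched as a standalone helper. This is an illustration under assumptions, not proposed Pulsar code: the class and method names are made up, the environment lookup is passed in as a function so it can be faked in tests, and failing on an unset variable is a choice made here (the snippet above would store null instead).

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

/** Illustrative sketch only; names and the fail-fast behavior are assumptions. */
final class EnvPointerSketch {

    private static final String PREFIX = "env:";

    /**
     * Returns a copy of config in which every String value of the form "env:NAME"
     * is replaced by envLookup.apply("NAME"). Other values are left untouched.
     */
    static Map<String, Object> resolveEnvPointers(Map<String, Object> config,
                                                  Function<String, String> envLookup) {
        Map<String, Object> resolved = new HashMap<>(config);
        for (Map.Entry<String, Object> entry : resolved.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof String
                    && ((String) value).toLowerCase(Locale.ROOT).startsWith(PREFIX)) {
                String envVariableName = ((String) value).substring(PREFIX.length());
                String envVariableValue = envLookup.apply(envVariableName);
                if (envVariableValue == null) {
                    // Assumption: surface a missing variable instead of silently storing null.
                    throw new IllegalArgumentException(
                            "Environment variable not set: " + envVariableName);
                }
                // Replacing a value through the entry is not a structural modification,
                // so it is safe while iterating over entrySet().
                entry.setValue(envVariableValue);
            }
        }
        return resolved;
    }
}
```

In a real process the lookup would be `System::getenv`, for example `EnvPointerSketch.resolveEnvPointers(config, System::getenv)`.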

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Labels

area/connector, area/security, type/enhancement
