Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pulsar-function-go/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ type Conf struct {
ProcessingGuarantees int32 `json:"processingGuarantees" yaml:"processingGuarantees"`
SecretsMap string `json:"secretsMap" yaml:"secretsMap"`
Runtime int32 `json:"runtime" yaml:"runtime"`
//Deprecated
// Authentication
ClientAuthenticationPlugin string `json:"clientAuthenticationPlugin" yaml:"clientAuthenticationPlugin"`
ClientAuthenticationParameters string `json:"clientAuthenticationParameters" yaml:"clientAuthenticationParameters"`
TLSTrustCertsFilePath string `json:"tlsTrustCertsFilePath" yaml:"tlsTrustCertsFilePath"`
TLSAllowInsecureConnection bool `json:"tlsAllowInsecureConnection" yaml:"tlsAllowInsecureConnection"`
TLSHostnameVerificationEnable bool `json:"tlsHostnameVerificationEnable" yaml:"tlsHostnameVerificationEnable"`
// Deprecated
AutoACK bool `json:"autoAck" yaml:"autoAck"`
Parallelism int32 `json:"parallelism" yaml:"parallelism"`
//source config
Expand Down
2 changes: 0 additions & 2 deletions pulsar-function-go/examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ go 1.13
require (
github.com/apache/pulsar-client-go v0.8.1
github.com/apache/pulsar/pulsar-function-go v0.0.0
github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect
github.com/yahoo/athenz v1.8.55 // indirect
)

replace github.com/apache/pulsar/pulsar-function-go => ../
Expand Down
121 changes: 39 additions & 82 deletions pulsar-function-go/examples/go.sum

Large diffs are not rendered by default.

37 changes: 34 additions & 3 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ package pf

import (
"context"
"fmt"
"math"
"strconv"
"strings"
"time"

"github.com/golang/protobuf/ptypes/empty"
Expand Down Expand Up @@ -192,11 +194,40 @@ CLOSE:
return nil
}

const (
authPluginToken = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
authPluginNone = ""
)

func (gi *goInstance) setupClient() error {
client, err := pulsar.NewClient(pulsar.ClientOptions{
ic := gi.context.instanceConf

clientOpts := pulsar.ClientOptions{
URL: ic.pulsarServiceURL,
TLSTrustCertsFilePath: ic.tlsTrustCertsPath,
TLSAllowInsecureConnection: ic.tlsAllowInsecure,
TLSValidateHostname: ic.tlsHostnameVerification,
}

switch ic.authPlugin {
case authPluginToken:
switch {
case strings.HasPrefix(ic.authParams, "file://"):
clientOpts.Authentication = pulsar.NewAuthenticationTokenFromFile(ic.authParams[7:])
case strings.HasPrefix(ic.authParams, "token:"):
clientOpts.Authentication = pulsar.NewAuthenticationToken(ic.authParams[6:])
case ic.authParams == "":
return fmt.Errorf("auth plugin %s given, but authParams is empty", authPluginToken)
default:
return fmt.Errorf(`unknown token format - expecting "file://" or "token:" prefix`)
}
case authPluginNone:
clientOpts.Authentication, _ = pulsar.NewAuthentication("", "") // ret: auth.NewAuthDisabled()
default:
return fmt.Errorf("unknown auth provider: %s", ic.authPlugin)
}

URL: gi.context.instanceConf.pulsarServiceURL,
})
client, err := pulsar.NewClient(clientOpts)
if err != nil {
log.Errorf("create client error:%v", err)
gi.stats.incrTotalSysExceptions(err)
Expand Down
10 changes: 10 additions & 0 deletions pulsar-function-go/pf/instanceConf.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ type instanceConf struct {
killAfterIdle time.Duration
expectedHealthCheckInterval int32
metricsPort int
authPlugin string
authParams string
tlsTrustCertsPath string
tlsAllowInsecure bool
tlsHostnameVerification bool
}

func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
Expand Down Expand Up @@ -107,6 +112,11 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
},
UserConfig: cfg.UserConfig,
},
authPlugin: cfg.ClientAuthenticationPlugin,
authParams: cfg.ClientAuthenticationParameters,
tlsTrustCertsPath: cfg.TLSTrustCertsFilePath,
tlsAllowInsecure: cfg.TLSAllowInsecureConnection,
tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
}

if instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_EFFECTIVELY_ONCE {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public class GoInstanceConfig {
private int processingGuarantees;
private String secretsMap = "";
private String userConfig = "";

private String clientAuthenticationPlugin = "";
private String clientAuthenticationParameters = "";
private String tlsTrustCertsFilePath = "";
private boolean tlsHostnameVerificationEnable = false;
private boolean tlsAllowInsecureConnection = false;

private int runtime;
private boolean autoAck;
private int parallelism;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig, Strin
*/

public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
AuthenticationConfig authConfig,
String originalCodeFileName,
String pulsarServiceUrl,
boolean k8sRuntime) throws IOException {
Expand Down Expand Up @@ -187,6 +188,23 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
goInstanceConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
}

if (authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&& isNotBlank(authConfig.getClientAuthenticationParameters())) {
goInstanceConfig.setClientAuthenticationPlugin(authConfig.getClientAuthenticationPlugin());
goInstanceConfig.setClientAuthenticationParameters(authConfig.getClientAuthenticationParameters());
}
goInstanceConfig.setTlsAllowInsecureConnection(
authConfig.isTlsAllowInsecureConnection());
goInstanceConfig.setTlsHostnameVerificationEnable(
authConfig.isTlsHostnameVerificationEnable());
if (isNotBlank(authConfig.getTlsTrustCertsFilePath())){
goInstanceConfig.setTlsTrustCertsFilePath(
authConfig.getTlsTrustCertsFilePath());
}

}

if (instanceConfig.getMaxBufferedTuples() != 0) {
goInstanceConfig.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
}
Expand Down Expand Up @@ -292,7 +310,8 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
final List<String> args = new LinkedList<>();

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
return getGoInstanceCmd(instanceConfig, originalCodeFileName, pulsarServiceUrl, k8sRuntime);
return getGoInstanceCmd(instanceConfig, authConfig,
originalCodeFileName, pulsarServiceUrl, k8sRuntime);
}

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
instanceConfig.setPort(1337);
instanceConfig.setMetricsPort(60000);

AuthenticationConfig authConfig = AuthenticationConfig.builder()
.clientAuthenticationPlugin("org.apache.pulsar.client.impl.auth.AuthenticationToken")
.clientAuthenticationParameters("file:///secret/token.jwt")
.tlsTrustCertsFilePath("/secret/ca.cert.pem")
.tlsHostnameVerificationEnable(true)
.tlsAllowInsecureConnection(false)
.build();

JSONObject userConfig = new JSONObject();
userConfig.put("word-of-the-day", "der Weltschmerz");
Expand Down Expand Up @@ -116,7 +123,7 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {

instanceConfig.setFunctionDetails(functionDetails);

List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650", k8sRuntime);
List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, authConfig,"config", "pulsar://localhost:6650", k8sRuntime);
if (k8sRuntime) {
goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class);
} else {
Expand Down Expand Up @@ -160,6 +167,11 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
Assert.assertEquals(goInstanceConfig.get("deadLetterTopic"), "go-func-deadletter");
Assert.assertEquals(goInstanceConfig.get("userConfig"), userConfig.toString());
Assert.assertEquals(goInstanceConfig.get("metricsPort"), 60000);
Assert.assertEquals(goInstanceConfig.get("clientAuthenticationPlugin"), "org.apache.pulsar.client.impl.auth.AuthenticationToken");
Assert.assertEquals(goInstanceConfig.get("clientAuthenticationParameters"), "file:///secret/token.jwt");
Assert.assertEquals(goInstanceConfig.get("tlsTrustCertsFilePath"), "/secret/ca.cert.pem");
Assert.assertEquals(goInstanceConfig.get("tlsHostnameVerificationEnable"), true);
Assert.assertEquals(goInstanceConfig.get("tlsAllowInsecureConnection"), false);
}

@DataProvider(name = "k8sRuntime")
Expand Down