Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pulsar-io/aerospike/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connectors shouldn't bundle Pulsar IO jars.
This must be scope 'provided'

Are we breaking compatibility with existing connectors?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied this block from existing connectors. Should I fix it for all of them?

</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class AerospikeAbstractSink<K, V> implements Sink<byte[]> {

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
aerospikeSinkConfig = AerospikeSinkConfig.load(config);
aerospikeSinkConfig = AerospikeSinkConfig.load(config, sinkContext);
if (aerospikeSinkConfig.getSeedHosts() == null
|| aerospikeSinkConfig.getKeyspace() == null
|| aerospikeSinkConfig.getColumnName() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.aerospike;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
Expand All @@ -26,6 +27,9 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;

@Data
@Accessors(chain = true)
Expand All @@ -38,21 +42,50 @@ public class AerospikeSinkConfig implements Serializable {
private String columnName;

// Optional
@FieldDoc(
required = false,
defaultValue = "",
sensitive = true,
help = "The username for authentication."
)
private String userName;
@FieldDoc(
required = false,
defaultValue = "",
sensitive = true,
help = "The password for authentication."
)
private String password;
private String keySet;
private int maxConcurrentRequests = 100;
private int timeoutMs = 100;
private int retries = 1;


/**
 * Reads the given YAML file and binds its content directly onto a new
 * {@code AerospikeSinkConfig}. No {@code SinkContext} is involved in this overload.
 *
 * @param yamlFile path of the YAML file to read
 * @return the configuration deserialized from the file
 * @throws IOException if the file cannot be read or mapped onto the config class
 * @deprecated Use {@link #load(String, SinkContext)} instead.
 */
@Deprecated
public static AerospikeSinkConfig load(String yamlFile) throws IOException {
    final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
    final File yamlSource = new File(yamlFile);
    return yamlMapper.readValue(yamlSource, AerospikeSinkConfig.class);
}

/**
 * Parses the given YAML file into a generic key/value map and hands that map,
 * together with the context, to {@link #load(Map, SinkContext)}.
 *
 * @param yamlFile path of the YAML file to read
 * @param context  sink context forwarded to the map-based loader
 * @return the configuration produced by the map-based loader
 * @throws IOException if the file cannot be read or parsed
 */
public static AerospikeSinkConfig load(String yamlFile, SinkContext context) throws IOException {
    final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
    final Map<String, Object> rawConfig =
            yamlMapper.readValue(new File(yamlFile), new TypeReference<Map<String, Object>>() {});
    return load(rawConfig, context);
}

/**
 * Converts the map into a config instance by serializing it to a JSON string
 * and deserializing that string as {@code AerospikeSinkConfig}.
 * No {@code SinkContext} is involved in this overload.
 *
 * @param map raw configuration entries
 * @return the configuration deserialized from the map
 * @throws IOException if the map cannot be serialized or mapped onto the config class
 * @deprecated Use {@link #load(Map, SinkContext)} instead.
 */
@Deprecated
public static AerospikeSinkConfig load(Map<String, Object> map) throws IOException {
    final ObjectMapper jsonMapper = new ObjectMapper();
    final String json = jsonMapper.writeValueAsString(map);
    return jsonMapper.readValue(json, AerospikeSinkConfig.class);
}

public static AerospikeSinkConfig load(Map<String, Object> map, SinkContext context) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should just keep the method, all other load methods should be removed, and the relevant unit tests should be fixed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the yamlFile load config is useless. This is not the responsibility of each connector but the responsibility of the connector framework.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should just keep the method, all other load methods should be removed, and the relevant unit tests should be fixed.

I am conflicted about removing those public methods. I agree this class is cluttered, but it introduces an unnecessary breaking change to outright remove them.

From the yamlFile load config is useless. This is not the responsibility of each connector but the responsibility of the connector framework.

What exactly is the connector framework's responsibility?

@shibd shibd Aug 16, 2023

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am conflicted about removing those public methods. I agree this class is cluttered, but it introduces an unnecessary breaking change to outright remove them.

For each connector, no user to use its class, right? We just need to make sure that the configuration is compatible.

I don't think removing these public methods would be a breaking change.

What exactly is the connector framework's responsibility?

The io framework is responsible for loading the configuration from the YAML and converting it to the Map. So for the connector, we only need cover to deserialize the correct configuration from the Map.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For each connector, no user to use its class, right? We just need to make sure that the configuration is compatible.

I think you're right here. Part of the issue is likely the ambiguous boundaries for users. It's probably possible to use this class as a dependency in another project, but I doubt there is really an expectation that these classes are meant to be used elsewhere. The primary risk would be for someone slightly modifying a connector.

The io framework is responsible for loading the configuration from the YAML and converting it to the Map. So for the connector, we only need cover to deserialize the correct configuration from the Map.

As long as we assume these classes are for pulsar's internal use, I agree that this is the framework's responsibility.

The last question is really whether we think this change is valuable. @eolivelli is asking in another comment on this PR whether this is the right design.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right here. Part of the issue is likely the ambiguous boundaries for users. It's probably possible to use this class as a dependency in another project, but I doubt there is really an expectation that these classes are meant to be used elsewhere. The primary risk would be for someone slightly modifying a connector.

Maybe we should clearly define where the boundaries of what we offer to users are. I don't think users should use this class as a dependency in another project. We shouldn't have published the connector artifacts to Maven Central, right?

The last question is really whether we think this change is valuable. @eolivelli is asking in another comment on this PR whether this is the right design.

Maybe the IO framework needs to deserialize the configuration object for the connector.

/**
 * A source connector that is opened with a typed configuration object and then
 * polled for records.
 *
 * @param <C> type of the configuration object passed to {@link #open}
 * @param <T> payload type of the records returned by {@link #read}
 */
public interface Source<C, T> extends AutoCloseable {

    /**
     * Open connector with configuration.
     *
     * @param config initialization config
     * @param sourceContext environment where the source connector is running
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(C config, SourceContext sourceContext) throws Exception;

    /**
     * Reads the next message from source.
     * If source does not have any new messages, this call should block.
     * @return next message from source.  The return result should never be null
     * @throws Exception NOTE(review): the failure conditions are not specified here; document them
     */
    Record<T> read() throws Exception;
}

return IOConfigUtils.loadWithSecrets(map, AerospikeSinkConfig.class, context);
}
}
5 changes: 5 additions & 0 deletions pulsar-io/canal/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class CanalAbstractSource<V> extends PushSource<V> {

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
canalSourceConfig = CanalSourceConfig.load(config);
canalSourceConfig = CanalSourceConfig.load(config, sourceContext);
if (canalSourceConfig.getCluster()) {
connector = CanalConnectors.newClusterConnector(canalSourceConfig.getZkServers(),
canalSourceConfig.getDestination(), canalSourceConfig.getUsername(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.canal;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
Expand All @@ -26,6 +27,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;


Expand Down Expand Up @@ -80,14 +83,31 @@ public class CanalSourceConfig implements Serializable{
help = "The batch size to fetch from canal.")
private int batchSize = 1000;


/**
 * Reads the given YAML file and binds its content directly onto a new
 * {@code CanalSourceConfig}. No {@code SourceContext} is involved in this overload.
 *
 * @param yamlFile path of the YAML file to read
 * @return the configuration deserialized from the file
 * @throws IOException if the file cannot be read or mapped onto the config class
 * @deprecated Use {@link #load(String, SourceContext)} instead.
 */
@Deprecated
public static CanalSourceConfig load(String yamlFile) throws IOException {
    final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
    final File yamlSource = new File(yamlFile);
    return yamlMapper.readValue(yamlSource, CanalSourceConfig.class);
}

/**
 * Parses the given YAML file into a generic key/value map and hands that map,
 * together with the context, to {@link #load(Map, SourceContext)}.
 *
 * @param yamlFile path of the YAML file to read
 * @param context  source context forwarded to the map-based loader
 * @return the configuration produced by the map-based loader
 * @throws IOException if the file cannot be read or parsed
 */
public static CanalSourceConfig load(String yamlFile, SourceContext context) throws IOException {
    final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
    final Map<String, Object> rawConfig =
            yamlMapper.readValue(new File(yamlFile), new TypeReference<Map<String, Object>>() {});
    return load(rawConfig, context);
}

/**
 * Converts the map into a config instance by serializing it to a JSON string
 * and deserializing that string as {@code CanalSourceConfig}.
 * No {@code SourceContext} is involved in this overload.
 *
 * @param map raw configuration entries
 * @return the configuration deserialized from the map
 * @throws IOException if the map cannot be serialized or mapped onto the config class
 * @deprecated Use {@link #load(Map, SourceContext)} instead.
 */
@Deprecated
public static CanalSourceConfig load(Map<String, Object> map) throws IOException {
    final ObjectMapper jsonMapper = new ObjectMapper();
    final String json = jsonMapper.writeValueAsString(map);
    return jsonMapper.readValue(json, CanalSourceConfig.class);
}

public static CanalSourceConfig load(Map<String, Object> map, SourceContext context) {
return IOConfigUtils.loadWithSecrets(map, CanalSourceConfig.class, context);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about adding a method in Context?
Asking developers to use a Util class is a bit of an API design smell to me.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this is not the ideal solution. I wrote this PR to complete the paradigm started in the project for some but not all connectors. There are already several examples using this "flow", which is to convert the Map<String, Object> into my CustomSinkConfig class. However, I do not think we should make this mechanism easier to use. I would prefer to guide developers to the new generic solution provided in PIP 289 of using env vars instead of adding new methods in the Context.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ private static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz,
}
}
configs.computeIfAbsent(field.getName(), key -> {
if (fieldDoc.required()) {
throw new IllegalArgumentException(field.getName() + " cannot be null");
}
String value = fieldDoc.defaultValue();
if (value != null && !value.isEmpty()) {
return value;
}
if (fieldDoc.required()) {
throw new IllegalArgumentException(field.getName() + " cannot be null");
}
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ static class TestDefaultConfig {
)
protected String testRequired;

@FieldDoc(
required = true,
defaultValue = "defaultStr",
sensitive = true,
help = "requiredWithDefault"
)
protected String requiredWithDefault;

@FieldDoc(
required = false,
defaultValue = "defaultStr",
Expand Down Expand Up @@ -300,6 +308,8 @@ public void testDefaultValue() {
TestDefaultConfig testDefaultConfig =
IOConfigUtils.loadWithSecrets(configMap, TestDefaultConfig.class, new TestSinkContext());
Assert.assertEquals(testDefaultConfig.getDefaultStr(), "defaultStr");
Assert.assertEquals(testDefaultConfig.getTestRequired(), "test");
Assert.assertEquals(testDefaultConfig.getRequiredWithDefault(), "defaultStr");
Assert.assertEquals(testDefaultConfig.isDefaultBool(), true);
Assert.assertEquals(testDefaultConfig.getDefaultInt(), 100);
Assert.assertEquals(testDefaultConfig.getDefaultLong(), 100);
Expand Down
5 changes: 5 additions & 0 deletions pulsar-io/influxdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-instance</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class InfluxDBGenericRecordSink implements Sink<GenericRecord> {
@Override
public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
try {
val configV2 = InfluxDBSinkConfig.load(map);
val configV2 = InfluxDBSinkConfig.load(map, sinkContext);
configV2.validate();
sink = new InfluxDBSink();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.influxdb.v1;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Preconditions;
Expand All @@ -27,6 +28,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;

/**
Expand Down Expand Up @@ -94,7 +97,7 @@ public class InfluxDBSinkConfig implements Serializable {

@FieldDoc(
required = false,
defaultValue = "1000L",
defaultValue = "1000",
help = "The InfluxDB operation time in milliseconds")
private long batchTimeMs = 1000L;

Expand All @@ -105,16 +108,34 @@ public class InfluxDBSinkConfig implements Serializable {
)
private int batchSize = 200;


/**
 * Reads the given YAML file and binds its content directly onto a new
 * {@code InfluxDBSinkConfig}. No {@code SinkContext} is involved in this overload.
 *
 * @param yamlFile path of the YAML file to read
 * @return the configuration deserialized from the file
 * @throws IOException if the file cannot be read or mapped onto the config class
 * @deprecated Use {@link #load(String, SinkContext)} instead.
 */
@Deprecated
public static InfluxDBSinkConfig load(String yamlFile) throws IOException {
    final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
    final File yamlSource = new File(yamlFile);
    return yamlMapper.readValue(yamlSource, InfluxDBSinkConfig.class);
}

/**
 * Parses the given YAML file into a generic key/value map and hands that map,
 * together with the context, to {@link #load(Map, SinkContext)}.
 *
 * @param yamlFile path of the YAML file to read
 * @param context  sink context forwarded to the map-based loader
 * @return the configuration produced by the map-based loader
 * @throws IOException if the file cannot be read or parsed
 */
public static InfluxDBSinkConfig load(String yamlFile, SinkContext context) throws IOException {
    final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
    final Map<String, Object> rawConfig =
            yamlMapper.readValue(new File(yamlFile), new TypeReference<Map<String, Object>>() {});
    return load(rawConfig, context);
}

/**
 * Converts the map into a config instance by serializing it to a JSON string
 * and deserializing that string as {@code InfluxDBSinkConfig}.
 * No {@code SinkContext} is involved in this overload.
 *
 * @param map raw configuration entries
 * @return the configuration deserialized from the map
 * @throws IOException if the map cannot be serialized or mapped onto the config class
 * @deprecated Use {@link #load(Map, SinkContext)} instead.
 */
@Deprecated
public static InfluxDBSinkConfig load(Map<String, Object> map) throws IOException {
    final ObjectMapper jsonMapper = new ObjectMapper();
    final String json = jsonMapper.writeValueAsString(map);
    return jsonMapper.readValue(json, InfluxDBSinkConfig.class);
}

/**
 * Builds the config by delegating to {@code IOConfigUtils.loadWithSecrets},
 * passing the raw map, this config class and the sink context.
 *
 * @param map     raw configuration entries
 * @param context sink context handed to the loader
 * @return the configuration returned by {@code IOConfigUtils.loadWithSecrets}
 */
public static InfluxDBSinkConfig load(Map<String, Object> map, SinkContext context) {
    final InfluxDBSinkConfig loaded =
            IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, context);
    return loaded;
}

public void validate() {
Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not set.");
Preconditions.checkNotNull(database, "database property not set.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class InfluxDBSink extends BatchSink<Point, GenericRecord> {

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config);
InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config, sinkContext);
influxDBSinkConfig.validate();
super.init(influxDBSinkConfig.getBatchTimeMs(), influxDBSinkConfig.getBatchSize());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.influxdb.v2;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Preconditions;
Expand All @@ -27,6 +28,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;

/**
Expand Down Expand Up @@ -87,7 +90,7 @@ public class InfluxDBSinkConfig implements Serializable {

@FieldDoc(
required = false,
defaultValue = "1000L",
defaultValue = "1000",
help = "The InfluxDB operation time in milliseconds")
private long batchTimeMs = 1000;

Expand All @@ -98,16 +101,33 @@ public class InfluxDBSinkConfig implements Serializable {
)
private int batchSize = 200;

/**
 * Reads the given YAML file and binds its content directly onto a new
 * {@code InfluxDBSinkConfig}. No {@code SinkContext} is involved in this overload.
 *
 * @param yamlFile path of the YAML file to read
 * @return the configuration deserialized from the file
 * @throws IOException if the file cannot be read or mapped onto the config class
 * @deprecated Use {@link #load(String, SinkContext)} instead.
 */
@Deprecated
public static InfluxDBSinkConfig load(String yamlFile) throws IOException {
    final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
    final File yamlSource = new File(yamlFile);
    return yamlMapper.readValue(yamlSource, InfluxDBSinkConfig.class);
}

/**
 * Parses the given YAML file into a generic key/value map and hands that map,
 * together with the context, to {@link #load(Map, SinkContext)}.
 *
 * @param yamlFile path of the YAML file to read
 * @param context  sink context forwarded to the map-based loader
 * @return the configuration produced by the map-based loader
 * @throws IOException if the file cannot be read or parsed
 */
public static InfluxDBSinkConfig load(String yamlFile, SinkContext context) throws IOException {
    final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
    final Map<String, Object> rawConfig =
            yamlMapper.readValue(new File(yamlFile), new TypeReference<Map<String, Object>>() {});
    return load(rawConfig, context);
}

/**
 * Converts the map into a config instance by serializing it to a JSON string
 * and deserializing that string as {@code InfluxDBSinkConfig}.
 * No {@code SinkContext} is involved in this overload.
 *
 * @param map raw configuration entries
 * @return the configuration deserialized from the map
 * @throws IOException if the map cannot be serialized or mapped onto the config class
 * @deprecated Use {@link #load(Map, SinkContext)} instead.
 */
@Deprecated
public static InfluxDBSinkConfig load(Map<String, Object> map) throws IOException {
    final ObjectMapper jsonMapper = new ObjectMapper();
    final String json = jsonMapper.writeValueAsString(map);
    return jsonMapper.readValue(json, InfluxDBSinkConfig.class);
}

/**
 * Builds the config by delegating to {@code IOConfigUtils.loadWithSecrets},
 * passing the raw map, this config class and the sink context.
 *
 * @param map     raw configuration entries
 * @param context sink context handed to the loader
 * @return the configuration returned by {@code IOConfigUtils.loadWithSecrets}
 */
public static InfluxDBSinkConfig load(Map<String, Object> map, SinkContext context) {
    final InfluxDBSinkConfig loaded =
            IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, context);
    return loaded;
}

public void validate() {
Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not set.");
Preconditions.checkNotNull(token, "token property not set.");
Expand Down
Loading