diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java
new file mode 100644
index 000000000000..e8838f219dc8
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.read;
+
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A global registry to keep track of active {@link UnboundedSolaceReader} instances on the worker
+ * JVM using weak references.
+ *
+ * <p>This allows serialized {@link SolaceCheckpointMark} instances to resolve their originating
+ * reader and perform sequential acknowledgments.
+ */
+class ActiveReadersRegistry {
+  private static final Cache<String, UnboundedSolaceReader<?>> registry =
+      CacheBuilder.newBuilder().weakValues().build();
+
+  public static void register(String uuid, UnboundedSolaceReader<?> reader) {
+    registry.put(uuid, reader);
+  }
+
+  public static void unregister(String uuid) {
+    registry.invalidate(uuid);
+  }
+
+  public static @Nullable UnboundedSolaceReader<?> get(String uuid) {
+    return registry.getIfPresent(uuid);
+  }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
index 83aed07374b3..8e9647549d76 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
@@ -17,15 +17,12 @@
*/
package org.apache.beam.sdk.io.solace.read;
-import com.solacesystems.jcsmp.BytesXMLMessage;
import java.util.Objects;
-import java.util.Queue;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,33 +36,41 @@
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
- private transient Queue safeToAck;
+ private String readerUuid;
+ private long checkpointId;
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
- private SolaceCheckpointMark() {}
+ private SolaceCheckpointMark() {
+ this.readerUuid = "";
+ this.checkpointId = 0;
+ }
/**
* Creates a new {@link SolaceCheckpointMark}.
*
- * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
+ * @param readerUuid - the UUID of the originating reader.
+ * @param checkpointId - the unique ID of this checkpoint.
*/
- SolaceCheckpointMark(Queue safeToAck) {
- this.safeToAck = safeToAck;
+ SolaceCheckpointMark(String readerUuid, long checkpointId) {
+ this.readerUuid = readerUuid;
+ this.checkpointId = checkpointId;
}
@Override
public void finalizeCheckpoint() {
- BytesXMLMessage msg;
- while ((msg = safeToAck.poll()) != null) {
- try {
- msg.ackMessage();
- } catch (IllegalStateException e) {
- LOG.error(
- "SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.",
- msg.getApplicationMessageId(),
- msg.getAckMessageId(),
- e);
- }
+ if (readerUuid == null || readerUuid.isEmpty()) {
+ LOG.warn("SolaceIO.Read: Checkpoint has no reader UUID, cannot finalize.");
+ return;
+ }
+ UnboundedSolaceReader> reader = ActiveReadersRegistry.get(readerUuid);
+ if (reader != null) {
+ reader.finalizeCheckpoint(checkpointId);
+ } else {
+ LOG.warn(
+ "SolaceIO.Read: Reader with UUID {} not found in registry. "
+ + "Checkpoint {} cannot be finalized. Messages will be redelivered if session is closed.",
+ readerUuid,
+ checkpointId);
}
}
@@ -81,14 +86,11 @@ public boolean equals(@Nullable Object o) {
return false;
}
SolaceCheckpointMark that = (SolaceCheckpointMark) o;
- return safeToAck == that.safeToAck
- || (safeToAck != null
- && that.safeToAck != null
- && Iterables.elementsEqual(safeToAck, that.safeToAck));
+ return checkpointId == that.checkpointId && Objects.equals(readerUuid, that.readerUuid);
}
@Override
public int hashCode() {
- return Objects.hash(safeToAck);
+ return Objects.hash(readerUuid, checkpointId);
}
}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
index dc84e0a07017..b047669f399b 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
@@ -24,10 +24,13 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -42,6 +45,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -55,16 +59,19 @@ class UnboundedSolaceReader extends UnboundedReader {
private final UnboundedSolaceSource currentSource;
private final WatermarkPolicy watermarkPolicy;
private final SempClient sempClient;
- private final UUID readerUuid;
+ final String readerUuid;
+ private final Object lock = new Object();
private final SessionServiceFactory sessionServiceFactory;
private @Nullable BytesXMLMessage solaceOriginalRecord;
private @Nullable T solaceMappedRecord;
/**
- * Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION:
- * Accessed by both reader and checkpointing threads.
+ * Map to track pending checkpoints and their messages. Accessed by both reader
+ * (getCheckpointMark) and finalizer (finalizeCheckpoint) threads.
*/
- private final Queue safeToAckMessages = new ConcurrentLinkedQueue<>();
+ private final TreeMap> pendingCheckpoints = new TreeMap<>();
+
+ private long nextCheckpointId = 1;
/**
* Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a
@@ -72,7 +79,7 @@ class UnboundedSolaceReader extends UnboundedReader {
*/
private final Queue receivedMessages = new ArrayDeque<>();
- private static final Cache sessionServiceCache;
+ private static final Cache sessionServiceCache;
private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1);
static {
@@ -81,7 +88,7 @@ class UnboundedSolaceReader extends UnboundedReader {
CacheBuilder.newBuilder()
.expireAfterAccess(cacheExpirationTimeout)
.removalListener(
- (RemovalNotification notification) -> {
+ (RemovalNotification notification) -> {
LOG.info(
"SolaceIO.Read: Closing session for the reader with uuid {} as it has been idle for over {}.",
notification.getKey(),
@@ -108,7 +115,7 @@ public UnboundedSolaceReader(UnboundedSolaceSource currentSource) {
currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold());
this.sessionServiceFactory = currentSource.getSessionServiceFactory();
this.sempClient = currentSource.getSempClientFactory().create();
- this.readerUuid = UUID.randomUUID();
+ this.readerUuid = UUID.randomUUID().toString();
}
private SessionService getSessionService() {
@@ -136,8 +143,6 @@ public boolean start() {
@Override
public boolean advance() {
- finalizeReadyMessages();
-
BytesXMLMessage receivedXmlMessage;
try {
receivedXmlMessage = getSessionService().getReceiver().receive();
@@ -158,23 +163,30 @@ public boolean advance() {
@Override
public void close() {
- finalizeReadyMessages();
sessionServiceCache.invalidate(readerUuid);
+ ActiveReadersRegistry.unregister(readerUuid);
}
- public void finalizeReadyMessages() {
- BytesXMLMessage msg;
- while ((msg = safeToAckMessages.poll()) != null) {
+ void finalizeCheckpoint(long checkpointId) {
+ List messagesToAck = new ArrayList<>();
+
+ synchronized (lock) {
+ SortedMap> toAck = pendingCheckpoints.headMap(checkpointId, true);
+ for (List msgs : toAck.values()) {
+ messagesToAck.addAll(msgs);
+ }
+ toAck.clear();
+ }
+
+ for (BytesXMLMessage msg : messagesToAck) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
- LOG.error(
- "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.",
+ LOG.warn(
+ "SolaceIO.Read: Failed to acknowledge message with applicationMessageId={}, ackMessageId={}. Session might be closed.",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
- safeToAckMessages.add(msg); // In case the error was transient, might succeed later
- break; // Commit is only best effort
}
}
}
@@ -190,9 +202,13 @@ public Instant getWatermark() {
@Override
public UnboundedSource.CheckpointMark getCheckpointMark() {
- safeToAckMessages.addAll(receivedMessages);
+ long checkpointId = nextCheckpointId++;
+ ImmutableList messages = ImmutableList.copyOf(receivedMessages);
receivedMessages.clear();
- return new SolaceCheckpointMark(safeToAckMessages);
+ synchronized (lock) {
+ pendingCheckpoints.put(checkpointId, messages);
+ }
+ return new SolaceCheckpointMark(readerUuid, checkpointId);
}
@Override
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java
index 1cb17a49fbdb..b8b2d59c7914 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java
@@ -100,7 +100,9 @@ public UnboundedReader createReader(
PipelineOptions options, @Nullable SolaceCheckpointMark checkpointMark) {
// it makes no sense to resume a Solace Session with the previous checkpoint
// so don't need the pass a checkpoint to new a Solace Reader
- return new UnboundedSolaceReader<>(this);
+ UnboundedSolaceReader reader = new UnboundedSolaceReader<>(this);
+ ActiveReadersRegistry.register(reader.readerUuid, reader);
+ return reader;
}
@Override
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
index a1f80932eddf..a1ac07b30bed 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
@@ -458,18 +458,16 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
// mark all consumed messages as ready to be acknowledged
CheckpointMark checkpointMark = reader.getCheckpointMark();
- // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked.
+ // consume 1 more message.
reader.advance();
- assertEquals(4, countAckMessages.get());
+ assertEquals(0, countAckMessages.get());
// consume 1 more message. No change in the acknowledged messages.
reader.advance();
- assertEquals(4, countAckMessages.get());
+ assertEquals(0, countAckMessages.get());
// acknowledge from the first checkpoint
checkpointMark.finalizeCheckpoint();
- // No change in the acknowledged messages, because they were acknowledged in the #advance()
- // method.
assertEquals(4, countAckMessages.get());
}
@@ -607,4 +605,64 @@ public void testTopicEncoding() {
PAssert.that(destAreTopics).containsInAnyOrder(expected);
pipeline.run();
}
+
+ @Test
+ public void testLostCheckpointCatchUp() throws Exception {
+ AtomicInteger countConsumedMessages = new AtomicInteger(0);
+ AtomicInteger countAckMessages = new AtomicInteger(0);
+
+ // Fake broker: builds 10 test messages and returns the one at the requested index.
+ SerializableFunction recordFn =
+ index -> {
+ List messages = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ messages.add(
+ SolaceDataUtils.getBytesXmlMessage(
+ "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
+ }
+ countConsumedMessages.incrementAndGet();
+ return getOrNull(index, messages);
+ };
+
+ SessionServiceFactory fakeSessionServiceFactory =
+ MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
+
+ Read spec =
+ getDefaultRead()
+ .withSessionServiceFactory(fakeSessionServiceFactory)
+ .withMaxNumConnections(4);
+
+ UnboundedSolaceSource initialSource = getSource(spec, pipeline);
+
+ UnboundedReader reader =
+ initialSource.createReader(PipelineOptionsFactory.create(), null);
+
+ // Start the reader; a true result means it is positioned on the first record.
+ assertTrue(reader.start());
+
+ // Advance 3 more times: 4 messages read so far (start() already consumed the first).
+ for (int i = 0; i < 3; i++) {
+ assertTrue(reader.advance());
+ }
+ assertEquals(0, countAckMessages.get());
+
+ // Checkpoint T1 covers those 4 messages; the returned mark is deliberately discarded.
+ reader.getCheckpointMark();
+
+ // Consume 3 more messages; still no acks because no checkpoint has been finalized.
+ for (int i = 0; i < 3; i++) {
+ assertTrue(reader.advance());
+ }
+ assertEquals(0, countAckMessages.get());
+
+ // Checkpoint T2 covers only the 3 messages read since T1.
+ CheckpointMark checkpointMark2 = reader.getCheckpointMark();
+
+ // T1's mark was never kept, so it is never finalized (simulating a "lost" checkpoint).
+ // Only T2 is finalized.
+ checkpointMark2.finalizeCheckpoint();
+
+ // Finalizing T2 must also ack the older, unfinalized T1: 4 + 3 = 7 messages.
+ assertEquals(7, countAckMessages.get());
+ }
}