From ad8ee9e77c879bc063bb4f411050c14fd5f6c244 Mon Sep 17 00:00:00 2001 From: Israel Herraiz Date: Fri, 22 May 2026 15:16:26 +0000 Subject: [PATCH 1/3] SolaceIO - fix for data loss during scaling/rebalancing (#36991) Introduced a sequential pending checkpoints tracking mechanism using a TreeMap in the reader. Created a JVM-global ActiveReadersRegistry using weak references to resolve serialized checkpoint marks back to their originating active reader. This enables reliable sequential acknowledgments of checkpoints, ensuring we only ack committed data, while allowing subsequent finalizations to catch up and prevent message leaks (stuckness) if intermediate finalizations are lost. Also synchronized received messages access for thread safety with minimal lock duration (network I/O done outside locks). Fixed initialization order by registering the reader post-construction. --- .../io/solace/read/ActiveReadersRegistry.java | 48 +++++++++++++ .../io/solace/read/SolaceCheckpointMark.java | 52 +++++++------- .../io/solace/read/UnboundedSolaceReader.java | 59 ++++++++++------ .../io/solace/read/UnboundedSolaceSource.java | 4 +- .../beam/sdk/io/solace/SolaceIOReadTest.java | 68 +++++++++++++++++-- 5 files changed, 179 insertions(+), 52 deletions(-) create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java new file mode 100644 index 000000000000..450e5c361cac --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.read; + +import java.lang.ref.WeakReference; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A global registry to keep track of active {@link UnboundedSolaceReader} instances on the worker + * JVM using weak references. + * + *

This allows serialized {@link SolaceCheckpointMark} instances to resolve their originating + * reader and perform sequential acknowledgments. + */ +class ActiveReadersRegistry { + private static final ConcurrentHashMap>> registry = + new ConcurrentHashMap<>(); + + public static void register(UUID uuid, UnboundedSolaceReader reader) { + registry.put(uuid, new WeakReference<>(reader)); + } + + public static void unregister(UUID uuid) { + registry.remove(uuid); + } + + public static @Nullable UnboundedSolaceReader get(UUID uuid) { + WeakReference> ref = registry.get(uuid); + return ref != null ? ref.get() : null; + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index 83aed07374b3..be12d8e61bdb 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -17,15 +17,13 @@ */ package org.apache.beam.sdk.io.solace.read; -import com.solacesystems.jcsmp.BytesXMLMessage; import java.util.Objects; -import java.util.Queue; +import java.util.UUID; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,33 +37,42 @@ @VisibleForTesting public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class); - private transient Queue safeToAck; + private String readerUuid; + private long checkpointId; @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction - private SolaceCheckpointMark() {} + private SolaceCheckpointMark() { + this.readerUuid = ""; + this.checkpointId = 0; + } /** * Creates a new {@link SolaceCheckpointMark}. * - * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged. + * @param readerUuid - the UUID of the originating reader. + * @param checkpointId - the unique ID of this checkpoint. */ - SolaceCheckpointMark(Queue safeToAck) { - this.safeToAck = safeToAck; + SolaceCheckpointMark(String readerUuid, long checkpointId) { + this.readerUuid = readerUuid; + this.checkpointId = checkpointId; } @Override public void finalizeCheckpoint() { - BytesXMLMessage msg; - while ((msg = safeToAck.poll()) != null) { - try { - msg.ackMessage(); - } catch (IllegalStateException e) { - LOG.error( - "SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.", - msg.getApplicationMessageId(), - msg.getAckMessageId(), - e); - } + if (readerUuid == null || readerUuid.isEmpty()) { + LOG.warn("SolaceIO.Read: Checkpoint has no reader UUID, cannot finalize."); + return; + } + UUID uuid = UUID.fromString(readerUuid); + UnboundedSolaceReader reader = ActiveReadersRegistry.get(uuid); + if (reader != null) { + reader.finalizeCheckpoint(checkpointId); + } else { + LOG.warn( + "SolaceIO.Read: Reader with UUID {} not found in registry. " + + "Checkpoint {} cannot be finalized. Messages will be redelivered if session is closed.", + readerUuid, + checkpointId); } } @@ -81,14 +88,11 @@ public boolean equals(@Nullable Object o) { return false; } SolaceCheckpointMark that = (SolaceCheckpointMark) o; - return safeToAck == that.safeToAck - || (safeToAck != null - && that.safeToAck != null - && Iterables.elementsEqual(safeToAck, that.safeToAck)); + return checkpointId == that.checkpointId && Objects.equals(readerUuid, that.readerUuid); } @Override public int hashCode() { - return Objects.hash(safeToAck); + return Objects.hash(readerUuid, checkpointId); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index dc84e0a07017..b70d49aba946 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -24,10 +24,13 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; import java.util.NoSuchElementException; import java.util.Queue; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -42,6 +45,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -55,16 +59,18 @@ class UnboundedSolaceReader extends UnboundedReader { private final UnboundedSolaceSource currentSource; private final WatermarkPolicy watermarkPolicy; private final SempClient sempClient; - private final UUID readerUuid; + final UUID readerUuid; private final SessionServiceFactory sessionServiceFactory; private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; /** - * Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION: - * Accessed by both reader and checkpointing threads. + * Map to track pending checkpoints and their messages. Accessed by both reader + * (getCheckpointMark) and finalizer (finalizeCheckpoint) threads. */ - private final Queue safeToAckMessages = new ConcurrentLinkedQueue<>(); + private final TreeMap> pendingCheckpoints = new TreeMap<>(); + + private long nextCheckpointId = 1; /** * Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a @@ -136,8 +142,6 @@ public boolean start() { @Override public boolean advance() { - finalizeReadyMessages(); - BytesXMLMessage receivedXmlMessage; try { receivedXmlMessage = getSessionService().getReceiver().receive(); @@ -151,30 +155,35 @@ public boolean advance() { } solaceOriginalRecord = receivedXmlMessage; solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); - receivedMessages.add(receivedXmlMessage); + synchronized (this) { + receivedMessages.add(receivedXmlMessage); + } return true; } @Override public void close() { - finalizeReadyMessages(); sessionServiceCache.invalidate(readerUuid); + ActiveReadersRegistry.unregister(readerUuid); } - public void finalizeReadyMessages() { - BytesXMLMessage msg; - while ((msg = safeToAckMessages.poll()) != null) { + public void finalizeCheckpoint(long checkpointId) { + List messagesToAck = new ArrayList<>(); + + synchronized (this) { + SortedMap> toAck = pendingCheckpoints.headMap(checkpointId, true); + for (List msgs : toAck.values()) { + messagesToAck.addAll(msgs); + } + toAck.clear(); + } + + for (BytesXMLMessage msg : messagesToAck) { try { msg.ackMessage(); } catch (IllegalStateException e) { - LOG.error( - "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.", - msg.getApplicationMessageId(), - msg.getAckMessageId(), - e); - safeToAckMessages.add(msg); // In case the error was transient, might succeed later - break; // Commit is only best effort + LOG.warn("SolaceIO.Read: Failed to ack message, session might be closed.", e); } } } @@ -190,9 +199,15 @@ public Instant getWatermark() { @Override public UnboundedSource.CheckpointMark getCheckpointMark() { - safeToAckMessages.addAll(receivedMessages); - receivedMessages.clear(); - return new SolaceCheckpointMark(safeToAckMessages); + long checkpointId; + ImmutableList messages; + synchronized (this) { + checkpointId = nextCheckpointId++; + messages = ImmutableList.copyOf(receivedMessages); + receivedMessages.clear(); + pendingCheckpoints.put(checkpointId, messages); + } + return new SolaceCheckpointMark(readerUuid.toString(), checkpointId); } @Override diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java index 1cb17a49fbdb..b8b2d59c7914 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java @@ -100,7 +100,9 @@ public UnboundedReader createReader( PipelineOptions options, @Nullable SolaceCheckpointMark checkpointMark) { // it makes no sense to resume a Solace Session with the previous checkpoint // so don't need the pass a checkpoint to new a Solace Reader - return new UnboundedSolaceReader<>(this); + UnboundedSolaceReader reader = new UnboundedSolaceReader<>(this); + ActiveReadersRegistry.register(reader.readerUuid, reader); + return reader; } @Override diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java index a1f80932eddf..a1ac07b30bed 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java @@ -458,18 +458,16 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { // mark all consumed messages as ready to be acknowledged CheckpointMark checkpointMark = reader.getCheckpointMark(); - // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked. + // consume 1 more message. reader.advance(); - assertEquals(4, countAckMessages.get()); + assertEquals(0, countAckMessages.get()); // consume 1 more message. No change in the acknowledged messages. reader.advance(); - assertEquals(4, countAckMessages.get()); + assertEquals(0, countAckMessages.get()); // acknowledge from the first checkpoint checkpointMark.finalizeCheckpoint(); - // No change in the acknowledged messages, because they were acknowledged in the #advance() - // method. assertEquals(4, countAckMessages.get()); } @@ -607,4 +605,64 @@ public void testTopicEncoding() { PAssert.that(destAreTopics).containsInAnyOrder(expected); pipeline.run(); } + + @Test + public void testLostCheckpointCatchUp() throws Exception { + AtomicInteger countConsumedMessages = new AtomicInteger(0); + AtomicInteger countAckMessages = new AtomicInteger(0); + + // Broker that creates input data + SerializableFunction recordFn = + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return getOrNull(index, messages); + }; + + SessionServiceFactory fakeSessionServiceFactory = + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); + + Read spec = + getDefaultRead() + .withSessionServiceFactory(fakeSessionServiceFactory) + .withMaxNumConnections(4); + + UnboundedSolaceSource initialSource = getSource(spec, pipeline); + + UnboundedReader reader = + initialSource.createReader(PipelineOptionsFactory.create(), null); + + // start the reader and move to the first record + assertTrue(reader.start()); + + // consume 3 messages (NB: start already consumed the first message) + for (int i = 0; i < 3; i++) { + assertTrue(reader.advance()); + } + assertEquals(0, countAckMessages.get()); + + // Create Checkpoint T1 (contains 4 messages) + reader.getCheckpointMark(); + + // consume 3 more messages + for (int i = 0; i < 3; i++) { + assertTrue(reader.advance()); + } + assertEquals(0, countAckMessages.get()); + + // Create Checkpoint T2 (contains 3 messages) + CheckpointMark checkpointMark2 = reader.getCheckpointMark(); + + // We "lose" checkpointMark1 (do NOT finalize it) + // We finalize checkpointMark2 + checkpointMark2.finalizeCheckpoint(); + + // checkpointMark2 should have caught up and acked both T1 and T2 (4 + 3 = 7 messages) + assertEquals(7, countAckMessages.get()); + } } From 6e5b5cadaa416c79ded621e6274b57adb3e0d872 Mon Sep 17 00:00:00 2001 From: Israel Herraiz Date: Fri, 22 May 2026 17:11:56 +0000 Subject: [PATCH 2/3] refactor(io/solace): address review comments on PR 38603 - Use Guava Cache with weak values in ActiveReadersRegistry to prevent memory leaks. - Standardize on String for readerUuid in SolaceCheckpointMark and UnboundedSolaceReader to avoid Avro serialization issues with UUID on JDK 17+, while still eliminating UUID.fromString() overhead. - Use private lock object in UnboundedSolaceReader instead of synchronizing on 'this'. - Reduce visibility of UnboundedSolaceReader.finalizeCheckpoint to package-private. - Restore applicationMessageId and ackMessageId in failed ack logs. --- .../io/solace/read/ActiveReadersRegistry.java | 22 ++++++++-------- .../io/solace/read/SolaceCheckpointMark.java | 4 +-- .../io/solace/read/UnboundedSolaceReader.java | 25 +++++++++++-------- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java index 450e5c361cac..e8838f219dc8 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java @@ -17,9 +17,8 @@ */ package org.apache.beam.sdk.io.solace.read; -import java.lang.ref.WeakReference; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -30,19 +29,18 @@ * reader and perform sequential acknowledgments. */ class ActiveReadersRegistry { - private static final ConcurrentHashMap>> registry = - new ConcurrentHashMap<>(); + private static final Cache> registry = + CacheBuilder.newBuilder().weakValues().build(); - public static void register(UUID uuid, UnboundedSolaceReader reader) { - registry.put(uuid, new WeakReference<>(reader)); + public static void register(String uuid, UnboundedSolaceReader reader) { + registry.put(uuid, reader); } - public static void unregister(UUID uuid) { - registry.remove(uuid); + public static void unregister(String uuid) { + registry.invalidate(uuid); } - public static @Nullable UnboundedSolaceReader get(UUID uuid) { - WeakReference> ref = registry.get(uuid); - return ref != null ? ref.get() : null; + public static @Nullable UnboundedSolaceReader get(String uuid) { + return registry.getIfPresent(uuid); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index be12d8e61bdb..8e9647549d76 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.solace.read; import java.util.Objects; -import java.util.UUID; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; @@ -63,8 +62,7 @@ public void finalizeCheckpoint() { LOG.warn("SolaceIO.Read: Checkpoint has no reader UUID, cannot finalize."); return; } - UUID uuid = UUID.fromString(readerUuid); - UnboundedSolaceReader reader = ActiveReadersRegistry.get(uuid); + UnboundedSolaceReader reader = ActiveReadersRegistry.get(readerUuid); if (reader != null) { reader.finalizeCheckpoint(checkpointId); } else { diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index b70d49aba946..d6bae0aa907d 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -59,7 +59,8 @@ class UnboundedSolaceReader extends UnboundedReader { private final UnboundedSolaceSource currentSource; private final WatermarkPolicy watermarkPolicy; private final SempClient sempClient; - final UUID readerUuid; + final String readerUuid; + private final Object lock = new Object(); private final SessionServiceFactory sessionServiceFactory; private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; @@ -78,7 +79,7 @@ class UnboundedSolaceReader extends UnboundedReader { */ private final Queue receivedMessages = new ArrayDeque<>(); - private static final Cache sessionServiceCache; + private static final Cache sessionServiceCache; private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1); static { @@ -87,7 +88,7 @@ class UnboundedSolaceReader extends UnboundedReader { CacheBuilder.newBuilder() .expireAfterAccess(cacheExpirationTimeout) .removalListener( - (RemovalNotification notification) -> { + (RemovalNotification notification) -> { LOG.info( "SolaceIO.Read: Closing session for the reader with uuid {} as it has been idle for over {}.", notification.getKey(), @@ -114,7 +115,7 @@ public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold()); this.sessionServiceFactory = currentSource.getSessionServiceFactory(); this.sempClient = currentSource.getSempClientFactory().create(); - this.readerUuid = UUID.randomUUID(); + this.readerUuid = UUID.randomUUID().toString(); } private SessionService getSessionService() { @@ -155,7 +156,7 @@ public boolean advance() { } solaceOriginalRecord = receivedXmlMessage; solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); - synchronized (this) { + synchronized (lock) { receivedMessages.add(receivedXmlMessage); } @@ -168,10 +169,10 @@ public void close() { ActiveReadersRegistry.unregister(readerUuid); } - public void finalizeCheckpoint(long checkpointId) { + void finalizeCheckpoint(long checkpointId) { List messagesToAck = new ArrayList<>(); - synchronized (this) { + synchronized (lock) { SortedMap> toAck = pendingCheckpoints.headMap(checkpointId, true); for (List msgs : toAck.values()) { messagesToAck.addAll(msgs); @@ -183,7 +184,11 @@ public void finalizeCheckpoint(long checkpointId) { try { msg.ackMessage(); } catch (IllegalStateException e) { - LOG.warn("SolaceIO.Read: Failed to ack message, session might be closed.", e); + LOG.warn( + "SolaceIO.Read: Failed to acknowledge message with applicationMessageId={}, ackMessageId={}. Session might be closed.", + msg.getApplicationMessageId(), + msg.getAckMessageId(), + e); } } } @@ -201,13 +206,13 @@ public Instant getWatermark() { public UnboundedSource.CheckpointMark getCheckpointMark() { long checkpointId; ImmutableList messages; - synchronized (this) { + synchronized (lock) { checkpointId = nextCheckpointId++; messages = ImmutableList.copyOf(receivedMessages); receivedMessages.clear(); pendingCheckpoints.put(checkpointId, messages); } - return new SolaceCheckpointMark(readerUuid.toString(), checkpointId); + return new SolaceCheckpointMark(readerUuid, checkpointId); } @Override From 0cfdc2dba7b10c491ad48b69451202c1133df7c0 Mon Sep 17 00:00:00 2001 From: Israel Herraiz Date: Mon, 25 May 2026 11:36:28 +0000 Subject: [PATCH 3/3] perf(solace): optimize synchronization in UnboundedSolaceReader Removed unnecessary synchronization in advance() when adding to receivedMessages. Reduced the scope of the synchronized block in getCheckpointMark() to only cover pendingCheckpoints.put(). These changes are safe because advance() and getCheckpointMark() are executed sequentially by the same reader thread, so receivedMessages does not require synchronization. pendingCheckpoints still requires synchronization as it is shared with the asynchronous finalizeCheckpoint() thread. TAG=agy CONV=f94654e7-4a0a-4667-8a8b-d5bcf77e2609 --- .../sdk/io/solace/read/UnboundedSolaceReader.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index d6bae0aa907d..b047669f399b 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -156,9 +156,7 @@ public boolean advance() { } solaceOriginalRecord = receivedXmlMessage; solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); - synchronized (lock) { - receivedMessages.add(receivedXmlMessage); - } + receivedMessages.add(receivedXmlMessage); return true; } @@ -204,12 +202,10 @@ public Instant getWatermark() { @Override public UnboundedSource.CheckpointMark getCheckpointMark() { - long checkpointId; - ImmutableList messages; + long checkpointId = nextCheckpointId++; + ImmutableList messages = ImmutableList.copyOf(receivedMessages); + receivedMessages.clear(); synchronized (lock) { - checkpointId = nextCheckpointId++; - messages = ImmutableList.copyOf(receivedMessages); - receivedMessages.clear(); pendingCheckpoints.put(checkpointId, messages); } return new SolaceCheckpointMark(readerUuid, checkpointId);