Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.read;

import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A global registry to keep track of active {@link UnboundedSolaceReader} instances on the worker
 * JVM using weak references.
 *
 * <p>This allows serialized {@link SolaceCheckpointMark} instances to resolve their originating
 * reader and perform sequential acknowledgments.
 *
 * <p>This is a static-only utility class and cannot be instantiated.
 */
final class ActiveReadersRegistry {
  /**
   * Readers keyed by their UUID. Values are held through weak references (see {@code
   * CacheBuilder#weakValues()}), so the registry by itself does not keep a reader reachable.
   */
  private static final Cache<String, UnboundedSolaceReader<?>> registry =
      CacheBuilder.newBuilder().weakValues().build();

  /** Not instantiable: all members are static. */
  private ActiveReadersRegistry() {}

  /**
   * Associates a reader with its UUID, replacing any reader previously stored under that UUID.
   *
   * @param uuid the UUID identifying the reader.
   * @param reader the reader to make resolvable through {@link #get(String)}.
   */
  public static void register(String uuid, UnboundedSolaceReader<?> reader) {
    registry.put(uuid, reader);
  }

  /**
   * Removes the entry stored under the given UUID, if any.
   *
   * @param uuid the UUID of the reader to remove.
   */
  public static void unregister(String uuid) {
    registry.invalidate(uuid);
  }

  /**
   * Looks up the reader stored under the given UUID.
   *
   * @param uuid the UUID of the reader to resolve.
   * @return the reader, or {@code null} if no entry is present for this UUID (never registered,
   *     unregistered, or already removed from the cache).
   */
  public static @Nullable UnboundedSolaceReader<?> get(String uuid) {
    return registry.getIfPresent(uuid);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
*/
package org.apache.beam.sdk.io.solace.read;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.util.Objects;
import java.util.Queue;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,33 +36,41 @@
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
private transient Queue<BytesXMLMessage> safeToAck;
private String readerUuid;
private long checkpointId;
Comment thread
iht marked this conversation as resolved.

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}
private SolaceCheckpointMark() {
this.readerUuid = "";
this.checkpointId = 0;
}

/**
* Creates a new {@link SolaceCheckpointMark}.
*
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
* @param readerUuid - the UUID of the originating reader.
* @param checkpointId - the unique ID of this checkpoint.
*/
SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
this.safeToAck = safeToAck;
SolaceCheckpointMark(String readerUuid, long checkpointId) {
this.readerUuid = readerUuid;
this.checkpointId = checkpointId;
}

@Override
public void finalizeCheckpoint() {
BytesXMLMessage msg;
while ((msg = safeToAck.poll()) != null) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
LOG.error(
"SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
}
if (readerUuid == null || readerUuid.isEmpty()) {
LOG.warn("SolaceIO.Read: Checkpoint has no reader UUID, cannot finalize.");
return;
}
UnboundedSolaceReader<?> reader = ActiveReadersRegistry.get(readerUuid);
if (reader != null) {
reader.finalizeCheckpoint(checkpointId);
} else {
LOG.warn(
"SolaceIO.Read: Reader with UUID {} not found in registry. "
+ "Checkpoint {} cannot be finalized. Messages will be redelivered if session is closed.",
readerUuid,
checkpointId);
}
}

Expand All @@ -81,14 +86,11 @@ public boolean equals(@Nullable Object o) {
return false;
}
SolaceCheckpointMark that = (SolaceCheckpointMark) o;
return safeToAck == that.safeToAck
|| (safeToAck != null
&& that.safeToAck != null
&& Iterables.elementsEqual(safeToAck, that.safeToAck));
return checkpointId == that.checkpointId && Objects.equals(readerUuid, that.readerUuid);
}

@Override
public int hashCode() {
return Objects.hash(safeToAck);
return Objects.hash(readerUuid, checkpointId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -42,6 +45,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -55,24 +59,27 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private final UnboundedSolaceSource<T> currentSource;
private final WatermarkPolicy<T> watermarkPolicy;
private final SempClient sempClient;
private final UUID readerUuid;
final String readerUuid;
private final Object lock = new Object();
private final SessionServiceFactory sessionServiceFactory;
private @Nullable BytesXMLMessage solaceOriginalRecord;
private @Nullable T solaceMappedRecord;

/**
* Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION:
* Accessed by both reader and checkpointing threads.
* Map to track pending checkpoints and their messages. Accessed by both reader
* (getCheckpointMark) and finalizer (finalizeCheckpoint) threads.
*/
private final Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
private final TreeMap<Long, List<BytesXMLMessage>> pendingCheckpoints = new TreeMap<>();

private long nextCheckpointId = 1;

/**
* Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a
* {@link SolaceCheckpointMark}.
*/
private final Queue<BytesXMLMessage> receivedMessages = new ArrayDeque<>();

private static final Cache<UUID, SessionService> sessionServiceCache;
private static final Cache<String, SessionService> sessionServiceCache;
private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1);

static {
Expand All @@ -81,7 +88,7 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
CacheBuilder.newBuilder()
.expireAfterAccess(cacheExpirationTimeout)
.removalListener(
(RemovalNotification<UUID, SessionService> notification) -> {
(RemovalNotification<String, SessionService> notification) -> {
LOG.info(
"SolaceIO.Read: Closing session for the reader with uuid {} as it has been idle for over {}.",
notification.getKey(),
Expand All @@ -108,7 +115,7 @@ public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) {
currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold());
this.sessionServiceFactory = currentSource.getSessionServiceFactory();
this.sempClient = currentSource.getSempClientFactory().create();
this.readerUuid = UUID.randomUUID();
this.readerUuid = UUID.randomUUID().toString();
}

private SessionService getSessionService() {
Expand Down Expand Up @@ -136,8 +143,6 @@ public boolean start() {

@Override
public boolean advance() {
finalizeReadyMessages();

BytesXMLMessage receivedXmlMessage;
try {
receivedXmlMessage = getSessionService().getReceiver().receive();
Expand All @@ -158,23 +163,30 @@ public boolean advance() {

@Override
public void close() {
finalizeReadyMessages();
sessionServiceCache.invalidate(readerUuid);
ActiveReadersRegistry.unregister(readerUuid);
}

public void finalizeReadyMessages() {
BytesXMLMessage msg;
while ((msg = safeToAckMessages.poll()) != null) {
void finalizeCheckpoint(long checkpointId) {
List<BytesXMLMessage> messagesToAck = new ArrayList<>();

synchronized (lock) {
SortedMap<Long, List<BytesXMLMessage>> toAck = pendingCheckpoints.headMap(checkpointId, true);
for (List<BytesXMLMessage> msgs : toAck.values()) {
messagesToAck.addAll(msgs);
}
toAck.clear();
}

for (BytesXMLMessage msg : messagesToAck) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
LOG.error(
"SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.",
LOG.warn(
"SolaceIO.Read: Failed to acknowledge message with applicationMessageId={}, ackMessageId={}. Session might be closed.",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
safeToAckMessages.add(msg); // In case the error was transient, might succeed later
break; // Commit is only best effort
}
Comment thread
iht marked this conversation as resolved.
}
}
Expand All @@ -190,9 +202,13 @@ public Instant getWatermark() {

@Override
public UnboundedSource.CheckpointMark getCheckpointMark() {
safeToAckMessages.addAll(receivedMessages);
long checkpointId = nextCheckpointId++;
ImmutableList<BytesXMLMessage> messages = ImmutableList.copyOf(receivedMessages);
receivedMessages.clear();
return new SolaceCheckpointMark(safeToAckMessages);
synchronized (lock) {
pendingCheckpoints.put(checkpointId, messages);
}
return new SolaceCheckpointMark(readerUuid, checkpointId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ public UnboundedReader<T> createReader(
PipelineOptions options, @Nullable SolaceCheckpointMark checkpointMark) {
// It makes no sense to resume a Solace session from a previous checkpoint,
// so there is no need to pass the checkpoint to the new Solace reader.
return new UnboundedSolaceReader<>(this);
UnboundedSolaceReader<T> reader = new UnboundedSolaceReader<>(this);
ActiveReadersRegistry.register(reader.readerUuid, reader);
return reader;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,18 +458,16 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
// mark all consumed messages as ready to be acknowledged
CheckpointMark checkpointMark = reader.getCheckpointMark();

// consume 1 more message. This will call #ackMsg() on messages that were ready to be acked.
// consume 1 more message.
reader.advance();
assertEquals(4, countAckMessages.get());
assertEquals(0, countAckMessages.get());

// consume 1 more message. No change in the acknowledged messages.
reader.advance();
assertEquals(4, countAckMessages.get());
assertEquals(0, countAckMessages.get());

// acknowledge from the first checkpoint
checkpointMark.finalizeCheckpoint();
// No change in the acknowledged messages, because they were acknowledged in the #advance()
// method.
assertEquals(4, countAckMessages.get());
}

Expand Down Expand Up @@ -607,4 +605,64 @@ public void testTopicEncoding() {
PAssert.that(destAreTopics).containsInAnyOrder(expected);
pipeline.run();
}

/**
 * Checks that finalizing a later checkpoint also acknowledges the messages that belong to an
 * earlier checkpoint which was never finalized.
 */
@Test
public void testLostCheckpointCatchUp() throws Exception {
  AtomicInteger consumedCounter = new AtomicInteger(0);
  AtomicInteger ackCounter = new AtomicInteger(0);

  // Fake broker: builds 10 messages on every call; acknowledging a message bumps ackCounter.
  SerializableFunction<Integer, BytesXMLMessage> brokerFn =
      position -> {
        List<BytesXMLMessage> available = new ArrayList<>();
        for (int seq = 0; seq < 10; seq++) {
          String payload = "payload_test" + seq;
          String messageId = "45" + seq;
          available.add(
              SolaceDataUtils.getBytesXmlMessage(
                  payload, messageId, (num) -> ackCounter.incrementAndGet()));
        }
        consumedCounter.incrementAndGet();
        return getOrNull(position, available);
      };

  SessionServiceFactory sessionFactory =
      MockSessionServiceFactory.builder().recordFn(brokerFn).minMessagesReceived(10).build();

  Read<Record> readSpec =
      getDefaultRead().withSessionServiceFactory(sessionFactory).withMaxNumConnections(4);

  UnboundedSolaceSource<Record> source = getSource(readSpec, pipeline);
  UnboundedReader<Record> reader = source.createReader(PipelineOptionsFactory.create(), null);

  // Starting the reader already consumes the first record.
  boolean started = reader.start();
  assertTrue(started);

  // Three more records, so four in total before the first checkpoint.
  for (int step = 0; step < 3; step++) {
    boolean advanced = reader.advance();
    assertTrue(advanced);
  }
  assertEquals(0, ackCounter.get());

  // First checkpoint (4 messages). Its mark is deliberately dropped and never finalized.
  reader.getCheckpointMark();

  // Three further records go into the second checkpoint.
  for (int step = 0; step < 3; step++) {
    boolean advanced = reader.advance();
    assertTrue(advanced);
  }
  assertEquals(0, ackCounter.get());

  // Second checkpoint (3 messages); only this one gets finalized.
  CheckpointMark secondMark = reader.getCheckpointMark();
  secondMark.finalizeCheckpoint();

  // Finalizing the second checkpoint must acknowledge both checkpoints: 4 + 3 = 7 messages.
  assertEquals(7, ackCounter.get());
}
}
Loading