diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 0455f0efa8bb6..eeb9e105f4fc6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -20,6 +20,7 @@ import com.google.common.collect.Range; import io.netty.buffer.ByteBuf; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; @@ -716,6 +717,21 @@ default void skipNonRecoverableLedger(long ledgerId){} * */ ManagedLedgerInterceptor getManagedLedgerInterceptor(); + /** + * Read raw entries starting from {@code start}, inclusive, without using a ManagedCursor. + * + *

The returned entries are retained for the caller. The caller must release every returned entry. + * Broker-level visibility rules such as transaction, marker, and delayed-delivery filtering are not applied here. + * + * @param start the position to start reading from, inclusive + * @param numberOfEntries maximum number of entries to read + * @return a future that completes with the list of entries, or fails if the read cannot be performed + */ + default CompletableFuture> readEntries(Position start, int numberOfEntries) { + return CompletableFuture.failedFuture( + new UnsupportedOperationException("ManagedLedger random reads are not implemented")); + } + /** * Get basic ledger summary. * will got null if corresponding ledger not exists. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 3ad9eb438104a..582e2aecc8519 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1131,6 +1131,15 @@ public void handleConsumerAdded(String subscriptionName, String consumerName) { .log("Added consumer"); } + public void handleRandomReaderAdded(BrokerRandomReader reader) { + USAGE_COUNT_UPDATER.incrementAndGet(this); + log.debug() + .attr("randomReaderId", reader.randomReaderId()) + .attr("readerName", reader.readerName()) + .attr("usageCount", USAGE_COUNT_UPDATER.get(this)) + .log("Added random reader"); + } + public void decrementUsageCount() { USAGE_COUNT_UPDATER.decrementAndGet(this); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerRandomReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerRandomReader.java new file mode 100644 index 0000000000000..e60d7d6b55765 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerRandomReader.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.api.proto.RandomReadInvisibleReason; +import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.Markers; +import org.apache.pulsar.common.util.FutureUtil; + +public class BrokerRandomReader implements AutoCloseable { + private final long randomReaderId; + private final String readerName; + private final Map metadata; + private final PersistentTopic topic; + private final ServerCnx owner; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicLong inFlightRequestId = new AtomicLong(-1L); + + @Getter + private final boolean readCommitted; + + public BrokerRandomReader(long randomReaderId, String readerName, Map metadata, + PersistentTopic topic, ServerCnx owner, boolean readCommitted) { + this.randomReaderId = randomReaderId; + this.readerName = readerName; + this.metadata = Map.copyOf(metadata); + this.topic = topic; + this.owner = owner; + this.readCommitted = readCommitted; + } + + public boolean beginRead(long requestId) { + return !closed.get() && inFlightRequestId.compareAndSet(-1L, requestId); + } + + public void endRead(long requestId) { + inFlightRequestId.compareAndSet(requestId, -1L); + } + + public PersistentTopic topic() { + return topic; + } + + public ServerCnx owner() { + return owner; + } + + public long randomReaderId() { + return randomReaderId; + } + + public String readerName() { + return readerName; + } + + public Map metadata() { + return metadata; + } + + public boolean isClosed() { + return closed.get(); + } + + public static CompletableFuture validatePersistentTopic(Topic topic) { + if (topic instanceof PersistentTopic) { + return CompletableFuture.completedFuture((PersistentTopic) topic); + } + return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException( + "RandomReader only supports persistent topics")); + } + + @Override + public void close() { + closed.set(true); + } + + public CompletableFuture> readEntries(Position start, int numberOfEntries, + Position maxVisible) { + if (numberOfEntries <= 0) { + return CompletableFuture.completedFuture(List.of()); + } + return topic.getManagedLedger().readEntries(start, numberOfEntries) + .thenApply(entries -> toEntryResults(entries, maxVisible)); + } + + public CompletableFuture disconnect(String brokerServiceUrl, String brokerServiceUrlTls) { + close(); + owner.disconnectRandomReader(randomReaderId, brokerServiceUrl, brokerServiceUrlTls); + return CompletableFuture.completedFuture(null); + } + + private List toEntryResults(List entries, Position maxVisiblePosition) { + List result = new ArrayList<>(entries.size()); + for (int i = 0; i < entries.size(); i++) { + Entry entry = entries.get(i); + if (entry == null) { + continue; + } + try { + Position position = entry.getPosition(); + if (position.compareTo(maxVisiblePosition) > 0) { + result.add(EntryResult.invisible(entry, RandomReadInvisibleReason.EXCEEDED_MAX_VISIBLE_POSITION)); + continue; + } + ByteBuf metadataAndPayload = entry.getDataBuffer(); + int readerIndex = metadataAndPayload.readerIndex(); + MessageMetadata metadata = Commands.peekAndCopyMessageMetadata( + metadataAndPayload, topic.getName(), -1); + metadataAndPayload.readerIndex(readerIndex); + if (metadata == null) { + throw new BrokerServiceException.PersistenceException( + "Failed to parse message metadata at " + position); + } + if (Markers.isTxnMarker(metadata)) { + result.add(EntryResult.invisible(entry, RandomReadInvisibleReason.TRANSACTION_MARKER)); + continue; + } + if (Markers.isServerOnlyMarker(metadata)) { + result.add(EntryResult.invisible(entry, RandomReadInvisibleReason.SERVER_ONLY_MARKER)); + continue; + } + if (readCommitted && metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits() + && topic.isTxnAborted(new TxnID(metadata.getTxnidMostBits(), + metadata.getTxnidLeastBits()), position)) { + result.add(EntryResult.invisible(entry, RandomReadInvisibleReason.ABORTED_TRANSACTION)); + continue; + } + if (topic.isDelayedDeliveryEnabled() + && metadata.hasDeliverAtTime() + && metadata.getDeliverAtTime() > System.currentTimeMillis()) { + result.add(EntryResult.invisible(entry, RandomReadInvisibleReason.DELAYED_DELIVERY)); + continue; + } + result.add(EntryResult.visible(entry)); + } catch (Throwable t) { + entry.release(); + releaseResults(result); + releaseEntries(entries.subList(i + 1, entries.size())); + throw FutureUtil.wrapToCompletionException(t); + } + } + return result; + } + + private static void releaseResults(List results) { + for (EntryResult result : results) { + result.release(); + } + } + + private static void releaseEntries(List entries) { + for (Entry entry : entries) { + if (entry != null) { + entry.release(); + } + } + } + + public static ServerError toServerError(Throwable cause) { + if (cause instanceof ManagedLedgerException) { + if (cause instanceof ManagedLedgerException.ManagedLedgerFencedException + || cause instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException + || cause instanceof ManagedLedgerException.OffloadInProgressException) { + return ServerError.ServiceNotReady; + } + if (cause instanceof ManagedLedgerException.ManagedLedgerNotFoundException) { + return ServerError.TopicNotFound; + } + if (cause instanceof ManagedLedgerException.ManagedLedgerTerminatedException) { + return ServerError.TopicTerminatedError; + } + if (cause instanceof ManagedLedgerException.NonRecoverableLedgerException) { + return ServerError.PersistenceError; + } + // Generic fallback: conservative mapping to PersistenceError + return ServerError.PersistenceError; + } + return BrokerServiceException.getClientErrorCode(cause); + } + + public static final class EntryResult { + private final Entry entry; + private final RandomReadInvisibleReason invisibleReason; + + private EntryResult(Entry entry, RandomReadInvisibleReason invisibleReason) { + this.entry = entry; + this.invisibleReason = invisibleReason; + } + + public static EntryResult visible(Entry entry) { + return new EntryResult(entry, null); + } + + public static EntryResult invisible(Entry entry, RandomReadInvisibleReason invisibleReason) { + return new EntryResult(entry, invisibleReason); + } + + public Entry entry() { + return entry; + } + + public boolean isVisible() { + return invisibleReason == null; + } + + public RandomReadInvisibleReason invisibleReason() { + return invisibleReason; + } + + public void release() { + entry.release(); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java index d98fc35858327..6b8dcdde0b51b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; +import io.netty.channel.ChannelPromise; import io.netty.util.concurrent.Future; import java.util.Collection; import java.util.List; @@ -26,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; import org.apache.bookkeeper.mledger.Entry; +import org.apache.pulsar.broker.service.BrokerRandomReader.EntryResult; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; @@ -88,6 +90,12 @@ Future sendMessagesToConsumer(long consumerId, String topicName, Subscript EntryBatchIndexesAcks batchIndexesAcks, RedeliveryTracker redeliveryTracker, long epoch); + void sendRandomReaderSuccessResponse(long requestId, long randomReaderId, String readerName); + + ChannelPromise sendRandomReadMessages(long randomReaderId, long requestId, String topicName, + int partitionIdx, List results, + int numberOfEntries, Runnable afterFinalResponseWriteDone); + void sendTcClientConnectResponse(long requestId, ServerError error, String message); void sendTcClientConnectResponse(long requestId); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java index f499818399992..eddf50d0d811e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java @@ -32,6 +32,7 @@ import lombok.CustomLog; import org.apache.bookkeeper.mledger.Entry; import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.service.BrokerRandomReader.EntryResult; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; @@ -322,6 +323,62 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, return writePromise; } + @Override + public void sendRandomReaderSuccessResponse(long requestId, long randomReaderId, String readerName) { + BaseCommand command = Commands.newRandomReaderSuccessCommand(requestId, randomReaderId, readerName); + safeIntercept(command, cnx); + ByteBuf outBuf = Commands.serializeWithSize(command); + writeAndFlush(outBuf); + } + + @Override + public ChannelPromise sendRandomReadMessages(long randomReaderId, long requestId, String topicName, + int partitionIdx, List results, + int numberOfEntries, + Runnable afterFinalResponseWriteDone) { + final ChannelHandlerContext ctx = cnx.ctx(); + final ChannelPromise writePromise = ctx.newPromise(); + ctx.channel().eventLoop().execute(() -> { + try { + for (EntryResult result : results) { + Entry entry = result.entry(); + if (entry == null) { + continue; + } + if (!result.isVisible()) { + ctx.write(Commands.newRandomReadEntryResult(randomReaderId, requestId, entry.getLedgerId(), + entry.getEntryId(), partitionIdx, result.invisibleReason()), ctx.voidPromise()); + continue; + } + + ByteBuf metadataAndPayload = entry.getDataBuffer().retainedDuplicate(); + if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue() + || !cnx.supportBrokerMetadata() + || !cnx.getBrokerService().getPulsar().getConfig() + .isExposingBrokerEntryMetadataToClientEnabled()) { + Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload); + } + if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getValue()) { + Commands.skipChecksumIfPresent(metadataAndPayload); + } + + ctx.write(Commands.newRandomReadMessage(randomReaderId, requestId, entry.getLedgerId(), + entry.getEntryId(), partitionIdx, metadataAndPayload), ctx.voidPromise()); + } + + ctx.writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, numberOfEntries, + null, null), writePromise); + } catch (Throwable t) { + writePromise.setFailure(t); + } + writePromise.addListener(future -> { + results.forEach(EntryResult::release); + afterFinalResponseWriteDone.run(); + }); + }); + return writePromise; + } + @Override public void sendTcClientConnectResponse(long requestId, ServerError error, String message) { BaseCommand command = Commands.newTcClientConnectResponse(requestId, error, message); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index fa7e919880f3a..645aa246c4d22 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -25,6 +25,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync; import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync; +import static org.apache.pulsar.broker.service.BrokerRandomReader.toServerError; import static org.apache.pulsar.broker.service.ServerCnxThrottleTracker.ThrottleType; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException; @@ -120,6 +121,7 @@ import org.apache.pulsar.common.api.proto.CommandAuthResponse; import org.apache.pulsar.common.api.proto.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.CommandCloseProducer; +import org.apache.pulsar.common.api.proto.CommandCloseRandomReader; import org.apache.pulsar.common.api.proto.CommandConnect; import org.apache.pulsar.common.api.proto.CommandConsumerStats; import org.apache.pulsar.common.api.proto.CommandEndTxn; @@ -134,6 +136,8 @@ import org.apache.pulsar.common.api.proto.CommandNewTxn; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.CommandProducer; +import org.apache.pulsar.common.api.proto.CommandRandomRead; +import org.apache.pulsar.common.api.proto.CommandRandomReader; import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages; import org.apache.pulsar.common.api.proto.CommandScalableTopicClose; import org.apache.pulsar.common.api.proto.CommandScalableTopicLookup; @@ -215,6 +219,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final Map recentlyClosedProducers; private final ConcurrentLongHashMap> producers; private final ConcurrentLongHashMap> consumers; + private final ConcurrentLongHashMap> randomReaders; private final boolean enableSubscriptionPatternEvaluation; private final boolean enableTopicListWatcher; private final boolean scalableTopicsEnabled; @@ -366,6 +371,10 @@ public ServerCnx(PulsarService pulsar, String listenerName) { .expectedItems(8) .concurrencyLevel(1) .build(); + this.randomReaders = ConcurrentLongHashMap.>newBuilder() + .expectedItems(8) + .concurrencyLevel(1) + .build(); this.recentlyClosedProducers = new ConcurrentHashMap<>(); this.replicatorPrefix = conf.getReplicatorPrefix(); this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection(); @@ -475,6 +484,20 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } } }); + + randomReaders.forEach((__, readerFuture) -> { + if (!readerFuture.isDone() + && readerFuture.completeExceptionally(new IllegalStateException("Connection closed."))) { + return; + } + if (readerFuture.isDone() && !readerFuture.isCompletedExceptionally()) { + BrokerRandomReader reader = readerFuture.getNow(null); + reader.close(); + reader.topic().removeRandomReader(reader); + } + }); + randomReaders.clear(); + this.topicListService.inactivate(); // Close any outstanding scalable-topic DAG watch sessions held by this connection. @@ -2786,6 +2809,221 @@ public int hashCode() { return Objects.hash(ctx().channel().id()); } + @Override + protected void handleRandomReader(CommandRandomReader command) { + checkArgument(state == State.Connected); + long requestId = command.getRequestId(); + long randomReaderId = command.getRandomReaderId(); + String topic = command.getTopic(); + String readerName = command.hasReaderName() ? command.getReaderName() : ""; + SchemaData schema = command.hasSchema() ? getSchema(command.getSchema()) : null; + boolean readCommitted = command.hasReadCommitted() && command.isReadCommitted(); + Map metadata = CommandUtils.metadataFromCommand(command); + + CompletableFuture future = new CompletableFuture<>(); + if (randomReaders.putIfAbsent(randomReaderId, future) != null) { + commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady, + "RandomReader is already present on the connection"); + return; + } + + try { + Metadata.validateMetadata(metadata, service.getPulsar().getConfiguration().getMaxConsumerMetadataSize()); + } catch (IllegalArgumentException e) { + randomReaders.remove(randomReaderId, future); + commandSender.sendErrorResponse(requestId, ServerError.MetadataError, e.getMessage()); + return; + } + + TopicName topicName; + try { + topicName = TopicName.get(topic); + } catch (IllegalArgumentException e) { + randomReaders.remove(randomReaderId, future); + commandSender.sendErrorResponse(requestId, ServerError.InvalidTopicName, e.getMessage()); + return; + } + + isTopicOperationAllowed(topicName, null, TopicOperation.CONSUME) + .thenCompose(isAllowed -> { + if (!isAllowed) { + return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException( + "Client is not authorized to RandomReader")); + } + return service.getTopic(topic, false); + }) + .thenCompose(optionalTopic -> { + if (optionalTopic.isEmpty()) { + return FutureUtil.failedFuture( + new TopicNotFoundException("Topic " + topic + " does not exist")); + } + Topic resolved = optionalTopic.get(); + return BrokerRandomReader.validatePersistentTopic(resolved); + }) + .thenCompose(persistentTopic -> { + CompletableFuture schemaFuture = schema != null + && schema.getType() != SchemaType.AUTO_CONSUME + ? persistentTopic.addSchemaIfIdleOrCheckCompatible(schema) + : CompletableFuture.completedFuture(null); + return schemaFuture.thenApply(ignored -> persistentTopic); + }) + .thenAcceptAsync(persistentTopic -> { + BrokerRandomReader reader = new BrokerRandomReader(randomReaderId, + readerName, metadata, persistentTopic, this, readCommitted); + if (randomReaders.get(randomReaderId) != future || future.isDone()) { + reader.close(); + return; + } + try { + persistentTopic.addRandomReader(reader); + if (future.complete(reader)) { + commandSender.sendRandomReaderSuccessResponse(requestId, randomReaderId, + reader.readerName()); + } else { + persistentTopic.removeRandomReader(reader); + } + } catch (Throwable t) { + randomReaders.remove(randomReaderId, future); + persistentTopic.removeRandomReader(reader); + future.completeExceptionally(t); + commandSender.sendErrorResponse(requestId, + BrokerServiceException.getClientErrorCode(t), t.getMessage()); + } + }, ctx.executor()) + .exceptionallyAsync(ex -> { + randomReaders.remove(randomReaderId, future); + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (future.completeExceptionally(cause)) { + commandSender.sendErrorResponse(requestId, + BrokerServiceException.getClientErrorCode(cause), cause.getMessage()); + } + return null; + }, ctx.executor()); + } + + @Override + protected void handleRandomRead(CommandRandomRead command) { + checkArgument(state == State.Connected); + long randomReaderId = command.getRandomReaderId(); + long requestId = command.getRequestId(); + CompletableFuture readerFuture = randomReaders.get(randomReaderId); + if (readerFuture == null || !readerFuture.isDone() || readerFuture.isCompletedExceptionally()) { + writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, 0, + ServerError.ConsumerNotFound, "RandomReader not found")); + return; + } + + BrokerRandomReader reader = readerFuture.getNow(null); + if (!reader.beginRead(requestId)) { + writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, 0, + ServerError.TooManyRequests, "RandomReader already has a read in flight")); + return; + } + + try { + MessageIdData start = command.getStartMessageId(); + if (start.hasBatchIndex() && start.getBatchIndex() >= 0) { + reader.endRead(requestId); + writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, 0, + ServerError.MetadataError, "RandomReader does not support batch-index message ids")); + return; + } + int maxEntries = service.getPulsar().getConfiguration().getDispatcherMaxReadBatchSize(); + int numberOfEntries = Math.min(command.getNumberOfEntries(), maxEntries); + if (numberOfEntries <= 0) { + reader.endRead(requestId); + writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, 0, + ServerError.MetadataError, "number_of_entries must be positive")); + return; + } + + PersistentTopic topic = reader.topic(); + int partitionIndex = TopicName.getPartitionIndex(topic.getName()); + if (partitionIndex >= 0 && (!start.hasPartition() || start.getPartition() != partitionIndex)) { + reader.endRead(requestId); + writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, 0, + ServerError.MetadataError, + "Partitioned RandomReader requires matching MessageId partition")); + return; + } + if (partitionIndex < 0 && start.hasPartition() && start.getPartition() >= 0) { + reader.endRead(requestId); + writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, 0, + ServerError.MetadataError, + "MessageId partition does not match the RandomReader topic")); + return; + } + + if (!start.hasLedgerId() || !start.hasEntryId()) { + reader.endRead(requestId); + writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, 0, + ServerError.MetadataError, "RandomReader requires ledgerId and entryId")); + return; + } + if (start.getLedgerId() < 0 || start.getEntryId() < 0) { + reader.endRead(requestId); + writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, 0, + ServerError.MetadataError, "RandomReader requires non-negative ledgerId and entryId")); + return; + } + Position startPosition = PositionFactory.create(start.getLedgerId(), start.getEntryId()); + topic.checkIfTransactionBufferRecoverCompletely() + .thenApply(ignored -> + reader.isReadCommitted() ? topic.getMaxReadPosition() : PositionFactory.LATEST + ) + .thenCompose(maxVisible -> reader.readEntries(startPosition, numberOfEntries, maxVisible)) + .thenAccept(entries -> + commandSender.sendRandomReadMessages(randomReaderId, requestId, topic.getName(), + partitionIndex, entries, entries.size(), () -> reader.endRead(requestId)) + .addListener(future -> { + if (!future.isSuccess()) { + reader.endRead(requestId); + } + })) + .exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, 0, + toServerError(cause), cause.getMessage())); + reader.endRead(requestId); + return null; + }); + } catch (Throwable t) { + reader.endRead(requestId); + writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, 0, + ServerError.UnknownError, t.getMessage())); + } + } + + @Override + protected void handleCloseRandomReader(CommandCloseRandomReader command) { + checkArgument(state == State.Connected); + CompletableFuture future = randomReaders.remove(command.getRandomReaderId()); + if (future != null && future.isDone() && !future.isCompletedExceptionally()) { + BrokerRandomReader reader = future.getNow(null); + reader.topic().removeRandomReader(reader); + } else if (future != null && !future.isDone()) { + future.completeExceptionally( + new IllegalStateException("Closed RandomReader before creation completed")); + } + commandSender.sendSuccessResponse(command.getRequestId()); + } + + public void disconnectRandomReader(long randomReaderId, + String brokerServiceUrl, String brokerServiceUrlTls) { + CompletableFuture future = randomReaders.remove(randomReaderId); + if (future != null && future.isDone() && !future.isCompletedExceptionally()) { + BrokerRandomReader reader = future.getNow(null); + reader.close(); + if (brokerServiceUrl != null) { + commandSender.sendTopicMigrated(ResourceType.RandomReader, randomReaderId, + brokerServiceUrl, brokerServiceUrlTls); + } + } else if (future != null && !future.isDone()) { + future.completeExceptionally( + new IllegalStateException("RandomReader disconnected before creation completed")); + } + } + @Override protected void handleCloseProducer(CommandCloseProducer closeProducer) { checkArgument(state == State.Connected); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c9bcad341fcd7..e08cd24ee39e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -107,6 +107,7 @@ import org.apache.pulsar.broker.resources.TopicResources; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.AbstractTopic; +import org.apache.pulsar.broker.service.BrokerRandomReader; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException; @@ -223,6 +224,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private final Map replicators = new ConcurrentHashMap<>(); private final Map shadowReplicators = new ConcurrentHashMap<>(); + private final Map randomReaders = new ConcurrentHashMap<>(); @Getter private volatile List shadowTopics; private final TopicName shadowSourceTopic; @@ -1740,6 +1742,56 @@ public void deleteLedgerComplete(Object ctx) { } + public void addRandomReader(BrokerRandomReader reader) throws BrokerServiceException { + lock.writeLock().lock(); + try { + if (isFenced) { + throw new TopicFencedException("Topic is temporarily unavailable"); + } + if (isMigrated()) { + throw new TopicMigratedException("Topic was already migrated"); + } + if (randomReaders.putIfAbsent(reader.randomReaderId(), reader) == null) { + handleRandomReaderAdded(reader); + } + } finally { + lock.writeLock().unlock(); + } + } + + public void removeRandomReader(BrokerRandomReader reader) { + lock.writeLock().lock(); + try { + BrokerRandomReader removed = randomReaders.remove(reader.randomReaderId()); + if (removed != null) { + removed.close(); + decrementUsageCount(); + } + } finally { + lock.writeLock().unlock(); + } + } + + public List> disconnectRandomReaders(String brokerServiceUrl, + String brokerServiceUrlTls) { + List> futures = new ArrayList<>(); + List readers; + lock.writeLock().lock(); + try { + readers = new ArrayList<>(randomReaders.values()); + for (BrokerRandomReader reader : readers) { + decrementUsageCount(); + reader.close(); + } + randomReaders.clear(); + } finally { + lock.writeLock().unlock(); + } + readers.forEach(reader -> futures.add( + reader.disconnect(brokerServiceUrl, brokerServiceUrlTls))); + return futures; + } + public CompletableFuture close() { return close(true, false); } @@ -1828,10 +1880,14 @@ public CompletableFuture close( } else { subscriptions.forEach((s, sub) -> futures.add(sub.close(true, lookupData))); } + futures.addAll(disconnectRandomReaders( + lookupData.map(ld -> ld.getPulsarServiceUrl()).orElse(null), + lookupData.map(ld -> ld.getPulsarServiceUrlTls()).orElse(null))); } )); } else { subscriptions.forEach((s, sub) -> futures.add(sub.close(false, Optional.empty()))); + futures.addAll(disconnectRandomReaders(null, null)); } //close entry filters diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RandomReaderBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RandomReaderBrokerTest.java new file mode 100644 index 0000000000000..f198036e9a094 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RandomReaderBrokerTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.pulsar.broker.service.BrokerRandomReader.EntryResult; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.api.proto.RandomReadInvisibleReason; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.Markers; +import org.testng.annotations.Test; + +public class RandomReaderBrokerTest { + + @Test + public void testReadEntriesCoversRequestedPhysicalEntriesWithoutReadingPastInvisibleEntry() { + EntryImpl visible0 = entry(3L, 0L, metadata(0L)); + EntryImpl delayed1 = entry(3L, 1L, metadata(1L).setDeliverAtTime(System.currentTimeMillis() + 60_000)); + EntryImpl visible2 = entry(3L, 2L, metadata(2L)); + EntryImpl beyondRequest3 = entry(3L, 3L, metadata(3L)); + + ManagedLedger managedLedger = mock(ManagedLedger.class); + Position start = PositionFactory.create(3L, 0L); + when(managedLedger.readEntries(start, 3)) + .thenReturn(CompletableFuture.completedFuture(List.of(visible0, delayed1, visible2))); + + PersistentTopic topic = mock(PersistentTopic.class); + when(topic.getManagedLedger()).thenReturn(managedLedger); + when(topic.getName()).thenReturn("persistent://public/default/t1"); + when(topic.isDelayedDeliveryEnabled()).thenReturn(true); + + BrokerRandomReader reader = new BrokerRandomReader(7L, "rr", Map.of(), topic, mock(ServerCnx.class), true); + + List results = reader.readEntries(start, 3, PositionFactory.create(3L, 10L)).join(); + assertEquals(results.size(), 3); + assertTrue(results.get(0).isVisible()); + assertFalse(results.get(1).isVisible()); + assertEquals(results.get(1).invisibleReason(), RandomReadInvisibleReason.DELAYED_DELIVERY); + assertTrue(results.get(2).isVisible()); + assertEquals(beyondRequest3.refCnt(), 1); + + results.forEach(EntryResult::release); + beyondRequest3.release(); + } + + @Test + public void testExceededMaxVisiblePositionReturnsInvisibleSlot() { + EntryImpl visible0 = entry(3L, 0L, metadata(0L)); + EntryImpl overMax1 = entry(3L, 1L, metadata(1L)); + + ManagedLedger managedLedger = mock(ManagedLedger.class); + Position start = PositionFactory.create(3L, 0L); + when(managedLedger.readEntries(start, 2)) + .thenReturn(CompletableFuture.completedFuture(List.of(visible0, overMax1))); + + PersistentTopic topic = mock(PersistentTopic.class); + when(topic.getManagedLedger()).thenReturn(managedLedger); + when(topic.getName()).thenReturn("persistent://public/default/t1"); + + BrokerRandomReader reader = new BrokerRandomReader(7L, "rr", Map.of(), topic, mock(ServerCnx.class), true); + + List results = reader.readEntries(start, 2, PositionFactory.create(3L, 0L)).join(); + assertEquals(results.size(), 2); + assertTrue(results.get(0).isVisible()); + assertFalse(results.get(1).isVisible()); + assertEquals(results.get(1).invisibleReason(), RandomReadInvisibleReason.EXCEEDED_MAX_VISIBLE_POSITION); + + results.forEach(EntryResult::release); + } + + @Test + public void testReadEntriesStillCoversEntriesWhenStartExceedsMaxVisiblePosition() { + EntryImpl overMax2 = entry(3L, 2L, metadata(2L)); + EntryImpl overMax3 = entry(3L, 3L, metadata(3L)); + + ManagedLedger managedLedger = mock(ManagedLedger.class); + Position start = PositionFactory.create(3L, 2L); + when(managedLedger.readEntries(start, 2)) + .thenReturn(CompletableFuture.completedFuture(List.of(overMax2, overMax3))); + + PersistentTopic topic = mock(PersistentTopic.class); + when(topic.getManagedLedger()).thenReturn(managedLedger); + when(topic.getName()).thenReturn("persistent://public/default/t1"); + + BrokerRandomReader reader = new BrokerRandomReader(7L, "rr", Map.of(), topic, mock(ServerCnx.class), true); + + List results = reader.readEntries(start, 2, PositionFactory.create(3L, 1L)).join(); + assertEquals(results.size(), 2); + assertFalse(results.get(0).isVisible()); + assertEquals(results.get(0).invisibleReason(), RandomReadInvisibleReason.EXCEEDED_MAX_VISIBLE_POSITION); + assertFalse(results.get(1).isVisible()); + assertEquals(results.get(1).invisibleReason(), RandomReadInvisibleReason.EXCEEDED_MAX_VISIBLE_POSITION); + + results.forEach(EntryResult::release); + } + + @Test + public void testMarkerEntriesReturnSpecificInvisibleReasons() { + EntryImpl serverOnlyMarker = entry(3L, 0L, Markers.newReplicatedSubscriptionsSnapshotRequest("sid", "us-west")); + EntryImpl txnMarker = entry(3L, 1L, Markers.newTxnCommitMarker(1L, 2L, 3L)); + + ManagedLedger managedLedger = mock(ManagedLedger.class); + Position start = PositionFactory.create(3L, 0L); + when(managedLedger.readEntries(start, 2)) + .thenReturn(CompletableFuture.completedFuture(List.of(serverOnlyMarker, txnMarker))); + + PersistentTopic topic = mock(PersistentTopic.class); + when(topic.getManagedLedger()).thenReturn(managedLedger); + when(topic.getName()).thenReturn("persistent://public/default/t1"); + + BrokerRandomReader reader = new BrokerRandomReader(7L, "rr", Map.of(), topic, mock(ServerCnx.class), true); + + List results = reader.readEntries(start, 2, PositionFactory.create(3L, 10L)).join(); + assertEquals(results.size(), 2); + assertFalse(results.get(0).isVisible()); + assertEquals(results.get(0).invisibleReason(), RandomReadInvisibleReason.SERVER_ONLY_MARKER); + assertFalse(results.get(1).isVisible()); + assertEquals(results.get(1).invisibleReason(), RandomReadInvisibleReason.TRANSACTION_MARKER); + + results.forEach(EntryResult::release); + } + + @Test + public void testReadEntriesReleasesAllEntriesWhenMetadataParsingFails() { + EntryImpl visible0 = entry(3L, 0L, metadata(0L)); + EntryImpl invalid1 = EntryImpl.create(3L, 1L, Unpooled.wrappedBuffer(new byte[] {1, 2, 3})); + EntryImpl unprocessed2 = entry(3L, 2L, metadata(2L)); + + ManagedLedger managedLedger = mock(ManagedLedger.class); + Position start = PositionFactory.create(3L, 0L); + when(managedLedger.readEntries(start, 3)) + .thenReturn(CompletableFuture.completedFuture(List.of(visible0, invalid1, unprocessed2))); + + PersistentTopic topic = mock(PersistentTopic.class); + when(topic.getManagedLedger()).thenReturn(managedLedger); + when(topic.getName()).thenReturn("persistent://public/default/t1"); + + BrokerRandomReader reader = new BrokerRandomReader(7L, "rr", Map.of(), topic, mock(ServerCnx.class), true); + + assertThrows(CompletionException.class, + () -> reader.readEntries(start, 3, PositionFactory.create(3L, 10L)).join()); + assertEquals(visible0.refCnt(), 0); + assertEquals(invalid1.refCnt(), 0); + assertEquals(unprocessed2.refCnt(), 0); + } + + private static EntryImpl entry(long ledgerId, long entryId, MessageMetadata metadata) { + ByteBuf payload = Unpooled.wrappedBuffer(new byte[] {1}); + ByteBuf data = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata, payload); + payload.release(); + try { + return EntryImpl.create(ledgerId, entryId, data); + } finally { + data.release(); + } + } + + private static EntryImpl entry(long ledgerId, long entryId, ByteBuf data) { + try { + return EntryImpl.create(ledgerId, entryId, data); + } finally { + data.release(); + } + } + + private static MessageMetadata metadata(long sequenceId) { + return new MessageMetadata() + .setProducerName("p") + .setSequenceId(sequenceId) + .setPublishTime(System.currentTimeMillis()); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 7de4c198f30df..1038609f23838 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -138,6 +138,9 @@ import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; import org.apache.pulsar.common.api.proto.CommandPing; import org.apache.pulsar.common.api.proto.CommandProducerSuccess; +import org.apache.pulsar.common.api.proto.CommandRandomReadEntryResult; +import org.apache.pulsar.common.api.proto.CommandRandomReadResponse; +import org.apache.pulsar.common.api.proto.CommandRandomReaderSuccess; import org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse; import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate; import org.apache.pulsar.common.api.proto.CommandSendError; @@ -149,6 +152,7 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; +import org.apache.pulsar.common.api.proto.RandomReadInvisibleReason; import org.apache.pulsar.common.api.proto.ScalableConsumerType; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.api.proto.Subscription; @@ -2846,6 +2850,59 @@ public void testProducerFailureOnEncryptionRequiredTopic() throws Exception { channel.finish(); } + + @Test(timeOut = 30000) + public void testRandomReadCoversEntryAfterMaxVisiblePosition() throws Exception { + resetChannel(); + setChannelConnected(); + PersistentTopic topic = mock(PersistentTopic.class); + ManagedLedger managedLedger = mock(ManagedLedger.class); + Position start = PositionFactory.create(3L, 2L); + ByteBuf data = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, + new MessageMetadata() + .setProducerName("p") + .setSequenceId(0) + .setPublishTime(System.currentTimeMillis()), + Unpooled.EMPTY_BUFFER); + org.apache.bookkeeper.mledger.impl.EntryImpl entry = + org.apache.bookkeeper.mledger.impl.EntryImpl.create(3L, 2L, data); + data.release(); + try { + doReturn(managedLedger).when(topic).getManagedLedger(); + doReturn(successTopicName).when(topic).getName(); + doReturn(CompletableFuture.completedFuture(null)).when(topic).checkIfTransactionBufferRecoverCompletely(); + doReturn(CompletableFuture.completedFuture(PositionFactory.create(3L, 1L))) + .when(topic).getLastDispatchablePosition(); + doReturn(CompletableFuture.completedFuture(null)).when(topic).addSchemaIfIdleOrCheckCompatible(any()); + doReturn(CompletableFuture.completedFuture(List.of(entry))).when(managedLedger).readEntries(start, 1); + doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService) + .getTopic(eq(successTopicName), eq(false)); + + channel.writeInbound(Commands.newRandomReader(successTopicName, 7L, 11L, "rr", + null, Collections.emptyMap(), true)); + Object createResponse = getResponse(); + assertTrue(createResponse instanceof CommandRandomReaderSuccess); + + channel.writeInbound(Commands.newRandomRead(7L, 12L, 3L, 2L, -1, 1)); + Object entryResult = getResponse(); + assertTrue(entryResult instanceof CommandRandomReadEntryResult); + assertEquals(((CommandRandomReadEntryResult) entryResult).getInvisibleReason(), + RandomReadInvisibleReason.EXCEEDED_MAX_VISIBLE_POSITION); + + Object response = getResponse(); + assertTrue(response instanceof CommandRandomReadResponse); + CommandRandomReadResponse readResponse = (CommandRandomReadResponse) response; + assertEquals(readResponse.getRequestId(), 12L); + assertEquals(readResponse.getNumberOfEntries(), 1); + assertFalse(readResponse.hasError()); + } finally { + if (entry.refCnt() > 0) { + entry.release(); + } + channel.finish(); + } + } + @SuppressWarnings("deprecation") @Test(timeOut = 30000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java index 32382cdb6fe4f..2ff52adfbe96a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java @@ -45,6 +45,10 @@ import org.apache.pulsar.common.api.proto.CommandPong; import org.apache.pulsar.common.api.proto.CommandProducer; import org.apache.pulsar.common.api.proto.CommandProducerSuccess; +import org.apache.pulsar.common.api.proto.CommandRandomReadEntryResult; +import org.apache.pulsar.common.api.proto.CommandRandomReadMessage; +import org.apache.pulsar.common.api.proto.CommandRandomReadResponse; +import org.apache.pulsar.common.api.proto.CommandRandomReaderSuccess; import org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate; import org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse; import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate; @@ -163,6 +167,26 @@ protected void handleProducerSuccess(CommandProducerSuccess success) { queue.offer(new CommandProducerSuccess().copyFrom(success)); } + @Override + protected void handleRandomReaderSuccess(CommandRandomReaderSuccess success) { + queue.offer(new CommandRandomReaderSuccess().copyFrom(success)); + } + + @Override + protected void handleRandomReadMessage(CommandRandomReadMessage command, ByteBuf headersAndPayload) { + queue.offer(new CommandRandomReadMessage().copyFrom(command)); + } + + @Override + protected void handleRandomReadEntryResult(CommandRandomReadEntryResult command) { + queue.offer(new CommandRandomReadEntryResult().copyFrom(command)); + } + + @Override + protected void handleRandomReadResponse(CommandRandomReadResponse response) { + queue.offer(new CommandRandomReadResponse().copyFrom(response)); + } + @Override protected void handleLookupResponse(CommandLookupTopicResponse connection) { queue.offer(new CommandLookupTopicResponse().copyFrom(connection)); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageInvisibleReason.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageInvisibleReason.java new file mode 100644 index 0000000000000..57f31c5bcfd03 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageInvisibleReason.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Reason a RandomReader entry slot is not visible as client messages. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public enum MessageInvisibleReason { + UNKNOWN, + SERVER_ONLY_MARKER, + TRANSACTION_MARKER, + ABORTED_TRANSACTION, + DELAYED_DELIVERY, + EXCEEDED_MAX_VISIBLE_POSITION +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java index 6c46bce254f6f..14e0deb0bb6e0 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java @@ -222,6 +222,27 @@ static ClientBuilder builder() { */ ReaderBuilder newReader(Schema schema); + /** + * Create a {@link RandomReaderBuilder} for creating a {@link RandomReader}. + * + *

RandomReader reads messages directly from managed ledgers without creating subscriptions + * or managed cursors. + * + * @return a RandomReaderBuilder for byte array messages + * @since 4.2.0 + */ + RandomReaderBuilder newRandomReader(); + + /** + * Create a {@link RandomReaderBuilder} with a specific schema for creating a {@link RandomReader}. + * + * @param schema the schema to use for decoding messages + * @param the message payload type + * @return a RandomReaderBuilder for the specified schema type + * @since 4.2.0 + */ + RandomReaderBuilder newRandomReader(Schema schema); + /** * Create a table view builder with a specific schema for subscribing on a specific topic. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index b4d9a232b1431..775c72984cb1f 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -690,6 +690,27 @@ public InvalidMessageException(String msg, long sequenceId) { } } + /** + * Message invisible exception thrown when a RandomReader result slot does not contain visible messages. + */ + @Getter + public static class MessageInvisibleException extends PulsarClientException { + private final MessageId messageId; + private final MessageInvisibleReason reason; + + public MessageInvisibleException(MessageId messageId, MessageInvisibleReason reason) { + super("Message " + messageId + " is invisible: " + reason); + this.messageId = messageId; + this.reason = reason; + } + + public MessageInvisibleException(String msg, MessageId messageId, MessageInvisibleReason reason) { + super(msg); + this.messageId = messageId; + this.reason = reason; + } + } + /** * Invalid topic name exception thrown by Pulsar client. */ @@ -1013,6 +1034,9 @@ public static Throwable wrap(Throwable t, String msg) { return new NotConnectedException(); } else if (t instanceof InvalidMessageException) { return new InvalidMessageException(msg); + } else if (t instanceof MessageInvisibleException) { + MessageInvisibleException ex = (MessageInvisibleException) t; + return new MessageInvisibleException(msg, ex.getMessageId(), ex.getReason()); } else if (t instanceof InvalidTopicNameException) { return new InvalidTopicNameException(msg); } else if (t instanceof NotSupportedException) { @@ -1105,6 +1129,9 @@ public static PulsarClientException unwrap(Throwable t) { newException = new NotConnectedException(); } else if (cause instanceof InvalidMessageException) { newException = new InvalidMessageException(msg); + } else if (cause instanceof MessageInvisibleException) { + MessageInvisibleException ex = (MessageInvisibleException) cause; + newException = new MessageInvisibleException(msg, ex.getMessageId(), ex.getReason()); } else if (cause instanceof InvalidTopicNameException) { newException = new InvalidTopicNameException(msg); } else if (cause instanceof NotSupportedException) { @@ -1164,6 +1191,7 @@ public static boolean isRetriableError(Throwable t) { || t instanceof SubscriptionNotFoundException || t instanceof UnsupportedAuthenticationException || t instanceof InvalidMessageException + || t instanceof MessageInvisibleException || t instanceof InvalidTopicNameException || t instanceof NotSupportedException || t instanceof NotAllowedException diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RandomReadResult.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RandomReadResult.java new file mode 100644 index 0000000000000..b43f687927c8d --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RandomReadResult.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.util.List; +import java.util.Optional; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Result for one physical entry slot covered by a RandomReader read. + * + * @param the message payload type + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface RandomReadResult { + + /** + * @return the physical entry message id for this slot + */ + MessageId getMessageId(); + + /** + * @return whether this entry is visible as client messages + */ + boolean isVisible(); + + /** + * Return decoded messages for a visible entry. + * + * @throws PulsarClientException.MessageInvisibleException if this entry is invisible + */ + List> getMessages() throws PulsarClientException.MessageInvisibleException; + + /** + * @return the invisibility exception for an invisible entry, otherwise empty + */ + Optional getInvisibleException(); +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RandomReader.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RandomReader.java new file mode 100644 index 0000000000000..257ef3aa0d17b --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RandomReader.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * A RandomReader reads messages from specific positions in a Pulsar topic without creating a subscription + * or managed cursor. + * + *

Unlike {@link Reader} and {@link Consumer}, RandomReader does not create a subscription or + * {@code ManagedCursor}. It reads raw entries directly from the managed ledger and returns all decoded + * result slots for those entries. + * + *

{@code numberOfBatches} specifies the maximum number of persisted batches (entries) to read. + * Each batch may contain multiple messages. Visible entries return all decoded messages from that entry. + * Invisible entries return a result slot whose {@link RandomReadResult#getMessages()} method throws + * {@link PulsarClientException.MessageInvisibleException}. + * + *

RandomReader is a bounded, point-in-time reader. It reads available entries from the specified + * start position to the current topic end without waiting for future entries. + * + * @param the message payload type + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface RandomReader extends Closeable { + + /** + * Read messages from the specified start position. + * + * @param startPosition the position to start reading from, inclusive. Must be a concrete message id + * with non-negative ledger id and entry id. {@link MessageId#earliest} and + * {@link MessageId#latest} are not supported. + * @param numberOfBatches the maximum number of persisted batches (entries) to read + * @return a future that completes with one result slot per covered entry + */ + CompletableFuture>> read(MessageId startPosition, int numberOfBatches); + + + default CompletableFuture>> read(MessageId startPosition) { + return read(startPosition, 1); + } + + /** + * @return the topic name of this RandomReader + */ + String getTopic(); + + /** + * Close the RandomReader synchronously. + */ + @Override + void close() throws PulsarClientException; + + /** + * Close the RandomReader asynchronously. + * + * @return a future that completes when the RandomReader is closed + */ + CompletableFuture closeAsync(); +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RandomReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RandomReaderBuilder.java new file mode 100644 index 0000000000000..40a8802c2ef80 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RandomReaderBuilder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Builder for creating {@link RandomReader} instances. + * + * @param the message payload type + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface RandomReaderBuilder extends Cloneable { + + /** + * Create a RandomReader synchronously. + */ + RandomReader create() throws PulsarClientException; + + /** + * Create a RandomReader asynchronously. + */ + CompletableFuture> createAsync(); + + /** + * Load configuration from a map. + */ + RandomReaderBuilder loadConf(Map config); + + /** + * Clone the builder. + */ + RandomReaderBuilder clone(); + + /** + * Set the topic name. + */ + RandomReaderBuilder topic(String topicName); + + /** + * Set the reader name. + */ + RandomReaderBuilder readerName(String readerName); + + /** + * Set a custom message payload processor. + */ + RandomReaderBuilder messagePayloadProcessor(MessagePayloadProcessor payloadProcessor); + + /** + * Enable or disable pooled messages. + */ + RandomReaderBuilder poolMessages(boolean poolMessages); + + /** + * If set to true, the RandomReader will filter out messages from aborted transactions + * (READ_COMMITTED). By default (false), all messages including those from aborted + * transactions are returned (READ_UNCOMMITTED). + */ + RandomReaderBuilder readCommitted(boolean readCommitted); + + /** + * Add a key-value property. + */ + RandomReaderBuilder property(String key, String value); + + /** + * Set multiple key-value properties. + */ + RandomReaderBuilder properties(Map properties); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 97430e72a221f..97dca6b018294 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -88,6 +88,10 @@ import org.apache.pulsar.common.api.proto.CommandNewTxnResponse; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; import org.apache.pulsar.common.api.proto.CommandProducerSuccess; +import org.apache.pulsar.common.api.proto.CommandRandomReadEntryResult; +import org.apache.pulsar.common.api.proto.CommandRandomReadMessage; +import org.apache.pulsar.common.api.proto.CommandRandomReadResponse; +import org.apache.pulsar.common.api.proto.CommandRandomReaderSuccess; import org.apache.pulsar.common.api.proto.CommandReachedEndOfTopic; import org.apache.pulsar.common.api.proto.CommandSendError; import org.apache.pulsar.common.api.proto.CommandSendReceipt; @@ -190,6 +194,12 @@ public class ClientCnx extends PulsarHandler { .concurrencyLevel(1) .build(); + private final ConcurrentLongHashMap> randomReaders = + ConcurrentLongHashMap.>newBuilder() + .expectedItems(8) + .concurrencyLevel(1) + .build(); + private final CompletableFuture connectionFuture = new CompletableFuture(); private final ConcurrentLinkedQueue requestTimeoutQueue = new ConcurrentLinkedQueue<>(); @@ -275,7 +285,9 @@ private enum RequestType { GetSchema, GetOrCreateSchema, AckResponse, - Lookup; + Lookup, + RandomReaderCreate, + RandomRead; String getDescription() { if (this == Command) { @@ -389,10 +401,13 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { scalableConsumerSessions.forEach((__, session) -> session.connectionClosed()); scalableTopicsWatchers.forEach((__, session) -> session.connectionClosed()); + randomReaders.forEach((id, session) -> + session.connectionClosed(this, Optional.empty(), Optional.empty())); waitingLookupRequests.clear(); producers.clear(); consumers.clear(); + randomReaders.clear(); topicListWatchers.clear(); dagWatchSessions.clear(); scalableConsumerSessions.clear(); @@ -776,21 +791,40 @@ protected void handleTopicMigrated(CommandTopicMigrated commandTopicMigrated) { final String serviceUrlTls = commandTopicMigrated.hasBrokerServiceUrlTls() ? commandTopicMigrated.getBrokerServiceUrlTls() : null; - HandlerState resource = commandTopicMigrated.getResourceType() == ResourceType.Producer - ? producers.get(resourceId) - : consumers.get(resourceId); - log.info().attr("resourceType", commandTopicMigrated.getResourceType().name()) + final ResourceType resourceType = commandTopicMigrated.getResourceType(); + log.info().attr("resourceType", resourceType.name()) .attr("serviceUrl", serviceUrl) .attr("serviceUrlTls", serviceUrlTls) .log("Resource migrated to new service url"); - if (resource != null) { - try { - resource.setRedirectedClusterURI(serviceUrl, serviceUrlTls); - } catch (URISyntaxException e) { - log.info().attr("serviceUrl", serviceUrl) - .attr("serviceUrlTls", serviceUrlTls) - .attr("resourceId", resourceId) - .log("Invalid redirect URL"); + + if (resourceType == ResourceType.RandomReader) { + RandomReaderImpl.RandomReaderSession session = randomReaders.remove(resourceId); + if (session != null) { + try { + session.setRedirectedClusterURI(serviceUrl, serviceUrlTls); + } catch (URISyntaxException e) { + log.info().attr("serviceUrl", serviceUrl) + .attr("serviceUrlTls", serviceUrlTls) + .attr("resourceId", resourceId) + .log("Invalid redirect URL for RandomReader"); + } + Optional hostUri = Optional.ofNullable(session.redirectedClusterURI); + Optional initialDelay = hostUri.isPresent() ? Optional.of(0L) : Optional.empty(); + session.connectionClosed(this, initialDelay, hostUri); + } + } else { + HandlerState resource = resourceType == ResourceType.Producer + ? producers.get(resourceId) + : consumers.get(resourceId); + if (resource != null) { + try { + resource.setRedirectedClusterURI(serviceUrl, serviceUrlTls); + } catch (URISyntaxException e) { + log.info().attr("serviceUrl", serviceUrl) + .attr("serviceUrlTls", serviceUrlTls) + .attr("resourceId", resourceId) + .log("Invalid redirect URL"); + } } } } @@ -1576,6 +1610,76 @@ void removeConsumer(final long consumerId) { consumers.remove(consumerId); } + void registerRandomReader(final long randomReaderId, + final RandomReaderImpl.RandomReaderSession session) { + randomReaders.put(randomReaderId, session); + } + + void removeRandomReader(final long randomReaderId) { + randomReaders.remove(randomReaderId); + } + + void registerPendingReadTimeout(long requestId, TimedCompletableFuture future) { + pendingRequests.put(requestId, future); + requestTimeoutQueue.add(new RequestTime(requestId, RequestType.RandomRead)); + } + + void removePendingReadTimeout(long requestId, TimedCompletableFuture future) { + pendingRequests.remove(requestId, future); + } + + CompletableFuture sendRandomReaderCreate(ByteBuf command, long requestId) { + TimedCompletableFuture future = new TimedCompletableFuture<>(); + sendRequestAndHandleTimeout(command, requestId, RequestType.RandomReaderCreate, true, future); + return future; + } + + @Override + protected void handleRandomReaderSuccess(CommandRandomReaderSuccess success) { + long requestId = success.getRequestId(); + @SuppressWarnings("unchecked") + TimedCompletableFuture future = + (TimedCompletableFuture) pendingRequests.remove(requestId); + if (future != null) { + future.complete(success); + } + } + + @Override + protected void handleRandomReadMessage(CommandRandomReadMessage command, ByteBuf headersAndPayload) { + RandomReaderImpl.RandomReaderSession session = randomReaders.get(command.getRandomReaderId()); + if (session != null) { + session.messageReceived(command, headersAndPayload, this); + } + } + + @Override + protected void handleRandomReadEntryResult(CommandRandomReadEntryResult command) { + RandomReaderImpl.RandomReaderSession session = randomReaders.get(command.getRandomReaderId()); + if (session != null) { + session.entryResultReceived(command); + } + } + + @Override + protected void handleRandomReadResponse(CommandRandomReadResponse response) { + long requestId = response.getRequestId(); + RandomReaderImpl.RandomReaderSession session = randomReaders.get(response.getRandomReaderId()); + if (session == null || !session.hasPendingRead(requestId)) { + return; + } + TimedCompletableFuture requestFuture = pendingRequests.remove(requestId); + if (requestFuture != null) { + requestFuture.markAsResponded(); + } + if (response.hasError()) { + session.failPendingRead(response.getRequestId(), + getPulsarClientException(response.getError(), response.getMessage())); + } else { + session.completePendingRead(response.getRequestId(), response.getNumberOfEntries()); + } + } + void removeTopicListWatcher(final long watcherId) { topicListWatchers.remove(watcherId); } @@ -1758,6 +1862,9 @@ public boolean idleCheck() { if (!scalableTopicsWatchers.isEmpty()) { return false; } + if (!randomReaders.isEmpty()) { + return false; + } return true; } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index a8c7b46b3db1e..657a9f69ba399 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -63,6 +63,8 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.RandomReader; +import org.apache.pulsar.client.api.RandomReaderBuilder; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.RegexSubscriptionMode; @@ -75,6 +77,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.client.impl.conf.RandomReaderConfigurationData; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.impl.metrics.MemoryBufferStats; @@ -368,6 +371,22 @@ public ReaderBuilder newReader(Schema schema) { return new ReaderBuilderImpl<>(this, schema); } + @Override + public RandomReaderBuilder newRandomReader() { + return new RandomReaderBuilderImpl<>(this, Schema.BYTES); + } + + @Override + public RandomReaderBuilder newRandomReader(Schema schema) { + return new RandomReaderBuilderImpl<>(this, schema); + } + + CompletableFuture> createRandomReaderAsync(RandomReaderConfigurationData conf, + Schema schema) { + return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName()) + .thenCompose(schemaClone -> RandomReaderImpl.create(this, conf, schemaClone)); + } + /** * @deprecated use {@link #newTableView(Schema)} instead. */ @@ -1321,6 +1340,10 @@ public long newRequestId() { return requestIdGenerator.getAndIncrement(); } + long newRandomReaderId() { + return consumerIdGenerator.getAndIncrement(); + } + public ConnectionPool getCnxPool() { return cnxPool; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RandomReadResultImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RandomReadResultImpl.java new file mode 100644 index 0000000000000..6cbf131b286e1 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RandomReadResultImpl.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.List; +import java.util.Optional; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageInvisibleReason; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.RandomReadResult; + +public class RandomReadResultImpl implements RandomReadResult { + private final MessageId messageId; + private final List> messages; + private final PulsarClientException.MessageInvisibleException invisibleException; + + private RandomReadResultImpl(MessageId messageId, List> messages, + PulsarClientException.MessageInvisibleException invisibleException) { + this.messageId = messageId; + this.messages = messages; + this.invisibleException = invisibleException; + } + + public static RandomReadResultImpl visible(MessageId messageId, List> messages) { + return new RandomReadResultImpl<>(messageId, List.copyOf(messages), null); + } + + public static RandomReadResultImpl invisible(MessageId messageId, MessageInvisibleReason reason) { + return new RandomReadResultImpl<>(messageId, null, + new PulsarClientException.MessageInvisibleException(messageId, reason)); + } + + @Override + public MessageId getMessageId() { + return messageId; + } + + @Override + public boolean isVisible() { + return invisibleException == null; + } + + @Override + public List> getMessages() throws PulsarClientException.MessageInvisibleException { + if (invisibleException != null) { + throw invisibleException; + } + return messages; + } + + @Override + public Optional getInvisibleException() { + return Optional.ofNullable(invisibleException); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RandomReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RandomReaderBuilderImpl.java new file mode 100644 index 0000000000000..405302cc4cb39 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RandomReaderBuilderImpl.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.RandomReader; +import org.apache.pulsar.client.api.RandomReaderBuilder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; +import org.apache.pulsar.client.impl.conf.RandomReaderConfigurationData; +import org.apache.pulsar.common.util.FutureUtil; + +public class RandomReaderBuilderImpl implements RandomReaderBuilder { + private final PulsarClientImpl client; + private RandomReaderConfigurationData conf; + private final Schema schema; + + public RandomReaderBuilderImpl(PulsarClientImpl client, Schema schema) { + this(client, new RandomReaderConfigurationData<>(), schema); + } + + private RandomReaderBuilderImpl(PulsarClientImpl client, RandomReaderConfigurationData conf, Schema schema) { + this.client = client; + this.conf = conf; + this.schema = schema; + } + + @Override + public RandomReader create() throws PulsarClientException { + try { + return FutureUtil.getAndCleanupOnInterrupt(createAsync(), RandomReader::closeAsync); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + + @Override + public CompletableFuture> createAsync() { + if (StringUtils.isBlank(conf.getTopicName())) { + return FutureUtil.failedFuture(new IllegalArgumentException( + "Topic name must be set on the random reader builder")); + } + return client.createRandomReaderAsync(conf, schema); + } + + @Override + public RandomReaderBuilder clone() { + return new RandomReaderBuilderImpl<>(client, conf.clone(), schema); + } + + @Override + public RandomReaderBuilder topic(String topicName) { + conf.setTopicName(StringUtils.trim(topicName)); + return this; + } + + @Override + public RandomReaderBuilder loadConf(Map config) { + conf = ConfigurationDataUtils.loadData(config, conf, RandomReaderConfigurationData.class); + return this; + } + + @Override + public RandomReaderBuilder readerName(String readerName) { + conf.setReaderName(readerName); + return this; + } + + @Override + public RandomReaderBuilder messagePayloadProcessor(MessagePayloadProcessor payloadProcessor) { + conf.setPayloadProcessor(payloadProcessor); + return this; + } + + @Override + public RandomReaderBuilder poolMessages(boolean poolMessages) { + conf.setPoolMessages(poolMessages); + return this; + } + + @Override + public RandomReaderBuilder readCommitted(boolean readCommitted) { + conf.setReadCommitted(readCommitted); + return this; + } + + @Override + public RandomReaderBuilder property(String key, String value) { + checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value), + "property key/value cannot be blank"); + conf.getProperties().put(key, value); + return this; + } + + @Override + public RandomReaderBuilder properties(Map properties) { + properties.entrySet().forEach(entry -> + checkArgument(StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue()), + "properties' key/value cannot be blank")); + conf.getProperties().putAll(properties); + return this; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RandomReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RandomReaderImpl.java new file mode 100644 index 0000000000000..dff783d1a1abd --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RandomReaderImpl.java @@ -0,0 +1,602 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.MessageInvisibleReason; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.RandomReadResult; +import org.apache.pulsar.client.api.RandomReader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.conf.RandomReaderConfigurationData; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; +import org.apache.pulsar.client.util.TimedCompletableFuture; +import org.apache.pulsar.common.api.proto.CommandRandomReadEntryResult; +import org.apache.pulsar.common.api.proto.CommandRandomReadMessage; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.api.proto.ProtocolVersion; +import org.apache.pulsar.common.api.proto.RandomReadInvisibleReason; +import org.apache.pulsar.common.api.proto.SingleMessageMetadata; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.FutureUtil; + +public class RandomReaderImpl extends HandlerState implements RandomReader { + + private final RandomReaderConfigurationData conf; + private final Schema schema; + private final boolean partitionedTopic; + private final ConcurrentHashMap> sessions = new ConcurrentHashMap<>(); + + static CompletableFuture> create(PulsarClientImpl client, + RandomReaderConfigurationData conf, + Schema schema) { + return client.getPartitionedTopicMetadata(conf.getTopicName(), false, false) + .thenApply(metadata -> { + RandomReaderImpl reader = new RandomReaderImpl<>(client, conf, schema, + metadata.partitions > 0); + reader.changeToReadyState(); + return reader; + }); + } + + private RandomReaderImpl(PulsarClientImpl client, RandomReaderConfigurationData conf, Schema schema, + boolean partitionedTopic) { + super(client, conf.getTopicName()); + this.conf = conf; + this.schema = schema; + this.partitionedTopic = partitionedTopic; + } + + boolean isOpen() { + return getState() == State.Ready; + } + + boolean isClosed() { + return getState() == State.Closed || getState() == State.Closing || getState() == State.Failed; + } + + @Override + public CompletableFuture>> read(MessageId startPosition, int numberOfBatches) { + if (!isOpen()) { + return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException( + "RandomReader is not ready or already closed")); + } + if (numberOfBatches <= 0) { + return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException( + "numberOfBatches must be positive")); + } + MessageIdAdv messageId; + try { + messageId = unwrapAndValidate(startPosition); + } catch (RuntimeException e) { + return FutureUtil.failedFuture(e); + } + return getSession(messageId.getPartitionIndex()) + .thenCompose(session -> session.read(messageId, numberOfBatches)); + } + + private MessageIdAdv unwrapAndValidate(MessageId startPosition) { + if (startPosition == MessageId.earliest || startPosition == MessageId.latest) { + throw new IllegalArgumentException("RandomReader requires a concrete message id"); + } + if (!(startPosition instanceof MessageIdAdv messageId)) { + throw new IllegalArgumentException("RandomReader requires a concrete Pulsar message id"); + } + if (messageId.getLedgerId() < 0 || messageId.getEntryId() < 0) { + throw new IllegalArgumentException("RandomReader requires non-negative ledger and entry ids"); + } + if (messageId instanceof BatchMessageIdImpl && messageId.getBatchIndex() >= 0) { + throw new IllegalArgumentException("RandomReader does not support batch-index message ids"); + } + int partition = messageId.getPartitionIndex(); + if (partitionedTopic && partition < 0) { + throw new IllegalArgumentException("Partitioned topics require a message id with a partition"); + } + return messageId; + } + + private CompletableFuture> getSession(int partitionIndex) { + int concretePartition = partitionedTopic ? partitionIndex : -1; + return sessions.computeIfAbsent(concretePartition, this::newSession).connect(); + } + + private RandomReaderSession newSession(int concretePartition) { + String sessionTopic = concretePartition >= 0 + ? TopicName.get(conf.getTopicName()).getPartition(concretePartition).toString() + : conf.getTopicName(); + return new RandomReaderSession<>(this, sessionTopic, concretePartition); + } + + @Override + public void close() throws PulsarClientException { + try { + closeAsync().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw PulsarClientException.unwrap(e); + } catch (ExecutionException e) { + throw PulsarClientException.unwrap(e); + } + } + + @Override + public CompletableFuture closeAsync() { + setState(State.Closing); + List> futures = new ArrayList<>(); + sessions.values().forEach(s -> futures.add(s.closeAsync())); + return FutureUtil.waitForAll(futures).thenRun(() -> setState(State.Closed)); + } + + @Override + public String getTopic() { + return conf.getTopicName(); + } + + @Override + String getHandlerName() { + return "random-reader"; + } + + Schema schema() { + return schema; + } + + RandomReaderConfigurationData conf() { + return conf; + } + + // ---- Inner classes ---- + + static final class RandomReaderSession extends HandlerState implements ConnectionHandler.Connection { + + @Override + String getHandlerName() { + return "random-reader-session"; + } + + private final long randomReaderId; + private final int partitionIndex; + private final String sessionTopic; + private final RandomReaderImpl parent; + private final ConnectionHandler connectionHandler; + private final AtomicReference>> connectFuture = + new AtomicReference<>(new CompletableFuture<>()); + private final AtomicReference> pendingRead = new AtomicReference<>(); + private final AtomicInteger previousExceptionCount = new AtomicInteger(); + private volatile long connectDeadline; + private volatile boolean createdOnce; + + RandomReaderSession(RandomReaderImpl parent, String sessionTopic, int partitionIndex) { + super(parent.client, sessionTopic); + this.parent = parent; + this.randomReaderId = parent.client.newRandomReaderId(); + this.partitionIndex = partitionIndex; + this.sessionTopic = sessionTopic; + this.connectionHandler = new ConnectionHandler(this, newBackoff(), this); + } + + private Backoff newBackoff() { + return Backoff.builder() + .initialDelay(Duration.ofNanos(parent.client.getConfiguration() + .getInitialBackoffIntervalNanos())) + .maxBackoff(Duration.ofNanos(parent.client.getConfiguration() + .getMaxBackoffIntervalNanos())) + .build(); + } + + CompletableFuture> connect() { + if (getState() == State.Ready) { + return CompletableFuture.completedFuture(this); + } + if (getState() == State.Connecting) { + return connectFuture.get(); + } + CompletableFuture> nextConnectFuture = newConnectFuture(); + if (!changeToConnecting()) { + nextConnectFuture.completeExceptionally( + new PulsarClientException("RandomReader session is closed")); + return nextConnectFuture; + } + connectionHandler.grabCnx(); + return nextConnectFuture; + } + + private CompletableFuture> newConnectFuture() { + CompletableFuture> next = new CompletableFuture<>(); + connectFuture.set(next); + connectDeadline = System.currentTimeMillis() + parent.client.getConfiguration().getOperationTimeoutMs(); + return next; + } + + @Override + public CompletableFuture connectionOpened(ClientCnx clientCnx) { + if (clientCnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v22.getValue()) { + return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException( + "Broker does not support RandomReader protocol")); + } + if (!changeToConnecting()) { + clientCnx.removeRandomReader(randomReaderId); + return CompletableFuture.completedFuture(null); + } + connectionHandler.setClientCnx(clientCnx); + clientCnx.registerRandomReader(randomReaderId, this); + + SchemaInfo schemaInfo = parent.schema().getSchemaInfo(); + if (schemaInfo != null && (SchemaType.BYTES == schemaInfo.getType() + || SchemaType.NONE == schemaInfo.getType())) { + schemaInfo = null; + } else if (parent.schema() instanceof AutoConsumeSchema + && Commands.peerSupportsCarryAutoConsumeSchemaToBroker( + clientCnx.getRemoteEndpointProtocolVersion())) { + schemaInfo = AutoConsumeSchema.SCHEMA_INFO; + } + + long requestId = client.newRequestId(); + ByteBuf command = Commands.newRandomReader(sessionTopic, randomReaderId, requestId, + parent.conf.getReaderName(), schemaInfo, parent.conf.getProperties(), + parent.conf.isReadCommitted()); + return clientCnx.sendRandomReaderCreate(command, requestId).thenAccept(success -> { + if (changeToReadyState()) { + createdOnce = true; + connectionHandler.resetBackoff(); + connectFuture.get().complete(this); + } + }).exceptionally(ex -> { + clientCnx.removeRandomReader(randomReaderId); + connectionHandler.setClientCnx(null); + throw FutureUtil.wrapToCompletionException(ex); + }); + } + + @Override + public boolean connectionFailed(PulsarClientException exception) { + boolean nonRetriable = !PulsarClientException.isRetriableError(exception); + boolean initialCreateTimedOut = !createdOnce && System.currentTimeMillis() > connectDeadline; + if (nonRetriable || initialCreateTimedOut) { + exception.setPreviousExceptionCount(previousExceptionCount); + connectFuture.get().completeExceptionally(exception); + ClientCnx current = connectionHandler.cnx(); + if (current != null) { + current.removeRandomReader(randomReaderId); + connectionHandler.setClientCnx(null); + } + setState(State.Failed); + return false; + } + previousExceptionCount.incrementAndGet(); + return true; + } + + void connectionClosed(ClientCnx cnx, Optional initialDelay, Optional hostUrl) { + newConnectFuture(); + PendingRandomRead pending = pendingRead.get(); + if (pending != null) { + pending.fail(new PulsarClientException("Connection closed")); + } + connectionHandler.connectionClosed(cnx, initialDelay, hostUrl); + } + + CompletableFuture>> read(MessageIdAdv startPosition, int numberOfBatches) { + CompletableFuture>> future = new CompletableFuture<>(); + long deadline = System.currentTimeMillis() + + parent.client.getConfiguration().getOperationTimeoutMs(); + doRead(startPosition, numberOfBatches, future, deadline, newBackoff()); + return future; + } + + private void doRead(MessageIdAdv startPosition, int numberOfBatches, + CompletableFuture>> resultFuture, long deadline, + Backoff readBackoff) { + if (isTerminalState()) { + resultFuture.completeExceptionally( + new PulsarClientException.AlreadyClosedException("RandomReader session is closed")); + return; + } + if (System.currentTimeMillis() > deadline) { + resultFuture.completeExceptionally( + new PulsarClientException("RandomReader read operation timed out")); + return; + } + ClientCnx current = connectionHandler.cnx(); + if (current == null || getState() != State.Ready) { + client.timer().newTimeout( + __ -> doRead(startPosition, numberOfBatches, resultFuture, deadline, readBackoff), + 100, TimeUnit.MILLISECONDS); + return; + } + PendingRandomRead pending = new PendingRandomRead<>(startPosition, numberOfBatches, this); + if (!pendingRead.compareAndSet(null, pending)) { + resultFuture.completeExceptionally(new PulsarClientException.TooManyRequestsException( + "RandomReader already has a read in flight")); + return; + } + long requestId = client.newRequestId(); + pending.requestId = requestId; + ByteBuf command = Commands.newRandomRead(randomReaderId, requestId, + pending.startPosition.getLedgerId(), pending.startPosition.getEntryId(), + partitionIndex, pending.numberOfBatches); + current.registerPendingReadTimeout(requestId, pending.getFuture()); + current.ctx().writeAndFlush(command).addListener(writeFuture -> { + if (!writeFuture.isSuccess()) { + failPendingRead(requestId, writeFuture.cause()); + } + }); + pending.future.whenComplete((result, ex) -> { + current.removePendingReadTimeout(requestId, pending.getFuture()); + if (ex == null) { + readBackoff.reset(); + resultFuture.complete(result); + return; + } + pending.releaseVisibleMessages(); + if (!isRetriableForRead(ex) || System.currentTimeMillis() > deadline) { + resultFuture.completeExceptionally(ex); + return; + } + client.timer().newTimeout( + __ -> doRead(startPosition, numberOfBatches, resultFuture, deadline, readBackoff), + readBackoff.next().toMillis(), TimeUnit.MILLISECONDS); + }); + } + + private boolean isTerminalState() { + return getState() == State.Closed || getState() == State.Closing || getState() == State.Failed; + } + + private static boolean isRetriableForRead(Throwable ex) { + return PulsarClientException.isRetriableError(ex) + && !(ex instanceof PulsarClientException.BrokerMetadataException) + && !(ex instanceof PulsarClientException.AlreadyClosedException); + } + + void messageReceived(CommandRandomReadMessage command, ByteBuf headersAndPayload, ClientCnx cnx) { + PendingRandomRead pending = pendingRead.get(); + if (pending != null && pending.requestId == command.getRequestId()) { + pending.messageReceived(command, headersAndPayload, cnx); + } + } + + void entryResultReceived(CommandRandomReadEntryResult command) { + PendingRandomRead pending = pendingRead.get(); + if (pending != null && pending.requestId == command.getRequestId()) { + pending.entryResultReceived(command); + } + } + + boolean hasPendingRead(long requestId) { + PendingRandomRead pending = pendingRead.get(); + return pending != null && pending.requestId == requestId; + } + + void completePendingRead(long requestId, int expectedResultCount) { + PendingRandomRead pending = pendingRead.get(); + if (pending != null && pending.requestId == requestId) { + pending.complete(expectedResultCount); + } + } + + void failPendingRead(long requestId, Throwable cause) { + PendingRandomRead pending = pendingRead.get(); + if (pending != null && pending.requestId == requestId) { + pending.fail(cause); + } + } + + void failPendingRead(Throwable cause) { + PendingRandomRead pending = pendingRead.get(); + if (pending != null) { + pending.fail(cause); + } + } + + CompletableFuture closeAsync() { + setState(State.Closing); + PendingRandomRead pending = pendingRead.get(); + if (pending != null) { + pending.fail(new PulsarClientException.AlreadyClosedException( + "RandomReader session is closed")); + } + ClientCnx current = connectionHandler.cnx(); + if (current != null) { + long requestId = client.newRequestId(); + ByteBuf cmd = Commands.newCloseRandomReader(randomReaderId, requestId); + current.removeRandomReader(randomReaderId); + connectionHandler.setClientCnx(null); + current.ctx().writeAndFlush(cmd); + } + setState(State.Closed); + return CompletableFuture.completedFuture(null); + } + } + + static final class PendingRandomRead { + private final MessageIdAdv startPosition; + private final int numberOfBatches; + final RandomReaderSession session; + private final TimedCompletableFuture>> future = new TimedCompletableFuture<>(); + private final List> results = new ArrayList<>(); + volatile long requestId; + + PendingRandomRead(MessageIdAdv startPosition, int numberOfBatches, + RandomReaderSession session) { + this.startPosition = startPosition; + this.numberOfBatches = numberOfBatches; + this.session = session; + this.future.whenComplete((result, ex) -> { + session.pendingRead.compareAndSet(this, null); + }); + } + + TimedCompletableFuture>> getFuture() { + return future; + } + + void complete(int expectedResultCount) { + if (results.size() != expectedResultCount) { + future.completeExceptionally(new PulsarClientException.InvalidMessageException( + "RandomReader response result count mismatch for request " + requestId + + ": expected " + expectedResultCount + " entries but received " + + results.size())); + return; + } + future.complete(List.copyOf(results)); + } + + void fail(Throwable cause) { + future.completeExceptionally(cause); + } + + void releaseVisibleMessages() { + for (RandomReadResult result : results) { + if (!result.isVisible()) { + continue; + } + try { + result.getMessages().forEach(Message::release); + } catch (PulsarClientException.MessageInvisibleException ignored) { + // Guarded by isVisible(). + } + } + results.clear(); + } + + void messageReceived(CommandRandomReadMessage command, ByteBuf headersAndPayload, ClientCnx cnx) { + try { + List> decoded = decodeEntry(command, headersAndPayload, session.parent.schema(), + session.parent.conf(), session.sessionTopic, session.partitionIndex); + results.add(RandomReadResultImpl.visible(toMessageId(command.getMessageId(), + session.partitionIndex), decoded)); + } catch (Exception e) { + session.failPendingRead(command.getRequestId(), toInvalidMessageException(e)); + } + } + + private static PulsarClientException toInvalidMessageException(Exception e) { + if (e instanceof PulsarClientException.InvalidMessageException invalidMessageException) { + return invalidMessageException; + } + String message = e.getMessage(); + return new PulsarClientException.InvalidMessageException( + "Failed to decode random read entry" + (message == null ? "" : ": " + message)); + } + + void entryResultReceived(CommandRandomReadEntryResult command) { + results.add(RandomReadResultImpl.invisible(toMessageId(command.getMessageId(), session.partitionIndex), + toClientReason(command.getInvisibleReason()))); + } + + private static MessageIdImpl toMessageId(org.apache.pulsar.common.api.proto.MessageIdData messageId, + int fallbackPartition) { + int partition = messageId.hasPartition() ? messageId.getPartition() : fallbackPartition; + return new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), partition); + } + + private static MessageInvisibleReason toClientReason(RandomReadInvisibleReason reason) { + if (reason == null) { + return MessageInvisibleReason.UNKNOWN; + } + switch (reason) { + case SERVER_ONLY_MARKER: + return MessageInvisibleReason.SERVER_ONLY_MARKER; + case TRANSACTION_MARKER: + return MessageInvisibleReason.TRANSACTION_MARKER; + case ABORTED_TRANSACTION: + return MessageInvisibleReason.ABORTED_TRANSACTION; + case DELAYED_DELIVERY: + return MessageInvisibleReason.DELAYED_DELIVERY; + case EXCEEDED_MAX_VISIBLE_POSITION: + return MessageInvisibleReason.EXCEEDED_MAX_VISIBLE_POSITION; + case UNKNOWN: + default: + return MessageInvisibleReason.UNKNOWN; + } + } + + private static List> decodeEntry(CommandRandomReadMessage command, ByteBuf headersAndPayload, + Schema schema, RandomReaderConfigurationData conf, + String topicName, int partitionIndex) + throws PulsarClientException { + headersAndPayload.markReaderIndex(); + Commands.skipChecksumIfPresent(headersAndPayload); + MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); + + MessageIdImpl entryMsgId = new MessageIdImpl(command.getMessageId().getLedgerId(), + command.getMessageId().getEntryId(), partitionIndex); + + int numMessages = msgMetadata.getNumMessagesInBatch(); + List> result = new ArrayList<>(Math.max(numMessages, 1)); + + try { + if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) { + MessageImpl message = MessageImpl.create(topicName, entryMsgId, + msgMetadata, headersAndPayload, Optional.empty(), null, schema, + 0, conf.isPoolMessages(), Commands.DEFAULT_CONSUMER_EPOCH); + result.add(message); + } else { + for (int i = 0; i < numMessages; i++) { + SingleMessageMetadata singleMetadata = new SingleMessageMetadata(); + ByteBuf singlePayload; + try { + singlePayload = Commands.deSerializeSingleMessageInBatch( + headersAndPayload, singleMetadata, i, numMessages); + } catch (IOException e) { + throw new PulsarClientException(e); + } + try { + BatchMessageIdImpl batchMsgId = new BatchMessageIdImpl( + command.getMessageId().getLedgerId(), + command.getMessageId().getEntryId(), + partitionIndex, i, numMessages, null); + MessageImpl message = MessageImpl.create(topicName, batchMsgId, + msgMetadata, singleMetadata, singlePayload, + Optional.empty(), null, schema, 0, + conf.isPoolMessages(), Commands.DEFAULT_CONSUMER_EPOCH); + result.add(message); + } finally { + singlePayload.release(); + } + } + } + } catch (PulsarClientException | RuntimeException | Error e) { + result.forEach(Message::release); + throw e; + } + return result; + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/RandomReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/RandomReaderConfigurationData.java new file mode 100644 index 0000000000000..18894b797cdbb --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/RandomReaderConfigurationData.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.conf; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.io.Serializable; +import java.util.SortedMap; +import java.util.TreeMap; +import lombok.Data; +import org.apache.pulsar.client.api.MessagePayloadProcessor; + +@Data +public class RandomReaderConfigurationData implements Serializable, Cloneable { + private static final long serialVersionUID = 1L; + + private String topicName; + private String readerName; + @JsonIgnore + private transient MessagePayloadProcessor payloadProcessor; + private boolean poolMessages; + private boolean readCommitted = false; + private SortedMap properties = new TreeMap<>(); + + @Override + @SuppressWarnings("unchecked") + public RandomReaderConfigurationData clone() { + try { + RandomReaderConfigurationData clone = (RandomReaderConfigurationData) super.clone(); + clone.properties = new TreeMap<>(properties); + return clone; + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Failed to clone RandomReaderConfigurationData", e); + } + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RandomReadResultImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RandomReadResultImplTest.java new file mode 100644 index 0000000000000..e86b524d88078 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RandomReadResultImplTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.lang.reflect.Method; +import java.util.List; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageInvisibleReason; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.conf.RandomReaderConfigurationData; +import org.apache.pulsar.common.api.proto.CommandRandomReadMessage; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; +import org.testng.annotations.Test; + +public class RandomReadResultImplTest { + + @Test + public void testInvisibleResultThrowsMessageInvisibleException() { + MessageIdImpl messageId = new MessageIdImpl(3L, 4L, 0); + RandomReadResultImpl result = RandomReadResultImpl.invisible( + messageId, MessageInvisibleReason.ABORTED_TRANSACTION); + + assertFalse(result.isVisible()); + PulsarClientException.MessageInvisibleException exception = + expectThrows(PulsarClientException.MessageInvisibleException.class, result::getMessages); + assertEquals(exception.getMessageId(), messageId); + assertEquals(exception.getReason(), MessageInvisibleReason.ABORTED_TRANSACTION); + assertTrue(result.getInvisibleException().isPresent()); + assertEquals(result.getInvisibleException().get().getReason(), MessageInvisibleReason.ABORTED_TRANSACTION); + } + + @Test + public void testVisibleResultReturnsMessages() throws Exception { + MessageIdImpl messageId = new MessageIdImpl(3L, 4L, 0); + RandomReadResultImpl result = RandomReadResultImpl.visible(messageId, List.of()); + + assertTrue(result.isVisible()); + assertEquals(result.getMessageId(), messageId); + assertTrue(result.getMessages().isEmpty()); + assertTrue(result.getInvisibleException().isEmpty()); + } + + @Test + public void testDecodeVisibleSingleEntryWithShortPayload() throws Exception { + MessageMetadata metadata = new MessageMetadata() + .setProducerName("p") + .setSequenceId(1L) + .setPublishTime(System.currentTimeMillis()); + ByteBuf data = Commands.serializeMetadataAndPayload( + Commands.ChecksumType.Crc32c, metadata, Unpooled.wrappedBuffer(new byte[] {9})); + try { + CommandRandomReadMessage command = Commands.newRandomReadMessageCommand(7L, 11L, 3L, 4L, 0) + .getRandomReadMessage(); + RandomReaderConfigurationData conf = new RandomReaderConfigurationData<>(); + + List> messages = decodeEntry(command, data, Schema.BYTES, conf); + + assertEquals(messages.size(), 1); + assertEquals(messages.get(0).getMessageId(), new MessageIdImpl(3L, 4L, 0)); + assertEquals(messages.get(0).getData(), new byte[] {9}); + } finally { + data.release(); + } + } + + @SuppressWarnings("unchecked") + private static List> decodeEntry(CommandRandomReadMessage command, ByteBuf data, + Schema schema, + RandomReaderConfigurationData conf) throws Exception { + Method method = RandomReaderImpl.PendingRandomRead.class.getDeclaredMethod("decodeEntry", + CommandRandomReadMessage.class, ByteBuf.class, Schema.class, RandomReaderConfigurationData.class, + String.class, int.class); + method.setAccessible(true); + return (List>) method.invoke(null, command, data, schema, conf, + "persistent://public/default/t1", 0); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/CommandUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/CommandUtils.java index 7edfbf889531b..052f339f63845 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/CommandUtils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/CommandUtils.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.pulsar.common.api.proto.CommandProducer; +import org.apache.pulsar.common.api.proto.CommandRandomReader; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.KeyValue; @@ -41,6 +42,10 @@ public static Map metadataFromCommand(CommandSubscribe commandSu return toMap(commandSubscribe.getMetadatasList()); } + public static Map metadataFromCommand(CommandRandomReader commandRandomReader) { + return toMap(commandRandomReader.getMetadatasList()); + } + private static Map toMap(List keyValues) { if (keyValues == null || keyValues.isEmpty()) { return Collections.emptyMap(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index e6cb2605315ad..80023179f7c64 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -85,6 +85,12 @@ import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; import org.apache.pulsar.common.api.proto.CommandProducer; import org.apache.pulsar.common.api.proto.CommandProducerSuccess; +import org.apache.pulsar.common.api.proto.CommandRandomRead; +import org.apache.pulsar.common.api.proto.CommandRandomReadEntryResult; +import org.apache.pulsar.common.api.proto.CommandRandomReadMessage; +import org.apache.pulsar.common.api.proto.CommandRandomReadResponse; +import org.apache.pulsar.common.api.proto.CommandRandomReader; +import org.apache.pulsar.common.api.proto.CommandRandomReaderSuccess; import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages; import org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate; import org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse; @@ -105,6 +111,7 @@ import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; +import org.apache.pulsar.common.api.proto.RandomReadInvisibleReason; import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment; import org.apache.pulsar.common.api.proto.ScalableConsumerType; import org.apache.pulsar.common.api.proto.ScalableTopicDAG; @@ -2411,4 +2418,122 @@ public static ProducerAccessMode convertProducerAccessMode( public static boolean peerSupportsBrokerMetadata(int peerVersion) { return peerVersion >= ProtocolVersion.v16.getValue(); } + + public static ByteBuf newRandomReader(String topic, long randomReaderId, long requestId, String readerName, + SchemaInfo schemaInfo, Map metadata, + boolean readCommitted) { + BaseCommand cmd = localCmd(Type.RANDOM_READER); + CommandRandomReader reader = cmd.setRandomReader() + .setTopic(topic) + .setRandomReaderId(randomReaderId) + .setRequestId(requestId); + if (readerName != null) { + reader.setReaderName(readerName); + } + if (schemaInfo != null) { + convertSchema(schemaInfo, reader.setSchema()); + } + metadata.forEach((key, value) -> reader.addMetadata().setKey(key).setValue(value)); + if (readCommitted) { + reader.setReadCommitted(true); + } + return serializeWithSize(cmd); + } + + public static BaseCommand newRandomReaderSuccessCommand(long requestId, long randomReaderId, String readerName) { + BaseCommand cmd = localCmd(Type.RANDOM_READER_SUCCESS); + CommandRandomReaderSuccess success = cmd.setRandomReaderSuccess() + .setRequestId(requestId) + .setRandomReaderId(randomReaderId); + if (readerName != null) { + success.setReaderName(readerName); + } + return cmd; + } + + public static ByteBuf newRandomRead(long randomReaderId, long requestId, long ledgerId, long entryId, + int partitionIndex, int numberOfEntries) { + BaseCommand cmd = localCmd(Type.RANDOM_READ); + CommandRandomRead read = cmd.setRandomRead() + .setRandomReaderId(randomReaderId) + .setRequestId(requestId) + .setNumberOfEntries(numberOfEntries); + read.setStartMessageId() + .setLedgerId(ledgerId) + .setEntryId(entryId); + if (partitionIndex >= 0) { + read.getStartMessageId().setPartition(partitionIndex); + } + return serializeWithSize(cmd); + } + + public static BaseCommand newRandomReadMessageCommand(long randomReaderId, long requestId, long ledgerId, + long entryId, int partition) { + BaseCommand cmd = localCmd(Type.RANDOM_READ_MESSAGE); + CommandRandomReadMessage msg = cmd.setRandomReadMessage() + .setRandomReaderId(randomReaderId) + .setRequestId(requestId); + msg.setMessageId() + .setLedgerId(ledgerId) + .setEntryId(entryId); + if (partition >= 0) { + msg.getMessageId().setPartition(partition); + } + return cmd; + } + + public static ByteBufPair newRandomReadMessage(long randomReaderId, long requestId, long ledgerId, long entryId, + int partition, ByteBuf metadataAndPayload) { + return serializeCommandMessageWithSize( + newRandomReadMessageCommand(randomReaderId, requestId, ledgerId, entryId, partition), + metadataAndPayload); + } + + public static BaseCommand newRandomReadEntryResultCommand(long randomReaderId, long requestId, long ledgerId, + long entryId, int partition, + RandomReadInvisibleReason invisibleReason) { + BaseCommand cmd = localCmd(Type.RANDOM_READ_ENTRY_RESULT); + CommandRandomReadEntryResult result = cmd.setRandomReadEntryResult() + .setRandomReaderId(randomReaderId) + .setRequestId(requestId) + .setInvisibleReason(invisibleReason); + result.setMessageId() + .setLedgerId(ledgerId) + .setEntryId(entryId); + if (partition >= 0) { + result.getMessageId().setPartition(partition); + } + return cmd; + } + + public static ByteBuf newRandomReadEntryResult(long randomReaderId, long requestId, long ledgerId, + long entryId, int partition, + RandomReadInvisibleReason invisibleReason) { + return serializeWithSize(newRandomReadEntryResultCommand(randomReaderId, requestId, ledgerId, + entryId, partition, invisibleReason)); + } + + public static ByteBuf newRandomReadResponse(long randomReaderId, long requestId, int numberOfEntries, + ServerError error, String message) { + BaseCommand cmd = localCmd(Type.RANDOM_READ_RESPONSE); + CommandRandomReadResponse response = cmd.setRandomReadResponse() + .setRandomReaderId(randomReaderId) + .setRequestId(requestId) + .setNumberOfEntries(numberOfEntries); + if (error != null) { + response.setError(error); + } + if (message != null) { + response.setMessage(message); + } + return serializeWithSize(cmd); + } + + public static ByteBuf newCloseRandomReader(long randomReaderId, long requestId) { + BaseCommand cmd = localCmd(Type.CLOSE_RANDOM_READER); + cmd.setCloseRandomReader() + .setRandomReaderId(randomReaderId) + .setRequestId(requestId); + return serializeWithSize(cmd); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index bca87683f279c..7cee2086c20ab 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -39,6 +39,7 @@ import org.apache.pulsar.common.api.proto.CommandAuthResponse; import org.apache.pulsar.common.api.proto.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.CommandCloseProducer; +import org.apache.pulsar.common.api.proto.CommandCloseRandomReader; import org.apache.pulsar.common.api.proto.CommandConnect; import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.api.proto.CommandConsumerStats; @@ -70,6 +71,12 @@ import org.apache.pulsar.common.api.proto.CommandPong; import org.apache.pulsar.common.api.proto.CommandProducer; import org.apache.pulsar.common.api.proto.CommandProducerSuccess; +import org.apache.pulsar.common.api.proto.CommandRandomRead; +import org.apache.pulsar.common.api.proto.CommandRandomReadEntryResult; +import org.apache.pulsar.common.api.proto.CommandRandomReadMessage; +import org.apache.pulsar.common.api.proto.CommandRandomReadResponse; +import org.apache.pulsar.common.api.proto.CommandRandomReader; +import org.apache.pulsar.common.api.proto.CommandRandomReaderSuccess; import org.apache.pulsar.common.api.proto.CommandReachedEndOfTopic; import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages; import org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate; @@ -529,6 +536,60 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception handleCommandWatchScalableTopicsClose(cmd.getWatchScalableTopicsClose()); break; + case RANDOM_READER: + checkArgument(cmd.hasRandomReader()); + try { + interceptCommand(cmd); + handleRandomReader(cmd.getRandomReader()); + } catch (InterceptException e) { + writeAndFlush(ctx, Commands.newError(cmd.getRandomReader().getRequestId(), + getServerError(e.getErrorCode()), e.getMessage())); + } + break; + + case RANDOM_READER_SUCCESS: + checkArgument(cmd.hasRandomReaderSuccess()); + handleRandomReaderSuccess(cmd.getRandomReaderSuccess()); + break; + + case RANDOM_READ: + checkArgument(cmd.hasRandomRead()); + try { + interceptCommand(cmd); + handleRandomRead(cmd.getRandomRead()); + } catch (InterceptException e) { + writeAndFlush(ctx, Commands.newRandomReadResponse(cmd.getRandomRead().getRandomReaderId(), + cmd.getRandomRead().getRequestId(), 0, getServerError(e.getErrorCode()), e.getMessage())); + } + break; + + case RANDOM_READ_MESSAGE: { + checkArgument(cmd.hasRandomReadMessage()); + handleRandomReadMessage(cmd.getRandomReadMessage(), buffer); + break; + } + + case RANDOM_READ_ENTRY_RESULT: + checkArgument(cmd.hasRandomReadEntryResult()); + handleRandomReadEntryResult(cmd.getRandomReadEntryResult()); + break; + + case RANDOM_READ_RESPONSE: + checkArgument(cmd.hasRandomReadResponse()); + handleRandomReadResponse(cmd.getRandomReadResponse()); + break; + + case CLOSE_RANDOM_READER: + checkArgument(cmd.hasCloseRandomReader()); + try { + interceptCommand(cmd); + handleCloseRandomReader(cmd.getCloseRandomReader()); + } catch (InterceptException e) { + writeAndFlush(ctx, Commands.newError(cmd.getCloseRandomReader().getRequestId(), + getServerError(e.getErrorCode()), e.getMessage())); + } + break; + default: break; } @@ -863,4 +924,32 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { } ctx.fireUserEventTriggered(evt); } + + protected void handleRandomReader(CommandRandomReader randomReader) { + throw new UnsupportedOperationException(); + } + + protected void handleRandomReaderSuccess(CommandRandomReaderSuccess success) { + throw new UnsupportedOperationException(); + } + + protected void handleRandomRead(CommandRandomRead randomRead) { + throw new UnsupportedOperationException(); + } + + protected void handleRandomReadMessage(CommandRandomReadMessage randomReadMessage, ByteBuf headersAndPayload) { + throw new UnsupportedOperationException(); + } + + protected void handleRandomReadEntryResult(CommandRandomReadEntryResult randomReadEntryResult) { + throw new UnsupportedOperationException(); + } + + protected void handleRandomReadResponse(CommandRandomReadResponse randomReadResponse) { + throw new UnsupportedOperationException(); + } + + protected void handleCloseRandomReader(CommandCloseRandomReader closeRandomReader) { + throw new UnsupportedOperationException(); + } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 28501c8bc6637..49a42250a77ab 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -277,6 +277,7 @@ enum ProtocolVersion { v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse v20 = 20; // Add client support for topic migration redirection CommandTopicMigrated v21 = 21; // Carry the AUTO_CONSUME schema to the Broker after this version + v22 = 22; // Add RandomReader commands } message CommandConnect { @@ -646,10 +647,69 @@ message CommandReachedEndOfTopic { required uint64 consumer_id = 1; } +message CommandRandomReader { + required string topic = 1; + required uint64 random_reader_id = 2; + required uint64 request_id = 3; + optional string reader_name = 4; + optional Schema schema = 5; + repeated KeyValue metadata = 6; + optional bool read_committed = 7; +} + +message CommandRandomReaderSuccess { + required uint64 request_id = 1; + required uint64 random_reader_id = 2; + optional string reader_name = 3; +} + +message CommandRandomRead { + required uint64 random_reader_id = 1; + required uint64 request_id = 2; + required MessageIdData start_message_id = 3; + required uint32 number_of_entries = 4; +} + +message CommandRandomReadMessage { + required uint64 random_reader_id = 1; + required uint64 request_id = 2; + required MessageIdData message_id = 3; +} + +enum RandomReadInvisibleReason { + UNKNOWN = 0; + SERVER_ONLY_MARKER = 1; + TRANSACTION_MARKER = 2; + ABORTED_TRANSACTION = 3; + DELAYED_DELIVERY = 4; + EXCEEDED_MAX_VISIBLE_POSITION = 5; +} + +message CommandRandomReadEntryResult { + required uint64 random_reader_id = 1; + required uint64 request_id = 2; + required MessageIdData message_id = 3; + required RandomReadInvisibleReason invisible_reason = 4; +} + +message CommandRandomReadResponse { + required uint64 random_reader_id = 1; + required uint64 request_id = 2; + required uint32 number_of_entries = 3; + optional ServerError error = 4; + optional string message = 5; +} + +message CommandCloseRandomReader { + required uint64 random_reader_id = 1; + required uint64 request_id = 2; +} + message CommandTopicMigrated { enum ResourceType { Producer = 0; Consumer = 1; + RandomReader = 2; } required uint64 resource_id = 1; required ResourceType resource_type = 2; @@ -1262,6 +1322,14 @@ message BaseCommand { WATCH_SCALABLE_TOPICS = 76; WATCH_SCALABLE_TOPICS_UPDATE = 77; WATCH_SCALABLE_TOPICS_CLOSE = 78; + + RANDOM_READER = 79; + RANDOM_READER_SUCCESS = 80; + RANDOM_READ = 81; + RANDOM_READ_MESSAGE = 82; + RANDOM_READ_RESPONSE = 83; + CLOSE_RANDOM_READER = 84; + RANDOM_READ_ENTRY_RESULT = 85; } @@ -1357,4 +1425,12 @@ message BaseCommand { optional CommandWatchScalableTopics watchScalableTopics = 76; optional CommandWatchScalableTopicsUpdate watchScalableTopicsUpdate = 77; optional CommandWatchScalableTopicsClose watchScalableTopicsClose = 78; + + optional CommandRandomReader randomReader = 79; + optional CommandRandomReaderSuccess randomReaderSuccess = 80; + optional CommandRandomRead randomRead = 81; + optional CommandRandomReadMessage randomReadMessage = 82; + optional CommandRandomReadResponse randomReadResponse = 83; + optional CommandCloseRandomReader closeRandomReader = 84; + optional CommandRandomReadEntryResult randomReadEntryResult = 85; } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsRandomReaderTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsRandomReaderTest.java new file mode 100644 index 0000000000000..6217035fe2c90 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsRandomReaderTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.protocol; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import io.netty.buffer.ByteBuf; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.RandomReadInvisibleReason; +import org.testng.annotations.Test; + +public class CommandsRandomReaderTest { + + private static BaseCommand parseFrame(ByteBuf frame) { + try { + frame.skipBytes(4); + int commandSize = (int) frame.readUnsignedInt(); + BaseCommand command = new BaseCommand(); + command.parseFrom(frame, commandSize); + command.materialize(); + return command; + } finally { + frame.release(); + } + } + + @Test + public void testRandomReadEntryResultCommandRoundTrip() { + BaseCommand command = parseFrame(Commands.newRandomReadEntryResult( + 7L, 11L, 3L, 4L, 2, RandomReadInvisibleReason.DELAYED_DELIVERY)); + + assertEquals(command.getType(), BaseCommand.Type.RANDOM_READ_ENTRY_RESULT); + assertTrue(command.hasRandomReadEntryResult()); + assertEquals(command.getRandomReadEntryResult().getRandomReaderId(), 7L); + assertEquals(command.getRandomReadEntryResult().getRequestId(), 11L); + assertEquals(command.getRandomReadEntryResult().getMessageId().getLedgerId(), 3L); + assertEquals(command.getRandomReadEntryResult().getMessageId().getEntryId(), 4L); + assertEquals(command.getRandomReadEntryResult().getMessageId().getPartition(), 2); + assertEquals(command.getRandomReadEntryResult().getInvisibleReason(), + RandomReadInvisibleReason.DELAYED_DELIVERY); + } +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java index af68f6cd28b2b..bae04c1b2cb57 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java @@ -23,10 +23,15 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; +import org.apache.pulsar.common.api.proto.CommandRandomReadEntryResult; +import org.apache.pulsar.common.api.proto.CommandRandomReadMessage; +import org.apache.pulsar.common.api.proto.RandomReadInvisibleReason; import org.testng.annotations.Test; /** @@ -50,4 +55,40 @@ protected void messageReceived(BaseCommand cmd) { decoder.channelRead(mock(ChannelHandlerContext.class), cmdBuf); verify(decoder, times(1)).handleActiveConsumerChange(any(CommandActiveConsumerChange.class)); } + + @Test + public void testRandomReadMessageAndEntryResultDispatch() throws Exception { + PulsarDecoder decoder = spy(new PulsarDecoder() { + @Override + protected void handleRandomReadMessage(CommandRandomReadMessage command, ByteBuf headersAndPayload) { + assertEquals(command.getRequestId(), 11L); + assertEquals(headersAndPayload.readableBytes(), 3); + } + + @Override + protected void handleRandomReadEntryResult(CommandRandomReadEntryResult command) { + assertEquals(command.getRequestId(), 12L); + assertEquals(command.getInvisibleReason(), RandomReadInvisibleReason.ABORTED_TRANSACTION); + } + + @Override + protected void messageReceived(BaseCommand cmd) { + } + }); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + + ByteBuf payload = Unpooled.wrappedBuffer(new byte[] {1, 2, 3}); + ByteBuf randomReadMessage = ByteBufPair.coalesce(Commands.newRandomReadMessage( + 7L, 11L, 3L, 4L, 0, payload)); + randomReadMessage.skipBytes(4); + decoder.channelRead(ctx, randomReadMessage); + + ByteBuf randomReadEntryResult = Commands.newRandomReadEntryResult( + 7L, 12L, 3L, 5L, 0, RandomReadInvisibleReason.ABORTED_TRANSACTION); + randomReadEntryResult.skipBytes(4); + decoder.channelRead(ctx, randomReadEntryResult); + + verify(decoder, times(1)).handleRandomReadMessage(any(CommandRandomReadMessage.class), any(ByteBuf.class)); + verify(decoder, times(1)).handleRandomReadEntryResult(any(CommandRandomReadEntryResult.class)); + } }