Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.collect.Range;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
Expand Down Expand Up @@ -716,6 +717,21 @@ default void skipNonRecoverableLedger(long ledgerId){}
* */
ManagedLedgerInterceptor getManagedLedgerInterceptor();

/**
* Read raw entries starting from {@code start}, inclusive, without using a ManagedCursor.
*
* <p/>The returned entries are retained for the caller. The caller must release every returned entry.
* Broker-level visibility rules such as transaction, marker, and delayed-delivery filtering are not applied here.
*
* @param start the position to start reading from, inclusive
* @param numberOfEntries maximum number of entries to read
* @return a future that completes with the list of entries, or fails if the read cannot be performed
*/
default CompletableFuture<List<Entry>> readEntries(Position start, int numberOfEntries) {
return CompletableFuture.failedFuture(
new UnsupportedOperationException("ManagedLedger random reads are not implemented"));
}

/**
* Get basic ledger summary.
* will got null if corresponding ledger not exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,15 @@ public void handleConsumerAdded(String subscriptionName, String consumerName) {
.log("Added consumer");
}

public void handleRandomReaderAdded(BrokerRandomReader reader) {
USAGE_COUNT_UPDATER.incrementAndGet(this);
log.debug()
.attr("randomReaderId", reader.randomReaderId())
.attr("readerName", reader.readerName())
.attr("usageCount", USAGE_COUNT_UPDATER.get(this))
.log("Added random reader");
}

public void decrementUsageCount() {
USAGE_COUNT_UPDATER.decrementAndGet(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.RandomReadInvisibleReason;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;

public class BrokerRandomReader implements AutoCloseable {
private final long randomReaderId;
private final String readerName;
private final Map<String, String> metadata;
private final PersistentTopic topic;
private final ServerCnx owner;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicLong inFlightRequestId = new AtomicLong(-1L);

@Getter
private final boolean readCommitted;

public BrokerRandomReader(long randomReaderId, String readerName, Map<String, String> metadata,
PersistentTopic topic, ServerCnx owner, boolean readCommitted) {
this.randomReaderId = randomReaderId;
this.readerName = readerName;
this.metadata = Map.copyOf(metadata);
this.topic = topic;
this.owner = owner;
this.readCommitted = readCommitted;
}

public boolean beginRead(long requestId) {
return !closed.get() && inFlightRequestId.compareAndSet(-1L, requestId);
}

public void endRead(long requestId) {
inFlightRequestId.compareAndSet(requestId, -1L);
}

public PersistentTopic topic() {
return topic;
}

public ServerCnx owner() {
return owner;
}

public long randomReaderId() {
return randomReaderId;
}

public String readerName() {
return readerName;
}

public Map<String, String> metadata() {
return metadata;
}

public boolean isClosed() {
return closed.get();
}

public static CompletableFuture<PersistentTopic> validatePersistentTopic(Topic topic) {
if (topic instanceof PersistentTopic) {
return CompletableFuture.completedFuture((PersistentTopic) topic);
}
return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException(
"RandomReader only supports persistent topics"));
}

@Override
public void close() {
closed.set(true);
}

public CompletableFuture<List<EntryResult>> readEntries(Position start, int numberOfEntries,
Position maxVisible) {
if (numberOfEntries <= 0) {
return CompletableFuture.completedFuture(List.of());
}
return topic.getManagedLedger().readEntries(start, numberOfEntries)
.thenApply(entries -> toEntryResults(entries, maxVisible));
}

public CompletableFuture<Void> disconnect(String brokerServiceUrl, String brokerServiceUrlTls) {
close();
owner.disconnectRandomReader(randomReaderId, brokerServiceUrl, brokerServiceUrlTls);
return CompletableFuture.completedFuture(null);
}

private List<EntryResult> toEntryResults(List<Entry> entries, Position maxVisiblePosition) {
List<EntryResult> result = new ArrayList<>(entries.size());
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
if (entry == null) {
continue;
}
try {
Position position = entry.getPosition();
if (position.compareTo(maxVisiblePosition) > 0) {
result.add(EntryResult.invisible(entry, RandomReadInvisibleReason.EXCEEDED_MAX_VISIBLE_POSITION));
continue;
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
int readerIndex = metadataAndPayload.readerIndex();
MessageMetadata metadata = Commands.peekAndCopyMessageMetadata(
metadataAndPayload, topic.getName(), -1);
metadataAndPayload.readerIndex(readerIndex);
if (metadata == null) {
throw new BrokerServiceException.PersistenceException(
"Failed to parse message metadata at " + position);
}
if (Markers.isTxnMarker(metadata)) {
result.add(EntryResult.invisible(entry, RandomReadInvisibleReason.TRANSACTION_MARKER));
continue;
}
if (Markers.isServerOnlyMarker(metadata)) {
result.add(EntryResult.invisible(entry, RandomReadInvisibleReason.SERVER_ONLY_MARKER));
continue;
}
if (readCommitted && metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()
&& topic.isTxnAborted(new TxnID(metadata.getTxnidMostBits(),
metadata.getTxnidLeastBits()), position)) {
result.add(EntryResult.invisible(entry, RandomReadInvisibleReason.ABORTED_TRANSACTION));
continue;
}
if (topic.isDelayedDeliveryEnabled()
&& metadata.hasDeliverAtTime()
&& metadata.getDeliverAtTime() > System.currentTimeMillis()) {
result.add(EntryResult.invisible(entry, RandomReadInvisibleReason.DELAYED_DELIVERY));
continue;
}
result.add(EntryResult.visible(entry));
} catch (Throwable t) {
entry.release();
releaseResults(result);
releaseEntries(entries.subList(i + 1, entries.size()));
throw FutureUtil.wrapToCompletionException(t);
}
}
return result;
}

private static void releaseResults(List<EntryResult> results) {
for (EntryResult result : results) {
result.release();
}
}

private static void releaseEntries(List<Entry> entries) {
for (Entry entry : entries) {
if (entry != null) {
entry.release();
}
}
}

public static ServerError toServerError(Throwable cause) {
if (cause instanceof ManagedLedgerException) {
if (cause instanceof ManagedLedgerException.ManagedLedgerFencedException
|| cause instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException
|| cause instanceof ManagedLedgerException.OffloadInProgressException) {
return ServerError.ServiceNotReady;
}
if (cause instanceof ManagedLedgerException.ManagedLedgerNotFoundException) {
return ServerError.TopicNotFound;
}
if (cause instanceof ManagedLedgerException.ManagedLedgerTerminatedException) {
return ServerError.TopicTerminatedError;
}
if (cause instanceof ManagedLedgerException.NonRecoverableLedgerException) {
return ServerError.PersistenceError;
}
// Generic fallback: conservative mapping to PersistenceError
return ServerError.PersistenceError;
}
return BrokerServiceException.getClientErrorCode(cause);
}

public static final class EntryResult {
private final Entry entry;
private final RandomReadInvisibleReason invisibleReason;

private EntryResult(Entry entry, RandomReadInvisibleReason invisibleReason) {
this.entry = entry;
this.invisibleReason = invisibleReason;
}

public static EntryResult visible(Entry entry) {
return new EntryResult(entry, null);
}

public static EntryResult invisible(Entry entry, RandomReadInvisibleReason invisibleReason) {
return new EntryResult(entry, invisibleReason);
}

public Entry entry() {
return entry;
}

public boolean isVisible() {
return invisibleReason == null;
}

public RandomReadInvisibleReason invisibleReason() {
return invisibleReason;
}

public void release() {
entry.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.apache.pulsar.broker.service;


import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.BrokerRandomReader.EntryResult;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
Expand Down Expand Up @@ -88,6 +90,12 @@ Future<Void> sendMessagesToConsumer(long consumerId, String topicName, Subscript
EntryBatchIndexesAcks batchIndexesAcks,
RedeliveryTracker redeliveryTracker, long epoch);

void sendRandomReaderSuccessResponse(long requestId, long randomReaderId, String readerName);

ChannelPromise sendRandomReadMessages(long randomReaderId, long requestId, String topicName,
int partitionIdx, List<EntryResult> results,
int numberOfEntries, Runnable afterFinalResponseWriteDone);

void sendTcClientConnectResponse(long requestId, ServerError error, String message);

void sendTcClientConnectResponse(long requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.CustomLog;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerRandomReader.EntryResult;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
Expand Down Expand Up @@ -322,6 +323,62 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
return writePromise;
}

@Override
public void sendRandomReaderSuccessResponse(long requestId, long randomReaderId, String readerName) {
BaseCommand command = Commands.newRandomReaderSuccessCommand(requestId, randomReaderId, readerName);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
writeAndFlush(outBuf);
}

@Override
public ChannelPromise sendRandomReadMessages(long randomReaderId, long requestId, String topicName,
int partitionIdx, List<EntryResult> results,
int numberOfEntries,
Runnable afterFinalResponseWriteDone) {
final ChannelHandlerContext ctx = cnx.ctx();
final ChannelPromise writePromise = ctx.newPromise();
ctx.channel().eventLoop().execute(() -> {
try {
for (EntryResult result : results) {
Entry entry = result.entry();
if (entry == null) {
continue;
}
if (!result.isVisible()) {
ctx.write(Commands.newRandomReadEntryResult(randomReaderId, requestId, entry.getLedgerId(),
entry.getEntryId(), partitionIdx, result.invisibleReason()), ctx.voidPromise());
continue;
}

ByteBuf metadataAndPayload = entry.getDataBuffer().retainedDuplicate();
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()
|| !cnx.supportBrokerMetadata()
|| !cnx.getBrokerService().getPulsar().getConfig()
.isExposingBrokerEntryMetadataToClientEnabled()) {
Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
}
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getValue()) {
Commands.skipChecksumIfPresent(metadataAndPayload);
}

ctx.write(Commands.newRandomReadMessage(randomReaderId, requestId, entry.getLedgerId(),
entry.getEntryId(), partitionIdx, metadataAndPayload), ctx.voidPromise());
}

ctx.writeAndFlush(Commands.newRandomReadResponse(randomReaderId, requestId, numberOfEntries,
null, null), writePromise);
} catch (Throwable t) {
writePromise.setFailure(t);
}
writePromise.addListener(future -> {
results.forEach(EntryResult::release);
afterFinalResponseWriteDone.run();
});
});
return writePromise;
}

@Override
public void sendTcClientConnectResponse(long requestId, ServerError error, String message) {
BaseCommand command = Commands.newTcClientConnectResponse(requestId, error, message);
Expand Down
Loading
Loading