From cf041a7f2f7ba06b3de6a9fe00622d7cc8c6f1d4 Mon Sep 17 00:00:00 2001 From: Pruthviraj Date: Mon, 1 Jun 2026 12:07:00 +0530 Subject: [PATCH] [improve][admin] PIP-482: Peek messages from topic subscription with messagePosition Adds a messagePosition parameter to the Topics.peekMessages admin client API, enabling efficient pagination through a subscription backlog without fetching all preceding messages. The underlying REST endpoint (/admin/v3/persistent/{...}/subscription/{...}/position/{N}) has always supported this; the Java admin client previously hardcoded position=1. API design: - The 6-arg method peekMessages(topic, sub, messagePosition, numMessages, showServerMarker, transactionIsolationLevel) is the primary abstract. - All pre-existing peek overloads become default methods that delegate to it, eliminating duplicated Javadoc across overloads. - Parameter is named messagePosition (matching the REST endpoint and the internal nthMessage field) instead of offset (Kafka terminology). Files changed: - pip/pip-482.md: PIP markdown with motivation, API surface, alternatives considered (MessageId-based, PeekOptions builder), and binary- compatibility analysis. - pulsar-client-admin-api/.../Topics.java: new abstract method plus four default-method overloads (3/4/5-arg sync and 3/4/5-arg async) so all existing call sites remain source- and behavior-compatible. - pulsar-client-admin/.../internal/TopicsImpl.java: implements the new abstract method; adds checkArgument(messagePosition >= 1). - pulsar-client-admin/.../TopicsInterfaceDefaultsTest.java: unit tests for all six delegation paths using Mockito CALLS_REAL_METHODS. --- pip/pip-482.md | 186 ++++++++++++++++++ .../apache/pulsar/client/admin/Topics.java | 147 +++++++++++++- .../client/admin/internal/TopicsImpl.java | 12 +- .../admin/TopicsInterfaceDefaultsTest.java | 119 +++++++++++ 4 files changed, 450 insertions(+), 14 deletions(-) create mode 100644 pip/pip-482.md create mode 100644 pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/TopicsInterfaceDefaultsTest.java diff --git a/pip/pip-482.md b/pip/pip-482.md new file mode 100644 index 0000000000000..3d8633b91b83c --- /dev/null +++ b/pip/pip-482.md @@ -0,0 +1,186 @@ +# PIP-482: Peek messages from topic subscription with messagePosition + +# Background knowledge + +A Pulsar subscription tracks which messages have been acknowledged. The set of unacknowledged +messages on a subscription is the subscription **backlog**. The size of the backlog is the sum +of the sizes (in bytes) of the unacknowledged messages. + +A topic may have many subscriptions. + +Messages on a subscription backlog can be inspected without being acknowledged. This operation +is called **peeking** and is exposed via the Pulsar admin REST API and the Java admin client +`org.apache.pulsar.client.admin.Topics.peekMessages(...)`. + +The underlying REST endpoint +`/admin/v3/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/position/{messagePosition}` +already accepts an arbitrary message position. The current Java admin client API does not surface +that parameter — every call hardcodes position `1`, so callers can only inspect the messages at +the head of the backlog. + +# Motivation + +A common use case is paginated display of a subscription backlog in an admin UI: show messages +1–10 on page 1, messages 11–20 on page 2, and so on. With the current Java admin client API, +displaying page N requires fetching all `N * pageSize` messages from the head of the backlog and +discarding the first `(N-1) * pageSize` of them. For large backlogs and large page indexes this +is wasteful in bandwidth, memory, and time on both client and broker. + +The REST endpoint already supports the operation efficiently — the gap is purely in the Java +admin client API surface. + +This PIP exposes the message position parameter on the Java admin client, enabling efficient +backlog pagination from any caller. + +# Goals + +## In Scope + +- Add Java admin client `Topics` methods that accept a `messagePosition` parameter, + letting callers peek `numMessages` messages starting from `messagePosition` instead of from + position 1. +- Preserve full source and behavioral compatibility with the existing `peekMessages` / + `peekMessagesAsync` overloads. Existing callers continue to compile and run with no change + in behavior. +- Provide consistent overloads for the synchronous and asynchronous variants, with and without + the existing `showServerMarker` and `TransactionIsolationLevel` parameters. +- Validate `messagePosition >= 1` at the client and surface a clear `IllegalArgumentException` + for invalid input. + +## Out of Scope + +- Server-side changes. The REST endpoint already supports this; no broker change is required. +- Adding a `MessageId`-based peek API. That would be a separate, more invasive change and is + better discussed in its own PIP. +- A builder-style `PeekOptions` parameter. Considered but rejected as out of scope for this + proposal (see "Alternatives Considered" below). + +# Design & Implementation Details + +## API surface — `org.apache.pulsar.client.admin.Topics` + +The new method is the abstract, full-control entry point: + +```java +List> peekMessages(String topic, String subName, int messagePosition, int numMessages, + boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel) + throws PulsarAdminException; + +CompletableFuture>> peekMessagesAsync( + String topic, String subName, int messagePosition, int numMessages, + boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel); +``` + +All other overloads — including both pre-existing ones — become `default` methods that delegate +to this primary method. This eliminates duplicated Javadoc across overloads and gives the +interface a single source of truth for behavior. + +The full set of `default` overloads: + +```java +// 3-arg — pre-existing convenience method +default List> peekMessages(String topic, String subName, int numMessages) + throws PulsarAdminException { + return peekMessages(topic, subName, 1, numMessages, false, TransactionIsolationLevel.READ_COMMITTED); +} + +// 4-arg — new user-facing convenience for pagination +default List> peekMessages(String topic, String subName, int messagePosition, int numMessages) + throws PulsarAdminException { + return peekMessages(topic, subName, messagePosition, numMessages, false, + TransactionIsolationLevel.READ_COMMITTED); +} + +// 5-arg — pre-existing showServerMarker + isolation form; now delegates with messagePosition=1 +default List> peekMessages(String topic, String subName, int numMessages, + boolean showServerMarker, + TransactionIsolationLevel transactionIsolationLevel) + throws PulsarAdminException { + return peekMessages(topic, subName, 1, numMessages, showServerMarker, transactionIsolationLevel); +} +``` + +Async variants mirror the same shape. + +## `messagePosition` semantics + +- 1-indexed, matching the REST endpoint convention. + `messagePosition=1` returns the oldest unacknowledged message. + `messagePosition=N` returns the N-th unacknowledged message and onward. +- Must be `>= 1`. `0` and negative values are rejected with `IllegalArgumentException` at the + client. +- If `messagePosition` exceeds the size of the backlog, the returned list is empty (consistent + with the existing behavior when peeking past the tail). + +The parameter name `messagePosition` is used to match the existing internal `nthMessage` +parameter and the REST endpoint path segment, and to avoid Kafka's "offset" terminology +which the Pulsar community prefers to keep distinct. + +## Implementation + +The internal helper +`TopicsImpl#peekMessagesAsync(String, String, int, List>, CompletableFuture, int, boolean, TransactionIsolationLevel)` +already takes the position as `int nthMessage`. The public overloads previously passed `1` +unconditionally. The new abstract method passes the caller-supplied `messagePosition` instead. +No change to the helper or to the REST call path is required. + +# Backward & Forward Compatibility + +## Source compatibility + +All existing call sites and existing implementations of `Topics` continue to compile unchanged. + +## Binary compatibility + +The pre-existing abstract method +`peekMessages(String, String, int, boolean, TransactionIsolationLevel)` is changed to a `default` +method. This is binary-compatible: classes that previously implemented the method by overriding +it continue to work — their override still takes precedence over the new `default` body. + +The new abstract method +`peekMessages(String, String, int, int, boolean, TransactionIsolationLevel)` is added to the +interface. External classes implementing `Topics` directly will need to either: +- override this new method to provide custom behavior, or +- accept the default behavior by not overriding any peek methods. + +`TopicsImpl` (the in-tree implementation of `Topics`) is updated to override the new abstract +method. No other in-tree code requires changes. + +This follows the same compatibility pattern used when `showServerMarker` and +`TransactionIsolationLevel` were added to the peek API. + +## Behavioral compatibility + +For every pre-existing overload, the behavior with default arguments is unchanged. Calls that +omit `messagePosition` continue to peek from position 1. + +# Alternatives Considered + +## `MessageId`-based peek + +Allowing callers to peek starting from a specific `MessageId` would be more powerful (it would +let UIs paginate by remembering the last-seen `MessageId` rather than a numeric position) and +would compose more naturally with deduplication and replay. However: +- The REST endpoint already supports position-based peeking; `MessageId`-based peeking would + require a server-side change. +- Position-based pagination is the immediate request from Pulsar users and is fully implementable + client-side today. +- A `MessageId`-based API can be added in a future PIP without conflicting with this one. + +## `PeekOptions` builder parameter + +A `PeekOptions` builder pattern (e.g. `topics.peek(PeekOptions.builder().topic(...).startAt(N)...)`) +would be more extensible and self-documenting at the call site. However: +- It would require renaming or duplicating every peek method, which is a larger and more invasive + API change. +- It diverges from the parameter-list convention used by every other method in `Topics`. +- It can be added later as a separate PIP without conflicting with the additions in this PIP. + +This PIP keeps the existing parameter-list convention and lets each variant be discoverable +through method overloads, matching the rest of the `Topics` interface. + +# Links + + +* Mailing List discussion thread: _to be filled after this PIP is sent to dev@_ +* Mailing List voting thread: _to be filled after the discussion completes_ diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index e68be8fd2e805..4346db3666614 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1641,6 +1641,9 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) /** * Peek messages from a topic subscription. * + *

Equivalent to {@link #peekMessages(String, String, int, int, boolean, TransactionIsolationLevel) + * peekMessages(topic, subName, 1, numMessages, false, TransactionIsolationLevel.READ_COMMITTED)}. + * * @param topic * topic name * @param subName @@ -1657,18 +1660,57 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) */ default List> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException { - return peekMessages(topic, subName, numMessages, false, TransactionIsolationLevel.READ_COMMITTED); + return peekMessages(topic, subName, 1, numMessages, false, TransactionIsolationLevel.READ_COMMITTED); } /** - * Peek messages from a topic subscription. + * Peek {@code numMessages} messages from a topic subscription, starting from {@code messagePosition}. + * + *

Use this overload to paginate through a subscription backlog without fetching all + * preceding messages. For example, to display the 91st–100th unacknowledged messages in a UI, + * call this method with {@code messagePosition=91, numMessages=10}. + * + *

Equivalent to {@link #peekMessages(String, String, int, int, boolean, TransactionIsolationLevel) + * peekMessages(topic, subName, messagePosition, numMessages, false, + * TransactionIsolationLevel.READ_COMMITTED)}. * * @param topic * topic name * @param subName * Subscription name + * @param messagePosition + * 1-indexed position to start peeking from. Position {@code 1} returns the + * oldest unacknowledged message; position {@code N} returns the N-th unacknowledged + * message and onward. Must be {@code >= 1}. * @param numMessages - * Number of messages + * Number of messages to peek. Must be {@code > 0}. Fewer messages may be returned + * if the subscription backlog has fewer remaining messages from {@code messagePosition}. + * @return up to {@code numMessages} messages starting at {@code messagePosition} + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Topic or subscription does not exist + * @throws PulsarAdminException + * Unexpected error + */ + default List> peekMessages(String topic, String subName, int messagePosition, int numMessages) + throws PulsarAdminException { + return peekMessages(topic, subName, messagePosition, numMessages, + false, TransactionIsolationLevel.READ_COMMITTED); + } + + /** + * Peek messages from a topic subscription, with full control over starting position and + * transaction isolation. + * + * @param topic + * topic name + * @param subName + * Subscription name + * @param messagePosition + * 1-indexed position to start peeking from. Must be {@code >= 1}. + * @param numMessages + * Number of messages to peek. Must be {@code > 0}. * @param showServerMarker * Enables the display of internal server write markers * @param transactionIsolationLevel @@ -1676,7 +1718,7 @@ default List> peekMessages(String topic, String subName, int num * - 'READ_COMMITTED' allows peeking only committed transactional messages. * - 'READ_UNCOMMITTED' allows peeking all messages, * even transactional messages which have been aborted. - * @return + * @return up to {@code numMessages} messages starting at {@code messagePosition} * @throws NotAuthorizedException * Don't have admin permission * @throws NotFoundException @@ -1684,13 +1726,50 @@ default List> peekMessages(String topic, String subName, int num * @throws PulsarAdminException * Unexpected error */ - List> peekMessages(String topic, String subName, int numMessages, + List> peekMessages(String topic, String subName, int messagePosition, int numMessages, boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel) throws PulsarAdminException; + /** + * Peek messages from a topic subscription. Backward-compatible overload that uses + * {@code messagePosition=1}. + * + * @param topic + * topic name + * @param subName + * Subscription name + * @param numMessages + * Number of messages + * @param showServerMarker + * Enables the display of internal server write markers + * @param transactionIsolationLevel + * Sets the isolation level for peeking messages within transactions. + * - 'READ_COMMITTED' allows peeking only committed transactional messages. + * - 'READ_UNCOMMITTED' allows peeking all messages, + * even transactional messages which have been aborted. + * @return + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Topic or subscription does not exist + * @throws PulsarAdminException + * Unexpected error + */ + default List> peekMessages(String topic, String subName, int numMessages, + boolean showServerMarker, + TransactionIsolationLevel transactionIsolationLevel) + throws PulsarAdminException { + return peekMessages(topic, subName, 1, numMessages, showServerMarker, transactionIsolationLevel); + } + /** * Peek messages from a topic subscription asynchronously. * + *

Equivalent to + * {@link #peekMessagesAsync(String, String, int, int, boolean, TransactionIsolationLevel) + * peekMessagesAsync(topic, subName, 1, numMessages, false, + * TransactionIsolationLevel.READ_COMMITTED)}. + * * @param topic * topic name * @param subName @@ -1700,21 +1779,46 @@ List> peekMessages(String topic, String subName, int numMessages * @return a future that can be used to track when the messages are returned */ default CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages) { - return peekMessagesAsync(topic, subName, numMessages, false, TransactionIsolationLevel.READ_COMMITTED); + return peekMessagesAsync(topic, subName, 1, numMessages, + false, TransactionIsolationLevel.READ_COMMITTED); } /** - * Peek messages from a topic subscription asynchronously. + * Peek {@code numMessages} messages from a topic subscription asynchronously, starting from + * {@code messagePosition}. * * @param topic * topic name * @param subName * Subscription name + * @param messagePosition + * 1-indexed position to start peeking from. Must be {@code >= 1}. + * @param numMessages + * Number of messages + * @return a future that completes with up to {@code numMessages} messages starting at + * {@code messagePosition} + */ + default CompletableFuture>> peekMessagesAsync( + String topic, String subName, int messagePosition, int numMessages) { + return peekMessagesAsync(topic, subName, messagePosition, numMessages, + false, TransactionIsolationLevel.READ_COMMITTED); + } + + /** + * Peek messages from a topic subscription asynchronously, with full control over starting + * position and transaction isolation. + * + * @param topic + * topic name + * @param subName + * Subscription name + * @param messagePosition + * 1-indexed position to start peeking from. Must be {@code >= 1}. * @param numMessages * Number of messages * @param showServerMarker * Enables the display of internal server write markers - @param transactionIsolationLevel + * @param transactionIsolationLevel * Sets the isolation level for peeking messages within transactions. * - 'READ_COMMITTED' allows peeking only committed transactional messages. * - 'READ_UNCOMMITTED' allows peeking all messages, @@ -1722,9 +1826,34 @@ default CompletableFuture>> peekMessagesAsync(String topic, * @return a future that can be used to track when the messages are returned */ CompletableFuture>> peekMessagesAsync( - String topic, String subName, int numMessages, + String topic, String subName, int messagePosition, int numMessages, boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel); + /** + * Peek messages from a topic subscription asynchronously. Backward-compatible overload that + * uses {@code messagePosition=1}. + * + * @param topic + * topic name + * @param subName + * Subscription name + * @param numMessages + * Number of messages + * @param showServerMarker + * Enables the display of internal server write markers + * @param transactionIsolationLevel + * Sets the isolation level for peeking messages within transactions. + * - 'READ_COMMITTED' allows peeking only committed transactional messages. + * - 'READ_UNCOMMITTED' allows peeking all messages, + * even transactional messages which have been aborted. + * @return a future that can be used to track when the messages are returned + */ + default CompletableFuture>> peekMessagesAsync( + String topic, String subName, int numMessages, + boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel) { + return peekMessagesAsync(topic, subName, 1, numMessages, showServerMarker, transactionIsolationLevel); + } + /** * Get a message by its messageId via a topic subscription. * @param topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 1e03644a54e2e..d47dd5e959064 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -912,21 +912,23 @@ public void failed(Throwable throwable) { } @Override - public List> peekMessages(String topic, String subName, int numMessages, + public List> peekMessages(String topic, String subName, int messagePosition, int numMessages, boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel) throws PulsarAdminException { - return sync(() -> peekMessagesAsync(topic, subName, numMessages, showServerMarker, transactionIsolationLevel)); + return sync(() -> peekMessagesAsync(topic, subName, messagePosition, numMessages, + showServerMarker, transactionIsolationLevel)); } @Override public CompletableFuture>> peekMessagesAsync( - String topic, String subName, int numMessages, + String topic, String subName, int messagePosition, int numMessages, boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel) { - checkArgument(numMessages > 0); + checkArgument(numMessages > 0, "numMessages must be > 0"); + checkArgument(messagePosition >= 1, "messagePosition must be >= 1"); CompletableFuture>> future = new CompletableFuture>>(); peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), - future, 1, showServerMarker, transactionIsolationLevel); + future, messagePosition, showServerMarker, transactionIsolationLevel); return future; } diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/TopicsInterfaceDefaultsTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/TopicsInterfaceDefaultsTest.java new file mode 100644 index 0000000000000..535e7da6b172b --- /dev/null +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/TopicsInterfaceDefaultsTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.withSettings; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.TransactionIsolationLevel; +import org.testng.annotations.Test; + +/** + * Unit tests verifying that the default-method overloads of + * {@link Topics#peekMessages} and {@link Topics#peekMessagesAsync} delegate correctly to the + * primary abstract method + * {@code peekMessages(topic, sub, messagePosition, numMessages, showServerMarker, isolation)}. + * + *

Uses Mockito with {@code CALLS_REAL_METHODS} so the interface defaults run for real while + * the primary abstract method is stubbed. + * + *

Tests added for PIP-332. + */ +public class TopicsInterfaceDefaultsTest { + + private Topics mockTopicsWithStubbedPrimary() { + Topics topics = mock(Topics.class, withSettings().defaultAnswer(CALLS_REAL_METHODS)); + // Stub the sync primary + try { + doReturn(Collections.>emptyList()) + .when(topics) + .peekMessages(anyString(), anyString(), anyInt(), anyInt(), + anyBoolean(), any(TransactionIsolationLevel.class)); + } catch (Exception ignored) { + // checked exception declared on the method; no real call here + } + // Stub the async primary + doReturn(CompletableFuture.completedFuture(Collections.>emptyList())) + .when(topics) + .peekMessagesAsync(anyString(), anyString(), anyInt(), anyInt(), + anyBoolean(), any(TransactionIsolationLevel.class)); + return topics; + } + + @Test + public void threeArgPeekDelegatesWithMessagePositionOne() throws Exception { + Topics topics = mockTopicsWithStubbedPrimary(); + topics.peekMessages("topic", "sub", 10); + verify(topics).peekMessages("topic", "sub", 1, 10, false, + TransactionIsolationLevel.READ_COMMITTED); + } + + @Test + public void fourArgPeekDelegatesWithSuppliedPosition() throws Exception { + Topics topics = mockTopicsWithStubbedPrimary(); + topics.peekMessages("topic", "sub", 91, 10); + verify(topics).peekMessages("topic", "sub", 91, 10, false, + TransactionIsolationLevel.READ_COMMITTED); + } + + @Test + public void fiveArgPeekDelegatesWithMessagePositionOne() throws Exception { + Topics topics = mockTopicsWithStubbedPrimary(); + topics.peekMessages("topic", "sub", 10, true, TransactionIsolationLevel.READ_UNCOMMITTED); + verify(topics).peekMessages("topic", "sub", 1, 10, true, + TransactionIsolationLevel.READ_UNCOMMITTED); + } + + @Test + public void threeArgPeekAsyncDelegatesWithMessagePositionOne() { + Topics topics = mockTopicsWithStubbedPrimary(); + CompletableFuture>> fut = topics.peekMessagesAsync("topic", "sub", 10); + verify(topics).peekMessagesAsync("topic", "sub", 1, 10, false, + TransactionIsolationLevel.READ_COMMITTED); + // Smoke: the stub completes with an empty list + fut.join(); + } + + @Test + public void fourArgPeekAsyncDelegatesWithSuppliedPosition() { + Topics topics = mockTopicsWithStubbedPrimary(); + topics.peekMessagesAsync("topic", "sub", 91, 10); + verify(topics).peekMessagesAsync("topic", "sub", 91, 10, false, + TransactionIsolationLevel.READ_COMMITTED); + } + + @Test + public void fiveArgPeekAsyncDelegatesWithMessagePositionOne() { + Topics topics = mockTopicsWithStubbedPrimary(); + topics.peekMessagesAsync("topic", "sub", 10, true, TransactionIsolationLevel.READ_UNCOMMITTED); + verify(topics).peekMessagesAsync("topic", "sub", 1, 10, true, + TransactionIsolationLevel.READ_UNCOMMITTED); + } +}