Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions pip/pip-482.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# PIP-482: Peek messages from topic subscription with messagePosition

# Background knowledge

A Pulsar subscription tracks which messages have been acknowledged. The set of unacknowledged
messages on a subscription is the subscription **backlog**. The size of the backlog is the sum
of the sizes (in bytes) of the unacknowledged messages.

A topic may have many subscriptions.

Messages on a subscription backlog can be inspected without being acknowledged. This operation
is called **peeking** and is exposed via the Pulsar admin REST API and the Java admin client
`org.apache.pulsar.client.admin.Topics.peekMessages(...)`.

The underlying REST endpoint
`/admin/v3/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/position/{messagePosition}`
already accepts an arbitrary message position. The current Java admin client API does not surface
that parameter — every call hardcodes position `1`, so callers can only inspect the messages at
the head of the backlog.

# Motivation

A common use case is paginated display of a subscription backlog in an admin UI: show messages
1–10 on page 1, messages 11–20 on page 2, and so on. With the current Java admin client API,
displaying page N requires fetching all `N * pageSize` messages from the head of the backlog and
discarding the first `(N-1) * pageSize` of them. For large backlogs and large page indexes this
is wasteful in bandwidth, memory, and time on both client and broker.

The REST endpoint already supports the operation efficiently — the gap is purely in the Java
admin client API surface.

This PIP exposes the message position parameter on the Java admin client, enabling efficient
backlog pagination from any caller.

# Goals

## In Scope

- Add Java admin client `Topics` methods that accept a `messagePosition` parameter,
letting callers peek `numMessages` messages starting from `messagePosition` instead of from
position 1.
- Preserve full source and behavioral compatibility with the existing `peekMessages` /
`peekMessagesAsync` overloads. Existing callers continue to compile and run with no change
in behavior.
- Provide consistent overloads for the synchronous and asynchronous variants, with and without
the existing `showServerMarker` and `TransactionIsolationLevel` parameters.
- Validate `messagePosition >= 1` at the client and surface a clear `IllegalArgumentException`
for invalid input.

## Out of Scope

- Server-side changes. The REST endpoint already supports this; no broker change is required.
- Adding a `MessageId`-based peek API. That would be a separate, more invasive change and is
better discussed in its own PIP.
- A builder-style `PeekOptions` parameter. Considered but rejected as out of scope for this
proposal (see "Alternatives Considered" below).

# Design & Implementation Details

## API surface — `org.apache.pulsar.client.admin.Topics`

The new method is the abstract, full-control entry point:

```java
List<Message<byte[]>> peekMessages(String topic, String subName, int messagePosition, int numMessages,
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel)
throws PulsarAdminException;

CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(
String topic, String subName, int messagePosition, int numMessages,
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel);
```

All other overloads — including both pre-existing ones — become `default` methods that delegate
to this primary method. This eliminates duplicated Javadoc across overloads and gives the
interface a single source of truth for behavior.

The full set of `default` overloads:

```java
// 3-arg — pre-existing convenience method
default List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages)
throws PulsarAdminException {
return peekMessages(topic, subName, 1, numMessages, false, TransactionIsolationLevel.READ_COMMITTED);
}

// 4-arg — new user-facing convenience for pagination
default List<Message<byte[]>> peekMessages(String topic, String subName, int messagePosition, int numMessages)
throws PulsarAdminException {
return peekMessages(topic, subName, messagePosition, numMessages, false,
TransactionIsolationLevel.READ_COMMITTED);
}

// 5-arg — pre-existing showServerMarker + isolation form; now delegates with messagePosition=1
default List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages,
boolean showServerMarker,
TransactionIsolationLevel transactionIsolationLevel)
throws PulsarAdminException {
return peekMessages(topic, subName, 1, numMessages, showServerMarker, transactionIsolationLevel);
}
```

Async variants mirror the same shape.

## `messagePosition` semantics

- 1-indexed, matching the REST endpoint convention.
`messagePosition=1` returns the oldest unacknowledged message.
`messagePosition=N` returns the N-th unacknowledged message and onward.
- Must be `>= 1`. `0` and negative values are rejected with `IllegalArgumentException` at the
client.
- If `messagePosition` exceeds the size of the backlog, the returned list is empty (consistent
with the existing behavior when peeking past the tail).

The parameter name `messagePosition` is used to match the existing internal `nthMessage`
parameter and the REST endpoint path segment, and to avoid Kafka's "offset" terminology
which the Pulsar community prefers to keep distinct.

## Implementation

The internal helper
`TopicsImpl#peekMessagesAsync(String, String, int, List<Message<byte[]>>, CompletableFuture, int, boolean, TransactionIsolationLevel)`
already takes the position as `int nthMessage`. The public overloads previously passed `1`
unconditionally. The new abstract method passes the caller-supplied `messagePosition` instead.
No change to the helper or to the REST call path is required.

# Backward & Forward Compatibility

## Source compatibility

All existing call sites and existing implementations of `Topics` continue to compile unchanged.

## Binary compatibility

The pre-existing abstract method
`peekMessages(String, String, int, boolean, TransactionIsolationLevel)` is changed to a `default`
method. This is binary-compatible: classes that previously implemented the method by overriding
it continue to work — their override still takes precedence over the new `default` body.

The new abstract method
`peekMessages(String, String, int, int, boolean, TransactionIsolationLevel)` is added to the
interface. External classes implementing `Topics` directly will need to either:
- override this new method to provide custom behavior, or
- accept the default behavior by not overriding any peek methods.

`TopicsImpl` (the in-tree implementation of `Topics`) is updated to override the new abstract
method. No other in-tree code requires changes.

This follows the same compatibility pattern used when `showServerMarker` and
`TransactionIsolationLevel` were added to the peek API.

## Behavioral compatibility

For every pre-existing overload, the behavior with default arguments is unchanged. Calls that
omit `messagePosition` continue to peek from position 1.

# Alternatives Considered

## `MessageId`-based peek

Allowing callers to peek starting from a specific `MessageId` would be more powerful (it would
let UIs paginate by remembering the last-seen `MessageId` rather than a numeric position) and
would compose more naturally with deduplication and replay. However:
- The REST endpoint already supports position-based peeking; `MessageId`-based peeking would
require a server-side change.
- Position-based pagination is the immediate request from Pulsar users and is fully implementable
client-side today.
- A `MessageId`-based API can be added in a future PIP without conflicting with this one.

## `PeekOptions` builder parameter

A `PeekOptions` builder pattern (e.g. `topics.peek(PeekOptions.builder().topic(...).startAt(N)...)`)
would be more extensible and self-documenting at the call site. However:
- It would require renaming or duplicating every peek method, which is a larger and more invasive
API change.
- It diverges from the parameter-list convention used by every other method in `Topics`.
- It can be added later as a separate PIP without conflicting with the additions in this PIP.

This PIP keeps the existing parameter-list convention and lets each variant be discoverable
through method overloads, matching the rest of the `Topics` interface.

# Links

<!-- Updated after the dev@ discussion thread is opened. -->
* Mailing List discussion thread: _to be filled after this PIP is sent to dev@_
* Mailing List voting thread: _to be filled after the discussion completes_
Loading