Skip to content

Commit bbd81e8

Browse files
authored
KAFKA-20549: Share group DLQ manager implementation. Skeleton classes [1/N] (#22209)
* Added default share group dlq manager impl `DefaultShareGroupDLQManager`. * Added base impl for share group dlq state manager `ShareGroupDLQStateManager` * Extended `ShareGroupDLQManager` interface to add method `void stop()` for cleanup. * The classes are standalone at the moment and not plugged into the rest of the code. Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent 03e9e30 commit bbd81e8

7 files changed

Lines changed: 292 additions & 9 deletions

File tree

checkstyle/import-control-server-common.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@
113113
<allow pkg="org.apache.kafka.server.util" />
114114
<allow pkg="org.apache.kafka.test" />
115115
</subpackage>
116+
<subpackage name="dlq">
117+
<allow pkg="org.apache.kafka.clients" />
118+
<allow pkg="org.apache.kafka.server.util" />
119+
<allow pkg="org.apache.kafka.test" />
120+
</subpackage>
116121
</subpackage>
117122

118123
<subpackage name="util">

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
4444
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
4545
import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
46-
import org.apache.kafka.server.share.dlq.ShareGroupDLQ;
46+
import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
4747
import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
4848
import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
4949
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
@@ -332,7 +332,7 @@ enum SharePartitionState {
332332
/**
333333
* Reference to the dlq manager implementation.
334334
*/
335-
private final ShareGroupDLQ shareGroupDLQ = new NoOpShareGroupDLQManager();
335+
private final ShareGroupDLQManager shareGroupDLQ = new NoOpShareGroupDLQManager();
336336

337337
/**
338338
* Supplier to toggle dlq support.
@@ -2335,7 +2335,7 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
23352335
// mapping between bytes and record state type. All ack types have been added except for RENEW which
23362336
// has been handled above.
23372337
RecordState recordState = recordStateWithDlq(ackType);
2338-
Throwable dlqCause = recordState == RecordState.ARCHIVING ? ShareGroupDLQ.CLIENT_REJECT : null;
2338+
Throwable dlqCause = recordState == RecordState.ARCHIVING ? ShareGroupDLQManager.CLIENT_REJECT : null;
23392339
if (recordState == null) {
23402340
return Optional.of(new IllegalArgumentException("Unknown acknowledge type id: " + ackType));
23412341
}
@@ -2416,7 +2416,7 @@ private Optional<Throwable> acknowledgeCompleteBatch(
24162416
// either released or moved to a state where member id existence is not important. The member id
24172417
// is only important when the batch is acquired.
24182418
RecordState recordState = recordStateWithDlq(ackType);
2419-
Throwable dlqCause = recordState == RecordState.ARCHIVING ? ShareGroupDLQ.CLIENT_REJECT : null;
2419+
Throwable dlqCause = recordState == RecordState.ARCHIVING ? ShareGroupDLQManager.CLIENT_REJECT : null;
24202420
if (recordState == null) {
24212421
return Optional.of(new IllegalArgumentException("Unknown acknowledge type id: " + ackType));
24222422
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.server.share.dlq;
19+
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.util.concurrent.CompletableFuture;
24+
25+
/**
26+
* The default share group DLQ manager responsible for processing
27+
* incoming messages and writing them to the appropriate dlq topic.
28+
*/
29+
public class DefaultShareGroupDLQManager implements ShareGroupDLQManager {
30+
/**
31+
* Reference to state manager responsible for actually sending
32+
* the relevant RPCs and writing records.
33+
*/
34+
private final ShareGroupDLQStateManager stateManager;
35+
36+
private static final Logger log = LoggerFactory.getLogger(DefaultShareGroupDLQManager.class);
37+
38+
public DefaultShareGroupDLQManager instance(ShareGroupDLQStateManager stateManager) {
39+
DefaultShareGroupDLQManager instance = new DefaultShareGroupDLQManager(stateManager);
40+
instance.start();
41+
return instance;
42+
}
43+
44+
private DefaultShareGroupDLQManager(ShareGroupDLQStateManager stateManager) {
45+
this.stateManager = stateManager;
46+
}
47+
48+
private void start() {
49+
this.stateManager.start();
50+
}
51+
52+
@Override
53+
public CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param) {
54+
try {
55+
validate(param);
56+
} catch (Exception e) {
57+
log.error("Unable to validate dlq record parameters", e);
58+
return CompletableFuture.failedFuture(e);
59+
}
60+
return stateManager.dlq(param);
61+
}
62+
63+
@Override
64+
public void stop() {
65+
try {
66+
if (stateManager != null) {
67+
stateManager.stop();
68+
}
69+
} catch (Exception e) {
70+
log.error("Unable to stop DLQ state manager", e);
71+
}
72+
}
73+
74+
private static void validate(ShareGroupDLQRecordParameter param) {
75+
String prefix = "DLQ records parameters";
76+
if (param == null) {
77+
throw new IllegalArgumentException(prefix + " cannot be null.");
78+
}
79+
80+
if (param.groupId() == null || param.groupId().isEmpty()) {
81+
throw new IllegalArgumentException(prefix + " group cannot be null or empty.");
82+
}
83+
84+
if (param.topicIdPartition() == null) {
85+
throw new IllegalArgumentException(prefix + " topic/partition data cannot be null or empty.");
86+
}
87+
88+
if (param.topicIdPartition().topicId() == null) {
89+
throw new IllegalArgumentException(prefix + " topic id data cannot be null or empty.");
90+
}
91+
92+
if (param.topicIdPartition().partition() < 0) {
93+
throw new IllegalArgumentException(prefix + " partition cannot be negative.");
94+
}
95+
96+
if (param.lastOffset() < param.firstOffset()) {
97+
throw new IllegalArgumentException(prefix + " last offset cannot be less than first offset.");
98+
}
99+
100+
if (param.firstOffset() < 0) {
101+
throw new IllegalArgumentException(prefix + " first offset cannot be negative.");
102+
}
103+
104+
if (param.lastOffset() < 0) {
105+
throw new IllegalArgumentException(prefix + " last offset cannot be negative.");
106+
}
107+
}
108+
}

server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,21 @@
2323
import java.util.concurrent.CompletableFuture;
2424

2525
/**
26-
* A no op implementation of {@link ShareGroupDLQ}. This will be useful
26+
* A no op implementation of {@link ShareGroupDLQManager}. This will be useful
2727
* in development cycle and testing. All methods return immediately with
2828
* a successfully completed future.
2929
*/
30-
public class NoOpShareGroupDLQManager implements ShareGroupDLQ {
30+
public class NoOpShareGroupDLQManager implements ShareGroupDLQManager {
3131
private static final Logger log = LoggerFactory.getLogger(NoOpShareGroupDLQManager.class);
3232

3333
@Override
3434
public CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param) {
3535
log.trace("Enqueuing share group dlq record parameter: {}", param);
3636
return CompletableFuture.completedFuture(null);
3737
}
38+
39+
@Override
40+
public void stop() {
41+
// noop
42+
}
3843
}

server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java renamed to server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQManager.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
/**
2323
* The main interface to identify implementations of dead letter queues for share groups.
2424
*/
25-
public interface ShareGroupDLQ {
25+
public interface ShareGroupDLQManager {
2626
class ShareGroupDLQThrowable extends Throwable {
2727
ShareGroupDLQThrowable(String message) {
2828
// We don't want the stack trace to be filled.
@@ -34,11 +34,16 @@ class ShareGroupDLQThrowable extends Throwable {
3434
Throwable DELIVERY_COUNT_EXCEEDED = new ShareGroupDLQThrowable("Offset delivery count exceeded the threshold.");
3535

3636
/**
37-
* Main method exposed to the world to enqueuing a record to the share groups dead letter queue.
37+
* Main method exposed to the world to enqueue a record to the share groups dead letter queue.
3838
*
3939
* @param param A java record encapsulating required and optional information about the kafka record
4040
* being dead letter queued.
4141
* @return A completable future of Void type, mainly to signal exceptions.
4242
*/
4343
CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param);
44+
45+
/**
46+
* Perform cleanup and interrupt any threads.
47+
*/
48+
void stop();
4449
}

server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.Optional;
2323

2424
/**
25-
* Record representing information needed from callers of {@link ShareGroupDLQ#enqueue}. Inclusion
25+
* Record representing information needed from callers of {@link ShareGroupDLQManager#enqueue}. Inclusion
2626
* of first and last offset allows passing batch information as well.
2727
*
2828
* @param groupId The share group id of the message being recorded.
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.server.share.dlq;
19+
20+
import org.apache.kafka.clients.ClientResponse;
21+
import org.apache.kafka.clients.CommonClientConfigs;
22+
import org.apache.kafka.clients.KafkaClient;
23+
import org.apache.kafka.clients.RequestCompletionHandler;
24+
import org.apache.kafka.common.requests.AbstractRequest;
25+
import org.apache.kafka.common.requests.AbstractResponse;
26+
import org.apache.kafka.common.utils.Time;
27+
import org.apache.kafka.common.utils.Timer;
28+
import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
29+
import org.apache.kafka.server.util.InterBrokerSendThread;
30+
import org.apache.kafka.server.util.RequestAndCompletionHandler;
31+
32+
import java.util.Collection;
33+
import java.util.List;
34+
import java.util.Random;
35+
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.ConcurrentLinkedQueue;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
39+
/**
40+
* Core implementation of RPC send logic for the dlq manager.
41+
* This class allows for enqueuing records meant to be DLQ'ed
42+
* and manages various RPC which are to be sent to the KafkaApis.
43+
* These RPCs include PRODUCE, CREATE_TOPIC.
44+
*/
45+
public class ShareGroupDLQStateManager {
46+
private final AtomicBoolean isStarted = new AtomicBoolean(false);
47+
private final SendThread sender;
48+
private final Time time;
49+
private final Timer timer;
50+
private final ShareCoordinatorMetadataCacheHelper cacheHelper;
51+
52+
public enum RPCType {
53+
PRODUCE,
54+
CREATE_TOPIC
55+
}
56+
57+
public ShareGroupDLQStateManager(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
58+
if (client == null) {
59+
throw new IllegalArgumentException("Kafkaclient must not be null.");
60+
}
61+
62+
if (cacheHelper == null) {
63+
throw new IllegalArgumentException("Cache helper must not be null.");
64+
}
65+
66+
if (time == null) {
67+
throw new IllegalArgumentException("Time must not be null.");
68+
}
69+
70+
if (timer == null) {
71+
throw new IllegalArgumentException("Timer must not be null.");
72+
}
73+
74+
this.time = time;
75+
this.timer = timer;
76+
this.cacheHelper = cacheHelper;
77+
this.sender = new SendThread(
78+
"ShareGroupDLQSendThread",
79+
client,
80+
Math.toIntExact(CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS), //30 seconds
81+
this.time,
82+
true,
83+
new Random(this.time.milliseconds())
84+
);
85+
}
86+
87+
public void start() {
88+
if (isStarted.compareAndSet(false, true)) {
89+
this.sender.start();
90+
isStarted.set(true);
91+
}
92+
}
93+
94+
public void stop() throws Exception {
95+
if (isStarted.compareAndSet(true, false)) {
96+
this.sender.shutdown();
97+
}
98+
}
99+
100+
/**
101+
* Enqueues a {@link ShareGroupDLQRecordParameter} based on which records will be DLQ'ed.
102+
* The actual record written to the DLQ topic will be built by fetching information from this argument.
103+
*
104+
* @param param Reference comprising offset information
105+
* @return A future completing normally on successful DLQ, exceptionally otherwise.
106+
*/
107+
public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
108+
ProduceRequestHandler requestHandler = new ProduceRequestHandler(param);
109+
sender.enqueue(requestHandler);
110+
return requestHandler.result().thenAccept(response -> {
111+
});
112+
}
113+
114+
private abstract class ShareGroupDLQStateManagerHandler implements RequestCompletionHandler {
115+
protected abstract AbstractRequest.Builder<? extends AbstractRequest> requestBuilder();
116+
117+
protected abstract CompletableFuture<? extends AbstractResponse> result();
118+
}
119+
120+
private class ProduceRequestHandler extends ShareGroupDLQStateManagerHandler {
121+
122+
ProduceRequestHandler(ShareGroupDLQRecordParameter param) {
123+
}
124+
125+
@Override
126+
protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
127+
return null;
128+
}
129+
130+
@Override
131+
protected CompletableFuture<? extends AbstractResponse> result() {
132+
return CompletableFuture.completedFuture(null);
133+
}
134+
135+
@Override
136+
public void onComplete(ClientResponse response) {
137+
138+
}
139+
}
140+
141+
private static class SendThread extends InterBrokerSendThread {
142+
private final ConcurrentLinkedQueue<ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler> queue = new ConcurrentLinkedQueue<>();
143+
private final Random random;
144+
145+
SendThread(String name, KafkaClient client, int requestTimeoutMs, Time time, boolean isInterruptible, Random random) {
146+
super(name, client, requestTimeoutMs, time, isInterruptible);
147+
this.random = random;
148+
}
149+
150+
@Override
151+
public Collection<RequestAndCompletionHandler> generateRequests() {
152+
return List.of();
153+
}
154+
155+
public void enqueue(ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler handler) {
156+
queue.add(handler);
157+
wakeup();
158+
}
159+
}
160+
}

0 commit comments

Comments
 (0)