Skip to content

Commit 55044e6

Browse files
mario-schwede-hivemqA-Imal
authored andcommitted
MQTT 5 client not disconnected on timeout
1 parent 58b9c40 commit 55044e6

3 files changed

Lines changed: 31 additions & 10 deletions

File tree

src/main/java/com/hivemq/extensions/handler/DisconnectInterceptorHandler.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.hivemq.extensions.handler;
1717

1818
import com.google.inject.Inject;
19-
import com.hivemq.bootstrap.ClientConnection;
2019
import com.hivemq.bootstrap.ClientConnectionContext;
2120
import com.hivemq.configuration.service.FullConfigurationService;
2221
import com.hivemq.extension.sdk.api.annotations.NotNull;
@@ -80,12 +79,12 @@ public void handleInboundDisconnect(
8079
final @NotNull ChannelHandlerContext ctx,
8180
final @NotNull DISCONNECT disconnect) {
8281
final Channel channel = ctx.channel();
83-
final ClientConnection clientConnection = ClientConnection.of(channel);
84-
final String clientId = clientConnection.getClientId();
82+
final ClientConnectionContext clientConnectionContext = ClientConnectionContext.of(channel);
83+
final String clientId = clientConnectionContext.getClientId();
8584
if (clientId == null) {
8685
return;
8786
}
88-
final ClientContextImpl clientContext = clientConnection.getExtensionClientContext();
87+
final ClientContextImpl clientContext = clientConnectionContext.getExtensionClientContext();
8988
if (clientContext == null) {
9089
ctx.fireChannelRead(disconnect);
9190
return;
@@ -98,7 +97,7 @@ public void handleInboundDisconnect(
9897
channel.config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true);
9998
final ClientInformation clientInfo = ExtensionInformationUtil.getAndSetClientInformation(channel, clientId);
10099
final ConnectionInformation connectionInfo = ExtensionInformationUtil.getAndSetConnectionInformation(channel);
101-
final Long originalSessionExpiryInterval = clientConnection.getClientSessionExpiryInterval();
100+
final Long originalSessionExpiryInterval = clientConnectionContext.getClientSessionExpiryInterval();
102101
final DisconnectPacketImpl packet = new DisconnectPacketImpl(disconnect);
103102
final DisconnectInboundInputImpl input = new DisconnectInboundInputImpl(clientInfo, connectionInfo, packet);
104103
final ExtensionParameterHolder<DisconnectInboundInputImpl> inputHolder = new ExtensionParameterHolder<>(input);

src/main/java/com/hivemq/mqtt/handler/connect/MessageBarrier.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.hivemq.mqtt.message.auth.AUTH;
2424
import com.hivemq.mqtt.message.connack.CONNACK;
2525
import com.hivemq.mqtt.message.connect.CONNECT;
26+
import com.hivemq.mqtt.message.disconnect.DISCONNECT;
2627
import com.hivemq.mqtt.message.reason.Mqtt5ConnAckReasonCode;
2728
import io.netty.channel.Channel;
2829
import io.netty.channel.ChannelDuplexHandler;
@@ -62,15 +63,13 @@ public void channelRead(final @NotNull ChannelHandlerContext ctx, final @NotNull
6263
if (msg instanceof Message) {
6364
if (msg instanceof CONNECT) {
6465
connectReceived = true;
65-
suspendRead(ctx.channel());
6666
} else if (!connectReceived) {
6767
serverDisconnector.logAndClose(ctx.channel(),
6868
"A client (IP: {}) sent other message before CONNECT. Disconnecting client.",
6969
"Sent other message before CONNECT");
7070
return;
71-
} else if (msg instanceof AUTH) {
71+
} else if (!(msg instanceof AUTH) && !(msg instanceof DISCONNECT) && !connackSent) {
7272
suspendRead(ctx.channel());
73-
} else if (!connackSent) {
7473
messageQueue.add((Message) msg);
7574
return;
7675
}

src/test/java/com/hivemq/mqtt/handler/connect/MessageBarrierTest.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.google.common.collect.ImmutableList;
1919
import com.hivemq.bootstrap.ClientConnectionContext;
20+
import com.hivemq.extension.sdk.api.annotations.NotNull;
2021
import com.hivemq.logging.EventLog;
2122
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
2223
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnectorImpl;
@@ -86,9 +87,31 @@ public void test_queue_messages_after_connect() {
8687
channel.writeInbound(TestMessageUtil.createMqtt3Publish());
8788
channel.writeInbound(new PINGREQ());
8889
channel.writeInbound(TestMessageUtil.createMqtt3Publish());
89-
channel.writeInbound(new DISCONNECT());
9090
assertTrue(channel.isActive());
91-
assertEquals(6, messageBarrier.getQueue().size());
91+
assertEquals(5, messageBarrier.getQueue().size());
92+
}
93+
94+
@Test
95+
public void test_disconnect_bypasses_queue_before_connack() {
96+
channel.writeInbound(new CONNECT.Mqtt3Builder().withProtocolVersion(ProtocolVersion.MQTTv3_1_1)
97+
.withClientIdentifier("clientID")
98+
.build());
99+
100+
final AtomicInteger disconnectCounter = new AtomicInteger(0);
101+
channel.pipeline().addAfter(MQTT_MESSAGE_BARRIER, "test", new ChannelDuplexHandler() {
102+
103+
@Override
104+
public void channelRead(final @NotNull ChannelHandlerContext ctx, final @NotNull Object msg) {
105+
if (msg instanceof DISCONNECT) {
106+
disconnectCounter.incrementAndGet();
107+
}
108+
}
109+
});
110+
111+
channel.writeInbound(new DISCONNECT());
112+
113+
assertEquals(1, disconnectCounter.get());
114+
assertEquals(0, messageBarrier.getQueue().size());
92115
}
93116

94117
@Test

0 commit comments

Comments
 (0)