Skip to content

Commit c1279b6

Browse files
MQTT 5 client not disconnected on timeout
1 parent dab29af commit c1279b6

2 files changed

Lines changed: 27 additions & 5 deletions

File tree

src/main/java/com/hivemq/mqtt/handler/connect/MessageBarrier.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.hivemq.mqtt.message.auth.AUTH;
2424
import com.hivemq.mqtt.message.connack.CONNACK;
2525
import com.hivemq.mqtt.message.connect.CONNECT;
26+
import com.hivemq.mqtt.message.disconnect.DISCONNECT;
2627
import com.hivemq.mqtt.message.reason.Mqtt5ConnAckReasonCode;
2728
import io.netty.channel.Channel;
2829
import io.netty.channel.ChannelDuplexHandler;
@@ -62,15 +63,13 @@ public void channelRead(final @NotNull ChannelHandlerContext ctx, final @NotNull
6263
if (msg instanceof Message) {
6364
if (msg instanceof CONNECT) {
6465
connectReceived = true;
65-
suspendRead(ctx.channel());
6666
} else if (!connectReceived) {
6767
serverDisconnector.logAndClose(ctx.channel(),
6868
"A client (IP: {}) sent other message before CONNECT. Disconnecting client.",
6969
"Sent other message before CONNECT");
7070
return;
71-
} else if (msg instanceof AUTH) {
71+
} else if (!(msg instanceof AUTH) && !(msg instanceof DISCONNECT) && !connackSent) {
7272
suspendRead(ctx.channel());
73-
} else if (!connackSent) {
7473
messageQueue.add((Message) msg);
7574
return;
7675
}

src/test/java/com/hivemq/mqtt/handler/connect/MessageBarrierTest.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.google.common.collect.ImmutableList;
1919
import com.hivemq.bootstrap.ClientConnectionContext;
20+
import com.hivemq.extension.sdk.api.annotations.NotNull;
2021
import com.hivemq.logging.EventLog;
2122
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
2223
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnectorImpl;
@@ -86,9 +87,31 @@ public void test_queue_messages_after_connect() {
8687
channel.writeInbound(TestMessageUtil.createMqtt3Publish());
8788
channel.writeInbound(new PINGREQ());
8889
channel.writeInbound(TestMessageUtil.createMqtt3Publish());
89-
channel.writeInbound(new DISCONNECT());
9090
assertTrue(channel.isActive());
91-
assertEquals(6, messageBarrier.getQueue().size());
91+
assertEquals(5, messageBarrier.getQueue().size());
92+
}
93+
94+
@Test
95+
public void test_disconnect_bypasses_queue_before_connack() {
96+
channel.writeInbound(new CONNECT.Mqtt3Builder().withProtocolVersion(ProtocolVersion.MQTTv3_1_1)
97+
.withClientIdentifier("clientID")
98+
.build());
99+
100+
final AtomicInteger disconnectCounter = new AtomicInteger(0);
101+
channel.pipeline().addAfter(MQTT_MESSAGE_BARRIER, "test", new ChannelDuplexHandler() {
102+
103+
@Override
104+
public void channelRead(final @NotNull ChannelHandlerContext ctx, final @NotNull Object msg) {
105+
if (msg instanceof DISCONNECT) {
106+
disconnectCounter.incrementAndGet();
107+
}
108+
}
109+
});
110+
111+
channel.writeInbound(new DISCONNECT());
112+
113+
assertEquals(1, disconnectCounter.get());
114+
assertEquals(0, messageBarrier.getQueue().size());
92115
}
93116

94117
@Test

0 commit comments

Comments
 (0)