diff --git a/netty-socketio-core/src/test/java/com/socketio4j/socketio/scheduler/HashedWheelTimeoutSchedulerTest.java b/netty-socketio-core/src/test/java/com/socketio4j/socketio/scheduler/HashedWheelTimeoutSchedulerTest.java index 161fcbad..1bda9887 100644 --- a/netty-socketio-core/src/test/java/com/socketio4j/socketio/scheduler/HashedWheelTimeoutSchedulerTest.java +++ b/netty-socketio-core/src/test/java/com/socketio4j/socketio/scheduler/HashedWheelTimeoutSchedulerTest.java @@ -16,6 +16,8 @@ */ package com.socketio4j.socketio.scheduler; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -575,11 +577,12 @@ void shouldHandleConcurrentTimeoutReplacement() throws InterruptedException { CountDownLatch completionLatch = new CountDownLatch(threadCount); AtomicInteger executionCount = new AtomicInteger(0); SchedulerKey sharedKey = new SchedulerKey(SchedulerKey.Type.PING, "shared-session"); + List threads = new ArrayList<>(threadCount); // When for (int i = 0; i < threadCount; i++) { final int threadId = i; - new Thread(() -> { + Thread thread = new Thread(() -> { try { startLatch.await(); // All threads try to schedule with the same key @@ -590,13 +593,18 @@ void shouldHandleConcurrentTimeoutReplacement() throws InterruptedException { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - }).start(); + }); + threads.add(thread); + thread.start(); } startLatch.countDown(); // Then boolean completed = completionLatch.await(5, TimeUnit.SECONDS); + for (Thread thread : threads) { + thread.join(); + } // Due to replacement, the exact count depends on timing and implementation // We just verify that the test completes without hanging assertThat(executionCount.get()).isGreaterThanOrEqualTo(0); @@ -611,30 +619,35 @@ void shouldHandleConcurrentCancellation() throws InterruptedException { CountDownLatch startLatch = new CountDownLatch(1); CountDownLatch completionLatch = new CountDownLatch(threadCount); AtomicInteger executionCount = new AtomicInteger(0); + Thread[] workers = new Thread[threadCount]; // When for (int i = 0; i < threadCount; i++) { final int threadId = i; - new Thread(() -> { + workers[i] = new Thread(() -> { try { startLatch.await(); SchedulerKey key = new SchedulerKey(SchedulerKey.Type.PING, "session-" + threadId); - + // Schedule and immediately cancel scheduler.schedule(key, () -> { executionCount.incrementAndGet(); completionLatch.countDown(); }, 200, TimeUnit.MILLISECONDS); - + scheduler.cancel(key); - + } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - }).start(); + }); + workers[i].start(); } startLatch.countDown(); + for (Thread worker : workers) { + worker.join(); + } // Then boolean completed = completionLatch.await(3, TimeUnit.SECONDS); @@ -781,6 +794,7 @@ void shouldHandleRaceConditionBetweenCancelAndSchedule() throws InterruptedExcep // Then boolean completed = completionLatch.await(5, TimeUnit.SECONDS); + raceThread.join(1000); // The task might or might not execute due to race condition, but no exception should occur assertThat(raceThread.isAlive()).isFalse(); }