Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.socketio4j.socketio.scheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -575,11 +577,12 @@ void shouldHandleConcurrentTimeoutReplacement() throws InterruptedException {
CountDownLatch completionLatch = new CountDownLatch(threadCount);
AtomicInteger executionCount = new AtomicInteger(0);
SchedulerKey sharedKey = new SchedulerKey(SchedulerKey.Type.PING, "shared-session");
List<Thread> threads = new ArrayList<>(threadCount);

// When
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
Thread thread = new Thread(() -> {
try {
startLatch.await();
// All threads try to schedule with the same key
Expand All @@ -590,13 +593,18 @@ void shouldHandleConcurrentTimeoutReplacement() throws InterruptedException {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
});
threads.add(thread);
thread.start();
}

startLatch.countDown();

// Then
boolean completed = completionLatch.await(5, TimeUnit.SECONDS);
for (Thread thread : threads) {
thread.join();
}
// Due to replacement, the exact count depends on timing and implementation
// We just verify that the test completes without hanging
assertThat(executionCount.get()).isGreaterThanOrEqualTo(0);
Expand All @@ -611,30 +619,35 @@ void shouldHandleConcurrentCancellation() throws InterruptedException {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch completionLatch = new CountDownLatch(threadCount);
AtomicInteger executionCount = new AtomicInteger(0);
Thread[] workers = new Thread[threadCount];

// When
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
workers[i] = new Thread(() -> {
try {
startLatch.await();
SchedulerKey key = new SchedulerKey(SchedulerKey.Type.PING, "session-" + threadId);

// Schedule and immediately cancel
scheduler.schedule(key, () -> {
executionCount.incrementAndGet();
completionLatch.countDown();
}, 200, TimeUnit.MILLISECONDS);

scheduler.cancel(key);

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
});
workers[i].start();
}

startLatch.countDown();
for (Thread worker : workers) {
worker.join();
}

// Then
boolean completed = completionLatch.await(3, TimeUnit.SECONDS);
Expand Down Expand Up @@ -781,6 +794,7 @@ void shouldHandleRaceConditionBetweenCancelAndSchedule() throws InterruptedExcep

// Then
boolean completed = completionLatch.await(5, TimeUnit.SECONDS);
raceThread.join(1000);
// The task might or might not execute due to race condition, but no exception should occur
assertThat(raceThread.isAlive()).isFalse();
}
Expand Down
Loading