From 7bd962f82a087cb37444c4416648c5eca88cc1a5 Mon Sep 17 00:00:00 2001 From: iryabov Date: Fri, 24 Apr 2026 10:31:35 +0200 Subject: [PATCH 1/4] fix: reduce await termination timeout for scheduled thread pool and executor shutdown --- .../com/epam/drill/agent/test/sending/TestInfoSender.kt | 4 ++-- .../drill/agent/transport/QueuedAgentMessageSender.kt | 8 ++++---- .../epam/drill/agent/test2code/coverage/CoverageSender.kt | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/java-agent/src/jvmMain/kotlin/com/epam/drill/agent/test/sending/TestInfoSender.kt b/java-agent/src/jvmMain/kotlin/com/epam/drill/agent/test/sending/TestInfoSender.kt index b3db15bc3..20634f2a4 100644 --- a/java-agent/src/jvmMain/kotlin/com/epam/drill/agent/test/sending/TestInfoSender.kt +++ b/java-agent/src/jvmMain/kotlin/com/epam/drill/agent/test/sending/TestInfoSender.kt @@ -49,13 +49,13 @@ class IntervalTestInfoSender( } override fun stopSendingTests() { + sendTests(collectTests()) + messageSender.shutdown() scheduledThreadPool.shutdown() if (!scheduledThreadPool.awaitTermination(1, TimeUnit.SECONDS)) { logger.error("Failed to send some tests prior to shutdown") scheduledThreadPool.shutdownNow(); } - sendTests(collectTests()) - messageSender.shutdown() logger.info { "Test sending job is stopped." } } diff --git a/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt b/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt index b69962d01..01b1930a8 100644 --- a/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt +++ b/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt @@ -16,7 +16,6 @@ package com.epam.drill.agent.transport import mu.KotlinLogging -import com.epam.drill.agent.common.transport.AgentMessage import com.epam.drill.agent.common.transport.AgentMessageDestination import com.epam.drill.agent.common.transport.AgentMessageSender import kotlinx.serialization.KSerializer @@ -52,7 +51,7 @@ open class QueuedAgentMessageSender( } } - override fun send(destination: AgentMessageDestination, message: T, serializer: KSerializer) { + override fun send(destination: AgentMessageDestination, message: T, serializer: KSerializer) { val mappedDestination = destinationMapper.map(destination) val serializedMessage = messageSerializer.serialize(message, serializer) if (!isRunning.get()) { @@ -69,10 +68,10 @@ open class QueuedAgentMessageSender( } override fun shutdown() { - isRunning.set(false) + if (!isRunning.compareAndSet(true, false)) return executor.shutdown() try { - if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { executor.shutdownNow() } } catch (e: InterruptedException) { @@ -144,6 +143,7 @@ open class QueuedAgentMessageSender( tryToSend(destination, message) || handleUnsent(destination, message, reason) } } while (message != null) + logger.info { "Finished unloading a message queue." } } /** diff --git a/test2code/src/main/kotlin/com/epam/drill/agent/test2code/coverage/CoverageSender.kt b/test2code/src/main/kotlin/com/epam/drill/agent/test2code/coverage/CoverageSender.kt index 8fec6feb2..edad511ac 100644 --- a/test2code/src/main/kotlin/com/epam/drill/agent/test2code/coverage/CoverageSender.kt +++ b/test2code/src/main/kotlin/com/epam/drill/agent/test2code/coverage/CoverageSender.kt @@ -59,14 +59,14 @@ class IntervalCoverageSender( } override fun stopSendingCoverage() { + sendProbes(collectReleasedProbes()) + sendProbes(collectUnreleasedProbes()) + sender.shutdown() scheduledThreadPool.shutdown() - if (!scheduledThreadPool.awaitTermination(5, TimeUnit.SECONDS)) { + if (!scheduledThreadPool.awaitTermination(1, TimeUnit.SECONDS)) { logger.error("Failed to send some coverage data prior to shutdown") scheduledThreadPool.shutdownNow(); } - sendProbes(collectReleasedProbes()) - sendProbes(collectUnreleasedProbes()) - sender.shutdown() logger.info { "Coverage sending job is stopped." } } From c711928e73e41617801b2d30001fe4a65dc21a6d Mon Sep 17 00:00:00 2001 From: iryabov Date: Tue, 28 Apr 2026 09:58:36 +0200 Subject: [PATCH 2/4] refactor: remove unnecessary check for sender running state in message sending --- .../epam/drill/agent/transport/QueuedAgentMessageSender.kt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt b/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt index 01b1930a8..309ed03f9 100644 --- a/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt +++ b/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt @@ -54,10 +54,6 @@ open class QueuedAgentMessageSender( override fun send(destination: AgentMessageDestination, message: T, serializer: KSerializer) { val mappedDestination = destinationMapper.map(destination) val serializedMessage = messageSerializer.serialize(message, serializer) - if (!isRunning.get()) { - handleUnsent(mappedDestination, serializedMessage, "sender is not running") - return - } if (!messageQueue.offer(Pair(mappedDestination, serializedMessage))) { handleUnsent(mappedDestination, serializedMessage, "queue capacity limit reached") return From 7a279c7a6074af0073d57b0d028af6ffb9f32339 Mon Sep 17 00:00:00 2001 From: iryabov Date: Wed, 6 May 2026 08:46:40 +0200 Subject: [PATCH 3/4] test: update method signatures in checksum test for consistency --- .../epam/drill/agent/test2code/classparsing/ChecksumTest.kt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test2code/src/test/kotlin/com/epam/drill/agent/test2code/classparsing/ChecksumTest.kt b/test2code/src/test/kotlin/com/epam/drill/agent/test2code/classparsing/ChecksumTest.kt index c37d25c19..0b7d74f1e 100644 --- a/test2code/src/test/kotlin/com/epam/drill/agent/test2code/classparsing/ChecksumTest.kt +++ b/test2code/src/test/kotlin/com/epam/drill/agent/test2code/classparsing/ChecksumTest.kt @@ -50,8 +50,10 @@ class ChecksumTest { @Test fun `changing lambda body should change it's checksum`() { - val methodSignatureBuild1 = "com/epam/drill/agent/fixture/ast/Build1:lambda\$null\$2:java.lang.String:java.lang.String" - val methodSignatureBuild2 = "com/epam/drill/agent/fixture/ast/Build2:lambda\$null\$2:java.lang.String:java.lang.String" + val methodSignatureBuild1 = + "com/epam/drill/agent/fixture/ast/Build1:lambda\$theSameMethodBody$0:java.lang.String:java.lang.String" + val methodSignatureBuild2 = + "com/epam/drill/agent/fixture/ast/Build2:lambda\$theSameMethodBody$0:java.lang.String:java.lang.String" assertNotEquals( checksumsBuild1[methodSignatureBuild1], checksumsBuild2[methodSignatureBuild2] From 5d18457bc6e239c74772f38d7107ae33bd68134d Mon Sep 17 00:00:00 2001 From: iryabov Date: Wed, 6 May 2026 09:04:52 +0200 Subject: [PATCH 4/4] fix: handle unsent messages when sender is not running --- .../epam/drill/agent/transport/QueuedAgentMessageSender.kt | 4 ++++ .../drill/agent/transport/QueuedAgentMessageSenderTest.kt | 6 +++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt b/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt index 309ed03f9..01b1930a8 100644 --- a/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt +++ b/lib-jvm-shared/agent-transport/src/jvmMain/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSender.kt @@ -54,6 +54,10 @@ open class QueuedAgentMessageSender( override fun send(destination: AgentMessageDestination, message: T, serializer: KSerializer) { val mappedDestination = destinationMapper.map(destination) val serializedMessage = messageSerializer.serialize(message, serializer) + if (!isRunning.get()) { + handleUnsent(mappedDestination, serializedMessage, "sender is not running") + return + } if (!messageQueue.offer(Pair(mappedDestination, serializedMessage))) { handleUnsent(mappedDestination, serializedMessage, "queue capacity limit reached") return diff --git a/lib-jvm-shared/agent-transport/src/jvmTest/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSenderTest.kt b/lib-jvm-shared/agent-transport/src/jvmTest/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSenderTest.kt index 5169a4ef6..97b664534 100644 --- a/lib-jvm-shared/agent-transport/src/jvmTest/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSenderTest.kt +++ b/lib-jvm-shared/agent-transport/src/jvmTest/kotlin/com/epam/drill/agent/transport/QueuedAgentMessageSenderTest.kt @@ -180,10 +180,10 @@ class QueuedAgentMessageSenderTest { calls?.waitFor { verify(exactly = it) { messageSerializer.serialize(any(), TestAgentMessage.serializer()) } } calls?.waitFor { verify(exactly = it) { destinationMapper.map(any()) } } enqueued?.waitFor { - assertEquals(it, queueOffers.filter { o -> o }.size) + assertEquals(it, queueOffers.filter { o -> o }.size, "Expected $it messages to be enqueued, but was ${queueOffers.filter { o -> o }.size}") } dequeued?.waitFor { - assertEquals(it, queuePolls.filterNotNull().size) + assertEquals(it, queuePolls.filterNotNull().size, "Expected $it messages to be dequeued, but was ${queuePolls.filterNotNull().size}") } sendingAttempts?.waitFor { verify(exactly = it) { messageTransport.send(any(), any(), any()) } } @@ -191,7 +191,7 @@ class QueuedAgentMessageSenderTest { unsent?.waitFor { verify(exactly = it) { messageSendingListener.onUnsent(any(), any()) } } } - private fun T.waitFor(timeout: Long = 1000, block: (T) -> Unit) { + private fun T.waitFor(timeout: Long = 2000, block: (T) -> Unit) { val start = System.currentTimeMillis() val timeIsOut = { System.currentTimeMillis() - start > timeout } var error: Throwable? = null