Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,3 @@ build/
**/kotlin/kni/
kni-meta-info
**/commonGenerated/
/lib-jvm-shared/
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ object ParameterDefinitions: AgentParameterDefinitionCollection() {
defaultValue = "QUEUED").register()
val MESSAGE_QUEUE_LIMIT = AgentParameterDefinition.forString(name = "messageQueueLimit", defaultValue = "512Mb").register()
val MESSAGE_MAX_RETRIES = AgentParameterDefinition.forInt(name = "messageMaxRetries", defaultValue = Int.MAX_VALUE).register()
/**
 * Integer agent parameter `shutdownFlushTimeoutMs`: the maximum time, in milliseconds,
 * allowed for stopping producers and flushing messages during JVM shutdown.
 * Defaults to 60 000 ms when not configured.
 */
val SHUTDOWN_FLUSH_TIMEOUT_MS = AgentParameterDefinition.forInt(
name = "shutdownFlushTimeoutMs",
description = "Maximum time in milliseconds to stop producers and flush messages during JVM shutdown",
defaultValue = 60_000,
).register()
val SSL_TRUSTSTORE = NullableAgentParameterDefinition.forString(name = "sslTruststore").register()
val SSL_TRUSTSTORE_PASSWORD = NullableAgentParameterDefinition.forString(name = "sslTruststorePassword").register()
val LOG_LEVEL = AgentParameterDefinition.forString(name = "logLevel", defaultValue = "INFO").register()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* Copyright 2020 - 2022 EPAM Systems
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.epam.drill.agent.lifecycle

/**
 * Common declaration of the agent shutdown coordinator.
 * Every target provides its own `actual` implementation of [install] and [shutdown].
 */
expect object AgentShutdownCoordinator {
/** Sets up the platform mechanism that will trigger [shutdown]. */
fun install()
/** Runs the agent shutdown sequence. */
fun shutdown()
}
4 changes: 3 additions & 1 deletion java-agent/src/jvmMain/kotlin/com/epam/drill/agent/Agent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.epam.drill.agent.module.JvmModuleLoader
import com.epam.drill.agent.test.session.SessionController
import com.epam.drill.agent.test2code.Test2Code
import com.epam.drill.agent.test2code.configuration.Test2CodeParameterDefinitions
import com.epam.drill.agent.lifecycle.AgentShutdownCoordinator
import com.epam.drill.agent.transport.JvmModuleMessageSender
import mu.KotlinLogging
import java.lang.instrument.ClassFileTransformer
Expand Down Expand Up @@ -61,6 +62,7 @@ fun premain(agentArgs: String?, inst: Instrumentation) {
}

SessionController.startSession()
AgentShutdownCoordinator.install()
} catch (e: Throwable) {
println("Drill4J Initialization Error:\n${e.message ?: e::class.java.name}")
}
Expand All @@ -82,7 +84,7 @@ fun main(args: Array<String>) {
JvmModuleMessageSender.sendBuildMetadata()
val test2Code = JvmModuleLoader.loadJvmModule(Test2Code::class.java.name) as Test2Code
test2Code.scanAndSendMetadataClasses()
Runtime.getRuntime().addShutdownHook(Thread { JvmModuleMessageSender.shutdown() })
AgentShutdownCoordinator.install()
exitProcess(0)
} catch (e: Throwable) {
println("Drill4J Initialization Error:\n${e.message ?: e::class.java.name}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright 2020 - 2022 EPAM Systems
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.epam.drill.agent.lifecycle

import com.epam.drill.agent.common.lifecycle.AgentShutdownRegistry
import com.epam.drill.agent.configuration.Configuration
import com.epam.drill.agent.configuration.ParameterDefinitions
import com.epam.drill.agent.transport.DataIngestMessageSender
import mu.KotlinLogging
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

/**
 * JVM implementation of the agent shutdown coordinator.
 *
 * [install] registers one JVM shutdown hook (thread `drill-shutdown-hook`) that calls [shutdown].
 * [shutdown] executes at most once: it runs every task returned by [AgentShutdownRegistry.tasks]
 * inside a shared time budget read from [ParameterDefinitions.SHUTDOWN_FLUSH_TIMEOUT_MS], and then
 * shuts down [DataIngestMessageSender] with whatever part of that budget is left.
 */
actual object AgentShutdownCoordinator {
    private val logger = KotlinLogging.logger {}
    private val hookInstalled = AtomicBoolean(false)
    private val shutdownStarted = AtomicBoolean(false)

    /** Registers the shutdown hook; every call after the first one returns immediately. */
    actual fun install() {
        // getAndSet returns the previous value: `true` means another caller already installed the hook.
        if (hookInstalled.getAndSet(true)) return
        val hook = Thread({ shutdown() }, "drill-shutdown-hook")
        Runtime.getRuntime().addShutdownHook(hook)
        logger.debug { "Agent shutdown hook installed." }
    }

    /** Runs the shutdown sequence; only the first caller gets past the flag. */
    actual fun shutdown() {
        if (shutdownStarted.compareAndSet(false, true)) {
            runShutdown()
        }
    }

    /**
     * Runs the registered tasks in order, then shuts the message sender down.
     * All steps share one deadline computed from the configured flush timeout.
     */
    private fun runShutdown() {
        val flushTimeoutMs = Configuration.parameters[ParameterDefinitions.SHUTDOWN_FLUSH_TIMEOUT_MS].toLong()
        val deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(flushTimeoutMs)
        logger.debug { "Agent shutdown started, flush timeout is ${flushTimeoutMs}ms." }

        for (task in AgentShutdownRegistry.tasks()) {
            val budgetMs = remainingMs(deadlineNanos)
            if (budgetMs <= 0) {
                // Budget is gone: the remaining tasks are not started.
                logger.warn { "Shutdown flush timeout reached before task '${task.name}'." }
                break
            }
            // A failing task is logged and must not prevent the following steps from running.
            try {
                logger.debug { "Running shutdown task '${task.name}' (${budgetMs}ms remaining)." }
                task.action(budgetMs)
            } catch (failure: Throwable) {
                logger.error(failure) { "Shutdown task '${task.name}' failed." }
            }
        }

        val leftMs = remainingMs(deadlineNanos)
        if (leftMs > 0) {
            logger.debug { "Flushing message sender, ${leftMs}ms remaining." }
        } else {
            logger.warn { "Shutdown flush timeout exhausted, attempting a best-effort message sender flush." }
        }
        // The sender is shut down even with a zero budget.
        try {
            DataIngestMessageSender.shutdownWithTimeout(leftMs)
        } catch (failure: Throwable) {
            logger.error(failure) { "Message sender shutdown failed." }
        }
        logger.debug { "Agent shutdown completed." }
    }

    /** Milliseconds left until [deadlineNanos], never negative. */
    private fun remainingMs(deadlineNanos: Long): Long {
        val leftNanos = deadlineNanos - System.nanoTime()
        return maxOf(TimeUnit.NANOSECONDS.toMillis(leftNanos), 0L)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit

interface TestInfoSender {
fun startSendingTests()
fun stopSendingTests()
fun stopSendingTests(remainingMs: Long)
}

class IntervalTestInfoSender(
Expand All @@ -36,25 +36,33 @@ class IntervalTestInfoSender(
private val collectTests: () -> List<TestLaunchPayload> = { emptyList() }
) : TestInfoSender {
private val logger = KotlinLogging.logger {}
private val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
private val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor { r ->
Thread(r, "drill-test-info-sender").apply {
isDaemon = true
}
}

override fun startSendingTests() {
scheduledThreadPool.scheduleAtFixedRate(
{ sendTests(collectTests()) },
{
try {
sendTests(collectTests())
} catch (t: Throwable) {
logger.error(t) { "Test sending job failed" }
}
},
0,
intervalMs,
TimeUnit.MILLISECONDS
)
logger.debug { "Test sending job is started." }
}

override fun stopSendingTests() {
override fun stopSendingTests(remainingMs: Long) {
sendTests(collectTests())
messageSender.shutdown()
scheduledThreadPool.shutdown()
if (!scheduledThreadPool.awaitTermination(1, TimeUnit.SECONDS)) {
logger.error("Failed to send some tests prior to shutdown")
scheduledThreadPool.shutdownNow();
if (remainingMs > 0 && !scheduledThreadPool.awaitTermination(remainingMs, TimeUnit.MILLISECONDS)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we wait for the full remainingMs instead of a shorter timeout specific to this step?
If we block here for the entire remaining shutdown window due to a stuck task, other cleanup steps may not run in time.

logger.warn { "Test sending scheduler did not stop within ${remainingMs}ms; leaving it for JVM exit." }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is shutdownNow() not used here?
Without it, non-daemon threads may remain running in a worst-case scenario, potentially preventing the JVM from shutting down.

}
logger.info { "Test sending job is stopped." }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import com.epam.drill.agent.test.execution.TestController
import com.epam.drill.agent.test.execution.TestExecutionInfo
import com.epam.drill.agent.test.sending.TestDefinitionPayload
import com.epam.drill.agent.test.sending.TestLaunchPayload
import com.epam.drill.agent.common.lifecycle.AgentShutdownRegistry
import com.epam.drill.agent.transport.DataIngestMessageSender
import mu.KotlinLogging
import java.time.Instant
Expand Down Expand Up @@ -63,7 +64,9 @@ actual object SessionController {
logger.info { "Test session: $sessionId" }

testInfoSender.startSendingTests()
Runtime.getRuntime().addShutdownHook(Thread { testInfoSender.stopSendingTests() })
AgentShutdownRegistry.register("test-info-sender") { remainingMs ->
testInfoSender.stopSendingTests(remainingMs)
}

val builds =
takeIf { Configuration.parameters[ParameterDefinitions.RECOMMENDED_TESTS_TARGET_APP_ID].isNotEmpty() }?.let {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,37 @@
*/
package com.epam.drill.agent.transport

import com.epam.drill.agent.common.transport.AgentMessageDestination
import com.epam.drill.agent.common.transport.AgentMessageSender
import com.epam.drill.agent.configuration.Configuration
import com.epam.drill.agent.configuration.DefaultParameterDefinitions
import com.epam.drill.agent.configuration.ParameterDefinitions
import com.epam.drill.agent.transport.http.HttpAgentMessageTransport
import io.aesy.datasize.ByteUnit
import io.aesy.datasize.DataSize
import kotlinx.serialization.KSerializer
import mu.KotlinLogging
import java.io.File
import kotlin.takeIf

private const val QUEUE_DEFAULT_SIZE: Long = 512L * 1024 * 1024
private val logger = KotlinLogging.logger {}

object DataIngestMessageSender : AgentMessageSender by messageSender()
/**
 * Singleton [AgentMessageSender] that wraps the sender created by [messageSender]
 * and adds a shutdown variant that accepts a flush timeout.
 */
object DataIngestMessageSender : AgentMessageSender {
    private val underlying: AgentMessageSender = messageSender()

    /** Hands the message over to the wrapped sender unchanged. */
    override fun <T> send(destination: AgentMessageDestination, message: T, serializer: KSerializer<T>) =
        underlying.send(destination, message, serializer)

    /** Shuts down using `Long.MAX_VALUE` as the flush timeout. */
    override fun shutdown() {
        shutdownWithTimeout(Long.MAX_VALUE)
    }

    /**
     * Shuts the wrapped sender down.
     *
     * @param flushTimeoutMs timeout passed to the wrapped sender when it is a
     * [QueuedAgentMessageSender]; any other sender type is shut down without it.
     */
    fun shutdownWithTimeout(flushTimeoutMs: Long) {
        val current = underlying
        if (current is QueuedAgentMessageSender) {
            current.shutdown(flushTimeoutMs)
        } else {
            current.shutdown()
        }
    }
}

fun messageSender(): AgentMessageSender {
val transport = agentMessageTransport()
Expand Down Expand Up @@ -86,4 +102,4 @@ private fun parseBytes(value: String): Long = value.run {
.onFailure(logError)
.getOrDefault(DataSize.of(QUEUE_DEFAULT_SIZE, ByteUnit.BYTE))
.toUnit(ByteUnit.BYTE).value.toLong()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import com.epam.drill.agent.configuration.DefaultParameterDefinitions
import com.epam.drill.agent.configuration.DefaultParameterDefinitions.INSTALLATION_DIR
import com.epam.drill.agent.configuration.ParameterDefinitions
import com.epam.drill.agent.jvmti.classFileLoadHook
import com.epam.drill.agent.jvmti.vmDeathEvent
import com.epam.drill.agent.jvmti.vmInitEvent
import com.epam.drill.agent.module.JvmModuleLoader
import com.epam.drill.agent.transport.JvmModuleMessageSender
import com.epam.drill.agent.jvmapi.gen.*
import com.epam.drill.agent.lifecycle.AgentShutdownCoordinator
import com.epam.drill.agent.test.session.SessionController
import kotlinx.cinterop.ExperimentalForeignApi
import kotlin.experimental.ExperimentalNativeApi
Expand Down Expand Up @@ -89,10 +89,7 @@ object Agent {
JvmModuleMessageSender.sendAgentMetadata()
}
SessionController.startSession()
}

fun agentOnVmDeath() {
logger.debug { "agentOnVmDeath" }
AgentShutdownCoordinator.install()
}

@OptIn(ExperimentalForeignApi::class)
Expand All @@ -107,7 +104,6 @@ object Agent {
private fun setEventCallbacks() = memScoped {
val alloc = alloc<jvmtiEventCallbacks>()
alloc.VMInit = staticCFunction(::vmInitEvent)
alloc.VMDeath = staticCFunction(::vmDeathEvent)
alloc.ClassFileLoadHook = staticCFunction(::classFileLoadHook)
SetEventCallbacks(alloc.ptr, sizeOf<jvmtiEventCallbacks>().toInt())
SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_VM_INIT, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ import kotlinx.cinterop.ExperimentalForeignApi
@Suppress("unused_parameter")
fun vmInitEvent(env: CPointer<jvmtiEnvVar>?, jniEnv: CPointer<JNIEnvVar>?, thread: jthread?) = Agent.agentOnVmInit()

@OptIn(ExperimentalForeignApi::class)
@Suppress("unused_parameter")
fun vmDeathEvent(jvmtiEnv: CPointer<jvmtiEnvVar>?, jniEnv: CPointer<JNIEnvVar>?) = Agent.agentOnVmDeath()

@OptIn(ExperimentalForeignApi::class)
@Suppress("unused_parameter")
fun classFileLoadHook(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright 2020 - 2022 EPAM Systems
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.epam.drill.agent.lifecycle

import com.epam.drill.agent.jvmapi.callObjectVoidMethod

/**
 * Native implementation of the agent shutdown coordinator.
 * Both operations are forwarded through [callObjectVoidMethod] with this object's class and the
 * matching function reference (presumably invoking the JVM-side object of the same name — confirm
 * against `callObjectVoidMethod`).
 */
actual object AgentShutdownCoordinator {
    /** Forwards `install` through [callObjectVoidMethod]. */
    actual fun install() {
        callObjectVoidMethod(AgentShutdownCoordinator::class, AgentShutdownCoordinator::install)
    }

    /** Forwards `shutdown` through [callObjectVoidMethod]. */
    actual fun shutdown() {
        callObjectVoidMethod(AgentShutdownCoordinator::class, AgentShutdownCoordinator::shutdown)
    }
}
Loading
Loading