diff --git a/admin-app/src/main/kotlin/com/epam/drill/admin/DrillAdminApplication.kt b/admin-app/src/main/kotlin/com/epam/drill/admin/DrillAdminApplication.kt index 9b11d5ee1..9a869d6cb 100644 --- a/admin-app/src/main/kotlin/com/epam/drill/admin/DrillAdminApplication.kt +++ b/admin-app/src/main/kotlin/com/epam/drill/admin/DrillAdminApplication.kt @@ -52,8 +52,12 @@ import io.ktor.server.request.* import io.ktor.server.resources.* import io.ktor.server.response.* import io.ktor.server.routing.* +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.runBlocking import kotlinx.serialization.json.Json import mu.KotlinLogging +import org.kodein.di.allInstances import org.kodein.di.instance import org.kodein.di.ktor.closestDI import org.kodein.di.ktor.di @@ -92,6 +96,7 @@ fun Application.module() { if (oauth2Enabled) configureOAuthAuthentication(di) roleBasedAuthentication() } + shutdownQueues() routing { rootRoute() swaggerUI(path = "swagger", swaggerFile = "openapi.yml") @@ -235,4 +240,18 @@ val Application.jsCoverageConverterAddress: String .propertyOrNull("jsCoverageConverterAddress") ?.getString() ?.takeIf { it.isNotBlank() } - ?: "http://localhost:8092" // TODO think of default \ No newline at end of file + ?: "http://localhost:8092" // TODO think of default + +private fun Application.shutdownQueues() { + val closableComponents: List by closestDI().allInstances() + + environment.monitor.subscribe(ApplicationStopping) { + runBlocking { + closableComponents.map { + async { + it.close() + } + }.awaitAll() + } + } +} \ No newline at end of file diff --git a/admin-app/src/main/resources/application.conf b/admin-app/src/main/resources/application.conf index 4b95d5e13..b72194a17 100644 --- a/admin-app/src/main/resources/application.conf +++ b/admin-app/src/main/resources/application.conf @@ -16,6 +16,20 @@ ktor { } } +kafka { + bootstrapServers = "localhost:9092" + bootstrapServers = ${?DRILL_KAFKA_BOOTSTRAP_SERVERS} + securityProtocol = ${?DRILL_KAFKA_SECURITY_PROTOCOL} + saslMechanism = ${?DRILL_KAFKA_SASL_MECHANISM} + saslJaasConfig = ${?DRILL_KAFKA_SASL_JAAS_CONFIG} + sslTruststoreLocation = ${?DRILL_KAFKA_SSL_TRUSTSTORE_LOCATION} + sslTruststorePassword = ${?DRILL_KAFKA_SSL_TRUSTSTORE_PASSWORD} + sslKeystoreLocation = ${?DRILL_KAFKA_SSL_KEYSTORE_LOCATION} + sslKeystorePassword = ${?DRILL_KAFKA_SSL_KEYSTORE_PASSWORD} + sslKeyPassword = ${?DRILL_KAFKA_SSL_KEY_PASSWORD} + sslEndpointIdentificationAlgorithm = ${?DRILL_KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM} +} + drill { database { host = ${?DRILL_DB_HOST} @@ -72,6 +86,29 @@ drill { } } rawData { + queue { + type = ${?DRILL_RAW_DATA_QUEUE_TYPE} + capacity = ${?DRILL_RAW_DATA_QUEUE_CAPACITY} + workers = ${?DRILL_RAW_DATA_QUEUE_WORKERS} + kafka { + topic = "drill-raw-data" + topic = ${?DRILL_RAW_DATA_QUEUE_KAFKA_TOPIC} + consumerGroupId = "drill-writer" + consumerGroupId = ${?DRILL_RAW_DATA_QUEUE_KAFKA_CONSUMER_GROUP_ID} + pollTimeoutMs = 500 + pollTimeoutMs = ${?DRILL_RAW_DATA_QUEUE_KAFKA_POLL_TIMEOUT_MS} + shutdownTimeoutMs = 5000 + shutdownTimeoutMs = ${?DRILL_RAW_DATA_QUEUE_KAFKA_SHUTDOWN_TIMEOUT_MS} + producer { + clientId = "drill-admin-raw-data-producer" + clientId = ${?DRILL_RAW_DATA_QUEUE_KAFKA_PRODUCER_CLIENT_ID} + } + consumer { + clientId = "drill-admin-raw-data-consumer" + clientId = ${?DRILL_RAW_DATA_QUEUE_KAFKA_CONSUMER_CLIENT_ID} + } + } + } } metrics { database { diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/BuildsInfoApiTest.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/BuildsInfoApiTest.kt index 62212a28a..f5c813bcd 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/BuildsInfoApiTest.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/BuildsInfoApiTest.kt @@ -18,6 +18,7 @@ package com.epam.drill.admin.metrics import com.epam.drill.admin.metrics.config.MetricsDatabaseConfig import com.epam.drill.admin.test.* import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig +import com.epam.drill.admin.writer.rawdata.route.payload.BuildInfoPayload import com.epam.drill.admin.writer.rawdata.route.payload.BuildPayload import com.epam.drill.admin.writer.rawdata.route.payload.InstancePayload import com.epam.drill.admin.writer.rawdata.table.BuildTable @@ -36,11 +37,11 @@ class BuildsInfoApiTest : MetricsDatabaseTests({ default, metrics -> MetricsDatabaseConfig.init(metrics) }) { private suspend fun TestDataDsl.initTestData() { - client.putBuild(BuildPayload(groupId = testGroup, appId = testApp, buildVersion = "1.0.0", branch = testBranch)) - client.putBuild(BuildPayload(groupId = testGroup, appId = testApp, buildVersion = "2.0.0", branch = testBranch)) - client.putBuild(BuildPayload(groupId = testGroup, appId = testApp, buildVersion = "3.0.0", branch = "develop")) - client.putBuild(BuildPayload(groupId = testGroup, appId = "app-2", buildVersion = "1.0.0", branch = testBranch)) - client.putBuild(BuildPayload(groupId = "group-2", appId = testApp, buildVersion = "1.0.0", branch = testBranch)) + client.putBuildInfo(BuildInfoPayload(groupId = testGroup, appId = testApp, buildVersion = "1.0.0", branch = testBranch)) + client.putBuildInfo(BuildInfoPayload(groupId = testGroup, appId = testApp, buildVersion = "2.0.0", branch = testBranch)) + client.putBuildInfo(BuildInfoPayload(groupId = testGroup, appId = testApp, buildVersion = "3.0.0", branch = "develop")) + client.putBuildInfo(BuildInfoPayload(groupId = testGroup, appId = "app-2", buildVersion = "1.0.0", branch = testBranch)) + client.putBuildInfo(BuildInfoPayload(groupId = "group-2", appId = testApp, buildVersion = "1.0.0", branch = testBranch)) } private suspend fun TestDataDsl.initEnvironmentData() { diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageTreemapTest.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageTreemapTest.kt index c7980c9e6..997a9f8df 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageTreemapTest.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageTreemapTest.kt @@ -19,6 +19,7 @@ import com.epam.drill.admin.metrics.config.MetricsDatabaseConfig import com.epam.drill.admin.test.MetricsDatabaseTests import com.epam.drill.admin.test.withTransaction import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig +import com.epam.drill.admin.writer.rawdata.route.payload.BuildInfoPayload import com.epam.drill.admin.writer.rawdata.route.payload.BuildPayload import com.epam.drill.admin.writer.rawdata.route.payload.InstancePayload import com.epam.drill.admin.writer.rawdata.route.payload.SingleMethodPayload @@ -35,7 +36,7 @@ class CoverageTreemapTest : MetricsDatabaseTests({ default, metrics -> }) { @Test fun `given build with no methods no coverage, coverage-treemap should return empty list`() = havingData { - client.putBuild(BuildPayload(groupId = testGroup, appId = testApp, buildVersion = "1.0.0", branch = "main")) + client.putBuildInfo(BuildInfoPayload(groupId = testGroup, appId = testApp, buildVersion = "1.0.0", branch = "main")) }.expectThat { client.get("/metrics/coverage-treemap") { parameter("buildId", "${testGroup}:${testApp}:1.0.0") diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/DataIngestClient.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/DataIngestClient.kt index 5ff674e01..611ea0b4f 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/DataIngestClient.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/DataIngestClient.kt @@ -15,7 +15,6 @@ */ package com.epam.drill.admin.metrics -import com.epam.drill.admin.writer.rawdata.config.toBitString import com.epam.drill.admin.writer.rawdata.route.payload.* import com.jayway.jsonpath.JsonPath import io.ktor.client.* @@ -100,6 +99,12 @@ val TestDetails.definitionId: String } suspend fun HttpClient.putBuild(payload: BuildPayload): HttpResponse { + return put("/data-ingest/builds") { + setBody(payload) + }.assertSuccessStatus() +} + +suspend fun HttpClient.putBuildInfo(payload: BuildInfoPayload): HttpResponse { return put("/data-ingest/builds/info") { setBody(payload) }.assertSuccessStatus() diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/RecommendedTestsApiTest.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/RecommendedTestsApiTest.kt index be4566a0d..89c2e4cae 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/RecommendedTestsApiTest.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/RecommendedTestsApiTest.kt @@ -19,6 +19,7 @@ import com.epam.drill.admin.metrics.config.MetricsDatabaseConfig import com.epam.drill.admin.test.MetricsDatabaseTests import com.epam.drill.admin.test.withTransaction import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig +import com.epam.drill.admin.writer.rawdata.route.payload.BuildInfoPayload import com.epam.drill.admin.writer.rawdata.route.payload.BuildPayload import com.epam.drill.admin.writer.rawdata.table.* import io.ktor.client.request.* @@ -140,8 +141,8 @@ class RecommendedTestsApiTest : MetricsDatabaseTests({ default, metrics -> @Test fun `given baselineBuildBranches parameter, recommended test service should suggest skipping tests if they are not impacted in baselines from specified branch`() { havingData { - client.putBuild(BuildPayload(groupId = build1.groupId, appId = build1.appId, buildVersion = build1.buildVersion, branch = "main")) - client.putBuild(BuildPayload(groupId = build2.groupId, appId = build2.appId, buildVersion = build2.buildVersion, branch = "feature")) + client.putBuildInfo(BuildInfoPayload(groupId = build1.groupId, appId = build1.appId, buildVersion = build1.buildVersion, branch = "main")) + client.putBuildInfo(BuildInfoPayload(groupId = build2.groupId, appId = build2.appId, buildVersion = build2.buildVersion, branch = "feature")) //build1 on main branch, test1 covers method2 client.deployInstance(build1, arrayOf(method1, method2)) client.launchTest( diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt index 4c3a56274..d5a2c6bbf 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt @@ -26,6 +26,7 @@ import com.epam.drill.admin.metrics.views.TestImpactStatus import com.epam.drill.admin.test.StubDrillScheduler import com.epam.drill.admin.test.drillApplication import com.epam.drill.admin.test.drillClient +import com.epam.drill.admin.test.waitUntilInBlocking import com.epam.drill.admin.writer.rawdata.config.rawDataServicesDIModule import com.epam.drill.admin.writer.rawdata.route.dataIngestRoutes import com.epam.drill.admin.writer.rawdata.route.payload.InstancePayload @@ -43,12 +44,14 @@ import org.kodein.di.instance import org.kodein.di.singleton val scheduler = DI.Module("testModule") { - bind() with singleton { - StubDrillScheduler(UpdateMetricsEtlJob( + bind() with singleton { + StubDrillScheduler( + UpdateMetricsEtlJob( instance(), instance() - )) - } + ) + ) } +} fun havingData(testsData: suspend TestDataDsl.() -> Unit): HttpClient { return runBlocking { @@ -214,7 +217,9 @@ class TestDataDsl(val client: HttpClient) { fun HttpClient.expectThat(checks: suspend ExpectationDsl.(HttpClient) -> Unit) { val client = this - return runBlocking { + return waitUntilInBlocking( + onAssertionFailed = { refreshMetrics() } + ) { checks(ExpectationDsl(client), client) } } diff --git a/admin-test/build.gradle.kts b/admin-test/build.gradle.kts index c9d2d8b88..710642d85 100644 --- a/admin-test/build.gradle.kts +++ b/admin-test/build.gradle.kts @@ -24,6 +24,7 @@ val junitJupiterVersion: String by parent!!.extra val postgresSqlVersion: String by parent!!.extra val zaxxerHikaricpVersion: String by parent!!.extra val quartzVersion: String by parent!!.extra +val awaitilityVersion: String by parent!!.extra repositories { mavenLocal() @@ -67,6 +68,7 @@ dependencies { implementation("com.zaxxer:HikariCP:$zaxxerHikaricpVersion") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.5.2") implementation("org.postgresql:postgresql:$postgresSqlVersion") + implementation("org.awaitility:awaitility:${awaitilityVersion}") } tasks { diff --git a/admin-test/src/main/kotlin/com/epam/drill/admin/test/AwaitDb.kt b/admin-test/src/main/kotlin/com/epam/drill/admin/test/AwaitDb.kt new file mode 100644 index 000000000..5a3e44b63 --- /dev/null +++ b/admin-test/src/main/kotlin/com/epam/drill/admin/test/AwaitDb.kt @@ -0,0 +1,60 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.test + +import kotlinx.coroutines.runBlocking +import org.awaitility.Awaitility.await +import org.jetbrains.exposed.sql.transactions.transaction +import java.time.Duration + +private val DEFAULT_DB_WAIT_TIMEOUT: Duration = Duration.ofSeconds(5) +private val DEFAULT_DB_POLL_INTERVAL: Duration = Duration.ofMillis(100) + +fun waitUntilInTransaction(assertion: () -> Unit) { + await() + .atMost(DEFAULT_DB_WAIT_TIMEOUT) + .pollInterval(DEFAULT_DB_POLL_INTERVAL) + .untilAsserted { + transaction { + assertion() + } + } +} + +fun waitUntilInBlocking( + onAssertionFailed: suspend (AssertionError) -> Unit = {}, + assertion: suspend () -> Unit +) { + await() + .atMost(DEFAULT_DB_WAIT_TIMEOUT) + .pollInterval(DEFAULT_DB_POLL_INTERVAL) + .untilAsserted { + runCatching { + runBlocking { + assertion() + } + }.onFailure { e -> + if (e is AssertionError) { + runCatching { + runBlocking { + onAssertionFailed(e) + } + } + } + throw e + } + } +} \ No newline at end of file diff --git a/admin-test/src/main/kotlin/com/epam/drill/admin/test/TestUtils.kt b/admin-test/src/main/kotlin/com/epam/drill/admin/test/TestUtils.kt index 6b673646c..b077ba531 100644 --- a/admin-test/src/main/kotlin/com/epam/drill/admin/test/TestUtils.kt +++ b/admin-test/src/main/kotlin/com/epam/drill/admin/test/TestUtils.kt @@ -16,7 +16,6 @@ package com.epam.drill.admin.test import io.ktor.serialization.kotlinx.json.* -import io.ktor.server.application.install import io.ktor.server.plugins.contentnegotiation.* import io.ktor.server.plugins.statuspages.StatusPages import io.ktor.server.resources.* @@ -29,8 +28,13 @@ import org.kodein.di.DI import org.kodein.di.ktor.di import kotlin.test.assertEquals import com.epam.drill.admin.common.route.commonStatusPages +import io.ktor.server.application.ApplicationStopping +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll import org.jetbrains.exposed.sql.Database -import javax.sql.DataSource +import org.kodein.di.allInstances +import org.kodein.di.ktor.closestDI +import kotlin.getValue fun withRollback(test: suspend () -> Unit) { @@ -68,6 +72,16 @@ fun drillApplication( di { diModules.forEach { import(it) } } + environment.monitor.subscribe(ApplicationStopping) { + val closableComponents: List by closestDI().allInstances() + runBlocking { + closableComponents.map { + async { + it.close() + } + }.awaitAll() + } + } } routing { routes() diff --git a/admin-writer/build.gradle.kts b/admin-writer/build.gradle.kts index b3c06a829..68d9b50ec 100644 --- a/admin-writer/build.gradle.kts +++ b/admin-writer/build.gradle.kts @@ -23,6 +23,8 @@ val testContainersVersion: String by parent!!.extra val postgresSqlVersion: String by parent!!.extra val zaxxerHikaricpVersion: String by parent!!.extra val logbackVersion: String by parent!!.extra +val kafkaClientsVersion: String by parent!!.extra +val junitJupiterVersion: String by parent!!.extra repositories { mavenLocal() @@ -62,6 +64,8 @@ dependencies { api("org.flywaydb:flyway-core:$flywaydbVersion") compileOnly("org.postgresql:postgresql:$postgresSqlVersion") + implementation("org.apache.kafka:kafka-clients:$kafkaClientsVersion") + implementation("io.ktor:ktor-client-core:$ktorVersion") implementation("io.ktor:ktor-client-apache:$ktorVersion") implementation("io.ktor:ktor-client-json:$ktorVersion") @@ -74,6 +78,11 @@ dependencies { testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.5.2") testImplementation("io.ktor:ktor-server-test-host:$ktorVersion") testImplementation("ch.qos.logback:logback-classic:$logbackVersion") + testImplementation("org.mockito.kotlin:mockito-kotlin:$mockitoKotlinVersion") + testImplementation("org.testcontainers:testcontainers:$testContainersVersion") + testImplementation("org.testcontainers:junit-jupiter:$testContainersVersion") + testImplementation("org.testcontainers:kafka:$testContainersVersion") + testImplementation("org.junit.jupiter:junit-jupiter:$junitJupiterVersion") testImplementation(project(":admin-test")) } diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/KafkaConfig.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/KafkaConfig.kt new file mode 100644 index 000000000..3c66c15b7 --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/KafkaConfig.kt @@ -0,0 +1,73 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.config + +import io.ktor.server.config.ApplicationConfig +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.SslConfigs +import java.util.Properties + +/** + * Common Kafka cluster and connection configuration shared by all Kafka clients. + */ +class KafkaConfig(private val config: ApplicationConfig) { + + /** + * Comma-separated list of `host:port` pairs used to establish the initial connection to the + * Kafka cluster. + */ + val bootstrapServers: String + get() = config.string("bootstrapServers", "localhost:9092") + + /** + * Builds a [Properties] map containing connection settings shared by producer and consumer + * clients (DNS lookup, idle/backoff timeouts, and security/SSL properties). + */ + fun toCommonProperties(): Properties = Properties().apply { + put(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, config.string("clientDnsLookup", "use_all_dns_ips")) + put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, config.int("connectionsMaxIdleMs", 540_000).toString()) + put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, config.int("reconnectBackoffMs", 50).toString()) + put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, config.int("reconnectBackoffMaxMs", 1_000).toString()) + put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, config.int("retryBackoffMs", 100).toString()) + config.stringOrNull("securityProtocol")?.let { put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, it) } + config.stringOrNull("saslMechanism")?.let { put(SaslConfigs.SASL_MECHANISM, it) } + config.stringOrNull("saslJaasConfig")?.let { put(SaslConfigs.SASL_JAAS_CONFIG, it) } + config.stringOrNull("sslTruststoreLocation")?.let { put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, it) } + config.stringOrNull("sslTruststorePassword")?.let { put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, it) } + config.stringOrNull("sslKeystoreLocation")?.let { put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, it) } + config.stringOrNull("sslKeystorePassword")?.let { put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, it) } + config.stringOrNull("sslKeyPassword")?.let { put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, it) } + config.stringOrNull("sslEndpointIdentificationAlgorithm") + ?.let { put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, it) } + } +} + +internal fun ApplicationConfig.string(path: String, default: String): String = + stringOrNull(path) ?: default + +internal fun ApplicationConfig.stringOrNull(path: String): String? = + propertyOrNull(path)?.getString()?.takeIf { it.isNotBlank() } + +internal fun ApplicationConfig.int(path: String, default: Int): Int = + propertyOrNull(path)?.getString()?.toIntOrNull() ?: default + +internal fun ApplicationConfig.long(path: String, default: Long): Long = + propertyOrNull(path)?.getString()?.toLongOrNull() ?: default + +internal fun ApplicationConfig.boolean(path: String, default: Boolean): Boolean = + propertyOrNull(path)?.getString()?.toBooleanStrictOrNull() ?: default + diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/RawDataKafkaQueueConfig.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/RawDataKafkaQueueConfig.kt new file mode 100644 index 000000000..90fd1bdcf --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/RawDataKafkaQueueConfig.kt @@ -0,0 +1,104 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.config + +import io.ktor.server.config.ApplicationConfig +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import java.util.Properties + +/** + * Raw-data-queue-specific Kafka configuration. + * + * Contains settings that are private to the raw-data processing task: topic name, + * consumer group, poll/shutdown timeouts, and producer/consumer tuning parameters. + * Cluster-level connection settings (bootstrap servers, security, SSL) are provided + * by the shared [KafkaConfig]. + * + * @param config Application config scoped to `drill.rawData.queue.kafka`. + * @param kafkaConfig Shared Kafka cluster and connection configuration. + */ +class RawDataKafkaQueueConfig( + private val config: ApplicationConfig, + private val kafkaConfig: KafkaConfig, +) { + /** + * Name of the Kafka topic to which raw data is produced and from which it is consumed. + * Auto creation of the topic is disabled by default, so it must be provisioned before starting the service. + */ + val topic: String + get() = config.string("topic", "drill-raw-data") + + /** + * Consumer group identifier shared by all consumer instances of this service. + */ + val consumerGroupId: String + get() = config.string("consumerGroupId", "drill-writer") + + /** + * Maximum time in milliseconds the consumer will block waiting for new records in a + * single [org.apache.kafka.clients.consumer.KafkaConsumer.poll] call. + */ + val pollTimeoutMs: Long + get() = config.long("pollTimeoutMs", 500L) + + /** + * Maximum time in milliseconds to wait for in-flight records to be flushed and the consumer + * to commit its offsets during a graceful shutdown. + * Should be greater than [pollTimeoutMs] to avoid data loss on shutdown. + */ + val shutdownTimeoutMs: Long + get() = config.long("shutdownTimeoutMs", 5_000L) + + /** + * Builds a [java.util.Properties] map for the Kafka producer. + * Common connection properties are contributed by [kafkaConfig]. + */ + val producerProperties: Properties + get() = Properties().apply { + putAll(kafkaConfig.toCommonProperties()) + put(ProducerConfig.CLIENT_ID_CONFIG, config.string("producer.clientId", "drill-admin-raw-data-producer")) + put(ProducerConfig.ACKS_CONFIG, config.string("producer.acks", "all")) + put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.boolean("producer.enableIdempotence", true).toString()) + put(ProducerConfig.RETRIES_CONFIG, config.int("producer.retries", Int.MAX_VALUE).toString()) + put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, config.int("producer.maxInFlightRequestsPerConnection", 5).toString()) + put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.int("producer.deliveryTimeoutMs", 120_000).toString()) + put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.int("producer.requestTimeoutMs", 30_000).toString()) + put(ProducerConfig.LINGER_MS_CONFIG, config.int("producer.lingerMs", 20).toString()) + put(ProducerConfig.BATCH_SIZE_CONFIG, config.int("producer.batchSize", 32_768).toString()) + put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.string("producer.compressionType", "lz4")) + } + + /** + * Builds a [java.util.Properties] map for the Kafka consumer. + * Common connection properties are contributed by [kafkaConfig]. + */ + val consumerProperties: Properties + get() = Properties().apply { + putAll(kafkaConfig.toCommonProperties()) + put(ConsumerConfig.CLIENT_ID_CONFIG, config.string("consumer.clientId", "drill-admin-raw-data-consumer")) + put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, config.boolean("consumer.allowAutoCreateTopics", false).toString()) + put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.string("consumer.autoOffsetReset", "earliest")) + put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.string("consumer.isolationLevel", "read_committed")) + put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.int("consumer.maxPollRecords", 500).toString()) + put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, config.int("consumer.fetchMinBytes", 1).toString()) + put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, config.int("consumer.fetchMaxWaitMs", 500).toString()) + put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.int("consumer.sessionTimeoutMs", 45_000).toString()) + put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, config.int("consumer.heartbeatIntervalMs", 15_000).toString()) + put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, config.int("consumer.maxPollIntervalMs", 300_000).toString()) + } +} diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/RawDataQueueConfig.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/RawDataQueueConfig.kt new file mode 100644 index 000000000..a04026399 --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/RawDataQueueConfig.kt @@ -0,0 +1,61 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.config + +import io.ktor.server.config.ApplicationConfig + +/** + * Configuration for the raw data queue. + * + * @param config Application config scoped to `drill.rawData.queue`. + * @param kafkaConfig Shared Kafka cluster and connection configuration from `drill.kafka`. + */ +class RawDataQueueConfig( + private val config: ApplicationConfig, + private val kafkaConfig: KafkaConfig, +) { + /** + * Defines the raw data queue implementation. + */ + val type: RawDataQueueType + get() = config.propertyOrNull("type")?.getString()?.let { RawDataQueueType.valueOf(it) } ?: RawDataQueueType.IN_MEMORY + + /** + * Defines the capacity of the queue used for processing incoming raw data. + * If the queue reaches its capacity, processing of new data will be suspended until there is space available. + */ + val capacity: Int + get() = config.propertyOrNull("capacity")?.getString()?.toIntOrNull() ?: 1000 + + /** + * Defines the number of concurrent workers that will process the raw data from the queue. + * Should be less than database connection pool size. + */ + val workers: Int + get() = config.propertyOrNull("workers")?.getString()?.toIntOrNull() ?: 10 + + /** + * Kafka-specific queue configuration (topic, consumer group, poll/shutdown timeouts, + * producer/consumer tuning). Cluster connection settings are delegated to [kafkaConfig]. + */ + val kafka: RawDataKafkaQueueConfig + get() = RawDataKafkaQueueConfig(config.config("kafka"), kafkaConfig) +} + +enum class RawDataQueueType { + IN_MEMORY, + KAFKA +} diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/RawDataWriterModule.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/RawDataWriterModule.kt index dbea1407a..8544aeb60 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/RawDataWriterModule.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/config/RawDataWriterModule.kt @@ -16,25 +16,41 @@ package com.epam.drill.admin.writer.rawdata.config import com.epam.drill.admin.writer.rawdata.job.DataRetentionPolicyJob +import com.epam.drill.admin.writer.rawdata.queue.DataQueue +import com.epam.drill.admin.writer.rawdata.queue.impl.ChannelDataQueue +import com.epam.drill.admin.writer.rawdata.queue.impl.KafkaDataQueue +import com.epam.drill.admin.writer.rawdata.service.QueuedRawDataWriter +import com.epam.drill.admin.writer.rawdata.queue.impl.json import com.epam.drill.admin.writer.rawdata.repository.* import com.epam.drill.admin.writer.rawdata.repository.impl.* +import com.epam.drill.admin.writer.rawdata.route.DataIngestRoute +import com.epam.drill.admin.writer.rawdata.route.payload.RawDataPayload import com.epam.drill.admin.writer.rawdata.service.DataManagementService import com.epam.drill.admin.writer.rawdata.service.RawDataWriter import com.epam.drill.admin.writer.rawdata.service.SettingsService import com.epam.drill.admin.writer.rawdata.service.impl.DataManagementServiceImpl import com.epam.drill.admin.writer.rawdata.service.impl.RawDataServiceImpl import com.epam.drill.admin.writer.rawdata.service.impl.SettingsServiceImpl +import com.epam.drill.admin.writer.rawdata.service.impl.toKey +import com.epam.drill.admin.writer.rawdata.service.impl.toPayloadType +import io.ktor.server.application.Application +import io.ktor.server.config.ApplicationConfig import org.kodein.di.DI import org.kodein.di.bind +import org.kodein.di.eagerSingleton import org.kodein.di.instance import org.kodein.di.singleton import org.quartz.JobBuilder import org.quartz.JobDetail +import kotlin.time.Duration.Companion.milliseconds + +private val logger = mu.KotlinLogging.logger {} val rawDataDIModule get() = DI.Module("rawDataServices") { - import(rawDataServicesDIModule) + importOnce(rawDataServicesDIModule) importOnce(settingsServicesDIModule) + importOnce(dataManagementServicesDIModule) bind() with singleton { DataRetentionPolicyJob( @@ -59,7 +75,6 @@ val rawDataServicesDIModule bind() with singleton { TestSessionRepositoryImpl() } bind() with singleton { TestSessionBuildRepositoryImpl() } bind() with singleton { TestLaunchRepositoryImpl() } - bind() with singleton { MethodIgnoreRuleRepositoryImpl() } bind() with singleton { RawDataServiceImpl( @@ -71,21 +86,62 @@ val rawDataServicesDIModule buildRepository = instance(), testSessionRepository = instance(), testSessionBuildRepository = instance(), - methodIgnoreRuleRepository = instance() ) } - bind() with singleton { - DataManagementServiceImpl( - instanceRepository = instance(), - buildRepository = instance(), - methodRepository = instance(), - coverageRepository = instance(), - testSessionRepository = instance(), - testLaunchRepository = instance(), - testSessionBuildRepository = instance(), - scheduler = instance(), + + bind() with singleton { + val env = instance().environment.config + KafkaConfig(env.config("kafka")) + } + bind() with singleton { + val drillConfig: ApplicationConfig = instance().environment.config.config("drill") + RawDataQueueConfig(drillConfig.config("rawData.queue"), instance()) + } + bind>(tag = RawDataQueueType.IN_MEMORY) with singleton { + val config = instance() + ChannelDataQueue( + deserializer = ::json, + routeToPayloadType = { route -> + route.toPayloadType() + }, + capacity = config.capacity + ) + } + bind>(tag = RawDataQueueType.KAFKA) with singleton { + val config = instance() + val kafkaQueueConfig = config.kafka + val kafkaClusterConfig = instance() + KafkaDataQueue.create( + bootstrapServers = kafkaClusterConfig.bootstrapServers, + topic = kafkaQueueConfig.topic, + consumerGroupId = kafkaQueueConfig.consumerGroupId, + deserializer = ::json, + recordKeyToPayloadType = { key -> + key.toPayloadType() + }, + routeToRecordKey = { route -> + route.toKey() + }, + producerProps = kafkaQueueConfig.producerProperties, + consumerProps = kafkaQueueConfig.consumerProperties, + capacity = config.capacity, + pollTimeout = kafkaQueueConfig.pollTimeoutMs.milliseconds, + shutdownTimeout = kafkaQueueConfig.shutdownTimeoutMs.milliseconds, ) } + bind() with eagerSingleton { + val config = instance() + val writer = instance() + val queue = instance>(tag = config.type) + + QueuedRawDataWriter( + handler = writer, + workers = config.workers, + queue = queue + ).also { + logger.info { "${config.type} queue is configured for raw data writing with ${config.workers} workers." } + } + } } val settingsServicesDIModule @@ -100,6 +156,24 @@ val settingsServicesDIModule } } +val dataManagementServicesDIModule + get() = DI.Module("dataManagementServices") { + bind() with singleton { MethodIgnoreRuleRepositoryImpl() } + bind() with singleton { + DataManagementServiceImpl( + instanceRepository = instance(), + buildRepository = instance(), + methodRepository = instance(), + coverageRepository = instance(), + testSessionRepository = instance(), + testLaunchRepository = instance(), + testSessionBuildRepository = instance(), + methodIgnoreRuleRepository = instance(), + scheduler = instance(), + ) + } + } + val rawDataRetentionPolicyJob: JobDetail get() = JobBuilder.newJob(DataRetentionPolicyJob::class.java) .storeDurably() diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/DataQueue.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/DataQueue.kt new file mode 100644 index 000000000..d6109044b --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/DataQueue.kt @@ -0,0 +1,36 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.queue + +import com.epam.drill.admin.writer.rawdata.route.DataIngestRoute +import com.epam.drill.admin.writer.rawdata.route.payload.RawDataPayload +import kotlinx.coroutines.channels.ReceiveChannel + +interface DataQueue : ReceiveChannel> { + suspend fun enqueue(input: QueueInput) + suspend fun dequeue(): QueueOutput +} + +class QueueInput( + val route: R, + val payload: ByteArray, + val metadata: Map = emptyMap() +) + +class QueueOutput( + val payload: T, + val metadata: Map = emptyMap() +) \ No newline at end of file diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/QueueProcessor.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/QueueProcessor.kt new file mode 100644 index 000000000..7391e843f --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/QueueProcessor.kt @@ -0,0 +1,47 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.queue + +import com.epam.drill.admin.writer.rawdata.route.DataIngestRoute +import com.epam.drill.admin.writer.rawdata.route.payload.RawDataPayload +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch + +class QueueProcessor( + private val handler: suspend (QueueOutput) -> Unit, + private val onError: suspend (QueueOutput, Throwable) -> Unit = { _, _ -> }, + private val onSuccess: suspend (QueueOutput) -> Unit = {}, +) { + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + + fun run(queue: DataQueue, workers: Int = 1) { + repeat(workers) { worker -> + scope.launch { + for (output in queue) { + runCatching { + handler(output) + }.onFailure { e -> + onError(output, e) + }.onSuccess { + onSuccess(output) + } + } + } + } + } +} diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/ChannelDataQueue.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/ChannelDataQueue.kt new file mode 100644 index 000000000..727077867 --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/ChannelDataQueue.kt @@ -0,0 +1,82 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.queue.impl + +import com.epam.drill.admin.writer.rawdata.queue.DataQueue +import com.epam.drill.admin.writer.rawdata.queue.QueueInput +import com.epam.drill.admin.writer.rawdata.queue.QueueOutput +import com.epam.drill.admin.writer.rawdata.route.DataIngestRoute +import com.epam.drill.admin.writer.rawdata.route.payload.RawDataPayload +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.time.withTimeout +import mu.KotlinLogging +import kotlin.reflect.KClass +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds +import kotlin.time.toJavaDuration + +class ChannelDataQueue( + private val deserializer: suspend (KClass, ByteArray) -> T, + private val routeToPayloadType: (R) -> KClass, + capacity: Int = Channel.BUFFERED, + private val shutdownTimeout: Duration = 5.seconds, +) : DataQueue, Channel> by Channel(capacity), AutoCloseable { + private val logger = KotlinLogging.logger {} + private val inputChannel = Channel>(Channel.RENDEZVOUS) + private val outputChannel: Channel> get() = this + private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob()) + + init { + scope.launch { + for (input in inputChannel) { + val payloadType = routeToPayloadType(input.route) + runCatching { + deserializer(payloadType, input.payload) + }.onFailure { e -> + logger.error(e) { "Error while deserialization queue for [${input.route::class.simpleName}]: ${e.message}" } + }.getOrNull()?.let { payload -> + outputChannel.send(QueueOutput(payload, input.metadata)) + } ?: continue + } + } + } + + override suspend fun enqueue(input: QueueInput) { + inputChannel.send(input) + } + + override suspend fun dequeue(): QueueOutput { + return outputChannel.receive() + } + + override fun close() { + inputChannel.close() + outputChannel.close() + runBlocking { + withTimeout(shutdownTimeout.toJavaDuration()) { + scope.coroutineContext[Job]?.children?.forEach { it.join() } + } + scope.cancel() + } + } +} \ No newline at end of file diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/JsonDeserializer.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/JsonDeserializer.kt new file mode 100644 index 000000000..31e286352 --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/JsonDeserializer.kt @@ -0,0 +1,36 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.queue.impl + +import com.epam.drill.admin.writer.rawdata.route.jsonConfig +import com.epam.drill.admin.writer.rawdata.route.payload.RawDataPayload +import kotlinx.serialization.KSerializer +import kotlinx.serialization.json.Json +import kotlinx.serialization.serializer +import kotlin.reflect.KClass + +class JsonDeserializer( + private val serializer: KSerializer, + private val json: Json = jsonConfig +) { + fun deserialize(bytes: ByteArray): T { + val decoded = bytes.toString(Charsets.UTF_8) + return json.decodeFromString(serializer, decoded) + } +} + +fun json(type: KClass, bytes: ByteArray): T = + JsonDeserializer(type.serializer()).deserialize(bytes) \ No newline at end of file diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/KafkaDataQueue.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/KafkaDataQueue.kt new file mode 100644 index 000000000..a9abe6b13 --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/KafkaDataQueue.kt @@ -0,0 +1,228 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.queue.impl + +import com.epam.drill.admin.writer.rawdata.queue.DataQueue +import com.epam.drill.admin.writer.rawdata.queue.QueueInput +import com.epam.drill.admin.writer.rawdata.queue.QueueOutput +import com.epam.drill.admin.writer.rawdata.route.DataIngestRoute +import com.epam.drill.admin.writer.rawdata.route.payload.RawDataPayload +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.future.await +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.time.withTimeout +import mu.KotlinLogging +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.errors.WakeupException +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import java.util.Properties +import java.util.concurrent.CompletableFuture +import kotlin.coroutines.coroutineContext +import kotlin.reflect.KClass +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds +import kotlin.time.toJavaDuration +import java.time.Duration as JavaDuration + +/** + * Kafka-backed implementation of [DataQueue]. + * + * @param producer pre-configured Kafka [Producer]; closed by [close]. + * @param consumer pre-configured Kafka [Consumer]; subscribed to [topic] and closed by [close]. + * @param topic single Kafka topic used for both publishing and consuming raw payloads. + * @param deserializer converts a route + payload bytes into a typed [RawDataPayload]. + * @param recordKeyToPayloadType maps the route key (extracted from the record header) back to the corresponding payload type for deserialization. + * @param routeToRecordKey extracts the key for a given route instance (defaults to its class simple name). + * @param RECORD_KEY_HEADER Kafka header name used to carry the route key. + * @param capacity capacity of the internal channel exposed to consumers of this queue. + * @param pollTimeout per-poll timeout used by the consumer loop. + * @param shutdownTimeout maximum time to wait for the background coroutines to finish on [close]. + */ +class KafkaDataQueue( + private val producer: Producer, + private val consumer: Consumer, + private val topic: String, + private val deserializer: suspend (KClass, ByteArray) -> T, + private val recordKeyToPayloadType: (String) -> KClass, + private val routeToRecordKey: (R) -> String, + capacity: Int = Channel.BUFFERED, + private val pollTimeout: Duration = 500.milliseconds, + private val shutdownTimeout: Duration = 5.seconds, +) : DataQueue, Channel> by Channel(capacity), AutoCloseable { + + private val logger = KotlinLogging.logger {} + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + private val outputChannel: SendChannel> get() = this + + init { + consumer.subscribe(listOf(topic)) + scope.launch { runConsumerLoop() } + } + + override suspend fun enqueue(input: QueueInput) { + val recordKey = routeToRecordKey(input.route) + val record = ProducerRecord(topic, null, recordKey, input.payload).apply { + headers().add(RecordHeader(RECORD_KEY_HEADER, recordKey.toByteArray(Charsets.UTF_8))) + input.metadata.forEach { (k, v) -> + headers().add(RecordHeader(k, v.toByteArray(Charsets.UTF_8))) + } + } + // Bridge the Kafka producer's java callback to a coroutine. + val future = CompletableFuture() + producer.send(record) { _, e -> + if (e != null) future.completeExceptionally(e) else future.complete(Unit) + } + future.await() + } + + override suspend fun dequeue(): QueueOutput = this.receive() + + override fun close() { + runCatching { consumer.wakeup() } + runBlocking { + withTimeout(shutdownTimeout.toJavaDuration()) { + scope.coroutineContext[Job]?.children?.forEach { it.join() } + } + scope.cancel() + } + runCatching { consumer.close(shutdownTimeout.toJavaDuration()) } + runCatching { producer.close(shutdownTimeout.toJavaDuration()) } + outputChannel.close() + } + + private suspend fun runConsumerLoop() { + val pollDuration: JavaDuration = pollTimeout.toJavaDuration() + try { + while (coroutineContext[Job]?.isActive == true) { + val records = try { + consumer.poll(pollDuration) + } catch (e: WakeupException) { + throw e + } catch (e: Throwable) { + logger.error(e) { "Error while polling Kafka topic [$topic]: ${e.message}" } + null + } ?: continue + + for (record in records) { + val key = record.key() + if (key == null) { + logger.warn { "Skipping Kafka record without record key on topic [$topic]" } + continue + } + + val payloadType = runCatching { + recordKeyToPayloadType(key) + }.onFailure { e -> + logger.error(e) { "Error while determining payload type for [$key]: ${e.message}" } + }.getOrNull() ?: continue + + val payload = runCatching { + deserializer(payloadType, record.value()) + }.onFailure { e -> + logger.error(e) { "Error while deserializing record for [$key]: ${e.message}" } + }.getOrNull() ?: continue + + val metadata = record.headers() + .associate { it.key() to it.value().toString(Charsets.UTF_8) } + + this@KafkaDataQueue.send(QueueOutput(payload, metadata)) + } + + runCatching { + consumer.commitAsync() + }.onFailure { e -> + logger.warn(e) { "Kafka commitAsync failed: ${e.message}" } + } + } + } catch (_: WakeupException) { + logger.debug { "Kafka consumer woken up, exiting poll loop" } + } catch (e: Throwable) { + logger.error(e) { "Kafka consumer loop terminated unexpectedly: ${e.message}" } + } + } + + companion object { + const val RECORD_KEY_HEADER = "drill-record-key" + + /** + * Convenience factory that builds Kafka producer/consumer from raw [Properties]. + * Key/value (de)serializers are forced to String/ByteArray. + */ + fun create( + bootstrapServers: String, + topic: String, + consumerGroupId: String, + deserializer: suspend (KClass, ByteArray) -> T, + recordKeyToPayloadType: (String) -> KClass, + routeToRecordKey: (R) -> String, + producerProps: Properties = Properties(), + consumerProps: Properties = Properties(), + capacity: Int = Channel.BUFFERED, + pollTimeout: Duration = 500.milliseconds, + shutdownTimeout: Duration = 5.seconds, + ): KafkaDataQueue { + val pProps = Properties().apply { + putAll(producerProps) + put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name) + put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer::class.java.name) + } + val cProps = Properties().apply { + putAll(consumerProps) + put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId) + put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name) + put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name) + putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + } + return KafkaDataQueue( + producer = KafkaProducer(pProps), + consumer = KafkaConsumer(cProps), + topic = topic, + deserializer = deserializer, + recordKeyToPayloadType = recordKeyToPayloadType, + routeToRecordKey = routeToRecordKey, + capacity = capacity, + pollTimeout = pollTimeout, + shutdownTimeout = shutdownTimeout, + ) + } + } +} + + + + + diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/record/RecordKey.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/record/RecordKey.kt new file mode 100644 index 000000000..3f8d81501 --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/queue/record/RecordKey.kt @@ -0,0 +1,45 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.queue.record + +import com.epam.drill.admin.writer.rawdata.route.payload.AddTestDefinitionsPayload +import com.epam.drill.admin.writer.rawdata.route.payload.AddTestLaunchesPayload +import com.epam.drill.admin.writer.rawdata.route.payload.AddTestsPayload +import com.epam.drill.admin.writer.rawdata.route.payload.BuildInfoPayload +import com.epam.drill.admin.writer.rawdata.route.payload.BuildPayload +import com.epam.drill.admin.writer.rawdata.route.payload.CoveragePayload +import com.epam.drill.admin.writer.rawdata.route.payload.InstancePayload +import com.epam.drill.admin.writer.rawdata.route.payload.MethodsPayload +import com.epam.drill.admin.writer.rawdata.route.payload.RawDataPayload +import com.epam.drill.admin.writer.rawdata.route.payload.SessionPayload +import kotlin.reflect.KClass + +enum class RecordKey(val value: String, val payloadType: KClass) { + COVERAGE("coverage", CoveragePayload::class), + BUILDS_INFO("builds-info", BuildInfoPayload::class), + BUILDS("builds", BuildPayload::class), + INSTANCES("instances", InstancePayload::class), + METHODS("methods", MethodsPayload::class), + TEST_DEFINITIONS("test-definitions", AddTestDefinitionsPayload::class), + TEST_LAUNCHES("test-launches", AddTestLaunchesPayload::class), + TEST_METADATA("test-metadata", AddTestsPayload::class), + TEST_SESSIONS("test-sessions", SessionPayload::class); + + companion object { + private val map = RecordKey.entries.associateBy(RecordKey::value) + fun fromValue(value: String): RecordKey = map[value] ?: throw IllegalArgumentException("Unknown RecordKey value: $value") + } +} \ No newline at end of file diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/RawDataWriterRoutes.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/RawDataWriterRoutes.kt index 09b3bf4ad..6d024f609 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/RawDataWriterRoutes.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/RawDataWriterRoutes.kt @@ -17,7 +17,8 @@ package com.epam.drill.admin.writer.rawdata.route import com.epam.drill.admin.common.principal.User import com.epam.drill.admin.common.route.ok -import com.epam.drill.admin.writer.rawdata.service.RawDataWriter +import com.epam.drill.admin.writer.rawdata.service.QueuedRawDataWriter +import com.epam.drill.admin.writer.rawdata.service.DataManagementService import io.ktor.client.* import io.ktor.client.engine.apache.* import io.ktor.client.plugins.contentnegotiation.* @@ -43,35 +44,38 @@ import org.kodein.di.instance import org.kodein.di.ktor.closestDI import java.io.InputStream import java.util.zip.GZIPInputStream +import kotlin.getValue private val logger = KotlinLogging.logger {} +sealed interface DataIngestRoute + @Resource("builds") -class BuildsRoute() +class BuildsRoute(): DataIngestRoute @Resource("builds/info") -class BuildsInfoRoute() +class BuildsInfoRoute(): DataIngestRoute @Resource("instances") -class InstancesRoute() +class InstancesRoute(): DataIngestRoute @Resource("coverage") -class CoverageRoute() +class CoverageRoute(): DataIngestRoute @Resource("methods") -class MethodsRoute() +class MethodsRoute(): DataIngestRoute @Resource("tests-metadata") -class TestMetadataRoute() +class TestMetadataRoute(): DataIngestRoute @Resource("sessions") -class TestSessionRoute() +class TestSessionRoute(): DataIngestRoute @Resource("test-definitions") -class TestDefinitionsRoute() +class TestDefinitionsRoute(): DataIngestRoute @Resource("test-launches") -class TestLaunchesRoute() +class TestLaunchesRoute(): DataIngestRoute @Resource("method-ignore-rules") class MethodIgnoreRulesRoute() { @@ -100,109 +104,109 @@ fun Route.dataIngestRoutes() { } fun Route.putBuilds() { - val rawDataWriter by closestDI().instance() + val queuedRawDataWriter by closestDI().instance() - put { - rawDataWriter.saveBuild(call.decompressAndReceive()) + put { params -> + queuedRawDataWriter.enqueue(params, call.decompress()) call.ok("Build saved") } } fun Route.putBuildsInfo() { - val rawDataWriter by closestDI().instance() + val queuedRawDataWriter by closestDI().instance() - put { - rawDataWriter.saveBuildInfo(call.decompressAndReceive()) + put { params -> + queuedRawDataWriter.enqueue(params, call.decompress()) call.ok("Build info saved") } } fun Route.putInstances() { - val rawDataWriter by closestDI().instance() + val queuedRawDataWriter by closestDI().instance() - put { - rawDataWriter.saveInstance(call.decompressAndReceive()) + put { params -> + queuedRawDataWriter.enqueue(params, call.decompress()) call.ok("Instance saved") } } fun Route.postCoverage() { - val rawDataWriter by closestDI().instance() + val queuedRawDataWriter by closestDI().instance() - post { - rawDataWriter.saveCoverage(call.decompressAndReceive()) + post { params -> + queuedRawDataWriter.enqueue(params, call.decompress()) call.ok("Coverage saved") } } fun Route.putMethods() { - val rawDataWriter by closestDI().instance() + val queuedRawDataWriter by closestDI().instance() - put { - rawDataWriter.saveMethods(call.decompressAndReceive()) + put { params -> + queuedRawDataWriter.enqueue(params, call.decompress()) call.ok("Methods saved") } } fun Route.postTestMetadata() { - val rawDataWriter by closestDI().instance() + val queuedRawDataWriter by closestDI().instance() - post { - rawDataWriter.saveTestMetadata(call.decompressAndReceive()) + post { params -> + queuedRawDataWriter.enqueue(params, call.decompress()) call.ok("Test metadata saved") } } fun Route.putTestSessions() { - val rawDataWriter by closestDI().instance() + val queuedRawDataWriter by closestDI().instance() - put { - rawDataWriter.saveTestSession(call.decompressAndReceive(), call.principal()) + put { params -> + queuedRawDataWriter.enqueue(params, call.decompress(), call.principal()?.username) call.ok("Test sessions saved") } } fun Route.postTestDefinitions() { - val rawDataWriter by closestDI().instance() + val queuedRawDataWriter by closestDI().instance() - post { - rawDataWriter.saveTestDefinitions(call.decompressAndReceive()) + post { params -> + queuedRawDataWriter.enqueue(params, call.decompress()) call.ok("Test definitions saved") } } fun Route.postTestLaunches() { - val rawDataWriter by closestDI().instance() + val queuedRawDataWriter by closestDI().instance() - post { - rawDataWriter.saveTestLaunches(call.decompressAndReceive()) + post { params -> + queuedRawDataWriter.enqueue(params, call.decompress()) call.ok("Test launches saved") } } fun Route.postMethodIgnoreRules() { - val rawDataWriter by closestDI().instance() + val dataManagementService by closestDI().instance() post { - rawDataWriter.saveMethodIgnoreRule(call.decompressAndReceive()) + dataManagementService.saveMethodIgnoreRule(call.decompressAndReceive()) call.ok("Method ignore rule saved") } } fun Route.getMethodIgnoreRules() { - val rawDataWriter by closestDI().instance() + val dataManagementService by closestDI().instance() get { - call.ok(rawDataWriter.getAllMethodIgnoreRules()) + call.ok(dataManagementService.getAllMethodIgnoreRules()) } } fun Route.deleteMethodIgnoreRule() { - val rawDataWriter by closestDI().instance() + val dataManagementService by closestDI().instance() delete { params -> val id = params.id - rawDataWriter.deleteMethodIgnoreRuleById(id) + dataManagementService.deleteMethodIgnoreRuleById(id) call.ok("Method ignore rule deleted") } } @@ -240,32 +244,37 @@ internal suspend fun sendPostRequest(url: String, data: Any) { } } -internal val json = Json { +internal val jsonConfig = Json { ignoreUnknownKeys = true explicitNulls = false } -/** - * Workaround for decompressing the request body before upgrading to Ktor 3.0.0, where this feature works out of the box - * https://github.com/ktorio/ktor/issues/3845 - */ internal suspend inline fun ApplicationCall.decompressAndReceive(): T { - val body: ByteArray = when (request.headers[HttpHeaders.ContentEncoding]) { - "gzip" -> decompressGZip(receiveStream()) - else -> receive() - } + val body: ByteArray = decompress() return when (request.headers[HttpHeaders.ContentType]) { ContentType.Application.ProtoBuf.toString() -> ProtoBuf.decodeFromByteArray(T::class.serializer(), body) - ContentType.Application.Json.toString() -> json.decodeFromString(T::class.serializer(), String(body)) + ContentType.Application.Json.toString() -> jsonConfig.decodeFromString(T::class.serializer(), String(body)) else -> throw request.headers[HttpHeaders.ContentType]?.let { UnsupportedMediaTypeException(ContentType.parse(it)) } ?: BadRequestException("Content-Type header is missing") } } + internal suspend fun decompressGZip(inputStream: InputStream): ByteArray { val decompressedBytes = withContext(Dispatchers.IO) { GZIPInputStream(inputStream).readBytes() } return decompressedBytes } + +/** + * TODO: Workaround for decompressing the request body before upgrading to Ktor 3.0.0, where this feature works out of the box + * https://github.com/ktorio/ktor/issues/3845 + */ +internal suspend inline fun ApplicationCall.decompress(): ByteArray { + return when (request.headers[HttpHeaders.ContentEncoding]) { + "gzip" -> decompressGZip(receiveStream()) + else -> receive() + } +} diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/BuildInfoPayload.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/BuildInfoPayload.kt new file mode 100644 index 000000000..506f04c49 --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/BuildInfoPayload.kt @@ -0,0 +1,30 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.route.payload + +import kotlinx.serialization.Serializable + +@Serializable +class BuildInfoPayload( + val groupId: String, + val appId: String, + val commitSha: String? = null, + val buildVersion: String? = null, + val branch: String? = null, + val commitDate: String? = null, + val commitMessage: String? = null, + val commitAuthor: String? = null +): RawDataPayload \ No newline at end of file diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/BuildPayload.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/BuildPayload.kt index 4cb70ec3f..895422ec2 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/BuildPayload.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/BuildPayload.kt @@ -15,7 +15,6 @@ */ package com.epam.drill.admin.writer.rawdata.route.payload -import kotlinx.datetime.Instant import kotlinx.serialization.Serializable @Serializable @@ -24,8 +23,4 @@ class BuildPayload( val appId: String, val commitSha: String? = null, val buildVersion: String? = null, - val branch: String? = null, - val commitDate: String? = null, - val commitMessage: String? = null, - val commitAuthor: String? = null -) +): RawDataPayload diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/CoveragePayload.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/CoveragePayload.kt index 6d9115b28..227dc8215 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/CoveragePayload.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/CoveragePayload.kt @@ -25,7 +25,7 @@ class CoveragePayload( val commitSha: String?, val buildVersion: String?, val coverage: List -) +): RawDataPayload @Serializable class SingleMethodCoveragePayload( diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/InstancePayload.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/InstancePayload.kt index f508e1e75..ccffe8949 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/InstancePayload.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/InstancePayload.kt @@ -25,4 +25,4 @@ class InstancePayload( val commitSha: String? = null, val buildVersion: String? = null, val envId: String? = null, -) \ No newline at end of file +): RawDataPayload \ No newline at end of file diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/MethodPayload.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/MethodPayload.kt index 44318c7bb..ddfc18236 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/MethodPayload.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/MethodPayload.kt @@ -25,7 +25,7 @@ class MethodsPayload( val buildVersion: String? = null, val instanceId: String? = null, val methods: Array -) +) : RawDataPayload @Serializable class SingleMethodPayload( diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/RawDataPayload.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/RawDataPayload.kt new file mode 100644 index 000000000..dbcc9e374 --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/RawDataPayload.kt @@ -0,0 +1,18 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.route.payload + +sealed interface RawDataPayload \ No newline at end of file diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/TestMetadataPayload.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/TestMetadataPayload.kt index b124f5caa..d6cc3b2e8 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/TestMetadataPayload.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/TestMetadataPayload.kt @@ -25,7 +25,7 @@ class AddTestsPayload( val groupId: String, val sessionId: String, val tests: List = emptyList(), -) +): RawDataPayload @Serializable class TestLaunchInfo( @@ -70,14 +70,14 @@ class SessionPayload( val testTaskId: String, val startedAt: Instant, val builds: List = emptyList(), -) +): RawDataPayload @Serializable class AddTestLaunchesPayload( val groupId: String, val testSessionId: String, val launches: List, -) +): RawDataPayload @Serializable class TestLaunchPayload ( @@ -92,7 +92,7 @@ class TestLaunchPayload ( class AddTestDefinitionsPayload( val groupId: String, val definitions: List -) +): RawDataPayload // TODO: update test agent // Order of fields, and field definitions changed compared to original TestDefinition class: diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/DataManagementService.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/DataManagementService.kt index 60999b83b..53677f6dd 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/DataManagementService.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/DataManagementService.kt @@ -16,6 +16,8 @@ package com.epam.drill.admin.writer.rawdata.service import com.epam.drill.admin.common.principal.User +import com.epam.drill.admin.writer.rawdata.route.payload.MethodIgnoreRulePayload +import com.epam.drill.admin.writer.rawdata.views.MethodIgnoreRuleView interface DataManagementService { /** @@ -33,4 +35,8 @@ interface DataManagementService { * @param user The user performing the deletion (optional). */ suspend fun deleteTestSessionData(groupId: String, testSessionId: String, user: User?) + + suspend fun saveMethodIgnoreRule(rulePayload: MethodIgnoreRulePayload) + suspend fun getAllMethodIgnoreRules(): List + suspend fun deleteMethodIgnoreRuleById(ruleId: Int) } \ No newline at end of file diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/QueuedRawDataWriter.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/QueuedRawDataWriter.kt new file mode 100644 index 000000000..64a91823e --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/QueuedRawDataWriter.kt @@ -0,0 +1,73 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.service + +import com.epam.drill.admin.writer.rawdata.queue.DataQueue +import com.epam.drill.admin.writer.rawdata.queue.QueueInput +import com.epam.drill.admin.writer.rawdata.queue.QueueProcessor +import com.epam.drill.admin.writer.rawdata.route.DataIngestRoute +import com.epam.drill.admin.writer.rawdata.route.payload.AddTestDefinitionsPayload +import com.epam.drill.admin.writer.rawdata.route.payload.AddTestLaunchesPayload +import com.epam.drill.admin.writer.rawdata.route.payload.AddTestsPayload +import com.epam.drill.admin.writer.rawdata.route.payload.BuildInfoPayload +import com.epam.drill.admin.writer.rawdata.route.payload.BuildPayload +import com.epam.drill.admin.writer.rawdata.route.payload.CoveragePayload +import com.epam.drill.admin.writer.rawdata.route.payload.InstancePayload +import com.epam.drill.admin.writer.rawdata.route.payload.MethodsPayload +import com.epam.drill.admin.writer.rawdata.route.payload.RawDataPayload +import com.epam.drill.admin.writer.rawdata.route.payload.SessionPayload +import mu.KotlinLogging + +class QueuedRawDataWriter( + handler: RawDataWriter, + workers: Int = 10, + private val queue: DataQueue +) { + private val logger = KotlinLogging.logger {} + private val usernameKey = "username" + private val queueProcessor = QueueProcessor( + handler = { output -> + val payload = output.payload + val metadata = output.metadata + when (payload) { + is CoveragePayload -> handler.saveCoverage(payload) + is MethodsPayload -> handler.saveMethods(payload) + is BuildPayload -> handler.saveBuild(payload) + is BuildInfoPayload -> handler.saveBuildInfo(payload) + is AddTestDefinitionsPayload -> handler.saveTestDefinitions(payload) + is AddTestLaunchesPayload -> handler.saveTestLaunches(payload) + is AddTestsPayload -> handler.saveTestMetadata(payload) + is InstancePayload -> handler.saveInstance(payload) + is SessionPayload -> handler.saveTestSession(payload, metadata[usernameKey]) + } + }, + onError = { output, e -> + logger.error(e) { "Error while saving [${output.payload::class.simpleName}]: ${e.message}" } + }, + onSuccess = { payload -> + logger.debug { "Successfully saved [${payload::class.simpleName}]" } + } + ) + + init { + queueProcessor.run(queue, workers) + } + + suspend fun enqueue(route: DataIngestRoute, payload: ByteArray, username: String? = null) { + val metadata = username?.let { mapOf(usernameKey to it) } ?: emptyMap() + queue.enqueue(QueueInput(route, payload, metadata)) + } +} \ No newline at end of file diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/RawDataWriter.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/RawDataWriter.kt index ebdca25d3..dc2e44f19 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/RawDataWriter.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/RawDataWriter.kt @@ -21,15 +21,12 @@ import com.epam.drill.admin.writer.rawdata.views.MethodIgnoreRuleView interface RawDataWriter { suspend fun saveBuild(buildPayload: BuildPayload) - suspend fun saveBuildInfo(buildPayload: BuildPayload) + suspend fun saveBuildInfo(buildPayload: BuildInfoPayload) suspend fun saveInstance(instancePayload: InstancePayload) suspend fun saveMethods(methodsPayload: MethodsPayload) suspend fun saveCoverage(coveragePayload: CoveragePayload) suspend fun saveTestMetadata(testsPayload: AddTestsPayload) suspend fun saveTestDefinitions(testDefinitionsPayload: AddTestDefinitionsPayload) suspend fun saveTestLaunches(testLaunchesPayload: AddTestLaunchesPayload) - suspend fun saveTestSession(sessionPayload: SessionPayload, user: User?) - suspend fun saveMethodIgnoreRule(rulePayload: MethodIgnoreRulePayload) - suspend fun getAllMethodIgnoreRules(): List - suspend fun deleteMethodIgnoreRuleById(ruleId: Int) + suspend fun saveTestSession(sessionPayload: SessionPayload, username: String?) } diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/DataManagementServiceImpl.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/DataManagementServiceImpl.kt index 14efde7e8..25006afea 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/DataManagementServiceImpl.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/DataManagementServiceImpl.kt @@ -22,14 +22,18 @@ import com.epam.drill.admin.common.scheduler.deleteMetricsDataJobKey import com.epam.drill.admin.common.scheduler.getBuildDataDeletionDataMap import com.epam.drill.admin.common.scheduler.getTestDataDeletionDataMap import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig.transaction +import com.epam.drill.admin.writer.rawdata.entity.MethodIgnoreRule import com.epam.drill.admin.writer.rawdata.repository.BuildRepository import com.epam.drill.admin.writer.rawdata.repository.CoverageRepository import com.epam.drill.admin.writer.rawdata.repository.InstanceRepository +import com.epam.drill.admin.writer.rawdata.repository.MethodIgnoreRuleRepository import com.epam.drill.admin.writer.rawdata.repository.MethodRepository import com.epam.drill.admin.writer.rawdata.repository.TestLaunchRepository import com.epam.drill.admin.writer.rawdata.repository.TestSessionBuildRepository import com.epam.drill.admin.writer.rawdata.repository.TestSessionRepository +import com.epam.drill.admin.writer.rawdata.route.payload.MethodIgnoreRulePayload import com.epam.drill.admin.writer.rawdata.service.DataManagementService +import com.epam.drill.admin.writer.rawdata.views.MethodIgnoreRuleView class DataManagementServiceImpl( private val buildRepository: BuildRepository, @@ -39,6 +43,7 @@ class DataManagementServiceImpl( private val methodRepository: MethodRepository, private val testSessionBuildRepository: TestSessionBuildRepository, private val testLaunchRepository: TestLaunchRepository, + private val methodIgnoreRuleRepository: MethodIgnoreRuleRepository, private val scheduler: DrillScheduler, ) : DataManagementService { @@ -68,5 +73,29 @@ class DataManagementServiceImpl( scheduler.triggerJob(deleteMetricsDataJobKey, getTestDataDeletionDataMap(groupId, testSessionId)) } } + + override suspend fun saveMethodIgnoreRule(rulePayload: MethodIgnoreRulePayload) { + val rule = MethodIgnoreRule( + groupId = rulePayload.groupId, + appId = rulePayload.appId, + namePattern = rulePayload.namePattern, + classnamePattern = rulePayload.classnamePattern, + ) + transaction { + methodIgnoreRuleRepository.create(rule) + } + } + + override suspend fun getAllMethodIgnoreRules(): List { + return transaction { + methodIgnoreRuleRepository.getAll() + } + } + + override suspend fun deleteMethodIgnoreRuleById(ruleId: Int) { + transaction { + methodIgnoreRuleRepository.deleteById(ruleId) + } + } } diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataMappers.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataMappers.kt new file mode 100644 index 000000000..01e1177e5 --- /dev/null +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataMappers.kt @@ -0,0 +1,48 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.service.impl + +import com.epam.drill.admin.writer.rawdata.queue.record.RecordKey +import com.epam.drill.admin.writer.rawdata.route.BuildsInfoRoute +import com.epam.drill.admin.writer.rawdata.route.BuildsRoute +import com.epam.drill.admin.writer.rawdata.route.CoverageRoute +import com.epam.drill.admin.writer.rawdata.route.DataIngestRoute +import com.epam.drill.admin.writer.rawdata.route.InstancesRoute +import com.epam.drill.admin.writer.rawdata.route.MethodsRoute +import com.epam.drill.admin.writer.rawdata.route.TestDefinitionsRoute +import com.epam.drill.admin.writer.rawdata.route.TestLaunchesRoute +import com.epam.drill.admin.writer.rawdata.route.TestMetadataRoute +import com.epam.drill.admin.writer.rawdata.route.TestSessionRoute +import com.epam.drill.admin.writer.rawdata.route.payload.RawDataPayload +import kotlin.reflect.KClass + +fun DataIngestRoute.toRecordKey(): RecordKey = when (this) { + is CoverageRoute -> RecordKey.COVERAGE + is BuildsInfoRoute -> RecordKey.BUILDS_INFO + is BuildsRoute -> RecordKey.BUILDS + is InstancesRoute -> RecordKey.INSTANCES + is MethodsRoute -> RecordKey.METHODS + is TestDefinitionsRoute -> RecordKey.TEST_DEFINITIONS + is TestLaunchesRoute -> RecordKey.TEST_LAUNCHES + is TestMetadataRoute -> RecordKey.TEST_METADATA + is TestSessionRoute -> RecordKey.TEST_SESSIONS +} + +fun DataIngestRoute.toPayloadType(): KClass = toRecordKey().payloadType + +fun DataIngestRoute.toKey(): String = toRecordKey().value + +fun String.toPayloadType(): KClass = RecordKey.fromValue(this).payloadType diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt index 9c227c91e..0f633d2e9 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt @@ -24,7 +24,6 @@ import com.epam.drill.admin.writer.rawdata.repository.* import com.epam.drill.admin.writer.rawdata.route.payload.* import com.epam.drill.admin.writer.rawdata.service.RawDataWriter import com.epam.drill.admin.writer.rawdata.util.md5 -import com.epam.drill.admin.writer.rawdata.views.MethodIgnoreRuleView import kotlinx.datetime.TimeZone import kotlinx.datetime.toJavaLocalDateTime import kotlinx.datetime.toLocalDateTime @@ -44,7 +43,6 @@ class RawDataServiceImpl( private val buildRepository: BuildRepository, private val testSessionRepository: TestSessionRepository, private val testSessionBuildRepository: TestSessionBuildRepository, - private val methodIgnoreRuleRepository: MethodIgnoreRuleRepository ) : RawDataWriter { override suspend fun saveBuild(buildPayload: BuildPayload) { @@ -67,7 +65,7 @@ class RawDataServiceImpl( } } - override suspend fun saveBuildInfo(buildPayload: BuildPayload) { + override suspend fun saveBuildInfo(buildPayload: BuildInfoPayload) { val build = Build( id = generateBuildId( buildPayload.groupId, @@ -255,13 +253,13 @@ class RawDataServiceImpl( }.let { testLaunchRepository.createMany(it) } } - override suspend fun saveTestSession(sessionPayload: SessionPayload, user: User?) { + override suspend fun saveTestSession(sessionPayload: SessionPayload, username: String?) { val testSession = TestSession( id = sessionPayload.id, groupId = sessionPayload.groupId, testTaskId = sessionPayload.testTaskId, startedAt = sessionPayload.startedAt.toLocalDateTime(TimeZone.UTC).toJavaLocalDateTime(), - createdBy = user?.username + createdBy = username ) transaction { testSessionRepository.create(testSession) @@ -279,30 +277,6 @@ class RawDataServiceImpl( } } - override suspend fun saveMethodIgnoreRule(rulePayload: MethodIgnoreRulePayload) { - val rule = MethodIgnoreRule( - groupId = rulePayload.groupId, - appId = rulePayload.appId, - namePattern = rulePayload.namePattern, - classnamePattern = rulePayload.classnamePattern, - ) - transaction { - methodIgnoreRuleRepository.create(rule) - } - } - - override suspend fun getAllMethodIgnoreRules(): List { - return transaction { - methodIgnoreRuleRepository.getAll() - } - } - - override suspend fun deleteMethodIgnoreRuleById(ruleId: Int) { - transaction { - methodIgnoreRuleRepository.deleteById(ruleId) - } - } - private fun convertGitDefaultDateTime(commitDate: String): LocalDateTime { return ZonedDateTime.parse(commitDate, DateTimeFormatter.ofPattern("EEE MMM d HH:mm:ss yyyy Z", Locale.ENGLISH)) .toLocalDateTime() diff --git a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/BuildsApiTest.kt b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/BuildsApiTest.kt index 7260a0a54..fa18e303e 100644 --- a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/BuildsApiTest.kt +++ b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/BuildsApiTest.kt @@ -67,18 +67,20 @@ class BuildsApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }) { ) } - val savedBuilds = BuildTable.selectAll() - .filter { it[BuildTable.groupId] == testGroup } - .filter { it[BuildTable.appId] == testApp } - .filter { it[BuildTable.buildVersion] == testBuildVersion } - assertEquals(1, savedBuilds.size) - savedBuilds.forEach { - assertNull(it[BuildTable.branch]) - assertNotNull(it[BuildTable.commitSha]) - assertNull(it[BuildTable.commitAuthor]) - assertNull(it[BuildTable.commitMessage]) - assertNull(it[BuildTable.committedAt]) - assertTrue(it[BuildTable.createdAt] >= timeBeforeTest) + waitUntilInTransaction { + val savedBuilds = BuildTable.selectAll() + .filter { it[BuildTable.groupId] == testGroup } + .filter { it[BuildTable.appId] == testApp } + .filter { it[BuildTable.buildVersion] == testBuildVersion } + assertEquals(1, savedBuilds.size) + savedBuilds.forEach { + assertNull(it[BuildTable.branch]) + assertNotNull(it[BuildTable.commitSha]) + assertNull(it[BuildTable.commitAuthor]) + assertNull(it[BuildTable.commitMessage]) + assertNull(it[BuildTable.committedAt]) + assertTrue(it[BuildTable.createdAt] >= timeBeforeTest) + } } } @@ -119,18 +121,20 @@ class BuildsApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }) { ) } - val savedBuilds = BuildTable.selectAll() - .filter { it[BuildTable.groupId] == testGroup } - .filter { it[BuildTable.appId] == testApp } - .filter { it[BuildTable.buildVersion] == testBuildVersion } - assertEquals(1, savedBuilds.size) - savedBuilds.forEach { - assertEquals("main", it[BuildTable.branch]) - assertNotNull(it[BuildTable.commitSha]) - assertEquals("John Doe", it[BuildTable.commitAuthor]) - assertEquals("Initial commit", it[BuildTable.commitMessage]) - assertNotNull(it[BuildTable.committedAt]) - assertTrue(it[BuildTable.createdAt] >= timeBeforeTest) + waitUntilInTransaction { + val savedBuilds = BuildTable.selectAll() + .filter { it[BuildTable.groupId] == testGroup } + .filter { it[BuildTable.appId] == testApp } + .filter { it[BuildTable.buildVersion] == testBuildVersion } + assertEquals(1, savedBuilds.size) + savedBuilds.forEach { + assertEquals("main", it[BuildTable.branch]) + assertNotNull(it[BuildTable.commitSha]) + assertEquals("John Doe", it[BuildTable.commitAuthor]) + assertEquals("Initial commit", it[BuildTable.commitMessage]) + assertNotNull(it[BuildTable.committedAt]) + assertTrue(it[BuildTable.createdAt] >= timeBeforeTest) + } } } @@ -160,6 +164,22 @@ class BuildsApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }) { } """.trimIndent() ) + }.apply { + assertEquals(HttpStatusCode.OK, status) + } + + waitUntilInTransaction { + val savedBuilds = BuildTable.selectAll() + .filter { it[BuildTable.groupId] == testGroup } + .filter { it[BuildTable.appId] == testApp } + .filter { it[BuildTable.buildVersion] == testBuildVersion } + assertEquals(1, savedBuilds.size) + savedBuilds.forEach { + assertNotNull(it[BuildTable.branch]) + assertNotNull(it[BuildTable.commitAuthor]) + assertNotNull(it[BuildTable.commitMessage]) + assertNotNull(it[BuildTable.committedAt]) + } } app.client.put("/builds") { @@ -178,16 +198,18 @@ class BuildsApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }) { assertEquals(HttpStatusCode.OK, status) } - val buildsBeforeInfo = BuildTable.selectAll() - .filter { it[BuildTable.groupId] == testGroup } - .filter { it[BuildTable.appId] == testApp } - .filter { it[BuildTable.buildVersion] == testBuildVersion } - assertEquals(1, buildsBeforeInfo.size) - buildsBeforeInfo.forEach { - assertNotNull(it[BuildTable.branch]) - assertNotNull(it[BuildTable.commitAuthor]) - assertNotNull(it[BuildTable.commitMessage]) - assertNotNull(it[BuildTable.committedAt]) + waitUntilInTransaction { + val buildsBeforeInfo = BuildTable.selectAll() + .filter { it[BuildTable.groupId] == testGroup } + .filter { it[BuildTable.appId] == testApp } + .filter { it[BuildTable.buildVersion] == testBuildVersion } + assertEquals(1, buildsBeforeInfo.size) + buildsBeforeInfo.forEach { + assertNotNull(it[BuildTable.branch]) + assertNotNull(it[BuildTable.commitAuthor]) + assertNotNull(it[BuildTable.commitMessage]) + assertNotNull(it[BuildTable.committedAt]) + } } } diff --git a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/CoverageApiTest.kt b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/CoverageApiTest.kt index ac8ffa09d..1b6cf627e 100644 --- a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/CoverageApiTest.kt +++ b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/CoverageApiTest.kt @@ -20,7 +20,6 @@ import com.epam.drill.admin.writer.rawdata.table.MethodCoverageTable import com.epam.drill.admin.test.* import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig import com.epam.drill.admin.writer.rawdata.config.rawDataServicesDIModule -import com.epam.drill.admin.writer.rawdata.table.MethodTable import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.http.* @@ -85,17 +84,19 @@ class CoverageApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }) ) } - val savedCoverageMethods = MethodCoverageTable.selectAll().asSequence() - .filter { it[MethodCoverageTable.groupId] == testGroup } - .filter { it[MethodCoverageTable.appId] == testApp } - .filter { it[MethodCoverageTable.instanceId] == testInstance } - .filter { it[MethodCoverageTable.buildId] == "$testGroup:$testApp:$testBuildVersion" } - .filter { it[MethodCoverageTable.testId] == testTestId } - .toList() - assertEquals(2, savedCoverageMethods.size) - savedCoverageMethods.forEach { - assertTrue(it[MethodCoverageTable.createdAt] >= timeBeforeTest) - assertTrue(it[MethodCoverageTable.methodId] != null) + waitUntilInTransaction { + val savedCoverageMethods = MethodCoverageTable.selectAll().asSequence() + .filter { it[MethodCoverageTable.groupId] == testGroup } + .filter { it[MethodCoverageTable.appId] == testApp } + .filter { it[MethodCoverageTable.instanceId] == testInstance } + .filter { it[MethodCoverageTable.buildId] == "$testGroup:$testApp:$testBuildVersion" } + .filter { it[MethodCoverageTable.testId] == testTestId } + .toList() + assertEquals(2, savedCoverageMethods.size) + savedCoverageMethods.forEach { + assertTrue(it[MethodCoverageTable.createdAt] >= timeBeforeTest) + assertTrue(it[MethodCoverageTable.methodId] != null) + } } } } \ No newline at end of file diff --git a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/InstancesApiTest.kt b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/InstancesApiTest.kt index 8c2438fe9..373703b92 100644 --- a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/InstancesApiTest.kt +++ b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/InstancesApiTest.kt @@ -95,24 +95,26 @@ class InstancesApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }) ) } - val savedBuilds = BuildTable.selectAll() - .filter { it[BuildTable.groupId] == testGroup } - .filter { it[BuildTable.appId] == testApp } - .filter { it[BuildTable.instanceId] == testInstance } - assertEquals(1, savedBuilds.size) - savedBuilds.forEach { - assertNotNull(it[BuildTable.commitSha]) - assertNotNull(it[BuildTable.buildVersion]) - assertTrue(it[BuildTable.createdAt] >= timeBeforeTest) - } - val savedInstances = InstanceTable.selectAll() - .filter { it[InstanceTable.groupId] == testGroup } - .filter { it[InstanceTable.appId] == testApp } - .filter { it[InstanceTable.id].value == testInstance } - assertEquals(1, savedInstances.size) - savedInstances.forEach { - assertNotNull(it[InstanceTable.envId]) - assertTrue(it[InstanceTable.createdAt] >= timeBeforeTest) + waitUntilInTransaction { + val savedBuilds = BuildTable.selectAll() + .filter { it[BuildTable.groupId] == testGroup } + .filter { it[BuildTable.appId] == testApp } + .filter { it[BuildTable.instanceId] == testInstance } + assertEquals(1, savedBuilds.size) + savedBuilds.forEach { + assertNotNull(it[BuildTable.commitSha]) + assertNotNull(it[BuildTable.buildVersion]) + assertTrue(it[BuildTable.createdAt] >= timeBeforeTest) + } + val savedInstances = InstanceTable.selectAll() + .filter { it[InstanceTable.groupId] == testGroup } + .filter { it[InstanceTable.appId] == testApp } + .filter { it[InstanceTable.id].value == testInstance } + assertEquals(1, savedInstances.size) + savedInstances.forEach { + assertNotNull(it[InstanceTable.envId]) + assertTrue(it[InstanceTable.createdAt] >= timeBeforeTest) + } } } @@ -149,19 +151,21 @@ class InstancesApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }) ) } - val savedInstances = InstanceTable.selectAll() - .filter { it[InstanceTable.groupId] == testExistingGroup } - .filter { it[InstanceTable.appId] == testExistingApp } - .filter { it[InstanceTable.id].value == testInstance } - assertEquals(1, savedInstances.size) - savedInstances.forEach { - assertNotNull(it[InstanceTable.envId]) - assertTrue(it[InstanceTable.createdAt] >= timeBeforeTest) + waitUntilInTransaction { + val savedInstances = InstanceTable.selectAll() + .filter { it[InstanceTable.groupId] == testExistingGroup } + .filter { it[InstanceTable.appId] == testExistingApp } + .filter { it[InstanceTable.id].value == testInstance } + assertEquals(1, savedInstances.size) + savedInstances.forEach { + assertNotNull(it[InstanceTable.envId]) + assertTrue(it[InstanceTable.createdAt] >= timeBeforeTest) + } + val savedBuilds = BuildTable.selectAll() + .filter { it[BuildTable.groupId] == testExistingGroup } + .filter { it[BuildTable.appId] == testExistingApp } + .filter { it[BuildTable.buildVersion] == testExistingBuildVersion } + assertEquals(1, savedBuilds.size) } - val savedBuilds = BuildTable.selectAll() - .filter { it[BuildTable.groupId] == testExistingGroup } - .filter { it[BuildTable.appId] == testExistingApp } - .filter { it[BuildTable.buildVersion] == testExistingBuildVersion } - assertEquals(1, savedBuilds.size) } } \ No newline at end of file diff --git a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/MethodsApiTest.kt b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/MethodsApiTest.kt index 891b6662a..923a941c1 100644 --- a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/MethodsApiTest.kt +++ b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/MethodsApiTest.kt @@ -89,13 +89,15 @@ class MethodsApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }) { ) } - val savedMethods = MethodTable.selectAll() - .filter { it[MethodTable.groupId] == testGroup } - .filter { it[MethodTable.appId] == testApp } - .filter { it[MethodTable.classname] == testClassname } - assertEquals(2, savedMethods.size) - savedMethods.forEach { - assertTrue(it[MethodTable.createdAt] >= timeBeforeTest) + waitUntilInTransaction { + val savedMethods = MethodTable.selectAll() + .filter { it[MethodTable.groupId] == testGroup } + .filter { it[MethodTable.appId] == testApp } + .filter { it[MethodTable.classname] == testClassname } + assertEquals(2, savedMethods.size) + savedMethods.forEach { + assertTrue(it[MethodTable.createdAt] >= timeBeforeTest) + } } } } \ No newline at end of file diff --git a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/SettingsRoutesTest.kt b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/SettingsRoutesTest.kt index 83949cef3..4aa5cf612 100644 --- a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/SettingsRoutesTest.kt +++ b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/SettingsRoutesTest.kt @@ -66,7 +66,7 @@ class SettingsRoutesTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }.apply { assertEquals(HttpStatusCode.OK, status) } - transaction { + waitUntilInTransaction { val savedSettings = GroupSettingsTable.selectAll().where { GroupSettingsTable.id eq testGroup }.single() assertEquals(30, savedSettings[GroupSettingsTable.retentionPeriodDays]) assertEquals(10, savedSettings[GroupSettingsTable.metricsPeriodDays]) @@ -112,7 +112,7 @@ class SettingsRoutesTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) client.delete("group-settings/$testGroup").apply { assertEquals(HttpStatusCode.OK, status) } - transaction { + waitUntilInTransaction { assertTrue(GroupSettingsTable.selectAll().where { GroupSettingsTable.id eq testGroup }.empty()) } } diff --git a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/TestMetadataApiTest.kt b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/TestMetadataApiTest.kt index 909dcd752..b1e6a2bcc 100644 --- a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/TestMetadataApiTest.kt +++ b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/TestMetadataApiTest.kt @@ -18,6 +18,7 @@ package com.epam.drill.admin.writer.rawdata import com.epam.drill.admin.test.DatabaseTests import com.epam.drill.admin.test.assertJsonEquals import com.epam.drill.admin.test.drillApplication +import com.epam.drill.admin.test.waitUntilInTransaction import com.epam.drill.admin.test.withRollback import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig import com.epam.drill.admin.writer.rawdata.config.rawDataServicesDIModule @@ -109,28 +110,30 @@ class TestMetadataApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) ) } - val savedTestLaunches = TestLaunchTable.selectAll() - .filter { it[TestLaunchTable.groupId] == testGroup } - .filter { it[TestLaunchTable.testSessionId] == testSession } - .filter { it[TestLaunchTable.testDefinitionId] == testDefinition } - assertEquals(2, savedTestLaunches.size) - savedTestLaunches.forEach { - assertNotNull(it[TestLaunchTable.duration]) - assertNotNull(it[TestLaunchTable.result]) - assertTrue(it[TestLaunchTable.createdAt] >= timeBeforeTest) - } + waitUntilInTransaction { + val savedTestLaunches = TestLaunchTable.selectAll() + .filter { it[TestLaunchTable.groupId] == testGroup } + .filter { it[TestLaunchTable.testSessionId] == testSession } + .filter { it[TestLaunchTable.testDefinitionId] == testDefinition } + assertEquals(2, savedTestLaunches.size) + savedTestLaunches.forEach { + assertNotNull(it[TestLaunchTable.duration]) + assertNotNull(it[TestLaunchTable.result]) + assertTrue(it[TestLaunchTable.createdAt] >= timeBeforeTest) + } - val savedTestDefinitions = TestDefinitionTable.selectAll() - .filter { it[TestDefinitionTable.groupId] == testGroup } - .filter { it[TestDefinitionTable.id].value == testDefinition } - assertEquals(1, savedTestDefinitions.size) - savedTestDefinitions.forEach { - assertNotNull(it[TestDefinitionTable.runner]) - assertNotNull(it[TestDefinitionTable.name]) - assertNotNull(it[TestDefinitionTable.path]) - assertEquals(2, it[TestDefinitionTable.tags]?.size) - assertEquals(2, it[TestDefinitionTable.metadata]?.jsonObject?.size) - assertTrue(it[TestDefinitionTable.createdAt] >= timeBeforeTest) + val savedTestDefinitions = TestDefinitionTable.selectAll() + .filter { it[TestDefinitionTable.groupId] == testGroup } + .filter { it[TestDefinitionTable.id].value == testDefinition } + assertEquals(1, savedTestDefinitions.size) + savedTestDefinitions.forEach { + assertNotNull(it[TestDefinitionTable.runner]) + assertNotNull(it[TestDefinitionTable.name]) + assertNotNull(it[TestDefinitionTable.path]) + assertEquals(2, it[TestDefinitionTable.tags]?.size) + assertEquals(2, it[TestDefinitionTable.metadata]?.jsonObject?.size) + assertTrue(it[TestDefinitionTable.createdAt] >= timeBeforeTest) + } } } @@ -186,14 +189,16 @@ class TestMetadataApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) ) } - val saved = TestDefinitionTable.selectAll() - .filter { it[TestDefinitionTable.groupId] == testGroup } + waitUntilInTransaction { + val saved = TestDefinitionTable.selectAll() + .filter { it[TestDefinitionTable.groupId] == testGroup } - assertEquals(2, saved.size) - saved.forEach { - assertNotNull(it[TestDefinitionTable.runner]) - assertNotNull(it[TestDefinitionTable.name]) - assertTrue(it[TestDefinitionTable.createdAt] >= timeBeforeTest) + assertEquals(2, saved.size) + saved.forEach { + assertNotNull(it[TestDefinitionTable.runner]) + assertNotNull(it[TestDefinitionTable.name]) + assertTrue(it[TestDefinitionTable.createdAt] >= timeBeforeTest) + } } } @@ -246,16 +251,18 @@ class TestMetadataApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) ) } - val saved = TestLaunchTable.selectAll() - .filter { it[TestLaunchTable.groupId] == testGroup } - .filter { it[TestLaunchTable.testSessionId] == testSession } - .filter { it[TestLaunchTable.testDefinitionId] == testDefinition } + waitUntilInTransaction { + val saved = TestLaunchTable.selectAll() + .filter { it[TestLaunchTable.groupId] == testGroup } + .filter { it[TestLaunchTable.testSessionId] == testSession } + .filter { it[TestLaunchTable.testDefinitionId] == testDefinition } - assertEquals(2, saved.size) - saved.forEach { - assertNotNull(it[TestLaunchTable.result]) - assertNotNull(it[TestLaunchTable.duration]) - assertTrue(it[TestLaunchTable.createdAt] >= timeBeforeTest) + assertEquals(2, saved.size) + saved.forEach { + assertNotNull(it[TestLaunchTable.result]) + assertNotNull(it[TestLaunchTable.duration]) + assertTrue(it[TestLaunchTable.createdAt] >= timeBeforeTest) + } } } diff --git a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/TestSessionsApiTest.kt b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/TestSessionsApiTest.kt index 2dabb66f0..8ae0e8dcf 100644 --- a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/TestSessionsApiTest.kt +++ b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/TestSessionsApiTest.kt @@ -66,14 +66,16 @@ class TestSessionsApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) ) } - val savedTestSessions = TestSessionTable.selectAll() - .filter { it[TestSessionTable.groupId] == testGroup } - .filter { it[TestSessionTable.id].value == testSession } - assertEquals(1, savedTestSessions.size) - savedTestSessions.forEach { - assertNotNull(it[TestSessionTable.testTaskId]) - assertNotNull(it[TestSessionTable.startedAt]) - assertTrue(it[TestSessionTable.createdAt] >= timeBeforeTest) + waitUntilInTransaction { + val savedTestSessions = TestSessionTable.selectAll() + .filter { it[TestSessionTable.groupId] == testGroup } + .filter { it[TestSessionTable.id].value == testSession } + assertEquals(1, savedTestSessions.size) + savedTestSessions.forEach { + assertNotNull(it[TestSessionTable.testTaskId]) + assertNotNull(it[TestSessionTable.startedAt]) + assertTrue(it[TestSessionTable.createdAt] >= timeBeforeTest) + } } } @@ -119,13 +121,15 @@ class TestSessionsApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) ) } - val savedSessionBuilds = TestSessionBuildTable.selectAll() - .filter { it[TestSessionBuildTable.testSessionId] == testSession } - assertEquals(2, savedSessionBuilds.size) - savedSessionBuilds.forEach { - assertNotNull(it[TestSessionBuildTable.buildId]) - assertNotNull(it[TestSessionBuildTable.groupId]) - assertTrue(it[TestSessionBuildTable.createdAt] >= timeBeforeTest) + waitUntilInTransaction { + val savedSessionBuilds = TestSessionBuildTable.selectAll() + .filter { it[TestSessionBuildTable.testSessionId] == testSession } + assertEquals(2, savedSessionBuilds.size) + savedSessionBuilds.forEach { + assertNotNull(it[TestSessionBuildTable.buildId]) + assertNotNull(it[TestSessionBuildTable.groupId]) + assertTrue(it[TestSessionBuildTable.createdAt] >= timeBeforeTest) + } } } } \ No newline at end of file diff --git a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/ChannelDataQueueTest.kt b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/ChannelDataQueueTest.kt new file mode 100644 index 000000000..8e89e587b --- /dev/null +++ b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/ChannelDataQueueTest.kt @@ -0,0 +1,76 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.queue.impl + +import com.epam.drill.admin.writer.rawdata.queue.QueueInput +import com.epam.drill.admin.writer.rawdata.route.BuildsRoute +import com.epam.drill.admin.writer.rawdata.route.jsonConfig +import com.epam.drill.admin.writer.rawdata.route.payload.BuildPayload +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedSendChannelException +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.time.Duration.Companion.seconds + +class ChannelDataQueueTest { + + @Test + fun `enqueue should deserialize payload and make it available via dequeue`() { + val queue = ChannelDataQueue( + deserializer = ::json, + routeToPayloadType = { BuildPayload::class }, + capacity = Channel.UNLIMITED, + shutdownTimeout = 1.seconds, + ) + val testBytes = BuildPayload(groupId = "my-group", appId = "my-app", buildVersion = "1.0.0").toBytes() + val testMetadata = mapOf("key-1" to "value-1") + + runBlocking { + queue.enqueue(QueueInput(BuildsRoute(), testBytes, testMetadata)) + val output = withTimeout(2_000) { queue.dequeue() } + + assertEquals("my-group", output.payload.groupId) + assertEquals("my-app", output.payload.appId) + assertEquals(testMetadata, output.metadata) + } + + queue.close() + } + + @Test + fun `close should close the output channel and prevent further enqueueing`() { + val queue = ChannelDataQueue( + deserializer = ::json, + routeToPayloadType = { BuildPayload::class }, + capacity = Channel.UNLIMITED, + shutdownTimeout = 1.seconds, + ) + + queue.close() + + runBlocking { + assertFailsWith { + queue.enqueue(QueueInput(BuildsRoute(), ByteArray(0), emptyMap())) + } + } + } + + private fun BuildPayload.toBytes() = + jsonConfig.encodeToString(BuildPayload.serializer(), this).toByteArray(Charsets.UTF_8) +} \ No newline at end of file diff --git a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/KafkaDataQueueTest.kt b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/KafkaDataQueueTest.kt new file mode 100644 index 000000000..3daab3292 --- /dev/null +++ b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/queue/impl/KafkaDataQueueTest.kt @@ -0,0 +1,108 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.writer.rawdata.queue.impl + +import com.epam.drill.admin.writer.rawdata.queue.QueueInput +import com.epam.drill.admin.writer.rawdata.route.BuildsRoute +import com.epam.drill.admin.writer.rawdata.route.jsonConfig +import com.epam.drill.admin.writer.rawdata.route.payload.BuildPayload +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout +import org.testcontainers.kafka.ConfluentKafkaContainer +import org.testcontainers.junit.jupiter.Container +import org.testcontainers.junit.jupiter.Testcontainers +import org.testcontainers.utility.DockerImageName +import java.util.UUID +import kotlin.reflect.KClass +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds + +@Testcontainers +class KafkaDataQueueTest { + + companion object { + @Container + @JvmField + val kafka: ConfluentKafkaContainer = + ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")) + } + + private fun uniqueTopic() = "test-topic-${UUID.randomUUID()}" + private fun uniqueGroupId() = "test-group-${UUID.randomUUID()}" + + private fun createQueue( + topic: String = uniqueTopic(), + recordKeyToPayloadType: (String) -> KClass = { + when (it) { + "builds" -> BuildPayload::class + else -> throw IllegalArgumentException("Unknown record key: $it") + } + }, + routeToRecordKey: (BuildsRoute) -> String = { "builds" }, + ) = KafkaDataQueue.create( + bootstrapServers = kafka.bootstrapServers, + topic = topic, + consumerGroupId = uniqueGroupId(), + deserializer = ::json, + recordKeyToPayloadType = recordKeyToPayloadType, + routeToRecordKey = routeToRecordKey, + capacity = Channel.UNLIMITED, + pollTimeout = 500.milliseconds, + shutdownTimeout = 5.seconds, + ) + + + @Test + fun `enqueue and dequeue should deliver payload`() { + val queue = createQueue() + val testBytes = BuildPayload(groupId = "my-group", appId = "my-app", buildVersion = "1.0.0").toBytes() + val testMetadata = mapOf("key-1" to "value-1") + + runBlocking { + queue.enqueue(QueueInput(BuildsRoute(), testBytes, testMetadata)) + val output = withTimeout(10_000) { queue.dequeue() } + + assertEquals("my-group", output.payload.groupId) + assertEquals("my-app", output.payload.appId) + assertEquals("value-1", output.metadata["key-1"]) + } + + queue.close() + } + + @Test + fun `close should stop consuming and mark channel as closed`() { + val queue = createQueue() + + queue.close() + + runBlocking { + assertFailsWith("Cannot perform operation after producer has been closed") { + queue.enqueue(QueueInput(BuildsRoute(), ByteArray(0), emptyMap())) + } + } + } + + private fun BuildPayload.toBytes() = + jsonConfig.encodeToString(BuildPayload.serializer(), this).toByteArray(Charsets.UTF_8) +} + + + diff --git a/gradle.properties b/gradle.properties index 4134aff76..cd2965e59 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,10 +17,12 @@ flywaydbVersion = 8.4.1 jibVersion = 3.1.4 openApiGeneratorVersion = 6.6.0 mockitoKotlinVersion = 4.1.0 +awaitilityVersion = 4.2.2 jbcryptVersion = 0.4 caffeineVersion = 2.9.3 quartzVersion = 2.5.0 logbackVersion = 1.3.14 +kafkaClientsVersion = 3.7.0 loggerSkipJvmTests = false testsSkipIntegrationTests = false