Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8ee2fe0
refactor: move method ignore rules services to DataManagementService …
iryabov May 7, 2026
6493c3f
feat: implement queue-based data ingestion with ChannelDataQueue and …
iryabov May 11, 2026
68253bf
feat: implement RawDataQueuedWriter for queue processing
iryabov May 12, 2026
f9a6542
refactor: rename raw data configuration for queue processing
iryabov May 12, 2026
9aca341
feat: enhance RawDataQueuedWriter with specific enqueue methods for v…
iryabov May 12, 2026
f68f9e4
refactor: refactor ChannelDataQueue and RawDataQueuedWriter for enhan…
iryabov May 13, 2026
410d291
feat: enhance build information handling
iryabov May 13, 2026
1f6f11e
refactor: enhance queue processing with DataIngestRoute
iryabov May 13, 2026
18d9eba
feat: add waitUntilInBlocking function for improved assertion handlin…
iryabov May 13, 2026
db78dd6
feat: implement KafkaDataQueue for data ingestion processing
iryabov May 14, 2026
39d5178
feat: add Kafka-based configuration for raw data queue processing
iryabov May 14, 2026
d2d1a5c
feat: change raw data queue type from CHANNEL to IN_MEMORY
iryabov May 14, 2026
4379c10
feat: add shared Kafka configuration
iryabov May 14, 2026
66235d3
feat: add unit tests for KafkaDataQueue
iryabov May 15, 2026
800e430
feat: add Prometheus monitoring
iryabov May 18, 2026
76925c9
feat: integrate Micrometer for ETL metrics tracking
iryabov May 19, 2026
53c422b
refactor: rename EtlMetrics to EtlMeter
iryabov May 19, 2026
88d3b41
feat: integrate RawDataMeter for metrics tracking
iryabov May 19, 2026
6fcdd47
feat: add Micrometer metrics integration for tests
iryabov May 19, 2026
72153b8
refactor: simplify EtlMeter by removing internal metric caches
iryabov May 20, 2026
66ecf46
feat: enhance EtlMeter with additional metrics for ETL processes
iryabov May 20, 2026
16897ee
feat: add logging filter to ignore specific paths in CallLogging
iryabov May 20, 2026
d5f8131
feat: update EtlMeter to use counters for metrics and improve gauge r…
iryabov May 22, 2026
4135907
feat: add SimpleTransformer to ETLSimpleTest for flow transformation
iryabov May 22, 2026
3a31120
refactor: move data filtration logic from isProcessable function of l…
iryabov May 25, 2026
60615ed
feat: implement fluent ETL pipeline builder DSL and refactor existing…
iryabov May 26, 2026
4909e33
feat: enhance ETL metrics tracking by renaming counters and adding ex…
iryabov May 26, 2026
3c5ce03
refactor: update success logging in QueuedRawDataWriter to use output…
iryabov May 26, 2026
f2a3146
refactor: optimize query execution timing in SqlDataExtractor using m…
iryabov May 27, 2026
2f797fe
refactor: simplify logging in UntypedAggregationTransformer by removi…
iryabov May 27, 2026
f88adbe
refactor: streamline ETL pipeline structure by moving extraction fan …
iryabov May 28, 2026
53f8e95
refactor: update Kotlin to 2.1.21, Ktor to 3.1.3
iryabov Jun 2, 2026
d8c119c
refactor: update Exposed version to 0.61.0
iryabov Jun 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ build/
/tests/distr
/tests/work
**/out/
**/bin/
.kotlin/
4 changes: 4 additions & 0 deletions admin-app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ val testContainersVersion: String by parent!!.extra
val junitJupiterVersion: String by parent!!.extra
val quartzVersion: String by parent!!.extra
val logbackVersion: String by parent!!.extra
val micrometerVersion: String by parent!!.extra

repositories {
mavenLocal()
Expand All @@ -53,12 +54,15 @@ dependencies {
implementation("io.ktor:ktor-serialization-kotlinx-protobuf:$ktorVersion")
implementation("io.ktor:ktor-server-cors:$ktorVersion")
implementation("io.ktor:ktor-server-call-logging:$ktorVersion")
implementation("io.ktor:ktor-server-metrics-micrometer:$ktorVersion")
implementation("io.ktor:ktor-server-compression:$ktorVersion")
implementation("io.ktor:ktor-server-resources:$ktorVersion")
implementation("io.ktor:ktor-server-swagger:$ktorVersion")
implementation("io.github.microutils:kotlin-logging-jvm:$microutilsLoggingVersion")
implementation("org.kodein.di:kodein-di-framework-ktor-server-jvm:$kodeinVersion")
implementation("ch.qos.logback:logback-classic:$logbackVersion")
implementation("io.micrometer:micrometer-core:$micrometerVersion")
implementation("io.micrometer:micrometer-registry-prometheus:$micrometerVersion")
implementation("com.zaxxer:HikariCP:$zaxxerHikaricpVersion")
implementation("org.postgresql:postgresql:$postgresSqlVersion")
implementation("org.quartz-scheduler:quartz:$quartzVersion")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.epam.drill.admin.common.route.commonStatusPages
import com.epam.drill.admin.common.scheduler.DrillScheduler
import com.epam.drill.admin.config.SchedulerConfig
import com.epam.drill.admin.config.monitoringDIModule
import com.epam.drill.admin.config.schedulerDIModule
import com.epam.drill.admin.etl.config.etlDIModule
import com.epam.drill.admin.etl.config.updateMetricsEtlJob
Expand All @@ -41,30 +42,39 @@
import io.ktor.server.application.*
import io.ktor.server.application.call
import io.ktor.server.auth.*
import io.ktor.server.plugins.callloging.*
import io.ktor.server.plugins.calllogging.*
import io.ktor.server.plugins.compression.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.plugins.cors.routing.*
import io.ktor.server.metrics.micrometer.*
import io.ktor.server.plugins.origin
import io.ktor.server.plugins.statuspages.*
import io.ktor.server.plugins.swagger.*
import io.ktor.server.request.*
import io.ktor.server.resources.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.Json
import io.micrometer.prometheus.PrometheusMeterRegistry
import mu.KotlinLogging
import org.kodein.di.allInstances
import org.kodein.di.instance
import org.kodein.di.ktor.closestDI
import org.kodein.di.ktor.di
import javax.sql.DataSource

private val logger = KotlinLogging.logger {}
private val loggingIgnorePaths = setOf("/metrics", "/swagger")

Check failure on line 71 in admin-app/src/main/kotlin/com/epam/drill/admin/DrillAdminApplication.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "/metrics" 3 times.

See more on https://sonarcloud.io/project/issues?id=Drill4J_admin&issues=AZ6YI_jBR1Qv_XVviaQC&open=AZ6YI_jBR1Qv_XVviaQC&pullRequest=477

fun Application.module() {
val oauth2Enabled = oauth2Enabled
val simpleAuthEnabled = simpleAuthEnabled
di {
import(monitoringDIModule)
import(dataSourceDIModule)
import(schedulerDIModule)
import(jwtServicesDIModule)
Expand Down Expand Up @@ -92,9 +102,11 @@
if (oauth2Enabled) configureOAuthAuthentication(di)
roleBasedAuthentication()
}
shutdownCloseableServices()
routing {
rootRoute()
swaggerUI(path = "swagger", swaggerFile = "openapi.yml")
metricsRoute()
if (oauth2Enabled) configureOAuthRoutes()
route("/api") {
//UI
Expand Down Expand Up @@ -151,6 +163,10 @@

private fun Application.installPlugins() {
install(CallLogging) {
filter { call ->
val path = call.request.path()
loggingIgnorePaths.none { path.startsWith(it) }
}
format { call ->
val status = call.response.status()
val httpMethod = call.request.httpMethod.value
Expand Down Expand Up @@ -188,6 +204,11 @@
exposeHeader(HttpHeaders.Authorization)
exposeHeader(HttpHeaders.ContentType)
}

val meterRegistry by closestDI().instance<MeterRegistry>()
install(MicrometerMetrics) {
registry = meterRegistry
}
}

private fun StatusPagesConfig.defaultStatusPages() {
Expand All @@ -212,7 +233,7 @@
val scheduler by closestDI().instance<DrillScheduler>()

scheduler.init(KodeinJobFactory(closestDI()), dataSource)
environment.monitor.subscribe(ApplicationStopped) {
monitor.subscribe(ApplicationStopped) {
scheduler.shutdown()
}
scheduler.start()
Expand All @@ -235,4 +256,25 @@
.propertyOrNull("jsCoverageConverterAddress")
?.getString()
?.takeIf { it.isNotBlank() }
?: "http://localhost:8092" // TODO think of default
?: "http://localhost:8092" // TODO think of default

private fun Application.shutdownCloseableServices() {
val closableComponents: List<AutoCloseable> by closestDI().allInstances()

monitor.subscribe(ApplicationStopping) {
runBlocking {
closableComponents.map {
async {
it.close()
}
}.awaitAll()
}
}
}

private fun Route.metricsRoute() {
val meterRegistry by closestDI().instance<PrometheusMeterRegistry>()
get("/metrics") {
call.respondText(meterRegistry.scrape(), ContentType.Text.Plain.withCharset(Charsets.UTF_8))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package com.epam.drill.admin.config

import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import com.zaxxer.hikari.metrics.micrometer.MicrometerMetricsTrackerFactory
import io.micrometer.core.instrument.Metrics
import io.ktor.server.application.*
import io.ktor.server.config.*
import org.kodein.di.DI
Expand Down Expand Up @@ -48,8 +50,9 @@ class DatabaseConfig(private val config: ApplicationConfig) {
get() = config.propertyOrNull("ssl")?.getString()?.toBooleanStrictOrNull() ?: false
}

private fun DatabaseConfig.toHikariConfig(): HikariConfig = HikariConfig().apply {
private fun DatabaseConfig.toHikariConfig(poolName: String): HikariConfig = HikariConfig().apply {
this.driverClassName = "org.postgresql.Driver"
this.poolName = poolName
this.jdbcUrl = "jdbc:postgresql://${host}:${port}/${databaseName}"
this.username = this@toHikariConfig.username
this.password = this@toHikariConfig.password
Expand All @@ -58,6 +61,7 @@ private fun DatabaseConfig.toHikariConfig(): HikariConfig = HikariConfig().apply
this.transactionIsolation = "TRANSACTION_READ_UNCOMMITTED"
this.addDataSourceProperty("rewriteBatchedInserts", true)
this.addDataSourceProperty("rewriteBatchedStatements", true)
this.metricsTrackerFactory = MicrometerMetricsTrackerFactory(Metrics.globalRegistry)
if (ssl) {
this.addDataSourceProperty("ssl", true)
this.addDataSourceProperty("sslmode", "require")
Expand All @@ -73,7 +77,7 @@ val dataSourceDIModule = DI.Module("dataSource") {
DatabaseConfig(instance<Application>().environment.config.config("drill.metrics.database"))
}
bind<DataSource>() with singleton {
HikariDataSource(instance<DatabaseConfig>().toHikariConfig())
HikariDataSource(instance<DatabaseConfig>().toHikariConfig("drill-admin-main"))
}
bind<DataSource>(tag = "metrics") with singleton {
val mainConfig = instance<DatabaseConfig>()
Expand All @@ -86,7 +90,7 @@ val dataSourceDIModule = DI.Module("dataSource") {
) {
instance<DataSource>()
} else {
HikariDataSource(metricsConfig.toHikariConfig())
HikariDataSource(metricsConfig.toHikariConfig("drill-admin-metrics"))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright 2020 - 2022 EPAM Systems
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.epam.drill.admin.config

import io.micrometer.core.instrument.Clock
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Metrics
import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics
import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics
import io.micrometer.core.instrument.binder.system.ProcessorMetrics
import io.micrometer.core.instrument.binder.system.UptimeMetrics
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import io.prometheus.client.CollectorRegistry
import org.kodein.di.DI
import org.kodein.di.bind
import org.kodein.di.singleton

val monitoringDIModule = DI.Module("monitoring") {
bind<MeterRegistry>() with singleton {
prometheusRegistry
}
bind<PrometheusMeterRegistry>() with singleton {
prometheusRegistry
}
}

private val prometheusRegistry: PrometheusMeterRegistry by lazy {
PrometheusMeterRegistry(
PrometheusConfig.DEFAULT,
CollectorRegistry.defaultRegistry,
Clock.SYSTEM
).also { registry ->
if (Metrics.globalRegistry.registries.none { it === registry }) {
Metrics.addRegistry(registry)
}
ClassLoaderMetrics().bindTo(registry)
JvmMemoryMetrics().bindTo(registry)
JvmGcMetrics().bindTo(registry)
JvmThreadMetrics().bindTo(registry)
ProcessorMetrics().bindTo(registry)
FileDescriptorMetrics().bindTo(registry)
UptimeMetrics().bindTo(registry)
}
}
37 changes: 37 additions & 0 deletions admin-app/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ ktor {
}
}

kafka {
bootstrapServers = "localhost:9092"
bootstrapServers = ${?DRILL_KAFKA_BOOTSTRAP_SERVERS}
securityProtocol = ${?DRILL_KAFKA_SECURITY_PROTOCOL}
saslMechanism = ${?DRILL_KAFKA_SASL_MECHANISM}
saslJaasConfig = ${?DRILL_KAFKA_SASL_JAAS_CONFIG}
sslTruststoreLocation = ${?DRILL_KAFKA_SSL_TRUSTSTORE_LOCATION}
sslTruststorePassword = ${?DRILL_KAFKA_SSL_TRUSTSTORE_PASSWORD}
sslKeystoreLocation = ${?DRILL_KAFKA_SSL_KEYSTORE_LOCATION}
sslKeystorePassword = ${?DRILL_KAFKA_SSL_KEYSTORE_PASSWORD}
sslKeyPassword = ${?DRILL_KAFKA_SSL_KEY_PASSWORD}
sslEndpointIdentificationAlgorithm = ${?DRILL_KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM}
}

drill {
database {
host = ${?DRILL_DB_HOST}
Expand Down Expand Up @@ -72,6 +86,29 @@ drill {
}
}
rawData {
queue {
type = ${?DRILL_RAW_DATA_QUEUE_TYPE}
capacity = ${?DRILL_RAW_DATA_QUEUE_CAPACITY}
workers = ${?DRILL_RAW_DATA_QUEUE_WORKERS}
kafka {
topic = "drill-raw-data"
topic = ${?DRILL_RAW_DATA_QUEUE_KAFKA_TOPIC}
consumerGroupId = "drill-writer"
consumerGroupId = ${?DRILL_RAW_DATA_QUEUE_KAFKA_CONSUMER_GROUP_ID}
pollTimeoutMs = 500
pollTimeoutMs = ${?DRILL_RAW_DATA_QUEUE_KAFKA_POLL_TIMEOUT_MS}
shutdownTimeoutMs = 5000
shutdownTimeoutMs = ${?DRILL_RAW_DATA_QUEUE_KAFKA_SHUTDOWN_TIMEOUT_MS}
producer {
clientId = "drill-admin-raw-data-producer"
clientId = ${?DRILL_RAW_DATA_QUEUE_KAFKA_PRODUCER_CLIENT_ID}
}
consumer {
clientId = "drill-admin-raw-data-consumer"
clientId = ${?DRILL_RAW_DATA_QUEUE_KAFKA_CONSUMER_CLIENT_ID}
}
}
}
}
metrics {
database {
Expand Down
3 changes: 2 additions & 1 deletion admin-auth/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ val kodeinVersion: String by parent!!.extra
val kotlinxSerializationVersion: String by parent!!.extra
val kotlinxDatetimeVersion: String by parent!!.extra
val mockitoKotlinVersion: String by parent!!.extra
val kotlinxCoroutinesVersion: String by parent!!.extra
val jbcryptVersion: String by parent!!.extra
val exposedVersion: String by parent!!.extra
val flywaydbVersion: String by parent!!.extra
Expand Down Expand Up @@ -72,7 +73,7 @@ dependencies {
testImplementation("org.testcontainers:postgresql:$testContainersVersion")
testImplementation("org.postgresql:postgresql:$postgresSqlVersion")
testImplementation("com.zaxxer:HikariCP:$zaxxerHikaricpVersion")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.5.2")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlinxCoroutinesVersion")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ApiKeyAuthenticationProvider internal constructor(
*/
class Configuration internal constructor(name: String?) : Config(name) {

internal lateinit var authenticationFunction: suspend ApplicationCall.(String) -> Principal?
internal lateinit var authenticationFunction: suspend ApplicationCall.(String) -> Any?

internal var challengeFunction: suspend (ApplicationCall) -> Unit = { call ->
call.respond(HttpStatusCode.Unauthorized)
Expand All @@ -60,7 +60,7 @@ class ApiKeyAuthenticationProvider internal constructor(
* A function that will check given API key retrieved from [headerName] and return [Principal],
* or null if credential does not correspond to an authenticated principal.
*/
fun validate(body: suspend ApplicationCall.(String) -> Principal?) {
fun validate(body: suspend ApplicationCall.(String) -> Any?) {
authenticationFunction = body
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import io.ktor.server.application.*
import io.ktor.server.auth.*
import io.ktor.client.*
import io.ktor.client.engine.apache.*
import io.ktor.content.*
import io.ktor.http.content.*
import io.ktor.http.*
import io.ktor.server.plugins.statuspages.*
import io.ktor.server.response.*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class RoleBasedAuthConfiguration {
}

class AuthorizedRouteSelector(private val description: String) : RouteSelector() {
override fun evaluate(context: RoutingResolveContext, segmentIndex: Int) = RouteSelectorEvaluation.Constant
override suspend fun evaluate(context: RoutingResolveContext, segmentIndex: Int) = RouteSelectorEvaluation.Constant

override fun toString(): String = "(authorize ${description})"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import io.ktor.server.application.*
import io.ktor.server.auth.*
import io.ktor.server.request.*
import io.ktor.server.routing.*
import io.ktor.util.pipeline.*
import org.kodein.di.instance
import org.kodein.di.ktor.closestDI as di
import com.epam.drill.admin.common.route.*
Expand Down Expand Up @@ -149,7 +148,7 @@ fun Route.resetPasswordRoute() {
}
}

private fun PipelineContext<Unit, ApplicationCall>.throwExceptionIfCurrentUserIs(userId: Int, message: String) {
private fun RoutingContext.throwExceptionIfCurrentUserIs(userId: Int, message: String) {
if (call.principal<User>()?.id == userId)
throw ForbiddenOperationException(message)
}
Loading
Loading