From 2275ba5b5e813d9f08f35f3433821aaef373afb2 Mon Sep 17 00:00:00 2001 From: Nikita Sychev Date: Tue, 7 Apr 2026 13:43:22 +0500 Subject: [PATCH] Refactor EventStream to get rid of StateFlow --- .../org/icpclive/balloons/Application.kt | 8 +- .../org/icpclive/balloons/event/CDSFetcher.kt | 36 ----- .../balloons/event/ContestController.kt | 20 ++- .../icpclive/balloons/event/EventStream.kt | 149 +++++++++++------- 4 files changed, 108 insertions(+), 105 deletions(-) delete mode 100644 src/main/kotlin/org/icpclive/balloons/event/CDSFetcher.kt diff --git a/src/main/kotlin/org/icpclive/balloons/Application.kt b/src/main/kotlin/org/icpclive/balloons/Application.kt index 38f00fa..4c53d14 100644 --- a/src/main/kotlin/org/icpclive/balloons/Application.kt +++ b/src/main/kotlin/org/icpclive/balloons/Application.kt @@ -37,10 +37,9 @@ import org.icpclive.balloons.auth.authController import org.icpclive.balloons.auth.installJwt import org.icpclive.balloons.db.DatabaseConfig import org.icpclive.balloons.db.databaseModule -import org.icpclive.balloons.event.CDSFetcher import org.icpclive.balloons.event.EventStream import org.icpclive.balloons.event.contestController -import org.icpclive.balloons.event.launchCDSFetcher +import org.icpclive.balloons.event.launchCDS import org.icpclive.balloons.tools.H2Shell import org.icpclive.balloons.tools.ResetContest import org.icpclive.balloons.tools.Volunteer @@ -75,8 +74,7 @@ object Application : CliktCommand("balloons") { val jwtVerifier = JWT.require(Algorithm.HMAC256(secretKeyRepository.secretKey)).build() val credentialValidator = CredentialValidator(volunteerRepository) val webSocketAuthenticator = WebSocketAuthenticator(jwtVerifier, credentialValidator) - val eventStream = EventStream(balloonRepository) - val cdsFetcher = CDSFetcher(eventStream, cdsSettings) + val eventStream = EventStream(balloonRepository, cdsSettings) embeddedServer( Netty, @@ -113,7 +111,7 @@ object Application : CliktCommand("balloons") { installJwt(jwtVerifier, credentialValidator) - launchCDSFetcher(cdsFetcher) + launchCDS(eventStream) routing { adminController(volunteerRepository) diff --git a/src/main/kotlin/org/icpclive/balloons/event/CDSFetcher.kt b/src/main/kotlin/org/icpclive/balloons/event/CDSFetcher.kt deleted file mode 100644 index 99db0e0..0000000 --- a/src/main/kotlin/org/icpclive/balloons/event/CDSFetcher.kt +++ /dev/null @@ -1,36 +0,0 @@ -package org.icpclive.balloons.event - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED -import kotlinx.coroutines.flow.buffer -import kotlinx.coroutines.launch -import org.icpclive.cds.InfoUpdate -import org.icpclive.cds.RunUpdate -import org.icpclive.cds.adapters.addComputedData -import org.icpclive.cds.cli.CdsCommandLineOptions - -class CDSFetcher( - private val eventStream: EventStream, - settings: CdsCommandLineOptions, -) { - private val cds = settings.toFlow().addComputedData() - - suspend fun run() { - cds.buffer(capacity = UNLIMITED).collect { event -> - when (event) { - is RunUpdate -> eventStream.processRun(event.newInfo) - is InfoUpdate -> eventStream.processContestInfo(event.newInfo) - else -> { - // Unsupported command - } - } - } - } -} - -fun launchCDSFetcher(cdsFetcher: CDSFetcher) { - CoroutineScope(Job()).launch { - cdsFetcher.run() - } -} diff --git a/src/main/kotlin/org/icpclive/balloons/event/ContestController.kt b/src/main/kotlin/org/icpclive/balloons/event/ContestController.kt index d9585c1..d4abd70 100644 --- a/src/main/kotlin/org/icpclive/balloons/event/ContestController.kt +++ b/src/main/kotlin/org/icpclive/balloons/event/ContestController.kt @@ -11,7 +11,6 @@ import io.ktor.websocket.readText import io.ktor.websocket.send import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.consumeEach -import kotlinx.coroutines.launch import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json import org.icpclive.balloons.BalloonOptions @@ -68,17 +67,16 @@ fun Route.contestController( } val outgoingStream = - launch { - var expectState = true - - eventStream.stream.collect { (state, event) -> - if (expectState || event == Reload) { - expectState = false - send(jsonSerializer.encodeToString(state)) - } else { - send(jsonSerializer.encodeToString(event)) + eventStream.subscribe(this) { message -> + // TODO: serialize whole message + val frameContent = + when { + message.state != null -> jsonSerializer.encodeToString(message.state) + message.event != null -> jsonSerializer.encodeToString(message.event) + else -> throw IllegalArgumentException("invalid event: $message") } - } + + send(frameContent) } try { diff --git a/src/main/kotlin/org/icpclive/balloons/event/EventStream.kt b/src/main/kotlin/org/icpclive/balloons/event/EventStream.kt index da2383a..1fc31c2 100644 --- a/src/main/kotlin/org/icpclive/balloons/event/EventStream.kt +++ b/src/main/kotlin/org/icpclive/balloons/event/EventStream.kt @@ -1,21 +1,68 @@ package org.icpclive.balloons.event -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.asStateFlow -import kotlinx.coroutines.flow.update +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import org.icpclive.balloons.db.BalloonRepository import org.icpclive.balloons.db.tables.references.BALLOON import org.icpclive.balloons.db.tables.references.VOLUNTEER +import org.icpclive.cds.InfoUpdate +import org.icpclive.cds.RunUpdate +import org.icpclive.cds.adapters.addComputedData import org.icpclive.cds.api.ContestInfo import org.icpclive.cds.api.RunInfo import org.icpclive.cds.api.RunResult +import org.icpclive.cds.cli.CdsCommandLineOptions import org.icpclive.cds.util.getLogger +import kotlin.getValue class EventStream( private val balloonRepository: BalloonRepository, + settings: CdsCommandLineOptions, ) { - private val sink = MutableStateFlow>(State(Contest("Loading", listOf(), listOf()), mapOf()) to Reload) - val stream = sink.asStateFlow() + // We maintain current [state] of the contest, and [eventFlow] with updates to that state. + // When new subscriber arrives, they receive state at the time of connection, and all subsequent updates. + // To avoid locking by slow clients, we extend buffer capacity of the flow. It may eat RAM. + // + // You should never emit manually to [eventFlow] or update [state]. Always use [update] method that + // prevents concurrent modifications. + private var state: State = State(Contest("", listOf(), listOf()), mapOf()) + private val eventFlow = MutableSharedFlow>(replay = 1, extraBufferCapacity = Int.MAX_VALUE) + + private val mutex = Mutex() + + private val cds = settings.toFlow().addComputedData() + + suspend fun consumeCds() { + cds.buffer(capacity = UNLIMITED).collect { event -> + when (event) { + is RunUpdate -> processRun(event.newInfo) + is InfoUpdate -> processContestInfo(event.newInfo) + else -> {} + } + } + } + + fun subscribe( + scope: CoroutineScope, + process: suspend (ClientMessage) -> Unit, + ): Job = + scope.launch { + var hasState = false + eventFlow.collect { (state, event) -> + if (!hasState) { + process(ClientMessage(state = state)) + hasState = true + } else { + process(ClientMessage(event = event)) + } + } + } /** * @return `true` if command succeeded, `false` otherwise (in case of concurrent modification, etc.) @@ -23,63 +70,46 @@ class EventStream( suspend fun processCommand( command: Command, volunteerId: Long, - ): Boolean = - when (command) { - is BalloonCommand -> processBalloonCommand(command, volunteerId) + ): Boolean { + if (command !is BalloonCommand) { + throw NotImplementedError("${command::class.qualifiedName} command is not supported yet") } - private suspend fun processBalloonCommand( - command: BalloonCommand, - volunteerId: Long, - ): Boolean { - val balloon = - getState().balloons[command.runId] - ?: return false - - when (command) { - is TakeBalloon -> { - if (!balloonRepository.reserveBalloon(balloon, volunteerId)) { - return false - } - } + val balloon = state.balloons[command.runId] ?: return false - is DropBalloon -> { - if (!balloonRepository.dropBalloon(balloon, volunteerId)) { - return false - } + val commandResult = + when (command) { + is TakeBalloon -> balloonRepository.reserveBalloon(balloon, volunteerId) + is DropBalloon -> balloonRepository.dropBalloon(balloon, volunteerId) + is DeliverBalloon -> balloonRepository.deliverBalloon(balloon, volunteerId) } - is DeliverBalloon -> { - if (!balloonRepository.deliverBalloon(balloon, volunteerId)) { - return false - } - } + if (!commandResult) { + return false } val newBalloon = balloon.withDelivery() if (balloon != newBalloon) { - updateSink(BalloonUpdated(newBalloon)) + update(BalloonUpdated(newBalloon)) } return true } - // This can be written in non-concurrent fashion. - fun processContestInfo(contestInfo: ContestInfo) { + private suspend fun processContestInfo(contestInfo: ContestInfo) { val newContest = Contest(contestInfo) - if (newContest != getState().contest) { - updateSink(ContestUpdated(newContest)) + if (newContest != state.contest) { + update(ContestUpdated(newContest)) } } - // This can be written in non-concurrent fashion. - suspend fun processRun(runInfo: RunInfo) { + private suspend fun processRun(runInfo: RunInfo) { val runId = runInfo.id.value val isBalloon = !runInfo.isHidden && runInfo.result.isSolved() && !(runInfo.result as RunResult.ICPC).isAfterFirstOk - val existingBalloon = getState().balloons[runId] + val existingBalloon = state.balloons[runId] if (isBalloon) { val balloon = @@ -97,27 +127,20 @@ class EventStream( logger.info { "Balloon for submission $runId is updated: was ${existingBalloon.isFTS}, now ${balloon.isFTS}" } } - updateSink(BalloonUpdated(balloon)) + update(BalloonUpdated(balloon)) } } else if (existingBalloon != null) { // Extremely rare: possibly rejudge from OK to WA. We're removing a balloon. logger.info { "Balloon for submission $runId is recalled" } - updateSink(BalloonDeleted(runId)) + update(BalloonDeleted(runId)) } } - private fun getState() = sink.value.first - - private fun updateSink(event: Event) = sink.update { (state, _) -> state with event } - - /** - * Use this as the last operator in [MutableStateFlow.update]. - * - * It should be THE ONLY way to update the state. Each event should be applied in its own `update`. - * The exception is [Reload] event that sends new state to connected clients. In that case, make - * all updates inside [MutableStateFlow.update] and finish with `state with Reload`. - */ - private infix fun State.with(event: Event) = event.applyTo(this) to event + private suspend fun update(event: Event) = + mutex.withLock { + state = event.applyTo(state) + eventFlow.emit(state to event) + } private fun RunResult.isSolved(): Boolean = when (this) { @@ -138,3 +161,23 @@ class EventStream( private val logger by getLogger() } } + +data class ClientMessage( + val state: State? = null, + val event: Event? = null, +) { + init { + if (state == null && event == null) { + throw IllegalArgumentException("Either state or event must be non-null") + } + if (state != null && event != null) { + throw IllegalArgumentException("Only one of state or event must be non-null") + } + } +} + +fun launchCDS(eventStream: EventStream) { + CoroutineScope(Job()).launch { + eventStream.consumeCds() + } +}