Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/main/kotlin/org/icpclive/balloons/Application.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ import org.icpclive.balloons.auth.authController
import org.icpclive.balloons.auth.installJwt
import org.icpclive.balloons.db.DatabaseConfig
import org.icpclive.balloons.db.databaseModule
import org.icpclive.balloons.event.CDSFetcher
import org.icpclive.balloons.event.EventStream
import org.icpclive.balloons.event.contestController
import org.icpclive.balloons.event.launchCDSFetcher
import org.icpclive.balloons.event.launchCDS
import org.icpclive.balloons.tools.H2Shell
import org.icpclive.balloons.tools.ResetContest
import org.icpclive.balloons.tools.Volunteer
Expand Down Expand Up @@ -75,8 +74,7 @@ object Application : CliktCommand("balloons") {
val jwtVerifier = JWT.require(Algorithm.HMAC256(secretKeyRepository.secretKey)).build()
val credentialValidator = CredentialValidator(volunteerRepository)
val webSocketAuthenticator = WebSocketAuthenticator(jwtVerifier, credentialValidator)
val eventStream = EventStream(balloonRepository)
val cdsFetcher = CDSFetcher(eventStream, cdsSettings)
val eventStream = EventStream(balloonRepository, cdsSettings)

embeddedServer(
Netty,
Expand Down Expand Up @@ -113,7 +111,7 @@ object Application : CliktCommand("balloons") {

installJwt(jwtVerifier, credentialValidator)

launchCDSFetcher(cdsFetcher)
launchCDS(eventStream)

routing {
adminController(volunteerRepository)
Expand Down
36 changes: 0 additions & 36 deletions src/main/kotlin/org/icpclive/balloons/event/CDSFetcher.kt

This file was deleted.

20 changes: 9 additions & 11 deletions src/main/kotlin/org/icpclive/balloons/event/ContestController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import io.ktor.websocket.readText
import io.ktor.websocket.send
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import org.icpclive.balloons.BalloonOptions
Expand Down Expand Up @@ -68,17 +67,16 @@ fun Route.contestController(
}

val outgoingStream =
launch {
var expectState = true

eventStream.stream.collect { (state, event) ->
if (expectState || event == Reload) {
expectState = false
send(jsonSerializer.encodeToString(state))
} else {
send(jsonSerializer.encodeToString(event))
eventStream.subscribe(this) { message ->
// TODO: serialize whole message
val frameContent =
when {
message.state != null -> jsonSerializer.encodeToString(message.state)
message.event != null -> jsonSerializer.encodeToString(message.event)
else -> throw IllegalArgumentException("invalid event: $message")
}
}

send(frameContent)
}

try {
Expand Down
149 changes: 96 additions & 53 deletions src/main/kotlin/org/icpclive/balloons/event/EventStream.kt
Original file line number Diff line number Diff line change
@@ -1,85 +1,115 @@
package org.icpclive.balloons.event

import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.icpclive.balloons.db.BalloonRepository
import org.icpclive.balloons.db.tables.references.BALLOON
import org.icpclive.balloons.db.tables.references.VOLUNTEER
import org.icpclive.cds.InfoUpdate
import org.icpclive.cds.RunUpdate
import org.icpclive.cds.adapters.addComputedData
import org.icpclive.cds.api.ContestInfo
import org.icpclive.cds.api.RunInfo
import org.icpclive.cds.api.RunResult
import org.icpclive.cds.cli.CdsCommandLineOptions
import org.icpclive.cds.util.getLogger
import kotlin.getValue

class EventStream(
private val balloonRepository: BalloonRepository,
settings: CdsCommandLineOptions,
) {
private val sink = MutableStateFlow<Pair<State, Event>>(State(Contest("Loading", listOf(), listOf()), mapOf()) to Reload)
val stream = sink.asStateFlow()
// We maintain current [state] of the contest, and [eventFlow] with updates to that state.
// When new subscriber arrives, they receive state at the time of connection, and all subsequent updates.
// To avoid locking by slow clients, we extend buffer capacity of the flow. It may eat RAM.
//
// You should never emit manually to [eventFlow] or update [state]. Always use [update] method that
// prevents concurrent modifications.
private var state: State = State(Contest("", listOf(), listOf()), mapOf())
private val eventFlow = MutableSharedFlow<Pair<State, Event>>(replay = 1, extraBufferCapacity = Int.MAX_VALUE)

private val mutex = Mutex()

private val cds = settings.toFlow().addComputedData()

suspend fun consumeCds() {
cds.buffer(capacity = UNLIMITED).collect { event ->
when (event) {
is RunUpdate -> processRun(event.newInfo)
is InfoUpdate -> processContestInfo(event.newInfo)
else -> {}
}
}
}

fun subscribe(
scope: CoroutineScope,
process: suspend (ClientMessage) -> Unit,
): Job =
scope.launch {
var hasState = false
eventFlow.collect { (state, event) ->
if (!hasState) {
process(ClientMessage(state = state))
hasState = true
} else {
process(ClientMessage(event = event))
}
}
}

/**
* @return `true` if command succeeded, `false` otherwise (in case of concurrent modification, etc.)
*/
suspend fun processCommand(
command: Command,
volunteerId: Long,
): Boolean =
when (command) {
is BalloonCommand -> processBalloonCommand(command, volunteerId)
): Boolean {
if (command !is BalloonCommand) {
throw NotImplementedError("${command::class.qualifiedName} command is not supported yet")
}

private suspend fun processBalloonCommand(
command: BalloonCommand,
volunteerId: Long,
): Boolean {
val balloon =
getState().balloons[command.runId]
?: return false

when (command) {
is TakeBalloon -> {
if (!balloonRepository.reserveBalloon(balloon, volunteerId)) {
return false
}
}
val balloon = state.balloons[command.runId] ?: return false

is DropBalloon -> {
if (!balloonRepository.dropBalloon(balloon, volunteerId)) {
return false
}
val commandResult =
when (command) {
is TakeBalloon -> balloonRepository.reserveBalloon(balloon, volunteerId)
is DropBalloon -> balloonRepository.dropBalloon(balloon, volunteerId)
is DeliverBalloon -> balloonRepository.deliverBalloon(balloon, volunteerId)
}

is DeliverBalloon -> {
if (!balloonRepository.deliverBalloon(balloon, volunteerId)) {
return false
}
}
if (!commandResult) {
return false
}

val newBalloon = balloon.withDelivery()

if (balloon != newBalloon) {
updateSink(BalloonUpdated(newBalloon))
update(BalloonUpdated(newBalloon))
}

return true
}

// This can be written in non-concurrent fashion.
fun processContestInfo(contestInfo: ContestInfo) {
private suspend fun processContestInfo(contestInfo: ContestInfo) {
val newContest = Contest(contestInfo)

if (newContest != getState().contest) {
updateSink(ContestUpdated(newContest))
if (newContest != state.contest) {
update(ContestUpdated(newContest))
}
}

// This can be written in non-concurrent fashion.
suspend fun processRun(runInfo: RunInfo) {
private suspend fun processRun(runInfo: RunInfo) {
val runId = runInfo.id.value
val isBalloon = !runInfo.isHidden && runInfo.result.isSolved() && !(runInfo.result as RunResult.ICPC).isAfterFirstOk

val existingBalloon = getState().balloons[runId]
val existingBalloon = state.balloons[runId]

if (isBalloon) {
val balloon =
Expand All @@ -97,27 +127,20 @@ class EventStream(
logger.info { "Balloon for submission $runId is updated: was ${existingBalloon.isFTS}, now ${balloon.isFTS}" }
}

updateSink(BalloonUpdated(balloon))
update(BalloonUpdated(balloon))
}
} else if (existingBalloon != null) {
// Extremely rare: possibly rejudge from OK to WA. We're removing a balloon.
logger.info { "Balloon for submission $runId is recalled" }
updateSink(BalloonDeleted(runId))
update(BalloonDeleted(runId))
}
}

private fun getState() = sink.value.first

private fun updateSink(event: Event) = sink.update { (state, _) -> state with event }

/**
* Use this as the last operator in [MutableStateFlow.update].
*
* It should be THE ONLY way to update the state. Each event should be applied in its own `update`.
* The exception is [Reload] event that sends new state to connected clients. In that case, make
* all updates inside [MutableStateFlow.update] and finish with `state with Reload`.
*/
private infix fun State.with(event: Event) = event.applyTo(this) to event
private suspend fun update(event: Event) =
mutex.withLock {
state = event.applyTo(state)
eventFlow.emit(state to event)
}

private fun RunResult.isSolved(): Boolean =
when (this) {
Expand All @@ -138,3 +161,23 @@ class EventStream(
private val logger by getLogger()
}
}

data class ClientMessage(
val state: State? = null,
val event: Event? = null,
) {
init {
if (state == null && event == null) {
throw IllegalArgumentException("Either state or event must be non-null")
}
if (state != null && event != null) {
throw IllegalArgumentException("Only one of state or event must be non-null")
}
}
}

fun launchCDS(eventStream: EventStream) {
CoroutineScope(Job()).launch {
eventStream.consumeCds()
}
}
Loading