Weft

Weft is a small structured concurrency toolkit for Kotlin on the JVM. It runs ordinary blocking Kotlin code on virtual threads, without suspend, coroutine scopes, dispatchers, or continuation machinery.

Use it when you want to start several blocking operations, keep their lifetime tied to a scope, and get predictable cleanup when the scope exits.

import me.ruchuee.weft.scope.async
import me.ruchuee.weft.scope.weftScope

data class UserPage(val user: User, val posts: List<Post>)

fun loadPage(userId: UserId): UserPage = weftScope {
    val user = async { loadUser(userId) }
    val posts = async { loadPosts(userId) }

    UserPage(
        user = user.await(),
        posts = posts.await(),
    )
}

Why Use Weft

  • Write regular blocking Kotlin functions.
  • Run child tasks on JVM virtual threads by default.
  • Keep every child task owned by a scope.
  • Cancel and join unfinished work automatically when a lambda scope exits.
  • Choose whether child failures are inspected manually or propagated at scope exit.
  • Limit, retry, take the first result, and time-box work with small composable helpers.

Requirements

  • JDK 25+
  • Kotlin/JVM

The Gradle build uses Kotlin 2.3.21 and the JDK 25 toolchain.

Packages

  • me.ruchuee.weft.scope — scopes, async, fork, and failure modes.
  • me.ruchuee.weft.task — Task, TaskState, and TaskResult.
  • me.ruchuee.weft.join — join and first-completion helpers.
  • me.ruchuee.weft.cancellation — cancellation helpers and Weft exceptions.
  • me.ruchuee.weft.timeout — timeout support.
  • me.ruchuee.weft.retry — retry policies and helpers.
  • me.ruchuee.weft.limiter — blocking concurrency limiter.

Basic Usage

Create a lambda scope with weftScope. Any task created with async belongs to that scope. When the scope returns or throws, unfinished children are cancelled and joined before control leaves the scope.

import me.ruchuee.weft.join.joinAll
import me.ruchuee.weft.scope.async
import me.ruchuee.weft.scope.weftScope

val values = weftScope {
    val tasks = (1..100).map { n ->
        async { fetchValue(n) }
    }

    tasks.joinAll()
}
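
To make "cancelled and joined" concrete, here is a minimal sketch of the same exit behavior built directly on the JDK. `SketchScope`, `sketchScope`, and `start` are hypothetical names used only for illustration. This is not Weft's implementation, and it assumes JDK 21+ for virtual threads.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a lambda scope; NOT Weft's implementation.
class SketchScope : AutoCloseable {
    private val executor = Executors.newVirtualThreadPerTaskExecutor()
    private val children = CopyOnWriteArrayList<Future<*>>()

    // Starts a child on its own virtual thread and remembers it.
    fun <T> start(block: () -> T): Future<T> {
        val child = executor.submit(Callable<T> { block() })
        children.add(child)
        return child
    }

    override fun close() {
        children.forEach { it.cancel(true) } // interrupts running children; no-op for finished ones
        executor.close()                     // blocks until every child thread has ended
    }
}

// Runs the body, then cancels and joins children whether the body returns or throws.
fun <R> sketchScope(body: SketchScope.() -> R): R = SketchScope().use { it.body() }

fun main() {
    val childCleanedUp = AtomicBoolean(false)
    val result = sketchScope {
        start {
            try {
                Thread.sleep(60_000)     // interruptible blocking call
            } finally {
                childCleanedUp.set(true) // runs once the child is interrupted
            }
        }
        "done"                           // the body returns while the child is still asleep
    }
    println("$result, child cleaned up: ${childCleanedUp.get()}")
}
```

The body returns almost immediately, yet control does not leave `sketchScope` until the sleeping child has been interrupted and its `finally` block has run. That ordering is the guarantee the paragraph above describes.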

async starts immediately. await() blocks until the task completes and returns its value or throws its failure.

import me.ruchuee.weft.scope.async
import me.ruchuee.weft.scope.weftScope

val greeting = weftScope {
    val task = async { "hello" }
    task.await()
}

For a long-lived scope held by an object, use WeftScopeOwner and close it when the owning object shuts down.

import me.ruchuee.weft.scope.WeftScopeOwner
import me.ruchuee.weft.scope.async
import me.ruchuee.weft.task.Task

class UserLoader : AutoCloseable {
    private val scope = WeftScopeOwner()

    fun load(id: UserId): Task<User> =
        scope.async { loadUser(id) }

    override fun close() {
        scope.close()
    }
}

Failure Behavior

The default scope mode is lenient. A failed child stores its exception in its Task, and the scope does not propagate that failure on exit; you decide how to handle it, for example by inspecting the task's result.

import me.ruchuee.weft.scope.async
import me.ruchuee.weft.scope.weftScope

val result = weftScope {
    val task = async { callRemoteService() }
    task.awaitResult()
}

Use weftScopeStrict when an unobserved child failure should fail the scope on exit.

import me.ruchuee.weft.scope.async
import me.ruchuee.weft.scope.weftScopeStrict

val page = weftScopeStrict {
    val user = async { loadUser(userId) }
    val posts = async { loadPosts(userId) }

    UserPage(user.await(), posts.await())
}

If you await a failed task, that failure is considered observed. If the scope body itself throws, that exception wins and child failures are attached as suppressed exceptions where applicable.
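
The "body exception wins" rule maps onto the JVM's suppressed-exception mechanism. The sketch below shows only that mechanism, using plain `Throwable.addSuppressed`. `withChildFailures` is a hypothetical helper, not a Weft function, and how Weft decides which child failures to attach is not covered here.

```kotlin
// Hypothetical helper (not part of Weft): keeps the body's exception as the
// primary failure and attaches each child failure as a suppressed exception.
fun withChildFailures(bodyFailure: Throwable, childFailures: List<Throwable>): Throwable {
    childFailures.forEach { bodyFailure.addSuppressed(it) }
    return bodyFailure
}

fun main() {
    val failure = withChildFailures(
        bodyFailure = IllegalStateException("body failed"),
        childFailures = listOf(
            RuntimeException("loadUser failed"),
            RuntimeException("loadPosts failed"),
        ),
    )
    // The body's exception is what a caller would catch.
    println(failure.message)
    // Child failures stay inspectable through the suppressed list.
    failure.suppressed.forEach { println("suppressed: ${it.message}") }
}
```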

Common Helpers

Join a group of tasks in input order:

import me.ruchuee.weft.join.joinAll
import me.ruchuee.weft.scope.async
import me.ruchuee.weft.scope.weftScopeStrict

val results = weftScopeStrict {
    urls.map { url ->
        async { httpGet(url) }
    }.joinAll()
}

Collect successes, failures, and cancellations without throwing for task outcomes:

import me.ruchuee.weft.join.joinAllSettled
import me.ruchuee.weft.scope.async
import me.ruchuee.weft.scope.weftScope
import me.ruchuee.weft.task.TaskResult

val settled: List<TaskResult<Response>> = weftScope {
    mirrors.map { mirror ->
        async { mirror.fetch() }
    }.joinAllSettled()
}

Take the first successful task and cancel the rest:

import me.ruchuee.weft.join.firstSuccess
import me.ruchuee.weft.scope.async
import me.ruchuee.weft.scope.weftScope

val response = weftScope {
    mirrors.map { mirror ->
        async { mirror.fetch() }
    }.firstSuccess()
}
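
For comparison, the JDK offers the same pattern through `ExecutorService.invokeAny`, which returns the result of one task that completed without throwing and cancels the tasks that have not finished. The sketch below uses only JDK calls. `firstSuccessSketch` is a hypothetical name, and Weft's `firstSuccess` may differ in details such as which exception it throws when every task fails.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical JDK-only version of "first success wins, cancel the rest".
fun <T> firstSuccessSketch(candidates: List<() -> T>): T =
    Executors.newVirtualThreadPerTaskExecutor().use { executor ->
        // invokeAny blocks until one candidate returns normally, then cancels the others.
        // It throws ExecutionException if every candidate fails.
        executor.invokeAny(candidates.map { candidate -> Callable<T> { candidate() } })
    }

fun main() {
    val winner = firstSuccessSketch(
        listOf<() -> String>(
            { throw IllegalStateException("mirror-a is down") }, // a failure is not a result
            { Thread.sleep(50); "mirror-b" },
            { Thread.sleep(5_000); "mirror-c" },                 // interrupted once mirror-b wins
        )
    )
    println(winner)
}
```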

Apply a timeout:

import me.ruchuee.weft.timeout.timeout
import kotlin.time.Duration.Companion.milliseconds

val user = timeout(500.milliseconds) {
    loadUser(userId)
}
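
One way to picture a blocking timeout is to run the block on its own virtual thread, wait a bounded time for the result, and interrupt the worker if the deadline passes. The sketch below does that with JDK calls only. `timeoutSketch` is a hypothetical name, the choice of `TimeoutException` is an assumption, and Weft's `timeout` may run the block differently or throw its own exception type.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical JDK-only timeout helper; not Weft's implementation.
fun <T> timeoutSketch(limit: Duration, block: () -> T): T =
    Executors.newVirtualThreadPerTaskExecutor().use { executor ->
        val future = executor.submit(Callable<T> { block() })
        try {
            future.get(limit.inWholeMilliseconds, TimeUnit.MILLISECONDS)
        } catch (e: TimeoutException) {
            future.cancel(true) // interrupt the worker; close() then waits for it to end
            throw e
        } catch (e: ExecutionException) {
            throw e.cause ?: e  // surface the block's own exception
        }
    }

fun main() {
    println(timeoutSketch(500.milliseconds) { "fast" })
    try {
        timeoutSketch(50.milliseconds) { Thread.sleep(10_000); "slow" }
    } catch (e: TimeoutException) {
        println("timed out")
    }
}
```

As with cancellation in general, the deadline only takes effect promptly if the block reaches an interruptible operation such as `Thread.sleep` or blocking I/O.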

Retry work:

import me.ruchuee.weft.retry.RetryDelay
import me.ruchuee.weft.retry.RetryPolicy
import me.ruchuee.weft.retry.retry
import kotlin.time.Duration.Companion.milliseconds

val value = retry(
    RetryPolicy(
        maxAttempts = 3,
        delay = RetryDelay.Exponential(initial = 100.milliseconds),
    )
) {
    callRemoteService()
}
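
To show what an exponential policy means in terms of waiting time, the sketch below computes the delays between attempts and applies them in a blocking loop. It assumes each delay doubles. The growth factor, any cap or jitter, and which exceptions count as retryable are not stated in this README, so treat `backoffDelays` and `retrySketch` as hypothetical illustrations and not as Weft's behavior.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Delays between attempts, assuming each delay doubles (the factor is an assumption).
// maxAttempts = 3 means at most two waits: before attempt 2 and before attempt 3.
fun backoffDelays(initial: Duration, maxAttempts: Int): List<Duration> {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    return generateSequence(initial) { it * 2 }.take(maxAttempts - 1).toList()
}

// Hypothetical blocking retry loop; not Weft's implementation.
fun <T> retrySketch(maxAttempts: Int, initial: Duration, block: () -> T): T {
    val delays = backoffDelays(initial, maxAttempts)
    var lastFailure: Exception? = null
    for (attempt in 0 until maxAttempts) {
        try {
            return block()
        } catch (e: InterruptedException) {
            throw e // never retry through cancellation
        } catch (e: Exception) {
            lastFailure = e
            // Sleep only if another attempt follows.
            if (attempt < delays.size) Thread.sleep(delays[attempt].inWholeMilliseconds)
        }
    }
    throw checkNotNull(lastFailure)
}

fun main() {
    println(backoffDelays(100.milliseconds, maxAttempts = 3))
    var calls = 0
    val value = retrySketch(maxAttempts = 3, initial = 10.milliseconds) {
        if (++calls < 3) error("flaky") else "ok after $calls calls"
    }
    println(value)
}
```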

Limit access to an external resource:

import me.ruchuee.weft.join.joinAll
import me.ruchuee.weft.limiter.limiter
import me.ruchuee.weft.scope.async
import me.ruchuee.weft.scope.weftScopeStrict

val db = limiter(32)

weftScopeStrict {
    items.map { item ->
        async {
            db.run {
                writeItem(item)
            }
        }
    }.joinAll()
}
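
A blocking concurrency limiter can be pictured as a counting semaphore wrapped around the guarded call. The sketch below is a JDK-only illustration of that idea. `LimiterSketch` and `peakConcurrency` are hypothetical names, and Weft's limiter may be implemented differently.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical semaphore-backed limiter; not Weft's implementation.
class LimiterSketch(permits: Int) {
    private val semaphore = Semaphore(permits)

    fun <T> run(block: () -> T): T {
        semaphore.acquire() // blocks until a permit is free; throws if interrupted
        try {
            return block()
        } finally {
            semaphore.release()
        }
    }
}

// Pushes `tasks` short jobs through the limiter and reports the highest concurrency seen.
fun peakConcurrency(permits: Int, tasks: Int): Int {
    val limiter = LimiterSketch(permits)
    val running = AtomicInteger()
    val peak = AtomicInteger()
    Executors.newVirtualThreadPerTaskExecutor().use { executor ->
        repeat(tasks) {
            executor.execute {
                limiter.run {
                    val now = running.incrementAndGet()
                    peak.updateAndGet { seen -> maxOf(seen, now) }
                    Thread.sleep(10) // stand-in for the guarded call, e.g. a database write
                    running.decrementAndGet()
                }
            }
        }
    } // close() waits for all submitted jobs
    return peak.get()
}

fun main() {
    // 40 virtual threads start at once, but at most 4 are inside the limiter at a time.
    println(peakConcurrency(permits = 4, tasks = 40))
}
```

Because virtual threads are cheap to block, waiting on the semaphore costs little, which is why starting one task per item and limiting inside the task is a reasonable shape.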

Create a child sub-scope with fork when you need a bounded bundle of work owned by an outer scope:

import me.ruchuee.weft.scope.WeftScope
import me.ruchuee.weft.scope.async
import me.ruchuee.weft.scope.fork
import me.ruchuee.weft.task.Task

fun handle(scope: WeftScope, request: Request): Task<Unit> =
    scope.fork {
        val user = async { loadUser(request.userId) }
        val audit = async { writeAudit(request) }

        user.await()
        audit.await()
        Unit
    }

Cancellation

Cancellation is cooperative and uses JVM interruption. Weft interrupts running tasks when a task or scope is cancelled, but user code must still reach an interruptible operation or check cancellation explicitly.

Inside a scope body:

import me.ruchuee.weft.scope.weftScope

weftScope {
    for (item in items) {
        checkCancelled()
        process(item)
    }
}

Inside an async task body, the top-level checkCancelled() checks the current thread's interrupt state.
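
The sketch below shows why an explicit check matters for CPU-bound loops. It uses only JDK interruption: `checkInterrupted` is a hypothetical stand-in for a cancellation check, and Weft's own `checkCancelled()` may throw a different exception type.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for an explicit cancellation check (not Weft's checkCancelled).
fun checkInterrupted() {
    if (Thread.currentThread().isInterrupted) throw InterruptedException("cancelled")
}

// Spins on a virtual thread until interrupted and returns how many iterations ran.
fun spinUntilCancelled(): Long {
    val iterations = AtomicLong()
    val started = CountDownLatch(1)
    val worker = Thread.ofVirtual().start {
        started.countDown()
        try {
            while (true) {
                checkInterrupted()          // without this, the loop never notices the interrupt
                iterations.incrementAndGet() // pure computation: no interruptible call here
            }
        } catch (e: InterruptedException) {
            // Cancellation observed; let the thread end.
        }
    }
    started.await()
    Thread.sleep(20)
    worker.interrupt() // a request to stop, which the loop honors at its next check
    worker.join()
    return iterations.get()
}

fun main() {
    println("stopped after ${spinUntilCancelled()} iterations")
}
```

An interrupt only sets a flag on the target thread. Blocking calls such as `Thread.sleep` react to that flag on their own, while a loop of pure computation has to poll it.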

Custom Executors

By default, Weft uses a process-wide virtual-thread-per-task executor. You can pass a custom Executor to any scope-creating API.

import me.ruchuee.weft.join.joinAll
import me.ruchuee.weft.scope.async
import me.ruchuee.weft.scope.weftScopeStrict
import java.util.concurrent.Executors

val pool = Executors.newWorkStealingPool()
try {
    val sum = weftScopeStrict(executor = pool) {
        inputs.map { input ->
            async { cpuHeavyWork(input) }
        }.joinAll().sum()
    }
} finally {
    pool.shutdown()
}

Weft does not shut down executors supplied by user code.
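
For reference, the JDK's own virtual-thread-per-task executor comes from `Executors.newVirtualThreadPerTaskExecutor()`. Whether Weft's default is built on exactly that call is an assumption. The sketch below uses only JDK calls to contrast it with a platform-thread pool and to show the caller-owned lifecycle. `runsOnVirtualThread` is a hypothetical helper.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Reports whether tasks submitted to the given executor run on virtual threads.
fun runsOnVirtualThread(executor: ExecutorService): Boolean =
    executor.submit(Callable { Thread.currentThread().isVirtual }).get()

fun main() {
    // One new virtual thread per task, suited to blocking work.
    Executors.newVirtualThreadPerTaskExecutor().use { executor ->
        println(runsOnVirtualThread(executor))
    }
    // A fixed pool of platform threads, e.g. for CPU-bound work.
    // The code that created the pool also closes it.
    Executors.newFixedThreadPool(4).use { executor ->
        println(runsOnVirtualThread(executor))
    }
}
```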
