turnq

When multiple agents or processes write to a shared resource concurrently (a git branch, a deploy slot, a database migration), the naive fix is retry with jitter: sleep a random amount, try again, hope for the best. It works until it doesn't. Under load, agents collide repeatedly, throw away work they had already done, and face unbounded wait times.

turnq replaces that gamble with a queue. Clients enqueue on a named channel, receive their turn when they're at the front, do their work, and release. Exactly one client holds the token at a time. Order is strict FIFO. No retries, no conflicts, no tuning magic numbers.
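To make the turn-taking idea concrete, here is a minimal single-process sketch of a FIFO turn queue built on a promise chain. It only illustrates the concept and is not turnq's implementation; the class name TurnQueue is hypothetical and not part of the turnq API.

```typescript
// Illustrative only: a single-process FIFO turn queue.
// TurnQueue is a hypothetical name, not part of the turnq API.
class TurnQueue {
  // Resolves when the most recently enqueued turn has finished.
  private tail: Promise<void> = Promise.resolve();

  // Runs fn after every previously enqueued turn has completed.
  withTurn<T>(fn: () => Promise<T>): Promise<T> {
    const run = this.tail.then(fn);
    // Swallow the outcome on the chain so a failed turn does not block later ones.
    this.tail = run.then(() => undefined, () => undefined);
    return run;
  }
}
```

Each call chains onto the previous one, so turns run one at a time in the order they were requested, and a rejected turn still hands over to the next caller.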

Two modes: local (file lock, no server) and distributed (HTTP server, multi-host).


Quick start

import { createCoordinator } from '@cordfuse/turnq/coordinator';

const coordinator = await createCoordinator();

await coordinator.withTurn('my-channel', async () => {
  // exactly one process is here at a time
});

No server, no config. createCoordinator() with no arguments uses a local file lock (flock(2)). Safe across multiple processes on the same host.


Modes

Local (default)

No server required. Uses flock(2) on a temp file — the OS releases the lock automatically if the process dies, so stale locks are impossible.

const coordinator = await createCoordinator();
// [turnq] local file lock mode

Lock files live at os.tmpdir()/turnq-locks/<channel>.lock.
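When debugging local mode it can help to know where a channel's lock file should be. The helper below is a small sketch that mirrors the layout described above; lockPathFor is a hypothetical name, and the sketch assumes the channel name is used verbatim as the file name.

```typescript
import * as os from 'node:os';
import * as path from 'node:path';

// Hypothetical helper: mirrors the documented layout, not turnq's internal code.
// Assumes the channel name is used as-is for the file name.
function lockPathFor(channel: string): string {
  return path.join(os.tmpdir(), 'turnq-locks', `${channel}.lock`);
}
```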

Distributed

Runs against a turnq HTTP server. Serializes turns across multiple hosts.

const coordinator = await createCoordinator({
  url: 'https://turnq.example.com',
  apiKey: process.env.TURNQ_API_KEY,
});
// [turnq] distributed — https://turnq.example.com

Fallback (default: true)

If the distributed server is unreachable or credentials are missing, createCoordinator falls back to local mode and logs a warning.

// default: falls back to local mode and logs a warning
const coordinator = await createCoordinator({
  url: 'https://turnq.example.com',
  apiKey: process.env.TURNQ_API_KEY,
});

// strict — throws if distributed is unavailable
const coordinator = await createCoordinator({
  url: 'https://turnq.example.com',
  apiKey: process.env.TURNQ_API_KEY,
  fallback: false,
});
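The fallback rules above can be summarized as a small decision function. This is a sketch of the documented behavior only, not the library's code: chooseMode and the serverReachable flag are hypothetical, and the options interface is repeated here so the block stands alone.

```typescript
type Mode = 'local' | 'distributed';

interface CoordinatorOptions {
  url?: string;
  apiKey?: string;
  fallback?: boolean;
}

// Hypothetical model of the documented rules. serverReachable stands in for
// whatever connectivity check the real library performs.
function chooseMode(opts: CoordinatorOptions, serverReachable: boolean): Mode {
  if (!opts.url) return 'local'; // no URL: local mode
  if (opts.apiKey && serverReachable) return 'distributed';
  if (opts.fallback ?? true) return 'local'; // fallback defaults to true
  throw new Error('turnq: distributed mode unavailable and fallback is disabled');
}
```

With fallback: false, a missing key or an unreachable server is an error rather than a quiet downgrade, which is usually what you want when turns must be serialized across hosts.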

API

createCoordinator(opts?)

interface CoordinatorOptions {
  url?: string;       // turnq server URL — omit for local mode
  apiKey?: string;    // required when url is set
  fallback?: boolean; // default: true — fall back to local if distributed unavailable
}

createCoordinator(opts?: CoordinatorOptions): Promise<Coordinator>

Coordinator

interface Coordinator {
  createChannel(name: string, opts?: { leaseMs?: number }): Promise<void>;
  withTurn<T>(channel: string, fn: () => Promise<T>): Promise<T>;
  close(): void;
}

createChannel is a no-op in local mode. In distributed mode it ensures the channel exists on the server before use.
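Because createChannel is safe to call in both modes, a one-shot script can call it unconditionally before taking a turn. The wrapper below is a sketch under that assumption; runExclusive is a hypothetical helper, the Coordinator interface is copied from above so the block is self-contained, and the 60-second lease is only an example value.

```typescript
interface Coordinator {
  createChannel(name: string, opts?: { leaseMs?: number }): Promise<void>;
  withTurn<T>(channel: string, fn: () => Promise<T>): Promise<T>;
  close(): void;
}

// Hypothetical helper for one-shot scripts: ensure the channel exists, take a
// single turn, and always close the coordinator afterwards.
async function runExclusive<T>(
  coordinator: Coordinator,
  channel: string,
  fn: () => Promise<T>,
): Promise<T> {
  await coordinator.createChannel(channel, { leaseMs: 60_000 }); // example lease
  try {
    return await coordinator.withTurn(channel, fn);
  } finally {
    coordinator.close();
  }
}
```

Long-lived processes would more likely keep one coordinator open and call withTurn repeatedly instead of closing after each turn.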


Direct client usage

For lower-level control, import TurnqClient directly:

import { TurnqClient } from '@cordfuse/turnq/client';

const client = new TurnqClient('https://turnq.example.com', {
  apiKey: process.env.TURNQ_API_KEY,
});

await client.createChannel('my-channel', { leaseMs: 60_000 });

await client.withTurn('my-channel', async (ctx) => {
  await ctx.withStep('fetch',  () => git.fetch());
  await ctx.withStep('commit', () => git.commit(message, files));
  await ctx.withStep('push',   () => git.push('origin', 'main'));
});

client.close();

Running the server

Clone the repo and use docker compose:

git clone https://github.com/cordfuse/turnq
cd turnq
TURNQ_API_KEY=your-key docker compose up -d

Admin endpoints

GET    /health
GET    /metrics                           Prometheus-compatible
GET    /channels/{name}/queue             inspect queue
DELETE /channels/{name}/holder            force-release current holder

Protocol at a glance

HTTP for actions, SSE for notifications. JSON throughout.

POST   /channels                          create channel
POST   /channels/{name}/enqueue           → { requestId, position }
GET    /channels/{name}/subscribe         SSE stream
POST   /channels/{name}/release           complete turn
POST   /channels/{name}/abort             leave queue

Turn lifecycle: enqueue → subscribe → wait for your-turn → do work → release.
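The "wait for your-turn" step means reading the SSE stream line by line, and network chunks can split a line anywhere. The sketch below shows one way to detect the event incrementally. SseTurnDetector is a hypothetical helper, not part of the turnq client, and it assumes the server sends an `event: your-turn` line followed by a `data:` line, as the examples in this README do.

```typescript
// Hypothetical helper: incremental detector for the your-turn SSE event.
class SseTurnDetector {
  private buffer = '';       // holds a partial line between chunks
  private currentEvent = ''; // event name of the message being read

  // Feed one decoded text chunk. Returns true once a data line belonging to a
  // your-turn event has been seen.
  push(chunk: string): boolean {
    this.buffer += chunk;
    let idx: number;
    while ((idx = this.buffer.indexOf('\n')) >= 0) {
      const line = this.buffer.slice(0, idx).replace(/\r$/, '');
      this.buffer = this.buffer.slice(idx + 1);
      if (line === '') {
        this.currentEvent = ''; // a blank line ends the current message
      } else if (line.startsWith('event:')) {
        this.currentEvent = line.slice(6).trim();
      } else if (line.startsWith('data:') && this.currentEvent === 'your-turn') {
        return true;
      }
    }
    return false;
  }
}
```

A client would feed it decoded chunks from the subscribe response and start its work only after push returns true. If the stream ends first, the turn was never granted and the work should not run.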


bash + curl

TURNQ_URL="https://turnq.example.com"
CHANNEL="my-channel"
CLIENT_ID=$(uuidgen)

REQUEST_ID=$(curl -s -X POST "$TURNQ_URL/channels/$CHANNEL/enqueue" \
  -H "x-api-key: $TURNQ_API_KEY" \
  -H "content-type: application/json" \
  -d "{\"clientId\":\"$CLIENT_ID\"}" | jq -r .requestId)

curl -sN "$TURNQ_URL/channels/$CHANNEL/subscribe?clientId=$CLIENT_ID&requestId=$REQUEST_ID" \
  -H "x-api-key: $TURNQ_API_KEY" | while IFS= read -r line; do
  [[ "$line" == "event: your-turn" ]] || continue
  do_work
  curl -s -X POST "$TURNQ_URL/channels/$CHANNEL/release" \
    -H "x-api-key: $TURNQ_API_KEY" \
    -H "content-type: application/json" \
    -d "{\"clientId\":\"$CLIENT_ID\",\"requestId\":\"$REQUEST_ID\"}"
  break
done

Go

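import (
    "bufio"
    "bytes"
    "encoding/json"
    "errors"
    "fmt"
    "net/http"
    "strings"
)

// withTurn enqueues on channel, blocks until the server sends your-turn,
// runs fn, then releases. newUUID is a placeholder for your UUID generator.
func withTurn(baseURL, channel, apiKey string, fn func() error) error {
    clientID := newUUID()
    setHeaders := func(r *http.Request) {
        r.Header.Set("x-api-key", apiKey)
        r.Header.Set("content-type", "application/json")
    }

    // Enqueue
    body, _ := json.Marshal(map[string]string{"clientId": clientID})
    req, err := http.NewRequest("POST", fmt.Sprintf("%s/channels/%s/enqueue", baseURL, channel), bytes.NewReader(body))
    if err != nil {
        return err
    }
    setHeaders(req)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    var enq struct{ RequestID string `json:"requestId"` }
    err = json.NewDecoder(resp.Body).Decode(&enq)
    resp.Body.Close()
    if err != nil {
        return err
    }

    // Subscribe (SSE)
    subURL := fmt.Sprintf("%s/channels/%s/subscribe?clientId=%s&requestId=%s", baseURL, channel, clientID, enq.RequestID)
    req, err = http.NewRequest("GET", subURL, nil)
    if err != nil {
        return err
    }
    req.Header.Set("x-api-key", apiKey)
    stream, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer stream.Body.Close()
    scanner, currentEvent, gotTurn := bufio.NewScanner(stream.Body), "", false
    for scanner.Scan() {
        line := scanner.Text()
        if strings.HasPrefix(line, "event:") { currentEvent = strings.TrimSpace(line[6:]) }
        if strings.HasPrefix(line, "data:") && currentEvent == "your-turn" { gotTurn = true; break }
    }
    if !gotTurn {
        return errors.New("turnq: stream ended before your-turn")
    }

    // Do work, then release whether or not it succeeded
    workErr := fn()
    body, _ = json.Marshal(map[string]any{
        "clientId": clientID, "requestId": enq.RequestID,
        "result": map[string]bool{"success": workErr == nil},
    })
    req, err = http.NewRequest("POST", fmt.Sprintf("%s/channels/%s/release", baseURL, channel), bytes.NewReader(body))
    if err != nil {
        return err
    }
    setHeaders(req)
    if rel, relErr := http.DefaultClient.Do(req); relErr == nil {
        rel.Body.Close()
    } else if workErr == nil {
        return relErr
    }
    return workErr
}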

Java

import java.net.http.*;
import java.net.URI;
import java.util.UUID;

var turnqUrl  = "https://turnq.example.com";
var channel   = "my-channel";
var clientId  = UUID.randomUUID().toString();
var http      = HttpClient.newHttpClient();

// Enqueue
var enqResp = http.send(
    HttpRequest.newBuilder()
        .POST(HttpRequest.BodyPublishers.ofString("{\"clientId\":\"" + clientId + "\"}"))
        .uri(URI.create(turnqUrl + "/channels/" + channel + "/enqueue"))
        .header("x-api-key", apiKey).header("content-type", "application/json")
        .build(),
    HttpResponse.BodyHandlers.ofString());
var requestId = parseRequestId(enqResp.body());

// Subscribe (SSE)
var sseResp = http.send(
    HttpRequest.newBuilder()
        .GET()
        .uri(URI.create(turnqUrl + "/channels/" + channel +
            "/subscribe?clientId=" + clientId + "&requestId=" + requestId))
        .header("x-api-key", apiKey)
        .build(),
    HttpResponse.BodyHandlers.ofLines());

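String currentEvent = "";
boolean gotTurn = false;
for (var line : (Iterable<String>) sseResp.body()::iterator) {
    if (line.startsWith("event:"))            currentEvent = line.substring(6).trim();
    else if (line.startsWith("data:") && "your-turn".equals(currentEvent)) { gotTurn = true; break; }
}
// Do not run the work if the stream closed before the turn was granted
if (!gotTurn) throw new IllegalStateException("turnq: stream ended before your-turn");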

// Do work
doWork();

// Release
http.send(
    HttpRequest.newBuilder()
        .POST(HttpRequest.BodyPublishers.ofString(
            "{\"clientId\":\"" + clientId + "\",\"requestId\":\"" + requestId + "\"}"))
        .uri(URI.create(turnqUrl + "/channels/" + channel + "/release"))
        .header("x-api-key", apiKey).header("content-type", "application/json")
        .build(),
    HttpResponse.BodyHandlers.discarding());

C#

using System.Net.Http;
using System.Text;
using System.Text.Json;

var turnqUrl  = "https://turnq.example.com";
var channel   = "my-channel";
var clientId  = Guid.NewGuid().ToString();
var http      = new HttpClient();
http.DefaultRequestHeaders.Add("x-api-key", Environment.GetEnvironmentVariable("TURNQ_API_KEY"));

// Enqueue
var enqRes = await http.PostAsync(
    $"{turnqUrl}/channels/{channel}/enqueue",
    new StringContent($$"""{"clientId":"{{clientId}}"}""", Encoding.UTF8, "application/json"));
var enqJson   = JsonDocument.Parse(await enqRes.Content.ReadAsStringAsync());
var requestId = enqJson.RootElement.GetProperty("requestId").GetString()!;

// Subscribe (SSE)
using var stream = await http.GetStreamAsync(
    $"{turnqUrl}/channels/{channel}/subscribe?clientId={clientId}&requestId={requestId}");
using var reader = new StreamReader(stream);

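string currentEvent = "";
string? line;
var gotTurn = false;
// ReadLineAsync returns null at end of stream, which ends the loop
while ((line = await reader.ReadLineAsync()) != null) {
    if (line.StartsWith("event:"))                                    currentEvent = line[6..].Trim();
    else if (line.StartsWith("data:") && currentEvent == "your-turn") { gotTurn = true; break; }
}
if (!gotTurn) throw new InvalidOperationException("turnq: stream ended before your-turn");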

// Do work
await DoWorkAsync();

// Release
await http.PostAsync(
    $"{turnqUrl}/channels/{channel}/release",
    new StringContent($$"""{"clientId":"{{clientId}}","requestId":"{{requestId}}"}""",
        Encoding.UTF8, "application/json"));

Rust

use reqwest::Client;
use serde_json::{json, Value};
use uuid::Uuid;

async fn with_turn<F, Fut>(base_url: &str, channel: &str, api_key: &str, work: F) -> anyhow::Result<()>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    let client    = Client::new();
    let client_id = Uuid::new_v4().to_string();

    // Enqueue
    let enq: Value = client
        .post(format!("{base_url}/channels/{channel}/enqueue"))
        .header("x-api-key", api_key)
        .json(&json!({ "clientId": client_id }))
        .send().await?.json().await?;
    let request_id = enq["requestId"].as_str().unwrap().to_owned();

    // Subscribe (SSE)
    let url      = format!("{base_url}/channels/{channel}/subscribe?clientId={client_id}&requestId={request_id}");
    let mut resp = client.get(&url).header("x-api-key", api_key).send().await?;

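    let mut current_event = String::new();
    let mut buf: Vec<u8> = Vec::new(); // holds a partial line across chunk boundaries
    let mut got_turn = false;
    'sse: while let Some(chunk) = resp.chunk().await? {
        buf.extend_from_slice(&chunk);
        // Process only complete lines; the remainder stays in buf for the next chunk
        while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
            let raw: Vec<u8> = buf.drain(..=pos).collect();
            let line = std::str::from_utf8(&raw)?.trim_end();
            if let Some(ev) = line.strip_prefix("event:") { current_event = ev.trim().to_owned(); }
            else if line.starts_with("data:") && current_event == "your-turn" { got_turn = true; break 'sse; }
        }
    }
    anyhow::ensure!(got_turn, "turnq: stream ended before your-turn");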

    // Do work
    let result = work().await;

    // Release
    client.post(format!("{base_url}/channels/{channel}/release"))
        .header("x-api-key", api_key)
        .json(&json!({
            "clientId":  client_id,
            "requestId": request_id,
            "result": { "success": result.is_ok() }
        }))
        .send().await?;

    result
}

PowerShell

$TurnqUrl  = "https://turnq.example.com"
$Channel   = "my-channel"
$ClientId  = [System.Guid]::NewGuid().ToString()
$Headers   = @{ "x-api-key" = $env:TURNQ_API_KEY; "Content-Type" = "application/json" }

# Enqueue
$Enq       = Invoke-RestMethod -Method POST -Uri "$TurnqUrl/channels/$Channel/enqueue" `
               -Headers $Headers -Body (ConvertTo-Json @{ clientId = $ClientId })
$RequestId = $Enq.requestId

# Subscribe (SSE) — read until your-turn
$Req    = [System.Net.WebRequest]::Create("$TurnqUrl/channels/$Channel/subscribe?clientId=$ClientId&requestId=$RequestId")
$Req.Headers["x-api-key"] = $env:TURNQ_API_KEY
$Stream = $Req.GetResponse().GetResponseStream()
$Reader = New-Object System.IO.StreamReader($Stream)

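$CurrentEvent = ""
$GotTurn      = $false
while (-not $Reader.EndOfStream) {
    $Line = $Reader.ReadLine()
    if ($Line -match "^event:\s*(.+)")            { $CurrentEvent = $Matches[1].Trim() }
    elseif ($Line -match "^data:" -and $CurrentEvent -eq "your-turn") { $GotTurn = $true; break }
}
$Reader.Close()
# Do not run the work if the stream closed before the turn was granted
if (-not $GotTurn) { throw "turnq: stream ended before your-turn" }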

# Do work
Invoke-YourWork

# Release
Invoke-RestMethod -Method POST -Uri "$TurnqUrl/channels/$Channel/release" `
    -Headers $Headers `
    -Body (ConvertTo-Json @{ clientId = $ClientId; requestId = $RequestId })

License

MIT.

About

Named-channel turn coordinator. Turn-taking primitive for git push serialization, deploy queues, and any FIFO work coordination across processes.
