tetsuo/muxer

muxer

Framing protocol for multiplexing typed messages over a continuous byte stream using bipartite (bip) ring buffers.

Example

#include <stdio.h>   // printf
#include "muxer.h"

// Callback fires when a complete frame arrives
void on_msg(const muxer_msg_t *msg, void *ud) {
    printf("ch=%d type=%s len=%d\n",
           msg->channel, muxer_type_name(msg->type), msg->data_len);
}

// Receiver side: initialize a reader with the callback
muxer_reader_t reader;
muxer_reader_init(&reader, on_msg, NULL);

// Feed bytes as they arrive
muxer_reader_feed(&reader, chunk, chunk_len);

// Sender side: encode a message into a wire buffer
uint8_t wire[256];
muxer_msg_t msg = {
    .channel  = 1,
    .type     = YOUR_MSG_TYPE,
    .data     = (uint8_t *)"Hello",
    .data_len = 5
};
int n = muxer_encode(wire, sizeof(wire), &msg);
// send wire[0..n-1] over your transport

Configure payload and buffer sizes

// Maximum payload bytes in a single muxer frame.
// The default is 4 KB; to raise it, define MUXER_MAX_PAYLOAD
// before including muxer.h.
#define MUXER_MAX_PAYLOAD 65536
#include "muxer.h"

Build

Run make to build the library and tests. Use make clean to remove build artifacts.

Wire format

[length varint] [header varint] [payload bytes]

length = size of (header + payload) in bytes
header = (channel_id << 4) | (msg_type & 0x0F)

Varint encoding is protobuf-style LEB128.
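
To make the varint rule concrete, here is a minimal sketch of unsigned LEB128 encoding and decoding. The names varint_encode and varint_decode are hypothetical and are not part of muxer's API, and the sketch assumes values fit in 32 bits; muxer's own limits may differ.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical helper, not muxer's API.
// Writes v as an unsigned LEB128 varint: 7 value bits per byte, low bits
// first, with the high bit set on every byte except the last.
// Returns the number of bytes written, or 0 if cap is too small.
static size_t varint_encode(uint32_t v, uint8_t *out, size_t cap) {
    assert(out != NULL);
    size_t n = 0;
    do {
        if (n == cap) return 0;
        uint8_t byte = (uint8_t)(v & 0x7Fu);
        v >>= 7;
        if (v != 0) byte |= 0x80u;   // more bytes follow
        out[n++] = byte;
    } while (v != 0);
    return n;
}

// Hypothetical helper, not muxer's API.
// Reads one varint from in[0..len). Returns the number of bytes consumed,
// or 0 if the input ends before the final byte or runs past 5 bytes.
// Bits above 32 in a 5-byte encoding are silently dropped in this sketch.
static size_t varint_decode(const uint8_t *in, size_t len, uint32_t *v) {
    assert(in != NULL && v != NULL);
    uint32_t result = 0;
    for (size_t i = 0; i < len && i < 5; i++) {
        result |= (uint32_t)(in[i] & 0x7Fu) << (7 * i);
        if ((in[i] & 0x80u) == 0) {  // high bit clear: last byte
            *v = result;
            return i + 1;
        }
    }
    return 0;
}
```

With these helpers, 300 encodes to the two bytes AC 02, and any value below 128 encodes to a single byte equal to the value itself. A decoder that returns 0 on incomplete input lets a streaming reader wait for more bytes before retrying.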

Example — "Tell me a joke" on channel 0:

Length: 1 (header) + 14 (payload) = 15   [0x0F]
Header: (0 << 4) | 1 = 1                 [0x01]
Payload: "Tell me a joke"                [54 65 6C 6C ...]

Wire: [0x0F][0x01][54 65 6C 6C 20 6D 65 20 61 20 6A 6F 6B 65]
       16 bytes total
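
The layout above can be reproduced with a short standalone encoder. This is only a sketch of the wire format as described here, not muxer's implementation: frame_encode and put_varint are hypothetical names, and the sketch assumes the channel id fits in 28 bits so that the header fits in a 32-bit varint.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

// Hypothetical helper: writes v as LEB128 into out (needs up to 5 bytes).
static size_t put_varint(uint8_t *out, uint32_t v) {
    size_t n = 0;
    do {
        uint8_t byte = (uint8_t)(v & 0x7Fu);
        v >>= 7;
        if (v != 0) byte |= 0x80u;
        out[n++] = byte;
    } while (v != 0);
    return n;
}

// Hypothetical encoder for [length varint][header varint][payload bytes].
// Returns the total frame size, or 0 if the frame does not fit in cap.
static size_t frame_encode(uint8_t *out, size_t cap,
                           uint32_t channel, uint8_t type,
                           const uint8_t *payload, size_t payload_len) {
    assert(out != NULL);
    uint8_t hdr[5], len[5];

    // header = (channel_id << 4) | (msg_type & 0x0F)
    size_t hdr_n = put_varint(hdr, (channel << 4) | (type & 0x0Fu));
    // length = size of (header + payload) in bytes
    size_t len_n = put_varint(len, (uint32_t)(hdr_n + payload_len));

    size_t total = len_n + hdr_n + payload_len;
    if (total > cap) return 0;

    memcpy(out, len, len_n);
    memcpy(out + len_n, hdr, hdr_n);
    if (payload_len > 0)
        memcpy(out + len_n + hdr_n, payload, payload_len);
    return total;
}
```

Calling frame_encode with channel 0, type 1, and the 14-byte payload "Tell me a joke" yields the same 16 bytes as the example: 0x0F, 0x01, then the payload. The header is encoded first because the length field counts the header's encoded size, which can exceed one byte for larger channel ids.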

The bipartite buffer: how it works and how to use it safely

muxer uses muxbuf, a bipartite (bip) ring buffer, internally, but you can also use it directly for any producer/consumer pattern.

A bipartite buffer (or bip-buffer) is a circular buffer variant that always hands out contiguous memory for both reads and writes, so a record never has to be split across the end and the beginning of the buffer. It handles variable-length data by managing two regions. Region A holds the oldest data and is the only region reads come from. Region B starts at the beginning of the buffer and receives new writes once there is more free space before region A than after it. When region A is fully consumed, region B is promoted to A, so data keeps flowing without fragmentation.

Visual model

Buffer memory:
┌──────────────────────────────────────────────────────────────┐
│        B region        │         A region          │  free   │
│ ←────── b_end ───────→ │ ← a_start    a_end →      │         │
└──────────────────────────────────────────────────────────────┘
  Writes go to whichever end has more free space.
  Reads always come from region A (contiguous).
  When A is emptied, B is promoted to A.
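
The model above can be sketched as a toy bip buffer with a fixed 16-byte capacity. This is an illustration of the general technique under assumed rules, not muxbuf's actual code: every toy_ name is hypothetical, and the point at which writes switch to region B is an assumption based on the "more free space" rule in the diagram.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_CAP 16

// Toy model only; field names follow the diagram, not muxbuf's internals.
typedef struct {
    uint8_t data[TOY_CAP];
    size_t a_start, a_end;   // region A occupies [a_start, a_end)
    size_t b_end;            // region B occupies [0, b_end)
    int b_active;            // nonzero once writes go to region B
} toy_bip_t;

// Bytes readable in one contiguous poll (region A only).
static size_t toy_a_used(const toy_bip_t *t) { return t->a_end - t->a_start; }

// Bytes available to the next write.
static size_t toy_unused(const toy_bip_t *t) {
    return t->b_active ? t->a_start - t->b_end : TOY_CAP - t->a_end;
}

// Start writing to B once there is more room before A than after it.
static void toy_maybe_switch(toy_bip_t *t) {
    if (TOY_CAP - t->a_end < t->a_start - t->b_end) t->b_active = 1;
}

// Copies n bytes in; returns n on success, 0 if they do not fit contiguously.
static size_t toy_offer(toy_bip_t *t, const uint8_t *src, size_t n) {
    assert(t != NULL && src != NULL);
    if (n == 0 || toy_unused(t) < n) return 0;
    if (t->b_active) { memcpy(t->data + t->b_end, src, n); t->b_end += n; }
    else             { memcpy(t->data + t->a_end, src, n); t->a_end += n; }
    toy_maybe_switch(t);
    return n;
}

// Consumes n contiguous bytes from region A; NULL if A holds fewer than n.
static uint8_t *toy_poll(toy_bip_t *t, size_t n) {
    assert(t != NULL);
    if (n == 0 || toy_a_used(t) < n) return NULL;
    uint8_t *p = t->data + t->a_start;
    t->a_start += n;
    if (t->a_start == t->a_end) {        // A is empty
        if (t->b_active) {               // promote B to A
            t->a_start = 0;
            t->a_end = t->b_end;
            t->b_end = 0;
            t->b_active = 0;
        } else {
            t->a_start = t->a_end = 0;   // rewind to the start
        }
    }
    toy_maybe_switch(t);
    return p;
}
```

In this toy, offering 12 bytes and polling 8 leaves 4 bytes in region A and flips writes to region B. A further 6-byte offer then lands at the start of the buffer, so the buffer holds 10 bytes in total but a poll of 6 returns NULL until the 4 bytes remaining in A are consumed and B is promoted.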

Critical safety rules

  1. Size the buffer to at least 4x your maximum read size. Ensure a complete "message" always fits within region A.
  2. Gate reads with muxbuf_a_used(), not muxbuf_used(). muxbuf_used() counts both regions A and B, but muxbuf_poll() can only read from region A. If you check muxbuf_used() >= N and then call muxbuf_poll(N), you'll get NULL when the data is split across regions.
  3. The returned pointer is valid until the next muxbuf_offer(). Process or copy the data before writing more into the buffer.

Usage

// Static allocation
uint8_t mem[MUXBUF_SIZEOF(4096)];
muxbuf_t *b = muxbuf_init(mem, 4096);

// Dynamic allocation (alternative to static allocation; use one or the other)
muxbuf_t *b = muxbuf_new(4096);

// Write
muxbuf_offer(b, data, len);       // returns len on success, 0 if full

// Read
int avail = muxbuf_a_used(b);     // ALWAYS use a_used, not used
uint8_t *p = muxbuf_peek(b, n);   // look without consuming
p = muxbuf_poll(b, n);            // read and consume; NULL if region A holds fewer than n bytes

// Query
muxbuf_unused(b);     // bytes available for writing
muxbuf_used(b);       // total bytes (A + B)
muxbuf_a_used(b);     // bytes readable in one poll() call
muxbuf_is_empty(b);   // true if nothing to read

// Cleanup
muxbuf_free(b);       // only if allocated with muxbuf_new()
