Keep local in-memory caches warm across distributed systems. When data changes, broadcast updates via pluggable transports so every consumer can hydrate its local cache without round-tripping to a central store.
Inspired by studies-redis-stream-distributed-local-caching.
In a typical setup every service instance fetches data from a shared store on every cache miss, creating a hot-path bottleneck:
flowchart LR
S1[Service A] -- cache miss --> DB[(Central Store)]
S2[Service B] -- cache miss --> DB
S3[Service C] -- cache miss --> DB
DB -- read --> S1
DB -- read --> S2
DB -- read --> S3
style DB fill:#f96,stroke:#333
As the fleet scales, the central store becomes the bottleneck: more instances mean more cache-miss reads, higher latency, and a greater risk of overload.
hydratecache flips the model: when data changes, the writer broadcasts the update through a messaging transport and every subscriber hydrates its local cache directly, avoiding most round-trips to the central store:
flowchart LR
W[Writer] -- publish --> T{Transport}
T -- broadcast --> S1[Service A<br/>local cache ✓]
T -- broadcast --> S2[Service B<br/>local cache ✓]
T -- broadcast --> S3[Service C<br/>local cache ✓]
style T fill:#6c6,stroke:#333
Each service keeps a warm local cache, so reads are served from local memory and the central store is hit mainly by the original write.
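The broadcast model above can be sketched without any real messaging backend. The following is a minimal in-process illustration, not the library's implementation: `update`, `localCache`, and `broadcaster` are hypothetical names, and delivery is synchronous, whereas a real transport delivers asynchronously over the network.

```go
package main

import (
	"fmt"
	"sync"
)

// update is a hypothetical message: a cache key and its new value.
type update struct {
	Key   string
	Value string
}

// localCache is a minimal concurrency-safe in-memory cache.
type localCache struct {
	mu   sync.RWMutex
	data map[string]string
}

func newLocalCache() *localCache {
	return &localCache{data: make(map[string]string)}
}

func (c *localCache) set(k, v string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[k] = v
}

func (c *localCache) get(k string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.data[k]
	return v, ok
}

// broadcaster stands in for a transport (assumption: synchronous fan-out).
// Every published update is written into every subscribed cache.
type broadcaster struct {
	mu     sync.Mutex
	caches []*localCache
}

func (b *broadcaster) subscribe(c *localCache) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.caches = append(b.caches, c)
}

func (b *broadcaster) publish(u update) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, c := range b.caches {
		c.set(u.Key, u.Value)
	}
}

func main() {
	b := &broadcaster{}
	services := []*localCache{newLocalCache(), newLocalCache(), newLocalCache()}
	for _, s := range services {
		b.subscribe(s)
	}

	// One write, fanned out to every service's local cache.
	b.publish(update{Key: "user:1", Value: "Alice"})

	for i, s := range services {
		v, ok := s.get("user:1")
		fmt.Printf("service %d: value=%q hit=%v\n", i, v, ok)
	}
}
```

After the single publish, each service answers the read from its own map, which is the property the second diagram describes.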
go get github.com/nassor/hydratecache-go
Then import the transport you need:
go get github.com/nassor/hydratecache-go/redis
go get github.com/nassor/hydratecache-go/nats
go get github.com/nassor/hydratecache-go/postgres
go get github.com/nassor/hydratecache-go/kafka
package main
import (
"context"
"fmt"
"log"
"github.com/nassor/hydratecache-go"
hcredis "github.com/nassor/hydratecache-go/redis"
goredis "github.com/redis/go-redis/v9"
)
type UserProfile struct {
Name string `cbor:"1,keyasint"`
Email string `cbor:"2,keyasint"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a Redis transport.
client := goredis.NewClient(&goredis.Options{Addr: "localhost:6379"})
transport := hcredis.New(client)
defer transport.Close()
// Create a typed channel (CBOR encoding by default).
ch := hydratecache.NewChannel[UserProfile](transport, "user-updates")
// Subscribe in a goroutine; Subscribe blocks until the context is cancelled.
go func() {
err := ch.Subscribe(ctx, func(ctx context.Context, msg hydratecache.TypedMessage[UserProfile]) {
fmt.Printf("key=%s name=%s\n", msg.Key, msg.Payload.Name)
})
if err != nil {
log.Fatal(err)
}
}()
// Publish an update.
err := ch.Publish(ctx, "user:1", UserProfile{Name: "Alice", Email: "alice@example.com"})
if err != nil {
log.Fatal(err)
}
}
When source data has an expiry (e.g. a database TTL), you can attach that metadata to the message so entries in consumers' local caches don't outlive the original data:
import "time"
err := ch.Publish(ctx, "session:abc", sessionData,
hydratecache.WithTTL(hydratecache.TTL{
Start: time.Now(),
Duration: 30 * time.Minute,
}),
)
On the subscriber side, the TTL is available on the message:
ch.Subscribe(ctx, func(ctx context.Context, msg hydratecache.TypedMessage[SessionData]) {
if msg.TTL == nil {
localCache.Set(msg.Key, msg.Payload, 0) // no expiry
return
}
remaining := msg.TTL.Duration - time.Since(msg.TTL.Start)
if remaining <= 0 {
// Already expired in transit. Skip it: a zero TTL would mean "no expiry".
return
}
// Store in local cache with the remaining TTL.
localCache.Set(msg.Key, msg.Payload, remaining)
})
TTL is optional: messages published without WithTTL have msg.TTL == nil. All adapters carry the TTL metadata transparently (encoded in the wire envelope for Redis, NATS, and Postgres, and as message headers for Kafka).
The library has two layers:
Transport, a non-generic interface that moves raw bytes over a messaging backend. Each adapter implements this interface:
| Adapter | Backend | Package |
|---|---|---|
| Redis Pub/Sub | Redis 7+ | hydratecache-go/redis |
| NATS Core | NATS 2+ | hydratecache-go/nats |
| LISTEN/NOTIFY | PostgreSQL 17+ | hydratecache-go/postgres |
| Kafka | Apache Kafka | hydratecache-go/kafka |
Channel[T], a generic, typed layer that composes on any Transport. It handles serialization (CBOR by default) and exposes a simple Publish/Subscribe API.
┌──────────────────────┐
│ Channel[T] │ ← typed API, CBOR codec
├──────────────────────┤
│ Transport │ ← []byte pub/sub interface
├──────────────────────┤
│ Redis │ NATS │ ... │ ← pluggable adapters
└──────────────────────┘
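To make the layering concrete, here is a simplified sketch of a typed channel composed over a byte-level transport. It is an illustration under stated assumptions, not the library's actual interfaces: the `Transport` method set, `memTransport`, and the `Channel` methods below are hypothetical and omit `context.Context`, options, and TTL handling, and JSON stands in for CBOR only because it ships with the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Transport is a hypothetical, simplified byte-level pub/sub interface.
type Transport interface {
	Publish(topic string, data []byte) error
	Subscribe(topic string, handler func(data []byte))
}

// memTransport is an in-process Transport that delivers synchronously.
type memTransport struct {
	handlers map[string][]func([]byte)
}

func newMemTransport() *memTransport {
	return &memTransport{handlers: make(map[string][]func([]byte))}
}

func (m *memTransport) Publish(topic string, data []byte) error {
	for _, h := range m.handlers[topic] {
		h(data)
	}
	return nil
}

func (m *memTransport) Subscribe(topic string, h func([]byte)) {
	m.handlers[topic] = append(m.handlers[topic], h)
}

// Channel adds a typed API on top of any Transport.
type Channel[T any] struct {
	t     Transport
	topic string
}

func NewChannel[T any](t Transport, topic string) *Channel[T] {
	return &Channel[T]{t: t, topic: topic}
}

// Publish encodes v and hands the bytes to the transport.
func (c *Channel[T]) Publish(v T) error {
	data, err := json.Marshal(v)
	if err != nil {
		return fmt.Errorf("encode: %w", err)
	}
	return c.t.Publish(c.topic, data)
}

// Subscribe decodes each raw message before calling handler.
func (c *Channel[T]) Subscribe(handler func(T)) {
	c.t.Subscribe(c.topic, func(data []byte) {
		var v T
		if err := json.Unmarshal(data, &v); err != nil {
			return // a real implementation would surface this error
		}
		handler(v)
	})
}

type profile struct {
	Name string `json:"name"`
}

func main() {
	ch := NewChannel[profile](newMemTransport(), "user-updates")
	ch.Subscribe(func(p profile) { fmt.Println("received:", p.Name) })
	if err := ch.Publish(profile{Name: "Alice"}); err != nil {
		fmt.Println("publish failed:", err)
	}
}
```

Because the typed layer only depends on the small byte-level interface, swapping the backend means swapping the value passed to `NewChannel`, and the calling code stays the same.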
Override the default CBOR serialization with WithEncoder and WithDecoder:
ch := hydratecache.NewChannel[MyType](transport, "topic",
hydratecache.WithEncoder(func(v MyType) ([]byte, error) {
return proto.Marshal(&v)
}),
hydratecache.WithDecoder(func(data []byte) (MyType, error) {
var v MyType
err := proto.Unmarshal(data, &v)
return v, err
}),
)
import (
hcnats "github.com/nassor/hydratecache-go/nats"
gonats "github.com/nats-io/nats.go"
)
nc, _ := gonats.Connect("nats://localhost:4222")
transport := hcnats.New(nc)
import (
hcpg "github.com/nassor/hydratecache-go/postgres"
"github.com/jackc/pgx/v5/pgxpool"
)
pool, _ := pgxpool.New(ctx, "postgres://user:pass@localhost:5432/mydb")
transport := hcpg.New(pool)
import hckafka "github.com/nassor/hydratecache-go/kafka"
transport := hckafka.New([]string{"localhost:9092"})
- Go 1.26.2+
- Docker (for integration tests)
- golangci-lint
Clone the repo and start the test infrastructure:
git clone https://github.com/nassor/hydratecache-go.git
cd hydratecache-go
make docker-up # starts Redis, NATS, Postgres, Kafka via Docker Compose
make test # go test -race -count=1 ./...
make lint # golangci-lint run
make ci # lint + test (same as CI)
Unit tests run without Docker. Integration tests return an error when the backing service is unavailable, so run make docker-up first for full coverage.
Stop the infrastructure when done:
make docker-down
- Follow Effective Go and the Go Code Review Comments
- Every exported symbol must have a doc comment starting with the symbol name
- Table-driven tests with t.Run subtests; use testify/require
- Functional options (WithXxx) for configurable constructors
- Sentinel errors + fmt.Errorf("...: %w", err) wrapping
- All blocking / I/O operations accept context.Context as the first parameter
- No init() functions
See LICENSE for details.