From ef34b02eb41ff3421c0a0a2ba174b91a76fe715b Mon Sep 17 00:00:00 2001 From: James Barton <1583995322+barton87@users.noreply.github.com> Date: Tue, 2 Jun 2026 22:29:05 +0000 Subject: [PATCH] Add Redis integration --- redis/Makefile | 31 +++ redis/README.md | 73 ++++++ redis/exporter/exporter.go | 107 +++++++++ redis/exporter/exporter_test.go | 78 ++++++ redis/exporter/schema.json | 13 + redis/go.mod | 39 +++ redis/go.sum | 187 +++++++++++++++ redis/importer/importer.go | 222 ++++++++++++++++++ redis/importer/schema.json | 22 ++ redis/manifest.yaml | 21 ++ redis/plugin/redis-exporter/redis-exporter.go | 17 ++ redis/plugin/redis-importer/redis-importer.go | 22 ++ redis/redisconn/conn.go | 188 +++++++++++++++ redis/redisconn/conn_test.go | 36 +++ 14 files changed, 1056 insertions(+) create mode 100644 redis/Makefile create mode 100644 redis/README.md create mode 100644 redis/exporter/exporter.go create mode 100644 redis/exporter/exporter_test.go create mode 100644 redis/exporter/schema.json create mode 100644 redis/go.mod create mode 100644 redis/go.sum create mode 100644 redis/importer/importer.go create mode 100644 redis/importer/schema.json create mode 100644 redis/manifest.yaml create mode 100644 redis/plugin/redis-exporter/redis-exporter.go create mode 100644 redis/plugin/redis-importer/redis-importer.go create mode 100644 redis/redisconn/conn.go create mode 100644 redis/redisconn/conn_test.go diff --git a/redis/Makefile b/redis/Makefile new file mode 100644 index 0000000..2ff9398 --- /dev/null +++ b/redis/Makefile @@ -0,0 +1,31 @@ +GO = go +EXT = +PLAKAR ?= plakar +VERSION ?= v1.0.0 +GOOS := $(shell go env GOOS) +GOARCH := $(shell go env GOARCH) +PTAR := redis_$(VERSION)_$(GOOS)_$(GOARCH).ptar + +all: build + +build: + ${GO} build -v -o redisImporter${EXT} ./plugin/redis-importer + ${GO} build -v -o redisExporter${EXT} ./plugin/redis-exporter + +package: build + rm -f $(PTAR) + $(PLAKAR) pkg create ./manifest.yaml $(VERSION) + +uninstall: + -$(PLAKAR) pkg rm redis + +install: package + $(PLAKAR) pkg add ./$(PTAR) + +reinstall: uninstall install + +test: + ${GO} test -v ./... + +clean: + rm -f redisImporter redisExporter diff --git a/redis/README.md b/redis/README.md new file mode 100644 index 0000000..c8d05e5 --- /dev/null +++ b/redis/README.md @@ -0,0 +1,73 @@ +# Redis Integration for Plakar + +Back up Redis persistent data as an RDB file and restore that file to disk for Redis startup. + +## What it does + +- Connects to a local or remote Redis instance with `redis-cli`. +- Optionally triggers `BGSAVE` and waits for the background save to finish. +- Captures the resulting `dump.rdb` into a Plakar snapshot as `/dump.rdb`. +- Restores `/dump.rdb` back to a file path that Redis can use on startup. + +This integration is intended for deployments where Redis is used as a primary data store with persistence enabled, not just as a disposable cache. + +## Prerequisites + +- Plakar ≥ 1.1. +- `redis-cli` available in `$PATH`, or set `redis_bin_dir` / `redis_cli`. +- A Redis user allowed to run `PING`, `BGSAVE`, `INFO persistence`, and `CONFIG GET dir/dbfilename`. +- Filesystem access to the Redis RDB file when backing up local Redis. If the RDB file is not locally readable, the importer falls back to `redis-cli --rdb -`. + +## Back up + +```sh +plakar source add redis redis://:secret@127.0.0.1:6379/0 +plakar backup @redis +``` + +TLS connections can use `rediss://` or `tls=true`: + +```sh +plakar source add redis rediss://default:secret@redis.example.com:6379/0 +``` + +## Restore + +Restore writes the RDB file to disk. Stop Redis first, restore to its configured `dbfilename` under `dir`, fix ownership if needed, then start Redis. + +```sh +plakar restore -to redis-file:///var/lib/redis/dump.rdb +``` + +Use `force=true` to overwrite an existing file: + +```sh +plakar restore -to redis-file:///var/lib/redis/dump.rdb -o force=true +``` + +## Importer options + +| Option | Default | Description | +| --- | --- | --- | +| `location` | — | `redis://` or `rediss://` URI | +| `host` | `127.0.0.1` | Redis host | +| `port` | `6379` | Redis port | +| `username` | — | ACL username | +| `password` | — | Password, passed via `REDISCLI_AUTH` | +| `database` | URI path | Logical database number for command context | +| `tls` | `false` | Enable TLS | +| `insecure_tls` | `false` | Skip TLS verification for redis-cli | +| `ca_cert`, `cert`, `key` | — | redis-cli TLS certificate flags | +| `redis_bin_dir` | — | Directory containing `redis-cli` | +| `redis_cli` | `redis-cli` | redis-cli executable name | +| `trigger_bgsave` | `true` | Run `BGSAVE` before reading the RDB | +| `wait_timeout` | `5m` | Maximum wait for BGSAVE completion | +| `output` | `/dump.rdb` | Snapshot pathname for the emitted RDB | + +## Exporter options + +| Option | Default | Description | +| --- | --- | --- | +| `location` | — | `redis-file://` output path | +| `output` | — | Output path override | +| `force` | `false` | Overwrite an existing RDB file | diff --git a/redis/exporter/exporter.go b/redis/exporter/exporter.go new file mode 100644 index 0000000..3240d0b --- /dev/null +++ b/redis/exporter/exporter.go @@ -0,0 +1,107 @@ +package exporter + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/PlakarKorp/kloset/connectors" + "github.com/PlakarKorp/kloset/location" +) + +type Exporter struct { + proto, output string + force bool +} + +func New(proto string, config map[string]string) (*Exporter, error) { + out := strings.TrimSpace(config["output"]) + if out == "" { + loc := strings.TrimPrefix(config["location"], proto+"://") + if loc != "" && loc != config["location"] { + out = loc + } + } + if out == "" { + return nil, fmt.Errorf("output is required (path to restored dump.rdb)") + } + force := false + if v := config["force"]; v != "" { + switch strings.ToLower(v) { + case "1", "t", "true", "yes": + force = true + case "0", "f", "false", "no": + force = false + default: + return nil, fmt.Errorf("invalid value for force: %q", v) + } + } + return &Exporter{proto: proto, output: out, force: force}, nil +} + +func (e *Exporter) Origin() string { return e.output } +func (e *Exporter) Type() string { return e.proto } +func (e *Exporter) Root() string { return "/" } +func (e *Exporter) Flags() location.Flags { return 0 } +func (e *Exporter) Ping(context.Context) error { return nil } +func (e *Exporter) Close(context.Context) error { return nil } + +func (e *Exporter) Export(ctx context.Context, records <-chan *connectors.Record, results chan<- *connectors.Result) error { + defer close(results) + for record := range records { + if record.Err != nil { + results <- record.Ok() + continue + } + if record.FileInfo.Lmode.IsDir() { + results <- record.Ok() + continue + } + if filepath.Base(record.Pathname) != "dump.rdb" { + results <- record.Ok() + continue + } + if err := e.restore(ctx, record); err != nil { + results <- record.Error(err) + } else { + results <- record.Ok() + } + } + return nil +} + +func (e *Exporter) restore(ctx context.Context, record *connectors.Record) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if !e.force { + if _, err := os.Stat(e.output); err == nil { + return fmt.Errorf("refusing to overwrite %s without force=true", e.output) + } + if err := os.MkdirAll(filepath.Dir(e.output), 0755); err != nil { + return err + } + f, err := os.OpenFile(e.output, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0644) + if err != nil { + return err + } + defer f.Close() + _, err = io.Copy(f, record.Reader) + return err + } + if err := os.MkdirAll(filepath.Dir(e.output), 0755); err != nil { + return err + } + f, err := os.OpenFile(e.output, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return err + } + defer f.Close() + _, err = io.Copy(f, record.Reader) + return err +} diff --git a/redis/exporter/exporter_test.go b/redis/exporter/exporter_test.go new file mode 100644 index 0000000..b61f38e --- /dev/null +++ b/redis/exporter/exporter_test.go @@ -0,0 +1,78 @@ +package exporter + +import ( + "context" + "io" + "os" + "path/filepath" + "testing" + "time" + + "github.com/PlakarKorp/kloset/connectors" + "github.com/PlakarKorp/kloset/objects" +) + +func TestExporterWritesDumpRDB(t *testing.T) { + dir := t.TempDir() + out := filepath.Join(dir, "dump.rdb") + exp, err := New("redis-file", map[string]string{"output": out}) + if err != nil { + t.Fatal(err) + } + records := make(chan *connectors.Record, 1) + results := make(chan *connectors.Result, 1) + records <- connectors.NewRecord("/dump.rdb", "", objects.FileInfo{Lname: "dump.rdb", Lmode: 0444, LmodTime: time.Now()}, nil, func() (io.ReadCloser, error) { + return io.NopCloser(&readString{s: "REDIS0009"}), nil + }) + close(records) + if err := exp.Export(context.Background(), records, results); err != nil { + t.Fatal(err) + } + res := <-results + if res.Err != nil { + t.Fatal(res.Err) + } + got, err := os.ReadFile(out) + if err != nil { + t.Fatal(err) + } + if string(got) != "REDIS0009" { + t.Fatalf("unexpected restored content %q", string(got)) + } +} + +func TestExporterRefusesOverwriteWithoutForce(t *testing.T) { + dir := t.TempDir() + out := filepath.Join(dir, "dump.rdb") + if err := os.WriteFile(out, []byte("old"), 0644); err != nil { + t.Fatal(err) + } + exp, err := New("redis-file", map[string]string{"output": out}) + if err != nil { + t.Fatal(err) + } + records := make(chan *connectors.Record, 1) + results := make(chan *connectors.Result, 1) + records <- connectors.NewRecord("/dump.rdb", "", objects.FileInfo{Lname: "dump.rdb", Lmode: 0444, LmodTime: time.Now()}, nil, func() (io.ReadCloser, error) { + return io.NopCloser(&readString{s: "new"}), nil + }) + close(records) + if err := exp.Export(context.Background(), records, results); err != nil { + t.Fatal(err) + } + res := <-results + if res.Err == nil { + t.Fatal("expected overwrite refusal") + } +} + +type readString struct{ s string } + +func (r *readString) Read(p []byte) (int, error) { + if r.s == "" { + return 0, io.EOF + } + n := copy(p, r.s) + r.s = r.s[n:] + return n, nil +} diff --git a/redis/exporter/schema.json b/redis/exporter/schema.json new file mode 100644 index 0000000..3bcb4e7 --- /dev/null +++ b/redis/exporter/schema.json @@ -0,0 +1,13 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": { + "location": { "type": "string", "description": "redis-file:// path to write dump.rdb" }, + "output": { "type": "string", "description": "Path to restored dump.rdb" }, + "force": { "type": "boolean", "default": false } + }, + "anyOf": [ + { "required": ["location"] }, + { "required": ["output"] } + ] +} diff --git a/redis/go.mod b/redis/go.mod new file mode 100644 index 0000000..21a3e40 --- /dev/null +++ b/redis/go.mod @@ -0,0 +1,39 @@ +module github.com/PlakarKorp/integration-redis + +go 1.25.7 + +require ( + github.com/PlakarKorp/go-kloset-sdk v1.1.0-beta.1 + github.com/PlakarKorp/kloset v1.1.0-beta.2 +) + +require ( + github.com/PlakarKorp/integration-grpc v1.1.0-beta.3 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/golang/snappy v1.0.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/nickball/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5 // indirect + github.com/pierrec/lz4/v4 v4.1.25 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/tink-crypto/tink-go/v2 v2.6.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + github.com/zeebo/blake3 v0.2.4 // indirect + golang.org/x/crypto v0.47.0 // indirect + golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect + golang.org/x/mod v0.32.0 // indirect + golang.org/x/net v0.48.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.40.0 // indirect + golang.org/x/text v0.33.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b // indirect + google.golang.org/grpc v1.78.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect + modernc.org/libc v1.67.6 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect + modernc.org/sqlite v1.44.3 // indirect +) diff --git a/redis/go.sum b/redis/go.sum new file mode 100644 index 0000000..4f65c7c --- /dev/null +++ b/redis/go.sum @@ -0,0 +1,187 @@ +github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE= +github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/NickBall/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5 h1:5BIUS5hwyLM298mOf8e8TEgD3cCYqc86uaJdQCYZo/o= +github.com/NickBall/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5/go.mod h1:w5D10RxC0NmPYxmQ438CC1S07zaC1zpvuNW7s5sUk2Q= +github.com/PlakarKorp/go-cdc-chunkers v1.0.3 h1:6ozBFcNMHvGe6IsbPcAZUUEAExCgcNx3aa8xiCA6+Qw= +github.com/PlakarKorp/go-cdc-chunkers v1.0.3/go.mod h1:y7ag92JABKPBDoSOPwedssQ5NIOgjRm4Mu6yTBpmUMY= +github.com/PlakarKorp/go-kloset-sdk v1.1.0-beta.1 h1:gPetIUfg///RiaML7CRINCcdXo55NHvaQIbpIoIBWGk= +github.com/PlakarKorp/go-kloset-sdk v1.1.0-beta.1/go.mod h1:ni69BgWur3+rHb7cOg/8JOKEMFh8J7tEPxTTSUhGNjE= +github.com/PlakarKorp/integration-grpc v1.1.0-beta.3 h1:u0n6Uyz7wqHOMoMYbfBLrcSrfAGqzcNQtcgkgYfU5TQ= +github.com/PlakarKorp/integration-grpc v1.1.0-beta.3/go.mod h1:lrkbUc9iT0jHiZ5wmB19wAhjDTMRO/VDhMJEt5trFlY= +github.com/PlakarKorp/kloset v1.1.0-beta.2 h1:YUMCCguPW6buz3h17eYYX+c0QMVE9c8WqkwTI6ueLwA= +github.com/PlakarKorp/kloset v1.1.0-beta.2/go.mod h1:LVCuJUI+ojPMyGeB32ydlT8xMl4NqVbvbnGwwFBaBMM= +github.com/RaduBerinde/axisds v0.1.0 h1:YItk/RmU5nvlsv/awo2Fjx97Mfpt4JfgtEVAGPrLdz8= +github.com/RaduBerinde/axisds v0.1.0/go.mod h1:UHGJonU9z4YYGKJxSaC6/TNcLOBptpmM5m2Cksbnw0Y= +github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54 h1:bsU8Tzxr/PNz75ayvCnxKZWEYdLMPDkUgticP4a4Bvk= +github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54/go.mod h1:0tr7FllbE9gJkHq7CVeeDDFAFKQVy5RnCSSNBOvdqbc= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cockroachdb/crlib v0.0.0-20250110162118-b7c9be99e911 h1:X+r2Lb1qj0APqrxM8NhBD3X3JDM1Fe+u+WAvhvKuLdM= +github.com/cockroachdb/crlib v0.0.0-20250110162118-b7c9be99e911/go.mod h1:Gq51ZeKaFCXk6QwuGM0w1dnaOqc/F5zKT2zA9D6Xeac= +github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= +github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= +github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506 h1:ASDL+UJcILMqgNeV5jiqR4j+sTuvQNHdf2chuKj1M5k= +github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506/go.mod h1:Mw7HqKr2kdtu6aYGn3tPmAftiP3QPX63LdK/zcariIo= +github.com/cockroachdb/pebble/v2 v2.1.4 h1:j9wPgMDbkErFdAKYFGhsoCcvzcjR+6zrJ4jhKtJ6bOk= +github.com/cockroachdb/pebble/v2 v2.1.4/go.mod h1:Reo1RTniv1UjVTAu/Fv74y5i3kJ5gmVrPhO9UtFiKn8= +github.com/cockroachdb/redact v1.1.6 h1:zXJBwDZ84xJNlHl1rMyCojqyIxv+7YUpQiJLQ7n4314= +github.com/cockroachdb/redact v1.1.6/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b h1:VXvSNzmr8hMj8XTuY0PT9Ane9qZGul/p67vGYwl9BFI= +github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b/go.mod h1:yBRu/cnL4ks9bgy4vAASdjIW+/xMlFwuHKqtmh3GZQg= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM= +github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= +github.com/getsentry/sentry-go v0.31.1 h1:ELVc0h7gwyhnXHDouXkhqTFSO5oslsRDk0++eyE0KJ4= +github.com/getsentry/sentry-go v0.31.1/go.mod h1:CYNcMMz73YigoHljQRG+qPF+eMq8gG72XcGN/p71BAY= +github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 h1:+zs/tPmkDkHx3U66DAb0lQFJrpS6731Oaa12ikc+DiI= +github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376/go.mod h1:an3vInlBmSxCcxctByoQdvwPiA7DTK7jaaFDBTtu0ic= +github.com/go-git/go-billy/v5 v5.6.2 h1:6Q86EsPXMa7c3YZ3aLAQsMA0VlWmy43r6FHqa/UNbRM= +github.com/go-git/go-billy/v5 v5.6.2/go.mod h1:rcFC2rAsp/erv7CMz9GczHcuD0D32fWzH+MJAU+jaUU= +github.com/go-git/go-git/v5 v5.16.4 h1:7ajIEZHZJULcyJebDLo99bGgS0jRrOxzZG4uCk2Yb2Y= +github.com/go-git/go-git/v5 v5.16.4/go.mod h1:4Ge4alE/5gPs30F2H1esi2gPd69R0C39lolkucHBOp8= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= +github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/minio/minlz v1.0.1-0.20250507153514-87eb42fe8882 h1:0lgqHvJWHLGW5TuObJrfyEi6+ASTKDBWikGvPqy9Yiw= +github.com/minio/minlz v1.0.1-0.20250507153514-87eb42fe8882/go.mod h1:qT0aEB35q79LLornSzeDH75LBf3aH1MV+jB5w9Wasec= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/nickball/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5 h1:eQr2od6dyd9gCLYHgMX2TlAYQtMUpxK7S0nsZXyH0L8= +github.com/nickball/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5/go.mod h1:1VYCE0dvZM9Y2q8kcAHdXZB6YwfrCUQDeSJ2DuIiA4k= +github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0= +github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.21.1 h1:DOvXXTqVzvkIewV/CDPFdejpMCGeMcbGCQ8YOmu+Ibk= +github.com/prometheus/client_golang v1.21.1/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA98k= +github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18= +github.com/prometheus/procfs v0.16.0 h1:xh6oHhKwnOJKMYiYBDWmkHqQPyiY40sny36Cmx2bbsM= +github.com/prometheus/procfs v0.16.0/go.mod h1:8veyXUu3nGP7oaCxhX6yeaM5u4stL2FeMXnCqhDthZg= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/tink-crypto/tink-go/v2 v2.6.0 h1:+KHNBHhWH33Vn+igZWcsgdEPUxKwBMEe0QC60t388v4= +github.com/tink-crypto/tink-go/v2 v2.6.0/go.mod h1:2WbBA6pfNsAfBwDCggboaHeB2X29wkU8XHtGwh2YIk8= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/zeebo/assert v1.1.0 h1:hU1L1vLTHsnO8x8c9KAR5GmM5QscxHg5RNU5z5qbUWY= +github.com/zeebo/assert v1.1.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/blake3 v0.2.4 h1:KYQPkhpRtcqh0ssGYcKLG1JYvddkEA8QwCM/yBqhaZI= +github.com/zeebo/blake3 v0.2.4/go.mod h1:7eeQ6d2iXWRGF6npfaxl2CU+xy2Fjo2gxeyZGCRUjcE= +github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo= +github.com/zeebo/pcg v1.0.1/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= +golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= +golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c= +golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU= +golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= +golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= +golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= +golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b h1:Mv8VFug0MP9e5vUxfBcE3vUkV6CImK3cMNMIDFjmzxU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= +google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME= +gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= +modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc= +modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM= +modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA= +modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.1 h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE= +modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.67.6 h1:eVOQvpModVLKOdT+LvBPjdQqfrZq+pC39BygcT+E7OI= +modernc.org/libc v1.67.6/go.mod h1:JAhxUVlolfYDErnwiqaLvUqc8nfb2r6S6slAgZOnaiE= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.44.3 h1:+39JvV/HWMcYslAwRxHb8067w+2zowvFOUrOWIy9PjY= +modernc.org/sqlite v1.44.3/go.mod h1:CzbrU2lSB1DKUusvwGz7rqEKIq+NUd8GWuBBZDs9/nA= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/redis/importer/importer.go b/redis/importer/importer.go new file mode 100644 index 0000000..f22bdae --- /dev/null +++ b/redis/importer/importer.go @@ -0,0 +1,222 @@ +package importer + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "path" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/PlakarKorp/integration-redis/redisconn" + "github.com/PlakarKorp/kloset/connectors" + iimporter "github.com/PlakarKorp/kloset/connectors/importer" + "github.com/PlakarKorp/kloset/location" + "github.com/PlakarKorp/kloset/objects" +) + +const DumpPath = "/dump.rdb" + +type Importer struct { + proto string + conn redisconn.ConnConfig + output string + triggerBGSAVE bool + waitTimeout time.Duration +} + +func New(proto string, conn redisconn.ConnConfig, config map[string]string) (*Importer, error) { + imp := &Importer{proto: proto, conn: conn, output: DumpPath, triggerBGSAVE: true, waitTimeout: 5 * time.Minute} + if v := config["output"]; v != "" { + if !strings.HasPrefix(v, "/") { + v = "/" + v + } + imp.output = path.Clean(v) + } + if v := config["trigger_bgsave"]; v != "" { + b, err := strconv.ParseBool(v) + if err != nil { + return nil, fmt.Errorf("invalid value for trigger_bgsave: %w", err) + } + imp.triggerBGSAVE = b + } + if v := config["wait_timeout"]; v != "" { + d, err := time.ParseDuration(v) + if err != nil { + return nil, fmt.Errorf("invalid value for wait_timeout: %w", err) + } + imp.waitTimeout = d + } + return imp, nil +} + +func (i *Importer) Origin() string { return i.conn.Origin(i.proto) } +func (i *Importer) Type() string { return i.proto } +func (i *Importer) Root() string { return "/" } +func (i *Importer) Flags() location.Flags { return location.FLAG_STREAM } +func (i *Importer) Ping(ctx context.Context) error { return i.conn.Ping(ctx) } +func (i *Importer) Close(_ context.Context) error { return nil } + +var _ iimporter.Importer = (*Importer)(nil) + +func (i *Importer) Import(ctx context.Context, records chan<- *connectors.Record, _ <-chan *connectors.Result) error { + defer close(records) + if i.triggerBGSAVE { + if err := i.run(ctx, "BGSAVE"); err != nil { + if !strings.Contains(err.Error(), "Background save already in progress") { + return err + } + } + if err := i.waitForBGSAVE(ctx); err != nil { + return err + } + } + reader, err := i.dumpReader(ctx) + if err != nil { + return err + } + fileinfo := objects.FileInfo{Lname: path.Base(i.output), Lmode: 0444, LmodTime: time.Now().UTC()} + select { + case <-ctx.Done(): + _ = reader.Close() + return ctx.Err() + case records <- connectors.NewRecord(i.output, "", fileinfo, nil, func() (io.ReadCloser, error) { return reader, nil }): + return nil + } +} + +func (i *Importer) dumpReader(ctx context.Context) (io.ReadCloser, error) { + if src, err := i.rdbPath(ctx); err == nil && src != "" { + return os.Open(src) + } + return i.redisCLIDump(ctx) +} + +func (i *Importer) rdbPath(ctx context.Context) (string, error) { + dir, err := i.configGet(ctx, "dir") + if err != nil { + return "", err + } + dbfilename, err := i.configGet(ctx, "dbfilename") + if err != nil { + return "", err + } + if dir == "" || dbfilename == "" { + return "", fmt.Errorf("Redis did not return dir/dbfilename") + } + return filepath.Join(dir, dbfilename), nil +} + +func (i *Importer) configGet(ctx context.Context, key string) (string, error) { + out, err := i.outputCommand(ctx, "CONFIG", "GET", key) + if err != nil { + return "", err + } + lines := nonEmptyLines(out) + if len(lines) < 2 { + return "", fmt.Errorf("unexpected CONFIG GET %s response", key) + } + return lines[len(lines)-1], nil +} + +func (i *Importer) waitForBGSAVE(ctx context.Context) error { + deadline := time.Now().Add(i.waitTimeout) + for { + out, err := i.outputCommand(ctx, "INFO", "persistence") + if err != nil { + return err + } + if strings.Contains(out, "rdb_bgsave_in_progress:0") { + return nil + } + if time.Now().After(deadline) { + return fmt.Errorf("timed out waiting for Redis BGSAVE") + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second): + } + } +} + +func (i *Importer) run(ctx context.Context, args ...string) error { + _, err := i.outputCommand(ctx, args...) + return err +} + +func (i *Importer) outputCommand(ctx context.Context, args ...string) (string, error) { + cmd := exec.CommandContext(ctx, i.conn.Bin(), i.conn.Args(args...)...) + cmd.Env = i.conn.Env() + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("redis-cli %s failed: %w: %s", strings.Join(args, " "), err, strings.TrimSpace(string(out))) + } + return string(out), nil +} + +func (i *Importer) redisCLIDump(ctx context.Context) (io.ReadCloser, error) { + tmp, err := os.CreateTemp("", "plakar-redis-*.rdb") + if err != nil { + return nil, err + } + tmpName := tmp.Name() + _ = tmp.Close() + + cmd := exec.CommandContext(ctx, i.conn.Bin(), i.conn.Args("--rdb", tmpName)...) + cmd.Env = i.conn.Env() + out, err := cmd.CombinedOutput() + if err != nil { + _ = os.Remove(tmpName) + return nil, fmt.Errorf("redis-cli --rdb failed: %w: %s", err, strings.TrimSpace(string(out))) + } + fp, err := os.Open(tmpName) + if err != nil { + _ = os.Remove(tmpName) + return nil, err + } + return &tempFileReader{File: fp, name: tmpName}, nil +} + +type cmdReader struct { + io.ReadCloser + cmd *exec.Cmd + stderr *bytes.Buffer +} + +func (r *cmdReader) Close() error { + _ = r.ReadCloser.Close() + err := r.cmd.Wait() + if err != nil && strings.TrimSpace(r.stderr.String()) != "" { + return fmt.Errorf("%w: %s", err, strings.TrimSpace(r.stderr.String())) + } + return err +} + +type tempFileReader struct { + *os.File + name string +} + +func (r *tempFileReader) Close() error { + err := r.File.Close() + if rmErr := os.Remove(r.name); err == nil { + err = rmErr + } + return err +} + +func nonEmptyLines(s string) []string { + var out []string + for _, line := range strings.Split(s, "\n") { + if line = strings.TrimSpace(line); line != "" { + out = append(out, line) + } + } + return out +} diff --git a/redis/importer/schema.json b/redis/importer/schema.json new file mode 100644 index 0000000..fe92a3a --- /dev/null +++ b/redis/importer/schema.json @@ -0,0 +1,22 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": { + "location": { "type": "string", "description": "Redis URI: redis://[:password@]host:port[/db] or rediss://..." }, + "host": { "type": "string", "default": "127.0.0.1" }, + "port": { "type": "string", "default": "6379" }, + "username": { "type": "string" }, + "password": { "type": "string" }, + "database": { "type": "string" }, + "tls": { "type": "boolean", "default": false }, + "insecure_tls": { "type": "boolean", "default": false }, + "ca_cert": { "type": "string" }, + "cert": { "type": "string" }, + "key": { "type": "string" }, + "redis_bin_dir": { "type": "string", "description": "Directory containing redis-cli" }, + "redis_cli": { "type": "string", "default": "redis-cli" }, + "trigger_bgsave": { "type": "boolean", "default": true }, + "wait_timeout": { "type": "string", "default": "5m", "description": "Go duration to wait for BGSAVE completion" }, + "output": { "type": "string", "default": "/dump.rdb", "description": "Snapshot path for the emitted RDB file" } + } +} diff --git a/redis/manifest.yaml b/redis/manifest.yaml new file mode 100644 index 0000000..8a87497 --- /dev/null +++ b/redis/manifest.yaml @@ -0,0 +1,21 @@ +name: redis +display_name: Redis +description: Integration providing import and export capabilities for Redis RDB persistence files. +homepage: https://github.com/PlakarKorp/integration-redis +license: ISC +api_version: v1.1.0 +tier: official +contact: mailto:help@plakar.io +connectors: +- type: importer + executable: redisImporter + protocols: [redis, rediss] + validator: ./importer/schema.json + class: database + subclass: redis +- type: exporter + executable: redisExporter + protocols: [redis-file] + validator: ./exporter/schema.json + class: database + subclass: redis diff --git a/redis/plugin/redis-exporter/redis-exporter.go b/redis/plugin/redis-exporter/redis-exporter.go new file mode 100644 index 0000000..c7b98ce --- /dev/null +++ b/redis/plugin/redis-exporter/redis-exporter.go @@ -0,0 +1,17 @@ +package main + +import ( + "context" + "os" + + sdk "github.com/PlakarKorp/go-kloset-sdk" + "github.com/PlakarKorp/integration-redis/exporter" + "github.com/PlakarKorp/kloset/connectors" + exporterIface "github.com/PlakarKorp/kloset/connectors/exporter" +) + +func newRedis(_ context.Context, _ *connectors.Options, proto string, config map[string]string) (exporterIface.Exporter, error) { + return exporter.New(proto, config) +} + +func main() { sdk.EntrypointExporter(os.Args, newRedis) } diff --git a/redis/plugin/redis-importer/redis-importer.go b/redis/plugin/redis-importer/redis-importer.go new file mode 100644 index 0000000..9def316 --- /dev/null +++ b/redis/plugin/redis-importer/redis-importer.go @@ -0,0 +1,22 @@ +package main + +import ( + "context" + "os" + + sdk "github.com/PlakarKorp/go-kloset-sdk" + "github.com/PlakarKorp/integration-redis/importer" + "github.com/PlakarKorp/integration-redis/redisconn" + "github.com/PlakarKorp/kloset/connectors" + iimporter "github.com/PlakarKorp/kloset/connectors/importer" +) + +func newRedis(_ context.Context, _ *connectors.Options, proto string, config map[string]string) (iimporter.Importer, error) { + conn, err := redisconn.ParseConnConfig(config) + if err != nil { + return nil, err + } + return importer.New(proto, conn, config) +} + +func main() { sdk.EntrypointImporter(os.Args, newRedis) } diff --git a/redis/redisconn/conn.go b/redis/redisconn/conn.go new file mode 100644 index 0000000..234f43d --- /dev/null +++ b/redis/redisconn/conn.go @@ -0,0 +1,188 @@ +package redisconn + +import ( + "context" + "fmt" + "net/url" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" +) + +// ConnConfig holds redis-cli connection settings. +type ConnConfig struct { + Host string + Port string + Username string + Password string + Database string + TLS bool + InsecureTLS bool + CACert string + Cert string + Key string + BinDir string + ClientBin string +} + +func ParseConnConfig(config map[string]string) (ConnConfig, error) { + cc := ConnConfig{Host: "127.0.0.1", Port: "6379", ClientBin: "redis-cli"} + if location := config["location"]; location != "" { + if err := parseURI(location, &cc); err != nil { + return cc, err + } + } + if v := config["host"]; v != "" { + cc.Host = v + } + if v := config["port"]; v != "" { + p, err := strconv.Atoi(v) + if err != nil || p < 1 || p > 65535 { + return cc, fmt.Errorf("invalid port %q: must be an integer between 1 and 65535", v) + } + cc.Port = v + } + if v := config["username"]; v != "" { + cc.Username = v + } + if v := config["password"]; v != "" { + cc.Password = v + } + if v := config["database"]; v != "" { + cc.Database = v + } + if v := config["redis_bin_dir"]; v != "" { + cc.BinDir = v + } + if v := config["redis_cli"]; v != "" { + cc.ClientBin = v + } + var err error + if cc.TLS, err = boolOpt(config, "tls", cc.TLS); err != nil { + return cc, err + } + if cc.InsecureTLS, err = boolOpt(config, "insecure_tls", false); err != nil { + return cc, err + } + if v := config["ca_cert"]; v != "" { + cc.CACert = v + } + if v := config["cert"]; v != "" { + cc.Cert = v + } + if v := config["key"]; v != "" { + cc.Key = v + } + return cc, nil +} + +func boolOpt(config map[string]string, key string, def bool) (bool, error) { + v := config[key] + if v == "" { + return def, nil + } + b, err := strconv.ParseBool(v) + if err != nil { + return false, fmt.Errorf("invalid value for %s: %w", key, err) + } + return b, nil +} + +func parseURI(raw string, cc *ConnConfig) error { + u, err := url.Parse(raw) + if err != nil { + return fmt.Errorf("invalid Redis URI %q: %w", raw, err) + } + switch u.Scheme { + case "redis": + case "rediss": + cc.TLS = true + default: + return fmt.Errorf("unsupported URI scheme %q: expected redis:// or rediss://", u.Scheme) + } + if h := u.Hostname(); h != "" { + cc.Host = h + } + if p := u.Port(); p != "" { + cc.Port = p + } + if u.User != nil { + if name := u.User.Username(); name != "" { + cc.Username = name + } + if pass, ok := u.User.Password(); ok { + cc.Password = pass + } + } + if db := strings.TrimPrefix(u.Path, "/"); db != "" { + cc.Database = db + } + return nil +} + +func (cc ConnConfig) Bin() string { + bin := cc.ClientBin + if bin == "" { + bin = "redis-cli" + } + if cc.BinDir == "" { + return bin + } + return filepath.Join(cc.BinDir, bin) +} + +func (cc ConnConfig) Args(extra ...string) []string { + args := []string{"-h", cc.Host, "-p", cc.Port} + if cc.Username != "" { + args = append(args, "--user", cc.Username) + } + if cc.Database != "" { + args = append(args, "-n", cc.Database) + } + if cc.TLS { + args = append(args, "--tls") + } + if cc.InsecureTLS { + args = append(args, "--insecure") + } + if cc.CACert != "" { + args = append(args, "--cacert", cc.CACert) + } + if cc.Cert != "" { + args = append(args, "--cert", cc.Cert) + } + if cc.Key != "" { + args = append(args, "--key", cc.Key) + } + return append(args, extra...) +} + +func (cc ConnConfig) Env() []string { + env := os.Environ() + if cc.Password != "" { + env = append(env, "REDISCLI_AUTH="+cc.Password) + } + return env +} + +func (cc ConnConfig) Ping(ctx context.Context) error { + cmd := exec.CommandContext(ctx, cc.Bin(), cc.Args("PING")...) + cmd.Env = cc.Env() + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("redis ping failed: %w: %s", err, strings.TrimSpace(string(out))) + } + if strings.TrimSpace(string(out)) != "PONG" { + return fmt.Errorf("unexpected Redis PING response: %q", strings.TrimSpace(string(out))) + } + return nil +} + +func (cc ConnConfig) Origin(proto string) string { + if cc.Database != "" { + return proto + "://" + cc.Host + ":" + cc.Port + "/" + cc.Database + } + return proto + "://" + cc.Host + ":" + cc.Port +} diff --git a/redis/redisconn/conn_test.go b/redis/redisconn/conn_test.go new file mode 100644 index 0000000..c4ef7d6 --- /dev/null +++ b/redis/redisconn/conn_test.go @@ -0,0 +1,36 @@ +package redisconn + +import "testing" + +func TestParseConnConfigFromURI(t *testing.T) { + cfg, err := ParseConnConfig(map[string]string{"location": "rediss://default:s3cr3t@redis.example.com:6380/2"}) + if err != nil { + t.Fatal(err) + } + if cfg.Host != "redis.example.com" || cfg.Port != "6380" || cfg.Username != "default" || cfg.Password != "s3cr3t" || cfg.Database != "2" || !cfg.TLS { + t.Fatalf("unexpected config: %+v", cfg) + } +} + +func TestParseConnConfigOverridesURI(t *testing.T) { + cfg, err := ParseConnConfig(map[string]string{ + "location": "redis://:old@redis.example.com:6379/0", + "host": "127.0.0.1", + "port": "6381", + "password": "new", + "database": "4", + "redis_bin_dir": "/opt/redis/bin", + }) + if err != nil { + t.Fatal(err) + } + if cfg.Host != "127.0.0.1" || cfg.Port != "6381" || cfg.Password != "new" || cfg.Database != "4" || cfg.Bin() != "/opt/redis/bin/redis-cli" { + t.Fatalf("overrides were not applied: %+v", cfg) + } +} + +func TestParseConnConfigRejectsBadScheme(t *testing.T) { + if _, err := ParseConnConfig(map[string]string{"location": "http://redis.example.com"}); err == nil { + t.Fatal("expected unsupported scheme error") + } +}