From 76d9b6042373228e1381af36e9ce54f3f31394f5 Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 3 Jun 2026 01:34:47 +0300 Subject: [PATCH 1/2] Add Redis integration --- redis/Makefile | 30 ++ redis/README.md | 67 +++++ redis/exporter/schema.json | 24 ++ redis/go.mod | 39 +++ redis/go.sum | 187 ++++++++++++ redis/importer/schema.json | 52 ++++ redis/manifest.yaml | 22 ++ redis/plugin/exporter/main.go | 12 + redis/plugin/importer/main.go | 12 + redis/redis.go | 523 ++++++++++++++++++++++++++++++++++ redis/redis_test.go | 218 ++++++++++++++ 11 files changed, 1186 insertions(+) create mode 100644 redis/Makefile create mode 100644 redis/README.md create mode 100644 redis/exporter/schema.json create mode 100644 redis/go.mod create mode 100644 redis/go.sum create mode 100644 redis/importer/schema.json create mode 100644 redis/manifest.yaml create mode 100644 redis/plugin/exporter/main.go create mode 100644 redis/plugin/importer/main.go create mode 100644 redis/redis.go create mode 100644 redis/redis_test.go diff --git a/redis/Makefile b/redis/Makefile new file mode 100644 index 0000000..f5cade4 --- /dev/null +++ b/redis/Makefile @@ -0,0 +1,30 @@ +GO = go +EXT = + +PLAKAR ?= plakar +VERSION ?= v1.0.0 + +GOOS := $(shell go env GOOS) +GOARCH := $(shell go env GOARCH) +PTAR := redis_$(VERSION)_$(GOOS)_$(GOARCH).ptar + +all: build + +build: + ${GO} build -v -o redisImporter${EXT} ./plugin/importer + ${GO} build -v -o redisFileExporter${EXT} ./plugin/exporter + +package: build + rm -f $(PTAR) + $(PLAKAR) pkg create ./manifest.yaml $(VERSION) + +uninstall: + -$(PLAKAR) pkg rm redis + +install: package + $(PLAKAR) pkg add ./$(PTAR) + +reinstall: uninstall install + +clean: + rm -f redisImporter redisFileExporter redis_*.ptar diff --git a/redis/README.md b/redis/README.md new file mode 100644 index 0000000..1c98e59 --- /dev/null +++ b/redis/README.md @@ -0,0 +1,67 @@ +# Redis Integration + +This integration lets Plakar back up Redis persistent data as an RDB dump and restore that dump to a file Redis can load on startup. + +## How it works + +The importer connects to a running Redis server using the Redis replication protocol and stores one record in the snapshot: + +- `/dump.rdb` - an RDB snapshot generated by Redis for the backup stream. + +This covers Redis deployments where Redis is used as a primary data store. The exporter writes the saved RDB record back to a local file, usually `/var/lib/redis/dump.rdb` or another path configured by the Redis server. + +## Importer options + +| Parameter | Default | Description | +|---|---|---| +| `location` | `redis://localhost:6379` | Redis URI: `redis://[user[:password]@]host[:port]` or `rediss://...` for TLS. | +| `host` | `localhost` | Server hostname. Overrides the location host. | +| `port` | `6379` | Server port. Overrides the location port. | +| `username` | | ACL username. Overrides the location username. | +| `password` | | Redis password. Overrides the location password. | +| `tls` | `false` | Enable TLS. `rediss://` enables it automatically. | +| `tls_server_name` | `host` | Server name used for TLS verification. | +| `insecure_skip_verify` | `false` | Skip TLS certificate verification. Intended for local testing only. | +| `timeout` | `30s` | Dial and command timeout. Accepts Go durations such as `10s` or a number of seconds. | + +## Exporter options + +| Parameter | Default | Description | +|---|---|---| +| `location` | | `redis+file:///path/to/dump.rdb` destination URI. | +| `path` | | Output file or directory. Overrides location. | +| `output` | | Alias for path. | + +If the destination path is a directory, the exporter writes `dump.rdb` inside it. Otherwise it writes exactly to the configured file path. + +## Examples + +Back up a local unauthenticated Redis instance: + +```bash +plakar source add myredis redis://localhost:6379 +plakar backup @myredis +``` + +Back up an authenticated Redis instance: + +```bash +plakar source add myredis redis://default:secret@redis.example.com:6379 +plakar backup @myredis +``` + +Back up Redis over TLS: + +```bash +plakar source add myredis rediss://default:secret@redis.example.com:6380 +plakar backup @myredis +``` + +Restore the dump to a file Redis can load on startup: + +```bash +plakar destination add redisfile redis+file:///var/lib/redis/dump.rdb +plakar restore -to @redisfile +``` + +Stop Redis before replacing its configured RDB file, then start Redis again so it loads the restored dump. diff --git a/redis/exporter/schema.json b/redis/exporter/schema.json new file mode 100644 index 0000000..77ce964 --- /dev/null +++ b/redis/exporter/schema.json @@ -0,0 +1,24 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "Plakar Redis File Exporter Connector Config", + "type": "object", + "additionalProperties": false, + "properties": { + "location": { + "type": "string", + "minLength": 1, + "description": "Redis RDB file destination URI: redis+file:///path/to/dump.rdb.", + "pattern": "^redis\\+file://.*$" + }, + "path": { + "type": "string", + "minLength": 1, + "description": "Output file or directory for dump.rdb. Overrides the location URI." + }, + "output": { + "type": "string", + "minLength": 1, + "description": "Alias for path." + } + } +} diff --git a/redis/go.mod b/redis/go.mod new file mode 100644 index 0000000..cc606be --- /dev/null +++ b/redis/go.mod @@ -0,0 +1,39 @@ +module github.com/PlakarKorp/integration-redis + +go 1.25.5 + +require ( + github.com/PlakarKorp/go-kloset-sdk v1.1.0-beta.1 + github.com/PlakarKorp/kloset v1.1.0-beta.2 +) + +require ( + github.com/PlakarKorp/integration-grpc v1.1.0-beta.3 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/golang/snappy v1.0.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/nickball/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5 // indirect + github.com/pierrec/lz4/v4 v4.1.25 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/tink-crypto/tink-go/v2 v2.6.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + github.com/zeebo/blake3 v0.2.4 // indirect + golang.org/x/crypto v0.47.0 // indirect + golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect + golang.org/x/mod v0.32.0 // indirect + golang.org/x/net v0.48.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.40.0 // indirect + golang.org/x/text v0.33.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b // indirect + google.golang.org/grpc v1.78.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect + modernc.org/libc v1.67.6 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect + modernc.org/sqlite v1.44.3 // indirect +) diff --git a/redis/go.sum b/redis/go.sum new file mode 100644 index 0000000..4f65c7c --- /dev/null +++ b/redis/go.sum @@ -0,0 +1,187 @@ +github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE= +github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/NickBall/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5 h1:5BIUS5hwyLM298mOf8e8TEgD3cCYqc86uaJdQCYZo/o= +github.com/NickBall/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5/go.mod h1:w5D10RxC0NmPYxmQ438CC1S07zaC1zpvuNW7s5sUk2Q= +github.com/PlakarKorp/go-cdc-chunkers v1.0.3 h1:6ozBFcNMHvGe6IsbPcAZUUEAExCgcNx3aa8xiCA6+Qw= +github.com/PlakarKorp/go-cdc-chunkers v1.0.3/go.mod h1:y7ag92JABKPBDoSOPwedssQ5NIOgjRm4Mu6yTBpmUMY= +github.com/PlakarKorp/go-kloset-sdk v1.1.0-beta.1 h1:gPetIUfg///RiaML7CRINCcdXo55NHvaQIbpIoIBWGk= +github.com/PlakarKorp/go-kloset-sdk v1.1.0-beta.1/go.mod h1:ni69BgWur3+rHb7cOg/8JOKEMFh8J7tEPxTTSUhGNjE= +github.com/PlakarKorp/integration-grpc v1.1.0-beta.3 h1:u0n6Uyz7wqHOMoMYbfBLrcSrfAGqzcNQtcgkgYfU5TQ= +github.com/PlakarKorp/integration-grpc v1.1.0-beta.3/go.mod h1:lrkbUc9iT0jHiZ5wmB19wAhjDTMRO/VDhMJEt5trFlY= +github.com/PlakarKorp/kloset v1.1.0-beta.2 h1:YUMCCguPW6buz3h17eYYX+c0QMVE9c8WqkwTI6ueLwA= +github.com/PlakarKorp/kloset v1.1.0-beta.2/go.mod h1:LVCuJUI+ojPMyGeB32ydlT8xMl4NqVbvbnGwwFBaBMM= +github.com/RaduBerinde/axisds v0.1.0 h1:YItk/RmU5nvlsv/awo2Fjx97Mfpt4JfgtEVAGPrLdz8= +github.com/RaduBerinde/axisds v0.1.0/go.mod h1:UHGJonU9z4YYGKJxSaC6/TNcLOBptpmM5m2Cksbnw0Y= +github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54 h1:bsU8Tzxr/PNz75ayvCnxKZWEYdLMPDkUgticP4a4Bvk= +github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54/go.mod h1:0tr7FllbE9gJkHq7CVeeDDFAFKQVy5RnCSSNBOvdqbc= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cockroachdb/crlib v0.0.0-20250110162118-b7c9be99e911 h1:X+r2Lb1qj0APqrxM8NhBD3X3JDM1Fe+u+WAvhvKuLdM= +github.com/cockroachdb/crlib v0.0.0-20250110162118-b7c9be99e911/go.mod h1:Gq51ZeKaFCXk6QwuGM0w1dnaOqc/F5zKT2zA9D6Xeac= +github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= +github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= +github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506 h1:ASDL+UJcILMqgNeV5jiqR4j+sTuvQNHdf2chuKj1M5k= +github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506/go.mod h1:Mw7HqKr2kdtu6aYGn3tPmAftiP3QPX63LdK/zcariIo= +github.com/cockroachdb/pebble/v2 v2.1.4 h1:j9wPgMDbkErFdAKYFGhsoCcvzcjR+6zrJ4jhKtJ6bOk= +github.com/cockroachdb/pebble/v2 v2.1.4/go.mod h1:Reo1RTniv1UjVTAu/Fv74y5i3kJ5gmVrPhO9UtFiKn8= +github.com/cockroachdb/redact v1.1.6 h1:zXJBwDZ84xJNlHl1rMyCojqyIxv+7YUpQiJLQ7n4314= +github.com/cockroachdb/redact v1.1.6/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b h1:VXvSNzmr8hMj8XTuY0PT9Ane9qZGul/p67vGYwl9BFI= +github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b/go.mod h1:yBRu/cnL4ks9bgy4vAASdjIW+/xMlFwuHKqtmh3GZQg= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM= +github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= +github.com/getsentry/sentry-go v0.31.1 h1:ELVc0h7gwyhnXHDouXkhqTFSO5oslsRDk0++eyE0KJ4= +github.com/getsentry/sentry-go v0.31.1/go.mod h1:CYNcMMz73YigoHljQRG+qPF+eMq8gG72XcGN/p71BAY= +github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 h1:+zs/tPmkDkHx3U66DAb0lQFJrpS6731Oaa12ikc+DiI= +github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376/go.mod h1:an3vInlBmSxCcxctByoQdvwPiA7DTK7jaaFDBTtu0ic= +github.com/go-git/go-billy/v5 v5.6.2 h1:6Q86EsPXMa7c3YZ3aLAQsMA0VlWmy43r6FHqa/UNbRM= +github.com/go-git/go-billy/v5 v5.6.2/go.mod h1:rcFC2rAsp/erv7CMz9GczHcuD0D32fWzH+MJAU+jaUU= +github.com/go-git/go-git/v5 v5.16.4 h1:7ajIEZHZJULcyJebDLo99bGgS0jRrOxzZG4uCk2Yb2Y= +github.com/go-git/go-git/v5 v5.16.4/go.mod h1:4Ge4alE/5gPs30F2H1esi2gPd69R0C39lolkucHBOp8= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= +github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/minio/minlz v1.0.1-0.20250507153514-87eb42fe8882 h1:0lgqHvJWHLGW5TuObJrfyEi6+ASTKDBWikGvPqy9Yiw= +github.com/minio/minlz v1.0.1-0.20250507153514-87eb42fe8882/go.mod h1:qT0aEB35q79LLornSzeDH75LBf3aH1MV+jB5w9Wasec= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/nickball/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5 h1:eQr2od6dyd9gCLYHgMX2TlAYQtMUpxK7S0nsZXyH0L8= +github.com/nickball/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5/go.mod h1:1VYCE0dvZM9Y2q8kcAHdXZB6YwfrCUQDeSJ2DuIiA4k= +github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0= +github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.21.1 h1:DOvXXTqVzvkIewV/CDPFdejpMCGeMcbGCQ8YOmu+Ibk= +github.com/prometheus/client_golang v1.21.1/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA98k= +github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18= +github.com/prometheus/procfs v0.16.0 h1:xh6oHhKwnOJKMYiYBDWmkHqQPyiY40sny36Cmx2bbsM= +github.com/prometheus/procfs v0.16.0/go.mod h1:8veyXUu3nGP7oaCxhX6yeaM5u4stL2FeMXnCqhDthZg= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/tink-crypto/tink-go/v2 v2.6.0 h1:+KHNBHhWH33Vn+igZWcsgdEPUxKwBMEe0QC60t388v4= +github.com/tink-crypto/tink-go/v2 v2.6.0/go.mod h1:2WbBA6pfNsAfBwDCggboaHeB2X29wkU8XHtGwh2YIk8= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/zeebo/assert v1.1.0 h1:hU1L1vLTHsnO8x8c9KAR5GmM5QscxHg5RNU5z5qbUWY= +github.com/zeebo/assert v1.1.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/blake3 v0.2.4 h1:KYQPkhpRtcqh0ssGYcKLG1JYvddkEA8QwCM/yBqhaZI= +github.com/zeebo/blake3 v0.2.4/go.mod h1:7eeQ6d2iXWRGF6npfaxl2CU+xy2Fjo2gxeyZGCRUjcE= +github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo= +github.com/zeebo/pcg v1.0.1/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= +golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= +golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c= +golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU= +golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= +golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= +golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= +golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b h1:Mv8VFug0MP9e5vUxfBcE3vUkV6CImK3cMNMIDFjmzxU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= +google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME= +gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= +modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc= +modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM= +modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA= +modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.1 h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE= +modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.67.6 h1:eVOQvpModVLKOdT+LvBPjdQqfrZq+pC39BygcT+E7OI= +modernc.org/libc v1.67.6/go.mod h1:JAhxUVlolfYDErnwiqaLvUqc8nfb2r6S6slAgZOnaiE= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.44.3 h1:+39JvV/HWMcYslAwRxHb8067w+2zowvFOUrOWIy9PjY= +modernc.org/sqlite v1.44.3/go.mod h1:CzbrU2lSB1DKUusvwGz7rqEKIq+NUd8GWuBBZDs9/nA= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/redis/importer/schema.json b/redis/importer/schema.json new file mode 100644 index 0000000..c49d0f4 --- /dev/null +++ b/redis/importer/schema.json @@ -0,0 +1,52 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "Plakar Redis Importer Connector Config", + "type": "object", + "additionalProperties": false, + "properties": { + "location": { + "type": "string", + "minLength": 1, + "description": "Redis connection URI: redis://[user[:password]@]host[:port] or rediss://... for TLS.", + "pattern": "^rediss?://.*$" + }, + "host": { + "type": "string", + "minLength": 1, + "description": "Redis server hostname. Overrides the host in the location URI." + }, + "port": { + "type": "string", + "minLength": 1, + "description": "Redis server port. Overrides the port in the location URI." + }, + "username": { + "type": "string", + "minLength": 1, + "description": "Redis ACL username. Overrides the username in the location URI." + }, + "password": { + "type": "string", + "minLength": 1, + "description": "Redis password. Overrides the password in the location URI." + }, + "tls": { + "type": "boolean", + "description": "Enable TLS. rediss:// enables TLS automatically." + }, + "tls_server_name": { + "type": "string", + "minLength": 1, + "description": "Server name used for TLS verification. Defaults to the Redis host." + }, + "insecure_skip_verify": { + "type": "boolean", + "description": "Skip TLS certificate verification. Intended for local testing only." + }, + "timeout": { + "type": "string", + "minLength": 1, + "description": "Dial and command timeout, as a Go duration like 30s or as a number of seconds." + } + } +} diff --git a/redis/manifest.yaml b/redis/manifest.yaml new file mode 100644 index 0000000..44f342e --- /dev/null +++ b/redis/manifest.yaml @@ -0,0 +1,22 @@ +name: redis +display_name: Redis +description: Integration providing RDB import and file restore capabilities for Redis. +api_version: v1.1.0 +homepage: https://github.com/PlakarKorp/integration-redis +license: ISC +tier: official +contact: mailto:help@plakar.io +connectors: + - type: importer + executable: redisImporter + protocols: [redis, rediss] + validator: ./importer/schema.json + class: database + subclass: redis + - type: exporter + executable: redisFileExporter + protocols: [redis+file] + validator: ./exporter/schema.json + flags: [localfs] + class: database + subclass: redis diff --git a/redis/plugin/exporter/main.go b/redis/plugin/exporter/main.go new file mode 100644 index 0000000..7c5dbc6 --- /dev/null +++ b/redis/plugin/exporter/main.go @@ -0,0 +1,12 @@ +package main + +import ( + "os" + + sdk "github.com/PlakarKorp/go-kloset-sdk" + redis "github.com/PlakarKorp/integration-redis" +) + +func main() { + sdk.EntrypointExporter(os.Args, redis.NewExporter) +} diff --git a/redis/plugin/importer/main.go b/redis/plugin/importer/main.go new file mode 100644 index 0000000..37d637c --- /dev/null +++ b/redis/plugin/importer/main.go @@ -0,0 +1,12 @@ +package main + +import ( + "os" + + sdk "github.com/PlakarKorp/go-kloset-sdk" + redis "github.com/PlakarKorp/integration-redis" +) + +func main() { + sdk.EntrypointImporter(os.Args, redis.NewImporter) +} diff --git a/redis/redis.go b/redis/redis.go new file mode 100644 index 0000000..f8a6d2f --- /dev/null +++ b/redis/redis.go @@ -0,0 +1,523 @@ +package redis + +import ( + "bufio" + "context" + "crypto/tls" + "errors" + "fmt" + "io" + "net" + "net/url" + "os" + "path" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/PlakarKorp/kloset/connectors" + "github.com/PlakarKorp/kloset/connectors/exporter" + "github.com/PlakarKorp/kloset/connectors/importer" + "github.com/PlakarKorp/kloset/location" + "github.com/PlakarKorp/kloset/objects" +) + +const dumpRecordPath = "/dump.rdb" + +func init() { + importer.Register("redis", location.FLAG_STREAM, NewImporter) + importer.Register("rediss", location.FLAG_STREAM, NewImporter) + exporter.Register("redis+file", location.FLAG_LOCALFS, NewExporter) +} + +type config struct { + address string + host string + username string + password string + tls bool + tlsServerName string + insecureSkipVerify bool + timeout time.Duration + outputPath string +} + +type connector struct { + cfg config + proto string +} + +func NewImporter(ctx context.Context, opts *connectors.Options, proto string, raw map[string]string) (importer.Importer, error) { + cfg, err := parseRedisConfig(proto, raw) + if err != nil { + return nil, err + } + return &connector{cfg: cfg, proto: proto}, nil +} + +func NewExporter(ctx context.Context, opts *connectors.Options, proto string, raw map[string]string) (exporter.Exporter, error) { + cfg, err := parseFileConfig(proto, raw) + if err != nil { + return nil, err + } + return &connector{cfg: cfg, proto: proto}, nil +} + +func parseRedisConfig(proto string, raw map[string]string) (config, error) { + cfg := config{host: "localhost", timeout: 30 * time.Second} + loc := raw["location"] + if loc == "" { + loc = proto + "://localhost:6379" + } + + u, err := url.Parse(loc) + if err != nil { + return cfg, fmt.Errorf("parse location: %w", err) + } + if u.Scheme != "" && u.Scheme != "redis" && u.Scheme != "rediss" { + return cfg, fmt.Errorf("unsupported Redis scheme %q", u.Scheme) + } + if u.Scheme == "rediss" || proto == "rediss" { + cfg.tls = true + } + if u.Hostname() != "" { + cfg.host = u.Hostname() + } + port := u.Port() + if port == "" { + port = "6379" + } + if u.User != nil { + cfg.username = u.User.Username() + if pw, ok := u.User.Password(); ok { + cfg.password = pw + } else if cfg.username != "" { + cfg.password = cfg.username + cfg.username = "" + } + } + + if v := raw["host"]; v != "" { + cfg.host = v + } + if v := raw["port"]; v != "" { + port = v + } + if v := raw["username"]; v != "" { + cfg.username = v + } + if v := raw["password"]; v != "" { + cfg.password = v + } + if v := raw["tls"]; v != "" { + cfg.tls, err = strconv.ParseBool(v) + if err != nil { + return cfg, fmt.Errorf("tls: %w", err) + } + } + if v := raw["tls_server_name"]; v != "" { + cfg.tlsServerName = v + } + if v := raw["insecure_skip_verify"]; v != "" { + cfg.insecureSkipVerify, err = strconv.ParseBool(v) + if err != nil { + return cfg, fmt.Errorf("insecure_skip_verify: %w", err) + } + } + if v := raw["timeout"]; v != "" { + cfg.timeout, err = parseTimeout(v) + if err != nil { + return cfg, err + } + } + if cfg.tlsServerName == "" { + cfg.tlsServerName = cfg.host + } + if cfg.host == "" { + return cfg, fmt.Errorf("missing Redis host") + } + cfg.address = net.JoinHostPort(cfg.host, port) + return cfg, nil +} + +func parseFileConfig(proto string, raw map[string]string) (config, error) { + cfg := config{} + if v := raw["path"]; v != "" { + cfg.outputPath = v + } else if v := raw["output"]; v != "" { + cfg.outputPath = v + } else if loc := raw["location"]; loc != "" { + u, err := url.Parse(loc) + if err != nil { + return cfg, fmt.Errorf("parse location: %w", err) + } + if u.Scheme != "" && u.Scheme != "redis+file" { + return cfg, fmt.Errorf("unsupported Redis file scheme %q", u.Scheme) + } + cfg.outputPath = fileURLPath(u) + } + if cfg.outputPath == "" { + return cfg, fmt.Errorf("missing output path") + } + return cfg, nil +} + +func fileURLPath(u *url.URL) string { + if u == nil { + return "" + } + if u.Opaque != "" { + return u.Opaque + } + p, _ := url.PathUnescape(u.Path) + p = filepath.FromSlash(p) + if u.Host != "" && p != "" { + return string(os.PathSeparator) + filepath.Join(u.Host, p) + } + if u.Host != "" { + return u.Host + } + return p +} + +func parseTimeout(s string) (time.Duration, error) { + if d, err := time.ParseDuration(s); err == nil { + return d, nil + } + seconds, err := strconv.Atoi(s) + if err != nil { + return 0, fmt.Errorf("timeout: %w", err) + } + return time.Duration(seconds) * time.Second, nil +} + +func (c *connector) Root() string { return "/" } +func (c *connector) Type() string { return "redis" } + +func (c *connector) Origin() string { + if c.cfg.address != "" { + return c.cfg.address + } + return c.cfg.outputPath +} + +func (c *connector) Flags() location.Flags { + if c.proto == "redis+file" { + return location.FLAG_LOCALFS + } + return location.FLAG_STREAM +} + +func (c *connector) Ping(ctx context.Context) error { + if c.proto == "redis+file" { + return nil + } + conn, br, err := c.openConn(ctx) + if err != nil { + return err + } + defer conn.Close() + if err := c.authenticate(conn, br); err != nil { + return err + } + _, err = doCommand(conn, br, "PING") + return err +} + +func (c *connector) Close(ctx context.Context) error { return nil } + +func (c *connector) Import(ctx context.Context, records chan<- *connectors.Record, results <-chan *connectors.Result) error { + defer close(records) + + finfo := objects.FileInfo{ + Lname: path.Base(dumpRecordPath), + Lsize: -1, + Lmode: 0o444, + LmodTime: time.Now(), + } + + records <- connectors.NewRecord(dumpRecordPath, "", finfo, nil, func() (io.ReadCloser, error) { + return c.openRDB(ctx) + }) + return nil +} + +func (c *connector) Export(ctx context.Context, records <-chan *connectors.Record, results chan<- *connectors.Result) error { + defer close(results) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case record, ok := <-records: + if !ok { + return nil + } + if record.Err != nil { + results <- record.Ok() + continue + } + if record.IsXattr { + results <- record.Error(fmt.Errorf("unexpected xattr %q", record.Pathname)) + continue + } + if record.FileInfo.Lmode.IsDir() { + results <- record.Ok() + continue + } + if err := c.restoreRecord(ctx, record); err != nil { + results <- record.Error(err) + } else { + results <- record.Ok() + } + } + } +} + +func (c *connector) restoreRecord(ctx context.Context, record *connectors.Record) error { + if record.Reader == nil { + return fmt.Errorf("record %q has no reader", record.Pathname) + } + if filepath.Base(record.Pathname) != "dump.rdb" && filepath.Ext(record.Pathname) != ".rdb" { + return fmt.Errorf("unexpected Redis dump record %q", record.Pathname) + } + + target := targetPath(c.cfg.outputPath, record.Pathname) + if err := os.MkdirAll(filepath.Dir(target), 0o755); err != nil { + return err + } + + tmp, err := os.CreateTemp(filepath.Dir(target), "."+filepath.Base(target)+"-*.tmp") + if err != nil { + return err + } + tmpName := tmp.Name() + defer os.Remove(tmpName) + + _, copyErr := copyWithContext(ctx, tmp, record.Reader) + closeErr := tmp.Close() + if copyErr != nil { + return copyErr + } + if closeErr != nil { + return closeErr + } + if err := os.Chmod(tmpName, 0o644); err != nil { + return err + } + return os.Rename(tmpName, target) +} + +func targetPath(base, recordPath string) string { + if base == "" { + base = "dump.rdb" + } + if info, err := os.Stat(base); err == nil && info.IsDir() { + return filepath.Join(base, filepath.Base(recordPath)) + } + if strings.HasSuffix(base, string(os.PathSeparator)) || strings.HasSuffix(base, "/") { + return filepath.Join(base, filepath.Base(recordPath)) + } + return base +} + +func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) { + buf := make([]byte, 32*1024) + var written int64 + for { + select { + case <-ctx.Done(): + return written, ctx.Err() + default: + } + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[:nr]) + written += int64(nw) + if ew != nil { + return written, ew + } + if nw != nr { + return written, io.ErrShortWrite + } + } + if er != nil { + if errors.Is(er, io.EOF) { + return written, nil + } + return written, er + } + } +} + +func (c *connector) openConn(ctx context.Context) (net.Conn, *bufio.Reader, error) { + dialer := net.Dialer{Timeout: c.cfg.timeout} + conn, err := dialer.DialContext(ctx, "tcp", c.cfg.address) + if err != nil { + return nil, nil, err + } + if c.cfg.timeout > 0 { + _ = conn.SetDeadline(time.Now().Add(c.cfg.timeout)) + } + if c.cfg.tls { + tlsConn := tls.Client(conn, &tls.Config{ + ServerName: c.cfg.tlsServerName, + InsecureSkipVerify: c.cfg.insecureSkipVerify, + }) + if err := tlsConn.HandshakeContext(ctx); err != nil { + _ = conn.Close() + return nil, nil, err + } + conn = tlsConn + } + return conn, bufio.NewReader(conn), nil +} + +func (c *connector) authenticate(conn net.Conn, br *bufio.Reader) error { + if c.cfg.password == "" { + return nil + } + if c.cfg.username != "" { + _, err := doCommand(conn, br, "AUTH", c.cfg.username, c.cfg.password) + return err + } + _, err := doCommand(conn, br, "AUTH", c.cfg.password) + return err +} + +func (c *connector) openRDB(ctx context.Context) (io.ReadCloser, error) { + r, err := c.openRDBWithCommand(ctx, "PSYNC", "?", "-1") + if err == nil { + return r, nil + } + if !canFallbackToSync(err) { + return nil, err + } + return c.openRDBWithCommand(ctx, "SYNC") +} + +func canFallbackToSync(err error) bool { + if err == nil { + return false + } + s := strings.ToUpper(err.Error()) + return strings.Contains(s, "PSYNC") || strings.Contains(s, "UNKNOWN COMMAND") || strings.Contains(s, "NOPSYNC") +} + +func (c *connector) openRDBWithCommand(ctx context.Context, args ...string) (io.ReadCloser, error) { + conn, br, err := c.openConn(ctx) + if err != nil { + return nil, err + } + ok := false + defer func() { + if !ok { + _ = conn.Close() + } + }() + + if err := c.authenticate(conn, br); err != nil { + return nil, err + } + if err := sendCommand(conn, args...); err != nil { + return nil, err + } + + line, err := readLine(br) + if err != nil { + return nil, err + } + if strings.HasPrefix(line, "-") { + return nil, fmt.Errorf("redis %s: %s", args[0], strings.TrimPrefix(line, "-")) + } + if strings.HasPrefix(line, "+") { + line, err = readLine(br) + if err != nil { + return nil, err + } + } + if !strings.HasPrefix(line, "$") { + return nil, fmt.Errorf("unexpected Redis RDB response %q", line) + } + if strings.HasPrefix(line, "$EOF:") { + return nil, fmt.Errorf("Redis returned an EOF-delimited RDB stream, which is not supported without REPLCONF capa eof") + } + n, err := strconv.ParseInt(strings.TrimPrefix(line, "$"), 10, 64) + if err != nil { + return nil, fmt.Errorf("parse RDB length: %w", err) + } + if n < 0 { + return nil, fmt.Errorf("Redis returned a nil RDB stream") + } + _ = conn.SetDeadline(time.Time{}) + ok = true + return &rdbReader{conn: conn, reader: &io.LimitedReader{R: br, N: n}}, nil +} + +type rdbReader struct { + conn net.Conn + reader *io.LimitedReader +} + +func (r *rdbReader) Read(p []byte) (int, error) { + return r.reader.Read(p) +} + +func (r *rdbReader) Close() error { + return r.conn.Close() +} + +func sendCommand(w io.Writer, args ...string) error { + if _, err := fmt.Fprintf(w, "*%d\r\n", len(args)); err != nil { + return err + } + for _, arg := range args { + if _, err := fmt.Fprintf(w, "$%d\r\n%s\r\n", len(arg), arg); err != nil { + return err + } + } + return nil +} + +func doCommand(w io.Writer, br *bufio.Reader, args ...string) (string, error) { + if err := sendCommand(w, args...); err != nil { + return "", err + } + line, err := readLine(br) + if err != nil { + return "", err + } + if line == "" { + return "", fmt.Errorf("empty Redis response") + } + switch line[0] { + case '+', ':': + return line[1:], nil + case '-': + return "", fmt.Errorf("redis: %s", line[1:]) + case '$': + n, err := strconv.Atoi(line[1:]) + if err != nil { + return "", err + } + if n < 0 { + return "", nil + } + buf := make([]byte, n+2) + if _, err := io.ReadFull(br, buf); err != nil { + return "", err + } + return string(buf[:n]), nil + default: + return "", fmt.Errorf("unexpected Redis response %q", line) + } +} + +func readLine(br *bufio.Reader) (string, error) { + line, err := br.ReadString('\n') + if err != nil { + return "", err + } + return strings.TrimSuffix(strings.TrimSuffix(line, "\n"), "\r"), nil +} diff --git a/redis/redis_test.go b/redis/redis_test.go new file mode 100644 index 0000000..22240cd --- /dev/null +++ b/redis/redis_test.go @@ -0,0 +1,218 @@ +package redis + +import ( + "bufio" + "bytes" + "context" + "io" + "net" + "net/url" + "path/filepath" + "strconv" + "strings" + "testing" + "time" +) + +func TestParseRedisConfigFromURI(t *testing.T) { + cfg, err := parseRedisConfig("redis", map[string]string{ + "location": "redis://default:secret@example.com:6380", + "timeout": "5s", + }) + if err != nil { + t.Fatal(err) + } + if cfg.address != "example.com:6380" { + t.Fatalf("address = %q", cfg.address) + } + if cfg.username != "default" || cfg.password != "secret" { + t.Fatalf("credentials = %q/%q", cfg.username, cfg.password) + } + if cfg.timeout != 5*time.Second { + t.Fatalf("timeout = %s", cfg.timeout) + } +} + +func TestParseRedisConfigPasswordOnlyURI(t *testing.T) { + cfg, err := parseRedisConfig("redis", map[string]string{"location": "redis://secret@example.com"}) + if err != nil { + t.Fatal(err) + } + if cfg.username != "" || cfg.password != "secret" { + t.Fatalf("credentials = %q/%q", cfg.username, cfg.password) + } + if cfg.address != "example.com:6379" { + t.Fatalf("address = %q", cfg.address) + } +} + +func TestParseRedisConfigTLS(t *testing.T) { + cfg, err := parseRedisConfig("rediss", map[string]string{"location": "rediss://cache.example.com:6380"}) + if err != nil { + t.Fatal(err) + } + if !cfg.tls { + t.Fatal("expected TLS to be enabled") + } + if cfg.tlsServerName != "cache.example.com" { + t.Fatalf("tls server name = %q", cfg.tlsServerName) + } +} + +func TestFileURLPath(t *testing.T) { + u, err := url.Parse("redis+file:///tmp/redis/dump.rdb") + if err != nil { + t.Fatal(err) + } + if got := fileURLPath(u); got != filepath.FromSlash("/tmp/redis/dump.rdb") { + t.Fatalf("path = %q", got) + } +} + +func TestTargetPath(t *testing.T) { + got := targetPath(filepath.FromSlash("/tmp/redis/"), "/dump.rdb") + want := filepath.FromSlash("/tmp/redis/dump.rdb") + if got != want { + t.Fatalf("target = %q, want %q", got, want) + } + got = targetPath(filepath.FromSlash("/tmp/redis.rdb"), "/dump.rdb") + want = filepath.FromSlash("/tmp/redis.rdb") + if got != want { + t.Fatalf("target = %q, want %q", got, want) + } +} + +func TestCopyWithContext(t *testing.T) { + var dst bytes.Buffer + n, err := copyWithContext(context.Background(), &dst, bytes.NewBufferString("redis")) + if err != nil { + t.Fatal(err) + } + if n != 5 || dst.String() != "redis" { + t.Fatalf("copy = %d/%q", n, dst.String()) + } +} + +func TestSendCommand(t *testing.T) { + var buf bytes.Buffer + if err := sendCommand(&buf, "AUTH", "default", "secret"); err != nil { + t.Fatal(err) + } + want := "*3\r\n$4\r\nAUTH\r\n$7\r\ndefault\r\n$6\r\nsecret\r\n" + if buf.String() != want { + t.Fatalf("command = %q, want %q", buf.String(), want) + } +} + +func TestRDBReaderClosesConnection(t *testing.T) { + conn := &fakeConn{} + r := &rdbReader{conn: conn, reader: &io.LimitedReader{R: bytes.NewBufferString("abc"), N: 3}} + data, err := io.ReadAll(r) + if err != nil { + t.Fatal(err) + } + if string(data) != "abc" { + t.Fatalf("data = %q", data) + } + if err := r.Close(); err != nil { + t.Fatal(err) + } + if !conn.closed { + t.Fatal("close not called") + } +} + +type fakeConn struct{ closed bool } + +func (f *fakeConn) Read([]byte) (int, error) { return 0, io.EOF } +func (f *fakeConn) Write([]byte) (int, error) { return 0, io.ErrClosedPipe } +func (f *fakeConn) Close() error { f.closed = true; return nil } +func (f *fakeConn) LocalAddr() net.Addr { return netAddr("local") } +func (f *fakeConn) RemoteAddr() net.Addr { return netAddr("remote") } +func (f *fakeConn) SetDeadline(time.Time) error { return nil } +func (f *fakeConn) SetReadDeadline(time.Time) error { return nil } +func (f *fakeConn) SetWriteDeadline(time.Time) error { return nil } + +type netAddr string + +func (a netAddr) Network() string { return string(a) } +func (a netAddr) String() string { return string(a) } + +func TestOpenRDBWithFakeRedis(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + done := make(chan error, 1) + go func() { + conn, err := ln.Accept() + if err != nil { + done <- err + return + } + defer conn.Close() + br := bufio.NewReader(conn) + args, err := readTestCommand(br) + if err != nil { + done <- err + return + } + if strings.Join(args, " ") != "PSYNC ? -1" { + done <- io.ErrUnexpectedEOF + return + } + _, err = conn.Write([]byte("+FULLRESYNC 0000000000000000000000000000000000000000 0\r\n$5\r\nREDIS")) + done <- err + }() + + c := &connector{cfg: config{address: ln.Addr().String(), timeout: time.Second}, proto: "redis"} + r, err := c.openRDB(context.Background()) + if err != nil { + t.Fatal(err) + } + data, err := io.ReadAll(r) + if err != nil { + t.Fatal(err) + } + if err := r.Close(); err != nil { + t.Fatal(err) + } + if string(data) != "REDIS" { + t.Fatalf("RDB data = %q", data) + } + if err := <-done; err != nil { + t.Fatal(err) + } +} + +func readTestCommand(br *bufio.Reader) ([]string, error) { + line, err := br.ReadString('\n') + if err != nil { + return nil, err + } + line = strings.TrimSuffix(strings.TrimSuffix(line, "\n"), "\r") + n, err := strconv.Atoi(strings.TrimPrefix(line, "*")) + if err != nil { + return nil, err + } + args := make([]string, 0, n) + for i := 0; i < n; i++ { + line, err := br.ReadString('\n') + if err != nil { + return nil, err + } + line = strings.TrimSuffix(strings.TrimSuffix(line, "\n"), "\r") + size, err := strconv.Atoi(strings.TrimPrefix(line, "$")) + if err != nil { + return nil, err + } + buf := make([]byte, size+2) + if _, err := io.ReadFull(br, buf); err != nil { + return nil, err + } + args = append(args, string(buf[:size])) + } + return args, nil +} From ee01a68872d9fe44b6ff092516441680834a18f7 Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 3 Jun 2026 02:15:39 +0300 Subject: [PATCH 2/2] Detect truncated Redis dumps --- redis/redis.go | 20 ++++++++++++++++---- redis/redis_test.go | 10 +++++++++- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/redis/redis.go b/redis/redis.go index f8a6d2f..b1394aa 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -452,16 +452,28 @@ func (c *connector) openRDBWithCommand(ctx context.Context, args ...string) (io. } _ = conn.SetDeadline(time.Time{}) ok = true - return &rdbReader{conn: conn, reader: &io.LimitedReader{R: br, N: n}}, nil + return &rdbReader{conn: conn, reader: br, remaining: n}, nil } type rdbReader struct { - conn net.Conn - reader *io.LimitedReader + conn net.Conn + reader io.Reader + remaining int64 } func (r *rdbReader) Read(p []byte) (int, error) { - return r.reader.Read(p) + if r.remaining <= 0 { + return 0, io.EOF + } + if int64(len(p)) > r.remaining { + p = p[:r.remaining] + } + n, err := r.reader.Read(p) + r.remaining -= int64(n) + if err == io.EOF && r.remaining > 0 { + err = io.ErrUnexpectedEOF + } + return n, err } func (r *rdbReader) Close() error { diff --git a/redis/redis_test.go b/redis/redis_test.go index 22240cd..0bd306e 100644 --- a/redis/redis_test.go +++ b/redis/redis_test.go @@ -106,7 +106,7 @@ func TestSendCommand(t *testing.T) { func TestRDBReaderClosesConnection(t *testing.T) { conn := &fakeConn{} - r := &rdbReader{conn: conn, reader: &io.LimitedReader{R: bytes.NewBufferString("abc"), N: 3}} + r := &rdbReader{conn: conn, reader: bytes.NewBufferString("abc"), remaining: 3} data, err := io.ReadAll(r) if err != nil { t.Fatal(err) @@ -138,6 +138,14 @@ type netAddr string func (a netAddr) Network() string { return string(a) } func (a netAddr) String() string { return string(a) } +func TestRDBReaderReportsShortStream(t *testing.T) { + conn := &fakeConn{} + r := &rdbReader{conn: conn, reader: bytes.NewBufferString("ab"), remaining: 3} + _, err := io.ReadAll(r) + if err != io.ErrUnexpectedEOF { + t.Fatalf("err = %v, want %v", err, io.ErrUnexpectedEOF) + } +} func TestOpenRDBWithFakeRedis(t *testing.T) { ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil {