diff --git a/DEPS.bzl b/DEPS.bzl index d9b2cb31bd5a..cf50f6afb5fd 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -8747,10 +8747,10 @@ def go_deps(): ], build_file_proto_mode = "default", importpath = "go.etcd.io/etcd/raft/v3", - sha256 = "62faedd81e10061a4e0d7476865a62b84121ea462514afeaa1b9d66cc53b5a4b", - strip_prefix = "go.etcd.io/etcd/raft/v3@v3.0.0-20210320072418-e51c697ec6e8", + sha256 = "eb8a7668eab08f0a7845ba34c7de6b00e7caf83eb26b67dd79366c8302c38b50", + strip_prefix = "github.com/nvanbenschoten/etcd/raft/v3@v3.0.0-20220826205122-68e4f4f51143", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/raft/v3/io_etcd_go_etcd_raft_v3-v3.0.0-20210320072418-e51c697ec6e8.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nvanbenschoten/etcd/raft/v3/com_github_nvanbenschoten_etcd_raft_v3-v3.0.0-20220826205122-68e4f4f51143.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index a4173cded00e..a0c29495a60e 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -643,6 +643,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nightlyone/lockfile/com_github_nightlyone_lockfile-v1.0.0.zip": "0abd22d55b704c18426167732414806b2a70d99bce65fa9f943cb88c185689ad", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nishanths/predeclared/com_github_nishanths_predeclared-v0.0.0-20200524104333-86fad755b4d3.zip": "f3a40ab7d3e0570570e7bc41a6cc7b08b3e23df5ef5f08553ef622a3752d6e03", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nkovacs/streamquote/com_github_nkovacs_streamquote-v0.0.0-20170412213628-49af9bddb229.zip": "679a789b4b1409ea81054cb12e5f8441199f5fb17d4a2d3510c51f3aa5f3f0cc", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nvanbenschoten/etcd/raft/v3/com_github_nvanbenschoten_etcd_raft_v3-v3.0.0-20220826205122-68e4f4f51143.zip": "eb8a7668eab08f0a7845ba34c7de6b00e7caf83eb26b67dd79366c8302c38b50", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/nxadm/tail/com_github_nxadm_tail-v1.4.4.zip": "c9bb9d05b3afd1bacc35e7d305a22b07cd7db38f5fabd4ccd95a9227c5709890", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/oklog/oklog/com_github_oklog_oklog-v0.3.2.zip": "b37d032de5b0dd5e96063c06b77fcb29a692a07bd52a4d99a361f2fef68822ec", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/oklog/run/com_github_oklog_run-v1.1.0.zip": "d6f69fc71aa155043f926c2a98fc1e5b3a8ebab422f2f36d785cfba38a7ebee4", @@ -838,7 +839,6 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/client/v2/io_etcd_go_etcd_client_v2-v2.305.0.zip": "91fcb507fe8c193844b56bfb6c8741aaeb6ffa11ee9043de2af0f141173679f3", "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/io_etcd_go_etcd-v0.5.0-alpha.5.0.20200910180754-dd1b699fc489.zip": "d982ee501979b41b68625693bad77d15e4ae79ab9d0eae5f6028205f96a74e49", "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/pkg/v3/io_etcd_go_etcd_pkg_v3-v3.0.0-20201109164711-01844fd28560.zip": "1700dfed48becf82ccfe6865fe59daac2121d48f60b7c4bf090f0ff2320d33d4", - "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/raft/v3/io_etcd_go_etcd_raft_v3-v3.0.0-20210320072418-e51c697ec6e8.zip": "62faedd81e10061a4e0d7476865a62b84121ea462514afeaa1b9d66cc53b5a4b", "https://storage.googleapis.com/cockroach-godeps/gomod/go.mongodb.org/mongo-driver/org_mongodb_go_mongo_driver-v1.5.1.zip": "446cff132e82c64af7ffcf48e268eb16ec81f694914aa6baecb06cbbae1be0d7", "https://storage.googleapis.com/cockroach-godeps/gomod/go.mozilla.org/pkcs7/org_mozilla_go_pkcs7-v0.0.0-20200128120323-432b2356ecb1.zip": "3c4c1667907ff3127e371d44696326bad9e965216d4257917ae28e8b82a9e08d", "https://storage.googleapis.com/cockroach-godeps/gomod/go.opencensus.io/io_opencensus_go-v0.23.0.zip": "81c78beb84872084d6d5ddc0a0bffc47294412898472c891a29cfcb66f3fa2d8", diff --git a/go.mod b/go.mod index 4e59392db834..a1a699eac8c1 100644 --- a/go.mod +++ b/go.mod @@ -125,6 +125,7 @@ require ( github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 github.com/pierrre/geohash v1.0.0 github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 + github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 github.com/pressly/goose/v3 v3.5.3 github.com/prometheus/client_golang v1.12.0 @@ -147,7 +148,7 @@ require ( github.com/xdg-go/scram v1.0.2 github.com/xdg-go/stringprep v1.0.2 github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292 - go.etcd.io/etcd/raft/v3 v3.0.0-20210320072418-e51c697ec6e8 + go.etcd.io/etcd/raft/v3 v3.5.4 go.opentelemetry.io/otel v1.0.0-RC3 go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC3 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.0.0-RC3 @@ -296,7 +297,6 @@ require ( github.com/openzipkin/zipkin-go v0.2.5 // indirect github.com/pelletier/go-toml v1.9.3 // indirect github.com/pierrec/lz4 v2.6.0+incompatible // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pkg/profile v1.6.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/pquerna/cachecontrol v0.0.0-20200921180117-858c6e7e6b7e // indirect @@ -366,6 +366,8 @@ replace gopkg.in/yaml.v2 => github.com/cockroachdb/yaml v0.0.0-20210825132133-2d replace github.com/knz/go-libedit => github.com/otan-cockroach/go-libedit v1.10.2-0.20201030151939-7cced08450e7 +replace go.etcd.io/etcd/raft/v3 => github.com/nvanbenschoten/etcd/raft/v3 v3.0.0-20220826205122-68e4f4f51143 + // At the time of writing (i.e. as of this version below) the `etcd` repo is in the process of properly introducing // modules, and as part of that uses an unsatisfiable version for this dependency (v3.0.0-00010101000000-000000000000). // We just force it to the same SHA as the `go.etcd.io/etcd/raft/v3` module (they live in the same VCS root). diff --git a/go.sum b/go.sum index 973c1c7ba756..b66d7193bde7 100644 --- a/go.sum +++ b/go.sum @@ -1736,6 +1736,8 @@ github.com/nightlyone/lockfile v1.0.0 h1:RHep2cFKK4PonZJDdEl4GmkabuhbsRMgk/k3uAm github.com/nightlyone/lockfile v1.0.0/go.mod h1:rywoIealpdNse2r832aiD9jRk8ErCatROs6LzC841CI= github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso= github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= +github.com/nvanbenschoten/etcd/raft/v3 v3.0.0-20220826205122-68e4f4f51143 h1:pe+4QxsScT+K8cpDOJEF8olYpNc0LQI4X1Wk1xruJTw= +github.com/nvanbenschoten/etcd/raft/v3 v3.0.0-20220826205122-68e4f4f51143/go.mod h1:i+srOieUHQl4y/EwlGOpuYtoKG7nb2uhtA/hrFsFTsc= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= @@ -2244,8 +2246,6 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3 go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ= go.etcd.io/etcd/pkg/v3 v3.0.0-20201109164711-01844fd28560 h1:U/PIBuOTa8JXLPKF81Xh7xhIjA0jbpyqFWUPIiT4Ilc= go.etcd.io/etcd/pkg/v3 v3.0.0-20201109164711-01844fd28560/go.mod h1:0HiXlybqS+XtfgnNkiEZWwGXYYEhWsWL8fDVdZzb7is= -go.etcd.io/etcd/raft/v3 v3.0.0-20210320072418-e51c697ec6e8 h1:aRP8pJvbOsFy8SaZ0tcWeV4RYTlp8ZIWS49usJjO4Ac= -go.etcd.io/etcd/raft/v3 v3.0.0-20210320072418-e51c697ec6e8/go.mod h1:i+srOieUHQl4y/EwlGOpuYtoKG7nb2uhtA/hrFsFTsc= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 4427a90cf0e8..abcd486d5fb5 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1126,6 +1126,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver/protectedts:protectedts_test", "//pkg/kv/kvserver/raftentry:raftentry", "//pkg/kv/kvserver/raftentry:raftentry_test", + "//pkg/kv/kvserver/raftlog:raftlog", "//pkg/kv/kvserver/raftutil:raftutil", "//pkg/kv/kvserver/raftutil:raftutil_test", "//pkg/kv/kvserver/rangefeed:rangefeed", @@ -2382,6 +2383,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/protectedts/ptstorage:get_x_data", "//pkg/kv/kvserver/protectedts/ptutil:get_x_data", "//pkg/kv/kvserver/raftentry:get_x_data", + "//pkg/kv/kvserver/raftlog:get_x_data", "//pkg/kv/kvserver/raftutil:get_x_data", "//pkg/kv/kvserver/rangefeed:get_x_data", "//pkg/kv/kvserver/rditer:get_x_data", diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 6986e9a4182a..e673bd387e85 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -59,6 +59,7 @@ go_library( "replica_raft.go", "replica_raft_overload.go", "replica_raft_quiesce.go", + "replica_raftlog_writer.go", "replica_raftstorage.go", "replica_range_lease.go", "replica_rangefeed.go", @@ -139,6 +140,7 @@ go_library( "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/raftentry", + "//pkg/kv/kvserver/raftlog", "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", diff --git a/pkg/kv/kvserver/raftlog/BUILD.bazel b/pkg/kv/kvserver/raftlog/BUILD.bazel new file mode 100644 index 000000000000..96fd7f02e422 --- /dev/null +++ b/pkg/kv/kvserver/raftlog/BUILD.bazel @@ -0,0 +1,23 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "raftlog", + srcs = ["writer.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog", + visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv/kvserver/stateloader", + "//pkg/roachpb", + "//pkg/storage", + "//pkg/storage/enginepb", + "//pkg/util/hlc", + "//pkg/util/stop", + "//pkg/util/syncutil", + "@io_etcd_go_etcd_raft_v3//:raft", + "@io_etcd_go_etcd_raft_v3//raftpb", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/raftlog/writer.go b/pkg/kv/kvserver/raftlog/writer.go new file mode 100644 index 000000000000..5791e0916a11 --- /dev/null +++ b/pkg/kv/kvserver/raftlog/writer.go @@ -0,0 +1,466 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package raftlog + +import ( + "context" + "sort" + "sync" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/raft/v3/raftpb" +) + +// Writer is responsible for performing log writes to a collection of replicas' +// raft logs. It exposes an asynchronous interface so that replicas can enqueue +// log writes without waiting for their completion. Instead, completion is +// signalled using a callback interface. +type Writer struct { + eng storage.Engine + cache RaftEntryCache + shards []writerShard + stopped int32 +} + +// writerShard is responsible for a subset of ranges, sharded by range ID. +type writerShard struct { + w *Writer + eventsMu syncutil.Mutex + eventsCond sync.Cond + events []event +} + +// event is a union of different event types that the Writer goroutines needs +// to be informed of. It is used so that all events can be sent over the same +// channel, which is necessary to prevent reordering. +type event struct { + app appendEvent + syncC chan struct{} +} + +type appendEvent struct { + rr RaftRange + rangeID roachpb.RangeID + entries []raftpb.Entry + hardState raftpb.HardState +} + +func NewWriter(eng storage.Engine, cache RaftEntryCache) *Writer { + // NOTE: The optimal number of shards is currently 1. With any value greater + // than one, most calls to sync end up waiting for another shard to complete a + // sync before they are able to sync, so average end-to-end latency doubles. + // + // This demonstrates a limitation of the Pebble sync API, which mandates that + // some goroutine wait on a sync to be notified of durability. An asynchronous + // variant of Pebble's API (similar to Pebble's own syncQueue) that is hooked + // directly into Pebble's LogWriter.flushLoop would help. + const shards = 1 + w := &Writer{ + eng: eng, + cache: cache, + shards: make([]writerShard, shards), + } + for i := range w.shards { + s := &w.shards[i] + s.w = w + s.eventsCond.L = &s.eventsMu + } + return w +} + +func (w *Writer) Start(stopper *stop.Stopper) { + ctx := context.Background() + waitQuiesce := func(context.Context) { + <-stopper.ShouldQuiesce() + w.stop() + } + // TODO: hook up to scheduler. + //_ = stopper.RunAsyncTaskEx(ctx, + // stop.TaskOpts{ + // TaskName: "raftlog-writer-wait-quiesce", + // // This task doesn't reference a parent because it runs for the server's + // // lifetime. + // SpanOpt: stop.SterileRootSpan, + // }, + // waitQuiesce) + // + //for i := 0; i < len(w.pipeline.stages); i++ { + // _ = stopper.RunAsyncTaskEx(ctx, + // stop.TaskOpts{ + // TaskName: "raftlog-writer-worker", + // // This task doesn't reference a parent because it runs for the server's + // // lifetime. + // SpanOpt: stop.SterileRootSpan, + // }, + // w.writerLoop) + //} + go waitQuiesce(ctx) + for i := range w.shards { + s := &w.shards[i] + go s.writerLoop(ctx) + } +} + +func (w *Writer) pushEvent(rangeID roachpb.RangeID, ev event) { + shard := &w.shards[int(rangeID)%len(w.shards)] + shard.eventsMu.Lock() + wasEmpty := len(shard.events) == 0 + shard.events = append(shard.events, ev) + shard.eventsMu.Unlock() + if wasEmpty { + shard.eventsCond.Signal() + } +} + +func (w *Writer) Append(rangeID roachpb.RangeID, rr RaftRange, rd raft.Ready) { + w.pushEvent(rangeID, event{app: appendEvent{ + rr: rr, + rangeID: rangeID, + entries: rd.Entries, + hardState: rd.HardState, + }}) +} + +func (w *Writer) Sync(rangeID roachpb.RangeID) { + ch := make(chan struct{}) + w.pushEvent(rangeID, event{syncC: ch}) + // TODO: handle shutdown? + <-ch +} + +func (s *writerShard) writerLoop(ctx context.Context) { + var recycled []event + var rangeIDs []roachpb.RangeID + workByRangeID := make(map[roachpb.RangeID]work) + for { + events, ok := s.waitForEvents(recycled) + if !ok { + return + } + appends, syncs := s.splitEvents(events) + + s.prepareAppends(appends, &rangeIDs, workByRangeID) + batch := s.w.eng.NewUnindexedBatch(false /* writeOnly */) + s.stageAppends(ctx, rangeIDs, workByRangeID, batch) + s.commitAppends(rangeIDs, workByRangeID, batch) + s.processSyncEvents(syncs) + + // Recycle data structures. + for i := range events { + events[i] = event{} + } + recycled = events[:0] + rangeIDs = rangeIDs[:0] + for i := range workByRangeID { + delete(workByRangeID, i) + } + } +} + +func (s *writerShard) waitForEvents(recycled []event) ([]event, bool) { + s.eventsMu.Lock() + defer s.eventsMu.Unlock() + for { + if s.w.isStopped() { + return nil, false + } + if len(s.events) > 0 { + events := s.events + s.events = recycled + return events, true + } + s.eventsCond.Wait() + } +} + +func (s *writerShard) splitEvents(events []event) (appends, syncs []event) { + // Stable sort, append up front, by range ID, then sync. + sort.SliceStable(events, func(i, j int) bool { + // Sync events sort last. + if events[i].syncC != nil { + return false + } + if events[j].syncC != nil { + return true + } + + // Append events sort by range ID. + return events[i].app.rangeID < events[j].app.rangeID + }) + i := sort.Search(len(events), func(i int) bool { + return events[i].syncC != nil + }) + return events[:i], events[i:] +} + +type work struct { + entSlices []event + meta RaftLogMetadata +} + +func (s *writerShard) prepareAppends( + appends []event, rangeIDs *[]roachpb.RangeID, workByRangeID map[roachpb.RangeID]work, +) { + for i := 0; i < len(appends); { + rangeID := appends[i].app.rangeID + j := i + 1 + for j < len(appends) { + if appends[j].app.rangeID != rangeID { + break + } + j++ + } + *rangeIDs = append(*rangeIDs, rangeID) + workByRangeID[rangeID] = work{ + entSlices: appends[i:j], + meta: appends[i].app.rr.GetRaftLogMetadata(), + } + i = j + } + + // Remove duplicate entries. Entries with an index from later batches replace + // entries with the same index from earlier batches. + for _, rangeID := range *rangeIDs { + entrySlices := workByRangeID[rangeID].entSlices + for i := len(entrySlices) - 1; i > 0; i-- { + laterEnts := entrySlices[i].app.entries + if len(laterEnts) == 0 { + continue + } + for j := i - 1; j >= 0; j-- { + earlierEnts := entrySlices[j].app.entries + if len(earlierEnts) == 0 { + continue + } + idxOffset := int(laterEnts[0].Index) - int(earlierEnts[0].Index) + if idxOffset <= 0 { + entrySlices[j].app.entries = nil + continue + } + if idxOffset < len(earlierEnts) { + entrySlices[j].app.entries = entrySlices[j].app.entries[:idxOffset] + } + break + } + } + } +} + +func (s *writerShard) stageAppends( + ctx context.Context, + rangeIDs []roachpb.RangeID, + workByRangeID map[roachpb.RangeID]work, + batch storage.Batch, +) { + for _, rangeID := range rangeIDs { + rangeWork := workByRangeID[rangeID] + for _, app := range rangeWork.entSlices { + var err error + rangeWork.meta, err = s.processPreAppend(ctx, &app.app, rangeWork.meta, batch) + if err != nil { + panic(err) + } + } + workByRangeID[rangeID] = rangeWork + } +} + +func (s *writerShard) processPreAppend( + ctx context.Context, app *appendEvent, meta RaftLogMetadata, batch storage.Batch, +) (RaftLogMetadata, error) { + if !raft.IsEmptyHardState(app.hardState) { + // NB: Note that without additional safeguards, it's incorrect to write + // the HardState before appending rd.Entries. When catching up, a follower + // will receive Entries that are immediately Committed in the same + // Ready. If we persist the HardState but happen to lose the Entries, + // assertions can be tripped. + // + // We have both in the same batch, so there's no problem. If that ever + // changes, we must write and sync the Entries before the HardState. + if err := app.rr.StateLoader().SetHardState(ctx, batch, app.hardState); err != nil { + return RaftLogMetadata{}, err + } + } + + thinEntries, sideLoadedSize, err := app.rr.MaybeSideloadEntries(ctx, app.entries) + if err != nil { + return RaftLogMetadata{}, err + } + meta.LogSize += sideLoadedSize + + meta.LastIndex, meta.LastTerm, meta.LogSize, err = appendEntries( + ctx, batch, app.rr.StateLoader(), meta.LastIndex, meta.LastTerm, meta.LogSize, thinEntries) + if err != nil { + return RaftLogMetadata{}, err + } + + // Update raft log entry cache. We clear any older, uncommitted log entries + // and cache the latest ones. + s.w.cache.Add(app.rangeID, app.entries, true /* truncate */) + + return meta, nil +} + +func (s *writerShard) commitAppends( + rangeIDs []roachpb.RangeID, workByRangeID map[roachpb.RangeID]work, batch storage.Batch, +) { + if err := batch.Commit(true); err != nil { + panic(err) + } + batch.Close() + + // Notify each range about the sync. + for _, rangeID := range rangeIDs { + rangeWork := workByRangeID[rangeID] + for i := len(rangeWork.entSlices) - 1; i >= 0; i-- { + app := &rangeWork.entSlices[len(rangeWork.entSlices)-1].app + if len(app.entries) > 0 { + rd := raft.Ready{Entries: app.entries} + app.rr.LogStableTo(rd, rangeWork.meta) + break + } + } + } +} + +// TODO: how does this work? +//func (w *Writer) processPostAppend(app *appendEvent) { +// //// We may have just overwritten parts of the log which contain +// //// sideloaded SSTables from a previous term (and perhaps discarded some +// //// entries that we didn't overwrite). Remove any such leftover on-disk +// //// payloads (we can do that now because we've committed the deletion +// //// just above). +// //firstPurge := app.entries[0].Index // first new entry written +// //purgeTerm := app.entries[0].Term - 1 +// //lastPurge := prevLastIndex // old end of the log, include in deletion +// //purgedSize, err := app.rr.MaybePurgeSideloaded(ctx, firstPurge, lastPurge, purgeTerm) +// //if err != nil { +// // return RaftLogMetadata{}, err +// //} +// //meta.LogSize -= purgedSize +// +// // Update raft log entry cache. We clear any older, uncommitted log entries +// // and cache the latest ones. +// w.cache.Add(app.rangeID, app.entries, true /* truncate */) +// +// return +//} + +func (s *writerShard) processSyncEvents(events []event) { + for i := range events { + if c := events[i].syncC; c != nil { + close(c) + } + } +} + +func (w *Writer) stop() { + atomic.StoreInt32(&w.stopped, 1) + for i := range w.shards { + w.shards[i].eventsCond.Broadcast() + } +} + +func (w *Writer) isStopped() bool { + return atomic.LoadInt32(&w.stopped) > 0 +} + +type RaftLogMetadata struct { + LastIndex uint64 + LastTerm uint64 + LogSize int64 +} + +// RaftRange is a handle to a Replica. +type RaftRange interface { + StateLoader() stateloader.StateLoader + GetRaftLogMetadata() RaftLogMetadata + LogStableTo(raft.Ready, RaftLogMetadata) + MaybeSideloadEntries(context.Context, []raftpb.Entry) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) + //MaybePurgeSideloaded(_ context.Context, firstIndex, LastIndex, term uint64) (size int64, _ error) +} + +// RaftEntryCache is a specialized data structure for storing deserialized +// raftpb.Entry values tailored to the access patterns of the storage package. +type RaftEntryCache interface { + Add(id roachpb.RangeID, ents []raftpb.Entry, truncate bool) +} + +// append the given entries to the raft log. Takes the previous values of +// r.mu.LastIndex, r.mu.LastTerm, and r.mu.LogSize, and returns new values. +// We do this rather than modifying them directly because these modifications +// need to be atomic with the commit of the batch. This method requires that +// r.raftMu is held. +// +// append is intentionally oblivious to the existence of sideloaded proposals. +// They are managed by the caller, including cleaning up obsolete on-disk +// payloads in case the log tail is replaced. +func appendEntries( + ctx context.Context, + batch storage.Batch, + stateLoader stateloader.StateLoader, + prevLastIndex uint64, + prevLastTerm uint64, + prevRaftLogSize int64, + entries []raftpb.Entry, +) (uint64, uint64, int64, error) { + if len(entries) == 0 { + return prevLastIndex, prevLastTerm, prevRaftLogSize, nil + } + prefix := stateLoader.RaftLogPrefix() + var diff enginepb.MVCCStats + var value roachpb.Value + for i := range entries { + ent := &entries[i] + key := keys.RaftLogKeyFromPrefix(prefix, ent.Index) + + if err := value.SetProto(ent); err != nil { + return 0, 0, 0, err + } + value.InitChecksum(key) + var err error + if ent.Index > prevLastIndex { + err = storage.MVCCBlindPut(ctx, batch, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */) + } else { + err = storage.MVCCPut(ctx, batch, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */) + } + if err != nil { + return 0, 0, 0, err + } + } + + lastIndex := entries[len(entries)-1].Index + lastTerm := entries[len(entries)-1].Term + // Delete any previously appended log entries which never committed. + if prevLastIndex > 0 { + for i := lastIndex + 1; i <= prevLastIndex; i++ { + // Note that the caller is in charge of deleting any sideloaded payloads + // (which they must only do *after* the batch has committed). + key := keys.RaftLogKeyFromPrefix(prefix, i) + _, err := storage.MVCCDelete(ctx, batch, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil) + if err != nil { + return 0, 0, 0, err + } + } + } + + raftLogSize := prevRaftLogSize + diff.SysBytes + return lastIndex, lastTerm, raftLogSize, nil +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 5617a9d19add..48821695fb2f 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -327,6 +327,9 @@ type Replica struct { // range descriptor. When it is temporarily dropped and recreated, the // newly recreated replica will have a complete range descriptor. lastToReplica, lastFromReplica roachpb.ReplicaDescriptor + + logWriterStateLoader stateloader.StateLoader + logWriterReady raft.Ready } // Contains the lease history when enabled. diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index ba9518b08d3f..8c5276614510 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -90,6 +90,10 @@ func (r *Replica) preDestroyRaftMuLocked( log.Fatalf(ctx, "replica not marked as destroyed before call to preDestroyRaftMuLocked: %v", r) } + // Sync the async raft log writer so that nothing new will be written to the + // range-local keyspace after we clear the range's data. + r.store.raftLogWriter.Sync(r.RangeID) + err := clearRangeData(desc, reader, writer, clearRangeIDLocalOnly, mustUseClearRange) if err != nil { return err diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 18555e0761b1..0de71fd04cd2 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -123,6 +123,7 @@ func newUnloadedReplica( // replica GC issues, but is a distraction at the moment. // r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r))) r.raftMu.stateLoader = stateloader.Make(desc.RangeID) + r.raftMu.logWriterStateLoader = stateloader.Make(desc.RangeID) r.splitQueueThrottle = util.Every(splitQueueThrottleDuration) r.mergeQueueThrottle = util.Every(mergeQueueThrottleDuration) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index cb5aa863c4d9..9a4ea09ef74c 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -689,11 +689,14 @@ func (r *Replica) handleRaftReadyRaftMuLocked( var rd raft.Ready r.mu.Lock() lastIndex := r.mu.lastIndex // used for append below - lastTerm := r.mu.lastTerm - raftLogSize := r.mu.raftLogSize leaderID := r.mu.leaderID lastLeaderID := leaderID err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { + if len(r.raftMu.logWriterReady.Entries) != 0 { + raftGroup.StableTo(r.raftMu.logWriterReady) + r.raftMu.logWriterReady = raft.Ready{} + } + numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup) if err != nil { return false, err @@ -773,6 +776,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked( log.Fatalf(ctx, "incoming snapshot id doesn't match raft snapshot id: %s != %s", snapUUID, inSnap.SnapUUID) } + r.store.raftLogWriter.Sync(r.RangeID) + // Applying this snapshot may require us to subsume one or more of our right // neighbors. This occurs if this replica is informed about the merges via a // Raft snapshot instead of a MsgApp containing the merge commits, e.g., @@ -789,15 +794,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( stats.tSnapEnd = timeutil.Now() stats.snap.applied = true - // r.mu.lastIndex, r.mu.lastTerm and r.mu.raftLogSize were updated in - // applySnapshot, but we also want to make sure we reflect these changes in - // the local variables we're tracking here. - r.mu.RLock() - lastIndex = r.mu.lastIndex - lastTerm = r.mu.lastTerm - raftLogSize = r.mu.raftLogSize - r.mu.RUnlock() - // We refresh pending commands after applying a snapshot because this // replica may have been temporarily partitioned from the Raft group and // missed leadership changes that occurred. Suppose node A is the leader, @@ -906,111 +902,22 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // durably synced. // See: // https://github.com/etcd-io/etcd/issues/7625#issuecomment-489232411 + // TODO: remove this splitting logic. It's not needed anymore. msgApps, otherMsgs := splitMsgApps(rd.Messages) r.traceMessageSends(msgApps, "sending msgApp") r.sendRaftMessagesRaftMuLocked(ctx, msgApps, pausedFollowers) - // Use a more efficient write-only batch because we don't need to do any - // reads from the batch. Any reads are performed on the underlying DB. - batch := r.store.Engine().NewUnindexedBatch(false /* writeOnly */) - defer batch.Close() - - prevLastIndex := lastIndex - if len(rd.Entries) > 0 { - stats.tAppendBegin = timeutil.Now() - // All of the entries are appended to distinct keys, returning a new - // last index. - thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := r.maybeSideloadEntriesRaftMuLocked(ctx, rd.Entries) - if err != nil { - const expl = "during sideloading" - return stats, expl, errors.Wrap(err, expl) - } - raftLogSize += sideLoadedEntriesSize - if lastIndex, lastTerm, raftLogSize, err = r.append( - ctx, batch, lastIndex, lastTerm, raftLogSize, thinEntries, - ); err != nil { - const expl = "during append" - return stats, expl, errors.Wrap(err, expl) - } - stats.appendedRegularCount += len(thinEntries) - numSideloaded - stats.appendedRegularBytes += otherEntriesSize - stats.appendedSideloadedCount += numSideloaded - stats.appendedSideloadedBytes += sideLoadedEntriesSize - stats.tAppendEnd = timeutil.Now() - } - - if !raft.IsEmptyHardState(rd.HardState) { - if !r.IsInitialized() && rd.HardState.Commit != 0 { - log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState) - } - // NB: Note that without additional safeguards, it's incorrect to write - // the HardState before appending rd.Entries. When catching up, a follower - // will receive Entries that are immediately Committed in the same - // Ready. If we persist the HardState but happen to lose the Entries, - // assertions can be tripped. - // - // We have both in the same batch, so there's no problem. If that ever - // changes, we must write and sync the Entries before the HardState. - if err := r.raftMu.stateLoader.SetHardState(ctx, batch, rd.HardState); err != nil { - const expl = "during setHardState" - return stats, expl, errors.Wrap(err, expl) - } - } - // Synchronously commit the batch with the Raft log entries and Raft hard - // state as we're promising not to lose this data. - // - // Note that the data is visible to other goroutines before it is synced to - // disk. This is fine. The important constraints are that these syncs happen - // before Raft messages are sent and before the call to RawNode.Advance. Our - // regular locking is sufficient for this and if other goroutines can see the - // data early, that's fine. In particular, snapshots are not a problem (I - // think they're the only thing that might access log entries or HardState - // from other goroutines). Snapshots do not include either the HardState or - // uncommitted log entries, and even if they did include log entries that - // were not persisted to disk, it wouldn't be a problem because raft does not - // infer the that entries are persisted on the node that sends a snapshot. - stats.tPebbleCommitBegin = timeutil.Now() - stats.pebbleBatchBytes = int64(batch.Len()) - sync := rd.MustSync && !disableSyncRaftLog.Get(&r.store.cfg.Settings.SV) - if err := batch.Commit(sync); err != nil { - const expl = "while committing batch" - return stats, expl, errors.Wrap(err, expl) - } - stats.sync = sync - stats.tPebbleCommitEnd = timeutil.Now() - if rd.MustSync { - r.store.metrics.RaftLogCommitLatency.RecordValue( - stats.tPebbleCommitEnd.Sub(stats.tPebbleCommitBegin).Nanoseconds()) - } - - if len(rd.Entries) > 0 { - // We may have just overwritten parts of the log which contain - // sideloaded SSTables from a previous term (and perhaps discarded some - // entries that we didn't overwrite). Remove any such leftover on-disk - // payloads (we can do that now because we've committed the deletion - // just above). - firstPurge := rd.Entries[0].Index // first new entry written - purgeTerm := rd.Entries[0].Term - 1 - lastPurge := prevLastIndex // old end of the log, include in deletion - purgedSize, err := maybePurgeSideloaded(ctx, r.raftMu.sideloaded, firstPurge, lastPurge, purgeTerm) - if err != nil { - const expl = "while purging sideloaded storage" - return stats, expl, err - } - raftLogSize -= purgedSize - if raftLogSize < 0 { - // Might have gone negative if node was recently restarted. - raftLogSize = 0 + if len(rd.Entries) > 0 || !raft.IsEmptyHardState(rd.HardState) { + r.store.raftLogWriter.Append(r.RangeID, (*replicaRaftLog)(r), rd) + if rd.MustSync { + r.store.raftLogWriter.Sync(r.RangeID) } } // Update protected state - last index, last term, raft log size, and raft // leader ID. r.mu.Lock() - r.mu.lastIndex = lastIndex - r.mu.lastTerm = lastTerm - r.mu.raftLogSize = raftLogSize var becameLeader bool if r.mu.leaderID != leaderID { r.mu.leaderID = leaderID @@ -1027,9 +934,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.replicateQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp()) } - // Update raft log entry cache. We clear any older, uncommitted log entries - // and cache the latest ones. - r.store.raftEntryCache.Add(r.RangeID, rd.Entries, true /* truncate */) r.sendRaftMessagesRaftMuLocked(ctx, otherMsgs, nil /* blocked */) r.traceEntries(rd.CommittedEntries, "committed, before applying any entries") diff --git a/pkg/kv/kvserver/replica_raftlog_writer.go b/pkg/kv/kvserver/replica_raftlog_writer.go new file mode 100644 index 000000000000..485017d10330 --- /dev/null +++ b/pkg/kv/kvserver/replica_raftlog_writer.go @@ -0,0 +1,55 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" + "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/raft/v3/raftpb" +) + +type replicaRaftLog Replica + +var _ raftlog.RaftRange = &replicaRaftLog{} + +func (r *replicaRaftLog) StateLoader() stateloader.StateLoader { + return r.raftMu.logWriterStateLoader +} + +func (r *replicaRaftLog) MaybeSideloadEntries( + ctx context.Context, entries []raftpb.Entry, +) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) { + thinEntries, _, sideloadedEntriesSize, _, err := maybeSideloadEntriesImpl(ctx, entries, r.raftMu.sideloaded) + return thinEntries, sideloadedEntriesSize, err +} + +func (r *replicaRaftLog) GetRaftLogMetadata() raftlog.RaftLogMetadata { + r.mu.RLock() + defer r.mu.RUnlock() + return raftlog.RaftLogMetadata{ + LastIndex: r.mu.lastIndex, + LastTerm: r.mu.lastTerm, + LogSize: r.mu.raftLogSize, + } +} + +func (r *replicaRaftLog) LogStableTo(rd raft.Ready, meta raftlog.RaftLogMetadata) { + r.mu.Lock() + defer r.mu.Unlock() + r.mu.lastIndex = meta.LastIndex + r.mu.lastTerm = meta.LastTerm + r.mu.raftLogSize = meta.LogSize + r.raftMu.logWriterReady = rd + r.store.enqueueRaftUpdateCheck(r.RangeID) +} diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 788ae283ba35..2b018e6e4efb 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -635,84 +635,6 @@ func snapshot( }, nil } -// append the given entries to the raft log. Takes the previous values of -// r.mu.lastIndex, r.mu.lastTerm, and r.mu.raftLogSize, and returns new values. -// We do this rather than modifying them directly because these modifications -// need to be atomic with the commit of the batch. This method requires that -// r.raftMu is held. -// -// append is intentionally oblivious to the existence of sideloaded proposals. -// They are managed by the caller, including cleaning up obsolete on-disk -// payloads in case the log tail is replaced. -// -// NOTE: This method takes a engine.Writer because reads are unnecessary when -// prevLastIndex is 0 and prevLastTerm is invalidLastTerm. In the case where -// reading is necessary (I.E. entries are getting overwritten or deleted), a -// engine.ReadWriter must be passed in. -func (r *Replica) append( - ctx context.Context, - writer storage.Writer, - prevLastIndex uint64, - prevLastTerm uint64, - prevRaftLogSize int64, - entries []raftpb.Entry, -) (uint64, uint64, int64, error) { - if len(entries) == 0 { - return prevLastIndex, prevLastTerm, prevRaftLogSize, nil - } - prefix := r.raftMu.stateLoader.RaftLogPrefix() - var diff enginepb.MVCCStats - var value roachpb.Value - for i := range entries { - ent := &entries[i] - key := keys.RaftLogKeyFromPrefix(prefix, ent.Index) - - if err := value.SetProto(ent); err != nil { - return 0, 0, 0, err - } - value.InitChecksum(key) - var err error - if ent.Index > prevLastIndex { - err = storage.MVCCBlindPut(ctx, writer, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */) - } else { - // We type assert `writer` to also be an engine.ReadWriter only in - // the case where we're replacing existing entries. - eng, ok := writer.(storage.ReadWriter) - if !ok { - panic("expected writer to be a engine.ReadWriter when overwriting log entries") - } - err = storage.MVCCPut(ctx, eng, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */) - } - if err != nil { - return 0, 0, 0, err - } - } - - lastIndex := entries[len(entries)-1].Index - lastTerm := entries[len(entries)-1].Term - // Delete any previously appended log entries which never committed. - if prevLastIndex > 0 { - // We type assert `writer` to also be an engine.ReadWriter only in the - // case where we're deleting existing entries. - eng, ok := writer.(storage.ReadWriter) - if !ok { - panic("expected writer to be a engine.ReadWriter when deleting log entries") - } - for i := lastIndex + 1; i <= prevLastIndex; i++ { - // Note that the caller is in charge of deleting any sideloaded payloads - // (which they must only do *after* the batch has committed). - _, err := storage.MVCCDelete(ctx, eng, &diff, keys.RaftLogKeyFromPrefix(prefix, i), - hlc.Timestamp{}, hlc.ClockTimestamp{}, nil) - if err != nil { - return 0, 0, 0, err - } - } - } - - raftLogSize := prevRaftLogSize + diff.SysBytes - return lastIndex, lastTerm, raftLogSize, nil -} - // updateRangeInfo is called whenever a range is updated by ApplySnapshot // or is created by range splitting to setup the fields which are // uninitialized or need updating. diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index b69d7c639a22..5b43d0b1873b 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -44,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" @@ -928,6 +929,8 @@ type Store struct { scheduler *raftScheduler + raftLogWriter *raftlog.Writer + // livenessMap is a map from nodeID to a bool indicating // liveness. It is updated periodically in raftTickLoop() // and reactively in nodeIsLiveCallback() on liveness updates. @@ -1237,6 +1240,8 @@ func NewStore( s.raftEntryCache = raftentry.NewCache(cfg.RaftEntryCacheSize) s.metrics.registry.AddMetricStruct(s.raftEntryCache.Metrics()) + s.raftLogWriter = raftlog.NewWriter(s.engine, s.raftEntryCache) + s.coalescedMu.Lock() s.coalescedMu.heartbeats = map[roachpb.StoreIdent][]kvserverpb.RaftHeartbeat{} s.coalescedMu.heartbeatResponses = map[roachpb.StoreIdent][]kvserverpb.RaftHeartbeat{} diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index eefd61f34f95..651a8b242872 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -709,6 +709,8 @@ func (s *Store) processRaft(ctx context.Context) { s.scheduler.Wait(ctx) } + s.raftLogWriter.Start(s.stopper) + _ = s.stopper.RunAsyncTask(ctx, "sched-tick-loop", s.raftTickLoop) _ = s.stopper.RunAsyncTask(ctx, "coalesced-hb-loop", s.coalescedHeartbeatsLoop) s.stopper.AddCloser(stop.CloserFn(func() { diff --git a/vendor b/vendor index de32bf616c1b..13d49285f530 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit de32bf616c1ba93bbcbecf6691698ed081d0c72b +Subproject commit 13d49285f530221a7c22d86809d0ab9dd28f1da9