Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/raftexample/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (rc *raftNode) replayWAL() *wal.WAL {
if err != nil {
log.Fatalf("raftexample: failed to read WAL (%v)", err)
}
rc.raftStorage = raft.NewMemoryStorage()
rc.raftStorage = raft.NewMemoryStorage(0)
if snapshot != nil {
rc.raftStorage.ApplySnapshot(*snapshot)
}
Expand Down
2 changes: 2 additions & 0 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Config struct {
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapCount uint64 `json:"snapshot-count"`
SnapSize uint64 `json:"snapshot-size"`
AutoCompactionRetention int `json:"auto-compaction-retention"`
AutoCompactionMode string `json:"auto-compaction-mode"`

Expand Down Expand Up @@ -182,6 +183,7 @@ func NewConfig() *Config {
SnapCount: etcdserver.DefaultSnapCount,
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
SnapSize: etcdserver.DefaultSnapSize,
TickMs: 100,
ElectionMs: 1000,
LPUrls: []url.URL{*lpurl},
Expand Down
1 change: 1 addition & 0 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapCount: cfg.SnapCount,
SnapSize: cfg.SnapSize,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
Expand Down
1 change: 1 addition & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func newConfig() *config {
fs.UintVar(&cfg.MaxWalFiles, "max-wals", cfg.MaxWalFiles, "Maximum number of wal files to retain (0 is unlimited).")
fs.StringVar(&cfg.Name, "name", cfg.Name, "Human-readable name for this member.")
fs.Uint64Var(&cfg.SnapCount, "snapshot-count", cfg.SnapCount, "Number of committed transactions to trigger a snapshot to disk.")
fs.Uint64Var(&cfg.SnapSize, "snapshot-size", cfg.SnapSize, "Size of committed transactions to trigger a snapshot to disk.")
fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ServerConfig struct {
// rather than the dataDir/member/wal.
DedicatedWALDir string
SnapCount uint64
SnapSize uint64
MaxSnapFiles uint
MaxWALFiles uint
InitialPeerURLsMap types.URLsMap
Expand Down Expand Up @@ -202,6 +203,7 @@ func (c *ServerConfig) print(initial bool) {
plog.Infof("heartbeat = %dms", c.TickMs)
plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs))
plog.Infof("snapshot count = %d", c.SnapCount)
plog.Infof("snapshot size = %d", c.SnapSize)
if len(c.DiscoveryURL) != 0 {
plog.Infof("discovery URL= %s", c.DiscoveryURL)
if len(c.DiscoveryProxy) != 0 {
Expand Down
6 changes: 3 additions & 3 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
}
id = member.ID
plog.Infof("starting member %s in cluster %s", id, cl.ID())
s = raft.NewMemoryStorage()
s = raft.NewMemoryStorage(cfg.SnapSize)
c := &raft.Config{
ID: uint64(id),
ElectionTick: cfg.ElectionTicks,
Expand Down Expand Up @@ -429,7 +429,7 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := membership.NewCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
s := raft.NewMemoryStorage(cfg.SnapSize)
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}
Expand Down Expand Up @@ -485,7 +485,7 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := membership.NewCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
s := raft.NewMemoryStorage(cfg.SnapSize)
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}
Expand Down
4 changes: 2 additions & 2 deletions etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
r := newRaftNode(raftNodeConfig{
Node: n,
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{r: *r}
Expand All @@ -183,7 +183,7 @@ func TestConfgChangeBlocksApply(t *testing.T) {
r := newRaftNode(raftNodeConfig{
Node: n,
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{r: *r}
Expand Down
40 changes: 34 additions & 6 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (

const (
DefaultSnapCount = 100000
DefaultSnapSize = 0 // 0 means no size based compaction

StoreClusterPrefix = "/0"
StoreKeysPrefix = "/1"
Expand Down Expand Up @@ -175,6 +176,9 @@ type EtcdServer struct {
readych chan struct{}
Cfg ServerConfig

snapCount uint64
snapSize uint64

w wait.Wait

readMu sync.RWMutex
Expand Down Expand Up @@ -406,6 +410,8 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
srv = &EtcdServer{
readych: make(chan struct{}),
Cfg: cfg,
snapCount: cfg.SnapCount,
snapSize: cfg.SnapSize,
errorc: make(chan error, 1),
store: st,
snapshotter: ss,
Expand Down Expand Up @@ -534,6 +540,10 @@ func (s *EtcdServer) start() {
plog.Infof("set snapshot count to default %d", DefaultSnapCount)
s.Cfg.SnapCount = DefaultSnapCount
}
if s.snapSize == 0 {
plog.Infof("set snapshot size to default %d", DefaultSnapSize)
s.snapSize = DefaultSnapSize
}
s.w = wait.New()
s.applyWait = wait.NewTimeList()
s.done = make(chan struct{})
Expand Down Expand Up @@ -914,8 +924,12 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
}
}

func (s *EtcdServer) shouldTriggerSnapshot(ep *etcdProgress) bool {
return s.snapCount < ep.appliedi-ep.snapi || s.r.raftStorage.ShouldCompactBySize()
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if ep.appliedi-ep.snapi <= s.Cfg.SnapCount {
if !s.shouldTriggerSnapshot(ep) {
return
}

Expand Down Expand Up @@ -1416,6 +1430,24 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
return false, nil
}

func (s *EtcdServer) getCompactIndex(snapi uint64) uint64 {
compacti := uint64(1)

if snapi > numberOfCatchUpEntries {
// keep some in memory log entries for slow followers.
compacti = snapi - numberOfCatchUpEntries
}

szCompacti := s.r.raftStorage.SizeBasedCompactIndex()
if compacti < szCompacti {
if snapi < szCompacti {
return snapi
}
return szCompacti
}
return compacti
}

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
clone := s.store.Clone()
Expand Down Expand Up @@ -1460,11 +1492,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
return
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > numberOfCatchUpEntries {
compacti = snapi - numberOfCatchUpEntries
}
compacti := s.getCompactIndex(snapi)
err = s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
Expand Down
20 changes: 10 additions & 10 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestApplyRepeat(t *testing.T) {
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
Expand Down Expand Up @@ -679,7 +679,7 @@ func TestDoProposal(t *testing.T) {
r := newRaftNode(raftNodeConfig{
Node: newNodeCommitter(),
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
Expand Down Expand Up @@ -847,7 +847,7 @@ func TestSyncTrigger(t *testing.T) {
tk := &time.Ticker{C: st}
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
transport: rafthttp.NewNopTransporter(),
storage: mockstorage.NewStorageRecorder(""),
})
Expand Down Expand Up @@ -901,7 +901,7 @@ func TestSnapshot(t *testing.T) {
os.RemoveAll(tmpPath)
}()

s := raft.NewMemoryStorage()
s := raft.NewMemoryStorage(0)
s.Append([]raftpb.Entry{{Index: 1}})
st := mockstore.NewRecorderStream()
p := mockstorage.NewStorageRecorderStream("")
Expand Down Expand Up @@ -970,7 +970,7 @@ func TestSnapshotOrdering(t *testing.T) {
t.Fatalf("couldn't make snap dir (%v)", err)
}

rs := raft.NewMemoryStorage()
rs := raft.NewMemoryStorage(0)
p := mockstorage.NewStorageRecorderStream(testdir)
tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir)
r := newRaftNode(raftNodeConfig{
Expand Down Expand Up @@ -1037,7 +1037,7 @@ func TestTriggerSnap(t *testing.T) {
p := mockstorage.NewStorageRecorderStream("")
r := newRaftNode(raftNodeConfig{
Node: newNodeCommitter(),
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
storage: p,
transport: rafthttp.NewNopTransporter(),
})
Expand Down Expand Up @@ -1096,7 +1096,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
t.Fatalf("Couldn't make snap dir (%v)", err)
}

rs := raft.NewMemoryStorage()
rs := raft.NewMemoryStorage(0)
tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
r := newRaftNode(raftNodeConfig{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Expand Down Expand Up @@ -1185,7 +1185,7 @@ func TestAddMember(t *testing.T) {
cl.SetStore(st)
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
Expand Down Expand Up @@ -1226,7 +1226,7 @@ func TestRemoveMember(t *testing.T) {
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
Expand Down Expand Up @@ -1266,7 +1266,7 @@ func TestUpdateMember(t *testing.T) {
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
Expand Down
Loading