Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ type PolicyRunner struct {
done chan struct{}
path string // persistence path (~/.pilot/policy_<netID>.json)

// startStopMu serializes Start/Stop and guards `started`. Stop()
// before Start() must be a safe no-op: cycleLoop (which closes
// `done`) never runs in that case, so an unconditional `<-pr.done`
// would block forever.
startStopMu sync.Mutex
started bool
stopped bool

// fetchMembers backoff. Some networks are too large for the registry's
// list_nodes response to fit a single read window — fetchMembers EOFs
// every 5s tick and pounds the regConn mutex, which adds 5+ seconds
Expand All @@ -66,18 +74,34 @@ type policySnapshot struct {
CycleNum int `json:"cycle_num"`
}

// NewPolicyRunner creates a policy runner for a network with the given compiled policy.
func NewPolicyRunner(netID uint16, cp *CompiledPolicy, d Runtime) *PolicyRunner {
// State directory: PILOT_HOME env wins (lets parallel tests and
// alternate-deploy operators point at a per-instance path), else
// $HOME/.pilot — the prior default. Without the override every
// PolicyRunner for the same netID shared one JSON file on disk
// and parallel tests using t.Parallel raced through it.
// stateDir returns the directory holding persisted runner state.
//
// PILOT_HOME env wins (lets parallel tests and alternate-deploy
// operators point at a per-instance path), else $HOME/.pilot — the
// prior default. Without the override every PolicyRunner for the same
// netID shared one JSON file on disk and parallel tests using
// t.Parallel raced through it.
//
// NewPolicyRunner and Service.LoadPersisted MUST agree on this path —
// they previously disagreed (one honored PILOT_HOME, the other called
// os.UserHomeDir directly), so files written under PILOT_HOME were
// invisible to the scan and never re-applied.
func stateDir() string {
home := os.Getenv("PILOT_HOME")
if home == "" {
home, _ = os.UserHomeDir()
}
path := filepath.Join(home, ".pilot", fmt.Sprintf("policy_%d.json", netID))
return filepath.Join(home, ".pilot")
}

// statePathForNetwork is the persisted-state file for a single network.
func statePathForNetwork(netID uint16) string {
return filepath.Join(stateDir(), fmt.Sprintf("policy_%d.json", netID))
}

// NewPolicyRunner creates a policy runner for a network with the given compiled policy.
func NewPolicyRunner(netID uint16, cp *CompiledPolicy, d Runtime) *PolicyRunner {
path := statePathForNetwork(netID)

pr := &PolicyRunner{
netID: netID,
Expand All @@ -99,13 +123,32 @@ func NewPolicyRunner(netID uint16, cp *CompiledPolicy, d Runtime) *PolicyRunner
}

// Start begins the cycle loop if the policy has cycle rules.
// Calling Start after Stop is a no-op (a stopped runner stays stopped).
func (pr *PolicyRunner) Start() {
pr.startStopMu.Lock()
defer pr.startStopMu.Unlock()
if pr.started || pr.stopped {
return
}
pr.started = true
go pr.cycleLoop()
slog.Info("policy runner started", "network_id", pr.netID)
}

// Stop signals the cycle loop to exit and waits for it.
// Stop signals the cycle loop to exit and waits for it. Stop before
// Start is a safe no-op: cycleLoop never ran, so `done` would never be
// closed and waiting on it would deadlock.
func (pr *PolicyRunner) Stop() {
pr.startStopMu.Lock()
defer pr.startStopMu.Unlock()
if pr.stopped {
return
}
pr.stopped = true
if !pr.started {
// cycleLoop was never launched; nothing to signal or wait on.
return
}
select {
case <-pr.stopCh:
default:
Expand Down
74 changes: 57 additions & 17 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type Service struct {
// were passed to Start (e.g. unit tests that bypass the runtime).
subStop func()
subDone chan struct{}

// persistedNetworks holds the network IDs whose on-disk state was
// discovered by the last LoadPersisted call. Guarded by mu.
persistedNetworks map[uint16]struct{}
}

func NewService(runtime Runtime) *Service {
Expand Down Expand Up @@ -260,16 +264,25 @@ func (s *Service) stopInternal(netID uint16) {
}
}

// LoadPersisted scans ~/.pilot/policy_*.json and re-creates a runner
// for each. Called from daemon-Start after the registry connection is
// up. Each file's name is `policy_<netID>.json`; the contents are the
// policy JSON.
// LoadPersisted scans the state directory for policy_<netID>.json
// snapshots and records which networks have persisted state. Called
// from daemon-Start after the registry connection is up.
//
// The directory MUST be the same one NewPolicyRunner persists to —
// resolved via stateDir() so PILOT_HOME is honored consistently. (This
// previously called os.UserHomeDir directly, so when PILOT_HOME was set
// the scan looked in the wrong place and the persisted state was
// silently ignored.)
//
// A snapshot holds per-peer scores/history, not the compiled policy, so
// a runner can only be rebuilt once its network rejoins (handled by
// handleNetworkJoined -> startInternal -> NewPolicyRunner, whose load()
// re-applies the matching snapshot from the same dir). Here we validate
// each discovered file by unmarshaling it into a policySnapshot and
// remember the set of persisted network IDs so callers can tell which
// networks carry restorable state.
func (s *Service) LoadPersisted() error {
home, err := os.UserHomeDir()
if err != nil {
return err
}
dir := filepath.Join(home, ".pilot")
dir := stateDir()
entries, err := os.ReadDir(dir)
if err != nil {
// No directory yet — nothing to load.
Expand All @@ -278,22 +291,49 @@ func (s *Service) LoadPersisted() error {
}
return err
}
persisted := make(map[uint16]struct{})
for _, e := range entries {
name := e.Name()
if !strings.HasPrefix(name, "policy_") || !strings.HasSuffix(name, ".json") {
if e.IsDir() || !strings.HasPrefix(name, "policy_") || !strings.HasSuffix(name, ".json") {
continue
}
data, err := os.ReadFile(filepath.Join(dir, name))
if err != nil {
slog.Warn("policy: failed to read persisted state", "file", name, "err", err)
continue
}
if len(data) == 0 {
continue
}
// State files for individual runners are named
// `policy_<netID>.json`. Compiled policy bytes live separately
// (the persisted state holds peers/scores, not the compiled
// policy). LoadPersisted is invoked after the daemon's startup
// path that re-registers networks; runners get re-created via
// Start when networks rejoin. Nothing else to do here today.
_ = name
var snap policySnapshot
if err := json.Unmarshal(data, &snap); err != nil {
slog.Warn("policy: skipping malformed persisted state", "file", name, "err", err)
continue
}
persisted[snap.NetworkID] = struct{}{}
slog.Info("policy: discovered persisted state",
"network_id", snap.NetworkID, "peers", len(snap.Peers))
}

s.mu.Lock()
s.persistedNetworks = persisted
s.mu.Unlock()
return nil
}

// PersistedNetworks reports the set of network IDs that have on-disk
// state discovered by the last LoadPersisted call. Used by the daemon
// to know which networks carry restorable per-peer scores.
func (s *Service) PersistedNetworks() []uint16 {
s.mu.RLock()
defer s.mu.RUnlock()
out := make([]uint16, 0, len(s.persistedNetworks))
for id := range s.persistedNetworks {
out = append(out, id)
}
return out
}

// --- coreapi.PolicyManager interface impl ---

// Use a method-set-only matching naming. coreapi.PolicyManager defines
Expand Down
24 changes: 10 additions & 14 deletions zz_coverage_holes4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,18 @@ import (
"testing"
)

// TestLoadPersisted_UserHomeDirError covers service.go:268 — UserHomeDir
// returns an error when HOME (or its platform equivalent) is unset.
func TestLoadPersisted_UserHomeDirError(t *testing.T) {
// Cannot t.Parallel — mutates HOME env at process level.
// t.Setenv("", "") would do it on macOS/linux; explicit unset is clearer.
prev, hadHome := os.LookupEnv("HOME")
os.Unsetenv("HOME")
t.Cleanup(func() {
if hadHome {
os.Setenv("HOME", prev)
}
})
// TestLoadPersisted_MissingDirNoError covers the reconciled path
// convention: LoadPersisted resolves its directory via stateDir()
// (PILOT_HOME-first, same as NewPolicyRunner). When that directory does
// not exist, LoadPersisted is a graceful no-op rather than an error.
func TestLoadPersisted_MissingDirNoError(t *testing.T) {
// Cannot t.Parallel — mutates PILOT_HOME env at process level.
tmp := t.TempDir() // exists, but $PILOT_HOME/.pilot does not
t.Setenv("PILOT_HOME", tmp)

s := NewService(&fakeRuntime{})
if err := s.LoadPersisted(); err == nil {
t.Error("expected UserHomeDir error when HOME is unset")
if err := s.LoadPersisted(); err != nil {
t.Errorf("LoadPersisted with missing state dir should be a no-op, got %v", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion zz_coverage_holes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ func TestLoadPersisted_ReaddirError(t *testing.T) {
if err := os.WriteFile(pilotPath, []byte("not a dir"), 0600); err != nil {
t.Fatalf("setup: %v", err)
}
t.Setenv("HOME", tmp)
t.Setenv("PILOT_HOME", tmp)
s := NewService(&fakeRuntime{})
if err := s.LoadPersisted(); err == nil {
t.Error("expected ReadDir error on file-instead-of-dir, got nil")
Expand Down
120 changes: 120 additions & 0 deletions zz_persist_roundtrip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

package policy

import (
"os"
"path/filepath"
"testing"
"time"
)

// TestStopBeforeStartIsNoOp guards the deadlock where Stop() before
// Start() waited on `done`, which cycleLoop never closes if it never ran.
func TestStopBeforeStartIsNoOp(t *testing.T) {
t.Parallel()
cp := compileTestPolicy(t)
pr := NewPolicyRunner(uniqueNetID(), cp, &fakeRuntime{})

done := make(chan struct{})
go func() {
pr.Stop() // must return immediately, not block on pr.done
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("Stop() before Start() blocked")
}

// A subsequent Start on an already-stopped runner is a no-op, and a
// second Stop must also return.
pr.Start()
pr.Stop()
}

// TestStartThenStop exercises the normal lifecycle: Start launches the
// loop, Stop signals and waits for it.
func TestStartThenStop(t *testing.T) {
t.Parallel()
cp := compileTestPolicy(t)
pr := NewPolicyRunner(uniqueNetID(), cp, &fakeRuntime{})
pr.Start()
done := make(chan struct{})
go func() {
pr.Stop()
close(done)
}()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("Stop() after Start() blocked")
}
}

// TestPersistLoadRoundTripPilotHome verifies that a runner persists to
// the PILOT_HOME-derived path and a fresh runner for the same netID
// loads that state back. Reconciles runner.go and service.go on the
// single stateDir() convention.
func TestPersistLoadRoundTripPilotHome(t *testing.T) {
// Cannot t.Parallel — uses t.Setenv.
tmp := t.TempDir()
t.Setenv("PILOT_HOME", tmp)

netID := uint16(4321)
cp := compileTestPolicy(t)

pr := NewPolicyRunner(netID, cp, &fakeRuntime{})
pr.mu.Lock()
pr.peers[100] = &managedPeer{NodeID: 100, Tags: []string{"elite"}, AddedAt: time.Now().Truncate(time.Second)}
pr.peers[200] = &managedPeer{NodeID: 200, AddedAt: time.Now().Truncate(time.Second)}
pr.cycleNum = 9
pr.mu.Unlock()
pr.persist()

wantPath := filepath.Join(tmp, ".pilot", "policy_4321.json")
if _, err := os.Stat(wantPath); err != nil {
t.Fatalf("expected persisted file at %s: %v", wantPath, err)
}

// Fresh runner for the same netID must load the persisted snapshot
// from the same PILOT_HOME-derived path.
pr2 := NewPolicyRunner(netID, cp, &fakeRuntime{})
if len(pr2.peers) != 2 {
t.Fatalf("reloaded peers = %d, want 2", len(pr2.peers))
}
if pr2.peers[100] == nil || len(pr2.peers[100].Tags) == 0 || pr2.peers[100].Tags[0] != "elite" {
t.Errorf("peer 100 tags not restored: %+v", pr2.peers[100])
}
if pr2.cycleNum != 9 {
t.Errorf("cycleNum = %d, want 9", pr2.cycleNum)
}
}

// TestLoadPersistedHonorsPilotHome verifies Service.LoadPersisted scans
// the same PILOT_HOME directory NewPolicyRunner writes to, and surfaces
// the discovered networks.
func TestLoadPersistedHonorsPilotHome(t *testing.T) {
// Cannot t.Parallel — uses t.Setenv.
tmp := t.TempDir()
t.Setenv("PILOT_HOME", tmp)

cp := compileTestPolicy(t)
pr := NewPolicyRunner(777, cp, &fakeRuntime{})
pr.persist()

s := NewService(&fakeRuntime{})
if err := s.LoadPersisted(); err != nil {
t.Fatalf("LoadPersisted: %v", err)
}
got := s.PersistedNetworks()
found := false
for _, id := range got {
if id == 777 {
found = true
}
}
if !found {
t.Errorf("PersistedNetworks = %v, want to include 777", got)
}
}
Loading