From da0a68ae7b6066de8959c6b4c890c57d49e4b9cd Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Fri, 8 May 2026 12:13:56 +0200 Subject: [PATCH 1/4] client: offer an sqlite backend for the state store --- client/state/db_bench_test.go | 412 +++++++++++++++++++ client/state/db_sqlite.go | 732 ++++++++++++++++++++++++++++++++++ client/state/db_test.go | 20 + go.mod | 6 + go.sum | 13 + 5 files changed, 1183 insertions(+) create mode 100644 client/state/db_bench_test.go create mode 100644 client/state/db_sqlite.go diff --git a/client/state/db_bench_test.go b/client/state/db_bench_test.go new file mode 100644 index 00000000000..c4deed91689 --- /dev/null +++ b/client/state/db_bench_test.go @@ -0,0 +1,412 @@ +// Copyright IBM Corp. 2015, 2025 +// SPDX-License-Identifier: BUSL-1.1 + +package state + +import ( + "fmt" + "testing" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" +) + +// --------------------------------------------------------------------------- +// Setup helpers +// --------------------------------------------------------------------------- + +// benchEntry pairs a backend name with an open StateDB ready for benchmarking. +type benchEntry struct { + name string + db StateDB +} + +// benchDBs returns one BoltDB and one SQLite StateDB, both using a null logger +// so that log I/O does not skew timing results. Cleanup is registered on b. +func benchDBs(b *testing.B) []benchEntry { + b.Helper() + logger := hclog.NewNullLogger() + + boltDir := b.TempDir() + boltDB, err := NewBoltStateDB(logger, boltDir) + if err != nil { + b.Fatalf("create boltdb: %v", err) + } + b.Cleanup(func() { _ = boltDB.Close() }) + + sqliteDir := b.TempDir() + sqliteDB, err := NewSQLiteStateDB(logger, sqliteDir) + if err != nil { + b.Fatalf("create sqlite: %v", err) + } + b.Cleanup(func() { _ = sqliteDB.Close() }) + + return []benchEntry{ + {"boltdb", boltDB}, + {"sqlite", sqliteDB}, + } +} + +// populate inserts n mock allocations into db and returns them. The benchmark +// timer is stopped during population so that setup time is not counted. +func populate(b *testing.B, db StateDB, n int) []*structs.Allocation { + b.Helper() + b.StopTimer() + allocs := make([]*structs.Allocation, n) + for i := range allocs { + allocs[i] = mock.Alloc() + if err := db.PutAllocation(allocs[i]); err != nil { + b.Fatalf("populate: put alloc: %v", err) + } + } + b.StartTimer() + return allocs +} + +// checkResult builds a realistic CheckQueryResult for benchmarking. +func checkResult(allocID string) *structs.CheckQueryResult { + return &structs.CheckQueryResult{ + ID: structs.CheckID(allocID + "-chk"), + Mode: "healthiness", + Status: "passing", + Output: "nomad: tcp ok", + Timestamp: 1, + Group: "group", + Task: "task", + Service: "service", + Check: "check", + } +} + +// --------------------------------------------------------------------------- +// PutAllocation +// --------------------------------------------------------------------------- + +// BenchmarkStateDB_PutAllocation measures the cost of persisting a single +// allocation sequentially. This is representative of the steady-state path +// where one alloc at a time receives an update. +func BenchmarkStateDB_PutAllocation(b *testing.B) { + for _, entry := range benchDBs(b) { + b.Run(entry.name, func(b *testing.B) { + b.ReportAllocs() + alloc := mock.Alloc() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := entry.db.PutAllocation(alloc); err != nil { + b.Fatal(err) + } + } + }) + } +} + +// BenchmarkStateDB_PutAllocation_Parallel measures write throughput when +// multiple goroutines each write a distinct allocation concurrently. Unlike the +// batch benchmark below this does not use WithBatchMode, so every write is its +// own transaction for both backends. +func BenchmarkStateDB_PutAllocation_Parallel(b *testing.B) { + for _, entry := range benchDBs(b) { + b.Run(entry.name, func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + // Each goroutine works on its own allocation so we are not + // serialising on a single hot key. + alloc := mock.Alloc() + for pb.Next() { + if err := entry.db.PutAllocation(alloc); err != nil { + b.Fatal(err) + } + } + }) + }) + } +} + +// BenchmarkStateDB_PutAllocation_Batch measures concurrent writes issued with +// WithBatchMode. BoltDB can coalesce these into fewer transactions; SQLite +// treats the option as a no-op and serialises normally through its single +// connection. +func BenchmarkStateDB_PutAllocation_Batch(b *testing.B) { + for _, entry := range benchDBs(b) { + b.Run(entry.name, func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + alloc := mock.Alloc() + for pb.Next() { + if err := entry.db.PutAllocation(alloc, WithBatchMode()); err != nil { + b.Fatal(err) + } + } + }) + }) + } +} + +// --------------------------------------------------------------------------- +// GetAllAllocations +// --------------------------------------------------------------------------- + +// BenchmarkStateDB_GetAllAllocations measures how long it takes to read back +// all persisted allocations. The database is pre-populated with n allocations +// before the timer starts; only the read is timed. This is the critical path +// on agent restart. +func BenchmarkStateDB_GetAllAllocations(b *testing.B) { + for _, n := range []int{1, 10, 100, 1000} { + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + for _, entry := range benchDBs(b) { + b.Run(entry.name, func(b *testing.B) { + b.ReportAllocs() + populate(b, entry.db, n) + b.ResetTimer() + for i := 0; i < b.N; i++ { + allocs, _, err := entry.db.GetAllAllocations() + if err != nil { + b.Fatal(err) + } + if len(allocs) != n { + b.Fatalf("got %d allocs, want %d", len(allocs), n) + } + } + }) + } + }) + } +} + +// --------------------------------------------------------------------------- +// Task state +// --------------------------------------------------------------------------- + +// BenchmarkStateDB_PutTaskState measures writing a TaskState for a single +// task, which happens frequently during a task's lifecycle. +func BenchmarkStateDB_PutTaskState(b *testing.B) { + for _, entry := range benchDBs(b) { + b.Run(entry.name, func(b *testing.B) { + b.ReportAllocs() + state := structs.NewTaskState() + state.State = structs.TaskStateRunning + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := entry.db.PutTaskState("alloc-id", "web", state); err != nil { + b.Fatal(err) + } + } + }) + } +} + +// BenchmarkStateDB_GetTaskRunnerState measures reading back both the +// LocalState and TaskState for a single task. +func BenchmarkStateDB_GetTaskRunnerState(b *testing.B) { + for _, entry := range benchDBs(b) { + b.Run(entry.name, func(b *testing.B) { + b.ReportAllocs() + + // Pre-populate both halves of the task state. + b.StopTimer() + state := structs.NewTaskState() + state.State = structs.TaskStateRunning + if err := entry.db.PutTaskState("alloc-id", "web", state); err != nil { + b.Fatal(err) + } + b.StartTimer() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _, err := entry.db.GetTaskRunnerState("alloc-id", "web") + if err != nil { + b.Fatal(err) + } + } + }) + } +} + +// --------------------------------------------------------------------------- +// Check results +// --------------------------------------------------------------------------- + +// BenchmarkStateDB_PutCheckResult measures writing a single check result, +// which happens on every health-check tick. +func BenchmarkStateDB_PutCheckResult(b *testing.B) { + for _, entry := range benchDBs(b) { + b.Run(entry.name, func(b *testing.B) { + b.ReportAllocs() + qr := checkResult("alloc-bench") + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := entry.db.PutCheckResult("alloc-bench", qr); err != nil { + b.Fatal(err) + } + } + }) + } +} + +// BenchmarkStateDB_GetCheckResults measures scanning all check results. +// n is the number of distinct (allocID, checkID) pairs pre-loaded. +func BenchmarkStateDB_GetCheckResults(b *testing.B) { + for _, n := range []int{10, 100, 1000} { + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + for _, entry := range benchDBs(b) { + b.Run(entry.name, func(b *testing.B) { + b.ReportAllocs() + + b.StopTimer() + for i := 0; i < n; i++ { + allocID := fmt.Sprintf("alloc-%d", i) + qr := checkResult(allocID) + if err := entry.db.PutCheckResult(allocID, qr); err != nil { + b.Fatal(err) + } + } + b.StartTimer() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + results, err := entry.db.GetCheckResults() + if err != nil { + b.Fatal(err) + } + if len(results) != n { + b.Fatalf("got %d check results, want %d", len(results), n) + } + } + }) + } + }) + } +} + +// --------------------------------------------------------------------------- +// DeleteAllocationBucket +// --------------------------------------------------------------------------- + +// BenchmarkStateDB_DeleteAllocationBucket measures the time to remove one +// allocation and all its associated state. Because the allocation must exist +// before it can be deleted, setup (PutAllocation) is performed outside the +// timed section. +func BenchmarkStateDB_DeleteAllocationBucket(b *testing.B) { + for _, entry := range benchDBs(b) { + b.Run(entry.name, func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + // Insert a fresh alloc, then time only the delete. + alloc := mock.Alloc() + b.StopTimer() + if err := entry.db.PutAllocation(alloc); err != nil { + b.Fatal(err) + } + b.StartTimer() + + if err := entry.db.DeleteAllocationBucket(alloc.ID); err != nil { + b.Fatal(err) + } + } + }) + } +} + +// --------------------------------------------------------------------------- +// Realistic allocation lifecycle +// --------------------------------------------------------------------------- + +// BenchmarkStateDB_AllocLifecycle exercises a condensed version of the write +// pattern a single allocation produces during its lifetime: +// +// 1. PutAllocation (scheduler places the alloc) +// 2. PutNetworkStatus (driver reports network info) +// 3. PutTaskState × 2 (task transitions: pending → running → dead) +// 4. PutCheckResult (health-check tick) +// 5. DeleteAllocationBucket (alloc completes) +// +// All five steps count toward b.N; one "lifecycle" equals one b.N unit. +func BenchmarkStateDB_AllocLifecycle(b *testing.B) { + for _, entry := range benchDBs(b) { + b.Run(entry.name, func(b *testing.B) { + b.ReportAllocs() + ns := mock.AllocNetworkStatus() + b.ResetTimer() + for i := 0; i < b.N; i++ { + alloc := mock.Alloc() + id := alloc.ID + + if err := entry.db.PutAllocation(alloc); err != nil { + b.Fatal(err) + } + + if err := entry.db.PutNetworkStatus(id, ns); err != nil { + b.Fatal(err) + } + + running := structs.NewTaskState() + running.State = structs.TaskStateRunning + if err := entry.db.PutTaskState(id, "web", running); err != nil { + b.Fatal(err) + } + + dead := structs.NewTaskState() + dead.State = structs.TaskStateDead + if err := entry.db.PutTaskState(id, "web", dead); err != nil { + b.Fatal(err) + } + + if err := entry.db.PutCheckResult(id, checkResult(id)); err != nil { + b.Fatal(err) + } + + if err := entry.db.DeleteAllocationBucket(id); err != nil { + b.Fatal(err) + } + } + }) + } +} + +// BenchmarkStateDB_AllocLifecycle_Parallel runs the same lifecycle as above +// but with GOMAXPROCS goroutines concurrently, each operating on its own +// allocation. This reflects the real agent workload where many allocs progress +// through their lifecycle simultaneously. +func BenchmarkStateDB_AllocLifecycle_Parallel(b *testing.B) { + for _, entry := range benchDBs(b) { + b.Run(entry.name, func(b *testing.B) { + b.ReportAllocs() + ns := mock.AllocNetworkStatus() + running := structs.NewTaskState() + running.State = structs.TaskStateRunning + dead := structs.NewTaskState() + dead.State = structs.TaskStateDead + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + alloc := mock.Alloc() + id := alloc.ID + + if err := entry.db.PutAllocation(alloc); err != nil { + b.Fatal(err) + } + if err := entry.db.PutNetworkStatus(id, ns); err != nil { + b.Fatal(err) + } + if err := entry.db.PutTaskState(id, "web", running); err != nil { + b.Fatal(err) + } + if err := entry.db.PutTaskState(id, "web", dead); err != nil { + b.Fatal(err) + } + if err := entry.db.PutCheckResult(id, checkResult(id)); err != nil { + b.Fatal(err) + } + if err := entry.db.DeleteAllocationBucket(id); err != nil { + b.Fatal(err) + } + } + }) + }) + } +} diff --git a/client/state/db_sqlite.go b/client/state/db_sqlite.go new file mode 100644 index 00000000000..a4992c50e2a --- /dev/null +++ b/client/state/db_sqlite.go @@ -0,0 +1,732 @@ +// Copyright IBM Corp. 2015, 2025 +// SPDX-License-Identifier: BUSL-1.1 + +package state + +import ( + "database/sql" + "encoding/json" + "fmt" + "os" + "path/filepath" + + hclog "github.com/hashicorp/go-hclog" + arstate "github.com/hashicorp/nomad/client/allocrunner/state" + trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" + dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/dynamicplugins" + driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" + "github.com/hashicorp/nomad/client/serviceregistration/checks" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/structs" + + _ "modernc.org/sqlite" +) + +// sqliteSchemaVersion is the current version of the SQLite state schema. Bump +// this when making incompatible schema changes and add a migration in Upgrade. +const sqliteSchemaVersion = 1 + +// Keys used in alloc_state for per-allocation data. +const ( + sqlKeyAlloc = "alloc" + sqlKeyDeployStatus = "deploy_status" + sqlKeyNetworkStatus = "network_status" + sqlKeyAcknowledgedState = "acknowledged_state" + sqlKeyAllocVolumes = "alloc_volumes" + sqlKeyAllocIdentities = "alloc_identities" + sqlKeyAllocConsulACLTokens = "alloc_consul_acl_tokens" +) + +// Keys used in task_state for per-task data. +const ( + sqlKeyTaskLocalState = "local_state" + sqlKeyTaskState = "task_state" +) + +// Keys used in kv_state for singleton / global state. +const ( + sqlKeyDevicePluginState = "device_plugin_state" + sqlKeyDriverPluginState = "driver_plugin_state" + sqlKeyDynamicPluginRegistryState = "dynamic_plugin_registry_state" + sqlKeyNodeMeta = "node_meta" + sqlKeyNodeRegistration = "node_registration" + sqlKeyNodeIdentity = "node_identity" +) + +// createSchemaSQL is the DDL executed (idempotently) on every open to ensure +// the schema is in place. All CREATE TABLE statements use IF NOT EXISTS, so +// calling this on an already-initialised database is a no-op. +const createSchemaSQL = ` +PRAGMA journal_mode = WAL; +PRAGMA synchronous = NORMAL; +PRAGMA busy_timeout = 5000; +PRAGMA foreign_keys = ON; + +-- Schema version tracking. +CREATE TABLE IF NOT EXISTS schema_meta ( + key TEXT PRIMARY KEY NOT NULL, + value TEXT NOT NULL +); + +-- Per-allocation key-value blobs (alloc data, deploy/network status, etc.). +CREATE TABLE IF NOT EXISTS alloc_state ( + alloc_id TEXT NOT NULL, + key TEXT NOT NULL, + value BLOB NOT NULL, + PRIMARY KEY (alloc_id, key) +); + +-- Per-task key-value blobs (local state, task state). +CREATE TABLE IF NOT EXISTS task_state ( + alloc_id TEXT NOT NULL, + task_name TEXT NOT NULL, + key TEXT NOT NULL, + value BLOB NOT NULL, + PRIMARY KEY (alloc_id, task_name, key) +); + +-- Check query results, indexed for fast alloc-level purge. +CREATE TABLE IF NOT EXISTS check_results ( + alloc_id TEXT NOT NULL, + check_id TEXT NOT NULL, + value BLOB NOT NULL, + PRIMARY KEY (alloc_id, check_id) +); + +-- Dynamic host volume state, each row is one volume. +CREATE TABLE IF NOT EXISTS host_volumes ( + vol_id TEXT PRIMARY KEY NOT NULL, + value BLOB NOT NULL +); + +-- Singleton / global key-value blobs (managers, node meta, etc.). +CREATE TABLE IF NOT EXISTS kv_state ( + key TEXT PRIMARY KEY NOT NULL, + value BLOB NOT NULL +); + +INSERT OR IGNORE INTO schema_meta (key, value) + VALUES ('schema_version', '1'); +` + +// SQLiteStateDB persists and restores Nomad client state in a SQLite database. +// All public methods are safe for concurrent access; internally a single +// connection is used (SetMaxOpenConns(1)) so SQLite's serialised write mode +// applies and "database is locked" errors are avoided. +type SQLiteStateDB struct { + stateDir string + db *sql.DB + logger hclog.Logger +} + +var _ StateDB = (*SQLiteStateDB)(nil) + +// NewSQLiteStateDB creates or opens a SQLite-backed state database rooted at +// stateDir. The file is named "state.db" inside that directory. +func NewSQLiteStateDB(logger hclog.Logger, stateDir string) (StateDB, error) { + fn := filepath.Join(stateDir, "state.db") + + // Check whether the file exists before opening so we can log first-run. + fi, err := os.Stat(fn) + if err != nil && !os.IsNotExist(err) { + return nil, fmt.Errorf("failed to stat state database: %w", err) + } + firstRun := fi == nil + + db, err := sql.Open("sqlite", fn) + if err != nil { + return nil, fmt.Errorf("failed to open state database: %w", err) + } + + // Use a single connection pool entry. SQLite supports only one writer at + // a time; serialising through one connection avoids "database is locked" + // errors without sacrificing meaningful concurrency for a local agent. + db.SetMaxOpenConns(1) + + sdb := &SQLiteStateDB{ + stateDir: stateDir, + db: db, + logger: logger.Named("sqlite_state_db"), + } + + if err := sdb.createSchema(); err != nil { + _ = db.Close() + return nil, fmt.Errorf("failed to initialise state database schema: %w", err) + } + + if firstRun { + sdb.logger.Info("created new SQLite state database", "path", fn) + } else { + sdb.logger.Info("opened existing SQLite state database", "path", fn) + } + + return sdb, nil +} + +// createSchema applies the DDL. Because every CREATE TABLE uses IF NOT EXISTS, +// this is idempotent and safe to call on an already-populated database. +func (s *SQLiteStateDB) createSchema() error { + _, err := s.db.Exec(createSchemaSQL) + if err != nil { + return fmt.Errorf("schema DDL failed: %w", err) + } + return nil +} + +// Name implements StateDB. +func (s *SQLiteStateDB) Name() string { return "sqlite" } + +// Upgrade implements StateDB. For a freshly-created SQLite database the schema +// is already current. Future schema versions should add migration logic here. +func (s *SQLiteStateDB) Upgrade() error { + return s.createSchema() +} + +// Close implements StateDB. +func (s *SQLiteStateDB) Close() error { return s.db.Close() } + +// --------------------------------------------------------------------------- +// Serialisation helpers +// --------------------------------------------------------------------------- + +// sqlMarshal encodes v to JSON for storage. Using encoding/json keeps the +// implementation dependency-free; all Nomad state types are JSON-safe. +func sqlMarshal(v any) ([]byte, error) { + b, err := json.Marshal(v) + if err != nil { + return nil, fmt.Errorf("marshal failed: %w", err) + } + return b, nil +} + +// sqlUnmarshal decodes JSON data into v. +func sqlUnmarshal(data []byte, v any) error { + if err := json.Unmarshal(data, v); err != nil { + return fmt.Errorf("unmarshal failed: %w", err) + } + return nil +} + +// --------------------------------------------------------------------------- +// Low-level helpers for the three main table patterns +// --------------------------------------------------------------------------- + +// putKV upserts a singleton value into kv_state. +func (s *SQLiteStateDB) putKV(key string, val any) error { + data, err := sqlMarshal(val) + if err != nil { + return fmt.Errorf("serialize kv key=%q: %w", key, err) + } + _, err = s.db.Exec( + `INSERT INTO kv_state (key, value) VALUES (?, ?) + ON CONFLICT(key) DO UPDATE SET value = excluded.value`, + key, data, + ) + return err +} + +// getKV retrieves a singleton value from kv_state. +// Returns (false, nil) when the key does not exist. +func (s *SQLiteStateDB) getKV(key string, v any) (bool, error) { + var data []byte + err := s.db.QueryRow(`SELECT value FROM kv_state WHERE key = ?`, key).Scan(&data) + if err == sql.ErrNoRows { + return false, nil + } + if err != nil { + return false, err + } + return true, sqlUnmarshal(data, v) +} + +// putAllocState upserts a keyed blob for a specific allocation. +func (s *SQLiteStateDB) putAllocState(allocID, key string, val any) error { + data, err := sqlMarshal(val) + if err != nil { + return fmt.Errorf("serialize alloc state alloc=%q key=%q: %w", allocID, key, err) + } + _, err = s.db.Exec( + `INSERT INTO alloc_state (alloc_id, key, value) VALUES (?, ?, ?) + ON CONFLICT(alloc_id, key) DO UPDATE SET value = excluded.value`, + allocID, key, data, + ) + return err +} + +// getAllocState retrieves a keyed blob for a specific allocation. +// Returns (false, nil) when the row does not exist. +func (s *SQLiteStateDB) getAllocState(allocID, key string, v any) (bool, error) { + var data []byte + err := s.db.QueryRow( + `SELECT value FROM alloc_state WHERE alloc_id = ? AND key = ?`, + allocID, key, + ).Scan(&data) + if err == sql.ErrNoRows { + return false, nil + } + if err != nil { + return false, err + } + return true, sqlUnmarshal(data, v) +} + +// putTaskState upserts a keyed blob for a specific task. +func (s *SQLiteStateDB) putTaskStateRow(allocID, taskName, key string, val any) error { + data, err := sqlMarshal(val) + if err != nil { + return fmt.Errorf("serialize task state alloc=%q task=%q key=%q: %w", allocID, taskName, key, err) + } + _, err = s.db.Exec( + `INSERT INTO task_state (alloc_id, task_name, key, value) VALUES (?, ?, ?, ?) + ON CONFLICT(alloc_id, task_name, key) DO UPDATE SET value = excluded.value`, + allocID, taskName, key, data, + ) + return err +} + +// getTaskStateRow retrieves a keyed blob for a specific task. +// Returns (false, nil) when the row does not exist. +func (s *SQLiteStateDB) getTaskStateRow(allocID, taskName, key string, v any) (bool, error) { + var data []byte + err := s.db.QueryRow( + `SELECT value FROM task_state WHERE alloc_id = ? AND task_name = ? AND key = ?`, + allocID, taskName, key, + ).Scan(&data) + if err == sql.ErrNoRows { + return false, nil + } + if err != nil { + return false, err + } + return true, sqlUnmarshal(data, v) +} + +// --------------------------------------------------------------------------- +// Allocation methods +// --------------------------------------------------------------------------- + +// GetAllAllocations implements StateDB. +func (s *SQLiteStateDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) { + rows, err := s.db.Query( + `SELECT alloc_id, value FROM alloc_state WHERE key = ?`, sqlKeyAlloc, + ) + if err != nil { + return nil, nil, fmt.Errorf("query allocations: %w", err) + } + defer rows.Close() + + allocs := make([]*structs.Allocation, 0) + errs := make(map[string]error) + + for rows.Next() { + var allocID string + var data []byte + if err := rows.Scan(&allocID, &data); err != nil { + return nil, nil, fmt.Errorf("scan allocation row: %w", err) + } + var alloc structs.Allocation + if err := sqlUnmarshal(data, &alloc); err != nil { + errs[allocID] = fmt.Errorf("failed to decode alloc: %w", err) + continue + } + alloc.Canonicalize() + if alloc.Job != nil { + alloc.Job.Canonicalize() + } + allocs = append(allocs, &alloc) + } + if err := rows.Err(); err != nil { + return nil, nil, fmt.Errorf("iterate allocations: %w", err) + } + + return allocs, errs, nil +} + +// PutAllocation implements StateDB. +func (s *SQLiteStateDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error { + return s.putAllocState(alloc.ID, sqlKeyAlloc, alloc) +} + +// GetDeploymentStatus implements StateDB. +func (s *SQLiteStateDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) { + var ds structs.AllocDeploymentStatus + found, err := s.getAllocState(allocID, sqlKeyDeployStatus, &ds) + if !found || err != nil { + return nil, err + } + return &ds, nil +} + +// PutDeploymentStatus implements StateDB. +func (s *SQLiteStateDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error { + return s.putAllocState(allocID, sqlKeyDeployStatus, ds) +} + +// GetNetworkStatus implements StateDB. +func (s *SQLiteStateDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error) { + var ns structs.AllocNetworkStatus + found, err := s.getAllocState(allocID, sqlKeyNetworkStatus, &ns) + if !found || err != nil { + return nil, err + } + return &ns, nil +} + +// PutNetworkStatus implements StateDB. +func (s *SQLiteStateDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error { + return s.putAllocState(allocID, sqlKeyNetworkStatus, ns) +} + +// GetAcknowledgedState implements StateDB. +func (s *SQLiteStateDB) GetAcknowledgedState(allocID string) (*arstate.State, error) { + var st arstate.State + found, err := s.getAllocState(allocID, sqlKeyAcknowledgedState, &st) + if !found || err != nil { + return nil, err + } + return &st, nil +} + +// PutAcknowledgedState implements StateDB. +func (s *SQLiteStateDB) PutAcknowledgedState(allocID string, state *arstate.State, opts ...WriteOption) error { + return s.putAllocState(allocID, sqlKeyAcknowledgedState, state) +} + +// GetAllocVolumes implements StateDB. +func (s *SQLiteStateDB) GetAllocVolumes(allocID string) (*arstate.AllocVolumes, error) { + var vols arstate.AllocVolumes + found, err := s.getAllocState(allocID, sqlKeyAllocVolumes, &vols) + if !found || err != nil { + return nil, err + } + return &vols, nil +} + +// PutAllocVolumes implements StateDB. +func (s *SQLiteStateDB) PutAllocVolumes(allocID string, state *arstate.AllocVolumes, opts ...WriteOption) error { + return s.putAllocState(allocID, sqlKeyAllocVolumes, state) +} + +// GetAllocIdentities implements StateDB. +func (s *SQLiteStateDB) GetAllocIdentities(allocID string) ([]*structs.SignedWorkloadIdentity, error) { + var ids []*structs.SignedWorkloadIdentity + found, err := s.getAllocState(allocID, sqlKeyAllocIdentities, &ids) + if !found || err != nil { + return nil, err + } + return ids, nil +} + +// PutAllocIdentities implements StateDB. +func (s *SQLiteStateDB) PutAllocIdentities(allocID string, identities []*structs.SignedWorkloadIdentity, opts ...WriteOption) error { + return s.putAllocState(allocID, sqlKeyAllocIdentities, identities) +} + +// GetAllocConsulACLTokens implements StateDB. +func (s *SQLiteStateDB) GetAllocConsulACLTokens(allocID string) ([]*cstructs.ConsulACLToken, error) { + var tokens []*cstructs.ConsulACLToken + found, err := s.getAllocState(allocID, sqlKeyAllocConsulACLTokens, &tokens) + if !found || err != nil { + return nil, err + } + return tokens, nil +} + +// PutAllocConsulACLTokens implements StateDB. +func (s *SQLiteStateDB) PutAllocConsulACLTokens(allocID string, tokens []*cstructs.ConsulACLToken, opts ...WriteOption) error { + return s.putAllocState(allocID, sqlKeyAllocConsulACLTokens, tokens) +} + +// DeleteAllocationBucket implements StateDB. It removes all rows associated +// with the given allocation in a single transaction. +func (s *SQLiteStateDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error { + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() //nolint:errcheck + + for _, stmt := range []string{ + `DELETE FROM alloc_state WHERE alloc_id = ?`, + `DELETE FROM task_state WHERE alloc_id = ?`, + `DELETE FROM check_results WHERE alloc_id = ?`, + } { + if _, err := tx.Exec(stmt, allocID); err != nil { + return err + } + } + return tx.Commit() +} + +// --------------------------------------------------------------------------- +// Task methods +// --------------------------------------------------------------------------- + +// GetTaskRunnerState implements StateDB. +func (s *SQLiteStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.LocalState, *structs.TaskState, error) { + var ls *trstate.LocalState + var ts *structs.TaskState + + var localState trstate.LocalState + found, err := s.getTaskStateRow(allocID, taskName, sqlKeyTaskLocalState, &localState) + if err != nil { + return nil, nil, fmt.Errorf("read local task runner state: %w", err) + } + if found { + ls = &localState + } + + var taskState structs.TaskState + found, err = s.getTaskStateRow(allocID, taskName, sqlKeyTaskState, &taskState) + if err != nil { + return nil, nil, fmt.Errorf("read task state: %w", err) + } + if found { + ts = &taskState + } + + return ls, ts, nil +} + +// PutTaskRunnerLocalState implements StateDB. +func (s *SQLiteStateDB) PutTaskRunnerLocalState(allocID, taskName string, val *trstate.LocalState) error { + return s.putTaskStateRow(allocID, taskName, sqlKeyTaskLocalState, val) +} + +// PutTaskState implements StateDB. +func (s *SQLiteStateDB) PutTaskState(allocID, taskName string, state *structs.TaskState) error { + return s.putTaskStateRow(allocID, taskName, sqlKeyTaskState, state) +} + +// DeleteTaskBucket implements StateDB. +func (s *SQLiteStateDB) DeleteTaskBucket(allocID, taskName string) error { + _, err := s.db.Exec( + `DELETE FROM task_state WHERE alloc_id = ? AND task_name = ?`, + allocID, taskName, + ) + return err +} + +// --------------------------------------------------------------------------- +// Plugin-manager state methods +// --------------------------------------------------------------------------- + +// GetDevicePluginState implements StateDB. +func (s *SQLiteStateDB) GetDevicePluginState() (*dmstate.PluginState, error) { + var ps dmstate.PluginState + found, err := s.getKV(sqlKeyDevicePluginState, &ps) + if !found || err != nil { + return nil, err + } + return &ps, nil +} + +// PutDevicePluginState implements StateDB. +func (s *SQLiteStateDB) PutDevicePluginState(state *dmstate.PluginState) error { + return s.putKV(sqlKeyDevicePluginState, state) +} + +// GetDriverPluginState implements StateDB. +func (s *SQLiteStateDB) GetDriverPluginState() (*driverstate.PluginState, error) { + var ps driverstate.PluginState + found, err := s.getKV(sqlKeyDriverPluginState, &ps) + if !found || err != nil { + return nil, err + } + return &ps, nil +} + +// PutDriverPluginState implements StateDB. +func (s *SQLiteStateDB) PutDriverPluginState(state *driverstate.PluginState) error { + return s.putKV(sqlKeyDriverPluginState, state) +} + +// GetDynamicPluginRegistryState implements StateDB. +// +// Note: dynamicplugins.RegistryState.Plugins contains container/list.List +// values, which have no exported fields. JSON (and msgpack) will serialise +// them as empty objects. On restoration the lists are empty; running plugin +// tasks re-register themselves through RegisterPlugin, so this is acceptable. +func (s *SQLiteStateDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) { + var rs dynamicplugins.RegistryState + found, err := s.getKV(sqlKeyDynamicPluginRegistryState, &rs) + if !found || err != nil { + return nil, err + } + return &rs, nil +} + +// PutDynamicPluginRegistryState implements StateDB. +func (s *SQLiteStateDB) PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error { + return s.putKV(sqlKeyDynamicPluginRegistryState, state) +} + +// --------------------------------------------------------------------------- +// Check-result methods +// --------------------------------------------------------------------------- + +// PutCheckResult implements StateDB. +func (s *SQLiteStateDB) PutCheckResult(allocID string, qr *structs.CheckQueryResult) error { + data, err := sqlMarshal(qr) + if err != nil { + return fmt.Errorf("serialize check result: %w", err) + } + _, err = s.db.Exec( + `INSERT INTO check_results (alloc_id, check_id, value) VALUES (?, ?, ?) + ON CONFLICT(alloc_id, check_id) DO UPDATE SET value = excluded.value`, + allocID, string(qr.ID), data, + ) + return err +} + +// GetCheckResults implements StateDB. +func (s *SQLiteStateDB) GetCheckResults() (checks.ClientResults, error) { + rows, err := s.db.Query(`SELECT alloc_id, value FROM check_results`) + if err != nil { + return nil, fmt.Errorf("query check results: %w", err) + } + defer rows.Close() + + m := make(checks.ClientResults) + for rows.Next() { + var allocID string + var data []byte + if err := rows.Scan(&allocID, &data); err != nil { + return nil, fmt.Errorf("scan check result row: %w", err) + } + var qr structs.CheckQueryResult + if err := sqlUnmarshal(data, &qr); err != nil { + return nil, fmt.Errorf("decode check result: %w", err) + } + m.Insert(allocID, &qr) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate check results: %w", err) + } + return m, nil +} + +// DeleteCheckResults implements StateDB. +func (s *SQLiteStateDB) DeleteCheckResults(allocID string, checkIDs []structs.CheckID) error { + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() //nolint:errcheck + + for _, id := range checkIDs { + if _, err := tx.Exec( + `DELETE FROM check_results WHERE alloc_id = ? AND check_id = ?`, + allocID, string(id), + ); err != nil { + return err + } + } + return tx.Commit() +} + +// PurgeCheckResults implements StateDB. +func (s *SQLiteStateDB) PurgeCheckResults(allocID string) error { + _, err := s.db.Exec(`DELETE FROM check_results WHERE alloc_id = ?`, allocID) + return err +} + +// --------------------------------------------------------------------------- +// Node-level state methods +// --------------------------------------------------------------------------- + +// GetNodeMeta implements StateDB. +func (s *SQLiteStateDB) GetNodeMeta() (map[string]*string, error) { + m := make(map[string]*string) + found, err := s.getKV(sqlKeyNodeMeta, &m) + if !found || err != nil { + return nil, err + } + return m, nil +} + +// PutNodeMeta implements StateDB. +func (s *SQLiteStateDB) PutNodeMeta(meta map[string]*string) error { + return s.putKV(sqlKeyNodeMeta, meta) +} + +// GetNodeRegistration implements StateDB. +func (s *SQLiteStateDB) GetNodeRegistration() (*cstructs.NodeRegistration, error) { + var reg cstructs.NodeRegistration + found, err := s.getKV(sqlKeyNodeRegistration, ®) + if !found || err != nil { + return nil, err + } + return ®, nil +} + +// PutNodeRegistration implements StateDB. +func (s *SQLiteStateDB) PutNodeRegistration(reg *cstructs.NodeRegistration) error { + return s.putKV(sqlKeyNodeRegistration, reg) +} + +// GetNodeIdentity implements StateDB. +func (s *SQLiteStateDB) GetNodeIdentity() (string, error) { + var identity string + found, err := s.getKV(sqlKeyNodeIdentity, &identity) + if !found || err != nil { + return "", err + } + return identity, nil +} + +// PutNodeIdentity implements StateDB. +func (s *SQLiteStateDB) PutNodeIdentity(identity string) error { + return s.putKV(sqlKeyNodeIdentity, identity) +} + +// --------------------------------------------------------------------------- +// Dynamic host volume methods +// --------------------------------------------------------------------------- + +// PutDynamicHostVolume implements StateDB. +func (s *SQLiteStateDB) PutDynamicHostVolume(vol *cstructs.HostVolumeState) error { + data, err := sqlMarshal(vol) + if err != nil { + return fmt.Errorf("serialize host volume: %w", err) + } + _, err = s.db.Exec( + `INSERT INTO host_volumes (vol_id, value) VALUES (?, ?) + ON CONFLICT(vol_id) DO UPDATE SET value = excluded.value`, + vol.ID, data, + ) + return err +} + +// GetDynamicHostVolumes implements StateDB. +func (s *SQLiteStateDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) { + rows, err := s.db.Query(`SELECT value FROM host_volumes`) + if err != nil { + return nil, fmt.Errorf("query host volumes: %w", err) + } + defer rows.Close() + + var vols []*cstructs.HostVolumeState + for rows.Next() { + var data []byte + if err := rows.Scan(&data); err != nil { + return nil, fmt.Errorf("scan host volume row: %w", err) + } + var vol cstructs.HostVolumeState + if err := sqlUnmarshal(data, &vol); err != nil { + return nil, fmt.Errorf("decode host volume: %w", err) + } + vols = append(vols, &vol) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate host volumes: %w", err) + } + return vols, nil +} + +// DeleteDynamicHostVolume implements StateDB. +func (s *SQLiteStateDB) DeleteDynamicHostVolume(id string) error { + _, err := s.db.Exec(`DELETE FROM host_volumes WHERE vol_id = ?`, id) + return err +} diff --git a/client/state/db_test.go b/client/state/db_test.go index 8615ec77199..68b530aa4e2 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -30,6 +30,7 @@ var ( _ StateDB = (*MemDB)(nil) _ StateDB = (*NoopDB)(nil) _ StateDB = (*ErrDB)(nil) + _ StateDB = (*SQLiteStateDB)(nil) ) func setupBoltStateDB(t *testing.T) *BoltStateDB { @@ -52,10 +53,29 @@ func setupBoltStateDB(t *testing.T) *BoltStateDB { return db.(*BoltStateDB) } +func setupSQLiteStateDB(t *testing.T) *SQLiteStateDB { + t.Helper() + dir := t.TempDir() + + db, err := NewSQLiteStateDB(testlog.HCLogger(t), dir) + if err != nil { + t.Fatalf("error creating sqlite state db: %v", err) + } + + t.Cleanup(func() { + if closeErr := db.Close(); closeErr != nil { + t.Errorf("error closing sqlite state db: %v", closeErr) + } + }) + + return db.(*SQLiteStateDB) +} + func testDB(t *testing.T, f func(*testing.T, StateDB)) { dbs := []StateDB{ setupBoltStateDB(t), NewMemDB(testlog.HCLogger(t)), + setupSQLiteStateDB(t), } for _, db := range dbs { diff --git a/go.mod b/go.mod index 2dd7d43cf1e..1a5c4f16754 100644 --- a/go.mod +++ b/go.mod @@ -293,6 +293,7 @@ require ( github.com/morikuni/aec v1.1.0 // indirect github.com/mrunalp/fileutils v0.5.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 // indirect github.com/oklog/run v1.2.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect @@ -305,6 +306,7 @@ require ( github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/procfs v0.17.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect @@ -354,4 +356,8 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect kernel.org/pub/linux/libs/security/libcap/psx v1.2.77 // indirect + modernc.org/libc v1.72.0 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect + modernc.org/sqlite v1.50.0 // indirect ) diff --git a/go.sum b/go.sum index 34b854f5f1c..0a383ec55c6 100644 --- a/go.sum +++ b/go.sum @@ -366,6 +366,7 @@ github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9 github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k= github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -679,6 +680,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h1:TLb2Sg7HQcgGdloNxkrmtgDNR9uVYF3lfdFIN4Ro6Sk= github.com/oklog/run v1.2.0 h1:O8x3yXwah4A73hJdlrwo/2X6J62gE5qTMusH0dvz60E= @@ -754,6 +757,8 @@ github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7D github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw= github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03 h1:Wdi9nwnhFNAlseAOekn6B5G/+GMtks9UKbvRU/CMM/o= github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod h1:gRAiPF5C5Nd0eyyRdqIu9qTiFSoZzpTq727b5B8fkkU= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -1109,6 +1114,14 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= kernel.org/pub/linux/libs/security/libcap/psx v1.2.77 h1:Z06sMOzc0GNCwp6efaVrIrz4ywGJ1v+DP0pjVkOfDuA= kernel.org/pub/linux/libs/security/libcap/psx v1.2.77/go.mod h1:+l6Ee2F59XiJ2I6WR5ObpC1utCQJZ/VLsEbQCD8RG24= +modernc.org/libc v1.72.0 h1:IEu559v9a0XWjw0DPoVKtXpO2qt5NVLAnFaBbjq+n8c= +modernc.org/libc v1.72.0/go.mod h1:tTU8DL8A+XLVkEY3x5E/tO7s2Q/q42EtnNWda/L5QhQ= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/sqlite v1.50.0 h1:eMowQSWLK0MeiQTdmz3lqoF5dqclujdlIKeJA11+7oM= +modernc.org/sqlite v1.50.0/go.mod h1:m0w8xhwYUVY3H6pSDwc3gkJ/irZT/0YEXwBlhaxQEew= oss.indeed.com/go/libtime v1.6.0 h1:XQyczJihse/wQGo59OfPF3f4f+Sywv4R8vdGB3S9BfU= oss.indeed.com/go/libtime v1.6.0/go.mod h1:B2sdEcuzB0zhTKkAuHy4JInKRc7Al3tME4qWam6R7mA= pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk= From b1f4949fb38606054d2e69dca4c99d65cb39abe7 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Fri, 8 May 2026 12:26:16 +0200 Subject: [PATCH 2/4] must --- client/state/db_test.go | 193 ++++++++++++++++++---------------------- 1 file changed, 89 insertions(+), 104 deletions(-) diff --git a/client/state/db_test.go b/client/state/db_test.go index 68b530aa4e2..b3389576e62 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -21,7 +21,6 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/kr/pretty" "github.com/shoenig/test/must" - "github.com/stretchr/testify/require" ) // assert each implementation satisfies StateDB interface @@ -58,9 +57,7 @@ func setupSQLiteStateDB(t *testing.T) *SQLiteStateDB { dir := t.TempDir() db, err := NewSQLiteStateDB(testlog.HCLogger(t), dir) - if err != nil { - t.Fatalf("error creating sqlite state db: %v", err) - } + must.Nil(t, err) t.Cleanup(func() { if closeErr := db.Close(); closeErr != nil { @@ -91,28 +88,26 @@ func TestStateDB_Allocations(t *testing.T) { ci.Parallel(t) testDB(t, func(t *testing.T, db StateDB) { - require := require.New(t) - // Empty database should return empty non-nil results allocs, errs, err := db.GetAllAllocations() - require.NoError(err) - require.NotNil(allocs) - require.Empty(allocs) - require.NotNil(errs) - require.Empty(errs) + must.NoError(t, err) + must.NotNil(t, allocs) + must.SliceEmpty(t, allocs) + must.NotNil(t, errs) + must.MapEmpty(t, errs) // Put allocations alloc1 := mock.Alloc() alloc2 := mock.BatchAlloc() - require.NoError(db.PutAllocation(alloc1)) - require.NoError(db.PutAllocation(alloc2)) + must.NoError(t, db.PutAllocation(alloc1)) + must.NoError(t, db.PutAllocation(alloc2)) // Retrieve them allocs, errs, err = db.GetAllAllocations() - require.NoError(err) - require.NotNil(allocs) - require.Len(allocs, 2) + must.NoError(t, err) + must.NotNil(t, allocs) + must.Len(t, 2, allocs) for _, a := range allocs { switch a.ID { case alloc1.ID: @@ -129,39 +124,39 @@ func TestStateDB_Allocations(t *testing.T) { t.Fatalf("unexpected alloc id %q", a.ID) } } - require.NotNil(errs) - require.Empty(errs) + must.NotNil(t, errs) + must.MapEmpty(t, errs) // Add another alloc3 := mock.SystemAlloc() - require.NoError(db.PutAllocation(alloc3)) + must.NoError(t, db.PutAllocation(alloc3)) allocs, errs, err = db.GetAllAllocations() - require.NoError(err) - require.NotNil(allocs) - require.Len(allocs, 3) - require.Contains(allocs, alloc1) - require.Contains(allocs, alloc2) - require.Contains(allocs, alloc3) - require.NotNil(errs) - require.Empty(errs) + must.NoError(t, err) + must.NotNil(t, allocs) + must.Len(t, 3, allocs) + must.SliceContainsFunc(t, allocs, alloc1, func(a, b *structs.Allocation) bool { return reflect.DeepEqual(a, b) }) + must.SliceContainsFunc(t, allocs, alloc2, func(a, b *structs.Allocation) bool { return reflect.DeepEqual(a, b) }) + must.SliceContainsFunc(t, allocs, alloc3, func(a, b *structs.Allocation) bool { return reflect.DeepEqual(a, b) }) + must.NotNil(t, errs) + must.MapEmpty(t, errs) // Deleting a nonexistent alloc is a noop - require.NoError(db.DeleteAllocationBucket("asdf")) + must.NoError(t, db.DeleteAllocationBucket("asdf")) allocs, _, err = db.GetAllAllocations() - require.NoError(err) - require.NotNil(allocs) - require.Len(allocs, 3) + must.NoError(t, err) + must.NotNil(t, allocs) + must.Len(t, 3, allocs) // Delete alloc1 - require.NoError(db.DeleteAllocationBucket(alloc1.ID)) + must.NoError(t, db.DeleteAllocationBucket(alloc1.ID)) allocs, errs, err = db.GetAllAllocations() - require.NoError(err) - require.NotNil(allocs) - require.Len(allocs, 2) - require.Contains(allocs, alloc2) - require.Contains(allocs, alloc3) - require.NotNil(errs) - require.Empty(errs) + must.NoError(t, err) + must.NotNil(t, allocs) + must.Len(t, 2, allocs) + must.SliceContainsFunc(t, allocs, alloc2, func(a, b *structs.Allocation) bool { return reflect.DeepEqual(a, b) }) + must.SliceContainsFunc(t, allocs, alloc3, func(a, b *structs.Allocation) bool { return reflect.DeepEqual(a, b) }) + must.NotNil(t, errs) + must.MapEmpty(t, errs) }) } @@ -176,8 +171,6 @@ func TestStateDB_Batch(t *testing.T) { ci.Parallel(t) testDB(t, func(t *testing.T, db StateDB) { - require := require.New(t) - // For BoltDB, get initial tx_id var getTxID func() int var prevTxID int @@ -187,7 +180,7 @@ func TestStateDB_Batch(t *testing.T) { boltdb := boltStateDB.DB().BoltDB() getTxID = func() int { tx, err := boltdb.Begin(true) - require.NoError(err) + must.NoError(t, err) defer tx.Rollback() return tx.ID() } @@ -207,8 +200,8 @@ func TestStateDB_Batch(t *testing.T) { for _, alloc := range allocs { wg.Add(1) go func(alloc *structs.Allocation) { - require.NoError(db.PutNetworkStatus(alloc.ID, mock.AllocNetworkStatus(), WithBatchMode())) - require.NoError(db.PutAllocation(alloc, WithBatchMode())) + must.NoError(t, db.PutNetworkStatus(alloc.ID, mock.AllocNetworkStatus(), WithBatchMode())) + must.NoError(t, db.PutAllocation(alloc, WithBatchMode())) wg.Done() }(alloc) } @@ -221,19 +214,19 @@ func TestStateDB_Batch(t *testing.T) { // See boltdb MaxBatchDelay and MaxBatchSize parameters for more details. if getTxID != nil { numTransactions := getTxID() - prevTxID - writeTime := time.Now().Sub(startTime) + writeTime := time.Since(startTime) expectedNumTransactions := ceilDiv(2*numAllocs, batchSize) + ceilDiv(int(writeTime), int(batchDelay)) - require.LessOrEqual(numTransactions, expectedNumTransactions) + must.LessEq(t, numTransactions, expectedNumTransactions) prevTxID = getTxID() } // Retrieve allocs and make sure they are the same (order can differ) readAllocs, errs, err := db.GetAllAllocations() - require.NoError(err) - require.NotNil(readAllocs) - require.Len(readAllocs, len(allocs)) - require.NotNil(errs) - require.Empty(errs) + must.NoError(t, err) + must.NotNil(t, readAllocs) + must.Len(t, len(allocs), readAllocs) + must.NotNil(t, errs) + must.MapEmpty(t, errs) readAllocsById := make(map[string]*structs.Allocation) for _, readAlloc := range readAllocs { @@ -255,7 +248,7 @@ func TestStateDB_Batch(t *testing.T) { for _, alloc := range allocs { wg.Add(1) go func(alloc *structs.Allocation) { - require.NoError(db.DeleteAllocationBucket(alloc.ID, WithBatchMode())) + must.NoError(t, db.DeleteAllocationBucket(alloc.ID, WithBatchMode())) wg.Done() }(alloc) } @@ -264,17 +257,17 @@ func TestStateDB_Batch(t *testing.T) { // Check BoltDB combined DeleteAllocationBucket calls into much fewer transactions. if getTxID != nil { numTransactions := getTxID() - prevTxID - writeTime := time.Now().Sub(startTime) + writeTime := time.Since(startTime) expectedNumTransactions := ceilDiv(numAllocs, batchSize) + ceilDiv(int(writeTime), int(batchDelay)) - require.LessOrEqual(numTransactions, expectedNumTransactions) + must.LessEq(t, numTransactions, expectedNumTransactions) prevTxID = getTxID() } // Check all allocs were deleted. readAllocs, errs, err = db.GetAllAllocations() - require.NoError(err) - require.Empty(readAllocs) - require.Empty(errs) + must.NoError(t, err) + must.SliceEmpty(t, readAllocs) + must.MapEmpty(t, errs) }) } @@ -284,49 +277,47 @@ func TestStateDB_TaskState(t *testing.T) { ci.Parallel(t) testDB(t, func(t *testing.T, db StateDB) { - require := require.New(t) - // Getting nonexistent state should return nils ls, ts, err := db.GetTaskRunnerState("allocid", "taskname") - require.NoError(err) - require.Nil(ls) - require.Nil(ts) + must.NoError(t, err) + must.Nil(t, ls) + must.Nil(t, ts) // Putting TaskState without first putting the allocation should work state := structs.NewTaskState() state.Failed = true // set a non-default value - require.NoError(db.PutTaskState("allocid", "taskname", state)) + must.NoError(t, db.PutTaskState("allocid", "taskname", state)) // Getting should return the available state ls, ts, err = db.GetTaskRunnerState("allocid", "taskname") - require.NoError(err) - require.Nil(ls) - require.Equal(state, ts) + must.NoError(t, err) + must.Nil(t, ls) + must.Eq(t, state, ts) // Deleting a nonexistent task should not error - require.NoError(db.DeleteTaskBucket("adsf", "asdf")) - require.NoError(db.DeleteTaskBucket("asllocid", "asdf")) + must.NoError(t, db.DeleteTaskBucket("adsf", "asdf")) + must.NoError(t, db.DeleteTaskBucket("asllocid", "asdf")) // Data should be untouched ls, ts, err = db.GetTaskRunnerState("allocid", "taskname") - require.NoError(err) - require.Nil(ls) - require.Equal(state, ts) + must.NoError(t, err) + must.Nil(t, ls) + must.Eq(t, state, ts) // Deleting the task should remove the state - require.NoError(db.DeleteTaskBucket("allocid", "taskname")) + must.NoError(t, db.DeleteTaskBucket("allocid", "taskname")) ls, ts, err = db.GetTaskRunnerState("allocid", "taskname") - require.NoError(err) - require.Nil(ls) - require.Nil(ts) + must.NoError(t, err) + must.Nil(t, ls) + must.Nil(t, ts) // Putting LocalState should work just like TaskState origLocalState := trstate.NewLocalState() - require.NoError(db.PutTaskRunnerLocalState("allocid", "taskname", origLocalState)) + must.NoError(t, db.PutTaskRunnerLocalState("allocid", "taskname", origLocalState)) ls, ts, err = db.GetTaskRunnerState("allocid", "taskname") - require.NoError(err) - require.Equal(origLocalState, ls) - require.Nil(ts) + must.NoError(t, err) + must.Eq(t, origLocalState, ls) + must.Nil(t, ts) }) } @@ -336,22 +327,20 @@ func TestStateDB_DeviceManager(t *testing.T) { ci.Parallel(t) testDB(t, func(t *testing.T, db StateDB) { - require := require.New(t) - // Getting nonexistent state should return nils ps, err := db.GetDevicePluginState() - require.NoError(err) - require.Nil(ps) + must.NoError(t, err) + must.Nil(t, ps) // Putting PluginState should work state := &dmstate.PluginState{} - require.NoError(db.PutDevicePluginState(state)) + must.NoError(t, db.PutDevicePluginState(state)) // Getting should return the available state ps, err = db.GetDevicePluginState() - require.NoError(err) - require.NotNil(ps) - require.Equal(state, ps) + must.NoError(t, err) + must.NotNil(t, ps) + must.Eq(t, state, ps) }) } @@ -361,22 +350,20 @@ func TestStateDB_DriverManager(t *testing.T) { ci.Parallel(t) testDB(t, func(t *testing.T, db StateDB) { - require := require.New(t) - // Getting nonexistent state should return nils ps, err := db.GetDriverPluginState() - require.NoError(err) - require.Nil(ps) + must.NoError(t, err) + must.Nil(t, ps) // Putting PluginState should work state := &driverstate.PluginState{} - require.NoError(db.PutDriverPluginState(state)) + must.NoError(t, db.PutDriverPluginState(state)) // Getting should return the available state ps, err = db.GetDriverPluginState() - require.NoError(err) - require.NotNil(ps) - require.Equal(state, ps) + must.NoError(t, err) + must.NotNil(t, ps) + must.Eq(t, state, ps) }) } @@ -386,22 +373,20 @@ func TestStateDB_DynamicRegistry(t *testing.T) { ci.Parallel(t) testDB(t, func(t *testing.T, db StateDB) { - require := require.New(t) - // Getting nonexistent state should return nils ps, err := db.GetDynamicPluginRegistryState() - require.NoError(err) - require.Nil(ps) + must.NoError(t, err) + must.Nil(t, ps) // Putting PluginState should work state := &dynamicplugins.RegistryState{} - require.NoError(db.PutDynamicPluginRegistryState(state)) + must.NoError(t, db.PutDynamicPluginRegistryState(state)) // Getting should return the available state ps, err = db.GetDynamicPluginRegistryState() - require.NoError(err) - require.NotNil(ps) - require.Equal(state, ps) + must.NoError(t, err) + must.NotNil(t, ps) + must.Eq(t, state, ps) }) } @@ -563,6 +548,6 @@ func TestStateDB_Upgrade(t *testing.T) { ci.Parallel(t) testDB(t, func(t *testing.T, db StateDB) { - require.NoError(t, db.Upgrade()) + must.NoError(t, db.Upgrade()) }) } From d4f71222d3a1c467df8165130755e6300f1a7d0d Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Fri, 8 May 2026 12:28:21 +0200 Subject: [PATCH 3/4] oopsie arguments order --- client/state/db_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/state/db_test.go b/client/state/db_test.go index b3389576e62..c5e3dd77e60 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -216,7 +216,7 @@ func TestStateDB_Batch(t *testing.T) { numTransactions := getTxID() - prevTxID writeTime := time.Since(startTime) expectedNumTransactions := ceilDiv(2*numAllocs, batchSize) + ceilDiv(int(writeTime), int(batchDelay)) - must.LessEq(t, numTransactions, expectedNumTransactions) + must.LessEq(t, expectedNumTransactions, numTransactions) prevTxID = getTxID() } @@ -259,7 +259,7 @@ func TestStateDB_Batch(t *testing.T) { numTransactions := getTxID() - prevTxID writeTime := time.Since(startTime) expectedNumTransactions := ceilDiv(numAllocs, batchSize) + ceilDiv(int(writeTime), int(batchDelay)) - must.LessEq(t, numTransactions, expectedNumTransactions) + must.LessEq(t, expectedNumTransactions, numTransactions) prevTxID = getTxID() } From d80e26cfb2cec584239a2d7777861b586ad6eb6e Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Fri, 8 May 2026 13:45:31 +0200 Subject: [PATCH 4/4] tidy --- go.mod | 2 +- go.sum | 23 +++++++++++++++++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 1a5c4f16754..d5c433721b6 100644 --- a/go.mod +++ b/go.mod @@ -141,6 +141,7 @@ require ( google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 + modernc.org/sqlite v1.50.0 oss.indeed.com/go/libtime v1.6.0 pgregory.net/rapid v1.2.0 ) @@ -359,5 +360,4 @@ require ( modernc.org/libc v1.72.0 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect - modernc.org/sqlite v1.50.0 // indirect ) diff --git a/go.sum b/go.sum index 0a383ec55c6..46ddaf13ae2 100644 --- a/go.sum +++ b/go.sum @@ -364,9 +364,8 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9Fc= github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= -github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k= -github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -1114,14 +1113,34 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= kernel.org/pub/linux/libs/security/libcap/psx v1.2.77 h1:Z06sMOzc0GNCwp6efaVrIrz4ywGJ1v+DP0pjVkOfDuA= kernel.org/pub/linux/libs/security/libcap/psx v1.2.77/go.mod h1:+l6Ee2F59XiJ2I6WR5ObpC1utCQJZ/VLsEbQCD8RG24= +modernc.org/cc/v4 v4.27.3 h1:uNCgn37E5U09mTv1XgskEVUJ8ADKpmFMPxzGJ0TSo+U= +modernc.org/cc/v4 v4.27.3/go.mod h1:3YjcbCqhoTTHPycJDRl2WZKKFj0nwcOIPBfEZK0Hdk8= +modernc.org/ccgo/v4 v4.32.4 h1:L5OB8rpEX4ZsXEQwGozRfJyJSFHbbNVOoQ59DU9/KuU= +modernc.org/ccgo/v4 v4.32.4/go.mod h1:lY7f+fiTDHfcv6YlRgSkxYfhs+UvOEEzj49jAn2TOx0= +modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM= +modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo= +modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= modernc.org/libc v1.72.0 h1:IEu559v9a0XWjw0DPoVKtXpO2qt5NVLAnFaBbjq+n8c= modernc.org/libc v1.72.0/go.mod h1:tTU8DL8A+XLVkEY3x5E/tO7s2Q/q42EtnNWda/L5QhQ= modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= modernc.org/sqlite v1.50.0 h1:eMowQSWLK0MeiQTdmz3lqoF5dqclujdlIKeJA11+7oM= modernc.org/sqlite v1.50.0/go.mod h1:m0w8xhwYUVY3H6pSDwc3gkJ/irZT/0YEXwBlhaxQEew= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= oss.indeed.com/go/libtime v1.6.0 h1:XQyczJihse/wQGo59OfPF3f4f+Sywv4R8vdGB3S9BfU= oss.indeed.com/go/libtime v1.6.0/go.mod h1:B2sdEcuzB0zhTKkAuHy4JInKRc7Al3tME4qWam6R7mA= pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk=