diff --git a/CLAUDE.md b/CLAUDE.md index 34b9853..48ca9e4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -60,7 +60,7 @@ When none are present, `DEFAULT_TENANT` (default `"default"`) is assigned. Every | Time Series (in-memory) | `internal/tsdb/` | Ring buffer, sliding windows, pre-computed percentiles | | Graph (in-memory, legacy) | `internal/graph/` | Simple service topology — **being replaced by GraphRAG** | | Vector (embedded) | `internal/vectordb/` | TF-IDF index for semantic log search (pure Go, no CGO). Retained as a fallback similarity index for SQLite mode and for `SimilarErrors` ranking within a Drain template cluster. | -| Relational (persistent) | `internal/storage/` | GORM-based, multi-DB, single source of truth. Driven by `RetentionScheduler` (hourly batched purge + daily VACUUM/ANALYZE). `logs.body` is plain TEXT (Postgres: `pg_trgm` GIN indexed for substring search); `AttributesJSON` and `AIInsight` remain `CompressedText`. | +| Relational (persistent) | `internal/storage/` | GORM-based, multi-DB, single source of truth. Driven by `RetentionScheduler` (hourly batched purge + daily VACUUM/ANALYZE). `logs.body` is plain TEXT. **Log search**: SQLite uses FTS5 virtual table `logs_fts` (porter+unicode61 tokenizer) ordered by `bm25()`, kept in sync via AFTER INSERT/DELETE/UPDATE triggers; Postgres uses `pg_trgm` GIN on `logs.body` and `logs.service_name`. `AttributesJSON` and `AIInsight` remain `CompressedText`. | ## GraphRAG Architecture diff --git a/docs/OPERATIONS.md b/docs/OPERATIONS.md index 03ed766..1aaf352 100644 --- a/docs/OPERATIONS.md +++ b/docs/OPERATIONS.md @@ -115,6 +115,30 @@ SQLite is rejected at startup when `APP_ENV=production` unless you explicitly op **Multi-tenancy.** Every row carries a `tenant_id` column. The write path reads `X-Tenant-ID` (HTTP) or `x-tenant-id` (gRPC metadata) and populates the column. The read path attaches the tenant from the request context to every repository query (`Where("tenant_id = ?", ...)`). 
+### Log search index + +| Driver | Index | Ranking | +|---|---|---| +| SQLite | FTS5 virtual table `logs_fts` over `(body, service_name)`, kept in sync via AFTER INSERT/DELETE/UPDATE triggers on `logs` | `bm25(logs_fts)` ascending (lower = more relevant) | +| Postgres | `pg_trgm` GIN indexes on `logs.body` and `logs.service_name` | Recency (`timestamp desc`) — substring ILIKE | +| MySQL / SQL Server | None — sequential `LIKE` scan | Recency | + +The FTS5 path uses `tokenize='porter unicode61 remove_diacritics 2'` — case-insensitive, accent-insensitive, English-stemmed (so `panic` matches `panicked`). Each whitespace-separated term of the user input is double-quoted (so FTS5 query operators in it are treated as literal text) and given a trailing `*` for prefix matching, so partial words like `conn` still match `connection`. If FTS5 errors at query time, the repository transparently falls back to LIKE so a misbehaving index does not surface as a 500 to the API. + +The FTS5 table is provisioned automatically by `AutoMigrateModels` on every SQLite boot; setup is idempotent. To rebuild after corruption or a manual schema change: + +```sql +INSERT INTO logs_fts(logs_fts) VALUES('rebuild'); +``` + +The Postgres `pg_trgm` path requires the extension; if missing, AutoMigrate logs a warning and ILIKE falls back to a sequential scan. To install: + +```sql +CREATE EXTENSION IF NOT EXISTS pg_trgm; +``` + +Phase 3b will add Postgres declarative partitioning as an opt-in adapter; at that point the GIN indexes will be created per-partition. There is no migration required to use FTS5 — existing SQLite databases are backfilled the first time the upgraded binary boots. + --- ## Backup & Restore diff --git a/internal/storage/factory.go b/internal/storage/factory.go index 633fc65..56109ec 100644 --- a/internal/storage/factory.go +++ b/internal/storage/factory.go @@ -221,6 +221,15 @@ func AutoMigrateModels(db *gorm.DB, driver string) error { log.Println("🔓 Dropped legacy FK constraints (no-op on fresh DBs)") } + // SQLite: provision FTS5 virtual table + triggers on logs.body / logs.service_name. 
+ // Search routes through bm25() ranking on this driver; LIKE remains the fallback + // if FTS5 is unavailable (older SQLite builds without FTS5 compiled in). + if driver == "sqlite" || driver == "" { + if err := setupSQLiteFTS5(db); err != nil { + log.Printf("⚠️ SQLite FTS5 setup failed (%v) — log search will fall back to LIKE", err) + } + } + // Postgres: enable pg_trgm and create a GIN index on logs.body for fuzzy ILIKE search. // Azure Database for PostgreSQL allows pg_trgm by default. If the role lacks // CREATE EXTENSION privilege, an operator can pre-create the extension and this diff --git a/internal/storage/fts5.go b/internal/storage/fts5.go new file mode 100644 index 0000000..96cc56e --- /dev/null +++ b/internal/storage/fts5.go @@ -0,0 +1,119 @@ +package storage + +import ( + "fmt" + "log" + "strings" + + "gorm.io/gorm" +) + +// fts5LogsTable is the FTS5 virtual table mirroring `logs.body` and +// `logs.service_name`. It is an external-content table keyed on `logs.id` so it +// stores no extra copy of the body — instead, INSERT/DELETE/UPDATE on `logs` +// are mirrored via the triggers installed in setupSQLiteFTS5. +const fts5LogsTable = "logs_fts" + +// setupSQLiteFTS5 provisions the FTS5 virtual table for log search on SQLite +// and the AFTER INSERT/DELETE/UPDATE triggers that keep it in sync with the +// `logs` base table. The implementation is idempotent: it tolerates an +// existing virtual table left over from a previous boot, repairs missing +// triggers, and runs an initial backfill via the `rebuild` command so that +// rows present in `logs` before the FTS table existed (e.g. migrating an +// older OtelContext.db) are included in the BM25 index. 
+// +// Tokenizer rationale: `porter unicode61 remove_diacritics 2` chosen for: +// - unicode61: case-insensitive, splits on whitespace+punctuation +// - remove_diacritics 2: strips accents (latency vs latência both match) +// - porter: English stemming so "panic" matches "panicked"/"panicking" +// +// All three are pure-SQLite — they do not require external linkage and work +// on the modernc.org/sqlite (glebarez) build used in this project. +func setupSQLiteFTS5(db *gorm.DB) error { + create := `CREATE VIRTUAL TABLE IF NOT EXISTS ` + fts5LogsTable + ` USING fts5( + body, + service_name, + content='logs', + content_rowid='id', + tokenize='porter unicode61 remove_diacritics 2' + )` + if err := db.Exec(create).Error; err != nil { + // FTS5 is included in the modernc.org/sqlite amalgamation by default; + // if this fails, the build was compiled without FTS5. Surface the + // failure so SearchLogs can fall back to LIKE rather than producing + // a confusing "no such table" error later. + return fmt.Errorf("create fts5 virtual table: %w", err) + } + + triggers := []struct { + name string + ddl string + }{ + { + name: "logs_ai", + ddl: `CREATE TRIGGER IF NOT EXISTS logs_ai AFTER INSERT ON logs BEGIN + INSERT INTO ` + fts5LogsTable + `(rowid, body, service_name) VALUES (new.id, new.body, new.service_name); + END`, + }, + { + name: "logs_ad", + ddl: `CREATE TRIGGER IF NOT EXISTS logs_ad AFTER DELETE ON logs BEGIN + INSERT INTO ` + fts5LogsTable + `(` + fts5LogsTable + `, rowid, body, service_name) VALUES ('delete', old.id, old.body, old.service_name); + END`, + }, + { + name: "logs_au", + ddl: `CREATE TRIGGER IF NOT EXISTS logs_au AFTER UPDATE ON logs BEGIN + INSERT INTO ` + fts5LogsTable + `(` + fts5LogsTable + `, rowid, body, service_name) VALUES ('delete', old.id, old.body, old.service_name); + INSERT INTO ` + fts5LogsTable + `(rowid, body, service_name) VALUES (new.id, new.body, new.service_name); + END`, + }, + } + for _, tr := range triggers { + if err := 
// fts5MatchExpr translates a free-form user search string into an FTS5 MATCH
// expression that approximates the previous LIKE %query% semantics:
//
//   - whitespace-separated terms are ANDed together (FTS5's implicit AND)
//   - each term is double-quoted, with embedded `"` doubled, so FTS5 treats
//     operators and punctuation inside it as literal text
//   - each term is suffixed with `*` for prefix match, so "conn" still hits
//     "connection"
//   - terms made up solely of ASCII punctuation (e.g. `-`, `"`, `*`) are
//     dropped: they contribute no searchable token, so keeping them would only
//     AND an empty phrase into the query
//
// It returns the empty string when no searchable term remains (empty,
// whitespace-only or punctuation-only input); the caller is expected to skip
// the MATCH clause in that case.
func fts5MatchExpr(input string) string {
	fields := strings.Fields(input)
	parts := make([]string, 0, len(fields))
	for _, f := range fields {
		if !fts5HasTokenChar(f) {
			continue
		}
		parts = append(parts, `"`+strings.ReplaceAll(f, `"`, `""`)+`"*`)
	}
	// strings.Join of an empty slice is "", which covers the no-term case.
	return strings.Join(parts, " ")
}

// fts5HasTokenChar reports whether term contains at least one byte that can be
// part of a search token: an ASCII letter or digit, or any non-ASCII byte.
// Non-ASCII input is always kept, because classifying it would need Unicode
// tables and wrongly dropping a real word is worse than keeping a stray symbol.
func fts5HasTokenChar(term string) bool {
	for i := 0; i < len(term); i++ {
		c := term[i]
		switch {
		case c >= 0x80:
			return true
		case c >= '0' && c <= '9', c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z':
			return true
		}
	}
	return false
}
// fts5Available reports whether searches for the given driver name should go
// through the FTS5 path. Only "sqlite" (compared case-insensitively via
// strings.ToLower) qualifies; every other value, including the empty string,
// yields false.
func fts5Available(driver string) bool {
	switch strings.ToLower(driver) {
	case "sqlite":
		return true
	default:
		return false
	}
}
+func TestFTS5Available_DriverGate(t *testing.T) { + cases := []struct { + driver string + want bool + }{ + {"sqlite", true}, + {"SQLITE", true}, + {"", false}, + {"postgres", false}, + {"mysql", false}, + {"mssql", false}, + } + for _, c := range cases { + if got := fts5Available(c.driver); got != c.want { + t.Fatalf("fts5Available(%q) = %v, want %v", c.driver, got, c.want) + } + } +} + +// TestSearchLogs_FTS5_BM25_Ordering verifies that BM25 puts the more relevant +// row first (more occurrences of the query token = lower BM25 score = higher rank). +func TestSearchLogs_FTS5_BM25_Ordering(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + rows := []Log{ + {TenantID: "default", Severity: "ERROR", Body: "connection error connection lost connection refused", ServiceName: "api", Timestamp: now.Add(-3 * time.Second)}, + {TenantID: "default", Severity: "INFO", Body: "service started", ServiceName: "api", Timestamp: now.Add(-2 * time.Second)}, + {TenantID: "default", Severity: "WARN", Body: "lost connection to upstream", ServiceName: "api", Timestamp: now.Add(-1 * time.Second)}, + } + if err := repo.db.Create(&rows).Error; err != nil { + t.Fatalf("seed: %v", err) + } + logs, err := repo.SearchLogs(context.Background(), "connection", 10) + if err != nil { + t.Fatalf("search: %v", err) + } + if len(logs) != 2 { + t.Fatalf("want 2 BM25 matches, got %d", len(logs)) + } + if !strings.Contains(logs[0].Body, "connection error connection lost connection refused") { + t.Fatalf("BM25 should rank triple-occurrence row first; got %q", logs[0].Body) + } +} + +// TestSearchLogs_FTS5_PrefixMatch verifies that "conn" matches "connection" +// thanks to the trailing `*` wildcard the helper appends. 
+func TestSearchLogs_FTS5_PrefixMatch(t *testing.T) { + repo := newTestRepo(t) + repo.db.Create(&[]Log{ + {TenantID: "default", Severity: "ERROR", Body: "connection refused", ServiceName: "api", Timestamp: time.Now().UTC()}, + {TenantID: "default", Severity: "INFO", Body: "kernel panic", ServiceName: "k", Timestamp: time.Now().UTC()}, + }) + logs, err := repo.SearchLogs(context.Background(), "conn", 10) + if err != nil { + t.Fatalf("search: %v", err) + } + if len(logs) != 1 { + t.Fatalf("want 1 prefix match for 'conn', got %d", len(logs)) + } +} + +// TestSearchLogs_FTS5_PorterStemming verifies the porter tokenizer treats +// "panic" and "panicked" as the same stem. +func TestSearchLogs_FTS5_PorterStemming(t *testing.T) { + repo := newTestRepo(t) + repo.db.Create(&[]Log{ + {TenantID: "default", Severity: "ERROR", Body: "the worker panicked at startup", ServiceName: "w", Timestamp: time.Now().UTC()}, + {TenantID: "default", Severity: "INFO", Body: "all good", ServiceName: "w", Timestamp: time.Now().UTC()}, + }) + logs, err := repo.SearchLogs(context.Background(), "panic", 10) + if err != nil { + t.Fatalf("search: %v", err) + } + if len(logs) != 1 { + t.Fatalf("porter stem 'panic' should match 'panicked'; got %d rows", len(logs)) + } +} + +// TestSearchLogs_FTS5_TenantIsolation verifies the tenant_id WHERE is honored +// so a tenant cannot read another tenant's matching rows via FTS5. 
+func TestSearchLogs_FTS5_TenantIsolation(t *testing.T) { + repo := newTestRepo(t) + repo.db.Create(&[]Log{ + {TenantID: "alpha", Severity: "ERROR", Body: "secret to alpha: connection lost", ServiceName: "api", Timestamp: time.Now().UTC()}, + {TenantID: "beta", Severity: "ERROR", Body: "secret to beta: connection lost", ServiceName: "api", Timestamp: time.Now().UTC()}, + }) + + logs, err := repo.SearchLogs(withTenant(context.Background(), "alpha"), "connection", 10) + if err != nil { + t.Fatalf("search alpha: %v", err) + } + if len(logs) != 1 || logs[0].TenantID != "alpha" { + t.Fatalf("tenant alpha should see only its row; got %d rows (first tenant=%q)", len(logs), firstTenant(logs)) + } + logs, err = repo.SearchLogs(withTenant(context.Background(), "beta"), "connection", 10) + if err != nil { + t.Fatalf("search beta: %v", err) + } + if len(logs) != 1 || logs[0].TenantID != "beta" { + t.Fatalf("tenant beta should see only its row; got %d rows", len(logs)) + } +} + +func firstTenant(l []Log) string { + if len(l) == 0 { + return "" + } + return l[0].TenantID +} + +// TestSearchLogs_FTS5_DeleteTriggerKeepsIndexInSync verifies that a hard +// DELETE on logs propagates through the AFTER DELETE trigger so the row no +// longer appears in MATCH results — required for retention purges. +func TestSearchLogs_FTS5_DeleteTriggerKeepsIndexInSync(t *testing.T) { + repo := newTestRepo(t) + old := time.Now().UTC().Add(-30 * 24 * time.Hour) + row := Log{TenantID: "default", Severity: "ERROR", Body: "rare error string xyzzy", ServiceName: "s", Timestamp: old} + if err := repo.db.Create(&row).Error; err != nil { + t.Fatalf("seed: %v", err) + } + // Confirm row is searchable via FTS5. 
+ logs, err := repo.SearchLogs(context.Background(), "xyzzy", 10) + if err != nil { + t.Fatalf("pre-delete search: %v", err) + } + if len(logs) != 1 { + t.Fatalf("want 1 row before delete, got %d", len(logs)) + } + // Drop via the retention path — exercises GORM hard delete and the AFTER + // DELETE trigger. + if _, err := repo.PurgeLogsBatched(context.Background(), time.Now().UTC().Add(-1*time.Hour), 100, time.Millisecond); err != nil { + t.Fatalf("purge: %v", err) + } + logs, err = repo.SearchLogs(context.Background(), "xyzzy", 10) + if err != nil { + t.Fatalf("post-delete search: %v", err) + } + if len(logs) != 0 { + t.Fatalf("FTS index should have dropped row after delete trigger; got %d rows", len(logs)) + } +} + +// TestSearchLogs_FTS5_SpecialCharsDoNotPanic verifies that user input +// containing FTS5 query operators is escaped and does not produce a syntax +// error or fall through to a 500. +func TestSearchLogs_FTS5_SpecialCharsDoNotPanic(t *testing.T) { + repo := newTestRepo(t) + repo.db.Create(&Log{TenantID: "default", Severity: "INFO", Body: "ok", ServiceName: "s", Timestamp: time.Now().UTC()}) + cases := []string{ + `AND OR NOT`, + `"`, + `* + -`, + `{ } ( )`, + `zzzz-no-such-term`, + } + for _, q := range cases { + t.Run(q, func(t *testing.T) { + if _, err := repo.SearchLogs(context.Background(), q, 10); err != nil { + t.Fatalf("special-char query %q errored: %v", q, err) + } + }) + } +} + +// TestGetLogsV2_FTS5_OrdersByBM25 verifies that GetLogsV2's search path also +// orders results by BM25 relevance on SQLite. 
+func TestGetLogsV2_FTS5_OrdersByBM25(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + repo.db.Create(&[]Log{ + {TenantID: "default", Severity: "ERROR", Body: "panic: nil pointer dereference at panic at panic", ServiceName: "api", TraceID: "t1", Timestamp: now}, + {TenantID: "default", Severity: "INFO", Body: "no issues", ServiceName: "api", TraceID: "t2", Timestamp: now}, + {TenantID: "default", Severity: "WARN", Body: "panic recovered", ServiceName: "api", TraceID: "t3", Timestamp: now}, + }) + logs, total, err := repo.GetLogsV2(context.Background(), LogFilter{Search: "panic", Limit: 10}) + if err != nil { + t.Fatalf("GetLogsV2: %v", err) + } + if total != 2 || len(logs) != 2 { + t.Fatalf("want 2/2 got %d/%d", total, len(logs)) + } + if logs[0].TraceID != "t1" { + t.Fatalf("BM25 should rank triple-occurrence row first; got TraceID=%q", logs[0].TraceID) + } +} + +// TestGetLogsV2_FTS5_FiltersStillApply verifies that ServiceName and Severity +// filters compose correctly on top of FTS5 MATCH. 
+func TestGetLogsV2_FTS5_FiltersStillApply(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + repo.db.Create(&[]Log{ + {TenantID: "default", Severity: "ERROR", Body: "connection lost", ServiceName: "api", Timestamp: now}, + {TenantID: "default", Severity: "INFO", Body: "connection ok", ServiceName: "api", Timestamp: now}, + {TenantID: "default", Severity: "ERROR", Body: "connection lost", ServiceName: "auth", Timestamp: now}, + }) + logs, total, err := repo.GetLogsV2(context.Background(), LogFilter{Search: "connection", Severity: "ERROR", ServiceName: "api", Limit: 10}) + if err != nil { + t.Fatalf("GetLogsV2: %v", err) + } + if total != 1 || len(logs) != 1 { + t.Fatalf("want 1 ERROR+api+'connection' row; got %d/%d", total, len(logs)) + } + if logs[0].ServiceName != "api" || logs[0].Severity != "ERROR" { + t.Fatalf("filter mismatch: service=%q severity=%q", logs[0].ServiceName, logs[0].Severity) + } +} + +// TestSetupSQLiteFTS5_Idempotent verifies the setup is safe to re-run on a DB +// that already has the table + triggers. +func TestSetupSQLiteFTS5_Idempotent(t *testing.T) { + repo := newTestRepo(t) + // newTestRepo already ran setupSQLiteFTS5 via AutoMigrateModels; running + // it a second time must not error or duplicate triggers. 
+ if err := setupSQLiteFTS5(repo.db); err != nil { + t.Fatalf("re-run setupSQLiteFTS5: %v", err) + } + repo.db.Create(&Log{TenantID: "default", Severity: "INFO", Body: "second-run sentinel", ServiceName: "s", Timestamp: time.Now().UTC()}) + logs, err := repo.SearchLogs(context.Background(), "sentinel", 10) + if err != nil { + t.Fatalf("search: %v", err) + } + if len(logs) != 1 { + t.Fatalf("expected 1 row after idempotent re-setup; got %d", len(logs)) + } +} diff --git a/internal/storage/log_repo.go b/internal/storage/log_repo.go index e57be6b..bf65f9c 100644 --- a/internal/storage/log_repo.go +++ b/internal/storage/log_repo.go @@ -62,12 +62,30 @@ func (r *Repository) GetRecentLogs(ctx context.Context, limit int) ([]Log, error // GetLogsV2 performs advanced filtering and search on logs scoped to the // tenant on ctx. COUNT and SELECT run in parallel via errgroup for reduced latency. +// +// When `filter.Search` is set and the driver is SQLite, the query routes +// through the FTS5 virtual table (`logs_fts`) and results are ordered by BM25 +// relevance. Other drivers continue to use LIKE/ILIKE against logs.body and +// logs.trace_id. func (r *Repository) GetLogsV2(ctx context.Context, filter LogFilter) ([]Log, int64, error) { tenant := TenantFromContext(ctx) var logs []Log var total int64 + useFTS5 := filter.Search != "" && fts5Available(r.driver) + matchExpr := "" + if useFTS5 { + matchExpr = fts5MatchExpr(filter.Search) + if matchExpr == "" { + useFTS5 = false + } + } + base := r.db.WithContext(ctx).Model(&Log{}).Where("tenant_id = ?", tenant) + if useFTS5 { + base = base.Joins("JOIN "+fts5LogsTable+" ON logs.id = "+fts5LogsTable+".rowid"). 
+ Where(fts5LogsTable+" MATCH ?", matchExpr) + } if filter.ServiceName != "" { base = base.Where("service_name = ?", filter.ServiceName) @@ -84,12 +102,17 @@ func (r *Repository) GetLogsV2(ctx context.Context, filter LogFilter) ([]Log, in if !filter.EndTime.IsZero() { base = base.Where("timestamp <= ?", filter.EndTime) } - if filter.Search != "" { + if filter.Search != "" && !useFTS5 { search := "%" + filter.Search + "%" op := r.likeOp() base = base.Where(fmt.Sprintf("body %s ? OR trace_id %s ?", op, op), search, search) } + orderBy := "timestamp desc" + if useFTS5 { + orderBy = "bm25(" + fts5LogsTable + ") ASC" + } + // Run COUNT and SELECT in parallel using independent sessions. var g errgroup.Group g.Go(func() error { @@ -97,18 +120,62 @@ func (r *Repository) GetLogsV2(ctx context.Context, filter LogFilter) ([]Log, in }) g.Go(func() error { return base.Session(&gorm.Session{}). - Order("timestamp desc"). + Order(orderBy). Limit(filter.Limit). Offset(filter.Offset). Find(&logs).Error }) if err := g.Wait(); err != nil { + if useFTS5 { + // Single retry via the LIKE fallback so a transient FTS5 issue does + // not turn into a 500 for users. + return r.getLogsV2LikeFallback(ctx, filter, tenant) + } return nil, 0, fmt.Errorf("failed to fetch logs: %w", err) } return logs, total, nil } +// getLogsV2LikeFallback re-runs the query using LIKE against body/trace_id — +// used when the FTS5 path errors out so the API never serves a 500 because of +// an index-layer hiccup. 
+func (r *Repository) getLogsV2LikeFallback(ctx context.Context, filter LogFilter, tenant string) ([]Log, int64, error) { + var logs []Log + var total int64 + base := r.db.WithContext(ctx).Model(&Log{}).Where("tenant_id = ?", tenant) + if filter.ServiceName != "" { + base = base.Where("service_name = ?", filter.ServiceName) + } + if filter.Severity != "" { + base = base.Where("severity = ?", filter.Severity) + } + if filter.TraceID != "" { + base = base.Where("trace_id = ?", filter.TraceID) + } + if !filter.StartTime.IsZero() { + base = base.Where("timestamp >= ?", filter.StartTime) + } + if !filter.EndTime.IsZero() { + base = base.Where("timestamp <= ?", filter.EndTime) + } + if filter.Search != "" { + search := "%" + filter.Search + "%" + op := r.likeOp() + base = base.Where(fmt.Sprintf("body %s ? OR trace_id %s ?", op, op), search, search) + } + var g errgroup.Group + g.Go(func() error { return base.Session(&gorm.Session{}).Count(&total).Error }) + g.Go(func() error { + return base.Session(&gorm.Session{}). + Order("timestamp desc").Limit(filter.Limit).Offset(filter.Offset).Find(&logs).Error + }) + if err := g.Wait(); err != nil { + return nil, 0, fmt.Errorf("failed to fetch logs (fallback): %w", err) + } + return logs, total, nil +} + // GetLogContext returns logs surrounding a specific timestamp (+/- 1 minute), // scoped to the tenant on ctx. func (r *Repository) GetLogContext(ctx context.Context, targetTime time.Time) ([]Log, error) { diff --git a/internal/storage/repository.go b/internal/storage/repository.go index 322562c..de8ba77 100644 --- a/internal/storage/repository.go +++ b/internal/storage/repository.go @@ -218,8 +218,16 @@ func (r *Repository) RecentLogs(ctx context.Context, limit int) ([]Log, error) { } // SearchLogs searches for logs based on query, scoped to the tenant carried on ctx. +// +// On SQLite the search routes through the FTS5 virtual table (`logs_fts`) and +// orders by BM25 score for relevance. 
On Postgres / MySQL / others it falls +// back to LIKE/ILIKE against logs.body and logs.service_name (Postgres uses +// the pg_trgm GIN indexes built in AutoMigrateModels). func (r *Repository) SearchLogs(ctx context.Context, query string, limit int) ([]Log, error) { tenant := TenantFromContext(ctx) + if query != "" && fts5Available(r.driver) { + return r.searchLogsFTS5(ctx, tenant, query, limit) + } var logs []Log db := r.db.WithContext(ctx).Where("tenant_id = ?", tenant).Order("timestamp desc").Limit(limit) if query != "" { @@ -231,3 +239,43 @@ func (r *Repository) SearchLogs(ctx context.Context, query string, limit int) ([ } return logs, nil } + +// searchLogsFTS5 runs the FTS5 + BM25 path. The MATCH expression is built via +// fts5MatchExpr to keep user input safe from FTS5 query syntax. Results are +// returned ordered by BM25 score (lower = more relevant in SQLite's +// implementation, which returns negative scores). +func (r *Repository) searchLogsFTS5(ctx context.Context, tenant, query string, limit int) ([]Log, error) { + matchExpr := fts5MatchExpr(query) + if matchExpr == "" { + var logs []Log + err := r.db.WithContext(ctx).Where("tenant_id = ?", tenant).Order("timestamp desc").Limit(limit).Find(&logs).Error + return logs, err + } + var logs []Log + err := r.db.WithContext(ctx). + Table("logs"). + Joins("JOIN "+fts5LogsTable+" ON logs.id = "+fts5LogsTable+".rowid"). + Where("logs.tenant_id = ? AND "+fts5LogsTable+" MATCH ?", tenant, matchExpr). + Order("bm25(" + fts5LogsTable + ") ASC"). + Limit(limit). + Find(&logs).Error + if err != nil { + // On any FTS5 query error (malformed expression, missing table on a + // half-migrated DB), fall back to LIKE so we never serve a 500 just + // because the index path is unhappy. 
+ return r.searchLogsLikeFallback(ctx, tenant, query, limit) + } + return logs, nil +} + +func (r *Repository) searchLogsLikeFallback(ctx context.Context, tenant, query string, limit int) ([]Log, error) { + var logs []Log + op := r.likeOp() + err := r.db.WithContext(ctx). + Where("tenant_id = ?", tenant). + Where(fmt.Sprintf("body %s ? OR service_name %s ?", op, op), "%"+query+"%", "%"+query+"%"). + Order("timestamp desc"). + Limit(limit). + Find(&logs).Error + return logs, err +}