Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ docs/

# Subagent worktree scratch paths (managed by the harness, not source)
.claude/worktrees/

# Local code-intelligence cache (codeiq) — runtime DB + neo4j store, not source
.codeiq/
49 changes: 45 additions & 4 deletions cmd/log_tool_use.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"encoding/json"
"io"
"log/slog"
"os"
"path/filepath"
"regexp"
Expand Down Expand Up @@ -46,23 +47,58 @@ func sanitizeSessionID(id string) string {
return clean
}

// warn surfaces hook failures via stderr (claude usually captures this)
// AND appends to ~/.config/ctm/logs/.hook-errors.log so silent drops leave
// a forensic trail. Both paths are best-effort — we still return nil from
// the hook to preserve the contract that hook failures never block tools.
func warn(reason string, attrs ...slog.Attr) {
args := make([]any, 0, len(attrs)*2)
for _, a := range attrs {
args = append(args, a.Key, a.Value)
}
slog.Warn("log-tool-use: "+reason, args...)
if errLog, err := os.OpenFile(filepath.Join(config.Dir(), "logs", ".hook-errors.log"), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600); err == nil {
fields := map[string]any{"ts": time.Now().UTC().Format(time.RFC3339Nano), "reason": reason}
for _, a := range attrs {
fields[a.Key] = a.Value.Any()
}
if line, mErr := json.Marshal(fields); mErr == nil {
_, _ = errLog.Write(append(line, '\n'))
}
_ = errLog.Close()
}
}

func runLogToolUse(cmd *cobra.Command, args []string) error {
// Read all of stdin. Hook payloads are small (<100KB typically).
data, err := io.ReadAll(io.LimitReader(os.Stdin, 1<<20)) // 1 MiB cap
if err != nil || len(data) == 0 {
if err != nil {
warn("stdin read failed", slog.String("err", err.Error()))
return nil
}
if len(data) == 0 {
// Empty stdin is legitimate (hook fired with no payload — shouldn't
// happen but isn't an error from our side). Silent skip.
return nil
}

// Parse into a generic map so we preserve all fields claude sends.
var payload map[string]interface{}
if err := json.Unmarshal(data, &payload); err != nil {
warn("payload parse failed", slog.String("err", err.Error()), slog.Int("bytes", len(data)))
return nil
}

// Extract and sanitize session_id for the filename.
rawSessionID, _ := payload["session_id"].(string)
sessionID := "unknown"
if v, ok := payload["session_id"].(string); ok && v != "" {
sessionID = sanitizeSessionID(v)
if rawSessionID != "" {
sessionID = sanitizeSessionID(rawSessionID)
}
if sessionID == "unknown" {
// session_id missing or unsanitizable — file lands at logs/unknown.jsonl
// and the daemon won't tail it under any session name. Surface this.
warn("session_id missing or invalid", slog.String("raw", rawSessionID))
}

// Add a ctm-side timestamp so the log is readable even if claude
Expand All @@ -72,19 +108,22 @@ func runLogToolUse(cmd *cobra.Command, args []string) error {
logDir := filepath.Join(config.Dir(), "logs")
// 0700 on the dir — tool payloads can contain file paths and contents.
if err := os.MkdirAll(logDir, 0700); err != nil {
warn("mkdir logs failed", slog.String("dir", logDir), slog.String("err", err.Error()))
return nil
}
logFile := filepath.Join(logDir, sessionID+".jsonl")

line, err := json.Marshal(payload)
if err != nil {
warn("marshal payload failed", slog.String("err", err.Error()), slog.String("session", sessionID))
return nil
}
line = append(line, '\n')

// 0600 on the file — same reasoning as the dir.
f, err := os.OpenFile(logFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
if err != nil {
warn("open log failed", slog.String("path", logFile), slog.String("err", err.Error()))
return nil
}

Expand All @@ -96,7 +135,9 @@ func runLogToolUse(cmd *cobra.Command, args []string) error {
fd := int(f.Fd())
lockAcquired := syscall.Flock(fd, syscall.LOCK_EX) == nil

_, _ = f.Write(line)
if _, werr := f.Write(line); werr != nil {
warn("write log failed", slog.String("path", logFile), slog.String("err", werr.Error()), slog.Int("bytes", len(line)))
}

// Release the advisory lock and close explicitly *before* calling
// MaybeRotate: rotation takes its own sibling lock, and keeping our
Expand Down
128 changes: 116 additions & 12 deletions internal/serve/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,21 @@ func (s *Server) Run(ctx context.Context) error {
if home, err := os.UserHomeDir(); err == nil {
claudeProjectsRoot = filepath.Join(home, ".claude", "projects")
}
adopted := 0
// Each tmux session can accumulate many jsonls in s.logDir (one per
// claude conversation in that workdir over time). The tailer manager
// keys on session NAME, so calling Start() for the same name with
// different UUIDs replaces the prior tailer — meaning the last UUID
// iterated wins. os.ReadDir returns entries alphabetically, which
// means the order is unrelated to recency, so we'd often end up
// glued to a stale UUID while claude writes to a fresh one. Group
// log files by resolved session name and pick the freshest per name.
type tailCand struct {
uuid string
mtime time.Time
}
freshest := make(map[string]tailCand) // session name → freshest log
orphanUUIDs := make([]string, 0)
adoptedViaWorkdir := 0
orphans := 0
if entries, err := os.ReadDir(s.logDir); err == nil {
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".jsonl") {
Expand All @@ -388,22 +400,55 @@ func (s *Server) Run(ctx context.Context) error {
}
}
if !ok {
short := uuid
if len(short) > 8 {
short = short[:8]
}
name = "uuid:" + short
orphans++
orphanUUIDs = append(orphanUUIDs, uuid)
continue
}
info, infoErr := e.Info()
if infoErr != nil {
continue
}
cand := tailCand{uuid: uuid, mtime: info.ModTime()}
if existing, found := freshest[name]; !found || cand.mtime.After(existing.mtime) {
freshest[name] = cand
}
s.tailers.Start(tailerCtx, name, uuid)
adopted++
}
}
for name, cand := range freshest {
s.tailers.Start(tailerCtx, name, cand.uuid)
}
for _, uuid := range orphanUUIDs {
short := uuid
if len(short) > 8 {
short = short[:8]
}
s.tailers.Start(tailerCtx, "uuid:"+short, uuid)
}
slog.Info("ctm serve tailers started",
"sessions_in_projection", len(s.proj.All()),
"tailers_started", adopted,
"tailers_started", len(freshest)+len(orphanUUIDs),
"adopted_via_workdir", adoptedViaWorkdir,
"orphan_uuids", orphans)
"orphan_uuids", len(orphanUUIDs))

// Periodic rescan: claude rotates to a new UUID jsonl whenever a new
// conversation starts inside an existing tmux session, but the tailer
// manager only reads the projection at startup. Without this loop,
// the daemon stays glued to the previous UUID's file (which claude
// no longer writes to) and the UI shows stale activity. Re-running
// the freshest-per-name selection every 30s and calling Start() —
// which is a no-op when the UUID hasn't changed — keeps tailers in
// sync with whatever conversation claude is writing to right now.
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-tailerCtx.Done():
return
case <-ticker.C:
s.rescanTailers(tailerCtx, claudeProjectsRoot)
}
}
}()

serveDone := make(chan error, 1)
go func() { serveDone <- s.http.Serve(s.listener) }()
Expand Down Expand Up @@ -467,6 +512,65 @@ func (s *Server) Run(ctx context.Context) error {
}
}

// rescanTailers re-runs the freshest-per-name adoption pass against
// s.logDir, calling Start() for each mapped session. TailerManager.Start
// is idempotent when the UUID hasn't changed and rotates cleanly when
// it has, so calling it on every tick is safe even if nothing's moved.
// Orphan UUIDs are not (re-)registered here — they only get tailers at
// startup; a new claude conversation's UUID becomes mappable as soon as
// the projection picks up the session_new hook event.
func (s *Server) rescanTailers(ctx context.Context, claudeProjectsRoot string) {
all := s.proj.All()
uuidToName := make(map[string]string, len(all))
claudeDirToName := make(map[string]string, len(all))
for _, sess := range all {
if sess.UUID != "" {
uuidToName[sess.UUID] = sess.Name
}
if sess.Workdir != "" {
claudeDirToName[strings.ReplaceAll(sess.Workdir, "/", "-")] = sess.Name
}
}
type tailCand struct {
uuid string
mtime time.Time
}
freshest := make(map[string]tailCand)
entries, err := os.ReadDir(s.logDir)
if err != nil {
return
}
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".jsonl") {
continue
}
uuid := strings.TrimSuffix(e.Name(), ".jsonl")
name, ok := uuidToName[uuid]
if !ok && claudeProjectsRoot != "" {
if matches, _ := filepath.Glob(filepath.Join(claudeProjectsRoot, "*", uuid+".jsonl")); len(matches) == 1 {
if mapped, ok2 := claudeDirToName[filepath.Base(filepath.Dir(matches[0]))]; ok2 {
name = mapped
ok = true
}
}
}
if !ok {
continue
}
info, infoErr := e.Info()
if infoErr != nil {
continue
}
cand := tailCand{uuid: uuid, mtime: info.ModTime()}
if existing, found := freshest[name]; !found || cand.mtime.After(existing.mtime) {
freshest[name] = cand
}
}
for name, cand := range freshest {
s.tailers.Start(ctx, name, cand.uuid)
}
}

func (s *Server) registerRoutes(mux *http.ServeMux) {
// authHF wraps h so that every request carries a valid session
// token (V27). Existing mux.Handle(..., authHF(h)) callsites
Expand Down
7 changes: 6 additions & 1 deletion ui/src/hooks/useEventStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ export function useEventStream({
fetchEventSource(url, {
signal: ctrl.signal,
headers: { ...authHeaders(), Accept: "text/event-stream" },
openWhenHidden: true,
// openWhenHidden: false (default). On mobile Safari the OS will
// suspend the network when the tab is backgrounded; an "open"
// stream is silently dead. Letting fetch-event-source close on
// hidden and reopen on visible gives a clean reconnect every
// time the user returns, which the server replays from
// Last-Event-ID.
async onopen(res) {
if (res.status === 401) {
onUnauthorizedRef.current?.();
Expand Down
Loading