diff --git a/cmd/xtcp2client/printer.go b/cmd/xtcp2client/printer.go new file mode 100644 index 0000000..17e3bb3 --- /dev/null +++ b/cmd/xtcp2client/printer.go @@ -0,0 +1,94 @@ +package main + +import ( + "fmt" + "io" + "log" + "sync" + + "github.com/randomizedcoder/xtcp2/pkg/recordfmt" + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" +) + +// recordPrinter writes streamed records to an io.Writer in the chosen format, +// reusing the shared pkg/recordfmt library so the client and daemon format +// identically. It serializes writes (the listen path fans out across worker +// goroutines) and emits the csv/tsv header exactly once. +type recordPrinter struct { + mu sync.Mutex + w io.Writer + format string + cols []recordfmt.Column + comma rune + headerDone bool +} + +// newRecordPrinter validates the format (and -columns for csv/tsv) and returns +// a printer. An unknown format or bad column spec is an error. +func newRecordPrinter(w io.Writer, format, columns string) (*recordPrinter, error) { + p := &recordPrinter{w: w, format: format} + switch format { + case recordfmt.FormatJSON, recordfmt.FormatHumanize, recordfmt.FormatNull: + // no columns + case recordfmt.FormatCSV, recordfmt.FormatTSV: + cols, err := recordfmt.SelectColumns(columns) + if err != nil { + return nil, err + } + p.cols = cols + p.comma = ',' + if format == recordfmt.FormatTSV { + p.comma = '\t' + } + default: + return nil, fmt.Errorf("unknown -format %q (want json, csv, tsv, humanize, or null)", format) + } + return p, nil +} + +// record formats and writes one record. Safe for concurrent callers. +func (p *recordPrinter) record(r *xtcp_flat_record.XtcpFlatRecord) { + if p == nil || r == nil || p.format == recordfmt.FormatNull { + return + } + + switch p.format { + case recordfmt.FormatCSV, recordfmt.FormatTSV: + // One-row envelope through the shared table encoder; header once. + env := &xtcp_flat_record.Envelope{Row: []*xtcp_flat_record.XtcpFlatRecord{r}} + p.mu.Lock() + defer p.mu.Unlock() + first := !p.headerDone + p.headerDone = true + b, err := recordfmt.MarshalEnvelopeTable(env, p.cols, p.comma, first) + if err != nil { + log.Printf("xtcp2client: format %s: %v", p.format, err) + return + } + p.write(b) + + default: // json, humanize — one object per line + var b []byte + var err error + if p.format == recordfmt.FormatHumanize { + b, err = recordfmt.MarshalHumanizedJSON(r) + } else { + b, err = recordfmt.MarshalJSON(r) + } + if err != nil { + log.Printf("xtcp2client: format %s: %v", p.format, err) + return + } + p.mu.Lock() + defer p.mu.Unlock() + p.write(append(b, '\n')) + } +} + +// write emits bytes to the sink, logging (not failing) on a write error — +// the caller holds the mutex (csv path) or has just taken it (json path). +func (p *recordPrinter) write(b []byte) { + if _, err := p.w.Write(b); err != nil { + log.Printf("xtcp2client: write: %v", err) + } +} diff --git a/cmd/xtcp2client/xtcp2client.go b/cmd/xtcp2client/xtcp2client.go index 1d890d2..743f751 100644 --- a/cmd/xtcp2client/xtcp2client.go +++ b/cmd/xtcp2client/xtcp2client.go @@ -13,6 +13,7 @@ import ( _ "unsafe" "github.com/randomizedcoder/xtcp2/pkg/misc" + "github.com/randomizedcoder/xtcp2/pkg/recordfmt" "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -20,8 +21,6 @@ import ( "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" ) // unsafe for FastRandN @@ -125,7 +124,9 @@ func runMain(ctx context.Context, args []string, stdout, stderr io.Writer) int { poll := fs.Bool("poll", false, "poll mode means the client will trigger polling via the PollFlatRecords service") pollFrequency := fs.Duration("pollFrequency", pollFrequencyCst, "poll mode frequency") workers := fs.Int("workers", 10, "workers") - json := fs.Bool("json", false, "json output") + format := fs.String("format", recordfmt.FormatJSON, "output format: json | csv | tsv | humanize | null") + columns := fs.String("columns", "", "csv/tsv only: comma-separated XtcpFlatRecord json field names; empty = all") + jsonFlag := fs.Bool("json", false, "deprecated alias for -format json") d := fs.Uint("d", 11, "debugLevel") v := fs.Bool("v", false, "show version") if err := fs.Parse(args); err != nil { @@ -138,19 +139,29 @@ func runMain(ctx context.Context, args []string, stdout, stderr io.Writer) int { } debugLevel = *d + chosen := *format + if *jsonFlag { + chosen = recordfmt.FormatJSON + } + printer, err := newRecordPrinter(stdout, chosen, *columns) + if err != nil { + fmt.Fprintf(stderr, "xtcp2client: %v\n", err) + return 2 + } + complete := make(chan struct{}, signalChannelSizeCst) addr := *target + ":" + *port if *poll { - pollMode(ctx, addr, &complete, *pollFrequency, *json, debugLevel) + pollMode(ctx, addr, &complete, *pollFrequency, printer, debugLevel) } else { - listenMode(ctx, addr, *workers, &complete, *json) + listenMode(ctx, addr, *workers, &complete, printer) } return 0 } // func (c *xTCPFlatRecordServiceClient) PollFlatRecords( // ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[PollFlatRecordsRequest, FlatRecordsResponse], error) { -func pollMode(ctx context.Context, addr string, complete *chan struct{}, pollFrequency time.Duration, json bool, debugLevel uint) { +func pollMode(ctx context.Context, addr string, complete *chan struct{}, pollFrequency time.Duration, printer *recordPrinter, debugLevel uint) { if debugLevel > 10 { log.Printf("pollMode starting") @@ -187,7 +198,7 @@ func pollMode(ctx context.Context, addr string, complete *chan struct{}, pollFre // recvCh := make(chan *xtcp_flat_record.FlatRecordsResponse) wg := new(sync.WaitGroup) wg.Add(1) - go pollStreamRecv(ctx, wg, json, &stream, debugLevel) + go pollStreamRecv(ctx, wg, printer, &stream, debugLevel) breakPoint: for i := 0; ; i++ { @@ -223,7 +234,7 @@ breakPoint: func pollStreamRecv( ctx context.Context, wg *sync.WaitGroup, - json bool, + printer *recordPrinter, // recvCh chan *xtcp_flat_record.FlatRecordsResponse, stream *grpc.BidiStreamingClient[xtcp_flat_record.PollFlatRecordsRequest, xtcp_flat_record.PollFlatRecordsResponse], debugLevel uint) { @@ -265,7 +276,7 @@ breakPoint: continue } // log.Printf("rec:%v", rec) - printPollFlatRecordsResponse(pollFlatRecordsResponse, 1, json, debugLevel) + printPollFlatRecordsResponse(pollFlatRecordsResponse, 1, printer, debugLevel) // recvCh <- rec @@ -278,7 +289,7 @@ breakPoint: } } -func listenMode(ctx context.Context, addr string, workers int, complete *chan struct{}, json bool) { +func listenMode(ctx context.Context, addr string, workers int, complete *chan struct{}, printer *recordPrinter) { var wg sync.WaitGroup wg.Add(workers) @@ -288,7 +299,7 @@ func listenMode(ctx context.Context, addr string, workers int, complete *chan st // dialed once and passed the conn down, but stream() // deferred-Close'd it on first return, so every "reconnect // after sleep" iteration after the first used a dead conn. - go singleStreamingClient(ctx, &wg, addr, json, j) + go singleStreamingClient(ctx, &wg, addr, printer, j) } wg.Wait() @@ -306,7 +317,7 @@ func listenMode(ctx context.Context, addr string, workers int, complete *chan st // restart branch without waiting 10 seconds. var reconnectTimeVar = reconnectTime -func singleStreamingClient(ctx context.Context, wg *sync.WaitGroup, addr string, json bool, id int) { +func singleStreamingClient(ctx context.Context, wg *sync.WaitGroup, addr string, printer *recordPrinter, id int) { defer wg.Done() @@ -327,7 +338,7 @@ breakPoint: // dead code after iteration 0. conn := newGRPCClient(addr) wg.Add(1) - stream(ctx, wg, conn, json, id) + stream(ctx, wg, conn, printer, id) select { case <-ctx.Done(): @@ -451,7 +462,7 @@ func handleRecvContinueErr(ctx context.Context, client any, err error) bool { return false } -func stream(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn, json bool, id int) { +func stream(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn, printer *recordPrinter, id int) { defer wg.Done() defer func() { @@ -485,7 +496,7 @@ func stream(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn, json case recvBreak: return case recvPrint: - printFlatRecordsResponse(resp, id, json, debugLevel) + printFlatRecordsResponse(resp, id, printer, debugLevel) continue case recvContinue: // fall through to handleRecvContinueErr below @@ -500,48 +511,16 @@ func stream(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn, json } } -func printFlatRecordsResponse(flatRecordsResponse *xtcp_flat_record.FlatRecordsResponse, id int, json bool, debugLevel uint) { - - if debugLevel > 10 { - b, err := proto.Marshal(flatRecordsResponse.GetXtcpFlatRecord()) - if err != nil { - if debugLevel > 10 { - log.Println("FlatRecords proto.Marshal(x) err: ", err) - } - } - log.Printf("id:%d, FlatRecords len(b):%d", id, len(b)) - } - - if debugLevel > 10 { - if json { - jsonStr := protojson.Format(flatRecordsResponse.GetXtcpFlatRecord()) - log.Printf("id:%d, %s", id, jsonStr) - return - } - - log.Printf("id:%d, %s", id, flatRecordsResponse.GetXtcpFlatRecord()) +func printFlatRecordsResponse(flatRecordsResponse *xtcp_flat_record.FlatRecordsResponse, id int, printer *recordPrinter, debugLevel uint) { + if debugLevel > 1000 { + log.Printf("id:%d, FlatRecords %s", id, flatRecordsResponse.GetXtcpFlatRecord()) } + printer.record(flatRecordsResponse.GetXtcpFlatRecord()) } -func printPollFlatRecordsResponse(pollFlatRecordsResponse *xtcp_flat_record.PollFlatRecordsResponse, id int, json bool, debugLevel uint) { - - if debugLevel > 10 { - b, err := proto.Marshal(pollFlatRecordsResponse.GetXtcpFlatRecord()) - if err != nil { - if debugLevel > 10 { - log.Println("FlatRecords proto.Marshal(x) err: ", err) - } - } - log.Printf("id:%d, FlatRecords len(b):%d", id, len(b)) - } - - if debugLevel > 10 { - if json { - jsonStr := protojson.Format(pollFlatRecordsResponse.GetXtcpFlatRecord()) - log.Printf("id:%d, %s", id, jsonStr) - return - } - - log.Printf("id:%d, %s", id, pollFlatRecordsResponse.GetXtcpFlatRecord()) +func printPollFlatRecordsResponse(pollFlatRecordsResponse *xtcp_flat_record.PollFlatRecordsResponse, id int, printer *recordPrinter, debugLevel uint) { + if debugLevel > 1000 { + log.Printf("id:%d, PollFlatRecords %s", id, pollFlatRecordsResponse.GetXtcpFlatRecord()) } + printer.record(pollFlatRecordsResponse.GetXtcpFlatRecord()) } diff --git a/cmd/xtcp2client/xtcp2client_test.go b/cmd/xtcp2client/xtcp2client_test.go index 8d90808..597c091 100644 --- a/cmd/xtcp2client/xtcp2client_test.go +++ b/cmd/xtcp2client/xtcp2client_test.go @@ -3,6 +3,9 @@ package main import ( "bytes" "context" + "encoding/csv" + "encoding/json" + "io" "net" "strings" "sync" @@ -11,6 +14,7 @@ import ( "google.golang.org/grpc" + "github.com/randomizedcoder/xtcp2/pkg/recordfmt" "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" ) @@ -23,38 +27,105 @@ func TestNewGRPCClient(t *testing.T) { _ = conn.Close() } -func TestPrintFlatRecordsResponse_silent(t *testing.T) { - // debugLevel 0 → no log output, just the early-return path. - resp := &xtcp_flat_record.FlatRecordsResponse{ - XtcpFlatRecord: &xtcp_flat_record.XtcpFlatRecord{Hostname: "h1"}, +func newBufPrinter(t *testing.T, format string) (*recordPrinter, *bytes.Buffer) { + t.Helper() + var buf bytes.Buffer + p, err := newRecordPrinter(&buf, format, "") + if err != nil { + t.Fatalf("newRecordPrinter(%q): %v", format, err) } - printFlatRecordsResponse(resp, 1, false, 0) - printFlatRecordsResponse(resp, 1, true, 0) + return p, &buf } -func TestPrintFlatRecordsResponse_verbose(t *testing.T) { - resp := &xtcp_flat_record.FlatRecordsResponse{ - XtcpFlatRecord: &xtcp_flat_record.XtcpFlatRecord{Hostname: "h2"}, - } - // debugLevel > 10 → both proto.Marshal branch AND the per-format printing. - printFlatRecordsResponse(resp, 7, false, 11) - printFlatRecordsResponse(resp, 7, true, 11) // json branch +func nullPrinter() *recordPrinter { + p, _ := newRecordPrinter(io.Discard, recordfmt.FormatNull, "") + return p } -func TestPrintPollFlatRecordsResponse_silent(t *testing.T) { - resp := &xtcp_flat_record.PollFlatRecordsResponse{ - XtcpFlatRecord: &xtcp_flat_record.XtcpFlatRecord{Hostname: "p1"}, +func sampleClientRecord() *xtcp_flat_record.XtcpFlatRecord { + return &xtcp_flat_record.XtcpFlatRecord{ + Hostname: "h1", + InetDiagMsgFamily: 2, + InetDiagMsgSocketSource: []byte{10, 0, 0, 5}, + InetDiagMsgState: 10, // LISTEN + CongestionAlgorithmEnum: xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_CUBIC, } - printPollFlatRecordsResponse(resp, 1, false, 0) - printPollFlatRecordsResponse(resp, 1, true, 0) } -func TestPrintPollFlatRecordsResponse_verbose(t *testing.T) { - resp := &xtcp_flat_record.PollFlatRecordsResponse{ - XtcpFlatRecord: &xtcp_flat_record.XtcpFlatRecord{Hostname: "p2"}, +func TestRecordPrinter_formats(t *testing.T) { + rec := sampleClientRecord() + + t.Run("json", func(t *testing.T) { + p, buf := newBufPrinter(t, recordfmt.FormatJSON) + p.record(rec) + if !bytes.HasSuffix(buf.Bytes(), []byte("\n")) { + t.Error("json line must end with newline") + } + var m map[string]any + if err := json.Unmarshal(bytes.TrimRight(buf.Bytes(), "\n"), &m); err != nil { + t.Fatalf("not valid JSON: %v", err) + } + }) + + t.Run("humanize", func(t *testing.T) { + p, buf := newBufPrinter(t, recordfmt.FormatHumanize) + p.record(rec) + var m map[string]any + if err := json.Unmarshal(bytes.TrimRight(buf.Bytes(), "\n"), &m); err != nil { + t.Fatalf("not valid JSON: %v", err) + } + if m["inetDiagMsgState"] != "LISTEN" { + t.Errorf("state not humanized: %v", m["inetDiagMsgState"]) + } + }) + + t.Run("csv_header_once", func(t *testing.T) { + p, buf := newBufPrinter(t, recordfmt.FormatCSV) + p.record(rec) + p.record(rec) + rows, err := csv.NewReader(bytes.NewReader(buf.Bytes())).ReadAll() + if err != nil { + t.Fatal(err) + } + if len(rows) != 3 { // header + 2 records + t.Fatalf("want header+2 rows, got %d: %v", len(rows), rows) + } + }) + + t.Run("null_empty", func(t *testing.T) { + p, buf := newBufPrinter(t, recordfmt.FormatNull) + p.record(rec) + if buf.Len() != 0 { + t.Errorf("null should write nothing, got %q", buf.String()) + } + }) + + t.Run("unknown_errors", func(t *testing.T) { + if _, err := newRecordPrinter(&bytes.Buffer{}, "xml", ""); err == nil { + t.Error("expected error for unknown format") + } + }) + + t.Run("bad_columns_errors", func(t *testing.T) { + if _, err := newRecordPrinter(&bytes.Buffer{}, recordfmt.FormatCSV, "nope"); err == nil { + t.Error("expected error for unknown column") + } + }) +} + +// printFlatRecordsResponse / printPollFlatRecordsResponse route through the +// printer; verify both write something for a populated record. +func TestPrintResponses(t *testing.T) { + p, buf := newBufPrinter(t, recordfmt.FormatJSON) + printFlatRecordsResponse(&xtcp_flat_record.FlatRecordsResponse{ + XtcpFlatRecord: &xtcp_flat_record.XtcpFlatRecord{Hostname: "h"}, + }, 1, p, 0) + printPollFlatRecordsResponse(&xtcp_flat_record.PollFlatRecordsResponse{ + XtcpFlatRecord: &xtcp_flat_record.XtcpFlatRecord{Hostname: "p"}, + }, 7, p, 11) + if buf.Len() == 0 { + t.Error("expected printer output") } - printPollFlatRecordsResponse(resp, 7, false, 11) - printPollFlatRecordsResponse(resp, 7, true, 11) } func TestFastRandN(t *testing.T) { @@ -211,7 +282,7 @@ func startTestGRPC(t *testing.T) (addr string, cleanup func()) { func TestListenMode_workersZeroNoOp(t *testing.T) { complete := make(chan struct{}, 1) - listenMode(t.Context(), "127.0.0.1:0", 0, &complete, false) + listenMode(t.Context(), "127.0.0.1:0", 0, &complete, nullPrinter()) // wg.Wait returned immediately; complete signal sent. } @@ -223,7 +294,7 @@ func TestListenMode_oneWorkerCancellable(t *testing.T) { complete := make(chan struct{}, 1) done := make(chan struct{}) go func() { - listenMode(ctx, addr, 1, &complete, false) + listenMode(ctx, addr, 1, &complete, nullPrinter()) close(done) }() // Give the worker time to dial + open the stream. @@ -244,7 +315,7 @@ func TestPollMode_dialAndCancel(t *testing.T) { complete := make(chan struct{}, 1) done := make(chan struct{}) go func() { - pollMode(ctx, addr, &complete, 50*time.Millisecond, false, 0) + pollMode(ctx, addr, &complete, 50*time.Millisecond, nullPrinter(), 0) close(done) }() time.Sleep(150 * time.Millisecond) // let one tick fire @@ -263,7 +334,7 @@ func TestPollMode_completeChannel(t *testing.T) { complete := make(chan struct{}, 1) done := make(chan struct{}) go func() { - pollMode(t.Context(), addr, &complete, time.Hour, false, 0) + pollMode(t.Context(), addr, &complete, time.Hour, nullPrinter(), 0) close(done) }() time.Sleep(50 * time.Millisecond) @@ -287,7 +358,7 @@ func TestPollMode_recordingServer(t *testing.T) { done := make(chan struct{}) go func() { // debugLevel=11 hits more printPollFlatRecordsResponse log branches. - pollMode(ctx, addr, &complete, 50*time.Millisecond, true, 11) + pollMode(ctx, addr, &complete, 50*time.Millisecond, nullPrinter(), 11) close(done) }() // Let one tick fire so stream.Send + server.Recv complete. @@ -318,7 +389,7 @@ func TestStream_recordingServer(t *testing.T) { go func() { // debugLevel=200 hits the per-record + EOF log paths. debugLevel = 200 - stream(ctx, wg, conn, true, 0) + stream(ctx, wg, conn, nullPrinter(), 0) close(done) }() time.Sleep(200 * time.Millisecond) @@ -349,7 +420,7 @@ func TestSingleStreamingClient_restartLoop(t *testing.T) { done := make(chan struct{}) go func() { debugLevel = 200 // hit the restart log branch - singleStreamingClient(ctx, wg, addr, false, 0) + singleStreamingClient(ctx, wg, addr, nullPrinter(), 0) close(done) }() // Let stream() complete + sleep at least once before cancel. @@ -375,7 +446,7 @@ func TestSingleStreamingClient_preCancelled(t *testing.T) { wg.Add(1) done := make(chan struct{}) go func() { - singleStreamingClient(ctx, wg, addr, false, 0) + singleStreamingClient(ctx, wg, addr, nullPrinter(), 0) close(done) }() select { @@ -399,7 +470,7 @@ func TestStream_dialAndCancel(t *testing.T) { wg.Add(1) done := make(chan struct{}) go func() { - stream(ctx, wg, conn, false, 0) + stream(ctx, wg, conn, nullPrinter(), 0) close(done) }() time.Sleep(100 * time.Millisecond) diff --git a/pkg/recordfmt/columns.go b/pkg/recordfmt/columns.go new file mode 100644 index 0000000..4d7cc94 --- /dev/null +++ b/pkg/recordfmt/columns.go @@ -0,0 +1,141 @@ +package recordfmt + +import ( + "encoding/base64" + "fmt" + "strconv" + "strings" + "sync" + + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" + "google.golang.org/protobuf/reflect/protoreflect" +) + +// Column identifies one XtcpFlatRecord field for tabular (CSV/TSV) output. +// The set is derived once from the protobuf descriptor via reflection, so it +// never drifts from the schema. +type Column struct { + Name string // protojson camelCase name — the CSV header cell + fd protoreflect.FieldDescriptor +} + +var ( + colsOnce sync.Once + colsAll []Column + colsIndex map[string]Column +) + +func initColumns() { + colsOnce.Do(func() { + fields := (&xtcp_flat_record.XtcpFlatRecord{}).ProtoReflect().Descriptor().Fields() + colsAll = make([]Column, 0, fields.Len()) + colsIndex = make(map[string]Column, fields.Len()) + for i := 0; i < fields.Len(); i++ { + fd := fields.Get(i) + c := Column{Name: fd.JSONName(), fd: fd} + colsAll = append(colsAll, c) + colsIndex[c.Name] = c + } + }) +} + +// AllColumns returns the full ordered column list (proto declaration order). +func AllColumns() []Column { + initColumns() + return colsAll +} + +// SelectColumns resolves a comma-separated column spec to an ordered list. +// Empty (or whitespace) selects all columns. Unknown names are an error so a +// typo fails fast rather than silently dropping a column. +func SelectColumns(spec string) ([]Column, error) { + initColumns() + spec = strings.TrimSpace(spec) + if spec == "" { + return colsAll, nil + } + parts := strings.Split(spec, ",") + out := make([]Column, 0, len(parts)) + for _, p := range parts { + name := strings.TrimSpace(p) + if name == "" { + continue + } + c, ok := colsIndex[name] + if !ok { + return nil, fmt.Errorf("unknown column %q (expect an XtcpFlatRecord json name, e.g. hostname, inetDiagMsgSocketSourcePort, tcpInfoRtt)", name) + } + out = append(out, c) + } + if len(out) == 0 { + return colsAll, nil + } + return out, nil +} + +// Header returns the header cells (column names) for the given columns. +func Header(cols []Column) []string { + h := make([]string, len(cols)) + for i, c := range cols { + h[i] = c.Name + } + return h +} + +// Row renders one record as string cells for the given columns. When humanize +// is set, the machine-valued fields (IP addresses, TCP state, congestion enum, +// timestamp) are rendered human-readably; everything else is the scalar value. +func Row(r *xtcp_flat_record.XtcpFlatRecord, cols []Column, humanize bool) []string { + m := r.ProtoReflect() + out := make([]string, len(cols)) + for i, c := range cols { + out[i] = formatField(r, m, c, humanize) + } + return out +} + +func formatField(r *xtcp_flat_record.XtcpFlatRecord, m protoreflect.Message, c Column, humanize bool) string { + if humanize { + switch c.Name { + case "inetDiagMsgSocketSource": + return IPString(r.GetInetDiagMsgFamily(), r.GetInetDiagMsgSocketSource()) + case "inetDiagMsgSocketDestination": + return IPString(r.GetInetDiagMsgFamily(), r.GetInetDiagMsgSocketDestination()) + case "inetDiagMsgState": + return TCPStateName(r.GetInetDiagMsgState()) + case "tcpInfoState": + return TCPStateName(r.GetTcpInfoState()) + case "congestionAlgorithmEnum": + return CongestionAlgorithmName(r.GetCongestionAlgorithmEnum()) + case "timestampNs": + return TimestampRFC3339(r.GetTimestampNs()) + } + } + return formatScalar(c.fd, m.Get(c.fd)) +} + +// formatScalar renders a protoreflect scalar value as a string. XtcpFlatRecord +// has no nested-message or repeated fields, so scalar coverage is sufficient; +// the two bytes fields are IP addresses (base64 here, humanized elsewhere). +func formatScalar(fd protoreflect.FieldDescriptor, v protoreflect.Value) string { + switch fd.Kind() { + case protoreflect.BoolKind: + return strconv.FormatBool(v.Bool()) + case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind, + protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind: + return strconv.FormatInt(v.Int(), 10) + case protoreflect.Uint32Kind, protoreflect.Fixed32Kind, + protoreflect.Uint64Kind, protoreflect.Fixed64Kind: + return strconv.FormatUint(v.Uint(), 10) + case protoreflect.FloatKind, protoreflect.DoubleKind: + return strconv.FormatFloat(v.Float(), 'f', -1, 64) + case protoreflect.StringKind: + return v.String() + case protoreflect.BytesKind: + return base64.StdEncoding.EncodeToString(v.Bytes()) + case protoreflect.EnumKind: + return strconv.FormatInt(int64(v.Enum()), 10) + default: + return v.String() + } +} diff --git a/pkg/recordfmt/humanize.go b/pkg/recordfmt/humanize.go new file mode 100644 index 0000000..2931454 --- /dev/null +++ b/pkg/recordfmt/humanize.go @@ -0,0 +1,92 @@ +// Package recordfmt formats xtcp2 TCP records (xtcp_flat_record.XtcpFlatRecord +// and the Envelope batch) into the various export encodings — protobufList, +// protoJson, protoText, msgpack, jsonl, csv, tsv, and a humanized JSON. It is +// a pure library shared by the xtcp2 daemon (pkg/xtcp) and the xtcp2client +// gRPC client: functions return ([]byte, error) and never log, count metrics, +// or pool buffers — callers own those concerns. +package recordfmt + +import ( + "net" + "strconv" + "strings" + "time" + + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" +) + +// Address family values from the kernel inet_diag message. +const ( + afInet = 2 // AF_INET + afInet6 = 10 // AF_INET6 +) + +// IPString renders an inet_diag address (raw bytes) as a dotted-quad or +// RFC-5952 IPv6 string. The kernel returns the address in a 16-byte +// __be32[4] slot regardless of family, with only the first 4 bytes +// meaningful for IPv4 — so family is authoritative and consulted before +// length, otherwise an IPv4 address in a 16-byte buffer is misread as IPv6 +// ("c0a8:7a01::"). Empty input → "". +func IPString(family uint32, b []byte) string { + if len(b) == 0 { + return "" + } + switch family { + case afInet: + if len(b) >= net.IPv4len { + return net.IP(b[:net.IPv4len]).String() + } + case afInet6: + if len(b) >= net.IPv6len { + return net.IP(b[:net.IPv6len]).String() + } + } + return net.IP(b).String() +} + +// tcpStateNames maps kernel TCP state integers (include/net/tcp_states.h) to +// the conventional names `ss` prints. xtcp2 carries state as a bare uint8. +var tcpStateNames = map[uint32]string{ + 1: "ESTABLISHED", + 2: "SYN_SENT", + 3: "SYN_RECV", + 4: "FIN_WAIT1", + 5: "FIN_WAIT2", + 6: "TIME_WAIT", + 7: "CLOSE", + 8: "CLOSE_WAIT", + 9: "LAST_ACK", + 10: "LISTEN", + 11: "CLOSING", + 12: "NEW_SYN_RECV", +} + +// TCPStateName returns the conventional name for a TCP state integer, or the +// decimal value as a string for anything outside the known range. +func TCPStateName(state uint32) string { + if name, ok := tcpStateNames[state]; ok { + return name + } + return strconv.FormatUint(uint64(state), 10) +} + +// CongestionAlgorithmName returns the short congestion-control name (e.g. +// "CUBIC", "BBR3") by trimming the generated enum's CONGESTION_ALGORITHM_ +// prefix. UNSPECIFIED renders as "". +func CongestionAlgorithmName(e xtcp_flat_record.XtcpFlatRecord_CongestionAlgorithm) string { + if e == xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_UNSPECIFIED { + return "" + } + return strings.TrimPrefix(e.String(), "CONGESTION_ALGORITHM_") +} + +// TimestampRFC3339 formats a record's timestamp_ns (Unix nanoseconds held as a +// double) as RFC3339 with nanosecond precision in UTC. Zero → "". +func TimestampRFC3339(ns float64) string { + if ns <= 0 { + return "" + } + sec := int64(ns) / 1e9 + nsec := int64(ns) % 1e9 + return time.Unix(sec, nsec).UTC().Format(time.RFC3339Nano) +} diff --git a/pkg/recordfmt/marshal.go b/pkg/recordfmt/marshal.go new file mode 100644 index 0000000..1891be8 --- /dev/null +++ b/pkg/recordfmt/marshal.go @@ -0,0 +1,98 @@ +package recordfmt + +import ( + "encoding/json" + "fmt" + + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" + msgpack "github.com/vmihailenco/msgpack/v5" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/encoding/prototext" +) + +// Format names — the single source of truth shared by the daemon's -marshal +// flag and the client's -format flag. +const ( + FormatProtobufList = "protobufList" // length-delimited Envelope (binary; daemon/Kafka/ClickHouse) + FormatProtoJSON = "protoJson" // one JSON object per Envelope + FormatProtoText = "protoText" // protobuf text + FormatMsgPack = "msgpack" // MessagePack (binary) + FormatJSON = "json" // one compact JSON record (per-record; client default) + FormatJSONL = "jsonl" // one raw JSON record per line (NDJSON) + FormatCSV = "csv" // comma-separated, humanized, header once + FormatTSV = "tsv" // tab-separated, humanized, header once + FormatHumanize = "humanize" // one humanized JSON record per line + FormatNull = "null" // discard (benchmark the receive/collect path) +) + +// MarshalJSON encodes a single record as compact JSON (no trailing newline). +// Values are raw/machine (addresses base64, state/enum numeric). +func MarshalJSON(r *xtcp_flat_record.XtcpFlatRecord) ([]byte, error) { + b, err := protojson.Marshal(r) + if err != nil { + return nil, fmt.Errorf("recordfmt: protojson.Marshal: %w", err) + } + return b, nil +} + +// MarshalText encodes a single record as protobuf text (no trailing newline). +func MarshalText(r *xtcp_flat_record.XtcpFlatRecord) ([]byte, error) { + return []byte(prototext.Format(r)), nil +} + +// MarshalMsgPack encodes a single record as MessagePack. +func MarshalMsgPack(r *xtcp_flat_record.XtcpFlatRecord) ([]byte, error) { + b, err := msgpack.Marshal(r) + if err != nil { + return nil, fmt.Errorf("recordfmt: msgpack.Marshal: %w", err) + } + return b, nil +} + +// MarshalHumanizedJSON encodes a single record as compact JSON with the +// machine-valued fields rendered human-readably: source/destination addresses +// as dotted-quad/v6, TCP state and congestion as names, timestamp as RFC3339. +// Other fields keep their native JSON types. No trailing newline. +// +// It starts from protojson output (so field presence/omitempty matches the +// other JSON formats) and overwrites only the present special fields. +func MarshalHumanizedJSON(r *xtcp_flat_record.XtcpFlatRecord) ([]byte, error) { + raw, err := protojson.Marshal(r) + if err != nil { + return nil, fmt.Errorf("recordfmt: protojson.Marshal: %w", err) + } + var m map[string]json.RawMessage + if err := json.Unmarshal(raw, &m); err != nil { + return nil, fmt.Errorf("recordfmt: json.Unmarshal: %w", err) + } + + set := func(key, val string) error { + if _, present := m[key]; !present { + return nil + } + enc, err := json.Marshal(val) + if err != nil { + return err + } + m[key] = enc + return nil + } + for _, kv := range [][2]string{ + {"inetDiagMsgSocketSource", IPString(r.GetInetDiagMsgFamily(), r.GetInetDiagMsgSocketSource())}, + {"inetDiagMsgSocketDestination", IPString(r.GetInetDiagMsgFamily(), r.GetInetDiagMsgSocketDestination())}, + {"inetDiagMsgState", TCPStateName(r.GetInetDiagMsgState())}, + {"tcpInfoState", TCPStateName(r.GetTcpInfoState())}, + {"congestionAlgorithmEnum", CongestionAlgorithmName(r.GetCongestionAlgorithmEnum())}, + {"timestampNs", TimestampRFC3339(r.GetTimestampNs())}, + } { + if err := set(kv[0], kv[1]); err != nil { + return nil, fmt.Errorf("recordfmt: humanize %s: %w", kv[0], err) + } + } + + b, err := json.Marshal(m) + if err != nil { + return nil, fmt.Errorf("recordfmt: json.Marshal: %w", err) + } + return b, nil +} diff --git a/pkg/recordfmt/marshal_envelope.go b/pkg/recordfmt/marshal_envelope.go new file mode 100644 index 0000000..87ab157 --- /dev/null +++ b/pkg/recordfmt/marshal_envelope.go @@ -0,0 +1,123 @@ +package recordfmt + +import ( + "bytes" + "encoding/csv" + "fmt" + + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" + msgpack "github.com/vmihailenco/msgpack/v5" + "google.golang.org/protobuf/encoding/protodelim" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/encoding/prototext" +) + +// Framing note: the text/line formats below terminate their output with a +// newline so consecutive flushes stay separated on a stream (one object/row +// per line); the binary formats (protobufList, msgpack) do not. Sinks write +// the bytes verbatim — framing is the marshaller's job. + +// byteSliceWriter appends to a []byte; used for length-delimited encoding. +type byteSliceWriter struct{ buf *[]byte } + +func (w *byteSliceWriter) Write(b []byte) (int, error) { + *w.buf = append(*w.buf, b...) + return len(b), nil +} + +// MarshalEnvelopeProtobufList encodes the Envelope as length-delimited +// protobuf — varint(size) || bytes — exactly what ClickHouse's ProtobufList +// input format reads. Binary; no trailing newline. +func MarshalEnvelopeProtobufList(e *xtcp_flat_record.Envelope) ([]byte, error) { + return AppendEnvelopeProtobufList(nil, e) +} + +// AppendEnvelopeProtobufList appends the length-delimited Envelope encoding to +// dst and returns the extended slice. Lets a caller reuse a pooled buffer +// (pass dst[:0]) on a hot path; pass nil for a fresh allocation. +func AppendEnvelopeProtobufList(dst []byte, e *xtcp_flat_record.Envelope) ([]byte, error) { + w := &byteSliceWriter{buf: &dst} + if _, err := protodelim.MarshalTo(w, e); err != nil { + return dst, fmt.Errorf("recordfmt: protodelim.MarshalTo: %w", err) + } + return dst, nil +} + +// MarshalEnvelopeJSON encodes the whole Envelope as one compact JSON object, +// newline-terminated. +func MarshalEnvelopeJSON(e *xtcp_flat_record.Envelope) ([]byte, error) { + b, err := protojson.Marshal(e) + if err != nil { + return nil, fmt.Errorf("recordfmt: protojson.Marshal(envelope): %w", err) + } + return append(b, '\n'), nil +} + +// MarshalEnvelopeText encodes the whole Envelope as protobuf text, +// newline-terminated. +func MarshalEnvelopeText(e *xtcp_flat_record.Envelope) ([]byte, error) { + return append([]byte(prototext.Format(e)), '\n'), nil +} + +// MarshalEnvelopeMsgPack encodes the whole Envelope as MessagePack. Binary. +func MarshalEnvelopeMsgPack(e *xtcp_flat_record.Envelope) ([]byte, error) { + b, err := msgpack.Marshal(e) + if err != nil { + return nil, fmt.Errorf("recordfmt: msgpack.Marshal(envelope): %w", err) + } + return b, nil +} + +// MarshalEnvelopeJSONL encodes each row as a compact JSON object on its own +// line (NDJSON). Raw/machine values. +func MarshalEnvelopeJSONL(e *xtcp_flat_record.Envelope) ([]byte, error) { + var b bytes.Buffer + for _, r := range e.GetRow() { + line, err := MarshalJSON(r) + if err != nil { + return nil, err + } + b.Write(line) + b.WriteByte('\n') + } + return b.Bytes(), nil +} + +// MarshalEnvelopeHumanizedJSONL encodes each row as a humanized JSON object on +// its own line (NDJSON with readable addresses/state/congestion/timestamp). +func MarshalEnvelopeHumanizedJSONL(e *xtcp_flat_record.Envelope) ([]byte, error) { + var b bytes.Buffer + for _, r := range e.GetRow() { + line, err := MarshalHumanizedJSON(r) + if err != nil { + return nil, err + } + b.Write(line) + b.WriteByte('\n') + } + return b.Bytes(), nil +} + +// MarshalEnvelopeTable encodes the rows as delimited text (CSV when comma is +// ',', TSV when '\t'), humanized. When includeHeader is set the header row is +// written first. encoding/csv terminates every record with '\n'. +func MarshalEnvelopeTable(e *xtcp_flat_record.Envelope, cols []Column, comma rune, includeHeader bool) ([]byte, error) { + var b bytes.Buffer + w := csv.NewWriter(&b) + w.Comma = comma + if includeHeader { + if err := w.Write(Header(cols)); err != nil { + return nil, fmt.Errorf("recordfmt: csv header: %w", err) + } + } + for _, r := range e.GetRow() { + if err := w.Write(Row(r, cols, true)); err != nil { + return nil, fmt.Errorf("recordfmt: csv row: %w", err) + } + } + w.Flush() + if err := w.Error(); err != nil { + return nil, fmt.Errorf("recordfmt: csv flush: %w", err) + } + return b.Bytes(), nil +} diff --git a/pkg/recordfmt/recordfmt_test.go b/pkg/recordfmt/recordfmt_test.go new file mode 100644 index 0000000..229e0a7 --- /dev/null +++ b/pkg/recordfmt/recordfmt_test.go @@ -0,0 +1,167 @@ +package recordfmt + +import ( + "bytes" + "encoding/csv" + "encoding/json" + "net" + "strings" + "testing" + + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" +) + +func sampleRecord() *xtcp_flat_record.XtcpFlatRecord { + return &xtcp_flat_record.XtcpFlatRecord{ + Hostname: "host-a", + InetDiagMsgFamily: afInet, + InetDiagMsgSocketSource: []byte(net.ParseIP("10.0.0.5").To4()), + InetDiagMsgSocketSourcePort: 443, + InetDiagMsgState: 10, // LISTEN + TcpInfoState: 10, + CongestionAlgorithmEnum: xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_CUBIC, + } +} + +func sampleEnvelope() *xtcp_flat_record.Envelope { + return &xtcp_flat_record.Envelope{ + Row: []*xtcp_flat_record.XtcpFlatRecord{ + sampleRecord(), + {Hostname: "host-b", InetDiagMsgFamily: afInet, InetDiagMsgState: 1}, + }, + } +} + +func TestIPString(t *testing.T) { + cases := []struct { + family uint32 + in []byte + want string + }{ + {afInet, nil, ""}, + {afInet, []byte{192, 168, 0, 1}, "192.168.0.1"}, + {afInet, append([]byte{192, 168, 122, 1}, make([]byte, 12)...), "192.168.122.1"}, // v4 in 16-byte slot + {afInet6, net.IPv6loopback, "::1"}, + } + for _, c := range cases { + if got := IPString(c.family, c.in); got != c.want { + t.Errorf("IPString(%d,%v)=%q want %q", c.family, c.in, got, c.want) + } + } +} + +func TestTCPStateAndCongestionNames(t *testing.T) { + if TCPStateName(10) != "LISTEN" || TCPStateName(1) != "ESTABLISHED" || TCPStateName(99) != "99" { + t.Error("TCPStateName mismatch") + } + if CongestionAlgorithmName(xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_CUBIC) != "CUBIC" { + t.Error("congestion name mismatch") + } + if CongestionAlgorithmName(xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_UNSPECIFIED) != "" { + t.Error("unspecified congestion should be empty") + } +} + +func TestSelectColumns(t *testing.T) { + if all, err := SelectColumns(""); err != nil || len(all) != len(AllColumns()) { + t.Fatalf("empty spec: %v len=%d", err, len(all)) + } + got, err := SelectColumns("hostname, inetDiagMsgState") + if err != nil { + t.Fatal(err) + } + if strings.Join(Header(got), ",") != "hostname,inetDiagMsgState" { + t.Errorf("subset header = %v", Header(got)) + } + if _, err := SelectColumns("hostname,nope"); err == nil { + t.Error("expected error for unknown column") + } +} + +func TestMarshalJSON(t *testing.T) { + b, err := MarshalJSON(sampleRecord()) + if err != nil { + t.Fatal(err) + } + var m map[string]any + if err := json.Unmarshal(b, &m); err != nil { + t.Fatalf("not valid JSON: %v", err) + } + if bytes.HasSuffix(b, []byte("\n")) { + t.Error("per-record JSON must not have a trailing newline") + } +} + +func TestMarshalHumanizedJSON(t *testing.T) { + b, err := MarshalHumanizedJSON(sampleRecord()) + if err != nil { + t.Fatal(err) + } + var m map[string]any + if err := json.Unmarshal(b, &m); err != nil { + t.Fatalf("not valid JSON: %v\n%s", err, b) + } + if m["inetDiagMsgSocketSource"] != "10.0.0.5" { + t.Errorf("address not humanized: %v", m["inetDiagMsgSocketSource"]) + } + if m["inetDiagMsgState"] != "LISTEN" { + t.Errorf("state not humanized: %v", m["inetDiagMsgState"]) + } + if m["congestionAlgorithmEnum"] != "CUBIC" { + t.Errorf("congestion not humanized: %v", m["congestionAlgorithmEnum"]) + } + // A non-special numeric field stays a JSON number. + if _, ok := m["inetDiagMsgSocketSourcePort"].(float64); !ok { + t.Errorf("port should remain numeric, got %T", m["inetDiagMsgSocketSourcePort"]) + } +} + +func TestMarshalEnvelopeJSONL(t *testing.T) { + b, err := MarshalEnvelopeJSONL(sampleEnvelope()) + if err != nil { + t.Fatal(err) + } + lines := strings.Split(strings.TrimRight(string(b), "\n"), "\n") + if len(lines) != 2 { + t.Fatalf("got %d lines want 2", len(lines)) + } + for _, ln := range lines { + var m map[string]any + if err := json.Unmarshal([]byte(ln), &m); err != nil { + t.Errorf("line not JSON: %v", err) + } + } +} + +func TestMarshalEnvelopeTable(t *testing.T) { + cols, _ := SelectColumns("hostname,inetDiagMsgState") + b, err := MarshalEnvelopeTable(sampleEnvelope(), cols, ',', true) + if err != nil { + t.Fatal(err) + } + rows, err := csv.NewReader(bytes.NewReader(b)).ReadAll() + if err != nil { + t.Fatal(err) + } + if len(rows) != 3 || rows[0][0] != "hostname" { + t.Fatalf("rows=%v", rows) + } + if rows[1][1] != "LISTEN" || rows[2][1] != "ESTABLISHED" { + t.Errorf("humanized state cells = %q %q", rows[1][1], rows[2][1]) + } + // TSV path: header omitted. + tb, err := MarshalEnvelopeTable(sampleEnvelope(), cols, '\t', false) + if err != nil { + t.Fatal(err) + } + if !strings.Contains(string(tb), "\t") { + t.Error("tsv should contain tabs") + } +} + +func TestMarshalEnvelopeProtobufList_binaryNoNewline(t *testing.T) { + b, err := MarshalEnvelopeProtobufList(sampleEnvelope()) + if err != nil || len(b) == 0 { + t.Fatalf("protobufList: %v len=%d", err, len(b)) + } +} diff --git a/pkg/xtcp/flat_record_row.go b/pkg/xtcp/flat_record_row.go deleted file mode 100644 index 8b5e75b..0000000 --- a/pkg/xtcp/flat_record_row.go +++ /dev/null @@ -1,141 +0,0 @@ -package xtcp - -import ( - "encoding/base64" - "fmt" - "strconv" - "strings" - "sync" - - "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" - "google.golang.org/protobuf/reflect/protoreflect" -) - -// Tabular (CSV/TSV) row encoding for XtcpFlatRecord. The column set is derived -// once from the protobuf descriptor via reflection, so it never drifts from -// the schema — adding a field to the .proto adds a column automatically. The -// few fields that are machine values in the wire format (IP-address bytes, the -// congestion enum, TCP-state integers, the nanosecond timestamp) are rendered -// human-readably when humanize is set; see humanize.go. - -type flatCol struct { - name string // protojson camelCase name — the CSV header cell - fd protoreflect.FieldDescriptor -} - -var ( - flatColsOnce sync.Once - flatColsAll []flatCol - flatColsIndex map[string]flatCol -) - -// flatColumns returns the full ordered column list (proto declaration order), -// computed once from the XtcpFlatRecord descriptor. -func flatColumns() []flatCol { - flatColsOnce.Do(func() { - fields := (&xtcp_flat_record.XtcpFlatRecord{}).ProtoReflect().Descriptor().Fields() - flatColsAll = make([]flatCol, 0, fields.Len()) - flatColsIndex = make(map[string]flatCol, fields.Len()) - for i := 0; i < fields.Len(); i++ { - fd := fields.Get(i) - c := flatCol{name: fd.JSONName(), fd: fd} - flatColsAll = append(flatColsAll, c) - flatColsIndex[c.name] = c - } - }) - return flatColsAll -} - -// selectColumns resolves a comma-separated `-columns` spec to an ordered -// column list. Empty (or whitespace) selects all columns. Unknown names are -// an error so a typo fails fast rather than silently dropping a column. -func selectColumns(spec string) ([]flatCol, error) { - all := flatColumns() - spec = strings.TrimSpace(spec) - if spec == "" { - return all, nil - } - parts := strings.Split(spec, ",") - out := make([]flatCol, 0, len(parts)) - for _, p := range parts { - name := strings.TrimSpace(p) - if name == "" { - continue - } - c, ok := flatColsIndex[name] - if !ok { - return nil, fmt.Errorf("unknown -columns field %q (expect an XtcpFlatRecord json name, e.g. hostname, inetDiagMsgSocketSourcePort, tcpInfoRtt)", name) - } - out = append(out, c) - } - if len(out) == 0 { - return all, nil - } - return out, nil -} - -func flatRecordHeader(cols []flatCol) []string { - h := make([]string, len(cols)) - for i, c := range cols { - h[i] = c.name - } - return h -} - -func flatRecordValues(r *xtcp_flat_record.XtcpFlatRecord, cols []flatCol, humanize bool) []string { - m := r.ProtoReflect() - out := make([]string, len(cols)) - for i, c := range cols { - out[i] = formatField(r, m, c, humanize) - } - return out -} - -// formatField renders one column. When humanize is set, the handful of -// machine-valued fields are formatted via the humanize.go helpers; everything -// else (and everything when humanize is false) goes through formatScalar. -func formatField(r *xtcp_flat_record.XtcpFlatRecord, m protoreflect.Message, c flatCol, humanize bool) string { - if humanize { - switch c.name { - case "inetDiagMsgSocketSource": - return ipString(r.GetInetDiagMsgFamily(), r.GetInetDiagMsgSocketSource()) - case "inetDiagMsgSocketDestination": - return ipString(r.GetInetDiagMsgFamily(), r.GetInetDiagMsgSocketDestination()) - case "inetDiagMsgState": - return tcpStateName(r.GetInetDiagMsgState()) - case "tcpInfoState": - return tcpStateName(r.GetTcpInfoState()) - case "congestionAlgorithmEnum": - return congestionAlgorithmName(r.GetCongestionAlgorithmEnum()) - case "timestampNs": - return timestampRFC3339(r.GetTimestampNs()) - } - } - return formatScalar(c.fd, m.Get(c.fd)) -} - -// formatScalar renders a protoreflect scalar value as a string. XtcpFlatRecord -// has no nested-message or repeated fields, so scalar coverage is sufficient; -// the two bytes fields are IP addresses (base64 here, humanized elsewhere). -func formatScalar(fd protoreflect.FieldDescriptor, v protoreflect.Value) string { - switch fd.Kind() { - case protoreflect.BoolKind: - return strconv.FormatBool(v.Bool()) - case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind, - protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind: - return strconv.FormatInt(v.Int(), 10) - case protoreflect.Uint32Kind, protoreflect.Fixed32Kind, - protoreflect.Uint64Kind, protoreflect.Fixed64Kind: - return strconv.FormatUint(v.Uint(), 10) - case protoreflect.FloatKind, protoreflect.DoubleKind: - return strconv.FormatFloat(v.Float(), 'f', -1, 64) - case protoreflect.StringKind: - return v.String() - case protoreflect.BytesKind: - return base64.StdEncoding.EncodeToString(v.Bytes()) - case protoreflect.EnumKind: - return strconv.FormatInt(int64(v.Enum()), 10) - default: - return v.String() - } -} diff --git a/pkg/xtcp/flat_record_row_test.go b/pkg/xtcp/flat_record_row_test.go deleted file mode 100644 index dae5c96..0000000 --- a/pkg/xtcp/flat_record_row_test.go +++ /dev/null @@ -1,101 +0,0 @@ -package xtcp - -import ( - "net" - "strings" - "testing" - - "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" -) - -func TestFlatColumns_allFields(t *testing.T) { - cols := flatColumns() - // XtcpFlatRecord field count (from the proto descriptor). Guard it so an - // accidental schema change is noticed; bump deliberately when fields are - // added to the .proto. - if len(cols) != 122 { - t.Errorf("flatColumns len = %d, want 122", len(cols)) - } - // Header names are the protojson camelCase names. - hdr := flatRecordHeader(cols) - if hdr[0] == "" { - t.Error("empty header cell") - } - want := map[string]bool{"hostname": false, "timestampNs": false, "congestionAlgorithmEnum": false} - for _, h := range hdr { - if _, ok := want[h]; ok { - want[h] = true - } - } - for name, seen := range want { - if !seen { - t.Errorf("expected column %q in header", name) - } - } -} - -func TestSelectColumns(t *testing.T) { - t.Run("empty selects all", func(t *testing.T) { - got, err := selectColumns("") - if err != nil { - t.Fatal(err) - } - if len(got) != len(flatColumns()) { - t.Errorf("empty spec selected %d cols, want all %d", len(got), len(flatColumns())) - } - }) - t.Run("subset preserves order", func(t *testing.T) { - got, err := selectColumns("hostname, tcpInfoRtt ,inetDiagMsgState") - if err != nil { - t.Fatal(err) - } - names := flatRecordHeader(got) - wantOrder := []string{"hostname", "tcpInfoRtt", "inetDiagMsgState"} - if strings.Join(names, ",") != strings.Join(wantOrder, ",") { - t.Errorf("subset = %v, want %v", names, wantOrder) - } - }) - t.Run("unknown column errors", func(t *testing.T) { - if _, err := selectColumns("hostname,not_a_field"); err == nil { - t.Fatal("expected error for unknown column") - } - }) -} - -func TestFlatRecordValues_humanize(t *testing.T) { - r := &xtcp_flat_record.XtcpFlatRecord{ - Hostname: "host-a", - InetDiagMsgFamily: afInet, - InetDiagMsgSocketSource: []byte(net.ParseIP("10.0.0.5").To4()), - InetDiagMsgSocketSourcePort: 443, - InetDiagMsgState: 10, // LISTEN - TcpInfoState: 10, - CongestionAlgorithmEnum: xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_CUBIC, - TimestampNs: 1_700_000_000_000_000_000, - } - cols, err := selectColumns("hostname,inetDiagMsgSocketSource,inetDiagMsgSocketSourcePort,inetDiagMsgState,congestionAlgorithmEnum") - if err != nil { - t.Fatal(err) - } - - // Humanized: address dotted-quad, state name, congestion name. - h := flatRecordValues(r, cols, true) - wantH := []string{"host-a", "10.0.0.5", "443", "LISTEN", "CUBIC"} - for i := range wantH { - if h[i] != wantH[i] { - t.Errorf("humanized[%d] = %q, want %q", i, h[i], wantH[i]) - } - } - - // Raw: address base64, state/enum numeric. - raw := flatRecordValues(r, cols, false) - if raw[1] == "10.0.0.5" { - t.Errorf("raw address should not be dotted-quad: %q", raw[1]) - } - if raw[3] != "10" { - t.Errorf("raw state = %q, want \"10\"", raw[3]) - } - if raw[4] != "1" { - t.Errorf("raw congestion enum = %q, want \"1\"", raw[4]) - } -} diff --git a/pkg/xtcp/humanize.go b/pkg/xtcp/humanize.go deleted file mode 100644 index 257faab..0000000 --- a/pkg/xtcp/humanize.go +++ /dev/null @@ -1,93 +0,0 @@ -package xtcp - -import ( - "net" - "strconv" - "strings" - "time" - - "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" -) - -// Human-readable formatting for the machine values in an XtcpFlatRecord. -// Used by the CSV/TSV marshallers so a column of IP addresses, TCP states, -// and congestion algorithms is actually readable (the raw record stores -// addresses as bytes, state as a kernel integer, and the algorithm as an -// enum). The JSON/protobuf formats keep the raw values. - -const ( - afInet = 2 // AF_INET - afInet6 = 10 // AF_INET6 -) - -// ipString renders an inet_diag address as a dotted-quad or RFC-5952 IPv6 -// string. The kernel returns the address in a 16-byte __be32[4] slot -// regardless of family, with only the first 4 bytes meaningful for IPv4 — so -// family is authoritative and must be consulted before length, otherwise an -// IPv4 address (e.g. 192.168.122.1) in a 16-byte buffer is misread as IPv6 -// ("c0a8:7a01::"). Empty input → "". -func ipString(family uint32, b []byte) string { - if len(b) == 0 { - return "" - } - switch family { - case afInet: - if len(b) >= net.IPv4len { - return net.IP(b[:net.IPv4len]).String() - } - case afInet6: - if len(b) >= net.IPv6len { - return net.IP(b[:net.IPv6len]).String() - } - } - // Unknown/unset family: fall back to the byte length. - return net.IP(b).String() -} - -// tcpStateNames maps the kernel TCP state integers (include/net/tcp_states.h) -// to their conventional names — the same names `ss` prints. xtcp2 carries the -// state as a bare uint8 (no protobuf enum), so the mapping lives here. -var tcpStateNames = map[uint32]string{ - 1: "ESTABLISHED", - 2: "SYN_SENT", - 3: "SYN_RECV", - 4: "FIN_WAIT1", - 5: "FIN_WAIT2", - 6: "TIME_WAIT", - 7: "CLOSE", - 8: "CLOSE_WAIT", - 9: "LAST_ACK", - 10: "LISTEN", - 11: "CLOSING", - 12: "NEW_SYN_RECV", -} - -// tcpStateName returns the conventional name for a TCP state integer, or the -// decimal value as a string for anything outside the known range. -func tcpStateName(state uint32) string { - if name, ok := tcpStateNames[state]; ok { - return name - } - return strconv.FormatUint(uint64(state), 10) -} - -// congestionAlgorithmName returns the short congestion-control name (e.g. -// "CUBIC", "BBR3") by trimming the generated enum's CONGESTION_ALGORITHM_ -// prefix. UNSPECIFIED renders as "". -func congestionAlgorithmName(e xtcp_flat_record.XtcpFlatRecord_CongestionAlgorithm) string { - if e == xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_UNSPECIFIED { - return "" - } - return strings.TrimPrefix(e.String(), "CONGESTION_ALGORITHM_") -} - -// timestampRFC3339 formats the record's timestamp_ns (Unix nanoseconds, held -// as a double) as RFC3339 with nanosecond precision in UTC. Zero → "". -func timestampRFC3339(ns float64) string { - if ns <= 0 { - return "" - } - sec := int64(ns) / 1e9 - nsec := int64(ns) % 1e9 - return time.Unix(sec, nsec).UTC().Format(time.RFC3339Nano) -} diff --git a/pkg/xtcp/humanize_test.go b/pkg/xtcp/humanize_test.go deleted file mode 100644 index 55c305a..0000000 --- a/pkg/xtcp/humanize_test.go +++ /dev/null @@ -1,70 +0,0 @@ -package xtcp - -import ( - "net" - "strings" - "testing" - - "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" -) - -func TestIPString(t *testing.T) { - cases := []struct { - name string - family uint32 - in []byte - want string - }{ - {"empty", afInet, nil, ""}, - {"ipv4", afInet, []byte{192, 168, 0, 1}, "192.168.0.1"}, - {"ipv4 loopback", afInet, []byte{127, 0, 0, 1}, "127.0.0.1"}, - // IPv4 in the kernel's 16-byte slot: family must win over length so - // it isn't misread as IPv6 (regression guard for "c0a8:7a01::"). - {"ipv4 in 16-byte slot", afInet, append([]byte{192, 168, 122, 1}, make([]byte, 12)...), "192.168.122.1"}, - {"ipv6 loopback", afInet6, net.IPv6loopback, "::1"}, - {"ipv6 full", afInet6, net.ParseIP("2001:db8::1").To16(), "2001:db8::1"}, - } - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - if got := ipString(c.family, c.in); got != c.want { - t.Errorf("ipString(%d, %v) = %q, want %q", c.family, c.in, got, c.want) - } - }) - } -} - -func TestTCPStateName(t *testing.T) { - if got := tcpStateName(10); got != "LISTEN" { - t.Errorf("state 10 = %q, want LISTEN", got) - } - if got := tcpStateName(1); got != "ESTABLISHED" { - t.Errorf("state 1 = %q, want ESTABLISHED", got) - } - // Unknown state falls back to the decimal value. - if got := tcpStateName(99); got != "99" { - t.Errorf("state 99 = %q, want \"99\"", got) - } -} - -func TestCongestionAlgorithmName(t *testing.T) { - if got := congestionAlgorithmName(xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_CUBIC); got != "CUBIC" { - t.Errorf("CUBIC name = %q", got) - } - if got := congestionAlgorithmName(xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR3); got != "BBR3" { - t.Errorf("BBR3 name = %q", got) - } - if got := congestionAlgorithmName(xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_UNSPECIFIED); got != "" { - t.Errorf("UNSPECIFIED name = %q, want empty", got) - } -} - -func TestTimestampRFC3339(t *testing.T) { - if got := timestampRFC3339(0); got != "" { - t.Errorf("zero ts = %q, want empty", got) - } - // 1_700_000_000.5s expressed in ns. - got := timestampRFC3339(1_700_000_000_500_000_000) - if !strings.HasPrefix(got, "2023-11-14T") || !strings.HasSuffix(got, "Z") { - t.Errorf("ts = %q, want a 2023-11-14 UTC RFC3339 value", got) - } -} diff --git a/pkg/xtcp/marshallers.go b/pkg/xtcp/marshallers.go index 09a4db1..23e60b6 100644 --- a/pkg/xtcp/marshallers.go +++ b/pkg/xtcp/marshallers.go @@ -5,23 +5,23 @@ import ( "strings" "sync" + "github.com/randomizedcoder/xtcp2/pkg/recordfmt" "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" - msgpack "github.com/vmihailenco/msgpack/v5" - "google.golang.org/protobuf/encoding/protodelim" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/encoding/prototext" ) -// Canonical Marshaller names — referenced by tests and config-validation -// alike, kept here so a typo in any one site is a compile error. +// Canonical Marshaller names. Aliased to recordfmt's format constants so the +// daemon (-marshal) and the xtcp2client (-format) share one source of truth; +// the byte production lives in pkg/recordfmt, these wrappers add the daemon's +// buffer pooling, prometheus error counting, and debug logging. const ( - MarshallerProtobufList = "protobufList" - MarshallerProtoJSON = "protoJson" - MarshallerProtoText = "protoText" - MarshallerMsgPack = "msgpack" - MarshallerJSONL = "jsonl" // one JSON record per line (NDJSON / ClickHouse JSONEachRow) - MarshallerCSV = "csv" // comma-separated, humanized, header once - MarshallerTSV = "tsv" // tab-separated, humanized, header once + MarshallerProtobufList = recordfmt.FormatProtobufList + MarshallerProtoJSON = recordfmt.FormatProtoJSON + MarshallerProtoText = recordfmt.FormatProtoText + MarshallerMsgPack = recordfmt.FormatMsgPack + MarshallerJSONL = recordfmt.FormatJSONL // one JSON record per line (NDJSON / ClickHouse JSONEachRow) + MarshallerCSV = recordfmt.FormatCSV // comma-separated, humanized, header once + MarshallerTSV = recordfmt.FormatTSV // tab-separated, humanized, header once + MarshallerHumanize = recordfmt.FormatHumanize // one humanized JSON record per line ) // Envelope-size safety valves. Two independent thresholds — the @@ -44,11 +44,9 @@ const ( ) var ( - // validMarshallersMap is the union of per-record (protoJson, - // protoText, msgpack — debug formats) and per-envelope (protobufList - // — production wire format) marshaller names. InitMarshallers and - // InitEnvelopeMarshallers each only populate their own registry; the - // per-record map will miss the protobufList key on purpose. + // validMarshallersMap is the union of per-record (protoJson, protoText, + // msgpack — debug formats) and per-envelope (protobufList — production + // wire format; plus jsonl/csv/tsv/humanize) marshaller names. validMarshallersMap = map[string]bool{ MarshallerProtobufList: true, // https://clickhouse.com/docs/en/interfaces/formats/ProtobufList MarshallerProtoJSON: true, @@ -57,6 +55,7 @@ var ( MarshallerJSONL: true, MarshallerCSV: true, MarshallerTSV: true, + MarshallerHumanize: true, } ) @@ -67,6 +66,16 @@ func validMarshallers() (marshallers string) { return strings.TrimSuffix(marshallers, ",") } +// marshalErr records a marshaller error: bumps the prometheus counter and logs +// at debug. Centralizes the daemon-side error handling the recordfmt library +// deliberately leaves to its callers. +func (x *XTCP) marshalErr(op string, err error) { + x.pC.WithLabelValues(op, "marshal", "error").Inc() + if x.debugLevel > 10 { + log.Printf("%s: %v", op, err) + } +} + func (x *XTCP) InitMarshallers(wg *sync.WaitGroup) { defer wg.Done() @@ -97,15 +106,12 @@ func (x *XTCP) InitMarshallers(wg *sync.WaitGroup) { } } -// InitEnvelopeMarshallers registers per-Envelope marshallers and stores -// the chosen function in x.EnvelopeMarshaller. Currently the only entry -// is protobufList — additional batched formats would register here. +// InitEnvelopeMarshallers registers per-Envelope marshallers and stores the +// chosen function in x.EnvelopeMarshaller. The destination pipeline is +// envelope-based, so every -marshal value resolves here. // -// Any destination is permitted: kafka receives the bytes via Produce, -// null discards them (used in tests and -dest null deployments), other -// destinations get the length-delimited Envelope as one record. A -// downstream consumer that expects per-record bytes won't decode this -// correctly, but that's a deployment choice, not a daemon-side guard. +// Any destination is permitted: kafka receives the bytes via Produce, null +// discards them, other destinations get the marshalled batch as one record. func (x *XTCP) InitEnvelopeMarshallers(wg *sync.WaitGroup) { defer wg.Done() @@ -113,12 +119,6 @@ func (x *XTCP) InitEnvelopeMarshallers(wg *sync.WaitGroup) { x.EnvelopeMarshallers.Store(MarshallerProtobufList, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { return x.protobufListMarshal(e) }) - - // The human-readable formats are also offered at the Envelope level so - // they work with the destination pipeline (which is envelope-based): - // `-marshal protoJson -dest stdout` prints one JSON Envelope per flush. - // Without these, EnvelopeMarshaller would be nil for any non-protobufList - // format and flushEnvelope would nil-deref. x.EnvelopeMarshallers.Store(MarshallerProtoJSON, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { return x.envelopeProtoJSONMarshal(e) }) @@ -129,10 +129,8 @@ func (x *XTCP) InitEnvelopeMarshallers(wg *sync.WaitGroup) { return x.envelopeMsgPackMarshal(e) }) - // Tabular + per-record-line formats for easy ad-hoc analysis. csv/tsv - // share the reflection row encoder and humanize machine values; jsonl - // emits one raw JSON record per line. Registered here (see - // marshallers_text.go) so they flow through the envelope pipeline. + // jsonl / csv / tsv / humanize — the line/tabular analysis formats + // (see marshallers_text.go). All delegate to pkg/recordfmt. x.registerTextEnvelopeMarshallers() if f, ok := x.EnvelopeMarshallers.Load(x.config.MarshalTo); ok { @@ -142,104 +140,63 @@ func (x *XTCP) InitEnvelopeMarshallers(wg *sync.WaitGroup) { } } -// protobufListMarshal marshals an Envelope as length-delimited protobuf: -// varint(envelope_size) || envelope_bytes. ClickHouse's -// kafka_format='ProtobufList' expects exactly this on the wire. No -// Confluent schema-registry header is prepended; schema-registry -// registration in destinations_kafka is informational only (ClickHouse -// does not consult the registry to decode messages). -// https://clickhouse.com/docs/en/interfaces/formats#protobuflist +// protobufListMarshal marshals an Envelope as length-delimited protobuf into a +// pooled buffer (the production hot path). ClickHouse's +// kafka_format='ProtobufList' expects exactly this on the wire. func (x *XTCP) protobufListMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) { - buf = x.destBytesPool.Get() - *buf = (*buf)[:0] - - writer := &ByteSliceWriter{Buf: buf} - if _, err := protodelim.MarshalTo(writer, e); err != nil { - x.pC.WithLabelValues("protoMarshal", "MarshalTo", "error").Inc() - if x.debugLevel > 10 { - log.Println("protodelim.MarshalTo(envelope) err: ", err) - } + b, err := recordfmt.AppendEnvelopeProtobufList((*buf)[:0], e) + if err != nil { + x.marshalErr("protobufListMarshal", err) } - + *buf = b return buf } -type ByteSliceWriter struct { - Buf *[]byte -} - -func (w *ByteSliceWriter) Write(b []byte) (n int, err error) { - *w.Buf = append(*w.Buf, b...) - return len(b), nil -} - -// envelopeProtoJSONMarshal marshals a whole Envelope (batch of rows) to -// compact single-line JSON, newline-terminated — one JSON object per flush, -// i.e. NDJSON. Pairs with `-dest stdout` for jq-able local output. -// (protojson.Marshal is compact; protojson.Format is multi-line pretty-print -// and would break the one-object-per-line contract.) The trailing newline is -// the marshaller's framing responsibility — writerDest/tcp/http write bytes -// verbatim. func (x *XTCP) envelopeProtoJSONMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) { - b, err := protojson.Marshal(e) + b, err := recordfmt.MarshalEnvelopeJSON(e) if err != nil { - x.pC.WithLabelValues("envelopeProtoJSONMarshal", "Marshal", "error").Inc() - if x.debugLevel > 10 { - log.Println("protojson.Marshal(envelope) err: ", err) - } + x.marshalErr("envelopeProtoJSONMarshal", err) } - b = append(b, '\n') return &b } -// envelopeProtoTextMarshal marshals a whole Envelope to protobuf text, -// newline-terminated so consecutive flushes stay separated on a stream. func (x *XTCP) envelopeProtoTextMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) { - b := []byte(prototext.Format(e)) - b = append(b, '\n') + b, err := recordfmt.MarshalEnvelopeText(e) + if err != nil { + x.marshalErr("envelopeProtoTextMarshal", err) + } return &b } -// envelopeMsgPackMarshal marshals a whole Envelope to MsgPack via reflection. func (x *XTCP) envelopeMsgPackMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) { - b, err := msgpack.Marshal(e) + b, err := recordfmt.MarshalEnvelopeMsgPack(e) if err != nil { - x.pC.WithLabelValues("envelopeMsgPackMarshal", "Marshal", "error").Inc() - if x.debugLevel > 1000 { - log.Println("envelopeMsgPackMarshal err: ", err) - } + x.marshalErr("envelopeMsgPackMarshal", err) } return &b } -// protoJsonMarshal marshals to JSON. -// https://pkg.go.dev/google.golang.org/protobuf/encoding/protojson func (x *XTCP) protoJsonMarshal(r *xtcp_flat_record.XtcpFlatRecord) (buf *[]byte) { - b := []byte(protojson.Format(r)) - buf = &b - return buf + b, err := recordfmt.MarshalJSON(r) + if err != nil { + x.marshalErr("protoJsonMarshal", err) + } + return &b } -// protoTextMarshal marshals to prototext. -// https://pkg.go.dev/google.golang.org/protobuf/encoding/prototext func (x *XTCP) protoTextMarshal(r *xtcp_flat_record.XtcpFlatRecord) (buf *[]byte) { - b := []byte(prototext.Format(r)) - buf = &b - return buf + b, err := recordfmt.MarshalText(r) + if err != nil { + x.marshalErr("protoTextMarshal", err) + } + return &b } -// protoMsgPackMarshal marshals to MsgPack via reflection. -// https://msgpack.uptrace.dev/ -// TODO consider https://github.com/shamaton/msgpackgen for codegen-based throughput. func (x *XTCP) protoMsgPackMarshal(r *xtcp_flat_record.XtcpFlatRecord) (buf *[]byte) { - b, err := msgpack.Marshal(r) + b, err := recordfmt.MarshalMsgPack(r) if err != nil { - x.pC.WithLabelValues("protoMsgPackMarshal", "Marshal", "error").Inc() - if x.debugLevel > 1000 { - log.Println("protoMsgPackMarshal err: ", err) - } + x.marshalErr("protoMsgPackMarshal", err) } - buf = &b - return buf + return &b } diff --git a/pkg/xtcp/marshallers_test.go b/pkg/xtcp/marshallers_test.go index b222d66..9ee9f43 100644 --- a/pkg/xtcp/marshallers_test.go +++ b/pkg/xtcp/marshallers_test.go @@ -87,28 +87,7 @@ func TestProtoMsgPackMarshal_roundtrip(t *testing.T) { } } -// ByteSliceWriter appends raw bytes onto its target. -func TestByteSliceWriter_Write(t *testing.T) { - buf := []byte{} - w := &ByteSliceWriter{Buf: &buf} - n, err := w.Write([]byte("hello")) - if err != nil { - t.Fatalf("Write returned error: %v", err) - } - if n != 5 { - t.Errorf("Write returned n=%d, want 5", n) - } - if string(buf) != "hello" { - t.Errorf("buf = %q, want hello", buf) - } - // Subsequent writes append. - if _, err := w.Write([]byte(" world")); err != nil { - t.Fatalf("Write append err: %v", err) - } - if string(buf) != "hello world" { - t.Errorf("buf = %q, want hello world", buf) - } -} +// (length-delimited encoding is now exercised in pkg/recordfmt.) // InitMarshallers with an invalid MarshalTo: fatalf fires once (the // early-return path); the function exits without populating x.Marshaller. diff --git a/pkg/xtcp/marshallers_text.go b/pkg/xtcp/marshallers_text.go index 9a3d500..88bca97 100644 --- a/pkg/xtcp/marshallers_text.go +++ b/pkg/xtcp/marshallers_text.go @@ -1,31 +1,37 @@ package xtcp import ( - "bytes" - "encoding/csv" - "log" "sync/atomic" + "github.com/randomizedcoder/xtcp2/pkg/recordfmt" "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" - "google.golang.org/protobuf/encoding/protojson" ) // registerTextEnvelopeMarshallers wires the line/tabular envelope marshallers -// (jsonl, csv, tsv) into x.EnvelopeMarshallers. Each is an envelope marshaller -// that iterates Envelope.Row and owns its trailing-newline framing (writerDest -// and the tcp/http sinks write bytes verbatim). -// -// csv/tsv resolve their column set from -columns once here; an invalid spec -// fatals at init only when one of them is the selected format, so a stray -// -columns alongside protoJson is harmless. +// (jsonl, humanize, csv, tsv) into x.EnvelopeMarshallers. Each delegates to +// pkg/recordfmt and adds the daemon's error counting. csv/tsv resolve their +// column set from -columns once here; an invalid spec fatals at init only when +// one of them is the selected format, so a stray -columns alongside protoJson +// is harmless. The header is emitted once per process (atomic guard). func (x *XTCP) registerTextEnvelopeMarshallers() { x.EnvelopeMarshallers.Store(MarshallerJSONL, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { - return x.envelopeJSONLMarshal(e) + b, err := recordfmt.MarshalEnvelopeJSONL(e) + if err != nil { + x.marshalErr("envelopeJSONLMarshal", err) + } + return &b + }) + x.EnvelopeMarshallers.Store(MarshallerHumanize, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { + b, err := recordfmt.MarshalEnvelopeHumanizedJSONL(e) + if err != nil { + x.marshalErr("envelopeHumanizedJSONLMarshal", err) + } + return &b }) - cols := flatColumns() + cols := recordfmt.AllColumns() if x.config.MarshalTo == MarshallerCSV || x.config.MarshalTo == MarshallerTSV { - c, err := selectColumns(x.config.CsvColumns) + c, err := recordfmt.SelectColumns(x.config.CsvColumns) if err != nil { x.callFatalf("InitEnvelopeMarshallers -columns: %v", err) return @@ -37,69 +43,17 @@ func (x *XTCP) registerTextEnvelopeMarshallers() { // exactly once per process on whichever stream they feed. var csvHeader, tsvHeader atomic.Bool x.EnvelopeMarshallers.Store(MarshallerCSV, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { - return x.envelopeDelimitedMarshal(e, cols, ',', &csvHeader) + b, err := recordfmt.MarshalEnvelopeTable(e, cols, ',', csvHeader.CompareAndSwap(false, true)) + if err != nil { + x.marshalErr("envelopeCSVMarshal", err) + } + return &b }) x.EnvelopeMarshallers.Store(MarshallerTSV, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { - return x.envelopeDelimitedMarshal(e, cols, '\t', &tsvHeader) - }) -} - -// envelopeJSONLMarshal emits one compact JSON object per row, each on its own -// line (NDJSON / ClickHouse JSONEachRow). Values are raw (machine) — use -// csv/tsv for humanized addresses/states. -func (x *XTCP) envelopeJSONLMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) { - var b bytes.Buffer - for _, r := range e.Row { - line, err := protojson.Marshal(r) + b, err := recordfmt.MarshalEnvelopeTable(e, cols, '\t', tsvHeader.CompareAndSwap(false, true)) if err != nil { - x.pC.WithLabelValues("envelopeJSONLMarshal", "Marshal", "error").Inc() - if x.debugLevel > 10 { - log.Println("envelopeJSONLMarshal protojson.Marshal err: ", err) - } - continue - } - b.Write(line) - b.WriteByte('\n') - } - out := b.Bytes() - return &out -} - -// envelopeDelimitedMarshal renders the envelope's rows as delimited text -// (CSV or TSV depending on comma), humanized, with the header written once. -// encoding/csv already terminates every record with '\n', so the block is -// self-framing. -func (x *XTCP) envelopeDelimitedMarshal(e *xtcp_flat_record.Envelope, cols []flatCol, comma rune, headerWritten *atomic.Bool) (buf *[]byte) { - var b bytes.Buffer - w := csv.NewWriter(&b) - w.Comma = comma - - if headerWritten.CompareAndSwap(false, true) { - if err := w.Write(flatRecordHeader(cols)); err != nil { - x.pC.WithLabelValues("envelopeDelimitedMarshal", "header", "error").Inc() - if x.debugLevel > 10 { - log.Println("envelopeDelimitedMarshal header err: ", err) - } - } - } - - for _, r := range e.Row { - if err := w.Write(flatRecordValues(r, cols, true)); err != nil { - x.pC.WithLabelValues("envelopeDelimitedMarshal", "row", "error").Inc() - if x.debugLevel > 10 { - log.Println("envelopeDelimitedMarshal row err: ", err) - } + x.marshalErr("envelopeTSVMarshal", err) } - } - - w.Flush() - if err := w.Error(); err != nil { - x.pC.WithLabelValues("envelopeDelimitedMarshal", "flush", "error").Inc() - if x.debugLevel > 10 { - log.Println("envelopeDelimitedMarshal flush err: ", err) - } - } - - out := b.Bytes() - return &out + return &b + }) } diff --git a/pkg/xtcp/marshallers_text_test.go b/pkg/xtcp/marshallers_text_test.go deleted file mode 100644 index bbd529c..0000000 --- a/pkg/xtcp/marshallers_text_test.go +++ /dev/null @@ -1,116 +0,0 @@ -package xtcp - -import ( - "bytes" - "encoding/csv" - "encoding/json" - "strings" - "sync" - "sync/atomic" - "testing" - - "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" -) - -func sampleEnvelope() *xtcp_flat_record.Envelope { - return &xtcp_flat_record.Envelope{ - Row: []*xtcp_flat_record.XtcpFlatRecord{ - {Hostname: "host-a", InetDiagMsgFamily: afInet, InetDiagMsgState: 10}, - {Hostname: "host-b", InetDiagMsgFamily: afInet, InetDiagMsgState: 1}, - }, - } -} - -func TestEnvelopeJSONLMarshal(t *testing.T) { - x, _ := newMarshalFixture(t) - buf := x.envelopeJSONLMarshal(sampleEnvelope()) - lines := strings.Split(strings.TrimRight(string(*buf), "\n"), "\n") - if len(lines) != 2 { - t.Fatalf("got %d lines, want 2: %q", len(lines), string(*buf)) - } - for i, ln := range lines { - var m map[string]any - if err := json.Unmarshal([]byte(ln), &m); err != nil { - t.Errorf("line %d not valid JSON: %v (%q)", i, err, ln) - } - } - // Trailing newline is part of the framing contract. - if !bytes.HasSuffix(*buf, []byte("\n")) { - t.Error("jsonl output must end with a newline") - } -} - -func TestEnvelopeDelimitedMarshal_csv(t *testing.T) { - x, _ := newMarshalFixture(t) - cols, err := selectColumns("hostname,inetDiagMsgState") - if err != nil { - t.Fatal(err) - } - var header atomic.Bool - - // First flush includes the header. - buf := x.envelopeDelimitedMarshal(sampleEnvelope(), cols, ',', &header) - rows, err := csv.NewReader(bytes.NewReader(*buf)).ReadAll() - if err != nil { - t.Fatalf("csv parse: %v", err) - } - if len(rows) != 3 { // header + 2 records - t.Fatalf("got %d csv rows, want 3 (header+2): %v", len(rows), rows) - } - if rows[0][0] != "hostname" || rows[0][1] != "inetDiagMsgState" { - t.Errorf("header = %v", rows[0]) - } - // Humanized state: 10 → LISTEN, 1 → ESTABLISHED. - if rows[1][1] != "LISTEN" || rows[2][1] != "ESTABLISHED" { - t.Errorf("humanized state cells = %q, %q", rows[1][1], rows[2][1]) - } - - // Second flush omits the header (header-once). - buf2 := x.envelopeDelimitedMarshal(sampleEnvelope(), cols, ',', &header) - rows2, err := csv.NewReader(bytes.NewReader(*buf2)).ReadAll() - if err != nil { - t.Fatalf("csv parse 2: %v", err) - } - if len(rows2) != 2 { - t.Errorf("second flush rows = %d, want 2 (no header)", len(rows2)) - } -} - -func TestEnvelopeDelimitedMarshal_tsv(t *testing.T) { - x, _ := newMarshalFixture(t) - cols, _ := selectColumns("hostname,inetDiagMsgState") - var header atomic.Bool - buf := x.envelopeDelimitedMarshal(sampleEnvelope(), cols, '\t', &header) - if !strings.Contains(string(*buf), "\t") { - t.Errorf("tsv output should contain tabs: %q", string(*buf)) - } - r := csv.NewReader(bytes.NewReader(*buf)) - r.Comma = '\t' - rows, err := r.ReadAll() - if err != nil { - t.Fatalf("tsv parse: %v", err) - } - if len(rows) != 3 { - t.Errorf("tsv rows = %d, want 3", len(rows)) - } -} - -func TestRegisterTextEnvelopeMarshallers_selected(t *testing.T) { - for _, name := range []string{MarshallerJSONL, MarshallerCSV, MarshallerTSV} { - t.Run(name, func(t *testing.T) { - x, _ := newMarshalFixture(t) - x.config.MarshalTo = name - var wg sync.WaitGroup - wg.Add(1) - x.InitEnvelopeMarshallers(&wg) - wg.Wait() - if x.EnvelopeMarshaller == nil { - t.Fatalf("EnvelopeMarshaller nil for %q", name) - } - buf := x.EnvelopeMarshaller(sampleEnvelope()) - if buf == nil || len(*buf) == 0 { - t.Fatalf("%q produced empty output", name) - } - }) - } -}