Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions cmd/xtcp2client/printer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package main

import (
"fmt"
"io"
"log"
"sync"

"github.com/randomizedcoder/xtcp2/pkg/recordfmt"
"github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record"
)

// recordPrinter writes streamed records to an io.Writer in the chosen format,
// reusing the shared pkg/recordfmt library so the client and daemon format
// identically. It serializes writes (the listen path fans out across worker
// goroutines) and emits the csv/tsv header exactly once.
type recordPrinter struct {
mu sync.Mutex
w io.Writer
format string
cols []recordfmt.Column
comma rune
headerDone bool
}

// newRecordPrinter validates the format (and -columns for csv/tsv) and returns
// a printer. An unknown format or bad column spec is an error.
func newRecordPrinter(w io.Writer, format, columns string) (*recordPrinter, error) {
p := &recordPrinter{w: w, format: format}
switch format {
case recordfmt.FormatJSON, recordfmt.FormatHumanize, recordfmt.FormatNull:
// no columns
case recordfmt.FormatCSV, recordfmt.FormatTSV:
cols, err := recordfmt.SelectColumns(columns)
if err != nil {
return nil, err
}
p.cols = cols
p.comma = ','
if format == recordfmt.FormatTSV {
p.comma = '\t'
}
default:
return nil, fmt.Errorf("unknown -format %q (want json, csv, tsv, humanize, or null)", format)
}
return p, nil
}

// record formats and writes one record. Safe for concurrent callers.
func (p *recordPrinter) record(r *xtcp_flat_record.XtcpFlatRecord) {
if p == nil || r == nil || p.format == recordfmt.FormatNull {
return
}

switch p.format {
case recordfmt.FormatCSV, recordfmt.FormatTSV:
// One-row envelope through the shared table encoder; header once.
env := &xtcp_flat_record.Envelope{Row: []*xtcp_flat_record.XtcpFlatRecord{r}}
p.mu.Lock()
defer p.mu.Unlock()
first := !p.headerDone
p.headerDone = true
b, err := recordfmt.MarshalEnvelopeTable(env, p.cols, p.comma, first)
if err != nil {
log.Printf("xtcp2client: format %s: %v", p.format, err)
return
}
p.write(b)

default: // json, humanize — one object per line
var b []byte
var err error
if p.format == recordfmt.FormatHumanize {
b, err = recordfmt.MarshalHumanizedJSON(r)
} else {
b, err = recordfmt.MarshalJSON(r)
}
if err != nil {
log.Printf("xtcp2client: format %s: %v", p.format, err)
return
}
p.mu.Lock()
defer p.mu.Unlock()
p.write(append(b, '\n'))
}
}

// write emits bytes to the sink, logging (not failing) on a write error —
// the caller holds the mutex (csv path) or has just taken it (json path).
func (p *recordPrinter) write(b []byte) {
if _, err := p.w.Write(b); err != nil {
log.Printf("xtcp2client: write: %v", err)
}
}
89 changes: 34 additions & 55 deletions cmd/xtcp2client/xtcp2client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ import (
_ "unsafe"

"github.com/randomizedcoder/xtcp2/pkg/misc"
"github.com/randomizedcoder/xtcp2/pkg/recordfmt"
"github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

// unsafe for FastRandN
Expand Down Expand Up @@ -125,7 +124,9 @@ func runMain(ctx context.Context, args []string, stdout, stderr io.Writer) int {
poll := fs.Bool("poll", false, "poll mode means the client will trigger polling via the PollFlatRecords service")
pollFrequency := fs.Duration("pollFrequency", pollFrequencyCst, "poll mode frequency")
workers := fs.Int("workers", 10, "workers")
json := fs.Bool("json", false, "json output")
format := fs.String("format", recordfmt.FormatJSON, "output format: json | csv | tsv | humanize | null")
columns := fs.String("columns", "", "csv/tsv only: comma-separated XtcpFlatRecord json field names; empty = all")
jsonFlag := fs.Bool("json", false, "deprecated alias for -format json")
d := fs.Uint("d", 11, "debugLevel")
v := fs.Bool("v", false, "show version")
if err := fs.Parse(args); err != nil {
Expand All @@ -138,19 +139,29 @@ func runMain(ctx context.Context, args []string, stdout, stderr io.Writer) int {
}
debugLevel = *d

chosen := *format
if *jsonFlag {
chosen = recordfmt.FormatJSON
}
printer, err := newRecordPrinter(stdout, chosen, *columns)
if err != nil {
fmt.Fprintf(stderr, "xtcp2client: %v\n", err)
return 2
}

complete := make(chan struct{}, signalChannelSizeCst)
addr := *target + ":" + *port
if *poll {
pollMode(ctx, addr, &complete, *pollFrequency, *json, debugLevel)
pollMode(ctx, addr, &complete, *pollFrequency, printer, debugLevel)
} else {
listenMode(ctx, addr, *workers, &complete, *json)
listenMode(ctx, addr, *workers, &complete, printer)
}
return 0
}

// func (c *xTCPFlatRecordServiceClient) PollFlatRecords(
// ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[PollFlatRecordsRequest, FlatRecordsResponse], error) {
func pollMode(ctx context.Context, addr string, complete *chan struct{}, pollFrequency time.Duration, json bool, debugLevel uint) {
func pollMode(ctx context.Context, addr string, complete *chan struct{}, pollFrequency time.Duration, printer *recordPrinter, debugLevel uint) {

if debugLevel > 10 {
log.Printf("pollMode starting")
Expand Down Expand Up @@ -187,7 +198,7 @@ func pollMode(ctx context.Context, addr string, complete *chan struct{}, pollFre
// recvCh := make(chan *xtcp_flat_record.FlatRecordsResponse)
wg := new(sync.WaitGroup)
wg.Add(1)
go pollStreamRecv(ctx, wg, json, &stream, debugLevel)
go pollStreamRecv(ctx, wg, printer, &stream, debugLevel)

breakPoint:
for i := 0; ; i++ {
Expand Down Expand Up @@ -223,7 +234,7 @@ breakPoint:
func pollStreamRecv(
ctx context.Context,
wg *sync.WaitGroup,
json bool,
printer *recordPrinter,
// recvCh chan *xtcp_flat_record.FlatRecordsResponse,
stream *grpc.BidiStreamingClient[xtcp_flat_record.PollFlatRecordsRequest, xtcp_flat_record.PollFlatRecordsResponse],
debugLevel uint) {
Expand Down Expand Up @@ -265,7 +276,7 @@ breakPoint:
continue
}
// log.Printf("rec:%v", rec)
printPollFlatRecordsResponse(pollFlatRecordsResponse, 1, json, debugLevel)
printPollFlatRecordsResponse(pollFlatRecordsResponse, 1, printer, debugLevel)

// recvCh <- rec

Expand All @@ -278,7 +289,7 @@ breakPoint:
}
}

func listenMode(ctx context.Context, addr string, workers int, complete *chan struct{}, json bool) {
func listenMode(ctx context.Context, addr string, workers int, complete *chan struct{}, printer *recordPrinter) {

var wg sync.WaitGroup
wg.Add(workers)
Expand All @@ -288,7 +299,7 @@ func listenMode(ctx context.Context, addr string, workers int, complete *chan st
// dialed once and passed the conn down, but stream()
// deferred-Close'd it on first return, so every "reconnect
// after sleep" iteration after the first used a dead conn.
go singleStreamingClient(ctx, &wg, addr, json, j)
go singleStreamingClient(ctx, &wg, addr, printer, j)
}

wg.Wait()
Expand All @@ -306,7 +317,7 @@ func listenMode(ctx context.Context, addr string, workers int, complete *chan st
// restart branch without waiting 10 seconds.
var reconnectTimeVar = reconnectTime

func singleStreamingClient(ctx context.Context, wg *sync.WaitGroup, addr string, json bool, id int) {
func singleStreamingClient(ctx context.Context, wg *sync.WaitGroup, addr string, printer *recordPrinter, id int) {

defer wg.Done()

Expand All @@ -327,7 +338,7 @@ breakPoint:
// dead code after iteration 0.
conn := newGRPCClient(addr)
wg.Add(1)
stream(ctx, wg, conn, json, id)
stream(ctx, wg, conn, printer, id)

select {
case <-ctx.Done():
Expand Down Expand Up @@ -451,7 +462,7 @@ func handleRecvContinueErr(ctx context.Context, client any, err error) bool {
return false
}

func stream(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn, json bool, id int) {
func stream(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn, printer *recordPrinter, id int) {

defer wg.Done()
defer func() {
Expand Down Expand Up @@ -485,7 +496,7 @@ func stream(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn, json
case recvBreak:
return
case recvPrint:
printFlatRecordsResponse(resp, id, json, debugLevel)
printFlatRecordsResponse(resp, id, printer, debugLevel)
continue
case recvContinue:
// fall through to handleRecvContinueErr below
Expand All @@ -500,48 +511,16 @@ func stream(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn, json
}
}

func printFlatRecordsResponse(flatRecordsResponse *xtcp_flat_record.FlatRecordsResponse, id int, json bool, debugLevel uint) {

if debugLevel > 10 {
b, err := proto.Marshal(flatRecordsResponse.GetXtcpFlatRecord())
if err != nil {
if debugLevel > 10 {
log.Println("FlatRecords proto.Marshal(x) err: ", err)
}
}
log.Printf("id:%d, FlatRecords len(b):%d", id, len(b))
}

if debugLevel > 10 {
if json {
jsonStr := protojson.Format(flatRecordsResponse.GetXtcpFlatRecord())
log.Printf("id:%d, %s", id, jsonStr)
return
}

log.Printf("id:%d, %s", id, flatRecordsResponse.GetXtcpFlatRecord())
func printFlatRecordsResponse(flatRecordsResponse *xtcp_flat_record.FlatRecordsResponse, id int, printer *recordPrinter, debugLevel uint) {
if debugLevel > 1000 {
log.Printf("id:%d, FlatRecords %s", id, flatRecordsResponse.GetXtcpFlatRecord())
}
printer.record(flatRecordsResponse.GetXtcpFlatRecord())
}

func printPollFlatRecordsResponse(pollFlatRecordsResponse *xtcp_flat_record.PollFlatRecordsResponse, id int, json bool, debugLevel uint) {

if debugLevel > 10 {
b, err := proto.Marshal(pollFlatRecordsResponse.GetXtcpFlatRecord())
if err != nil {
if debugLevel > 10 {
log.Println("FlatRecords proto.Marshal(x) err: ", err)
}
}
log.Printf("id:%d, FlatRecords len(b):%d", id, len(b))
}

if debugLevel > 10 {
if json {
jsonStr := protojson.Format(pollFlatRecordsResponse.GetXtcpFlatRecord())
log.Printf("id:%d, %s", id, jsonStr)
return
}

log.Printf("id:%d, %s", id, pollFlatRecordsResponse.GetXtcpFlatRecord())
func printPollFlatRecordsResponse(pollFlatRecordsResponse *xtcp_flat_record.PollFlatRecordsResponse, id int, printer *recordPrinter, debugLevel uint) {
if debugLevel > 1000 {
log.Printf("id:%d, PollFlatRecords %s", id, pollFlatRecordsResponse.GetXtcpFlatRecord())
}
printer.record(pollFlatRecordsResponse.GetXtcpFlatRecord())
}
Loading