Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package main

import (
"bytes"
"context"
"encoding/csv"
_ "embed"
"strings"
"testing"

"cloud.google.com/go/spanner"
Expand Down Expand Up @@ -403,4 +406,57 @@ func TestWithCloudSpannerEmulator(t *testing.T) {
t.Fatalf("redacted .rows[]: got %d values, want 0", len(redactedRows))
}
})

t.Run("experimental_csv writeCsvFromRowIter", func(t *testing.T) {
readCSV := func(t *testing.T, out string) [][]string {
t.Helper()
recs, err := csv.NewReader(strings.NewReader(out)).ReadAll()
if err != nil {
t.Fatalf("parse csv: %v\noutput:\n%s", err, out)
}
return recs
}
writeCSV := func(t *testing.T, sql string, redact bool) [][]string {
t.Helper()
var buf bytes.Buffer
iter := client.Single().Query(ctx, spanner.Statement{SQL: sql})
if err := writeCsvFromRowIter(&buf, iter, redact); err != nil {
t.Fatalf("writeCsvFromRowIter(redact=%v): %v", redact, err)
}
return readCSV(t, buf.String())
}

t.Run("normal", func(t *testing.T) {
recs := writeCSV(t, "SELECT SingerId, FirstName FROM Singers ORDER BY SingerId LIMIT 2", false)
if len(recs) != 3 {
t.Fatalf("got %d csv records, want header + 2 rows: %#v", len(recs), recs)
}
if got, want := recs[0], []string{"SingerId", "FirstName"}; !cmp.Equal(got, want) {
t.Fatalf("header: got %v want %v", got, want)
}
if recs[1][0] != "1" || recs[2][0] != "2" {
t.Fatalf("SingerId column: %#v", recs)
}
})

t.Run("zero_rows", func(t *testing.T) {
recs := writeCSV(t, "SELECT SingerId FROM Singers WHERE SingerId = -1", false)
if len(recs) != 1 {
t.Fatalf("got %d csv records, want header only: %#v", len(recs), recs)
}
if got, want := recs[0], []string{"SingerId"}; !cmp.Equal(got, want) {
t.Fatalf("header: got %v want %v", got, want)
}
})

t.Run("redact_rows", func(t *testing.T) {
recs := writeCSV(t, "SELECT SingerId, FirstName FROM Singers ORDER BY SingerId LIMIT 2", true)
if len(recs) != 1 {
t.Fatalf("got %d csv records, want header only: %#v", len(recs), recs)
}
if got, want := recs[0], []string{"SingerId", "FirstName"}; !cmp.Equal(got, want) {
t.Fatalf("header: got %v want %v", got, want)
}
})
})
}
54 changes: 19 additions & 35 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,11 @@ func runAndWriteCsv(ctx context.Context, client *spanner.Client, stmt spanner.St
_, err = io.Copy(os.Stdout, &buf)
return err
case single:
iter := client.Single().WithTimestampBound(mode.TimestampBound).QueryWithOptions(ctx, stmt, opts)
return writeCsvFromRowIter(os.Stdout, iter, redactRows)
return writeCsvFromRowIter(
os.Stdout,
client.Single().WithTimestampBound(mode.TimestampBound).QueryWithOptions(ctx, stmt, opts),
redactRows,
)
case partitionedDML:
count, err := client.PartitionedUpdateWithOptions(ctx, stmt, opts)
if err != nil {
Expand All @@ -378,47 +381,28 @@ func runAndWriteCsv(ctx context.Context, client *spanner.Client, stmt spanner.St
}
}

// csvRedactRowIteratorWriter implements [svwriter.RowIteratorWriter] for --redact-rows CSV:
// it registers schema and flushes the header via the embedded [svwriter.DelimitedWriter] but
// discards row bodies in WriteRow while WriteRowIterator drains the iterator.
type csvRedactRowIteratorWriter struct {
*svwriter.DelimitedWriter
}

func (csvRedactRowIteratorWriter) WriteRow(*spanner.Row) error { return nil }
Comment thread
apstndb marked this conversation as resolved.

// writeCsvFromRowIter streams query rows to CSV without materializing a ResultSet.
// It follows spanvalue RowIterator guidance: PrepareRowType after the first Next
// (including iterator.Done), WriteRow in the loop, return Flush (not defer Flush).
// Pass the query iterator directly to WriteRowIterator (it owns Stop); do not defer Stop at the call site.
func writeCsvFromRowIter(writer io.Writer, rowIter *spanner.RowIterator, redactRows bool) error {
defer rowIter.Stop()

csvWriter, err := svwriter.NewCSVWriter(writer)
Comment thread
apstndb marked this conversation as resolved.
if err != nil {
return err
}

iterWriter := svwriter.RowIteratorWriter(csvWriter)
if redactRows {
if err := skipRowIter(rowIter); err != nil {
return err
}
if err := prepareCsvRowType(csvWriter, rowIter.Metadata); err != nil {
return err
}
return csvWriter.Flush()
iterWriter = csvRedactRowIteratorWriter{csvWriter}
}

first := true
for {
row, err := rowIter.Next()
if err != nil && !errors.Is(err, iterator.Done) {
return err
}
if first {
first = false
if err := prepareCsvRowType(csvWriter, rowIter.Metadata); err != nil {
return err
}
}
if errors.Is(err, iterator.Done) {
break
}
if err := csvWriter.WriteRow(row); err != nil {
return err
}
}
return csvWriter.Flush()
_, err = svwriter.WriteRowIterator(rowIter, iterWriter)
return err
}

func prepareCsvRowType(csvWriter *svwriter.DelimitedWriter, metadata *sppb.ResultSetMetadata) error {
Expand Down
Loading