From 89eeca35ca1993a5761503907f312d3f1315a730 Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Fri, 5 Jun 2026 05:25:03 +0900 Subject: [PATCH 1/3] Use spanvalue WriteRowIterator for experimental CSV streaming. Replace the hand-rolled Next/PrepareRowType/Flush loop with WriteRowIterator, including the redact-rows path after skipRowIter. Co-authored-by: Cursor --- main.go | 33 +++------------------------------ 1 file changed, 3 insertions(+), 30 deletions(-) diff --git a/main.go b/main.go index 545781e..957a720 100644 --- a/main.go +++ b/main.go @@ -379,46 +379,19 @@ func runAndWriteCsv(ctx context.Context, client *spanner.Client, stmt spanner.St } // writeCsvFromRowIter streams query rows to CSV without materializing a ResultSet. -// It follows spanvalue RowIterator guidance: PrepareRowType after the first Next -// (including iterator.Done), WriteRow in the loop, return Flush (not defer Flush). +// It uses spanvalue WriteRowIterator (PrepareRowType on first Next, Flush in Finish, Stop on return). func writeCsvFromRowIter(writer io.Writer, rowIter *spanner.RowIterator, redactRows bool) error { - defer rowIter.Stop() - csvWriter, err := svwriter.NewCSVWriter(writer) if err != nil { return err } - if redactRows { if err := skipRowIter(rowIter); err != nil { return err } - if err := prepareCsvRowType(csvWriter, rowIter.Metadata); err != nil { - return err - } - return csvWriter.Flush() - } - - first := true - for { - row, err := rowIter.Next() - if err != nil && !errors.Is(err, iterator.Done) { - return err - } - if first { - first = false - if err := prepareCsvRowType(csvWriter, rowIter.Metadata); err != nil { - return err - } - } - if errors.Is(err, iterator.Done) { - break - } - if err := csvWriter.WriteRow(row); err != nil { - return err - } } - return csvWriter.Flush() + _, err = svwriter.WriteRowIterator(rowIter, csvWriter) + return err } func prepareCsvRowType(csvWriter *svwriter.DelimitedWriter, metadata *sppb.ResultSetMetadata) error { From eecae9040aa3f8350f407934141867b73ad0e84f Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Fri, 5 Jun 2026 06:28:36 +0900 Subject: [PATCH 2/3] Use nop RowIteratorWriter for redacted experimental CSV. WriteRowIterator drains the iterator once; csvRedactRowIteratorWriter skips WriteRow bodies while still emitting the header. Drop skipRowIter before WriteRowIterator and pass the query iterator directly from the single path. Co-authored-by: Cursor --- main.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/main.go b/main.go index 957a720..86655dd 100644 --- a/main.go +++ b/main.go @@ -360,8 +360,11 @@ func runAndWriteCsv(ctx context.Context, client *spanner.Client, stmt spanner.St _, err = io.Copy(os.Stdout, &buf) return err case single: - iter := client.Single().WithTimestampBound(mode.TimestampBound).QueryWithOptions(ctx, stmt, opts) - return writeCsvFromRowIter(os.Stdout, iter, redactRows) + return writeCsvFromRowIter( + os.Stdout, + client.Single().WithTimestampBound(mode.TimestampBound).QueryWithOptions(ctx, stmt, opts), + redactRows, + ) case partitionedDML: count, err := client.PartitionedUpdateWithOptions(ctx, stmt, opts) if err != nil { @@ -378,19 +381,27 @@ func runAndWriteCsv(ctx context.Context, client *spanner.Client, stmt spanner.St } } +// csvRedactRowIteratorWriter implements [svwriter.RowIteratorWriter] for --redact-rows CSV: +// it registers schema and flushes the header via the embedded [svwriter.DelimitedWriter] but +// discards row bodies in WriteRow while WriteRowIterator drains the iterator. +type csvRedactRowIteratorWriter struct { + *svwriter.DelimitedWriter +} + +func (csvRedactRowIteratorWriter) WriteRow(*spanner.Row) error { return nil } + // writeCsvFromRowIter streams query rows to CSV without materializing a ResultSet. -// It uses spanvalue WriteRowIterator (PrepareRowType on first Next, Flush in Finish, Stop on return). +// Pass the query iterator directly to WriteRowIterator (it owns Stop); do not defer Stop at the call site. func writeCsvFromRowIter(writer io.Writer, rowIter *spanner.RowIterator, redactRows bool) error { csvWriter, err := svwriter.NewCSVWriter(writer) if err != nil { return err } + iterWriter := svwriter.RowIteratorWriter(csvWriter) if redactRows { - if err := skipRowIter(rowIter); err != nil { - return err - } + iterWriter = csvRedactRowIteratorWriter{csvWriter} } - _, err = svwriter.WriteRowIterator(rowIter, csvWriter) + _, err = svwriter.WriteRowIterator(rowIter, iterWriter) return err } From 591168f9ec3e8d3e4257239e811583e675b004cf Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Fri, 5 Jun 2026 07:19:59 +0900 Subject: [PATCH 3/3] Add emulator tests for experimental CSV RowIterator export. Cover writeCsvFromRowIter for normal rows, zero-row header-only SELECT, and redact-rows header-only output. Co-authored-by: Cursor --- integration_test.go | 56 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/integration_test.go b/integration_test.go index 92a2b10..a566d22 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1,8 +1,11 @@ package main import ( + "bytes" "context" + "encoding/csv" _ "embed" + "strings" "testing" "cloud.google.com/go/spanner" @@ -403,4 +406,57 @@ func TestWithCloudSpannerEmulator(t *testing.T) { t.Fatalf("redacted .rows[]: got %d values, want 0", len(redactedRows)) } }) + + t.Run("experimental_csv writeCsvFromRowIter", func(t *testing.T) { + readCSV := func(t *testing.T, out string) [][]string { + t.Helper() + recs, err := csv.NewReader(strings.NewReader(out)).ReadAll() + if err != nil { + t.Fatalf("parse csv: %v\noutput:\n%s", err, out) + } + return recs + } + writeCSV := func(t *testing.T, sql string, redact bool) [][]string { + t.Helper() + var buf bytes.Buffer + iter := client.Single().Query(ctx, spanner.Statement{SQL: sql}) + if err := writeCsvFromRowIter(&buf, iter, redact); err != nil { + t.Fatalf("writeCsvFromRowIter(redact=%v): %v", redact, err) + } + return readCSV(t, buf.String()) + } + + t.Run("normal", func(t *testing.T) { + recs := writeCSV(t, "SELECT SingerId, FirstName FROM Singers ORDER BY SingerId LIMIT 2", false) + if len(recs) != 3 { + t.Fatalf("got %d csv records, want header + 2 rows: %#v", len(recs), recs) + } + if got, want := recs[0], []string{"SingerId", "FirstName"}; !cmp.Equal(got, want) { + t.Fatalf("header: got %v want %v", got, want) + } + if recs[1][0] != "1" || recs[2][0] != "2" { + t.Fatalf("SingerId column: %#v", recs) + } + }) + + t.Run("zero_rows", func(t *testing.T) { + recs := writeCSV(t, "SELECT SingerId FROM Singers WHERE SingerId = -1", false) + if len(recs) != 1 { + t.Fatalf("got %d csv records, want header only: %#v", len(recs), recs) + } + if got, want := recs[0], []string{"SingerId"}; !cmp.Equal(got, want) { + t.Fatalf("header: got %v want %v", got, want) + } + }) + + t.Run("redact_rows", func(t *testing.T) { + recs := writeCSV(t, "SELECT SingerId, FirstName FROM Singers ORDER BY SingerId LIMIT 2", true) + if len(recs) != 1 { + t.Fatalf("got %d csv records, want header only: %#v", len(recs), recs) + } + if got, want := recs[0], []string{"SingerId", "FirstName"}; !cmp.Equal(got, want) { + t.Fatalf("header: got %v want %v", got, want) + } + }) + }) }