diff --git a/cmd/transfer-pvc/progress.go b/cmd/transfer-pvc/progress.go index ab1ea1cf..8b7f217f 100644 --- a/cmd/transfer-pvc/progress.go +++ b/cmd/transfer-pvc/progress.go @@ -34,6 +34,7 @@ type rsyncLogStream struct { outputFile *string } +// NewRsyncLogStream creates a log stream reader for the rsync client pod. func NewRsyncLogStream(restCfg *rest.Config, pvc types.NamespacedName, labels map[string]string, output string) LogStreams { var outputFile string if output != "" { @@ -47,6 +48,7 @@ func NewRsyncLogStream(restCfg *rest.Config, pvc types.NamespacedName, labels ma } } +// Init starts streaming logs from the rsync container and parsing progress. func (r *rsyncLogStream) Init() error { r.stdout = make(chan string) r.stderr = make(chan string) @@ -79,20 +81,9 @@ func (r *rsyncLogStream) Init() error { go func() { defer podLogStream.Close() logString := "" - zeroBytes := 0 for { buf := make([]byte, 32*1024) n, readErr := podLogStream.Read(buf) - if n > 0 { - zeroBytes = 0 - } else { - zeroBytes += 1 - } - // sometimes, a stream would end without returning an EOF gracefully - // we force exit the loop when we see null bytes on stream consecutively - if zeroBytes > 4 { - err = io.EOF - } logString = fmt.Sprintf("%s%s", logString, string(buf[:n])) if readErr == io.EOF { err = readErr @@ -103,6 +94,8 @@ func (r *rsyncLogStream) Init() error { } r.progress.ExitCode = code logString = finalLogs + } else if readErr != nil { + err = readErr } parsedProgress, unparsed := parseRsyncLogs(logString) r.progress.Merge(parsedProgress) @@ -135,6 +128,7 @@ func (r *rsyncLogStream) Init() error { return nil } +// writeProgressToFile serializes final transfer progress to a file. func writeProgressToFile(o string, p *Progress) error { file, err := os.OpenFile(o, os.O_CREATE, os.ModePerm) if err != nil { @@ -145,12 +139,14 @@ func writeProgressToFile(o string, p *Progress) error { return ioutil.WriteFile(o, d, os.ModePerm) } +// Close closes all rsync log stream channels. func (r *rsyncLogStream) Close() { close(r.stdout) close(r.stderr) close(r.err) } +// Streams returns channels for stdout, stderr, and terminal stream errors. func (r *rsyncLogStream) Streams() (stdout chan string, stderr chan string, err chan error) { return r.stdout, r.stderr, r.err } @@ -187,6 +183,7 @@ type dataSize struct { unit string } +// addDataSize adds two sizes by normalizing units when needed. func addDataSize(a, b *dataSize) *dataSize { if b == nil { return nil @@ -212,10 +209,12 @@ func addDataSize(a, b *dataSize) *dataSize { return newDs } +// String formats a data size as " ". func (d *dataSize) String() string { return fmt.Sprintf("%.2f %s", d.val, d.unit) } +// MarshalJSON marshals dataSize as its human-readable string value. func (d *dataSize) MarshalJSON() ([]byte, error) { return json.Marshal(d.String()) } @@ -301,7 +300,7 @@ func (p *Progress) Status() status { return succeeded } if p.TransferredFiles == 0 && - p.TransferredData.val == 0 && + (p.TransferredData == nil || p.TransferredData.val == 0) && p.TotalFiles == nil { return failed } @@ -476,6 +475,7 @@ func parseRsyncLogs(rawLogs string) (p *Progress, unprocessedData string) { return p, "" } +// waitForPodRunning waits until the labeled rsync client pod is ready. func waitForPodRunning(c *kubernetes.Clientset, namespace string, labels map[string]string) (string, error) { var podName string err := wait.PollUntil(time.Second, func() (done bool, err error) { @@ -510,11 +510,10 @@ func waitForPodRunning(c *kubernetes.Clientset, namespace string, labels map[str return podName, err } +// getFinalPodStatus returns rsync exit code and tail logs after stream completion. func getFinalPodStatus(c *kubernetes.Clientset, name string, namespace string) (*int32, string, error) { var exitCode *int32 - count := 0 - for { - count += 1 + for attempt := 0; attempt < 15; attempt++ { pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{}) if err != nil { return nil, "", err @@ -527,9 +526,10 @@ func getFinalPodStatus(c *kubernetes.Clientset, name string, namespace string) ( } } } - if count > 5 || exitCode != nil { + if exitCode != nil { break } + time.Sleep(time.Second) } lastLines := int64(35) @@ -553,3 +553,55 @@ func getFinalPodStatus(c *kubernetes.Clientset, name string, namespace string) ( return exitCode, buf.String(), nil } + +// validateTransferCompletion returns an error when transfer did not complete successfully. +func validateTransferCompletion(p *Progress) error { + if p == nil { + return fmt.Errorf("transfer progress unavailable") + } + + st := p.Status() + if st == succeeded { + return nil + } + + if !st.Completed() { + return fmt.Errorf("transfer did not complete, final status: %s", st) + } + + details := []string{fmt.Sprintf("status=%s", st)} + if p.ExitCode != nil { + details = append(details, fmt.Sprintf("exitCode=%d", *p.ExitCode)) + } + if len(p.FailedFiles) > 0 { + details = append(details, fmt.Sprintf("failedFiles=%d", len(p.FailedFiles))) + details = append(details, fmt.Sprintf("failedFilesSample=%s", formatFailedFilesSample(p.FailedFiles, 3))) + } + if len(p.Errors) > 0 { + details = append(details, fmt.Sprintf("errors=%d", len(p.Errors))) + details = append(details, fmt.Sprintf("errorsSample=%s", strings.Join(p.Errors[:minInt(len(p.Errors), 3)], "; "))) + } + + return fmt.Errorf("rsync transfer failed: %s", strings.Join(details, ", ")) +} + +// formatFailedFilesSample returns a concise sample of failed files for error messages. +func formatFailedFilesSample(files []FailedFile, max int) string { + if len(files) == 0 { + return "" + } + limit := minInt(len(files), max) + items := make([]string, 0, limit) + for i := 0; i < limit; i++ { + items = append(items, fmt.Sprintf("%s (%s)", files[i].Name, files[i].Err)) + } + return strings.Join(items, "; ") +} + +// minInt returns the smaller of two integers. +func minInt(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/cmd/transfer-pvc/transfer-pvc.go b/cmd/transfer-pvc/transfer-pvc.go index 1fd3e04d..ab2144d8 100644 --- a/cmd/transfer-pvc/transfer-pvc.go +++ b/cmd/transfer-pvc/transfer-pvc.go @@ -474,7 +474,7 @@ func (t *TransferPVCCommand) run() error { err = followClientLogs( srcCfg, types.NamespacedName{Name: srcPVC.Name, Namespace: srcPVC.Namespace}, labels, t.ProgressOutput) if err != nil { - log.Fatal(err, "error following rsync client logs") + log.Fatalf("error following rsync client logs: %v", err) } return garbageCollect(srcClient, destClient, labels, t.Endpoint.Type, t.PVC.Namespace) @@ -646,6 +646,7 @@ type LogStreams interface { Close() } +// followClientLogs streams rsync client logs and validates final transfer status. func followClientLogs(srcConfig *rest.Config, pvc types.NamespacedName, labels map[string]string, outputFile string) error { logReader := NewRsyncLogStream(srcConfig, pvc, labels, outputFile) err := logReader.Init() @@ -671,7 +672,14 @@ func followClientLogs(srcConfig *rest.Config, pvc types.NamespacedName, labels m break } } - return err + if err != nil { + return err + } + rsyncReader, ok := logReader.(*rsyncLogStream) + if !ok { + return fmt.Errorf("unexpected log reader type %T", logReader) + } + return validateTransferCompletion(rsyncReader.progress) } // waitForEndpoint waits for endpoint to become ready diff --git a/cmd/transfer-pvc/transfer-pvc_test.go b/cmd/transfer-pvc/transfer-pvc_test.go index b07b78d8..677a4086 100644 --- a/cmd/transfer-pvc/transfer-pvc_test.go +++ b/cmd/transfer-pvc/transfer-pvc_test.go @@ -4,6 +4,67 @@ import ( "testing" ) +func Test_validateTransferCompletion(t *testing.T) { + int100 := int64(100) + exit0 := int32(0) + exit23 := int32(23) + + tests := []struct { + name string + p *Progress + wantErr bool + }{ + { + name: "succeeded", + p: &Progress{ + ExitCode: &exit0, + TransferPercentage: &int100, + TransferredData: &dataSize{val: 10, unit: "M"}, + }, + wantErr: false, + }, + { + name: "failed", + p: &Progress{ + ExitCode: &exit23, + TransferredFiles: 0, + FailedFiles: []FailedFile{ + {Name: "/data/a", Err: "Permission denied (13)"}, + }, + }, + wantErr: true, + }, + { + name: "partially failed", + p: &Progress{ + ExitCode: &exit23, + TransferredFiles: 1, + TransferredData: &dataSize{val: 1, unit: "M"}, + FailedFiles: []FailedFile{ + {Name: "/data/journal", Err: "Permission denied (13)"}, + }, + }, + wantErr: true, + }, + { + name: "nil progress", + p: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateTransferCompletion(tt.p) + if tt.wantErr && err == nil { + t.Fatalf("expected error, got nil") + } + if !tt.wantErr && err != nil { + t.Fatalf("expected nil error, got %v", err) + } + }) + } +} func Test_parseSourceDestinationMapping(t *testing.T) { tests := []struct { name string