Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 68 additions & 16 deletions cmd/transfer-pvc/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type rsyncLogStream struct {
outputFile *string
}

// NewRsyncLogStream creates a log stream reader for the rsync client pod.
func NewRsyncLogStream(restCfg *rest.Config, pvc types.NamespacedName, labels map[string]string, output string) LogStreams {
var outputFile string
if output != "" {
Expand All @@ -47,6 +48,7 @@ func NewRsyncLogStream(restCfg *rest.Config, pvc types.NamespacedName, labels ma
}
}

// Init starts streaming logs from the rsync container and parsing progress.
func (r *rsyncLogStream) Init() error {
r.stdout = make(chan string)
r.stderr = make(chan string)
Expand Down Expand Up @@ -79,20 +81,9 @@ func (r *rsyncLogStream) Init() error {
go func() {
defer podLogStream.Close()
logString := ""
zeroBytes := 0
for {
buf := make([]byte, 32*1024)
n, readErr := podLogStream.Read(buf)
if n > 0 {
zeroBytes = 0
} else {
zeroBytes += 1
}
// sometimes, a stream would end without returning an EOF gracefully
// we force exit the loop when we see null bytes on stream consecutively
if zeroBytes > 4 {
err = io.EOF
}
logString = fmt.Sprintf("%s%s", logString, string(buf[:n]))
if readErr == io.EOF {
err = readErr
Expand All @@ -103,6 +94,8 @@ func (r *rsyncLogStream) Init() error {
}
r.progress.ExitCode = code
logString = finalLogs
} else if readErr != nil {
err = readErr
}
parsedProgress, unparsed := parseRsyncLogs(logString)
r.progress.Merge(parsedProgress)
Expand Down Expand Up @@ -135,6 +128,7 @@ func (r *rsyncLogStream) Init() error {
return nil
}

// writeProgressToFile serializes final transfer progress to a file.
func writeProgressToFile(o string, p *Progress) error {
file, err := os.OpenFile(o, os.O_CREATE, os.ModePerm)
if err != nil {
Expand All @@ -145,12 +139,14 @@ func writeProgressToFile(o string, p *Progress) error {
return ioutil.WriteFile(o, d, os.ModePerm)
}

// Close closes all rsync log stream channels.
func (r *rsyncLogStream) Close() {
close(r.stdout)
close(r.stderr)
close(r.err)
}

// Streams returns channels for stdout, stderr, and terminal stream errors.
func (r *rsyncLogStream) Streams() (stdout chan string, stderr chan string, err chan error) {
return r.stdout, r.stderr, r.err
}
Expand Down Expand Up @@ -187,6 +183,7 @@ type dataSize struct {
unit string
}

// addDataSize adds two sizes by normalizing units when needed.
func addDataSize(a, b *dataSize) *dataSize {
if b == nil {
return nil
Expand All @@ -212,10 +209,12 @@ func addDataSize(a, b *dataSize) *dataSize {
return newDs
}

// String formats a data size as "<value> <unit>".
func (d *dataSize) String() string {
return fmt.Sprintf("%.2f %s", d.val, d.unit)
}

// MarshalJSON marshals dataSize as its human-readable string value.
func (d *dataSize) MarshalJSON() ([]byte, error) {
return json.Marshal(d.String())
}
Expand Down Expand Up @@ -301,7 +300,7 @@ func (p *Progress) Status() status {
return succeeded
}
if p.TransferredFiles == 0 &&
p.TransferredData.val == 0 &&
(p.TransferredData == nil || p.TransferredData.val == 0) &&
p.TotalFiles == nil {
return failed
}
Expand Down Expand Up @@ -476,6 +475,7 @@ func parseRsyncLogs(rawLogs string) (p *Progress, unprocessedData string) {
return p, ""
}

// waitForPodRunning waits until the labeled rsync client pod is ready.
func waitForPodRunning(c *kubernetes.Clientset, namespace string, labels map[string]string) (string, error) {
var podName string
err := wait.PollUntil(time.Second, func() (done bool, err error) {
Expand Down Expand Up @@ -510,11 +510,10 @@ func waitForPodRunning(c *kubernetes.Clientset, namespace string, labels map[str
return podName, err
}

// getFinalPodStatus returns rsync exit code and tail logs after stream completion.
func getFinalPodStatus(c *kubernetes.Clientset, name string, namespace string) (*int32, string, error) {
var exitCode *int32
count := 0
for {
count += 1
for attempt := 0; attempt < 15; attempt++ {
pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, "", err
Expand All @@ -527,9 +526,10 @@ func getFinalPodStatus(c *kubernetes.Clientset, name string, namespace string) (
}
}
}
if count > 5 || exitCode != nil {
if exitCode != nil {
break
}
time.Sleep(time.Second)
}

lastLines := int64(35)
Expand All @@ -553,3 +553,55 @@ func getFinalPodStatus(c *kubernetes.Clientset, name string, namespace string) (

return exitCode, buf.String(), nil
}

// validateTransferCompletion returns an error when transfer did not complete successfully.
func validateTransferCompletion(p *Progress) error {
if p == nil {
return fmt.Errorf("transfer progress unavailable")
}

st := p.Status()
if st == succeeded {
return nil
}

if !st.Completed() {
return fmt.Errorf("transfer did not complete, final status: %s", st)
}

details := []string{fmt.Sprintf("status=%s", st)}
if p.ExitCode != nil {
details = append(details, fmt.Sprintf("exitCode=%d", *p.ExitCode))
}
if len(p.FailedFiles) > 0 {
details = append(details, fmt.Sprintf("failedFiles=%d", len(p.FailedFiles)))
details = append(details, fmt.Sprintf("failedFilesSample=%s", formatFailedFilesSample(p.FailedFiles, 3)))
}
if len(p.Errors) > 0 {
details = append(details, fmt.Sprintf("errors=%d", len(p.Errors)))
details = append(details, fmt.Sprintf("errorsSample=%s", strings.Join(p.Errors[:minInt(len(p.Errors), 3)], "; ")))
}

return fmt.Errorf("rsync transfer failed: %s", strings.Join(details, ", "))
}

// formatFailedFilesSample returns a concise sample of failed files for error messages.
func formatFailedFilesSample(files []FailedFile, max int) string {
if len(files) == 0 {
return ""
}
limit := minInt(len(files), max)
items := make([]string, 0, limit)
for i := 0; i < limit; i++ {
items = append(items, fmt.Sprintf("%s (%s)", files[i].Name, files[i].Err))
}
return strings.Join(items, "; ")
}

// minInt returns the smaller of two integers.
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
12 changes: 10 additions & 2 deletions cmd/transfer-pvc/transfer-pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func (t *TransferPVCCommand) run() error {
err = followClientLogs(
srcCfg, types.NamespacedName{Name: srcPVC.Name, Namespace: srcPVC.Namespace}, labels, t.ProgressOutput)
if err != nil {
log.Fatal(err, "error following rsync client logs")
log.Fatalf("error following rsync client logs: %v", err)
}

return garbageCollect(srcClient, destClient, labels, t.Endpoint.Type, t.PVC.Namespace)
Expand Down Expand Up @@ -646,6 +646,7 @@ type LogStreams interface {
Close()
}

// followClientLogs streams rsync client logs and validates final transfer status.
func followClientLogs(srcConfig *rest.Config, pvc types.NamespacedName, labels map[string]string, outputFile string) error {
logReader := NewRsyncLogStream(srcConfig, pvc, labels, outputFile)
err := logReader.Init()
Expand All @@ -671,7 +672,14 @@ func followClientLogs(srcConfig *rest.Config, pvc types.NamespacedName, labels m
break
}
}
return err
if err != nil {
return err
}
rsyncReader, ok := logReader.(*rsyncLogStream)
if !ok {
return fmt.Errorf("unexpected log reader type %T", logReader)
}
return validateTransferCompletion(rsyncReader.progress)
}

// waitForEndpoint waits for endpoint to become ready
Expand Down
61 changes: 61 additions & 0 deletions cmd/transfer-pvc/transfer-pvc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,67 @@ import (
"testing"
)

func Test_validateTransferCompletion(t *testing.T) {
int100 := int64(100)
exit0 := int32(0)
exit23 := int32(23)

tests := []struct {
name string
p *Progress
wantErr bool
}{
{
name: "succeeded",
p: &Progress{
ExitCode: &exit0,
TransferPercentage: &int100,
TransferredData: &dataSize{val: 10, unit: "M"},
},
wantErr: false,
},
{
name: "failed",
p: &Progress{
ExitCode: &exit23,
TransferredFiles: 0,
FailedFiles: []FailedFile{
{Name: "/data/a", Err: "Permission denied (13)"},
},
},
wantErr: true,
},
{
name: "partially failed",
p: &Progress{
ExitCode: &exit23,
TransferredFiles: 1,
TransferredData: &dataSize{val: 1, unit: "M"},
FailedFiles: []FailedFile{
{Name: "/data/journal", Err: "Permission denied (13)"},
},
},
wantErr: true,
},
{
name: "nil progress",
p: nil,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateTransferCompletion(tt.p)
if tt.wantErr && err == nil {
t.Fatalf("expected error, got nil")
}
if !tt.wantErr && err != nil {
t.Fatalf("expected nil error, got %v", err)
}
})
}
}
func Test_parseSourceDestinationMapping(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading