From 257ce5163a5efa2dc469134588e1a1ce968e4181 Mon Sep 17 00:00:00 2001 From: M Sajid Mansoori Date: Wed, 22 Apr 2026 18:05:03 +0530 Subject: [PATCH 1/6] add mtc-199 data validation skeleton Signed-off-by: M Sajid Mansoori --- .../tests/mtc_199_data_validation_test.go | 127 ++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 e2e-tests/tests/mtc_199_data_validation_test.go diff --git a/e2e-tests/tests/mtc_199_data_validation_test.go b/e2e-tests/tests/mtc_199_data_validation_test.go new file mode 100644 index 00000000..0ab44787 --- /dev/null +++ b/e2e-tests/tests/mtc_199_data_validation_test.go @@ -0,0 +1,127 @@ +package e2e + +import ( + "fmt" + "log" + + "github.com/konveyor/crane/e2e-tests/config" + . "github.com/konveyor/crane/e2e-tests/framework" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Data validation", func() { + + It("[BUG #213][MTC-199] Should validate data", Label("BUG #213", "tier0"), func() { + appName := "mysql" + namespace := appName + scenario := NewMigrationScenario( + appName, + namespace, + config.K8sDeployBin, + config.CraneBin, + config.SourceContext, + config.TargetContext, + ) + if scenario.KubectlSrcNonAdmin.Context == "" { + Skip("source-nonadmin-context is required for non-admin stateful migration test") + } + if scenario.KubectlTgtNonAdmin.Context == "" { + Skip("target-nonadmin-context is required for non-admin stateful migration test") + } + srcApp := scenario.SrcAppNonAdmin + tgtApp := scenario.TgtAppNonAdmin + runner := scenario.CraneNonAdmin + srcApp.ExtraVars = map[string]any{ + "non_admin_user": "true", + } + tgtApp.ExtraVars = map[string]any{ + "non_admin_user": "true", + } + + By("Grant namespace admin permissions to nonadmin user on source and target") + kubectlSrcNonAdmin, kubectlTgtNonAdmin, cleanup, err := SetupNamespaceAdminUsersForScenario(scenario, namespace) + Expect(err).NotTo(HaveOccurred()) + + DeferCleanup(func() { + By("Delete test namespace on source and target (wait for completion)") + for _, k := range []KubectlRunner{scenario.KubectlSrc, scenario.KubectlTgt} { + if _, err := k.Run("delete", "namespace", namespace, "--ignore-not-found=true", "--wait=true"); err != nil { + log.Printf("cleanup: failed to delete namespace %q on context %q: %v", namespace, k.Context, err) + } + } + }) + DeferCleanup(cleanup) + + By("Deploy and validate source MySQL app") + log.Printf("Deploying %s in namespace %s on source cluster", appName, namespace) + Expect(PrepareSourceApp(srcApp, kubectlSrcNonAdmin)).NotTo(HaveOccurred()) + log.Printf("Source app deployed successfully") + + paths, err := NewScenarioPaths("crane-export-*") + Expect(err).NotTo(HaveOccurred()) + + DeferCleanup(func() { + By("Cleanup source and target resources") + if err := CleanupScenario(paths.TempDir, srcApp, tgtApp); err != nil { + log.Printf("cleanup: %v", err) + } + }) + By("List pvcs in the namespace") + pvcs, err := ListPVCs(srcApp.Namespace, "", srcApp.Context) + Expect(err).NotTo(HaveOccurred()) + Expect(pvcs).NotTo(BeEmpty(), "expected at least one pvc in namespace %q", srcApp.Namespace) + log.Printf("Found %d pvcs in namespace %q", len(pvcs), srcApp.Namespace) + for _, pvc := range pvcs { + log.Printf("Found pvc %s in namespace %q\n", pvc.Name, pvc.Namespace) + } + By("Run crane export/transform/apply pipeline") + log.Printf("Running crane pipeline for namespace %s\n", srcApp.Namespace) + runner.WorkDir = paths.TempDir + Expect(RunCranePipelineWithChecks(runner, srcApp.Namespace, paths)).NotTo(HaveOccurred()) + log.Printf("Crane pipeline completed for namespace %s\n", srcApp.Namespace) + + By("Create ns on target cluster") + log.Printf("Creating namespace %s on target cluster", namespace) + Expect(scenario.KubectlTgt.CreateNamespace(namespace)).NotTo(HaveOccurred()) + log.Printf("Namespace %s created successfully on target cluster", namespace) + + By("Transfer PVCs") + tgtIP, err := GetClusterNodeIP(scenario.TgtApp.Context) + Expect(err).NotTo(HaveOccurred()) + for _, pvc := range pvcs { + pvcName := pvc.Name + opts := TransferPVCOptions{ + SourceContext: srcApp.Context, + TargetContext: tgtApp.Context, + PVCName: pvcName, + PVCNamespaceMap: fmt.Sprintf("%s:%s", srcApp.Namespace, tgtApp.Namespace), + Endpoint: "nginx-ingress", + IngressClass: "nginx", + Subdomain: fmt.Sprintf("%s.%s.%s.nip.io", pvcName, srcApp.Namespace, tgtIP), + } + log.Printf("Transferring PVC %s to namespace %s on target cluster", pvcName, tgtApp.Namespace) + Expect(runner.TransferPVC(opts)).NotTo(HaveOccurred()) + log.Printf("PVC transfer complete : %s -> namespace %s", pvcName, tgtApp.Namespace) + } + + By("List PVCs on target cluster") + tgtpvcs, err := ListPVCs(tgtApp.Namespace, "", tgtApp.Context) + Expect(err).NotTo(HaveOccurred()) + Expect(tgtpvcs).NotTo(BeEmpty(), "expected at least one PVC in target namespace %q", tgtApp.Namespace) + log.Printf("Found %d PVCs in target namespace %q", len(tgtpvcs), tgtApp.Namespace) + + By("Apply rendered manifests to target") + log.Printf("Applying rendered manifests on target namespace %s from %s\n", tgtApp.Namespace, paths.OutputDir) + Expect(ApplyOutputToTargetNonAdmin(kubectlTgtNonAdmin, paths.OutputDir)).NotTo(HaveOccurred()) + + By("Scale target deployment and validate app") + log.Printf("Scaling target deployment(s) with label app=%s to 1\n", appName) + Expect(kubectlTgtNonAdmin.ScaleDeployment(namespace, appName, 1)).NotTo(HaveOccurred()) + + By("Validate target application") + log.Printf("Validating app %s on target cluster\n", tgtApp.Name) + Eventually(tgtApp.Validate, "2m", "10s").Should(Succeed()) + log.Printf("Target validation completed for app %s\n", tgtApp.Name) + }) +}) From e34383121841d2580823b0dae037ad445c793501 Mon Sep 17 00:00:00 2001 From: M Sajid Mansoori Date: Thu, 23 Apr 2026 11:53:01 +0530 Subject: [PATCH 2/6] Updated test with workarounds for bug in transfer-pvc Signed-off-by: M Sajid Mansoori --- .../tests/mtc_199_data_validation_test.go | 237 +++++++++++++++++- 1 file changed, 231 insertions(+), 6 deletions(-) diff --git a/e2e-tests/tests/mtc_199_data_validation_test.go b/e2e-tests/tests/mtc_199_data_validation_test.go index 0ab44787..c995a687 100644 --- a/e2e-tests/tests/mtc_199_data_validation_test.go +++ b/e2e-tests/tests/mtc_199_data_validation_test.go @@ -3,6 +3,8 @@ package e2e import ( "fmt" "log" + "strconv" + "strings" "github.com/konveyor/crane/e2e-tests/config" . "github.com/konveyor/crane/e2e-tests/framework" @@ -10,6 +12,127 @@ import ( . "github.com/onsi/gomega" ) +func buildPVCFixPodSpec(namespace, podName string, pvcNames []string) string { + var b strings.Builder + b.WriteString("apiVersion: v1\n") + b.WriteString("kind: Pod\n") + b.WriteString("metadata:\n") + b.WriteString(fmt.Sprintf(" name: %s\n", podName)) + b.WriteString(fmt.Sprintf(" namespace: %s\n", namespace)) + b.WriteString("spec:\n") + b.WriteString(" restartPolicy: Never\n") + b.WriteString(" containers:\n") + b.WriteString(" - name: fix\n") + b.WriteString(" image: busybox\n") + b.WriteString(" command: [\"sh\", \"-c\", \"sleep 300\"]\n") + b.WriteString(" securityContext:\n") + b.WriteString(" runAsUser: 0\n") + b.WriteString(" volumeMounts:\n") + for i := range pvcNames { + b.WriteString(fmt.Sprintf(" - name: pvc-%d\n", i)) + b.WriteString(fmt.Sprintf(" mountPath: /mnt/pvc-%d\n", i)) + } + b.WriteString(" volumes:\n") + for i, pvcName := range pvcNames { + b.WriteString(fmt.Sprintf(" - name: pvc-%d\n", i)) + b.WriteString(" persistentVolumeClaim:\n") + b.WriteString(fmt.Sprintf(" claimName: %s\n", pvcName)) + } + return b.String() +} + +func runPVCFixCommands(k KubectlRunner, namespace, podName string, pvcNames []string, commands []string) error { + if len(pvcNames) == 0 { + return fmt.Errorf("no PVCs provided for fix pod") + } + spec := buildPVCFixPodSpec(namespace, podName, pvcNames) + if err := k.ApplyYAMLSpec(spec, namespace); err != nil { + return fmt.Errorf("create pvc fix pod %q: %w", podName, err) + } + defer func() { + if _, err := k.Run("delete", "pod", podName, "-n", namespace, "--ignore-not-found=true", "--wait=true"); err != nil { + log.Printf("cleanup: failed to delete pvc fix pod %q: %v", podName, err) + } + }() + + if _, err := k.Run( + "wait", "pod", podName, + "-n", namespace, + "--for=condition=Ready", + "--timeout=90s", + ); err != nil { + return fmt.Errorf("wait for pvc fix pod %q ready: %w", podName, err) + } + + cmd := strings.Join(commands, " && ") + if _, err := k.Run("exec", podName, "-n", namespace, "--", "sh", "-c", cmd); err != nil { + return fmt.Errorf("run pvc fix commands in %q: %w", podName, err) + } + return nil +} + +func getPodNameByLabel(k KubectlRunner, namespace, selector string) (string, error) { + out, err := k.Run( + "get", "pod", + "-n", namespace, + "-l", selector, + "-o", "jsonpath={.items[0].metadata.name}", + ) + if err != nil { + return "", err + } + podName := strings.TrimSpace(out) + if podName == "" { + return "", fmt.Errorf("no pod found for selector %q in namespace %q", selector, namespace) + } + return podName, nil +} + +func mysqlAuthorsCount(k KubectlRunner, namespace, podName string) (int, error) { + out, err := k.Run( + "exec", podName, "-n", namespace, "--", + "sh", "-c", + `MYSQL_PWD="$MYSQL_PASSWORD" mysql -N -B -u"$MYSQL_USER" "$MYSQL_DATABASE" -e "SELECT COUNT(*) FROM authors;"`, + ) + if err != nil { + return 0, err + } + count, err := strconv.Atoi(strings.TrimSpace(out)) + if err != nil { + return 0, fmt.Errorf("parse authors count %q: %w", strings.TrimSpace(out), err) + } + return count, nil +} + +func waitForMySQLSocket(k KubectlRunner, namespace, podName string) error { + _, err := k.Run( + "exec", podName, "-n", namespace, "--", + "sh", "-c", + `test -S /var/lib/mysql/mysql.sock`, + ) + return err +} + +func mysqlTestDataMD5(k KubectlRunner, namespace, podName string) (actual string, expected string, _ error) { + actualOut, err := k.Run( + "exec", podName, "-n", namespace, "--", + "sh", "-c", + `md5sum /test-data/test1 | awk '{print $1}'`, + ) + if err != nil { + return "", "", err + } + expectedOut, err := k.Run( + "exec", podName, "-n", namespace, "--", + "sh", "-c", + `awk '{print $1}' /test-data/test1.md5`, + ) + if err != nil { + return "", "", err + } + return strings.TrimSpace(actualOut), strings.TrimSpace(expectedOut), nil +} + var _ = Describe("Data validation", func() { It("[BUG #213][MTC-199] Should validate data", Label("BUG #213", "tier0"), func() { @@ -55,8 +178,24 @@ var _ = Describe("Data validation", func() { By("Deploy and validate source MySQL app") log.Printf("Deploying %s in namespace %s on source cluster", appName, namespace) - Expect(PrepareSourceApp(srcApp, kubectlSrcNonAdmin)).NotTo(HaveOccurred()) + Expect(PrepareSourceAppNoQuiesce(srcApp)).NotTo(HaveOccurred()) log.Printf("Source app deployed successfully") + By("Capture source data fingerprints for comparison") + srcPodName, err := getPodNameByLabel(kubectlSrcNonAdmin, srcApp.Namespace, "app="+appName) + Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + return waitForMySQLSocket(kubectlSrcNonAdmin, srcApp.Namespace, srcPodName) + }, "2m", "5s").Should(Succeed()) + sourceAuthorsCount, err := mysqlAuthorsCount(kubectlSrcNonAdmin, srcApp.Namespace, srcPodName) + Expect(err).NotTo(HaveOccurred()) + sourceMD5Actual, sourceMD5Expected, err := mysqlTestDataMD5(kubectlSrcNonAdmin, srcApp.Namespace, srcPodName) + Expect(err).NotTo(HaveOccurred()) + log.Printf("Source validation output: pod=%s authors_count=%d md5_actual=%s md5_expected=%s", srcPodName, sourceAuthorsCount, sourceMD5Actual, sourceMD5Expected) + Expect(sourceMD5Actual).To(Equal(sourceMD5Expected), "source test-data checksum should match its md5 file") + log.Printf("Source fingerprints: authors=%d md5=%s", sourceAuthorsCount, sourceMD5Actual) + + By("Quiesce source app before export") + Expect(kubectlSrcNonAdmin.ScaleDeploymentIfPresent(srcApp.Namespace, srcApp.Name, 0)).NotTo(HaveOccurred()) paths, err := NewScenarioPaths("crane-export-*") Expect(err).NotTo(HaveOccurred()) @@ -72,20 +211,48 @@ var _ = Describe("Data validation", func() { Expect(err).NotTo(HaveOccurred()) Expect(pvcs).NotTo(BeEmpty(), "expected at least one pvc in namespace %q", srcApp.Namespace) log.Printf("Found %d pvcs in namespace %q", len(pvcs), srcApp.Namespace) + pvcNames := make([]string, 0, len(pvcs)) for _, pvc := range pvcs { log.Printf("Found pvc %s in namespace %q\n", pvc.Name, pvc.Namespace) + pvcNames = append(pvcNames, pvc.Name) + } + + By("Wait for source quiesce to stabilize before export") + Eventually(func() (string, error) { + out, err := kubectlSrcNonAdmin.Run( + "get", "pods", + "--namespace", namespace, + "-l", "app="+appName, + "-o", "name", + ) + if err != nil { + return "", err + } + return strings.TrimSpace(out), nil + }, "90s", "3s").Should(BeEmpty()) + + By("[BUG #213] Relax source PVC permissions before transfer") + sourceFixCommands := make([]string, 0, len(pvcNames)*2) + for i := range pvcNames { + sourceFixCommands = append(sourceFixCommands, + fmt.Sprintf("if [ -d /mnt/pvc-%d/data ]; then chmod -R a+rX /mnt/pvc-%d/data; fi", i, i), + fmt.Sprintf("chmod -R a+rX /mnt/pvc-%d", i), + ) } + Expect(runPVCFixCommands( + kubectlSrcNonAdmin, + srcApp.Namespace, + "mysql-source-pvc-perm-fix", + pvcNames, + sourceFixCommands, + )).NotTo(HaveOccurred()) + By("Run crane export/transform/apply pipeline") log.Printf("Running crane pipeline for namespace %s\n", srcApp.Namespace) runner.WorkDir = paths.TempDir Expect(RunCranePipelineWithChecks(runner, srcApp.Namespace, paths)).NotTo(HaveOccurred()) log.Printf("Crane pipeline completed for namespace %s\n", srcApp.Namespace) - By("Create ns on target cluster") - log.Printf("Creating namespace %s on target cluster", namespace) - Expect(scenario.KubectlTgt.CreateNamespace(namespace)).NotTo(HaveOccurred()) - log.Printf("Namespace %s created successfully on target cluster", namespace) - By("Transfer PVCs") tgtIP, err := GetClusterNodeIP(scenario.TgtApp.Context) Expect(err).NotTo(HaveOccurred()) @@ -105,10 +272,34 @@ var _ = Describe("Data validation", func() { log.Printf("PVC transfer complete : %s -> namespace %s", pvcName, tgtApp.Namespace) } + By("[BUG #213] Restore destination PVC ownership for mysql runtime user") + targetFixCommands := make([]string, 0, len(pvcNames)*2) + for i := range pvcNames { + targetFixCommands = append(targetFixCommands, + fmt.Sprintf("if [ -d /mnt/pvc-%d/data ]; then chown -R 27:27 /mnt/pvc-%d/data && chmod -R u+rwX /mnt/pvc-%d/data; fi", i, i, i), + fmt.Sprintf("chown -R 27:27 /mnt/pvc-%d && chmod -R u+rwX /mnt/pvc-%d", i, i), + ) + } + Expect(runPVCFixCommands( + kubectlTgtNonAdmin, + tgtApp.Namespace, + "mysql-target-pvc-owner-fix", + pvcNames, + targetFixCommands, + )).NotTo(HaveOccurred()) + By("List PVCs on target cluster") tgtpvcs, err := ListPVCs(tgtApp.Namespace, "", tgtApp.Context) Expect(err).NotTo(HaveOccurred()) Expect(tgtpvcs).NotTo(BeEmpty(), "expected at least one PVC in target namespace %q", tgtApp.Namespace) + tgtPVCSet := make(map[string]struct{}, len(tgtpvcs)) + for _, pvc := range tgtpvcs { + tgtPVCSet[pvc.Name] = struct{}{} + } + for _, pvcName := range pvcNames { + _, exists := tgtPVCSet[pvcName] + Expect(exists).To(BeTrue(), "expected transferred PVC %q to exist in target namespace %q", pvcName, tgtApp.Namespace) + } log.Printf("Found %d PVCs in target namespace %q", len(tgtpvcs), tgtApp.Namespace) By("Apply rendered manifests to target") @@ -122,6 +313,40 @@ var _ = Describe("Data validation", func() { By("Validate target application") log.Printf("Validating app %s on target cluster\n", tgtApp.Name) Eventually(tgtApp.Validate, "2m", "10s").Should(Succeed()) + var tgtPodName string + Eventually(func() error { + podName, err := getPodNameByLabel(kubectlTgtNonAdmin, tgtApp.Namespace, "app="+appName) + if err != nil { + return err + } + tgtPodName = podName + out, err := kubectlTgtNonAdmin.Run( + "get", "pod", tgtPodName, + "-n", tgtApp.Namespace, + "-o", "jsonpath={.status.containerStatuses[0].ready}", + ) + if err != nil { + return err + } + if strings.TrimSpace(out) != "true" { + return fmt.Errorf("pod %s is not ready yet", tgtPodName) + } + return nil + }, "2m", "10s").Should(Succeed()) + Eventually(func() error { + return waitForMySQLSocket(kubectlTgtNonAdmin, tgtApp.Namespace, tgtPodName) + }, "2m", "5s").Should(Succeed()) + + targetAuthorsCount, err := mysqlAuthorsCount(kubectlTgtNonAdmin, tgtApp.Namespace, tgtPodName) + Expect(err).NotTo(HaveOccurred()) + targetMD5Actual, targetMD5Expected, err := mysqlTestDataMD5(kubectlTgtNonAdmin, tgtApp.Namespace, tgtPodName) + Expect(err).NotTo(HaveOccurred()) + log.Printf("Target validation output: pod=%s authors_count=%d md5_actual=%s md5_expected=%s", tgtPodName, targetAuthorsCount, targetMD5Actual, targetMD5Expected) + Expect(targetMD5Actual).To(Equal(targetMD5Expected), "target test-data checksum should match its md5 file") + + Expect(targetAuthorsCount).To(Equal(sourceAuthorsCount), "authors count should match between source and target") + Expect(targetMD5Actual).To(Equal(sourceMD5Actual), "test-data md5 should match between source and target") + log.Printf("Target fingerprints: authors=%d md5=%s", targetAuthorsCount, targetMD5Actual) log.Printf("Target validation completed for app %s\n", tgtApp.Name) }) }) From e34905a761aa6720355e5a750c5cfeae4151deaa Mon Sep 17 00:00:00 2001 From: M Sajid Mansoori Date: Thu, 23 Apr 2026 13:20:50 +0530 Subject: [PATCH 3/6] Add comments Signed-off-by: M Sajid Mansoori --- e2e-tests/tests/mtc_199_data_validation_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/e2e-tests/tests/mtc_199_data_validation_test.go b/e2e-tests/tests/mtc_199_data_validation_test.go index c995a687..adcae71e 100644 --- a/e2e-tests/tests/mtc_199_data_validation_test.go +++ b/e2e-tests/tests/mtc_199_data_validation_test.go @@ -231,6 +231,8 @@ var _ = Describe("Data validation", func() { return strings.TrimSpace(out), nil }, "90s", "3s").Should(BeEmpty()) + // Temporary workaround for BUG #213: remove once issue is fixed. + // See: https://github.com/migtools/crane/issues/213 By("[BUG #213] Relax source PVC permissions before transfer") sourceFixCommands := make([]string, 0, len(pvcNames)*2) for i := range pvcNames { @@ -272,6 +274,8 @@ var _ = Describe("Data validation", func() { log.Printf("PVC transfer complete : %s -> namespace %s", pvcName, tgtApp.Namespace) } + // Temporary workaround for BUG #213: remove once issue is fixed. + // See: https://github.com/migtools/crane/issues/213 By("[BUG #213] Restore destination PVC ownership for mysql runtime user") targetFixCommands := make([]string, 0, len(pvcNames)*2) for i := range pvcNames { From 1e7dc76dea072a0bc568ad1d4fc0474f82083b65 Mon Sep 17 00:00:00 2001 From: M Sajid Mansoori Date: Thu, 23 Apr 2026 16:03:27 +0530 Subject: [PATCH 4/6] Add validateTransferCompletion and clear error messages Signed-off-by: M Sajid Mansoori --- cmd/transfer-pvc/progress.go | 51 +++++++++++++++++++++- cmd/transfer-pvc/transfer-pvc.go | 9 +++- cmd/transfer-pvc/transfer-pvc_test.go | 61 +++++++++++++++++++++++++++ 3 files changed, 119 insertions(+), 2 deletions(-) diff --git a/cmd/transfer-pvc/progress.go b/cmd/transfer-pvc/progress.go index ab1ea1cf..231288ff 100644 --- a/cmd/transfer-pvc/progress.go +++ b/cmd/transfer-pvc/progress.go @@ -301,7 +301,7 @@ func (p *Progress) Status() status { return succeeded } if p.TransferredFiles == 0 && - p.TransferredData.val == 0 && + (p.TransferredData == nil || p.TransferredData.val == 0) && p.TotalFiles == nil { return failed } @@ -553,3 +553,52 @@ func getFinalPodStatus(c *kubernetes.Clientset, name string, namespace string) ( return exitCode, buf.String(), nil } + +func validateTransferCompletion(p *Progress) error { + if p == nil { + return fmt.Errorf("transfer progress unavailable") + } + + st := p.Status() + if st == succeeded { + return nil + } + + if !st.Completed() { + return fmt.Errorf("transfer did not complete, final status: %s", st) + } + + details := []string{fmt.Sprintf("status=%s", st)} + if p.ExitCode != nil { + details = append(details, fmt.Sprintf("exitCode=%d", *p.ExitCode)) + } + if len(p.FailedFiles) > 0 { + details = append(details, fmt.Sprintf("failedFiles=%d", len(p.FailedFiles))) + details = append(details, fmt.Sprintf("failedFilesSample=%s", formatFailedFilesSample(p.FailedFiles, 3))) + } + if len(p.Errors) > 0 { + details = append(details, fmt.Sprintf("errors=%d", len(p.Errors))) + details = append(details, fmt.Sprintf("errorsSample=%s", strings.Join(p.Errors[:minInt(len(p.Errors), 3)], "; "))) + } + + return fmt.Errorf("rsync transfer failed: %s", strings.Join(details, ", ")) +} + +func formatFailedFilesSample(files []FailedFile, max int) string { + if len(files) == 0 { + return "" + } + limit := minInt(len(files), max) + items := make([]string, 0, limit) + for i := 0; i < limit; i++ { + items = append(items, fmt.Sprintf("%s (%s)", files[i].Name, files[i].Err)) + } + return strings.Join(items, "; ") +} + +func minInt(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/cmd/transfer-pvc/transfer-pvc.go b/cmd/transfer-pvc/transfer-pvc.go index 1fd3e04d..1344d4cb 100644 --- a/cmd/transfer-pvc/transfer-pvc.go +++ b/cmd/transfer-pvc/transfer-pvc.go @@ -671,7 +671,14 @@ func followClientLogs(srcConfig *rest.Config, pvc types.NamespacedName, labels m break } } - return err + if err != nil { + return err + } + rsyncReader, ok := logReader.(*rsyncLogStream) + if !ok { + return fmt.Errorf("unexpected log reader type %T", logReader) + } + return validateTransferCompletion(rsyncReader.progress) } // waitForEndpoint waits for endpoint to become ready diff --git a/cmd/transfer-pvc/transfer-pvc_test.go b/cmd/transfer-pvc/transfer-pvc_test.go index b07b78d8..677a4086 100644 --- a/cmd/transfer-pvc/transfer-pvc_test.go +++ b/cmd/transfer-pvc/transfer-pvc_test.go @@ -4,6 +4,67 @@ import ( "testing" ) +func Test_validateTransferCompletion(t *testing.T) { + int100 := int64(100) + exit0 := int32(0) + exit23 := int32(23) + + tests := []struct { + name string + p *Progress + wantErr bool + }{ + { + name: "succeeded", + p: &Progress{ + ExitCode: &exit0, + TransferPercentage: &int100, + TransferredData: &dataSize{val: 10, unit: "M"}, + }, + wantErr: false, + }, + { + name: "failed", + p: &Progress{ + ExitCode: &exit23, + TransferredFiles: 0, + FailedFiles: []FailedFile{ + {Name: "/data/a", Err: "Permission denied (13)"}, + }, + }, + wantErr: true, + }, + { + name: "partially failed", + p: &Progress{ + ExitCode: &exit23, + TransferredFiles: 1, + TransferredData: &dataSize{val: 1, unit: "M"}, + FailedFiles: []FailedFile{ + {Name: "/data/journal", Err: "Permission denied (13)"}, + }, + }, + wantErr: true, + }, + { + name: "nil progress", + p: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateTransferCompletion(tt.p) + if tt.wantErr && err == nil { + t.Fatalf("expected error, got nil") + } + if !tt.wantErr && err != nil { + t.Fatalf("expected nil error, got %v", err) + } + }) + } +} func Test_parseSourceDestinationMapping(t *testing.T) { tests := []struct { name string From a67cc78eac3fb6332e81cd1352af077317bc0528 Mon Sep 17 00:00:00 2001 From: M Sajid Mansoori Date: Thu, 23 Apr 2026 16:34:41 +0530 Subject: [PATCH 5/6] update to explicitly throw error on pvc transfer failure not exit silently Signed-off-by: M Sajid Mansoori --- cmd/transfer-pvc/progress.go | 13 +- cmd/transfer-pvc/transfer-pvc.go | 2 +- .../tests/mtc_199_data_validation_test.go | 356 ------------------ 3 files changed, 3 insertions(+), 368 deletions(-) delete mode 100644 e2e-tests/tests/mtc_199_data_validation_test.go diff --git a/cmd/transfer-pvc/progress.go b/cmd/transfer-pvc/progress.go index 231288ff..227e8226 100644 --- a/cmd/transfer-pvc/progress.go +++ b/cmd/transfer-pvc/progress.go @@ -79,20 +79,9 @@ func (r *rsyncLogStream) Init() error { go func() { defer podLogStream.Close() logString := "" - zeroBytes := 0 for { buf := make([]byte, 32*1024) n, readErr := podLogStream.Read(buf) - if n > 0 { - zeroBytes = 0 - } else { - zeroBytes += 1 - } - // sometimes, a stream would end without returning an EOF gracefully - // we force exit the loop when we see null bytes on stream consecutively - if zeroBytes > 4 { - err = io.EOF - } logString = fmt.Sprintf("%s%s", logString, string(buf[:n])) if readErr == io.EOF { err = readErr @@ -103,6 +92,8 @@ func (r *rsyncLogStream) Init() error { } r.progress.ExitCode = code logString = finalLogs + } else if readErr != nil { + err = readErr } parsedProgress, unparsed := parseRsyncLogs(logString) r.progress.Merge(parsedProgress) diff --git a/cmd/transfer-pvc/transfer-pvc.go b/cmd/transfer-pvc/transfer-pvc.go index 1344d4cb..a0092a08 100644 --- a/cmd/transfer-pvc/transfer-pvc.go +++ b/cmd/transfer-pvc/transfer-pvc.go @@ -474,7 +474,7 @@ func (t *TransferPVCCommand) run() error { err = followClientLogs( srcCfg, types.NamespacedName{Name: srcPVC.Name, Namespace: srcPVC.Namespace}, labels, t.ProgressOutput) if err != nil { - log.Fatal(err, "error following rsync client logs") + log.Fatalf("error following rsync client logs: %v", err) } return garbageCollect(srcClient, destClient, labels, t.Endpoint.Type, t.PVC.Namespace) diff --git a/e2e-tests/tests/mtc_199_data_validation_test.go b/e2e-tests/tests/mtc_199_data_validation_test.go deleted file mode 100644 index adcae71e..00000000 --- a/e2e-tests/tests/mtc_199_data_validation_test.go +++ /dev/null @@ -1,356 +0,0 @@ -package e2e - -import ( - "fmt" - "log" - "strconv" - "strings" - - "github.com/konveyor/crane/e2e-tests/config" - . "github.com/konveyor/crane/e2e-tests/framework" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -func buildPVCFixPodSpec(namespace, podName string, pvcNames []string) string { - var b strings.Builder - b.WriteString("apiVersion: v1\n") - b.WriteString("kind: Pod\n") - b.WriteString("metadata:\n") - b.WriteString(fmt.Sprintf(" name: %s\n", podName)) - b.WriteString(fmt.Sprintf(" namespace: %s\n", namespace)) - b.WriteString("spec:\n") - b.WriteString(" restartPolicy: Never\n") - b.WriteString(" containers:\n") - b.WriteString(" - name: fix\n") - b.WriteString(" image: busybox\n") - b.WriteString(" command: [\"sh\", \"-c\", \"sleep 300\"]\n") - b.WriteString(" securityContext:\n") - b.WriteString(" runAsUser: 0\n") - b.WriteString(" volumeMounts:\n") - for i := range pvcNames { - b.WriteString(fmt.Sprintf(" - name: pvc-%d\n", i)) - b.WriteString(fmt.Sprintf(" mountPath: /mnt/pvc-%d\n", i)) - } - b.WriteString(" volumes:\n") - for i, pvcName := range pvcNames { - b.WriteString(fmt.Sprintf(" - name: pvc-%d\n", i)) - b.WriteString(" persistentVolumeClaim:\n") - b.WriteString(fmt.Sprintf(" claimName: %s\n", pvcName)) - } - return b.String() -} - -func runPVCFixCommands(k KubectlRunner, namespace, podName string, pvcNames []string, commands []string) error { - if len(pvcNames) == 0 { - return fmt.Errorf("no PVCs provided for fix pod") - } - spec := buildPVCFixPodSpec(namespace, podName, pvcNames) - if err := k.ApplyYAMLSpec(spec, namespace); err != nil { - return fmt.Errorf("create pvc fix pod %q: %w", podName, err) - } - defer func() { - if _, err := k.Run("delete", "pod", podName, "-n", namespace, "--ignore-not-found=true", "--wait=true"); err != nil { - log.Printf("cleanup: failed to delete pvc fix pod %q: %v", podName, err) - } - }() - - if _, err := k.Run( - "wait", "pod", podName, - "-n", namespace, - "--for=condition=Ready", - "--timeout=90s", - ); err != nil { - return fmt.Errorf("wait for pvc fix pod %q ready: %w", podName, err) - } - - cmd := strings.Join(commands, " && ") - if _, err := k.Run("exec", podName, "-n", namespace, "--", "sh", "-c", cmd); err != nil { - return fmt.Errorf("run pvc fix commands in %q: %w", podName, err) - } - return nil -} - -func getPodNameByLabel(k KubectlRunner, namespace, selector string) (string, error) { - out, err := k.Run( - "get", "pod", - "-n", namespace, - "-l", selector, - "-o", "jsonpath={.items[0].metadata.name}", - ) - if err != nil { - return "", err - } - podName := strings.TrimSpace(out) - if podName == "" { - return "", fmt.Errorf("no pod found for selector %q in namespace %q", selector, namespace) - } - return podName, nil -} - -func mysqlAuthorsCount(k KubectlRunner, namespace, podName string) (int, error) { - out, err := k.Run( - "exec", podName, "-n", namespace, "--", - "sh", "-c", - `MYSQL_PWD="$MYSQL_PASSWORD" mysql -N -B -u"$MYSQL_USER" "$MYSQL_DATABASE" -e "SELECT COUNT(*) FROM authors;"`, - ) - if err != nil { - return 0, err - } - count, err := strconv.Atoi(strings.TrimSpace(out)) - if err != nil { - return 0, fmt.Errorf("parse authors count %q: %w", strings.TrimSpace(out), err) - } - return count, nil -} - -func waitForMySQLSocket(k KubectlRunner, namespace, podName string) error { - _, err := k.Run( - "exec", podName, "-n", namespace, "--", - "sh", "-c", - `test -S /var/lib/mysql/mysql.sock`, - ) - return err -} - -func mysqlTestDataMD5(k KubectlRunner, namespace, podName string) (actual string, expected string, _ error) { - actualOut, err := k.Run( - "exec", podName, "-n", namespace, "--", - "sh", "-c", - `md5sum /test-data/test1 | awk '{print $1}'`, - ) - if err != nil { - return "", "", err - } - expectedOut, err := k.Run( - "exec", podName, "-n", namespace, "--", - "sh", "-c", - `awk '{print $1}' /test-data/test1.md5`, - ) - if err != nil { - return "", "", err - } - return strings.TrimSpace(actualOut), strings.TrimSpace(expectedOut), nil -} - -var _ = Describe("Data validation", func() { - - It("[BUG #213][MTC-199] Should validate data", Label("BUG #213", "tier0"), func() { - appName := "mysql" - namespace := appName - scenario := NewMigrationScenario( - appName, - namespace, - config.K8sDeployBin, - config.CraneBin, - config.SourceContext, - config.TargetContext, - ) - if scenario.KubectlSrcNonAdmin.Context == "" { - Skip("source-nonadmin-context is required for non-admin stateful migration test") - } - if scenario.KubectlTgtNonAdmin.Context == "" { - Skip("target-nonadmin-context is required for non-admin stateful migration test") - } - srcApp := scenario.SrcAppNonAdmin - tgtApp := scenario.TgtAppNonAdmin - runner := scenario.CraneNonAdmin - srcApp.ExtraVars = map[string]any{ - "non_admin_user": "true", - } - tgtApp.ExtraVars = map[string]any{ - "non_admin_user": "true", - } - - By("Grant namespace admin permissions to nonadmin user on source and target") - kubectlSrcNonAdmin, kubectlTgtNonAdmin, cleanup, err := SetupNamespaceAdminUsersForScenario(scenario, namespace) - Expect(err).NotTo(HaveOccurred()) - - DeferCleanup(func() { - By("Delete test namespace on source and target (wait for completion)") - for _, k := range []KubectlRunner{scenario.KubectlSrc, scenario.KubectlTgt} { - if _, err := k.Run("delete", "namespace", namespace, "--ignore-not-found=true", "--wait=true"); err != nil { - log.Printf("cleanup: failed to delete namespace %q on context %q: %v", namespace, k.Context, err) - } - } - }) - DeferCleanup(cleanup) - - By("Deploy and validate source MySQL app") - log.Printf("Deploying %s in namespace %s on source cluster", appName, namespace) - Expect(PrepareSourceAppNoQuiesce(srcApp)).NotTo(HaveOccurred()) - log.Printf("Source app deployed successfully") - By("Capture source data fingerprints for comparison") - srcPodName, err := getPodNameByLabel(kubectlSrcNonAdmin, srcApp.Namespace, "app="+appName) - Expect(err).NotTo(HaveOccurred()) - Eventually(func() error { - return waitForMySQLSocket(kubectlSrcNonAdmin, srcApp.Namespace, srcPodName) - }, "2m", "5s").Should(Succeed()) - sourceAuthorsCount, err := mysqlAuthorsCount(kubectlSrcNonAdmin, srcApp.Namespace, srcPodName) - Expect(err).NotTo(HaveOccurred()) - sourceMD5Actual, sourceMD5Expected, err := mysqlTestDataMD5(kubectlSrcNonAdmin, srcApp.Namespace, srcPodName) - Expect(err).NotTo(HaveOccurred()) - log.Printf("Source validation output: pod=%s authors_count=%d md5_actual=%s md5_expected=%s", srcPodName, sourceAuthorsCount, sourceMD5Actual, sourceMD5Expected) - Expect(sourceMD5Actual).To(Equal(sourceMD5Expected), "source test-data checksum should match its md5 file") - log.Printf("Source fingerprints: authors=%d md5=%s", sourceAuthorsCount, sourceMD5Actual) - - By("Quiesce source app before export") - Expect(kubectlSrcNonAdmin.ScaleDeploymentIfPresent(srcApp.Namespace, srcApp.Name, 0)).NotTo(HaveOccurred()) - - paths, err := NewScenarioPaths("crane-export-*") - Expect(err).NotTo(HaveOccurred()) - - DeferCleanup(func() { - By("Cleanup source and target resources") - if err := CleanupScenario(paths.TempDir, srcApp, tgtApp); err != nil { - log.Printf("cleanup: %v", err) - } - }) - By("List pvcs in the namespace") - pvcs, err := ListPVCs(srcApp.Namespace, "", srcApp.Context) - Expect(err).NotTo(HaveOccurred()) - Expect(pvcs).NotTo(BeEmpty(), "expected at least one pvc in namespace %q", srcApp.Namespace) - log.Printf("Found %d pvcs in namespace %q", len(pvcs), srcApp.Namespace) - pvcNames := make([]string, 0, len(pvcs)) - for _, pvc := range pvcs { - log.Printf("Found pvc %s in namespace %q\n", pvc.Name, pvc.Namespace) - pvcNames = append(pvcNames, pvc.Name) - } - - By("Wait for source quiesce to stabilize before export") - Eventually(func() (string, error) { - out, err := kubectlSrcNonAdmin.Run( - "get", "pods", - "--namespace", namespace, - "-l", "app="+appName, - "-o", "name", - ) - if err != nil { - return "", err - } - return strings.TrimSpace(out), nil - }, "90s", "3s").Should(BeEmpty()) - - // Temporary workaround for BUG #213: remove once issue is fixed. - // See: https://github.com/migtools/crane/issues/213 - By("[BUG #213] Relax source PVC permissions before transfer") - sourceFixCommands := make([]string, 0, len(pvcNames)*2) - for i := range pvcNames { - sourceFixCommands = append(sourceFixCommands, - fmt.Sprintf("if [ -d /mnt/pvc-%d/data ]; then chmod -R a+rX /mnt/pvc-%d/data; fi", i, i), - fmt.Sprintf("chmod -R a+rX /mnt/pvc-%d", i), - ) - } - Expect(runPVCFixCommands( - kubectlSrcNonAdmin, - srcApp.Namespace, - "mysql-source-pvc-perm-fix", - pvcNames, - sourceFixCommands, - )).NotTo(HaveOccurred()) - - By("Run crane export/transform/apply pipeline") - log.Printf("Running crane pipeline for namespace %s\n", srcApp.Namespace) - runner.WorkDir = paths.TempDir - Expect(RunCranePipelineWithChecks(runner, srcApp.Namespace, paths)).NotTo(HaveOccurred()) - log.Printf("Crane pipeline completed for namespace %s\n", srcApp.Namespace) - - By("Transfer PVCs") - tgtIP, err := GetClusterNodeIP(scenario.TgtApp.Context) - Expect(err).NotTo(HaveOccurred()) - for _, pvc := range pvcs { - pvcName := pvc.Name - opts := TransferPVCOptions{ - SourceContext: srcApp.Context, - TargetContext: tgtApp.Context, - PVCName: pvcName, - PVCNamespaceMap: fmt.Sprintf("%s:%s", srcApp.Namespace, tgtApp.Namespace), - Endpoint: "nginx-ingress", - IngressClass: "nginx", - Subdomain: fmt.Sprintf("%s.%s.%s.nip.io", pvcName, srcApp.Namespace, tgtIP), - } - log.Printf("Transferring PVC %s to namespace %s on target cluster", pvcName, tgtApp.Namespace) - Expect(runner.TransferPVC(opts)).NotTo(HaveOccurred()) - log.Printf("PVC transfer complete : %s -> namespace %s", pvcName, tgtApp.Namespace) - } - - // Temporary workaround for BUG #213: remove once issue is fixed. - // See: https://github.com/migtools/crane/issues/213 - By("[BUG #213] Restore destination PVC ownership for mysql runtime user") - targetFixCommands := make([]string, 0, len(pvcNames)*2) - for i := range pvcNames { - targetFixCommands = append(targetFixCommands, - fmt.Sprintf("if [ -d /mnt/pvc-%d/data ]; then chown -R 27:27 /mnt/pvc-%d/data && chmod -R u+rwX /mnt/pvc-%d/data; fi", i, i, i), - fmt.Sprintf("chown -R 27:27 /mnt/pvc-%d && chmod -R u+rwX /mnt/pvc-%d", i, i), - ) - } - Expect(runPVCFixCommands( - kubectlTgtNonAdmin, - tgtApp.Namespace, - "mysql-target-pvc-owner-fix", - pvcNames, - targetFixCommands, - )).NotTo(HaveOccurred()) - - By("List PVCs on target cluster") - tgtpvcs, err := ListPVCs(tgtApp.Namespace, "", tgtApp.Context) - Expect(err).NotTo(HaveOccurred()) - Expect(tgtpvcs).NotTo(BeEmpty(), "expected at least one PVC in target namespace %q", tgtApp.Namespace) - tgtPVCSet := make(map[string]struct{}, len(tgtpvcs)) - for _, pvc := range tgtpvcs { - tgtPVCSet[pvc.Name] = struct{}{} - } - for _, pvcName := range pvcNames { - _, exists := tgtPVCSet[pvcName] - Expect(exists).To(BeTrue(), "expected transferred PVC %q to exist in target namespace %q", pvcName, tgtApp.Namespace) - } - log.Printf("Found %d PVCs in target namespace %q", len(tgtpvcs), tgtApp.Namespace) - - By("Apply rendered manifests to target") - log.Printf("Applying rendered manifests on target namespace %s from %s\n", tgtApp.Namespace, paths.OutputDir) - Expect(ApplyOutputToTargetNonAdmin(kubectlTgtNonAdmin, paths.OutputDir)).NotTo(HaveOccurred()) - - By("Scale target deployment and validate app") - log.Printf("Scaling target deployment(s) with label app=%s to 1\n", appName) - Expect(kubectlTgtNonAdmin.ScaleDeployment(namespace, appName, 1)).NotTo(HaveOccurred()) - - By("Validate target application") - log.Printf("Validating app %s on target cluster\n", tgtApp.Name) - Eventually(tgtApp.Validate, "2m", "10s").Should(Succeed()) - var tgtPodName string - Eventually(func() error { - podName, err := getPodNameByLabel(kubectlTgtNonAdmin, tgtApp.Namespace, "app="+appName) - if err != nil { - return err - } - tgtPodName = podName - out, err := kubectlTgtNonAdmin.Run( - "get", "pod", tgtPodName, - "-n", tgtApp.Namespace, - "-o", "jsonpath={.status.containerStatuses[0].ready}", - ) - if err != nil { - return err - } - if strings.TrimSpace(out) != "true" { - return fmt.Errorf("pod %s is not ready yet", tgtPodName) - } - return nil - }, "2m", "10s").Should(Succeed()) - Eventually(func() error { - return waitForMySQLSocket(kubectlTgtNonAdmin, tgtApp.Namespace, tgtPodName) - }, "2m", "5s").Should(Succeed()) - - targetAuthorsCount, err := mysqlAuthorsCount(kubectlTgtNonAdmin, tgtApp.Namespace, tgtPodName) - Expect(err).NotTo(HaveOccurred()) - targetMD5Actual, targetMD5Expected, err := mysqlTestDataMD5(kubectlTgtNonAdmin, tgtApp.Namespace, tgtPodName) - Expect(err).NotTo(HaveOccurred()) - log.Printf("Target validation output: pod=%s authors_count=%d md5_actual=%s md5_expected=%s", tgtPodName, targetAuthorsCount, targetMD5Actual, targetMD5Expected) - Expect(targetMD5Actual).To(Equal(targetMD5Expected), "target test-data checksum should match its md5 file") - - Expect(targetAuthorsCount).To(Equal(sourceAuthorsCount), "authors count should match between source and target") - Expect(targetMD5Actual).To(Equal(sourceMD5Actual), "test-data md5 should match between source and target") - log.Printf("Target fingerprints: authors=%d md5=%s", targetAuthorsCount, targetMD5Actual) - log.Printf("Target validation completed for app %s\n", tgtApp.Name) - }) -}) From 5a72a951d94b644c4957d6e943d41a9f12f48816 Mon Sep 17 00:00:00 2001 From: M Sajid Mansoori Date: Thu, 23 Apr 2026 17:01:36 +0530 Subject: [PATCH 6/6] Fix issue with failure even though transfer succeeds Signed-off-by: M Sajid Mansoori --- cmd/transfer-pvc/progress.go | 20 ++++++++++++++++---- cmd/transfer-pvc/transfer-pvc.go | 1 + 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/cmd/transfer-pvc/progress.go b/cmd/transfer-pvc/progress.go index 227e8226..8b7f217f 100644 --- a/cmd/transfer-pvc/progress.go +++ b/cmd/transfer-pvc/progress.go @@ -34,6 +34,7 @@ type rsyncLogStream struct { outputFile *string } +// NewRsyncLogStream creates a log stream reader for the rsync client pod. func NewRsyncLogStream(restCfg *rest.Config, pvc types.NamespacedName, labels map[string]string, output string) LogStreams { var outputFile string if output != "" { @@ -47,6 +48,7 @@ func NewRsyncLogStream(restCfg *rest.Config, pvc types.NamespacedName, labels ma } } +// Init starts streaming logs from the rsync container and parsing progress. func (r *rsyncLogStream) Init() error { r.stdout = make(chan string) r.stderr = make(chan string) @@ -126,6 +128,7 @@ func (r *rsyncLogStream) Init() error { return nil } +// writeProgressToFile serializes final transfer progress to a file. func writeProgressToFile(o string, p *Progress) error { file, err := os.OpenFile(o, os.O_CREATE, os.ModePerm) if err != nil { @@ -136,12 +139,14 @@ func writeProgressToFile(o string, p *Progress) error { return ioutil.WriteFile(o, d, os.ModePerm) } +// Close closes all rsync log stream channels. func (r *rsyncLogStream) Close() { close(r.stdout) close(r.stderr) close(r.err) } +// Streams returns channels for stdout, stderr, and terminal stream errors. func (r *rsyncLogStream) Streams() (stdout chan string, stderr chan string, err chan error) { return r.stdout, r.stderr, r.err } @@ -178,6 +183,7 @@ type dataSize struct { unit string } +// addDataSize adds two sizes by normalizing units when needed. func addDataSize(a, b *dataSize) *dataSize { if b == nil { return nil @@ -203,10 +209,12 @@ func addDataSize(a, b *dataSize) *dataSize { return newDs } +// String formats a data size as " ". func (d *dataSize) String() string { return fmt.Sprintf("%.2f %s", d.val, d.unit) } +// MarshalJSON marshals dataSize as its human-readable string value. func (d *dataSize) MarshalJSON() ([]byte, error) { return json.Marshal(d.String()) } @@ -467,6 +475,7 @@ func parseRsyncLogs(rawLogs string) (p *Progress, unprocessedData string) { return p, "" } +// waitForPodRunning waits until the labeled rsync client pod is ready. func waitForPodRunning(c *kubernetes.Clientset, namespace string, labels map[string]string) (string, error) { var podName string err := wait.PollUntil(time.Second, func() (done bool, err error) { @@ -501,11 +510,10 @@ func waitForPodRunning(c *kubernetes.Clientset, namespace string, labels map[str return podName, err } +// getFinalPodStatus returns rsync exit code and tail logs after stream completion. func getFinalPodStatus(c *kubernetes.Clientset, name string, namespace string) (*int32, string, error) { var exitCode *int32 - count := 0 - for { - count += 1 + for attempt := 0; attempt < 15; attempt++ { pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{}) if err != nil { return nil, "", err @@ -518,9 +526,10 @@ func getFinalPodStatus(c *kubernetes.Clientset, name string, namespace string) ( } } } - if count > 5 || exitCode != nil { + if exitCode != nil { break } + time.Sleep(time.Second) } lastLines := int64(35) @@ -545,6 +554,7 @@ func getFinalPodStatus(c *kubernetes.Clientset, name string, namespace string) ( return exitCode, buf.String(), nil } +// validateTransferCompletion returns an error when transfer did not complete successfully. func validateTransferCompletion(p *Progress) error { if p == nil { return fmt.Errorf("transfer progress unavailable") @@ -575,6 +585,7 @@ func validateTransferCompletion(p *Progress) error { return fmt.Errorf("rsync transfer failed: %s", strings.Join(details, ", ")) } +// formatFailedFilesSample returns a concise sample of failed files for error messages. func formatFailedFilesSample(files []FailedFile, max int) string { if len(files) == 0 { return "" @@ -587,6 +598,7 @@ func formatFailedFilesSample(files []FailedFile, max int) string { return strings.Join(items, "; ") } +// minInt returns the smaller of two integers. func minInt(a, b int) int { if a < b { return a diff --git a/cmd/transfer-pvc/transfer-pvc.go b/cmd/transfer-pvc/transfer-pvc.go index a0092a08..ab2144d8 100644 --- a/cmd/transfer-pvc/transfer-pvc.go +++ b/cmd/transfer-pvc/transfer-pvc.go @@ -646,6 +646,7 @@ type LogStreams interface { Close() } +// followClientLogs streams rsync client logs and validates final transfer status. func followClientLogs(srcConfig *rest.Config, pvc types.NamespacedName, labels map[string]string, outputFile string) error { logReader := NewRsyncLogStream(srcConfig, pvc, labels, outputFile) err := logReader.Init()