diff --git a/test/extended/router/config_manager_ingress.go b/test/extended/router/config_manager_ingress.go index 5325c72943c3..86b970845876 100644 --- a/test/extended/router/config_manager_ingress.go +++ b/test/extended/router/config_manager_ingress.go @@ -39,6 +39,7 @@ import ( exutil "github.com/openshift/origin/test/extended/util" ) +// execPodRef defines the attributes of the router pod used to execute local HTTP requests type execPodRef struct { types.NamespacedName ipAddress string @@ -47,21 +48,46 @@ type execPodRef struct { var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.io][OCPFeatureGate:IngressControllerDynamicConfigurationManager]", func() { defer g.GinkgoRecover() + // dcmIngressTimeout defines the maximum amount of time to wait for test operations to complete. const dcmIngressTimeout = 2 * time.Minute + // maxDynamicServers defines the number of empty dynamic servers to be allocated when a reload happens. + // Some tests use this value as a premise to limit the number of new servers on scale-out operations. const maxDynamicServers = 4 ctx := context.Background() oc := exutil.NewCLIWithPodSecurityLevel("router-dcm-ingress", api.LevelPrivileged).AsAdmin() kubeClient := oc.AdminKubeClient() + routeClient := oc.AdminRouteClient() - // variables updated on every new test - var ( - execPod execPodRef - controller types.NamespacedName - routeSelectorSet labels.Set - ) + // execPod is the pod used to run requests against the router. + var execPod execPodRef + // controller is the fully qualified name of the ingress controller resource used in the current test. + var controller types.NamespacedName + // routeSelectorSet is the label key/value pair that the ingress controller uses to filter route resources. + // All route resources should add this label, so the router can find and process them. + var routeSelectorSet labels.Set g.AfterEach(func() { + if g.CurrentSpecReport().Failed() { + routes, err := routeClient.RouteV1().Routes(oc.Namespace()).List(ctx, metav1.ListOptions{}) + if err != nil { + framework.Logf("failed to list Routes in namespace %q: %v", oc.Namespace(), err) + } else { + outputIngress(routes.Items...) + } + endpoints, err := kubeClient.CoreV1().Endpoints(oc.Namespace()).List(ctx, metav1.ListOptions{}) + if err != nil { + framework.Logf("failed to list Endpoints in namespace %q: %v", oc.Namespace(), err) + } else { + outputEndpoints(endpoints.Items...) + } + epsList, err := kubeClient.DiscoveryV1().EndpointSlices(oc.Namespace()).List(ctx, metav1.ListOptions{}) + if err != nil { + framework.Logf("failed to list EndpointSlices in namespace %q: %v", oc.Namespace(), err) + } else { + outputEndpointSlice(epsList.Items...) + } + } if controller.Name != "" { err := oc.AdminOperatorClient().OperatorV1().IngressControllers(controller.Namespace).Delete(ctx, controller.Name, *metav1.NewDeleteOptions(1)) o.Expect(err).NotTo(o.HaveOccurred()) @@ -73,7 +99,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift. nsOperator := "openshift-ingress-operator" controllerName := names.SimpleNameGenerator.GenerateName("e2e-dcm-") - // ... and its router is created on router's namespace + // ... and its router and service are created in router's namespace nsRouter := "openshift-ingress" svcName := "router-internal-" + controllerName @@ -189,12 +215,12 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift. // scaling-out to 4 replicas, one at a time for replicas := initReplicas + 1; replicas <= 4; replicas++ { - g.By(fmt.Sprintf("scaling-out to %d replicas", replicas)) + framework.Logf("scaling-out to %d replicas", replicas) currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout) o.Expect(err).NotTo(o.HaveOccurred()) - g.By("waiting router to add all the backend servers to the load balance") + framework.Logf("waiting router to add all the backend servers to the load balance") // router should eventually reach all the known replicas eventuallyRouteAllServers(execPod, hostname, false, currentServers, dcmIngressTimeout) @@ -235,9 +261,9 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift. // instead of HAProxy removing it from the balance due to health-check starting to fail. for replicas := initReplicas - 1; replicas >= 1; replicas-- { - g.By(fmt.Sprintf("scaling-in to %d replicas", replicas)) + framework.Logf("scaling-in to %d replicas", replicas) - currentServers, err := builder.scaleInEndpoints(ctx, serviceName, replicas) + currentServers, err := builder.scaleInEndpoints(ctx, serviceName, replicas, dcmIngressTimeout) o.Expect(err).NotTo(o.HaveOccurred()) g.By("ensure that router removed the missing backend servers from the load balance") @@ -471,6 +497,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift. // ... k8s recreates it and we wait it to be fully functional err = builder.waitDeployment(replicas, dcmIngressTimeout) + builder.printDeploymentState(ctx) o.Expect(err).NotTo(o.HaveOccurred()) } @@ -637,7 +664,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift. // Iterates over a number of scaling operations, always checking if the change is being applied. for i, replicas := range changingReplicas { - g.By(fmt.Sprintf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas)) + framework.Logf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas) currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout) o.Expect(err).NotTo(o.HaveOccurred()) @@ -778,7 +805,12 @@ func (r *routeStackBuilder) createDeploymentStack(ctx context.Context, routetype if err = r.waitDeployment(replicas, timeout); err != nil { return nil, err } - return r.exposeDeployment(ctx) + backendServers, err = r.exposeDeployment(ctx) + if err != nil { + return nil, err + } + r.printDeploymentState(ctx) + return backendServers, nil } // scaleDeployment scales-in/out the common deployment to the specified replicas. It waits for all the pods to be created and returns their names. @@ -796,6 +828,7 @@ func (r *routeStackBuilder) scaleDeployment(ctx context.Context, replicas int, t } return len(backendServers) == replicas, nil }) + r.printDeploymentState(ctx) return backendServers, err } @@ -824,7 +857,14 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN } // we also need the deprecated Endpoints API, since router still uses it depending on the ROUTER_WATCH_ENDPOINTS envvar - epCurrent, err := r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{}) + var epCurrent *corev1.Endpoints + err = wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Second, false, func(ctx context.Context) (done bool, err error) { + epCurrent, err = r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{}) + if err != nil { + framework.Logf("error fetching Endpoints: %s", err.Error()) + } + return err == nil, nil + }) if err != nil { return "", err } @@ -841,7 +881,7 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN } // EndpointSlice use to be created as soon as the Endpoints resource is created. Lets wait for it, and create ourselves in case it is missing - err = wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Second, false, func(ctx context.Context) (done bool, err error) { + err = wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Second, false, func(ctx context.Context) (done bool, err error) { _, err = r.fetchEndpointSlice(ctx, serviceName) if err != nil { framework.Logf("error fetching EndpointSlice: %s", err.Error()) @@ -881,49 +921,70 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN // scaleInEndpoints changes the number of replicas of an endpoint and EndpointSlice. This only works on services // without selector created via `createDetachedService()`. It is useful as a way to scale-in a service and route without // removing the underlying pods of a deployment. -func (r *routeStackBuilder) scaleInEndpoints(ctx context.Context, detachedServiceName string, replicas int) (backendServers []string, err error) { - var eps *discoveryv1.EndpointSlice +func (r *routeStackBuilder) scaleInEndpoints(ctx context.Context, detachedServiceName string, replicas int, timeout time.Duration) (backendServers []string, err error) { + var targetAddresses []corev1.EndpointAddress + + // update Endpoints with the desired number of replicas err = retry.RetryOnConflict(retry.DefaultRetry, func() error { - eps, err = r.fetchEndpointSlice(ctx, detachedServiceName) + ep, err := r.kubeClient.CoreV1().Endpoints(r.namespace).Get(ctx, detachedServiceName, metav1.GetOptions{}) if err != nil { return err } - if count := len(eps.Endpoints); count < replicas { - return fmt.Errorf("endpoints can only be scaled-in. found %d replicas, want %d", count, replicas) + if count := len(ep.Subsets); count != 1 { + return fmt.Errorf("expected one subset in endpoints, found %d", count) } - eps.Endpoints = eps.Endpoints[:replicas] - _, err = r.kubeClient.DiscoveryV1().EndpointSlices(eps.Namespace).Update(ctx, eps, metav1.UpdateOptions{}) - if err != nil { - return err - } - backendServers = make([]string, len(eps.Endpoints)) - for i, ep := range eps.Endpoints { - backendServers[i] = ep.TargetRef.Name + ss := &ep.Subsets[0] + if count := len(ss.Addresses); count < replicas { + return fmt.Errorf("endpoints can only be scaled-in. found %d replicas in Endpoints, want %d", count, replicas) } - return nil + targetAddresses = ss.Addresses[:replicas] + ss.Addresses = targetAddresses + _, err = r.kubeClient.CoreV1().Endpoints(ep.Namespace).Update(ctx, ep, metav1.UpdateOptions{}) + return err }) if err != nil { return nil, err } - err = retry.RetryOnConflict(retry.DefaultRetry, func() error { - ep, err := r.kubeClient.CoreV1().Endpoints(r.namespace).Get(ctx, detachedServiceName, metav1.GetOptions{}) + + // Build backend server list from the preserved addresses + backendServers = make([]string, replicas) + for i, addr := range targetAddresses { + backendServers[i] = addr.TargetRef.Name + } + + // The EndpointSlice mirroring controller will sync Endpoints -> EndpointSlice for selector-less services. + // Wait for the controller to mirror the change. + err = wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, false, func(ctx context.Context) (done bool, err error) { + eps, err := r.fetchEndpointSlice(ctx, detachedServiceName) if err != nil { - return err + framework.Logf("error fetching EndpointSlice while waiting for mirroring: %s", err.Error()) + return false, nil } - // deleting addresses, from all subnets, whose IP address is not found in the patched `eps` - for i := range ep.Subsets { - ss := &ep.Subsets[i] - ss.Addresses = slices.DeleteFunc(ss.Addresses, func(addr corev1.EndpointAddress) bool { - return !slices.ContainsFunc(eps.Endpoints, func(e discoveryv1.Endpoint) bool { - return addr.IP == e.Addresses[0] - }) - }) + // Verify the endpoints match by IP to ensure we have the right ones, not just the right count + if !endpointSliceMatchesAddresses(eps.Endpoints, targetAddresses) { + framework.Logf("EndpointSlice and Endpoints addresses do not match, waiting for mirroring controller. eps=%v ep=%v", eps.Endpoints, targetAddresses) + return false, nil } - _, err = r.kubeClient.CoreV1().Endpoints(ep.Namespace).Update(ctx, ep, metav1.UpdateOptions{}) - return err - + return true, nil }) - return backendServers, err + if err != nil { + return nil, fmt.Errorf("EndpointSlice mirroring did not converge: %w", err) + } + + return backendServers, nil +} + +// endpointSliceMatchesAddresses verifies that the provided endpoints contains the same addresses as the target list +func endpointSliceMatchesAddresses(eps []discoveryv1.Endpoint, targetAddresses []corev1.EndpointAddress) bool { + if len(eps) != len(targetAddresses) { + return false + } + for _, ep := range eps { + if len(ep.Addresses) == 0 || !slices.ContainsFunc(targetAddresses, func(addr corev1.EndpointAddress) bool { return addr.IP == ep.Addresses[0] }) { + return false + } + } + return true } // waitDeployment waits the common deployment to report all its replicas as ready. @@ -939,6 +1000,8 @@ func (r *routeStackBuilder) createServeHostnameDeployment(replicas int) error { // createDeployment creates the deployment resource. It should be called just once, since it uses the OC namespace and the common resource name. func (r *routeStackBuilder) createDeployment(image string, replicas, port int, cmd ...string) error { + // This deployment is safe under Single Node OpenShift (SNO): although it can create more replicas, + // those replicas does not configure anti-affinity, so all of them can run in the same node. runArgs := []string{"deployment", r.resourceName, "--image", image, "--replicas", strconv.Itoa(replicas), "--port", strconv.Itoa(port), "--"} runArgs = append(runArgs, cmd...) return r.oc.Run("create").Args(runArgs...).Execute() @@ -953,6 +1016,25 @@ func (r *routeStackBuilder) exposeDeployment(ctx context.Context) (backendServer return r.fetchServiceReplicas(ctx) } +// printDeploymentState outputs the pod names, status, and their IP addresses. Best effort, it outputs the error instead in case it happens. +// It requires that `exposeDeployment()` was already called. +func (r *routeStackBuilder) printDeploymentState(ctx context.Context) { + pods, err := r.fetchPods(ctx) + if err != nil { + framework.Logf("deployment state: error reading deployment pods: %v", err) + return + } + var podDescription []string + for _, pod := range pods { + var podIPs []string + for _, ip := range pod.Status.PodIPs { + podIPs = append(podIPs, ip.IP) + } + podDescription = append(podDescription, fmt.Sprintf("%s/%s/%s", pod.Name, pod.Status.Phase, strings.Join(podIPs, ","))) + } + framework.Logf("deployment state: replicas=%d pods=%s", len(pods), strings.Join(podDescription, " // ")) +} + // fetchEndpointSlice fetches the EndpointSlice of the provided service name. It currently supports only one EndpointSlice instance for simplicity. func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName string) (*discoveryv1.EndpointSlice, error) { listOpts := metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + serviceName} @@ -967,8 +1049,8 @@ func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName return &epsList.Items[0], nil } -// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called. -func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) { +// fetchPods fetches the pods from the exposed common deployment. It requires that `exposeDeployment()` was already called. +func (r *routeStackBuilder) fetchPods(ctx context.Context) ([]corev1.Pod, error) { svc, err := r.kubeClient.CoreV1().Services(r.namespace).Get(ctx, r.resourceName, metav1.GetOptions{}) if err != nil { return nil, err @@ -978,9 +1060,18 @@ func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, if err != nil { return nil, err } - backendServers := make([]string, len(pods.Items)) - for i := range pods.Items { - backendServers[i] = pods.Items[i].Name + return pods.Items, nil +} + +// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called. +func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) { + pods, err := r.fetchPods(ctx) + if err != nil { + return nil, err + } + backendServers := make([]string, len(pods)) + for i := range pods { + backendServers[i] = pods[i].Name } return backendServers, nil } diff --git a/test/extended/router/stress.go b/test/extended/router/stress.go index 04c7699f8579..9d4c88f56df9 100644 --- a/test/extended/router/stress.go +++ b/test/extended/router/stress.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "strconv" "strings" "text/tabwriter" "time" @@ -16,6 +17,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -656,6 +658,64 @@ func outputIngress(routes ...routev1.Route) { e2e.Logf("Routes:\n%s", b.String()) } +func outputEndpoints(endpoints ...corev1.Endpoints) { + b := &bytes.Buffer{} + w := tabwriter.NewWriter(b, 0, 0, 2, ' ', 0) + fmt.Fprintf(w, "NAME\tADDRESSES\tNOT READY ADDRESSES\tPORTS\n") + for _, ep := range endpoints { + for _, ss := range ep.Subsets { + resumeAddrs := func(addrs []corev1.EndpointAddress) string { + var addrList []string + for _, addr := range addrs { + val := "-" + if addr.IP != "" { + val = addr.IP + } else if addr.Hostname != "" { + val = addr.Hostname + } + addrList = append(addrList, val) + } + return strings.Join(addrList, ",") + } + var portList []string + for _, port := range ss.Ports { + portList = append(portList, strconv.Itoa(int(port.Port))) + } + fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", ep.Name, resumeAddrs(ss.Addresses), resumeAddrs(ss.NotReadyAddresses), strings.Join(portList, ",")) + } + } + w.Flush() + e2e.Logf("Endpoints:\n%s", b.String()) +} + +func outputEndpointSlice(epss ...discoveryv1.EndpointSlice) { + b := &bytes.Buffer{} + w := tabwriter.NewWriter(b, 0, 0, 2, ' ', 0) + fmt.Fprintf(w, "NAME\tSERVICE\tADDRESSES\tNOT READY ADDRESSES\tPORTS\n") + for _, eps := range epss { + var addrList, notReadyAddrList []string + for _, ep := range eps.Endpoints { + addrs := strings.Join(ep.Addresses, "+") + if ready := ep.Conditions.Ready; ready == nil || *ready == true { + addrList = append(addrList, addrs) + } else { + notReadyAddrList = append(notReadyAddrList, addrs) + } + } + var portList []string + for _, port := range eps.Ports { + val := "-" + if port.Port != nil { + val = strconv.Itoa(int(*port.Port)) + } + portList = append(portList, val) + } + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", eps.Name, eps.Labels[discoveryv1.LabelServiceName], strings.Join(addrList, ","), strings.Join(notReadyAddrList, ","), strings.Join(portList, ",")) + } + w.Flush() + e2e.Logf("EndpointSlices:\n%s", b.String()) +} + // findMostRecentConditionTime returns the time of the most recent condition. func findMostRecentConditionTime(conditions []routev1.RouteIngressCondition) time.Time { var recent time.Time