Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 140 additions & 49 deletions test/extended/router/config_manager_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
exutil "github.com/openshift/origin/test/extended/util"
)

// execPodRef defines the attributes of the router pod used to execute local HTTP requests
type execPodRef struct {
types.NamespacedName
ipAddress string
Expand All @@ -47,21 +48,46 @@ type execPodRef struct {
var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.io][OCPFeatureGate:IngressControllerDynamicConfigurationManager]", func() {
defer g.GinkgoRecover()

// dcmIngressTimeout defines the maximum amount of time to wait for test operations to complete.
const dcmIngressTimeout = 2 * time.Minute
// maxDynamicServers defines the number of empty dynamic servers to be allocated when a reload happens.
// Some tests use this value as a premise to limit the number of new servers on scale-out operations.
const maxDynamicServers = 4

ctx := context.Background()
oc := exutil.NewCLIWithPodSecurityLevel("router-dcm-ingress", api.LevelPrivileged).AsAdmin()
kubeClient := oc.AdminKubeClient()
routeClient := oc.AdminRouteClient()

// variables updated on every new test
var (
execPod execPodRef
controller types.NamespacedName
routeSelectorSet labels.Set
)
// execPod is the pod used to run requests against the router.
var execPod execPodRef
// controller is the fully qualified name of the ingress controller resource used in the current test.
var controller types.NamespacedName
// routeSelectorSet is the label key/value pair that the ingress controller uses to filter route resources.
// All route resources should add this label, so the router can find and process them.
var routeSelectorSet labels.Set

g.AfterEach(func() {
if g.CurrentSpecReport().Failed() {
routes, err := routeClient.RouteV1().Routes(oc.Namespace()).List(ctx, metav1.ListOptions{})
if err != nil {
framework.Logf("failed to list Routes in namespace %q: %v", oc.Namespace(), err)
} else {
outputIngress(routes.Items...)
}
endpoints, err := kubeClient.CoreV1().Endpoints(oc.Namespace()).List(ctx, metav1.ListOptions{})
if err != nil {
framework.Logf("failed to list Endpoints in namespace %q: %v", oc.Namespace(), err)
} else {
outputEndpoints(endpoints.Items...)
}
epsList, err := kubeClient.DiscoveryV1().EndpointSlices(oc.Namespace()).List(ctx, metav1.ListOptions{})
if err != nil {
framework.Logf("failed to list EndpointSlices in namespace %q: %v", oc.Namespace(), err)
} else {
outputEndpointSlice(epsList.Items...)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
if controller.Name != "" {
err := oc.AdminOperatorClient().OperatorV1().IngressControllers(controller.Namespace).Delete(ctx, controller.Name, *metav1.NewDeleteOptions(1))
o.Expect(err).NotTo(o.HaveOccurred())
Expand All @@ -73,7 +99,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
nsOperator := "openshift-ingress-operator"
controllerName := names.SimpleNameGenerator.GenerateName("e2e-dcm-")

// ... and its router is created on router's namespace
// ... and its router and service are created in router's namespace
nsRouter := "openshift-ingress"
svcName := "router-internal-" + controllerName

Expand Down Expand Up @@ -189,12 +215,12 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
// scaling-out to 4 replicas, one at a time
for replicas := initReplicas + 1; replicas <= 4; replicas++ {

g.By(fmt.Sprintf("scaling-out to %d replicas", replicas))
framework.Logf("scaling-out to %d replicas", replicas)

currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout)
o.Expect(err).NotTo(o.HaveOccurred())

g.By("waiting router to add all the backend servers to the load balance")
framework.Logf("waiting router to add all the backend servers to the load balance")

// router should eventually reach all the known replicas
eventuallyRouteAllServers(execPod, hostname, false, currentServers, dcmIngressTimeout)
Expand Down Expand Up @@ -235,9 +261,9 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
// instead of HAProxy removing it from the balance due to health-check starting to fail.
for replicas := initReplicas - 1; replicas >= 1; replicas-- {

g.By(fmt.Sprintf("scaling-in to %d replicas", replicas))
framework.Logf("scaling-in to %d replicas", replicas)

currentServers, err := builder.scaleInEndpoints(ctx, serviceName, replicas)
currentServers, err := builder.scaleInEndpoints(ctx, serviceName, replicas, dcmIngressTimeout)
o.Expect(err).NotTo(o.HaveOccurred())

g.By("ensure that router removed the missing backend servers from the load balance")
Expand Down Expand Up @@ -471,6 +497,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.

// ... k8s recreates it and we wait it to be fully functional
err = builder.waitDeployment(replicas, dcmIngressTimeout)
builder.printDeploymentState(ctx)
o.Expect(err).NotTo(o.HaveOccurred())
}

Expand Down Expand Up @@ -637,7 +664,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
// Iterates over a number of scaling operations, always checking if the change is being applied.
for i, replicas := range changingReplicas {

g.By(fmt.Sprintf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas))
framework.Logf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas)

currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout)
o.Expect(err).NotTo(o.HaveOccurred())
Expand Down Expand Up @@ -778,7 +805,12 @@ func (r *routeStackBuilder) createDeploymentStack(ctx context.Context, routetype
if err = r.waitDeployment(replicas, timeout); err != nil {
return nil, err
}
return r.exposeDeployment(ctx)
backendServers, err = r.exposeDeployment(ctx)
if err != nil {
return nil, err
}
r.printDeploymentState(ctx)
return backendServers, nil
}

// scaleDeployment scales-in/out the common deployment to the specified replicas. It waits for all the pods to be created and returns their names.
Expand All @@ -796,6 +828,7 @@ func (r *routeStackBuilder) scaleDeployment(ctx context.Context, replicas int, t
}
return len(backendServers) == replicas, nil
})
r.printDeploymentState(ctx)
return backendServers, err
}

Expand Down Expand Up @@ -824,7 +857,14 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN
}

// we also need the deprecated Endpoints API, since router still uses it depending on the ROUTER_WATCH_ENDPOINTS envvar
epCurrent, err := r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{})
var epCurrent *corev1.Endpoints
err = wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Second, false, func(ctx context.Context) (done bool, err error) {
epCurrent, err = r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{})
if err != nil {
framework.Logf("error fetching Endpoints: %s", err.Error())
}
return err == nil, nil

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to avoid maybe one more source of flake potential:

If the Endpoints resource is created but the controller hasn't populated its addresses yet, err == nil will return true and exit the poll early with empty subsets (causing downstream scale-in assertions to fail)

perhaps:

return err == nil && len(epCurrent.Subsets) > 0 && len(epCurrent.Subsets[0].Addresses) > 0, nil

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good one, such temporary state should add flakiness and make our life difficult on debugging it.

I confirmed however that a new Endpoints from a new deployed Service is created atomically, having its final state, instead of reacting from an initial and empty state.

From the new Service side (creation):

$ oc expose deploy echoserver --name test-ep
service/test-ep exposed

From the Endpoints side (one single event):

$ oc get ep -oyaml -w
...

---
apiVersion: v1
kind: Endpoints
metadata:
  annotations:
    endpoints.kubernetes.io/last-change-trigger-time: "2026-06-16T20:02:06Z"
  creationTimestamp: "2026-06-16T20:02:06Z"
  labels:
    endpoints.kubernetes.io/managed-by: endpoint-controller
  name: test-ep
  namespace: default
  resourceVersion: "8928573"
  uid: 63cce2e9-7dc3-460e-b89e-2b17d1e10d18
subsets:
- addresses:
  - ip: 10.128.0.122
    nodeName: worker
    targetRef:
      kind: Pod
      name: backend1-5f8b56f5dc-mrgsv
      namespace: default
      uid: e8e6be0a-39c8-4361-aea8-2bc7132755f3
  ports:
  - port: 9000
    protocol: TCP

From etcd (version 1):

[root@worker /]# etcdctl get /kubernetes.io/services/endpoints/default/test-ep -w json |jq 
...
      "version": 1,
...

This makes the polling safe, since the only two possible scenarios are not having an Endpoints, or having it fully configured.

})
if err != nil {
return "", err
}
Expand All @@ -841,7 +881,7 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN
}

// EndpointSlice use to be created as soon as the Endpoints resource is created. Lets wait for it, and create ourselves in case it is missing
err = wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Second, false, func(ctx context.Context) (done bool, err error) {
err = wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Second, false, func(ctx context.Context) (done bool, err error) {
_, err = r.fetchEndpointSlice(ctx, serviceName)
if err != nil {
framework.Logf("error fetching EndpointSlice: %s", err.Error())
Expand Down Expand Up @@ -881,49 +921,70 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN
// scaleInEndpoints changes the number of replicas of an endpoint and EndpointSlice. This only works on services
// without selector created via `createDetachedService()`. It is useful as a way to scale-in a service and route without
// removing the underlying pods of a deployment.
func (r *routeStackBuilder) scaleInEndpoints(ctx context.Context, detachedServiceName string, replicas int) (backendServers []string, err error) {
var eps *discoveryv1.EndpointSlice
func (r *routeStackBuilder) scaleInEndpoints(ctx context.Context, detachedServiceName string, replicas int, timeout time.Duration) (backendServers []string, err error) {
var targetAddresses []corev1.EndpointAddress

// update Endpoints with the desired number of replicas
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
eps, err = r.fetchEndpointSlice(ctx, detachedServiceName)
ep, err := r.kubeClient.CoreV1().Endpoints(r.namespace).Get(ctx, detachedServiceName, metav1.GetOptions{})
if err != nil {
return err
}
if count := len(eps.Endpoints); count < replicas {
return fmt.Errorf("endpoints can only be scaled-in. found %d replicas, want %d", count, replicas)
if count := len(ep.Subsets); count != 1 {
return fmt.Errorf("expected one subset in endpoints, found %d", count)
}
eps.Endpoints = eps.Endpoints[:replicas]
_, err = r.kubeClient.DiscoveryV1().EndpointSlices(eps.Namespace).Update(ctx, eps, metav1.UpdateOptions{})
if err != nil {
return err
}
backendServers = make([]string, len(eps.Endpoints))
for i, ep := range eps.Endpoints {
backendServers[i] = ep.TargetRef.Name
ss := &ep.Subsets[0]
if count := len(ss.Addresses); count < replicas {
return fmt.Errorf("endpoints can only be scaled-in. found %d replicas in Endpoints, want %d", count, replicas)
}
return nil
targetAddresses = ss.Addresses[:replicas]
ss.Addresses = targetAddresses
_, err = r.kubeClient.CoreV1().Endpoints(ep.Namespace).Update(ctx, ep, metav1.UpdateOptions{})
return err
})
if err != nil {
return nil, err
}
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
ep, err := r.kubeClient.CoreV1().Endpoints(r.namespace).Get(ctx, detachedServiceName, metav1.GetOptions{})

// Build backend server list from the preserved addresses
backendServers = make([]string, replicas)
for i, addr := range targetAddresses {
backendServers[i] = addr.TargetRef.Name
}

// The EndpointSlice mirroring controller will sync Endpoints -> EndpointSlice for selector-less services.
// Wait for the controller to mirror the change.
err = wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, false, func(ctx context.Context) (done bool, err error) {
eps, err := r.fetchEndpointSlice(ctx, detachedServiceName)
if err != nil {
return err
framework.Logf("error fetching EndpointSlice while waiting for mirroring: %s", err.Error())
return false, nil
}
// deleting addresses, from all subnets, whose IP address is not found in the patched `eps`
for i := range ep.Subsets {
ss := &ep.Subsets[i]
ss.Addresses = slices.DeleteFunc(ss.Addresses, func(addr corev1.EndpointAddress) bool {
return !slices.ContainsFunc(eps.Endpoints, func(e discoveryv1.Endpoint) bool {
return addr.IP == e.Addresses[0]
})
})
// Verify the endpoints match by IP to ensure we have the right ones, not just the right count
if !endpointSliceMatchesAddresses(eps.Endpoints, targetAddresses) {
framework.Logf("EndpointSlice and Endpoints addresses do not match, waiting for mirroring controller. eps=%v ep=%v", eps.Endpoints, targetAddresses)
return false, nil
}
_, err = r.kubeClient.CoreV1().Endpoints(ep.Namespace).Update(ctx, ep, metav1.UpdateOptions{})
return err

return true, nil
})
return backendServers, err
if err != nil {
return nil, fmt.Errorf("EndpointSlice mirroring did not converge: %w", err)
}

return backendServers, nil
}

// endpointSliceMatchesAddresses verifies that the provided endpoints contains the same addresses as the target list
func endpointSliceMatchesAddresses(eps []discoveryv1.Endpoint, targetAddresses []corev1.EndpointAddress) bool {
if len(eps) != len(targetAddresses) {
return false
}
for _, ep := range eps {
if len(ep.Addresses) == 0 || !slices.ContainsFunc(targetAddresses, func(addr corev1.EndpointAddress) bool { return addr.IP == ep.Addresses[0] }) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think [0] might be brittle, better to iterate and see if contains?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is by the spec:

https://github.com/kubernetes/api/blob/70b37e4a198597ec99d5424c438c42e7a6620e3d/discovery/v1/types.go#L92-L102

	// EndpointSlices generated by the EndpointSlice
	// controller will always have exactly 1 address. No semantics are defined for
	// additional addresses beyond the first, and kube-proxy does not look at them.

return false
}
}
return true
}

// waitDeployment waits the common deployment to report all its replicas as ready.
Expand All @@ -939,6 +1000,8 @@ func (r *routeStackBuilder) createServeHostnameDeployment(replicas int) error {

// createDeployment creates the deployment resource. It should be called just once, since it uses the OC namespace and the common resource name.
func (r *routeStackBuilder) createDeployment(image string, replicas, port int, cmd ...string) error {
// This deployment is safe under Single Node OpenShift (SNO): although it can create more replicas,
// those replicas does not configure anti-affinity, so all of them can run in the same node.
runArgs := []string{"deployment", r.resourceName, "--image", image, "--replicas", strconv.Itoa(replicas), "--port", strconv.Itoa(port), "--"}
runArgs = append(runArgs, cmd...)
return r.oc.Run("create").Args(runArgs...).Execute()
Expand All @@ -953,6 +1016,25 @@ func (r *routeStackBuilder) exposeDeployment(ctx context.Context) (backendServer
return r.fetchServiceReplicas(ctx)
}

// printDeploymentState outputs the pod names, status, and their IP addresses. Best effort, it outputs the error instead in case it happens.
// It requires that `exposeDeployment()` was already called.
func (r *routeStackBuilder) printDeploymentState(ctx context.Context) {
pods, err := r.fetchPods(ctx)
if err != nil {
framework.Logf("deployment state: error reading deployment pods: %v", err)
return
}
var podDescription []string
for _, pod := range pods {
var podIPs []string
for _, ip := range pod.Status.PodIPs {
podIPs = append(podIPs, ip.IP)
}
podDescription = append(podDescription, fmt.Sprintf("%s/%s/%s", pod.Name, pod.Status.Phase, strings.Join(podIPs, ",")))
}
framework.Logf("deployment state: replicas=%d pods=%s", len(pods), strings.Join(podDescription, " // "))
}

// fetchEndpointSlice fetches the EndpointSlice of the provided service name. It currently supports only one EndpointSlice instance for simplicity.
func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName string) (*discoveryv1.EndpointSlice, error) {
listOpts := metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + serviceName}
Expand All @@ -967,8 +1049,8 @@ func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName
return &epsList.Items[0], nil
}

// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called.
func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) {
// fetchPods fetches the pods from the exposed common deployment. It requires that `exposeDeployment()` was already called.
func (r *routeStackBuilder) fetchPods(ctx context.Context) ([]corev1.Pod, error) {
svc, err := r.kubeClient.CoreV1().Services(r.namespace).Get(ctx, r.resourceName, metav1.GetOptions{})
if err != nil {
return nil, err
Expand All @@ -978,9 +1060,18 @@ func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string,
if err != nil {
return nil, err
}
backendServers := make([]string, len(pods.Items))
for i := range pods.Items {
backendServers[i] = pods.Items[i].Name
return pods.Items, nil
}

// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called.
func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) {
pods, err := r.fetchPods(ctx)
if err != nil {
return nil, err
}
backendServers := make([]string, len(pods))
for i := range pods {
backendServers[i] = pods[i].Name
}
return backendServers, nil
}
Expand Down
Loading