Skip to content

Commit f9edb41

Browse files
authored
Merge pull request #9462 from adrianmoisey/adrian-controller-model
Refactor recommender to follow NewXXXController pattern
2 parents fad7e99 + dc697d8 commit f9edb41

16 files changed

Lines changed: 1260 additions & 183 deletions

File tree

hack/go-unit-tests-vpa.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ set -o nounset
2121
CONTRIB_ROOT="$(dirname ${BASH_SOURCE})/.."
2222

2323
pushd ${CONTRIB_ROOT}/vertical-pod-autoscaler
24-
go test -count=1 -race $(go list ./... | grep -v /vendor/ | grep -v vertical-pod-autoscaler/e2e)
24+
go test -count=1 -race $(go list ./... | grep -v /vendor/ | grep -v vertical-pod-autoscaler/e2e | grep -v integration)
2525
popd
2626
pushd ${CONTRIB_ROOT}/vertical-pod-autoscaler/e2e
2727
go test -run=None ./...

vertical-pod-autoscaler/e2e/go.mod

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ require (
153153
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
154154
gopkg.in/yaml.v3 v3.0.1 // indirect
155155
k8s.io/apiextensions-apiserver v0.36.0-beta.0 // indirect
156-
k8s.io/cloud-provider v0.35.0 // indirect
156+
k8s.io/cloud-provider v0.36.0-beta.0 // indirect
157157
k8s.io/component-base v0.36.0-beta.0 // indirect
158158
k8s.io/component-helpers v0.36.0-beta.0 // indirect
159159
k8s.io/controller-manager v0.36.0-beta.0 // indirect
@@ -193,6 +193,7 @@ replace (
193193
k8s.io/controller-manager => k8s.io/controller-manager v0.36.0-beta.0
194194
k8s.io/cri-api => k8s.io/cri-api v0.36.0-beta.0
195195
k8s.io/cri-client => k8s.io/cri-client v0.36.0-beta.0
196+
k8s.io/cri-streaming => k8s.io/cri-streaming v0.36.0-beta.0
196197
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.36.0-beta.0
197198
k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.36.0-beta.0
198199
k8s.io/endpointslice => k8s.io/endpointslice v0.36.0-beta.0
@@ -210,8 +211,5 @@ replace (
210211
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.36.0-beta.0
211212
k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.36.0-beta.0
212213
k8s.io/sample-controller => k8s.io/sample-controller v0.36.0-beta.0
214+
k8s.io/streaming => k8s.io/streaming v0.36.0-beta.0
213215
)
214-
215-
replace k8s.io/cri-streaming => k8s.io/cri-streaming v0.36.0-beta.0
216-
217-
replace k8s.io/streaming => k8s.io/streaming v0.36.0-beta.0

vertical-pod-autoscaler/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/prometheus/common v0.67.5
1010
github.com/spf13/pflag v1.0.10
1111
github.com/stretchr/testify v1.11.1
12+
go.uber.org/mock v0.6.0
1213
golang.org/x/time v0.15.0
1314
k8s.io/api v0.36.0-beta.0
1415
k8s.io/apimachinery v0.36.0-beta.0
@@ -65,7 +66,6 @@ require (
6566
github.com/x448/float16 v0.8.4 // indirect
6667
go.opentelemetry.io/otel v1.42.0 // indirect
6768
go.opentelemetry.io/otel/trace v1.42.0 // indirect
68-
go.uber.org/mock v0.6.0
6969
go.yaml.in/yaml/v2 v2.4.4 // indirect
7070
go.yaml.in/yaml/v3 v3.0.4 // indirect
7171
golang.org/x/mod v0.34.0 // indirect; indirects
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#! /bin/bash
2+
3+
# Copyright The Kubernetes Authors.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
set -o errexit
18+
set -o pipefail
19+
set -o nounset
20+
21+
CONTRIB_ROOT="$(dirname ${BASH_SOURCE})/.."
22+
23+
cd $CONTRIB_ROOT
24+
25+
go test -count=1 -race $(go list ./... | grep integration)

vertical-pod-autoscaler/pkg/admission-controller/main.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,15 @@ func main() {
6666

6767
kubeConfig := common.CreateKubeConfigOrDie(config.CommonFlags.KubeConfig, float32(config.CommonFlags.KubeApiQps), int(config.CommonFlags.KubeApiBurst))
6868

69+
stopCh := make(chan struct{})
70+
defer close(stopCh)
71+
6972
vpaClient := vpa_clientset.NewForConfigOrDie(kubeConfig)
7073
vpaLister := vpa_api_util.NewVpasLister(vpaClient, make(chan struct{}), config.CommonFlags.VpaObjectNamespace)
7174
kubeClient := kube_client.NewForConfigOrDie(kubeConfig)
7275
factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(config.CommonFlags.VpaObjectNamespace))
73-
targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(kubeConfig, kubeClient, factory)
74-
controllerFetcher := controllerfetcher.NewControllerFetcher(kubeConfig, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
76+
targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(kubeConfig, kubeClient, factory, stopCh)
77+
controllerFetcher := controllerfetcher.NewControllerFetcher(kubeConfig, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor, stopCh)
7578

7679
podPreprocessor := pod.NewDefaultPreProcessor()
7780
vpaPreprocessor := vpa.NewDefaultPreProcessor()
@@ -84,8 +87,6 @@ func main() {
8487
recommendationProvider := recommendation.NewProvider(limitRangeCalculator, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator))
8588
vpaMatcher := vpa.NewMatcher(vpaLister, targetSelectorFetcher, controllerFetcher)
8689

87-
stopCh := make(chan struct{})
88-
defer close(stopCh)
8990
factory.Start(stopCh)
9091
informerMap := factory.WaitForCacheSync(stopCh)
9192
for kind, synced := range informerMap {

vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,11 @@ func WatchEvictionEventsWithRetries(ctx context.Context, kubeClient kube_client.
138138
// Wait between attempts; retrying too often breaks the API server.
139139
waitTime := wait.Jitter(evictionWatchRetryWait, evictionWatchJitterFactor)
140140
klog.V(1).InfoS("An attempt to watch eviction events finished", "waitTime", waitTime)
141-
time.Sleep(waitTime)
141+
select {
142+
case <-ctx.Done():
143+
return
144+
case <-time.After(waitTime):
145+
}
142146
}
143147
}
144148
}()

vertical-pod-autoscaler/pkg/recommender/main.go

Lines changed: 28 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,9 @@ import (
2020
"context"
2121
"fmt"
2222
"os"
23-
"strings"
2423
"time"
2524

2625
"github.com/spf13/pflag"
27-
corev1 "k8s.io/api/core/v1"
2826
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2927
"k8s.io/apimachinery/pkg/util/uuid"
3028
"k8s.io/client-go/informers"
@@ -34,40 +32,25 @@ import (
3432
componentbaseconfig "k8s.io/component-base/config"
3533
componentbaseoptions "k8s.io/component-base/config/options"
3634
"k8s.io/klog/v2"
37-
resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
3835

3936
"k8s.io/autoscaler/vertical-pod-autoscaler/common"
4037
vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
41-
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/checkpoint"
4238
recommender_config "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/config"
43-
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input"
44-
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/history"
45-
input_metrics "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/metrics"
46-
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/logic"
47-
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
4839
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/routines"
49-
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target"
50-
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
5140
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics"
5241
metrics_quality "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/quality"
5342
metrics_recommender "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/recommender"
5443
metrics_resources "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/resources"
5544
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/server"
56-
vpa_api_util "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa"
5745
)
5846

47+
var config *recommender_config.RecommenderConfig
48+
5949
const (
60-
// aggregateContainerStateGCInterval defines how often expired AggregateContainerStates are garbage collected.
61-
aggregateContainerStateGCInterval = 1 * time.Hour
62-
scaleCacheEntryLifetime time.Duration = time.Hour
63-
scaleCacheEntryFreshnessTime time.Duration = 10 * time.Minute
64-
scaleCacheEntryJitterFactor float64 = 1.
65-
scaleCacheLoopPeriod = 7 * time.Second
66-
defaultResyncPeriod time.Duration = 10 * time.Minute
50+
// defaultResyncPeriod is the default resync period for shared informer factories.
51+
defaultResyncPeriod time.Duration = 10 * time.Minute
6752
)
6853

69-
var config *recommender_config.RecommenderConfig
70-
7154
func main() {
7255
// Leader election needs to be initialized before any other flag, because it may be used in other flags' validation.
7356
leaderElection := defaultLeaderElectionConfiguration()
@@ -86,7 +69,7 @@ func main() {
8669
server.Initialize(&config.CommonFlags.EnableProfiling, healthCheck, &config.Address)
8770

8871
if !leaderElection.LeaderElect {
89-
run(ctx, healthCheck, config.CommonFlags)
72+
run(ctx, healthCheck, config)
9073
} else {
9174
id, err := os.Hostname()
9275
if err != nil {
@@ -121,7 +104,7 @@ func main() {
121104
ReleaseOnCancel: true,
122105
Callbacks: leaderelection.LeaderCallbacks{
123106
OnStartedLeading: func(_ context.Context) {
124-
run(ctx, healthCheck, config.CommonFlags)
107+
run(ctx, healthCheck, config)
125108
},
126109
OnStoppedLeading: func() {
127110
klog.Fatal("lost master")
@@ -150,16 +133,30 @@ func defaultLeaderElectionConfiguration() componentbaseconfig.LeaderElectionConf
150133
}
151134
}
152135

153-
func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
136+
func run(ctx context.Context, healthCheck *metrics.HealthCheck, config *recommender_config.RecommenderConfig) {
154137
// Create a stop channel that will be used to signal shutdown
155138
stopCh := make(chan struct{})
156139
defer close(stopCh)
157-
kubeConfig := common.CreateKubeConfigOrDie(commonFlag.KubeConfig, float32(commonFlag.KubeApiQps), int(commonFlag.KubeApiBurst))
140+
kubeConfig := common.CreateKubeConfigOrDie(config.CommonFlags.KubeConfig, float32(config.CommonFlags.KubeApiQps), int(config.CommonFlags.KubeApiBurst))
158141
kubeClient := kube_client.NewForConfigOrDie(kubeConfig)
159-
clusterState := model.NewClusterState(aggregateContainerStateGCInterval)
160-
factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(commonFlag.VpaObjectNamespace))
161-
controllerFetcher := controllerfetcher.NewControllerFetcher(kubeConfig, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
162-
podLister, oomObserver := input.NewPodListerAndOOMObserver(ctx, kubeClient, commonFlag.VpaObjectNamespace, stopCh)
142+
vpaClient := vpa_clientset.NewForConfigOrDie(kubeConfig)
143+
factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(config.CommonFlags.VpaObjectNamespace))
144+
145+
controller, err := routines.NewRecommenderController(
146+
ctx,
147+
kubeConfig,
148+
kubeClient,
149+
vpaClient,
150+
factory,
151+
config,
152+
healthCheck,
153+
stopCh,
154+
)
155+
156+
if err != nil {
157+
klog.ErrorS(err, "Failed to create recommender controller")
158+
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
159+
}
163160

164161
factory.Start(stopCh)
165162
informerMap := factory.WaitForCacheSync(stopCh)
@@ -170,142 +167,8 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm
170167
}
171168
}
172169

173-
model.InitializeAggregationsConfig(model.NewAggregationsConfig(config.MemoryAggregationInterval, config.MemoryAggregationIntervalCount, config.MemoryHistogramDecayHalfLife, config.CpuHistogramDecayHalfLife, config.OOMBumpUpRatio, config.OOMMinBumpUp))
174-
175-
useCheckpoints := config.Storage != "prometheus"
176-
177-
var postProcessors []routines.RecommendationPostProcessor
178-
if config.PostProcessorCPUasInteger {
179-
postProcessors = append(postProcessors, &routines.IntegerCPUPostProcessor{})
180-
}
181-
182-
globalMaxAllowed := initGlobalMaxAllowed()
183-
// CappingPostProcessor, should always come in the last position for post-processing
184-
postProcessors = append(postProcessors, routines.NewCappingRecommendationProcessor(globalMaxAllowed))
185-
var source input_metrics.PodMetricsLister
186-
if config.UseExternalMetrics {
187-
resourceMetrics := map[corev1.ResourceName]string{}
188-
if config.ExternalCpuMetric != "" {
189-
resourceMetrics[corev1.ResourceCPU] = config.ExternalCpuMetric
190-
}
191-
if config.ExternalMemoryMetric != "" {
192-
resourceMetrics[corev1.ResourceMemory] = config.ExternalMemoryMetric
193-
}
194-
externalClientOptions := &input_metrics.ExternalClientOptions{ResourceMetrics: resourceMetrics, ContainerNameLabel: config.CtrNameLabel}
195-
klog.V(1).InfoS("Using External Metrics", "options", externalClientOptions)
196-
source = input_metrics.NewExternalClient(kubeConfig, clusterState, *externalClientOptions)
197-
} else {
198-
klog.V(1).InfoS("Using Metrics Server")
199-
source = input_metrics.NewPodMetricsesSource(resourceclient.NewForConfigOrDie(kubeConfig))
200-
}
201-
202-
ignoredNamespaces := strings.Split(commonFlag.IgnoredVpaObjectNamespaces, ",")
203-
204-
clusterStateFeeder := input.ClusterStateFeederFactory{
205-
PodLister: podLister,
206-
OOMObserver: oomObserver,
207-
KubeClient: kubeClient,
208-
MetricsClient: input_metrics.NewMetricsClient(source, commonFlag.VpaObjectNamespace, "default-metrics-client"),
209-
VpaCheckpointClient: vpa_clientset.NewForConfigOrDie(kubeConfig).AutoscalingV1(),
210-
VpaLister: vpa_api_util.NewVpasLister(vpa_clientset.NewForConfigOrDie(kubeConfig), make(chan struct{}), commonFlag.VpaObjectNamespace),
211-
VpaCheckpointLister: vpa_api_util.NewVpaCheckpointLister(vpa_clientset.NewForConfigOrDie(kubeConfig), make(chan struct{}), commonFlag.VpaObjectNamespace),
212-
ClusterState: clusterState,
213-
SelectorFetcher: target.NewVpaTargetSelectorFetcher(kubeConfig, kubeClient, factory),
214-
MemorySaveMode: config.MemorySaver,
215-
ControllerFetcher: controllerFetcher,
216-
RecommenderName: config.RecommenderName,
217-
IgnoredNamespaces: ignoredNamespaces,
218-
VpaObjectNamespace: commonFlag.VpaObjectNamespace,
219-
}.Make()
220-
controllerFetcher.Start(ctx, scaleCacheLoopPeriod)
221-
222-
recommender := routines.RecommenderFactory{
223-
ClusterState: clusterState,
224-
ClusterStateFeeder: clusterStateFeeder,
225-
ControllerFetcher: controllerFetcher,
226-
CheckpointWriter: checkpoint.NewCheckpointWriter(clusterState, vpa_clientset.NewForConfigOrDie(kubeConfig).AutoscalingV1()),
227-
VpaClient: vpa_clientset.NewForConfigOrDie(kubeConfig).AutoscalingV1(),
228-
PodResourceRecommender: logic.CreatePodResourceRecommender(logic.RecommendationConfig{
229-
SafetyMarginFraction: config.SafetyMarginFraction,
230-
PodMinCPUMillicores: config.PodMinCPUMillicores,
231-
PodMinMemoryMb: config.PodMinMemoryMb,
232-
TargetCPUPercentile: config.TargetCPUPercentile,
233-
LowerBoundCPUPercentile: config.LowerBoundCPUPercentile,
234-
UpperBoundCPUPercentile: config.UpperBoundCPUPercentile,
235-
ConfidenceIntervalCPU: config.ConfidenceIntervalCPU,
236-
TargetMemoryPercentile: config.TargetMemoryPercentile,
237-
LowerBoundMemoryPercentile: config.LowerBoundMemoryPercentile,
238-
UpperBoundMemoryPercentile: config.UpperBoundMemoryPercentile,
239-
ConfidenceIntervalMemory: config.ConfidenceIntervalMemory,
240-
}),
241-
RecommendationFormat: logic.RecommendationFormat{
242-
HumanizeMemory: config.HumanizeMemory,
243-
RoundCPUMillicores: config.RoundCPUMillicores,
244-
RoundMemoryBytes: config.RoundMemoryBytes,
245-
},
246-
RecommendationPostProcessors: postProcessors,
247-
CheckpointsGCInterval: config.CheckpointsGCInterval,
248-
CheckpointsWriteTimeout: config.CheckpointsWriteTimeout,
249-
UseCheckpoints: useCheckpoints,
250-
UpdateWorkerCount: config.UpdateWorkerCount,
251-
}.Make()
252-
253-
promQueryTimeout, err := time.ParseDuration(config.QueryTimeout)
254-
if err != nil {
255-
klog.ErrorS(err, "Could not parse --prometheus-query-timeout as a time.Duration")
170+
if err := controller.Run(ctx); err != nil {
171+
klog.ErrorS(err, "Recommender controller exited with error")
256172
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
257173
}
258-
259-
if useCheckpoints {
260-
recommender.GetClusterStateFeeder().InitFromCheckpoints(ctx)
261-
} else {
262-
config := history.PrometheusHistoryProviderConfig{
263-
Address: config.PrometheusAddress,
264-
Insecure: config.PrometheusInsecure,
265-
QueryTimeout: promQueryTimeout,
266-
HistoryLength: config.HistoryLength,
267-
HistoryResolution: config.HistoryResolution,
268-
PodLabelPrefix: config.PodLabelPrefix,
269-
PodLabelsMetricName: config.PodLabelsMetricName,
270-
PodNamespaceLabel: config.PodNamespaceLabel,
271-
PodNameLabel: config.PodNameLabel,
272-
CtrNamespaceLabel: config.CtrNamespaceLabel,
273-
CtrPodNameLabel: config.CtrPodNameLabel,
274-
CtrNameLabel: config.CtrNameLabel,
275-
CadvisorMetricsJobName: config.PrometheusJobName,
276-
Namespace: commonFlag.VpaObjectNamespace,
277-
Authentication: history.PrometheusCredentials{
278-
BearerToken: config.PrometheusBearerToken,
279-
Username: config.Username,
280-
Password: config.Password,
281-
},
282-
}
283-
provider, err := history.NewPrometheusHistoryProvider(config)
284-
if err != nil {
285-
klog.ErrorS(err, "Could not initialize history provider")
286-
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
287-
}
288-
recommender.GetClusterStateFeeder().InitFromHistoryProvider(provider)
289-
}
290-
291-
// Start updating health check endpoint.
292-
healthCheck.StartMonitoring()
293-
294-
ticker := time.Tick(config.MetricsFetcherInterval)
295-
for range ticker {
296-
recommender.RunOnce()
297-
healthCheck.UpdateLastActivity()
298-
}
299-
}
300-
301-
func initGlobalMaxAllowed() corev1.ResourceList {
302-
result := make(corev1.ResourceList)
303-
if !config.MaxAllowedCPU.IsZero() {
304-
result[corev1.ResourceCPU] = config.MaxAllowedCPU.Quantity
305-
}
306-
if !config.MaxAllowedMemory.IsZero() {
307-
result[corev1.ResourceMemory] = config.MaxAllowedMemory.Quantity
308-
}
309-
310-
return result
311174
}

0 commit comments

Comments
 (0)