@@ -5,8 +5,11 @@ import (
55 "errors"
66 "fmt"
77 "sync"
8+ "sync/atomic"
9+ "time"
810
911 "github.com/go-logr/logr"
12+ "github.com/google/uuid"
1013 "github.com/kong/go-database-reconciler/pkg/file"
1114 "github.com/kong/go-kong/kong"
1215 "github.com/samber/mo"
@@ -18,6 +21,7 @@ import (
1821 "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckgen"
1922 "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate"
2023 "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig"
24+ "github.com/kong/kubernetes-ingress-controller/v3/internal/konnect/tracing"
2125 "github.com/kong/kubernetes-ingress-controller/v3/internal/logging"
2226 "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics"
2327 "github.com/kong/kubernetes-ingress-controller/v3/internal/util"
@@ -45,6 +49,11 @@ type ConfigSynchronizer struct {
4549
4650 targetKongState mo.Option [TargetKongState ]
4751 configLock sync.RWMutex
52+
53+ // synchronizerID is the identifier to mark the ConfigSynchronizer instance.
54+ synchronizerID string
55+ // serialNumber is the serial number to mark the loop round of config synchronization.
56+ serialNumber atomic.Uint32
4857}
4958
5059// TargetKongState wraps the Kong state to be uploaded to Konnect and indicates whether the configuration is a fallback
@@ -74,18 +83,23 @@ type ConfigSynchronizerParams struct {
7483 ConfigChangeDetector sendconfig.ConfigurationChangeDetector
7584 ConfigStatusNotifier clients.ConfigStatusNotifier
7685 MetricsRecorder metrics.Recorder
86+ SynchronizerID string
7787}
7888
7989func NewConfigSynchronizer (p ConfigSynchronizerParams ) * ConfigSynchronizer {
90+ if p .SynchronizerID == "" {
91+ p .SynchronizerID = uuid .NewString ()
92+ }
8093 return & ConfigSynchronizer {
81- logger : p .Logger ,
94+ logger : p .Logger . WithValues ( "synchronizerID" , p . SynchronizerID ) ,
8295 kongConfig : p .KongConfig ,
8396 syncTicker : p .ConfigUploadTicker ,
8497 konnectClientFactory : p .KonnectClientFactory ,
8598 updateStrategyResolver : p .UpdateStrategyResolver ,
8699 configChangeDetector : p .ConfigChangeDetector ,
87100 configStatusNotifier : p .ConfigStatusNotifier ,
88101 metricsRecorder : p .MetricsRecorder ,
102+ synchronizerID : p .SynchronizerID ,
89103 }
90104}
91105
@@ -94,6 +108,7 @@ var _ manager.LeaderElectionRunnable = &ConfigSynchronizer{}
94108// Start starts the loop to receive configuration and upload configuration to Konnect.
95109func (s * ConfigSynchronizer ) Start (ctx context.Context ) error {
96110 s .logger .Info ("Starting Konnect configuration synchronizer" )
111+ ctx = context .WithValue (ctx , tracing .SynchronizerIDKey , s .synchronizerID )
97112
98113 konnectAdminClient , err := s .konnectClientFactory .NewKonnectClient (ctx )
99114 if err != nil {
@@ -199,26 +214,36 @@ func (s *ConfigSynchronizer) run(ctx context.Context) {
199214}
200215
201216func (s * ConfigSynchronizer ) handleConfigSynchronizationTick (ctx context.Context ) {
202- s .logger .V (logging .DebugLevel ).Info ("Start uploading configuration to Konnect" )
217+ // Add values about the sync round in the context.
218+ serialNumber := s .serialNumber .Add (1 )
219+ startTimestamp := time .Now ().Unix ()
220+ syncRoundID := uuid .NewSHA1 (uuid .Nil , fmt .Appendf ([]byte {}, "%s:%d:%d" , s .synchronizerID , serialNumber , startTimestamp ))
221+ ctx = context .WithValue (ctx , tracing .SyncSerialNumberKey , serialNumber )
222+ ctx = context .WithValue (ctx , tracing .SyncStartTimestampKey , startTimestamp )
223+ ctx = context .WithValue (ctx , tracing .SyncRoundIDKey , syncRoundID )
224+ logger := s .logger .WithValues ("syncRoundID" , syncRoundID , "serialNumber" , serialNumber )
225+
226+ logger .V (logging .DebugLevel ).Info ("Start uploading configuration to Konnect" )
203227
204228 // Get the latest configuration copy to upload to Konnect. We don't want to hold the lock for a long time to prevent
205229 // blocking the update of the configuration.
206230 targetCfg , ok := s .currentContent (ctx )
207231 if ! ok {
208- s . logger .Info ("No configuration received yet, skipping Konnect configuration synchronization" )
232+ logger .Info ("No configuration received yet, skipping Konnect configuration synchronization" )
209233 return
210234 }
211235
212236 // Upload the configuration to Konnect.
213- if err := s .uploadConfig (ctx , s .konnectAdminClient , targetCfg ); err != nil {
214- s . logger .Error (err , "Failed to upload configuration to Konnect" )
215- logKonnectErrors (s . logger , err )
237+ if err := s .uploadConfig (ctx , logger , s .konnectAdminClient , targetCfg ); err != nil {
238+ logger .Error (err , "Failed to upload configuration to Konnect" )
239+ logKonnectErrors (logger , err )
216240 }
217241}
218242
219243// uploadConfig sends the given configuration to Konnect.
220244func (s * ConfigSynchronizer ) uploadConfig (
221245 ctx context.Context ,
246+ logger logr.Logger ,
222247 client * adminapi.KonnectClient ,
223248 targetContent TargetContent ,
224249) error {
@@ -229,7 +254,7 @@ func (s *ConfigSynchronizer) uploadConfig(
229254
230255 newSHA , err := sendconfig .PerformUpdate (
231256 ctx ,
232- s . logger ,
257+ logger ,
233258 client ,
234259 s .kongConfig ,
235260 targetContent .Content ,
0 commit comments