@@ -160,6 +160,12 @@ func main() {
160160managerOpts .RetryPeriod = & retryPeriod
161161}
162162
163+ var maxConcurrentReconciles int
164+ if maxConcurrentReconcilesEnvValue := getIntEnv (controllers .MaxConcurrentReconciles ); maxConcurrentReconcilesEnvValue > 0 {
165+ maxConcurrentReconciles = maxConcurrentReconcilesEnvValue
166+ log .Info (fmt .Sprintf ("maxConcurrentReconciles set to %d" , maxConcurrentReconciles ))
167+ }
168+
163169if enableDebugPprof , ok := os .LookupEnv ("ENABLE_DEBUG_PPROF" ); ok {
164170pprofEnabled , err := strconv .ParseBool (enableDebugPprof )
165171if err == nil && pprofEnabled {
@@ -187,6 +193,7 @@ func main() {
187193KubernetesClusterDomain : clusterDomain ,
188194ReconcileFunc : & controllers.QueueReconciler {},
189195ConnectUsingPlainHTTP : usePlainHTTP ,
196+ MaxConcurrentReconciles : maxConcurrentReconciles ,
190197}).SetupWithManager (mgr ); err != nil {
191198log .Error (err , "unable to create controller" , "controller" , controllers .QueueControllerName )
192199os .Exit (1 )
@@ -202,6 +209,7 @@ func main() {
202209KubernetesClusterDomain : clusterDomain ,
203210ReconcileFunc : & controllers.ExchangeReconciler {},
204211ConnectUsingPlainHTTP : usePlainHTTP ,
212+ MaxConcurrentReconciles : maxConcurrentReconciles ,
205213}).SetupWithManager (mgr ); err != nil {
206214log .Error (err , "unable to create controller" , "controller" , controllers .ExchangeControllerName )
207215os .Exit (1 )
@@ -217,6 +225,7 @@ func main() {
217225KubernetesClusterDomain : clusterDomain ,
218226ReconcileFunc : & controllers.BindingReconciler {},
219227ConnectUsingPlainHTTP : usePlainHTTP ,
228+ MaxConcurrentReconciles : maxConcurrentReconciles ,
220229}).SetupWithManager (mgr ); err != nil {
221230log .Error (err , "unable to create controller" , "controller" , controllers .BindingControllerName )
222231os .Exit (1 )
@@ -233,6 +242,7 @@ func main() {
233242WatchTypes : []client.Object {},
234243ReconcileFunc : & controllers.UserReconciler {Client : mgr .GetClient (), Scheme : mgr .GetScheme ()},
235244ConnectUsingPlainHTTP : usePlainHTTP ,
245+ MaxConcurrentReconciles : maxConcurrentReconciles ,
236246}).SetupWithManager (mgr ); err != nil {
237247log .Error (err , "unable to create controller" , "controller" , controllers .UserControllerName )
238248os .Exit (1 )
@@ -248,6 +258,7 @@ func main() {
248258KubernetesClusterDomain : clusterDomain ,
249259ReconcileFunc : & controllers.VhostReconciler {Client : mgr .GetClient ()},
250260ConnectUsingPlainHTTP : usePlainHTTP ,
261+ MaxConcurrentReconciles : maxConcurrentReconciles ,
251262}).SetupWithManager (mgr ); err != nil {
252263log .Error (err , "unable to create controller" , "controller" , controllers .VhostControllerName )
253264os .Exit (1 )
@@ -263,6 +274,7 @@ func main() {
263274KubernetesClusterDomain : clusterDomain ,
264275ReconcileFunc : & controllers.PolicyReconciler {},
265276ConnectUsingPlainHTTP : usePlainHTTP ,
277+ MaxConcurrentReconciles : maxConcurrentReconciles ,
266278}).SetupWithManager (mgr ); err != nil {
267279log .Error (err , "unable to create controller" , "controller" , controllers .PolicyControllerName )
268280os .Exit (1 )
@@ -278,6 +290,7 @@ func main() {
278290KubernetesClusterDomain : clusterDomain ,
279291ReconcileFunc : & controllers.OperatorPolicyReconciler {},
280292ConnectUsingPlainHTTP : usePlainHTTP ,
293+ MaxConcurrentReconciles : maxConcurrentReconciles ,
281294}).SetupWithManager (mgr ); err != nil {
282295log .Error (err , "unable to create controller" , "controller" , controllers .OperatorPolicyControllerName )
283296os .Exit (1 )
@@ -293,6 +306,7 @@ func main() {
293306KubernetesClusterDomain : clusterDomain ,
294307ReconcileFunc : & controllers.PermissionReconciler {Client : mgr .GetClient (), Scheme : mgr .GetScheme ()},
295308ConnectUsingPlainHTTP : usePlainHTTP ,
309+ MaxConcurrentReconciles : maxConcurrentReconciles ,
296310}).SetupWithManager (mgr ); err != nil {
297311log .Error (err , "unable to create controller" , "controller" , controllers .PermissionControllerName )
298312os .Exit (1 )
@@ -308,6 +322,7 @@ func main() {
308322KubernetesClusterDomain : clusterDomain ,
309323ReconcileFunc : & controllers.SchemaReplicationReconciler {Client : mgr .GetClient ()},
310324ConnectUsingPlainHTTP : usePlainHTTP ,
325+ MaxConcurrentReconciles : maxConcurrentReconciles ,
311326}).SetupWithManager (mgr ); err != nil {
312327log .Error (err , "unable to create controller" , "controller" , controllers .SchemaReplicationControllerName )
313328os .Exit (1 )
@@ -323,6 +338,7 @@ func main() {
323338KubernetesClusterDomain : clusterDomain ,
324339ReconcileFunc : & controllers.FederationReconciler {Client : mgr .GetClient ()},
325340ConnectUsingPlainHTTP : usePlainHTTP ,
341+ MaxConcurrentReconciles : maxConcurrentReconciles ,
326342}).SetupWithManager (mgr ); err != nil {
327343log .Error (err , "unable to create controller" , "controller" , controllers .FederationControllerName )
328344os .Exit (1 )
@@ -338,6 +354,7 @@ func main() {
338354KubernetesClusterDomain : clusterDomain ,
339355ReconcileFunc : & controllers.ShovelReconciler {Client : mgr .GetClient ()},
340356ConnectUsingPlainHTTP : usePlainHTTP ,
357+ MaxConcurrentReconciles : maxConcurrentReconciles ,
341358}).SetupWithManager (mgr ); err != nil {
342359log .Error (err , "unable to create controller" , "controller" , controllers .ShovelControllerName )
343360os .Exit (1 )
@@ -353,17 +370,19 @@ func main() {
353370KubernetesClusterDomain : clusterDomain ,
354371ReconcileFunc : & controllers.TopicPermissionReconciler {Client : mgr .GetClient (), Scheme : mgr .GetScheme ()},
355372ConnectUsingPlainHTTP : usePlainHTTP ,
373+ MaxConcurrentReconciles : maxConcurrentReconciles ,
356374}).SetupWithManager (mgr ); err != nil {
357375log .Error (err , "unable to create controller" , "controller" , controllers .TopicPermissionControllerName )
358376os .Exit (1 )
359377}
360378
361379if err = (& controllers.SuperStreamReconciler {
362- Client : mgr .GetClient (),
363- Log : ctrl .Log .WithName (controllers .SuperStreamControllerName ),
364- Scheme : mgr .GetScheme (),
365- Recorder : mgr .GetEventRecorderFor (controllers .SuperStreamControllerName ),
366- RabbitmqClientFactory : rabbitmqclient .RabbitholeClientFactory ,
380+ Client : mgr .GetClient (),
381+ Log : ctrl .Log .WithName (controllers .SuperStreamControllerName ),
382+ Scheme : mgr .GetScheme (),
383+ Recorder : mgr .GetEventRecorderFor (controllers .SuperStreamControllerName ),
384+ RabbitmqClientFactory : rabbitmqclient .RabbitholeClientFactory ,
385+ MaxConcurrentReconciles : maxConcurrentReconciles ,
367386}).SetupWithManager (mgr ); err != nil {
368387log .Error (err , "unable to create controller" , "controller" , controllers .SuperStreamControllerName )
369388os .Exit (1 )
@@ -456,3 +475,15 @@ func getBoolEnv(envName string) bool {
456475}
457476return boolVar
458477}
478+
479+ func getIntEnv (envName string ) int {
480+ var intVar int
481+ if initStr , ok := os .LookupEnv (envName ); ok {
482+ var err error
483+ if intVar , err = strconv .Atoi (initStr ); err != nil {
484+ log .Error (err , fmt .Sprintf ("unable to parse provided '%s'" , envName ))
485+ os .Exit (1 )
486+ }
487+ }
488+ return intVar
489+ }
0 commit comments