Skip to content

Commit 7a2218b

Browse files
committed
working implementation on both target-type IP and NodePort
1 parent 9c74844 commit 7a2218b

File tree

3 files changed

+41
-13
lines changed

3 files changed

+41
-13
lines changed

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func main() {
185185
tgbResManager := targetgroupbinding.NewDefaultResourceManager(mgr.GetClient(), cloud.ELBV2(),
186186
podInfoRepo, networkingManager, vpcInfoProvider, multiClusterManager, lbcMetricsCollector,
187187
cloud.VpcID(), controllerCFG.FeatureGates.Enabled(config.EndpointsFailOpen), controllerCFG.EnableEndpointSlices,
188-
mgr.GetEventRecorderFor("targetGroupBinding"), ctrl.Log)
188+
mgr.GetEventRecorderFor("targetGroupBinding"), ctrl.Log, controllerCFG.MaxTargetsPerInstance)
189189
backendSGProvider := networking.NewBackendSGProvider(controllerCFG.ClusterName, controllerCFG.BackendSecurityGroup,
190190
cloud.VpcID(), cloud.EC2(), mgr.GetClient(), controllerCFG.DefaultTags, nlbGatewayEnabled || albGatewayEnabled, ctrl.Log.WithName("backend-sg-provider"))
191191
sgResolver := networking.NewDefaultSecurityGroupResolver(cloud.EC2(), cloud.VpcID())

pkg/targetgroupbinding/resource_manager.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ func NewDefaultResourceManager(k8sClient client.Client, elbv2Client services.ELB
4848
podInfoRepo k8s.PodInfoRepo, networkingManager networking.NetworkingManager,
4949
vpcInfoProvider networking.VPCInfoProvider, multiClusterManager MultiClusterManager, metricsCollector lbcmetrics.MetricCollector,
5050
vpcID string, failOpenEnabled bool, endpointSliceEnabled bool,
51-
eventRecorder record.EventRecorder, logger logr.Logger) *defaultResourceManager {
51+
eventRecorder record.EventRecorder, logger logr.Logger, maxTargetsPerInstance int) *defaultResourceManager {
52+
5253
targetsManager := NewCachedTargetsManager(elbv2Client, logger)
5354
endpointResolver := backend.NewDefaultEndpointResolver(k8sClient, podInfoRepo, failOpenEnabled, endpointSliceEnabled, logger)
5455
return &defaultResourceManager{
@@ -61,6 +62,7 @@ func NewDefaultResourceManager(k8sClient client.Client, elbv2Client services.ELB
6162
vpcID: vpcID,
6263
vpcInfoProvider: vpcInfoProvider,
6364
podInfoRepo: podInfoRepo,
65+
maxTargetsPerInstance: maxTargetsPerInstance,
6466
multiClusterManager: multiClusterManager,
6567
metricsCollector: metricsCollector,
6668

@@ -83,6 +85,7 @@ type defaultResourceManager struct {
8385
logger logr.Logger
8486
vpcInfoProvider networking.VPCInfoProvider
8587
podInfoRepo k8s.PodInfoRepo
88+
maxTargetsPerInstance int
8689
multiClusterManager MultiClusterManager
8790
metricsCollector lbcmetrics.MetricCollector
8891
vpcID string
@@ -180,6 +183,7 @@ func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context,
180183
if err != nil {
181184
return "", "", false, ctrlerrors.NewErrorWithMetrics(controllerName, "list_targets_error", err, m.metricsCollector)
182185
}
186+
totalTargets := len(targets)
183187

184188
notDrainingTargets, _ := partitionTargetsByDrainingStatus(targets)
185189
matchedEndpointAndTargets, unmatchedEndpoints, unmatchedTargets := matchPodEndpointWithTargets(endpoints, notDrainingTargets)
@@ -240,6 +244,23 @@ func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context,
240244
return "", "", false, ctrlerrors.NewErrorWithMetrics(controllerName, "update_tracked_ip_targets_error", err, m.metricsCollector)
241245
}
242246

247+
// Log that we're witholding target additions to prevent exceeding max-targets-per-instance
248+
var limitedUnmatchedEndpoints []backend.PodEndpoint
249+
250+
if m.maxTargetsPerInstance > 0 && len(unmatchedEndpoints) + totalTargets > m.maxTargetsPerInstance {
251+
maxAdditions := m.maxTargetsPerInstance - totalTargets
252+
if maxAdditions > 0 {
253+
limitedUnmatchedEndpoints = unmatchedEndpoints[:maxAdditions]
254+
}
255+
tgbScopedLogger.Info("Limiting target additions due to max-targets-per-instance configuration",
256+
"currentTargets", totalTargets,
257+
"maxTargetsPerInstance", m.maxTargetsPerInstance,
258+
"proposedAdditions", len(unmatchedEndpoints),
259+
"numberOmitted", len(unmatchedEndpoints) - len(limitedUnmatchedEndpoints))
260+
261+
unmatchedEndpoints = limitedUnmatchedEndpoints
262+
}
263+
243264
if err := m.registerPodEndpoints(ctx, tgb, unmatchedEndpoints); err != nil {
244265
return "", "", false, ctrlerrors.NewErrorWithMetrics(controllerName, "register_pod_endpoint_error", err, m.metricsCollector)
245266
}
@@ -307,6 +328,7 @@ func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Con
307328
if err != nil {
308329
return "", "", false, ctrlerrors.NewErrorWithMetrics(controllerName, "list_targets_error", err, m.metricsCollector)
309330
}
331+
totalTargets := len(targets)
310332

311333
notDrainingTargets, _ := partitionTargetsByDrainingStatus(targets)
312334

@@ -341,6 +363,22 @@ func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Con
341363
return "", "", false, ctrlerrors.NewErrorWithMetrics(controllerName, "update_tracked_instance_targets_error", err, m.metricsCollector)
342364
}
343365

366+
var limitedUnmatchedEndpoints []backend.NodePortEndpoint
367+
368+
if m.maxTargetsPerInstance > 0 && len(unmatchedEndpoints) + totalTargets > m.maxTargetsPerInstance {
369+
maxAdditions := m.maxTargetsPerInstance - totalTargets
370+
if maxAdditions > 0 {
371+
limitedUnmatchedEndpoints = unmatchedEndpoints[:maxAdditions]
372+
}
373+
tgbScopedLogger.Info("Limiting target additions due to max-targets-per-instance configuration",
374+
"currentTargets", totalTargets,
375+
"maxTargetsPerInstance", m.maxTargetsPerInstance,
376+
"proposedAdditions", len(unmatchedEndpoints),
377+
"numberOmitted", len(unmatchedEndpoints) - len(limitedUnmatchedEndpoints))
378+
379+
unmatchedEndpoints = limitedUnmatchedEndpoints
380+
}
381+
344382
if err := m.registerNodePortEndpoints(ctx, tgb, unmatchedEndpoints); err != nil {
345383
return "", "", false, ctrlerrors.NewErrorWithMetrics(controllerName, "update_node_port_endpoints_error", err, m.metricsCollector)
346384
}

pkg/targetgroupbinding/targets_manager.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ const (
1818
defaultTargetsCacheTTL = 5 * time.Minute
1919
defaultRegisterTargetsChunkSize = 200
2020
defaultDeregisterTargetsChunkSize = 200
21-
maxTargetsPerInstance = 500
2221
)
2322

2423
// TargetsManager is an abstraction around ELBV2's targets API.
@@ -81,16 +80,7 @@ type targetsCacheItem struct {
8180

8281
func (m *cachedTargetsManager) RegisterTargets(ctx context.Context, tgb *elbv2api.TargetGroupBinding, targets []elbv2types.TargetDescription) error {
8382
tgARN := tgb.Spec.TargetGroupARN
84-
sampledTargets := targets
85-
86-
m.logger.Info("Number of targets", len(targets), "registering a subset per max-targets-per-instance", maxTargetsPerInstance)
87-
if maxTargetsPerInstance > 0 && len(targets) > maxTargetsPerInstance {
88-
m.logger.Info("Max number of targets exceeded", len(targets), "registering a subset per max-targets-per-instance", maxTargetsPerInstance)
89-
m.logger.Info("Max number of targets exceeded", len(targets), "registering a subset per max-targets-per-instance", maxTargetsPerInstance)
90-
sampledTargets = targets[:maxTargetsPerInstance]
91-
}
92-
93-
targetsChunks := chunkTargetDescriptions(sampledTargets, m.registerTargetsChunkSize)
83+
targetsChunks := chunkTargetDescriptions(targets, m.registerTargetsChunkSize)
9484
for _, targetsChunk := range targetsChunks {
9585
req := &elbv2sdk.RegisterTargetsInput{
9686
TargetGroupArn: aws.String(tgARN),

0 commit comments

Comments
 (0)