Skip to content

Commit 6b82030

Browse files
amirejazjhrozek
andauthored
Enable MCPRemoteProxy discovery in MCPGroup and VirtualMCPServer (#2970)
* add MCPRemoteProxy support in MCPGroup and VirtualMCPServer * use list instead of get in loop * refactor and change the name * operator crds bump * fix unit tests * updated crd-ref gen * SetupWithManager add a Watches() call for MCPRemoteProxy resources * test coverage for findReferencingMCPRemoteProxies, findMCPGroupForMCPRemoteProxy, updateReferencingRemoteProxiesOnDeletion * consolidating status updates at the end of reconciliation * clear both status after entering failed status * removed GroupRefValidated condition when the groupRef is empty to remove any stale data * adding test cases for: MCPRemoteProxy discovery, Mixed groups (both MCPServer and MCPRemoteProxy), mcpRemoteProxyToBackend conversion * adding test cases with WorkloadTypeMCPRemoteProxy to verify the aggregator handles both workload types correctly * adding backend.Metadata[workload_type] = mcp_server in mcpServerToBackend as well. * unit tests to verify the adapter correctly converts workload types * consolidate the discoverRemoteProxyAuthConfig and discoverAuthConfig * bump version * operator-crds version bump --------- Co-authored-by: Jakub Hrozek <jakub.hrozek@posteo.se>
1 parent 6db8faa commit 6b82030

31 files changed

+3118
-248
lines changed

cmd/thv-operator/api/v1alpha1/mcpgroup_types.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,22 @@ type MCPGroupStatus struct {
1818
// +kubebuilder:default=Pending
1919
Phase MCPGroupPhase `json:"phase,omitempty"`
2020

21-
// Servers lists server names in this group
21+
// Servers lists MCPServer names in this group
2222
// +optional
2323
Servers []string `json:"servers"`
2424

25-
// ServerCount is the number of servers
25+
// ServerCount is the number of MCPServers
2626
// +optional
2727
ServerCount int `json:"serverCount"`
2828

29+
// RemoteProxies lists MCPRemoteProxy names in this group
30+
// +optional
31+
RemoteProxies []string `json:"remoteProxies,omitempty"`
32+
33+
// RemoteProxyCount is the number of MCPRemoteProxies
34+
// +optional
35+
RemoteProxyCount int `json:"remoteProxyCount,omitempty"`
36+
2937
// Conditions represent observations
3038
// +optional
3139
Conditions []metav1.Condition `json:"conditions,omitempty"`

cmd/thv-operator/api/v1alpha1/mcpremoteproxy_types.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ type MCPRemoteProxySpec struct {
6666
// ResourceOverrides allows overriding annotations and labels for resources created by the operator
6767
// +optional
6868
ResourceOverrides *ResourceOverrides `json:"resourceOverrides,omitempty"`
69+
70+
// GroupRef is the name of the MCPGroup this proxy belongs to
71+
// Must reference an existing MCPGroup in the same namespace
72+
// +optional
73+
GroupRef string `json:"groupRef,omitempty"`
6974
}
7075

7176
// MCPRemoteProxyStatus defines the observed state of MCPRemoteProxy
@@ -131,6 +136,9 @@ const (
131136

132137
// ConditionTypeAuthConfigured indicates whether authentication is properly configured
133138
ConditionTypeAuthConfigured = "AuthConfigured"
139+
140+
// ConditionTypeMCPRemoteProxyGroupRefValidated indicates whether the GroupRef is valid
141+
ConditionTypeMCPRemoteProxyGroupRefValidated = "GroupRefValidated"
134142
)
135143

136144
// Condition reasons for MCPRemoteProxy
@@ -155,6 +163,15 @@ const (
155163

156164
// ConditionReasonMissingOIDCConfig indicates OIDCConfig is not specified
157165
ConditionReasonMissingOIDCConfig = "MissingOIDCConfig"
166+
167+
// ConditionReasonMCPRemoteProxyGroupRefValidated indicates the GroupRef is valid
168+
ConditionReasonMCPRemoteProxyGroupRefValidated = "GroupRefIsValid"
169+
170+
// ConditionReasonMCPRemoteProxyGroupRefNotFound indicates the GroupRef is invalid
171+
ConditionReasonMCPRemoteProxyGroupRefNotFound = "GroupRefNotFound"
172+
173+
// ConditionReasonMCPRemoteProxyGroupRefNotReady indicates the referenced MCPGroup is not in the Ready state
174+
ConditionReasonMCPRemoteProxyGroupRefNotReady = "GroupRefNotReady"
158175
)
159176

160177
//+kubebuilder:object:root=true

cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/thv-operator/controllers/mcpgroup_controller.go

Lines changed: 206 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type MCPGroupReconciler struct {
3333
// +kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=mcpgroups/finalizers,verbs=update
3434
// +kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=mcpservers,verbs=get;list;watch
3535
// +kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=mcpservers/status,verbs=get;update;patch
36+
// +kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=mcpremoteproxies,verbs=get;list;watch
37+
// +kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=mcpremoteproxies/status,verbs=get;update;patch
3638

3739
// Reconcile is part of the main kubernetes reconciliation loop
3840
// which aims to move the current state of the cluster closer to the desired state.
@@ -71,68 +73,137 @@ func (r *MCPGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
7173
return ctrl.Result{RequeueAfter: 500 * time.Millisecond}, nil
7274
}
7375

76+
// Find and update status for MCPServers and MCPRemoteProxies
77+
result, err := r.updateGroupMemberStatus(ctx, mcpGroup)
78+
if err != nil || result.RequeueAfter > 0 {
79+
return result, err
80+
}
81+
82+
ctxLogger.Info("Successfully reconciled MCPGroup",
83+
"serverCount", mcpGroup.Status.ServerCount,
84+
"remoteProxyCount", mcpGroup.Status.RemoteProxyCount)
85+
return ctrl.Result{}, nil
86+
}
87+
88+
// updateGroupMemberStatus finds MCPServers and MCPRemoteProxies referencing the group
89+
// and updates the MCPGroup status accordingly.
90+
func (r *MCPGroupReconciler) updateGroupMemberStatus(
91+
ctx context.Context,
92+
mcpGroup *mcpv1alpha1.MCPGroup,
93+
) (ctrl.Result, error) {
94+
ctxLogger := log.FromContext(ctx)
95+
7496
// Find MCPServers that reference this MCPGroup
7597
mcpServers, err := r.findReferencingMCPServers(ctx, mcpGroup)
7698
if err != nil {
77-
ctxLogger.Error(err, "Failed to list MCPServers")
78-
mcpGroup.Status.Phase = mcpv1alpha1.MCPGroupPhaseFailed
79-
meta.SetStatusCondition(&mcpGroup.Status.Conditions, metav1.Condition{
80-
Type: mcpv1alpha1.ConditionTypeMCPServersChecked,
81-
Status: metav1.ConditionFalse,
82-
Reason: mcpv1alpha1.ConditionReasonListMCPServersFailed,
83-
Message: "Failed to list MCPServers in namespace",
84-
ObservedGeneration: mcpGroup.Generation,
85-
})
86-
mcpGroup.Status.ServerCount = 0
87-
mcpGroup.Status.Servers = nil
88-
// Update the MCPGroup status to reflect the failure
89-
if updateErr := r.Status().Update(ctx, mcpGroup); updateErr != nil {
90-
if errors.IsConflict(err) {
91-
// Requeue to retry with fresh data
92-
return ctrl.Result{Requeue: true}, nil
93-
}
94-
ctxLogger.Error(updateErr, "Failed to update MCPGroup status after list failure")
95-
}
96-
return ctrl.Result{}, nil
99+
return r.handleListFailure(ctx, mcpGroup, err, "MCPServers")
100+
}
101+
102+
// Find MCPRemoteProxies that reference this MCPGroup
103+
mcpRemoteProxies, err := r.findReferencingMCPRemoteProxies(ctx, mcpGroup)
104+
if err != nil {
105+
return r.handleListFailure(ctx, mcpGroup, err, "MCPRemoteProxies")
97106
}
98107

99108
meta.SetStatusCondition(&mcpGroup.Status.Conditions, metav1.Condition{
100109
Type: mcpv1alpha1.ConditionTypeMCPServersChecked,
101110
Status: metav1.ConditionTrue,
102111
Reason: mcpv1alpha1.ConditionReasonListMCPServersSucceeded,
103-
Message: "Successfully listed MCPServers in namespace",
112+
Message: "Successfully listed MCPServers and MCPRemoteProxies in namespace",
104113
ObservedGeneration: mcpGroup.Generation,
105114
})
106115

107-
// Set MCPGroup status fields
108-
mcpGroup.Status.ServerCount = len(mcpServers)
109-
if len(mcpServers) == 0 {
110-
mcpGroup.Status.Servers = []string{}
111-
} else {
112-
// If there are servers, extract their names
113-
mcpGroup.Status.Servers = make([]string, len(mcpServers))
114-
for i, server := range mcpServers {
115-
mcpGroup.Status.Servers[i] = server.Name
116-
}
117-
sort.Strings(mcpGroup.Status.Servers)
118-
}
116+
// Set MCPGroup status fields for MCPServers
117+
r.populateServerStatus(mcpGroup, mcpServers)
118+
119+
// Set MCPGroup status fields for MCPRemoteProxies
120+
r.populateRemoteProxyStatus(mcpGroup, mcpRemoteProxies)
119121

120122
mcpGroup.Status.Phase = mcpv1alpha1.MCPGroupPhaseReady
121123

122124
// Update the MCPGroup status
123125
if err := r.Status().Update(ctx, mcpGroup); err != nil {
124126
if errors.IsConflict(err) {
125-
// Requeue to retry with fresh data
126-
return ctrl.Result{Requeue: true}, nil
127+
return ctrl.Result{RequeueAfter: 500 * time.Millisecond}, nil
127128
}
128129
ctxLogger.Error(err, "Failed to update MCPGroup status")
129130
return ctrl.Result{}, err
130131
}
131132

132-
ctxLogger.Info("Successfully reconciled MCPGroup", "serverCount", mcpGroup.Status.ServerCount)
133+
ctxLogger.Info("Successfully reconciled MCPGroup",
134+
"serverCount", mcpGroup.Status.ServerCount,
135+
"remoteProxyCount", mcpGroup.Status.RemoteProxyCount)
136+
return ctrl.Result{}, nil
137+
}
138+
139+
// handleListFailure handles the case when listing MCPServers or MCPRemoteProxies fails.
140+
func (r *MCPGroupReconciler) handleListFailure(
141+
ctx context.Context,
142+
mcpGroup *mcpv1alpha1.MCPGroup,
143+
listErr error,
144+
resourceType string,
145+
) (ctrl.Result, error) {
146+
ctxLogger := log.FromContext(ctx)
147+
ctxLogger.Error(listErr, "Failed to list "+resourceType)
148+
149+
mcpGroup.Status.Phase = mcpv1alpha1.MCPGroupPhaseFailed
150+
meta.SetStatusCondition(&mcpGroup.Status.Conditions, metav1.Condition{
151+
Type: mcpv1alpha1.ConditionTypeMCPServersChecked,
152+
Status: metav1.ConditionFalse,
153+
Reason: mcpv1alpha1.ConditionReasonListMCPServersFailed,
154+
Message: "Failed to list " + resourceType + " in namespace",
155+
ObservedGeneration: mcpGroup.Generation,
156+
})
157+
158+
// Clear both resource types' status fields to avoid stale data when entering Failed state
159+
mcpGroup.Status.ServerCount = 0
160+
mcpGroup.Status.Servers = nil
161+
mcpGroup.Status.RemoteProxyCount = 0
162+
mcpGroup.Status.RemoteProxies = nil
163+
164+
if updateErr := r.Status().Update(ctx, mcpGroup); updateErr != nil {
165+
if errors.IsConflict(updateErr) {
166+
return ctrl.Result{RequeueAfter: 500 * time.Millisecond}, nil
167+
}
168+
ctxLogger.Error(updateErr, "Failed to update MCPGroup status after list failure")
169+
}
133170
return ctrl.Result{}, nil
134171
}
135172

173+
// populateServerStatus populates the MCPGroup status with MCPServer information.
174+
func (*MCPGroupReconciler) populateServerStatus(
175+
mcpGroup *mcpv1alpha1.MCPGroup,
176+
mcpServers []mcpv1alpha1.MCPServer,
177+
) {
178+
mcpGroup.Status.ServerCount = len(mcpServers)
179+
if len(mcpServers) == 0 {
180+
mcpGroup.Status.Servers = []string{}
181+
return
182+
}
183+
mcpGroup.Status.Servers = make([]string, len(mcpServers))
184+
for i, server := range mcpServers {
185+
mcpGroup.Status.Servers[i] = server.Name
186+
}
187+
sort.Strings(mcpGroup.Status.Servers)
188+
}
189+
190+
// populateRemoteProxyStatus populates the MCPGroup status with MCPRemoteProxy information.
191+
func (*MCPGroupReconciler) populateRemoteProxyStatus(
192+
mcpGroup *mcpv1alpha1.MCPGroup,
193+
mcpRemoteProxies []mcpv1alpha1.MCPRemoteProxy,
194+
) {
195+
mcpGroup.Status.RemoteProxyCount = len(mcpRemoteProxies)
196+
if len(mcpRemoteProxies) == 0 {
197+
mcpGroup.Status.RemoteProxies = []string{}
198+
return
199+
}
200+
mcpGroup.Status.RemoteProxies = make([]string, len(mcpRemoteProxies))
201+
for i, proxy := range mcpRemoteProxies {
202+
mcpGroup.Status.RemoteProxies[i] = proxy.Name
203+
}
204+
sort.Strings(mcpGroup.Status.RemoteProxies)
205+
}
206+
136207
// handleDeletion handles the deletion of an MCPGroup
137208
func (r *MCPGroupReconciler) handleDeletion(ctx context.Context, mcpGroup *mcpv1alpha1.MCPGroup) (ctrl.Result, error) {
138209
ctxLogger := log.FromContext(ctx)
@@ -151,6 +222,19 @@ func (r *MCPGroupReconciler) handleDeletion(ctx context.Context, mcpGroup *mcpv1
151222
r.updateReferencingServersOnDeletion(ctx, referencingServers, mcpGroup.Name)
152223
}
153224

225+
// Find all MCPRemoteProxies that reference this group
226+
referencingProxies, err := r.findReferencingMCPRemoteProxies(ctx, mcpGroup)
227+
if err != nil {
228+
ctxLogger.Error(err, "Failed to find referencing MCPRemoteProxies during deletion")
229+
return ctrl.Result{}, err
230+
}
231+
232+
// Update conditions on all referencing MCPRemoteProxies to indicate the group is being deleted
233+
if len(referencingProxies) > 0 {
234+
ctxLogger.Info("Updating conditions on referencing MCPRemoteProxies", "count", len(referencingProxies))
235+
r.updateReferencingRemoteProxiesOnDeletion(ctx, referencingProxies, mcpGroup.Name)
236+
}
237+
154238
// Remove the finalizer to allow deletion
155239
controllerutil.RemoveFinalizer(mcpGroup, MCPGroupFinalizerName)
156240
if err := r.Update(ctx, mcpGroup); err != nil {
@@ -183,6 +267,22 @@ func (r *MCPGroupReconciler) findReferencingMCPServers(
183267
return mcpServerList.Items, nil
184268
}
185269

270+
// findReferencingMCPRemoteProxies finds all MCPRemoteProxies that reference the given MCPGroup
271+
func (r *MCPGroupReconciler) findReferencingMCPRemoteProxies(
272+
ctx context.Context, mcpGroup *mcpv1alpha1.MCPGroup) ([]mcpv1alpha1.MCPRemoteProxy, error) {
273+
274+
mcpRemoteProxyList := &mcpv1alpha1.MCPRemoteProxyList{}
275+
listOpts := []client.ListOption{
276+
client.InNamespace(mcpGroup.Namespace),
277+
client.MatchingFields{"spec.groupRef": mcpGroup.Name},
278+
}
279+
if err := r.List(ctx, mcpRemoteProxyList, listOpts...); err != nil {
280+
return nil, err
281+
}
282+
283+
return mcpRemoteProxyList.Items, nil
284+
}
285+
186286
// updateReferencingServersOnDeletion updates the conditions on MCPServers to indicate their group is being deleted
187287
func (r *MCPGroupReconciler) updateReferencingServersOnDeletion(
188288
ctx context.Context, servers []mcpv1alpha1.MCPServer, groupName string) {
@@ -210,6 +310,33 @@ func (r *MCPGroupReconciler) updateReferencingServersOnDeletion(
210310
}
211311
}
212312

313+
// updateReferencingRemoteProxiesOnDeletion updates the conditions on MCPRemoteProxies to indicate their group is being deleted
314+
func (r *MCPGroupReconciler) updateReferencingRemoteProxiesOnDeletion(
315+
ctx context.Context, proxies []mcpv1alpha1.MCPRemoteProxy, groupName string) {
316+
ctxLogger := log.FromContext(ctx)
317+
318+
for _, proxy := range proxies {
319+
// Update the condition to indicate the group is being deleted
320+
meta.SetStatusCondition(&proxy.Status.Conditions, metav1.Condition{
321+
Type: mcpv1alpha1.ConditionTypeMCPRemoteProxyGroupRefValidated,
322+
Status: metav1.ConditionFalse,
323+
Reason: mcpv1alpha1.ConditionReasonMCPRemoteProxyGroupRefNotFound,
324+
Message: "Referenced MCPGroup is being deleted",
325+
ObservedGeneration: proxy.Generation,
326+
})
327+
328+
// Update the proxy status
329+
if err := r.Status().Update(ctx, &proxy); err != nil {
330+
ctxLogger.Error(err, "Failed to update MCPRemoteProxy condition during group deletion",
331+
"mcpremoteproxy", proxy.Name, "mcpgroup", groupName)
332+
// Continue with other proxies even if one fails
333+
continue
334+
}
335+
ctxLogger.Info("Updated MCPRemoteProxy condition for group deletion",
336+
"mcpremoteproxy", proxy.Name, "mcpgroup", groupName)
337+
}
338+
}
339+
213340
func (r *MCPGroupReconciler) findMCPGroupForMCPServer(ctx context.Context, obj client.Object) []ctrl.Request {
214341
ctxLogger := log.FromContext(ctx)
215342

@@ -248,12 +375,55 @@ func (r *MCPGroupReconciler) findMCPGroupForMCPServer(ctx context.Context, obj c
248375
}
249376
}
250377

378+
func (r *MCPGroupReconciler) findMCPGroupForMCPRemoteProxy(ctx context.Context, obj client.Object) []ctrl.Request {
379+
ctxLogger := log.FromContext(ctx)
380+
381+
// Get the MCPRemoteProxy object
382+
mcpRemoteProxy, ok := obj.(*mcpv1alpha1.MCPRemoteProxy)
383+
if !ok {
384+
ctxLogger.Error(nil, "Object is not an MCPRemoteProxy", "object", obj.GetName())
385+
return []ctrl.Request{}
386+
}
387+
if mcpRemoteProxy.Spec.GroupRef == "" {
388+
// No MCPGroup reference, nothing to do
389+
return []ctrl.Request{}
390+
}
391+
392+
// Find which MCPGroup this MCPRemoteProxy belongs to
393+
ctxLogger.Info(
394+
"Finding MCPGroup for MCPRemoteProxy",
395+
"namespace",
396+
obj.GetNamespace(),
397+
"mcpremoteproxy",
398+
obj.GetName(),
399+
"groupRef",
400+
mcpRemoteProxy.Spec.GroupRef)
401+
group := &mcpv1alpha1.MCPGroup{}
402+
groupKey := types.NamespacedName{Namespace: obj.GetNamespace(), Name: mcpRemoteProxy.Spec.GroupRef}
403+
if err := r.Get(ctx, groupKey, group); err != nil {
404+
ctxLogger.Error(err, "Failed to get MCPGroup for MCPRemoteProxy",
405+
"namespace", obj.GetNamespace(), "name", mcpRemoteProxy.Spec.GroupRef)
406+
return []ctrl.Request{}
407+
}
408+
return []ctrl.Request{
409+
{
410+
NamespacedName: types.NamespacedName{
411+
Namespace: obj.GetNamespace(),
412+
Name: group.Name,
413+
},
414+
},
415+
}
416+
}
417+
251418
// SetupWithManager sets up the controller with the Manager.
252419
func (r *MCPGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
253420
return ctrl.NewControllerManagedBy(mgr).
254421
For(&mcpv1alpha1.MCPGroup{}).
255422
Watches(
256423
&mcpv1alpha1.MCPServer{}, handler.EnqueueRequestsFromMapFunc(r.findMCPGroupForMCPServer),
257424
).
425+
Watches(
426+
&mcpv1alpha1.MCPRemoteProxy{}, handler.EnqueueRequestsFromMapFunc(r.findMCPGroupForMCPRemoteProxy),
427+
).
258428
Complete(r)
259429
}

0 commit comments

Comments
 (0)