5555import org .elasticsearch .logging .LogManager ;
5656import org .elasticsearch .logging .Logger ;
5757import org .elasticsearch .search .SearchService ;
58+ import org .elasticsearch .search .crossproject .CrossProjectIndexResolutionValidator ;
59+ import org .elasticsearch .search .crossproject .CrossProjectModeDecider ;
5860import org .elasticsearch .tasks .CancellableTask ;
5961import org .elasticsearch .tasks .Task ;
6062import org .elasticsearch .threadpool .ThreadPool ;
8890
8991import static org .elasticsearch .action .fieldcaps .FieldCapabilitiesRequest .RESOLVED_FIELDS_CAPS ;
9092import static org .elasticsearch .action .search .TransportSearchHelper .checkCCSVersionCompatibility ;
93+ import static org .elasticsearch .search .crossproject .CrossProjectIndexResolutionValidator .indicesOptionsForCrossProjectFanout ;
9194
9295public class TransportFieldCapabilitiesAction extends HandledTransportAction <FieldCapabilitiesRequest , FieldCapabilitiesResponse > {
9396 public static final String EXCLUSION = "-" ;
@@ -110,6 +113,7 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
110113 private final boolean ccsCheckCompatibility ;
111114 private final ThreadPool threadPool ;
112115 private final TimeValue forceConnectTimeoutSecs ;
116+ private final CrossProjectModeDecider crossProjectModeDecider ;
113117
114118 @ Inject
115119 public TransportFieldCapabilitiesAction (
@@ -138,6 +142,7 @@ public TransportFieldCapabilitiesAction(
138142 this .ccsCheckCompatibility = SearchService .CCS_VERSION_CHECK_SETTING .get (clusterService .getSettings ());
139143 this .threadPool = threadPool ;
140144 this .forceConnectTimeoutSecs = clusterService .getSettings ().getAsTime ("search.ccs.force_connect_timeout" , null );
145+ this .crossProjectModeDecider = new CrossProjectModeDecider (clusterService .getSettings ());
141146 }
142147
143148 @ Override
@@ -196,8 +201,14 @@ private <R extends ActionResponse> void doExecuteForked(
196201 long nowInMillis = request .nowInMillis () == null ? System .currentTimeMillis () : request .nowInMillis ();
197202 final ProjectState projectState = projectResolver .getProjectState (clusterService .state ());
198203 final var minTransportVersion = new AtomicReference <>(clusterService .state ().getMinTransportVersion ());
204+ final IndicesOptions originalIndicesOptions = request .indicesOptions ();
205+ final boolean resolveCrossProject = crossProjectModeDecider .resolvesCrossProject (request );
199206 final Map <String , OriginalIndices > remoteClusterIndices = transportService .getRemoteClusterService ()
200- .groupIndices (request .indicesOptions (), request .indices (), request .returnLocalAll ());
207+ .groupIndices (
208+ resolveCrossProject ? indicesOptionsForCrossProjectFanout (originalIndicesOptions ) : originalIndicesOptions ,
209+ request .indices (),
210+ request .returnLocalAll ()
211+ );
201212 final OriginalIndices localIndices = remoteClusterIndices .remove (RemoteClusterAware .LOCAL_CLUSTER_GROUP_KEY );
202213
203214 final String [] concreteLocalIndices ;
@@ -270,9 +281,9 @@ private <R extends ActionResponse> void doExecuteForked(
270281 indexResponses .clear ();
271282 indexMappingHashToResponses .clear ();
272283 };
273- Map <String , ResolvedIndexExpressions .Builder > resolvedRemotely = new ConcurrentHashMap <>();
284+ Map <String , ResolvedIndexExpressions .Builder > resolvedRemotelyBuilder = new ConcurrentHashMap <>();
274285 for (String clusterAlias : remoteClusterIndices .keySet ()) {
275- resolvedRemotely .put (clusterAlias , ResolvedIndexExpressions .builder ());
286+ resolvedRemotelyBuilder .put (clusterAlias , ResolvedIndexExpressions .builder ());
276287 }
277288 final Consumer <FieldCapabilitiesIndexResponse > handleIndexResponse = resp -> {
278289 if (fieldCapTask .isCancelled ()) {
@@ -335,12 +346,28 @@ private <R extends ActionResponse> void doExecuteForked(
335346 if (fieldCapTask .notifyIfCancelled (listener )) {
336347 releaseResourcesOnCancel .run ();
337348 } else {
349+ Map <String , ResolvedIndexExpressions > resolvedRemotely = resolvedRemotelyBuilder .entrySet ()
350+ .stream ()
351+ .collect (Collectors .toMap (Map .Entry ::getKey , e -> e .getValue ().build ()));
352+ ResolvedIndexExpressions resolvedLocally = new ResolvedIndexExpressions (resolvedLocallyList );
353+ if (resolveCrossProject ) {
354+ final Exception ex = CrossProjectIndexResolutionValidator .validate (
355+ request .indicesOptions (),
356+ request .getProjectRouting (),
357+ resolvedLocally ,
358+ resolvedRemotely
359+ );
360+ if (ex != null ) {
361+ listener .onFailure (ex );
362+ return ;
363+ }
364+ }
338365 mergeIndexResponses (
339366 request ,
340367 fieldCapTask ,
341368 indexResponses ,
342369 indexFailures ,
343- resolvedLocallyList ,
370+ resolvedLocally ,
344371 resolvedRemotely ,
345372 minTransportVersion ,
346373 listener .map (linkedRequestExecutor ::wrapPrimary )
@@ -370,15 +397,21 @@ private <R extends ActionResponse> void doExecuteForked(
370397 for (Map .Entry <String , OriginalIndices > remoteIndices : remoteClusterIndices .entrySet ()) {
371398 String clusterAlias = remoteIndices .getKey ();
372399 OriginalIndices originalIndices = remoteIndices .getValue ();
373- FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest (clusterAlias , request , originalIndices , nowInMillis );
400+ FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest (
401+ clusterAlias ,
402+ request ,
403+ originalIndices ,
404+ nowInMillis ,
405+ resolveCrossProject
406+ );
374407 ActionListener <FieldCapabilitiesResponse > remoteListener = ActionListener .wrap (response -> {
375408
376409 if (request .includeResolvedTo () && response .getResolvedLocally () != null ) {
377410 ResolvedIndexExpressions resolvedOnRemoteProject = response .getResolvedLocally ();
378411 // for bwc we need to check that resolvedOnRemoteProject Exists in the response
379412 if (resolvedOnRemoteProject != null ) {
380413 for (ResolvedIndexExpression remoteResolvedExpression : resolvedOnRemoteProject .expressions ()) {
381- resolvedRemotely .computeIfPresent (clusterAlias , (k , v ) -> {
414+ resolvedRemotelyBuilder .computeIfPresent (clusterAlias , (k , v ) -> {
382415 v .addExpression (remoteResolvedExpression );
383416 return v ;
384417 });
@@ -413,7 +446,7 @@ private <R extends ActionResponse> void doExecuteForked(
413446 ),
414447 Set .of ()
415448 );
416- resolvedRemotely .computeIfPresent (clusterAlias , (k , v ) -> {
449+ resolvedRemotelyBuilder .computeIfPresent (clusterAlias , (k , v ) -> {
417450 v .addExpression (err );
418451 return v ;
419452 });
@@ -439,7 +472,7 @@ private <R extends ActionResponse> void doExecuteForked(
439472 ),
440473 Set .of ()
441474 );
442- resolvedRemotely .computeIfPresent (clusterAlias , (k , v ) -> {
475+ resolvedRemotelyBuilder .computeIfPresent (clusterAlias , (k , v ) -> {
443476 v .addExpression (err );
444477 return v ;
445478 });
@@ -552,12 +585,11 @@ private static void mergeIndexResponses(
552585 CancellableTask task ,
553586 Map <String , FieldCapabilitiesIndexResponse > indexResponses ,
554587 FailureCollector indexFailures ,
555- List < ResolvedIndexExpression > resolvedLocallyList ,
556- Map <String , ResolvedIndexExpressions . Builder > resolvedRemotely ,
588+ ResolvedIndexExpressions resolvedLocally ,
589+ Map <String , ResolvedIndexExpressions > resolvedRemotely ,
557590 AtomicReference <TransportVersion > minTransportVersion ,
558591 ActionListener <FieldCapabilitiesResponse > listener
559592 ) {
560- ResolvedIndexExpressions resolvedLocally = new ResolvedIndexExpressions (resolvedLocallyList );
561593 List <FieldCapabilitiesFailure > failures = indexFailures .build (indexResponses .keySet ());
562594 if (indexResponses .isEmpty () == false ) {
563595 if (request .isMergeResults ()) {
@@ -570,7 +602,7 @@ private static void mergeIndexResponses(
570602 FieldCapabilitiesResponse .builder ()
571603 .withIndexResponses (new ArrayList <>(indexResponses .values ()))
572604 .withResolvedLocally (resolvedLocally )
573- .withResolvedRemotelyBuilder (resolvedRemotely )
605+ .withResolvedRemotely (resolvedRemotely )
574606 .withMinTransportVersion (minTransportVersion .get ())
575607 .withFailures (failures )
576608 .build ()
@@ -590,7 +622,7 @@ private static void mergeIndexResponses(
590622 FieldCapabilitiesResponse .builder ()
591623 .withFailures (failures )
592624 .withResolvedLocally (resolvedLocally )
593- .withResolvedRemotelyBuilder (resolvedRemotely )
625+ .withResolvedRemotely (resolvedRemotely )
594626 .withMinTransportVersion (minTransportVersion .get ())
595627 .build ()
596628 );
@@ -607,12 +639,18 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(
607639 String clusterAlias ,
608640 FieldCapabilitiesRequest request ,
609641 OriginalIndices originalIndices ,
610- long nowInMillis
642+ long nowInMillis ,
643+ boolean resolveCrossProject
611644 ) {
645+ IndicesOptions indicesOptions = originalIndices .indicesOptions ();
646+ if (indicesOptions .resolveCrossProjectIndexExpression ()) {
647+ // if is a CPS request reset CrossProjectModeOptions to Default and use lenient IndicesOptions.
648+ indicesOptions = indicesOptionsForCrossProjectFanout (indicesOptions );
649+ }
612650 FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest ();
613651 remoteRequest .clusterAlias (clusterAlias );
614652 remoteRequest .setMergeResults (false ); // we need to merge on this node
615- remoteRequest .indicesOptions (originalIndices . indicesOptions () );
653+ remoteRequest .indicesOptions (indicesOptions );
616654 remoteRequest .indices (originalIndices .indices ());
617655 remoteRequest .fields (request .fields ());
618656 remoteRequest .filters (request .filters ());
@@ -621,7 +659,7 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(
621659 remoteRequest .indexFilter (request .indexFilter ());
622660 remoteRequest .nowInMillis (nowInMillis );
623661 remoteRequest .includeEmptyFields (request .includeEmptyFields ());
624- remoteRequest .includeResolvedTo (request .includeResolvedTo ());
662+ remoteRequest .includeResolvedTo (request .includeResolvedTo () || resolveCrossProject );
625663 return remoteRequest ;
626664 }
627665
@@ -634,7 +672,7 @@ private static boolean hasSameMappingHash(FieldCapabilitiesIndexResponse r1, Fie
634672 private static FieldCapabilitiesResponse merge (
635673 Map <String , FieldCapabilitiesIndexResponse > indexResponsesMap ,
636674 ResolvedIndexExpressions resolvedLocally ,
637- Map <String , ResolvedIndexExpressions . Builder > resolvedRemotely ,
675+ Map <String , ResolvedIndexExpressions > resolvedRemotely ,
638676 CancellableTask task ,
639677 FieldCapabilitiesRequest request ,
640678 List <FieldCapabilitiesFailure > failures ,
@@ -703,7 +741,7 @@ private static FieldCapabilitiesResponse merge(
703741 .withMinTransportVersion (minTransportVersion .get ());
704742 if (request .includeResolvedTo () && minTransportVersion .get ().supports (RESOLVED_FIELDS_CAPS )) {
705743 // add resolution to response iff includeResolvedTo and all the nodes in the cluster supports it
706- responseBuilder .withResolvedLocally (new ResolvedIndexExpressions (collect )).withResolvedRemotelyBuilder (resolvedRemotely );
744+ responseBuilder .withResolvedLocally (new ResolvedIndexExpressions (collect )).withResolvedRemotely (resolvedRemotely );
707745 }
708746 return responseBuilder .build ();
709747 }
0 commit comments