|
76 | 76 | import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
|
77 | 77 | import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.fromXContentBytesConnectorFiltering;
|
78 | 78 | import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.sortFilteringRulesByOrder;
|
| 79 | +import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.MANAGED_CONNECTOR_INDEX_PREFIX; |
79 | 80 |
|
80 | 81 | /**
|
81 | 82 | * A service that manages persistent {@link Connector} configurations.
|
@@ -807,38 +808,71 @@ public void updateConnectorLastSyncStats(UpdateConnectorLastSyncStatsAction.Requ
|
807 | 808 | }
|
808 | 809 |
|
809 | 810 | /**
|
810 |
| - * Updates the is_native property of a {@link Connector}. It always sets the {@link ConnectorStatus} to |
811 |
| - * CONFIGURED. |
| 811 | + * Updates the is_native property of a {@link Connector}. It sets the {@link ConnectorStatus} to |
| 812 | + * CONFIGURED when connector is in CONNECTED state to indicate that connector needs to reconnect. |
812 | 813 | *
|
813 | 814 | * @param request The request for updating the connector's is_native property.
|
814 | 815 | * @param listener The listener for handling responses, including successful updates or errors.
|
815 | 816 | */
|
816 | 817 | public void updateConnectorNative(UpdateConnectorNativeAction.Request request, ActionListener<UpdateResponse> listener) {
|
817 | 818 | try {
|
818 | 819 | String connectorId = request.getConnectorId();
|
| 820 | + boolean isNative = request.isNative(); |
819 | 821 |
|
820 |
| - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( |
821 |
| - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) |
822 |
| - .id(connectorId) |
823 |
| - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) |
824 |
| - .source( |
825 |
| - Map.of( |
826 |
| - Connector.IS_NATIVE_FIELD.getPreferredName(), |
827 |
| - request.isNative(), |
828 |
| - Connector.STATUS_FIELD.getPreferredName(), |
829 |
| - ConnectorStatus.CONFIGURED.toString() |
830 |
| - ) |
831 |
| - ) |
| 822 | + getConnector(connectorId, listener.delegateFailure((l, connector) -> { |
832 | 823 |
|
833 |
| - ); |
834 |
| - client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { |
835 |
| - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { |
836 |
| - l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); |
| 824 | + String indexName = getConnectorIndexNameFromSearchResult(connector); |
| 825 | + |
| 826 | + boolean doesNotHaveContentPrefix = indexName != null && isValidManagedConnectorIndexName(indexName) == false; |
| 827 | + // Ensure attached content index is prefixed correctly |
| 828 | + if (isNative && doesNotHaveContentPrefix) { |
| 829 | + l.onFailure( |
| 830 | + new ElasticsearchStatusException( |
| 831 | + "The index name [" |
| 832 | + + indexName |
| 833 | + + "] attached to the connector [" |
| 834 | + + connectorId |
| 835 | + + "] must start with the required prefix: [" |
| 836 | + + MANAGED_CONNECTOR_INDEX_PREFIX |
| 837 | + + "] to be Elastic-managed. Please update the attached index first to comply with this requirement.", |
| 838 | + RestStatus.BAD_REQUEST |
| 839 | + ) |
| 840 | + ); |
837 | 841 | return;
|
838 | 842 | }
|
839 |
| - l.onResponse(updateResponse); |
840 |
| - })); |
841 | 843 |
|
| 844 | + ConnectorStatus status = getConnectorStatusFromSearchResult(connector); |
| 845 | + |
| 846 | + // If connector was connected already, change its status to CONFIGURED as we need to re-connect |
| 847 | + boolean isConnected = status == ConnectorStatus.CONNECTED; |
| 848 | + boolean isValidTransitionToConfigured = ConnectorStateMachine.isValidTransition(status, ConnectorStatus.CONFIGURED); |
| 849 | + if (isConnected && isValidTransitionToConfigured) { |
| 850 | + status = ConnectorStatus.CONFIGURED; |
| 851 | + } |
| 852 | + |
| 853 | + final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).setRefreshPolicy( |
| 854 | + WriteRequest.RefreshPolicy.IMMEDIATE |
| 855 | + ) |
| 856 | + .doc( |
| 857 | + new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) |
| 858 | + .id(connectorId) |
| 859 | + .source( |
| 860 | + Map.of( |
| 861 | + Connector.IS_NATIVE_FIELD.getPreferredName(), |
| 862 | + isNative, |
| 863 | + Connector.STATUS_FIELD.getPreferredName(), |
| 864 | + status.toString() |
| 865 | + ) |
| 866 | + ) |
| 867 | + ); |
| 868 | + client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> { |
| 869 | + if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { |
| 870 | + ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); |
| 871 | + return; |
| 872 | + } |
| 873 | + ll.onResponse(updateResponse); |
| 874 | + })); |
| 875 | + })); |
842 | 876 | } catch (Exception e) {
|
843 | 877 | listener.onFailure(e);
|
844 | 878 | }
|
@@ -896,22 +930,45 @@ public void updateConnectorIndexName(UpdateConnectorIndexNameAction.Request requ
|
896 | 930 | return;
|
897 | 931 | }
|
898 | 932 |
|
899 |
| - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( |
900 |
| - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) |
901 |
| - .id(connectorId) |
902 |
| - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) |
903 |
| - .source(new HashMap<>() { |
904 |
| - { |
905 |
| - put(Connector.INDEX_NAME_FIELD.getPreferredName(), request.getIndexName()); |
906 |
| - } |
907 |
| - }) |
908 |
| - ); |
909 |
| - client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> { |
910 |
| - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { |
911 |
| - ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); |
| 933 | + getConnector(connectorId, l.delegateFailure((ll, connector) -> { |
| 934 | + |
| 935 | + Boolean isNativeConnector = getConnectorIsNativeFlagFromSearchResult(connector); |
| 936 | + Boolean doesNotHaveContentPrefix = indexName != null && isValidManagedConnectorIndexName(indexName) == false; |
| 937 | + |
| 938 | + if (isNativeConnector && doesNotHaveContentPrefix) { |
| 939 | + ll.onFailure( |
| 940 | + new ElasticsearchStatusException( |
| 941 | + "Index attached to an Elastic-managed connector must start with the prefix: [" |
| 942 | + + MANAGED_CONNECTOR_INDEX_PREFIX |
| 943 | + + "]. The index name in the payload [" |
| 944 | + + indexName |
| 945 | + + "] doesn't comply with this requirement.", |
| 946 | + RestStatus.BAD_REQUEST |
| 947 | + ) |
| 948 | + ); |
912 | 949 | return;
|
913 | 950 | }
|
914 |
| - ll.onResponse(updateResponse); |
| 951 | + |
| 952 | + final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( |
| 953 | + new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) |
| 954 | + .id(connectorId) |
| 955 | + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) |
| 956 | + .source(new HashMap<>() { |
| 957 | + { |
| 958 | + put(Connector.INDEX_NAME_FIELD.getPreferredName(), request.getIndexName()); |
| 959 | + } |
| 960 | + }) |
| 961 | + ); |
| 962 | + client.update( |
| 963 | + updateRequest, |
| 964 | + new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (lll, updateResponse) -> { |
| 965 | + if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { |
| 966 | + lll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); |
| 967 | + return; |
| 968 | + } |
| 969 | + lll.onResponse(updateResponse); |
| 970 | + }) |
| 971 | + ); |
915 | 972 | }));
|
916 | 973 | }));
|
917 | 974 |
|
@@ -1064,6 +1121,18 @@ private ConnectorStatus getConnectorStatusFromSearchResult(ConnectorSearchResult
|
1064 | 1121 | return ConnectorStatus.connectorStatus((String) searchResult.getResultMap().get(Connector.STATUS_FIELD.getPreferredName()));
|
1065 | 1122 | }
|
1066 | 1123 |
|
| 1124 | + private Boolean getConnectorIsNativeFlagFromSearchResult(ConnectorSearchResult searchResult) { |
| 1125 | + return (Boolean) searchResult.getResultMap().get(Connector.IS_NATIVE_FIELD.getPreferredName()); |
| 1126 | + } |
| 1127 | + |
| 1128 | + private String getConnectorIndexNameFromSearchResult(ConnectorSearchResult searchResult) { |
| 1129 | + return (String) searchResult.getResultMap().get(Connector.INDEX_NAME_FIELD.getPreferredName()); |
| 1130 | + } |
| 1131 | + |
| 1132 | + private boolean isValidManagedConnectorIndexName(String indexName) { |
| 1133 | + return indexName.startsWith(MANAGED_CONNECTOR_INDEX_PREFIX); |
| 1134 | + } |
| 1135 | + |
1067 | 1136 | @SuppressWarnings("unchecked")
|
1068 | 1137 | private Map<String, Object> getConnectorConfigurationFromSearchResult(ConnectorSearchResult searchResult) {
|
1069 | 1138 | return (Map<String, Object>) searchResult.getResultMap().get(Connector.CONFIGURATION_FIELD.getPreferredName());
|
|
0 commit comments