Skip to content

Commit 74cc252

Browse files
committed
Pass the host name on as server_name if proxy mode is on (elastic#34559)
In remote cluster setup if we see a configured proxy we should set the seed nodes host name as the `server_name` to trigger SNI based routing even for seed nodes. Since remote cluster connections are plain TCP connections we have to set the host manually since the other side can't take it from the request URL like in the HTTP case. This also adds some more informative logging to remote cluster connection.
1 parent 65eeced commit 74cc252

File tree

3 files changed

+11
-6
lines changed

3 files changed

+11
-6
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,7 @@ static DiscoveryNode buildSeedNode(String clusterName, String address, boolean p
223223
TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0);
224224
String hostName = address.substring(0, indexOfPortSeparator(address));
225225
return new DiscoveryNode("", clusterName + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
226-
transportAddress, Collections
227-
.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class),
226+
transportAddress, Collections.singletonMap("server_name", hostName), EnumSet.allOf(DiscoveryNode.Role.class),
228227
Version.CURRENT.minimumCompatibilityVersion());
229228
} else {
230229
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,8 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
490490
if (seedNodes.hasNext()) {
491491
cancellableThreads.executeIO(() -> {
492492
final DiscoveryNode seedNode = maybeAddProxyAddress(proxyAddress, seedNodes.next().get());
493+
logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode,
494+
proxyAddress);
493495
final TransportService.HandshakeResponse handshakeResponse;
494496
Transport.Connection connection = manager.openConnection(seedNode,
495497
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
@@ -523,7 +525,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
523525
ThreadContext threadContext = threadPool.getThreadContext();
524526
TransportService.ContextRestoreResponseHandler<ClusterStateResponse> responseHandler = new TransportService
525527
.ContextRestoreResponseHandler<>(threadContext.newRestorableContext(false),
526-
new SniffClusterStateResponseHandler(transportService, connection, listener, seedNodes,
528+
new SniffClusterStateResponseHandler(connection, listener, seedNodes,
527529
cancellableThreads));
528530
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
529531
// we stash any context here since this is an internal execution and should not leak any
@@ -543,13 +545,16 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
543545
listener.onFailure(new IllegalStateException("no seed node left"));
544546
}
545547
} catch (CancellableThreads.ExecutionCancelledException ex) {
548+
logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), ex);
546549
listener.onFailure(ex); // we got canceled - fail the listener and step out
547550
} catch (ConnectTransportException | IOException | IllegalStateException ex) {
548551
// ISE if we fail the handshake with an version incompatible node
549552
if (seedNodes.hasNext()) {
550-
logger.debug(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex);
553+
logger.debug(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed moving to next node",
554+
clusterAlias), ex);
551555
collectRemoteNodes(seedNodes, transportService, manager, listener);
552556
} else {
557+
logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), ex);
553558
listener.onFailure(ex);
554559
}
555560
}
@@ -581,8 +586,8 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl
581586
private final Iterator<Supplier<DiscoveryNode>> seedNodes;
582587
private final CancellableThreads cancellableThreads;
583588

584-
SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection,
585-
ActionListener<Void> listener, Iterator<Supplier<DiscoveryNode>> seedNodes,
589+
SniffClusterStateResponseHandler(Transport.Connection connection, ActionListener<Void> listener,
590+
Iterator<Supplier<DiscoveryNode>> seedNodes,
586591
CancellableThreads cancellableThreads) {
587592
this.connection = connection;
588593
this.listener = listener;

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1483,6 +1483,7 @@ public void testProxyMode() throws Exception {
14831483
service.acceptIncomingRequests();
14841484
Supplier<DiscoveryNode> seedSupplier = () ->
14851485
RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true);
1486+
assertEquals("node_0", seedSupplier.get().getAttributes().get("server_name"));
14861487
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
14871488
Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, proxyAddress)) {
14881489
updateSeedNodes(connection, Arrays.asList(seedSupplier), proxyAddress);

0 commit comments

Comments
 (0)