Skip to content

Commit ee716f1

Browse files
authored
transport: edit TransportConnectionListener for close exceptions (elastic#129015)
The TransportConnectionListener interface has previously included the Transport.Connection being closed and unregistered in its onNodeDisconnected callback. This is not in use, and can be removed as it is also available in the onConnectionClosed callback. It is being replaced with a Nullable exception that caused the close. This is being used in pending work (ES-11448) to differentiate network issues from node restarts. Closes ES-12007
1 parent aceaf23 commit ee716f1

File tree

11 files changed

+42
-23
lines changed

11 files changed

+42
-23
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@
2626
import org.elasticsearch.common.util.concurrent.EsExecutors;
2727
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2828
import org.elasticsearch.core.CheckedRunnable;
29+
import org.elasticsearch.core.Nullable;
2930
import org.elasticsearch.core.TimeValue;
3031
import org.elasticsearch.monitor.NodeHealthService;
3132
import org.elasticsearch.monitor.StatusInfo;
3233
import org.elasticsearch.threadpool.ThreadPool.Names;
3334
import org.elasticsearch.transport.AbstractTransportRequest;
3435
import org.elasticsearch.transport.ConnectTransportException;
3536
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
36-
import org.elasticsearch.transport.Transport;
3737
import org.elasticsearch.transport.TransportConnectionListener;
3838
import org.elasticsearch.transport.TransportException;
3939
import org.elasticsearch.transport.TransportRequestOptions;
@@ -137,7 +137,7 @@ public FollowersChecker(
137137
);
138138
transportService.addConnectionListener(new TransportConnectionListener() {
139139
@Override
140-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
140+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
141141
handleDisconnectedNode(node);
142142
}
143143
});

server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.transport.ConnectTransportException;
3333
import org.elasticsearch.transport.NodeDisconnectedException;
3434
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
35-
import org.elasticsearch.transport.Transport;
3635
import org.elasticsearch.transport.TransportConnectionListener;
3736
import org.elasticsearch.transport.TransportException;
3837
import org.elasticsearch.transport.TransportRequestOptions;
@@ -124,7 +123,7 @@ public class LeaderChecker {
124123

125124
transportService.addConnectionListener(new TransportConnectionListener() {
126125
@Override
127-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
126+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
128127
handleDisconnectedNode(node);
129128
}
130129
});

server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,11 +229,26 @@ private void connectToNodeOrRetry(
229229
try {
230230
connectionListener.onNodeConnected(node, conn);
231231
} finally {
232-
conn.addCloseListener(ActionListener.running(() -> {
233-
connectedNodes.remove(node, conn);
234-
connectionListener.onNodeDisconnected(node, conn);
235-
managerRefs.decRef();
236-
}));
232+
conn.addCloseListener(new ActionListener<Void>() {
233+
@Override
234+
public void onResponse(Void ignored) {
235+
handleClose(null);
236+
}
237+
238+
@Override
239+
public void onFailure(Exception e) {
240+
handleClose(e);
241+
}
242+
243+
void handleClose(@Nullable Exception e) {
244+
connectedNodes.remove(node, conn);
245+
try {
246+
connectionListener.onNodeDisconnected(node, e);
247+
} finally {
248+
managerRefs.decRef();
249+
}
250+
}
251+
});
237252

238253
conn.addCloseListener(ActionListener.running(() -> {
239254
if (connectingRefCounter.hasReferences() == false) {

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ final class DelegatingNodeConnectionListener implements TransportConnectionListe
6060
private final CopyOnWriteArrayList<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
6161

6262
@Override
63-
public void onNodeDisconnected(DiscoveryNode key, Transport.Connection connection) {
63+
public void onNodeDisconnected(DiscoveryNode key, @Nullable Exception closeException) {
6464
for (TransportConnectionListener listener : listeners) {
65-
listener.onNodeDisconnected(key, connection);
65+
listener.onNodeDisconnected(key, closeException);
6666
}
6767
}
6868

server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
6161
}
6262

6363
@Override
64-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
64+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
6565
removeConnectedNode(node);
6666
}
6767
});

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.settings.Settings;
2222
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2323
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
24+
import org.elasticsearch.core.Nullable;
2425
import org.elasticsearch.core.TimeValue;
2526
import org.elasticsearch.threadpool.ThreadPool;
2627

@@ -339,7 +340,7 @@ boolean shouldRebuildConnection(Settings newSettings) {
339340
protected abstract ConnectionStrategy strategyType();
340341

341342
@Override
342-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
343+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
343344
if (shouldOpenMoreConnections()) {
344345
// try to reconnect and fill up the slot of the disconnected node
345346
connect(

server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.transport;
1111

1212
import org.elasticsearch.cluster.node.DiscoveryNode;
13+
import org.elasticsearch.core.Nullable;
1314

1415
/**
1516
* A listener interface that allows to react on transport events. All methods may be
@@ -38,5 +39,5 @@ default void onNodeConnected(DiscoveryNode node, Transport.Connection connection
3839
/**
3940
* Called once a node connection is closed and unregistered.
4041
*/
41-
default void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {}
42+
default void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {}
4243
}

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.common.settings.Settings;
5353
import org.elasticsearch.common.transport.TransportAddress;
5454
import org.elasticsearch.common.util.set.Sets;
55+
import org.elasticsearch.core.Nullable;
5556
import org.elasticsearch.core.TimeValue;
5657
import org.elasticsearch.core.Tuple;
5758
import org.elasticsearch.index.Index;
@@ -831,7 +832,7 @@ public void testCCSRemoteReduceWithDisconnectedRemoteClusters() throws Exception
831832
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
832833
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
833834
@Override
834-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
835+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
835836
if (disconnectedNodes.remove(node)) {
836837
disconnectedLatch.countDown();
837838
}
@@ -1134,7 +1135,7 @@ public void testCollectSearchShards() throws Exception {
11341135
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
11351136
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
11361137
@Override
1137-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
1138+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
11381139
if (disconnectedNodes.remove(node)) {
11391140
disconnectedLatch.countDown();
11401141
}

server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
3131
import org.elasticsearch.core.AbstractRefCounted;
3232
import org.elasticsearch.core.CheckedRunnable;
33+
import org.elasticsearch.core.Nullable;
3334
import org.elasticsearch.core.RefCounted;
3435
import org.elasticsearch.core.TimeValue;
3536
import org.elasticsearch.test.ESTestCase;
@@ -251,7 +252,7 @@ public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
251252
final AtomicReference<ActionListener<DiscoveryNode>> disconnectListenerRef = new AtomicReference<>();
252253
transportService.addConnectionListener(new TransportConnectionListener() {
253254
@Override
254-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
255+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
255256
final ActionListener<DiscoveryNode> disconnectListener = disconnectListenerRef.getAndSet(null);
256257
if (disconnectListener != null) {
257258
disconnectListener.onResponse(node);

server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.util.concurrent.RunOnce;
2424
import org.elasticsearch.common.util.concurrent.ThreadContext;
2525
import org.elasticsearch.core.AbstractRefCounted;
26+
import org.elasticsearch.core.Nullable;
2627
import org.elasticsearch.core.Releasable;
2728
import org.elasticsearch.core.Releasables;
2829
import org.elasticsearch.core.TimeValue;
@@ -101,7 +102,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
101102
}
102103

103104
@Override
104-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
105+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
105106
nodeDisconnectedCount.incrementAndGet();
106107
}
107108
});
@@ -658,7 +659,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
658659
}
659660

660661
@Override
661-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
662+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
662663
nodeDisconnectedCount.incrementAndGet();
663664
}
664665
});
@@ -698,7 +699,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
698699
}
699700

700701
@Override
701-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
702+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
702703
nodeDisconnectedCount.incrementAndGet();
703704
}
704705
});

0 commit comments

Comments
 (0)