9
9
10
10
package org .elasticsearch .transport ;
11
11
12
+ import org .elasticsearch .Build ;
12
13
import org .elasticsearch .TransportVersion ;
13
14
import org .elasticsearch .TransportVersions ;
14
15
import org .elasticsearch .action .ActionListener ;
19
20
import org .elasticsearch .common .io .stream .StreamOutput ;
20
21
import org .elasticsearch .common .metrics .CounterMetric ;
21
22
import org .elasticsearch .core .TimeValue ;
23
+ import org .elasticsearch .core .UpdateForV9 ;
22
24
import org .elasticsearch .threadpool .ThreadPool ;
23
25
24
26
import java .io .EOFException ;
25
27
import java .io .IOException ;
28
+ import java .util .Objects ;
26
29
import java .util .Set ;
27
30
import java .util .concurrent .ConcurrentHashMap ;
28
31
import java .util .concurrent .ConcurrentMap ;
@@ -206,7 +209,7 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
206
209
assert ignoreDeserializationErrors : exception ;
207
210
throw exception ;
208
211
}
209
- channel .sendResponse (new HandshakeResponse (this .version ));
212
+ channel .sendResponse (new HandshakeResponse (this .version , Build . current (). version () ));
210
213
}
211
214
212
215
TransportResponseHandler <HandshakeResponse > removeHandlerForHandshake (long requestId ) {
@@ -245,7 +248,7 @@ public Executor executor() {
245
248
@ Override
246
249
public void handleResponse (HandshakeResponse response ) {
247
250
if (isDone .compareAndSet (false , true )) {
248
- TransportVersion responseVersion = response .responseVersion ;
251
+ TransportVersion responseVersion = response .transportVersion ;
249
252
if (TransportVersion .isCompatible (responseVersion ) == false ) {
250
253
listener .onFailure (
251
254
new IllegalStateException (
@@ -257,7 +260,7 @@ public void handleResponse(HandshakeResponse response) {
257
260
)
258
261
);
259
262
} else {
260
- listener .onResponse (TransportVersion .min (TransportHandshaker .this .version , response .getResponseVersion ()));
263
+ listener .onResponse (TransportVersion .min (TransportHandshaker .this .version , response .getTransportVersion ()));
261
264
}
262
265
}
263
266
}
@@ -278,12 +281,23 @@ void handleLocalException(TransportException e) {
278
281
279
282
static final class HandshakeRequest extends TransportRequest {
280
283
281
- private final TransportVersion version ;
284
+ /**
285
+ * The {@link TransportVersion#current()} of the requesting node.
286
+ */
287
+ final TransportVersion transportVersion ;
282
288
283
- HandshakeRequest (TransportVersion version ) {
284
- this .version = version ;
289
+ /**
290
+ * The {@link Build#version()} of the requesting node, as a {@link String}, for better reporting of handshake failures due to
291
+ * an incompatible version.
292
+ */
293
+ final String releaseVersion ;
294
+
295
+ HandshakeRequest (TransportVersion transportVersion , String releaseVersion ) {
296
+ this .transportVersion = Objects .requireNonNull (transportVersion );
297
+ this .releaseVersion = Objects .requireNonNull (releaseVersion );
285
298
}
286
299
300
+ @ UpdateForV9 (owner = UpdateForV9 .Owner .CORE_INFRA ) // remainingMessage == null is invalid in v9
287
301
HandshakeRequest (StreamInput streamInput ) throws IOException {
288
302
super (streamInput );
289
303
BytesReference remainingMessage ;
@@ -293,53 +307,101 @@ static final class HandshakeRequest extends TransportRequest {
293
307
remainingMessage = null ;
294
308
}
295
309
if (remainingMessage == null ) {
296
- version = null ;
310
+ transportVersion = null ;
311
+ releaseVersion = null ;
297
312
} else {
298
313
try (StreamInput messageStreamInput = remainingMessage .streamInput ()) {
299
- this .version = TransportVersion .readVersion (messageStreamInput );
314
+ this .transportVersion = TransportVersion .readVersion (messageStreamInput );
315
+ if (streamInput .getTransportVersion ().onOrAfter (V9_HANDSHAKE_VERSION )) {
316
+ this .releaseVersion = messageStreamInput .readString ();
317
+ } else {
318
+ this .releaseVersion = this .transportVersion .toReleaseVersion ();
319
+ }
300
320
}
301
321
}
302
322
}
303
323
304
324
@ Override
305
325
public void writeTo (StreamOutput streamOutput ) throws IOException {
306
326
super .writeTo (streamOutput );
307
- assert version != null ;
308
- try (BytesStreamOutput messageStreamOutput = new BytesStreamOutput (4 )) {
309
- TransportVersion .writeVersion (version , messageStreamOutput );
327
+ assert transportVersion != null ;
328
+ try (BytesStreamOutput messageStreamOutput = new BytesStreamOutput (1024 )) {
329
+ TransportVersion .writeVersion (transportVersion , messageStreamOutput );
330
+ if (streamOutput .getTransportVersion ().onOrAfter (V9_HANDSHAKE_VERSION )) {
331
+ messageStreamOutput .writeString (releaseVersion );
332
+ } // else we just send the transport version and rely on a best-effort mapping to release versions
310
333
BytesReference reference = messageStreamOutput .bytes ();
311
334
streamOutput .writeBytesReference (reference );
312
335
}
313
336
}
314
337
}
315
338
339
+ /**
340
+ * A response to a low-level transport handshake, carrying information about the version of the responding node.
341
+ */
316
342
static final class HandshakeResponse extends TransportResponse {
317
343
318
- private final TransportVersion responseVersion ;
344
+ /**
345
+ * The {@link TransportVersion#current()} of the responding node.
346
+ */
347
+ private final TransportVersion transportVersion ;
319
348
320
- HandshakeResponse (TransportVersion responseVersion ) {
321
- this .responseVersion = responseVersion ;
349
+ /**
350
+ * The {@link Build#version()} of the responding node, as a {@link String}, for better reporting of handshake failures due to
351
+ * an incompatible version.
352
+ */
353
+ private final String releaseVersion ;
354
+
355
+ HandshakeResponse (TransportVersion transportVersion , String releaseVersion ) {
356
+ this .transportVersion = Objects .requireNonNull (transportVersion );
357
+ this .releaseVersion = Objects .requireNonNull (releaseVersion );
322
358
}
323
359
324
- private HandshakeResponse (StreamInput in ) throws IOException {
360
+ HandshakeResponse (StreamInput in ) throws IOException {
325
361
super (in );
326
- responseVersion = TransportVersion .readVersion (in );
362
+ transportVersion = TransportVersion .readVersion (in );
363
+ if (in .getTransportVersion ().onOrAfter (V9_HANDSHAKE_VERSION )) {
364
+ releaseVersion = in .readString ();
365
+ } else {
366
+ releaseVersion = transportVersion .toReleaseVersion ();
367
+ }
327
368
}
328
369
329
370
@ Override
330
371
public void writeTo (StreamOutput out ) throws IOException {
331
- assert responseVersion != null ;
332
- TransportVersion .writeVersion (responseVersion , out );
372
+ TransportVersion .writeVersion (transportVersion , out );
373
+ if (out .getTransportVersion ().onOrAfter (V9_HANDSHAKE_VERSION )) {
374
+ out .writeString (releaseVersion );
375
+ } // else we just send the transport version and rely on a best-effort mapping to release versions
376
+ }
377
+
378
+ /**
379
+ * @return the {@link TransportVersion#current()} of the responding node.
380
+ */
381
+ TransportVersion getTransportVersion () {
382
+ return transportVersion ;
333
383
}
334
384
335
- TransportVersion getResponseVersion () {
336
- return responseVersion ;
385
+ /**
386
+ * @return the {@link Build#version()} of the responding node, as a {@link String}, for better reporting of handshake failures due
387
+ * to an incompatible version.
388
+ */
389
+ String getReleaseVersion () {
390
+ return releaseVersion ;
337
391
}
338
392
}
339
393
340
394
@ FunctionalInterface
341
395
interface HandshakeRequestSender {
342
-
343
- void sendRequest (DiscoveryNode node , TcpChannel channel , long requestId , TransportVersion version ) throws IOException ;
396
+ /**
397
+ * @param node The (expected) remote node, for error reporting and passing to
398
+ * {@link TransportMessageListener#onRequestSent}.
399
+ * @param channel The TCP channel to use to send the handshake request.
400
+ * @param requestId The transport request ID, for matching up the response.
401
+ * @param handshakeTransportVersion The {@link TransportVersion} to use for the handshake request, which will be
402
+ * {@link TransportHandshaker#V8_HANDSHAKE_VERSION} in production.
403
+ */
404
+ void sendRequest (DiscoveryNode node , TcpChannel channel , long requestId , TransportVersion handshakeTransportVersion )
405
+ throws IOException ;
344
406
}
345
407
}
0 commit comments