Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 89 additions & 70 deletions server/src/main/java/org/elasticsearch/TransportVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,47 +79,121 @@ public TransportVersion(int id) {
this(null, id, null);
}

interface BufferedReaderParser<T> {
T parse(String component, String path, BufferedReader bufferedReader);
}

static <T> T parseFromBufferedReader(
String component,
String path,
Function<String, InputStream> nameToStream,
BufferedReaderParser<T> parser
) {
try (InputStream inputStream = nameToStream.apply(path)) {
if (inputStream == null) {
return null;
}
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
return parser.parse(component, path, bufferedReader);
}
} catch (IOException ioe) {
throw new UncheckedIOException("parsing error [" + component + ":" + path + "]", ioe);
}
}

/**
* Constructs a named transport version along with its set of compatible patch versions from x-content.
* This method takes in the parameter {@code latest} which is the highest valid transport version id
* supported by this node. Versions newer than the current transport version id for this node are discarded.
*/
public static TransportVersion fromInputStream(String path, boolean nameInFile, InputStream stream, Integer latest) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
String line = reader.readLine();
public static TransportVersion fromBufferedReader(
String component,
String path,
boolean nameInFile,
BufferedReader bufferedReader,
Integer latest
) {
try {
String line = bufferedReader.readLine();
String[] parts = line.replaceAll("\\s+", "").split(",");
String check;
while ((check = reader.readLine()) != null) {
while ((check = bufferedReader.readLine()) != null) {
if (check.replaceAll("\\s+", "").isEmpty() == false) {
throw new IllegalArgumentException("invalid transport version file format [" + path + "]");
throw new IllegalArgumentException("invalid transport version file format [" + toComponentPath(component, path) + "]");
}
}
if (parts.length < (nameInFile ? 2 : 1)) {
throw new IllegalStateException("invalid transport version file format [" + path + "]");
throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]");
}
String name = nameInFile ? parts[0] : path.substring(path.lastIndexOf('/') + 1, path.length() - 4);
List<Integer> ids = new ArrayList<>();
for (int i = nameInFile ? 1 : 0; i < parts.length; ++i) {
try {
ids.add(Integer.parseInt(parts[i]));
} catch (NumberFormatException nfe) {
throw new IllegalStateException("invalid transport version file format [" + path + "]", nfe);
throw new IllegalStateException(
"invalid transport version file format [" + toComponentPath(component, path) + "]",
nfe
);
}
}
ids.sort(Integer::compareTo);
TransportVersion transportVersion = null;
for (int idIndex = 0; idIndex < ids.size(); ++idIndex) {
for (int idIndex = ids.size() - 1; idIndex >= 0; --idIndex) {
if (idIndex > 0 && ids.get(idIndex - 1) <= ids.get(idIndex)) {
throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]");
}
if (ids.get(idIndex) > latest) {
break;
}
transportVersion = new TransportVersion(name, ids.get(idIndex), transportVersion);
}
return transportVersion;
} catch (IOException ioe) {
throw new UncheckedIOException("cannot parse transport version [" + path + "]", ioe);
throw new UncheckedIOException("invalid transport version file format [" + toComponentPath(component, path) + "]", ioe);
}
}

public static Map<String, TransportVersion> collectFromInputStreams(
String component,
Function<String, InputStream> nameToStream,
String latestFileName
) {
TransportVersion latest = parseFromBufferedReader(
component,
"/transport/latest/" + latestFileName,
nameToStream,
(c, p, br) -> fromBufferedReader(c, p, true, br, Integer.MAX_VALUE)
);
if (latest != null) {
List<String> versionFilesNames = parseFromBufferedReader(
component,
"/transport/defined/manifest.txt",
nameToStream,
(c, p, br) -> br.lines().filter(line -> line.isBlank() == false).toList()
);
if (versionFilesNames != null) {
Map<String, TransportVersion> transportVersions = new HashMap<>();
for (String versionFileName : versionFilesNames) {
TransportVersion transportVersion = parseFromBufferedReader(
component,
"/transport/defined/" + versionFileName,
nameToStream,
(c, p, br) -> fromBufferedReader(c, p, false, br, latest.id())
);
if (transportVersion != null) {
transportVersions.put(versionFileName.substring(0, versionFileName.length() - 4), transportVersion);
}
}
return transportVersions;
}
}
return Map.of();
}

private static String toComponentPath(String component, String path) {
return component + ":" + path;
}

public static TransportVersion readVersion(StreamInput in) throws IOException {
return fromId(in.readVInt());
}
Expand Down Expand Up @@ -337,7 +411,11 @@ private static class VersionsHolder {
static {
// collect all the transport versions from server and es modules/plugins (defined in server)
List<TransportVersion> allVersions = new ArrayList<>(TransportVersions.DEFINED_VERSIONS);
Map<String, TransportVersion> allVersionsByName = loadTransportVersionsByName();
Map<String, TransportVersion> allVersionsByName = collectFromInputStreams(
"<server>",
TransportVersion.class::getResourceAsStream,
Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv"
);
addTransportVersions(allVersionsByName.values(), allVersions).sort(TransportVersion::compareTo);

// set the transport version lookups
Expand All @@ -351,65 +429,6 @@ private static class VersionsHolder {
CURRENT = ALL_VERSIONS.get(ALL_VERSIONS.size() - 1);
}

private static Map<String, TransportVersion> loadTransportVersionsByName() {
Map<String, TransportVersion> transportVersions = new HashMap<>();

String latestLocation = "/transport/latest/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv";
int latestId = -1;
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(latestLocation)) {
// this check is required until bootstrapping for the new transport versions format is completed;
// when load is false, we will only use the transport versions in the legacy format;
// load becomes false if we don't find the latest or manifest files required for the new format
if (inputStream != null) {
TransportVersion latest = fromInputStream(latestLocation, true, inputStream, Integer.MAX_VALUE);
if (latest == null) {
throw new IllegalStateException(
"invalid latest transport version for minor version ["
+ Version.CURRENT.major
+ "."
+ Version.CURRENT.minor
+ "]"
);
}
latestId = latest.id();
}
} catch (IOException ioe) {
throw new UncheckedIOException("latest transport version file not found at [" + latestLocation + "]", ioe);
}

String manifestLocation = "/transport/defined/manifest.txt";
List<String> versionFileNames = null;
if (latestId > -1) {
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(manifestLocation)) {
if (inputStream != null) {
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
versionFileNames = reader.lines().filter(line -> line.isBlank() == false).toList();
}
} catch (IOException ioe) {
throw new UncheckedIOException("transport version manifest file not found at [" + manifestLocation + "]", ioe);
}
}

if (versionFileNames != null) {
for (String name : versionFileNames) {
String versionLocation = "/transport/defined/" + name;
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(versionLocation)) {
if (inputStream == null) {
throw new IllegalStateException("transport version file not found at [" + versionLocation + "]");
}
TransportVersion transportVersion = TransportVersion.fromInputStream(versionLocation, false, inputStream, latestId);
if (transportVersion != null) {
transportVersions.put(transportVersion.name(), transportVersion);
}
} catch (IOException ioe) {
throw new UncheckedIOException("transport version file not found at [ " + versionLocation + "]", ioe);
}
}
}

return transportVersions;
}

private static List<TransportVersion> addTransportVersions(Collection<TransportVersion> addFrom, List<TransportVersion> addTo) {
for (TransportVersion transportVersion : addFrom) {
addTo.add(transportVersion);
Expand Down
83 changes: 52 additions & 31 deletions server/src/test/java/org/elasticsearch/TransportVersionTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TransportVersionUtils;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Modifier;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -268,50 +272,53 @@ public void testDuplicateConstants() {
}
}

public void testFromName() {
assertThat(TransportVersion.fromName("test_0"), is(new TransportVersion("test_0", 3001000, null)));
assertThat(TransportVersion.fromName("test_1"), is(new TransportVersion("test_1", 3002000, null)));
assertThat(
TransportVersion.fromName("test_2"),
is(
new TransportVersion(
"test_2",
3003000,
new TransportVersion("test_2", 2001001, new TransportVersion("test_2", 1001001, null))
)
)
);
assertThat(
TransportVersion.fromName("test_3"),
is(new TransportVersion("test_3", 3003001, new TransportVersion("test_3", 2001002, null)))
);
assertThat(
TransportVersion.fromName("test_4"),
is(
new TransportVersion(
"test_4",
3003002,
new TransportVersion("test_4", 2001003, new TransportVersion("test_4", 1001002, null))
)
)
public void testLatest() {
TransportVersion latest = TransportVersion.parseFromBufferedReader(
"<test>",
"/transport/defined/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv",
TransportVersion.class::getResourceAsStream,
(c, p, br) -> TransportVersion.fromBufferedReader(c, p, true, br, Integer.MAX_VALUE)
);
// TODO: once placeholder is removed, test the latest known version can be found fromName
// assertThat(latest, is(TransportVersion.fromName(latest.name())));
}

public void testSupports() {
TransportVersion test0 = TransportVersion.fromName("test_0");
byte[] data0 = "100001000,3001000".getBytes(StandardCharsets.UTF_8);
TransportVersion test0 = TransportVersion.fromBufferedReader(
"<test>",
"testSupports0",
false,
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data0), StandardCharsets.UTF_8)),
5000000
);
assertThat(new TransportVersion(null, 2003000, null).supports(test0), is(false));
assertThat(new TransportVersion(null, 3001000, null).supports(test0), is(true));
assertThat(new TransportVersion(null, 100001001, null).supports(test0), is(true));

TransportVersion test1 = TransportVersion.fromName("test_1");
byte[] data1 = "3002000".getBytes(StandardCharsets.UTF_8);
TransportVersion test1 = TransportVersion.fromBufferedReader(
"<test>",
"testSupports1",
false,
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data1), StandardCharsets.UTF_8)),
5000000
);
assertThat(new TransportVersion(null, 2003000, null).supports(test1), is(false));
assertThat(new TransportVersion(null, 3001000, null).supports(test1), is(false));
assertThat(new TransportVersion(null, 3001001, null).supports(test1), is(false));
assertThat(new TransportVersion(null, 3002000, null).supports(test1), is(true));
assertThat(new TransportVersion(null, 100001000, null).supports(test1), is(true));
assertThat(new TransportVersion(null, 100001001, null).supports(test1), is(true));

TransportVersion test2 = TransportVersion.fromName("test_2");
byte[] data2 = "3003000,2001001,1001001".getBytes(StandardCharsets.UTF_8);
TransportVersion test2 = TransportVersion.fromBufferedReader(
"<test>",
"testSupports2",
false,
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data2), StandardCharsets.UTF_8)),
5000000
);
assertThat(new TransportVersion(null, 1001000, null).supports(test2), is(false));
assertThat(new TransportVersion(null, 1001001, null).supports(test2), is(true));
assertThat(new TransportVersion(null, 1001002, null).supports(test2), is(true));
Expand All @@ -331,7 +338,14 @@ public void testSupports() {
assertThat(new TransportVersion(null, 100001000, null).supports(test2), is(true));
assertThat(new TransportVersion(null, 100001001, null).supports(test2), is(true));

TransportVersion test3 = TransportVersion.fromName("test_3");
byte[] data3 = "100002000,3003001,2001002".getBytes(StandardCharsets.UTF_8);
TransportVersion test3 = TransportVersion.fromBufferedReader(
"<test>",
"testSupports3",
false,
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data3), StandardCharsets.UTF_8)),
5000000
);
assertThat(new TransportVersion(null, 1001001, null).supports(test3), is(false));
assertThat(new TransportVersion(null, 1001002, null).supports(test3), is(false));
assertThat(new TransportVersion(null, 1001003, null).supports(test3), is(false));
Expand All @@ -352,7 +366,14 @@ public void testSupports() {
assertThat(new TransportVersion(null, 100001000, null).supports(test3), is(true));
assertThat(new TransportVersion(null, 100001001, null).supports(test3), is(true));

TransportVersion test4 = TransportVersion.fromName("test_4");
byte[] data4 = "100002000,3003002,2001003,1001002".getBytes(StandardCharsets.UTF_8);
TransportVersion test4 = TransportVersion.fromBufferedReader(
"<test>",
"testSupports3",
false,
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data4), StandardCharsets.UTF_8)),
5000000
);
assertThat(new TransportVersion(null, 1001001, null).supports(test4), is(false));
assertThat(new TransportVersion(null, 1001002, null).supports(test4), is(true));
assertThat(new TransportVersion(null, 1001003, null).supports(test4), is(true));
Expand Down
5 changes: 0 additions & 5 deletions server/src/test/resources/transport/defined/manifest.txt

This file was deleted.

1 change: 0 additions & 1 deletion server/src/test/resources/transport/defined/test_0.csv

This file was deleted.

2 changes: 0 additions & 2 deletions server/src/test/resources/transport/defined/test_1.csv

This file was deleted.

1 change: 0 additions & 1 deletion server/src/test/resources/transport/defined/test_2.csv

This file was deleted.

1 change: 0 additions & 1 deletion server/src/test/resources/transport/defined/test_3.csv

This file was deleted.

1 change: 0 additions & 1 deletion server/src/test/resources/transport/defined/test_4.csv

This file was deleted.