Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kafbat.ui.exception.NotFoundException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.util.KafkaVersion;
import io.kafbat.ui.util.MetadataVersion;
import io.kafbat.ui.util.annotation.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.time.Duration;
Expand Down Expand Up @@ -49,6 +50,8 @@
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.FinalizedVersionRange;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
Expand Down Expand Up @@ -96,6 +99,7 @@
@Slf4j
@AllArgsConstructor
public class ReactiveAdminClient implements Closeable {
private static final String DEFAULT_UNKNOWN_VERSION = "Unknown";

public enum SupportedFeature {
INCREMENTAL_ALTER_CONFIGS(2.3f),
Expand All @@ -114,8 +118,8 @@ public enum SupportedFeature {
this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
}

static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, String kafkaVersionStr) {
@Nullable Float kafkaVersion = KafkaVersion.parse(kafkaVersionStr).orElse(null);
static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, Optional<String> kafkaVersionStr) {
@Nullable Float kafkaVersion = kafkaVersionStr.flatMap(KafkaVersion::parse).orElse(null);
return Flux.fromArray(SupportedFeature.values())
.flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
.filter(Tuple2::getT2)
Expand Down Expand Up @@ -150,18 +154,28 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
.orElse(desc.getNodes().iterator().next().id());
return loadBrokersConfig(ac, List.of(targetNodeId))
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
.flatMap(configs -> {
String version = "1.0-UNKNOWN";
.zipWith(toMono(ac.describeFeatures().featureMetadata()))
.flatMap(tuple -> {
List<ConfigEntry> configs = tuple.getT1();
FeatureMetadata featureMetadata = tuple.getT2();
Optional<String> version = Optional.empty();
boolean topicDeletionEnabled = true;
for (ConfigEntry entry : configs) {
if (entry.name().contains("inter.broker.protocol.version")) {
version = entry.value();
version = Optional.of(entry.value());
}
if (entry.name().equals("delete.topic.enable")) {
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
}
}
final String finalVersion = version;
if (version.isEmpty()) {
FinalizedVersionRange metadataVersion =
featureMetadata.finalizedFeatures().get("metadata.version");
if (metadataVersion != null) {
version = MetadataVersion.findVersion(metadataVersion.maxVersionLevel());
}
}
final String finalVersion = version.orElse(DEFAULT_UNKNOWN_VERSION);
final boolean finalTopicDeletionEnabled = topicDeletionEnabled;
return SupportedFeature.forVersion(ac, version)
.map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled));
Expand Down
48 changes: 48 additions & 0 deletions api/src/main/java/io/kafbat/ui/util/MetadataVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.kafbat.ui.util;

import java.util.Arrays;
import java.util.Optional;

public enum MetadataVersion {
IBP_3_0_IV1(1, "3.0-IV1"),
IBP_3_1_IV0(2, "3.1-IV0"),
IBP_3_2_IV0(3, "3.2-IV0"),
IBP_3_3_IV0(4, "3.3-IV0"),
IBP_3_3_IV1(5, "3.3-IV1"),
IBP_3_3_IV2(6, "3.3-IV2"),
IBP_3_3_IV3(7, "3.3-IV3"),
IBP_3_4_IV0(8, "3.4-IV0"),
IBP_3_5_IV0(9, "3.5-IV0"),
IBP_3_5_IV1(10, "3.5-IV1"),
IBP_3_5_IV2(11, "3.5-IV2"),
IBP_3_6_IV0(12, "3.6-IV0"),
IBP_3_6_IV1(13, "3.6-IV1"),
IBP_3_6_IV2(14, "3.6-IV2"),
IBP_3_7_IV0(15, "3.7-IV0"),
IBP_3_7_IV1(16, "3.7-IV1"),
IBP_3_7_IV2(17, "3.7-IV2"),
IBP_3_7_IV3(18, "3.7-IV3"),
IBP_3_7_IV4(19, "3.7-IV4"),
IBP_3_8_IV0(20, "3.8-IV0"),
IBP_3_9_IV0(21, "3.9-IV0"),
IBP_4_0_IV0(22, "4.0-IV0"),
IBP_4_0_IV1(23, "4.0-IV1"),
IBP_4_0_IV2(24, "4.0-IV2"),
IBP_4_0_IV3(25, "4.0-IV3"),
IBP_4_1_IV0(26, "4.1-IV0");

private final int featureLevel;
private final String release;

MetadataVersion(int featureLevel, String release) {
this.featureLevel = featureLevel;
this.release = release;
}

public static Optional<String> findVersion(int featureLevel) {
return Arrays.stream(values())
.filter(v -> v.featureLevel == featureLevel)
.findFirst().map(v -> v.release);
}

}
Loading