@@ -123,9 +123,11 @@ public Mono<ResponseEntity<Void>> deleteTopic(
.operationName("deleteTopic")
.build();

return accessControlService.validateAccess(context).then(
topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
return accessControlService.validateAccess(context)
.then(
topicsService.deleteTopic(getCluster(clusterName), topicName)
.thenReturn(ResponseEntity.ok().<Void>build())
).doOnEach(sig -> auditService.audit(context, sig));
}
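
Why the rewrite from .map(ResponseEntity::ok) to .thenReturn(...): topicsService.deleteTopic(...) presumably completes empty (a Mono<Void>), and map is never invoked on an empty Mono, so the old chain could complete without ever emitting a ResponseEntity, while thenReturn emits its value on completion. A minimal sketch of the difference, using plain reactor-core outside the project:

import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;

public class EmptyMonoDemo {
  public static void main(String[] args) {
    Mono<Void> deletion = Mono.empty(); // stand-in for a completed delete operation

    // map() never runs on an empty Mono, so no ResponseEntity is emitted:
    System.out.println(deletion.map(ResponseEntity::ok).block()); // null

    // thenReturn() emits its value once the source completes:
    System.out.println(deletion.thenReturn(ResponseEntity.ok().<Void>build()).block()); // a 200 ResponseEntity
  }
}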


@@ -3,18 +3,21 @@
import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaReferenceDTO;
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaTypeDTO;
import com.provectus.kafka.ui.service.SchemaRegistryService;
import com.provectus.kafka.ui.sr.model.Compatibility;
import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
import com.provectus.kafka.ui.sr.model.NewSubject;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import com.provectus.kafka.ui.sr.model.SchemaType;
import java.util.List;
import java.util.Optional;
import org.mapstruct.Mapper;


@Mapper(componentModel = "spring")
@Mapper
public interface KafkaSrMapper {

default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLevel s) {
@@ -24,9 +27,12 @@ default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLevel
.subject(s.getSubject())
.schema(s.getSchema())
.schemaType(SchemaTypeDTO.fromValue(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO).getValue()))
.references(toDto(s.getReferences()))
.compatibilityLevel(s.getCompatibility().toString());
}

List<SchemaReferenceDTO> toDto(List<SchemaReference> references);

CompatibilityCheckResponseDTO toDto(CompatibilityCheckResponse ccr);

CompatibilityLevelDTO.CompatibilityEnum toDto(Compatibility compatibility);
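
The new List<SchemaReferenceDTO> toDto(List<SchemaReference> references) declaration is all MapStruct needs: because source and target properties line up (name, subject, version), the element-wise mapping is generated at compile time. Roughly, the generated implementation behaves like this sketch (assumed shape, not the actual generated KafkaSrMapperImpl):

import com.provectus.kafka.ui.model.SchemaReferenceDTO;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import java.util.ArrayList;
import java.util.List;

class KafkaSrMapperSketch {
  List<SchemaReferenceDTO> toDto(List<SchemaReference> references) {
    if (references == null) {
      return null;
    }
    List<SchemaReferenceDTO> result = new ArrayList<>(references.size());
    for (SchemaReference reference : references) {
      result.add(new SchemaReferenceDTO()
          .name(reference.getName())
          .subject(reference.getSubject())
          .version(reference.getVersion()));
    }
    return result;
  }
}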
@@ -20,6 +20,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
@@ -217,7 +218,9 @@ private String convertSchema(SchemaMetadata schema, ParsedSchema parsedSchema) {
case AVRO -> new AvroJsonSchemaConverter()
.convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
.toJson();
case JSON -> schema.getSchema();
case JSON ->
// need to use Confluent's JsonSchema here, since it includes the resolved references
((JsonSchema) parsedSchema).rawSchema().toString();
};
}

@@ -14,8 +14,7 @@
import com.provectus.kafka.ui.sr.model.NewSubject;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import com.provectus.kafka.ui.util.ReactiveFailover;
import com.provectus.kafka.ui.util.WebClientConfigurator;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
@@ -92,7 +91,7 @@ public Mono<SubjectWithCompatibilityLevel> getLatestSchemaVersionBySubject(KafkaCluster
private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
String version) {
return api(cluster)
.mono(c -> c.getSubjectVersion(schemaName, version))
.mono(c -> c.getSubjectVersion(schemaName, version, false))
.zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
.map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
.onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));
@@ -0,0 +1,55 @@
package com.provectus.kafka.ui.service.integration.odd;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import reactor.core.publisher.Mono;

// logic copied from AbstractSchemaProvider:resolveReferences
// https://github.com/confluentinc/schema-registry/blob/fd59613e2c5adf62e36705307f420712e4c8c1ea/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java#L54
class SchemaReferencesResolver {

private final KafkaSrClientApi client;

SchemaReferencesResolver(KafkaSrClientApi client) {
this.client = client;
}

Mono<ImmutableMap<String, String>> resolve(List<SchemaReference> refs) {
return resolveReferences(refs, new Resolving(ImmutableMap.of(), ImmutableSet.of()))
.map(Resolving::resolved);
}

// Immutable accumulation state: 'resolved' maps reference name -> schema text,
// 'visited' records names already expanded (guards against shared or cyclic refs).
private record Resolving(ImmutableMap<String, String> resolved, ImmutableSet<String> visited) {

Resolving visit(String name) {
return new Resolving(resolved, ImmutableSet.<String>builder().addAll(visited).add(name).build());
}

Resolving resolve(String ref, String schema) {
return new Resolving(ImmutableMap.<String, String>builder().putAll(resolved).put(ref, schema).build(), visited);
}
}

// Chains the fetches sequentially: each reference resolution is flatMapped onto
// the accumulated state produced by the previous one.
private Mono<Resolving> resolveReferences(@Nullable List<SchemaReference> refs, Resolving initState) {
Mono<Resolving> result = Mono.just(initState);
for (SchemaReference reference : Optional.ofNullable(refs).orElse(List.of())) {
result = result.flatMap(state -> {
if (state.visited().contains(reference.getName())) {
return Mono.just(state);
} else {
final var newState = state.visit(reference.getName());
return client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true)
.flatMap(subj ->
resolveReferences(subj.getReferences(), newState)
.map(withNewRefs -> withNewRefs.resolve(reference.getName(), subj.getSchema())));
}
});
}
return result;
}
}
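
The resolver mirrors Confluent's AbstractSchemaProvider.resolveReferences: it walks the reference graph depth-first, keeps a visited set so shared or cyclic references are expanded only once, and accumulates an immutable name-to-schema map. The true passed to getSubjectVersion is presumably the "deleted" flag, so references to soft-deleted subject versions still resolve. A hypothetical usage sketch (helper name made up, placed in the same package since the resolver is package-private; TopicsExporter below wires it up the same way):

import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import java.util.List;
import reactor.core.publisher.Mono;

class ResolverUsageSketch {

  // Hypothetical helper: fetch the latest version of a subject and parse it
  // with all of its references resolved.
  Mono<AvroSchema> latestAvroSchema(KafkaSrClientApi client, String subject) {
    var resolver = new SchemaReferencesResolver(client);
    return client.getSubjectVersion(subject, "latest", false)
        .flatMap(subj -> resolver.resolve(subj.getReferences())
            .map(refs -> new AvroSchema(subj.getSchema(), List.of(), refs, null)));
  }
}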
@@ -5,6 +5,7 @@
import com.provectus.kafka.ui.model.Statistics;
import com.provectus.kafka.ui.service.StatisticsCache;
import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import java.net.URI;
import java.util.List;
import java.util.Map;
@@ -24,6 +25,8 @@
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Slf4j
@RequiredArgsConstructor
@@ -100,12 +103,20 @@ private Mono<List<DataSetField>> getTopicSchema(KafkaCluster cluster,
return Mono.just(List.of());
}
String subject = topic + (isKey ? "-key" : "-value");
return cluster.getSchemaRegistryClient()
.mono(client -> client.getSubjectVersion(subject, "latest"))
.map(subj -> DataSetFieldsExtractors.extract(subj, topicOddrn, isKey))
return getSubjWithResolvedRefs(cluster, subject)
.map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey))
.onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
.onErrorMap(WebClientResponseException.class, err ->
new IllegalStateException("Error retrieving subject %s".formatted(subject), err));
}

private Mono<Tuple2<SchemaSubject, Map<String, String>>> getSubjWithResolvedRefs(KafkaCluster cluster,
String subjectName) {
return cluster.getSchemaRegistryClient()
.mono(client ->
client.getSubjectVersion(subjectName, "latest", false)
.flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences())
.map(resolvedRefs -> Tuples.of(subj, resolvedRefs))));
}

}
@@ -1,7 +1,7 @@
package com.provectus.kafka.ui.service.integration.odd.schema;

import com.google.common.collect.ImmutableSet;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
@@ -14,8 +14,8 @@ final class AvroExtractor {
private AvroExtractor() {
}

static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
var schema = new Schema.Parser().parse(subject.getSchema());
static List<DataSetField> extract(AvroSchema avroSchema, KafkaPath topicOddrn, boolean isKey) {
var schema = avroSchema.rawSchema();
List<DataSetField> result = new ArrayList<>();
result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
extract(
@@ -2,20 +2,30 @@

import com.provectus.kafka.ui.sr.model.SchemaSubject;
import com.provectus.kafka.ui.sr.model.SchemaType;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.opendatadiscovery.client.model.DataSetField;
import org.opendatadiscovery.client.model.DataSetFieldType;
import org.opendatadiscovery.oddrn.model.KafkaPath;

public final class DataSetFieldsExtractors {

public static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
public static List<DataSetField> extract(SchemaSubject subject,
Map<String, String> resolvedRefs,
KafkaPath topicOddrn,
boolean isKey) {
SchemaType schemaType = Optional.ofNullable(subject.getSchemaType()).orElse(SchemaType.AVRO);
return switch (schemaType) {
case AVRO -> AvroExtractor.extract(subject, topicOddrn, isKey);
case JSON -> JsonSchemaExtractor.extract(subject, topicOddrn, isKey);
case PROTOBUF -> ProtoExtractor.extract(subject, topicOddrn, isKey);
case AVRO -> AvroExtractor.extract(
new AvroSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
case JSON -> JsonSchemaExtractor.extract(
new JsonSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
case PROTOBUF -> ProtoExtractor.extract(
new ProtobufSchema(subject.getSchema(), List.of(), resolvedRefs, null, null), topicOddrn, isKey);
};
}
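
The extractors no longer parse raw schema strings themselves: DataSetFieldsExtractors now builds the concrete ParsedSchema up front, handing the resolved references to the Confluent parsers so cross-subject types are known. A standalone sketch of what that constructor call does for Avro (subject name and schemas here are made up):

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import java.util.List;
import java.util.Map;

public class ResolvedReferencesDemo {
  public static void main(String[] args) {
    // Schema registered under a hypothetical "address-value" subject:
    String address = """
        {"type":"record","name":"Address","fields":[{"name":"city","type":"string"}]}""";

    // Schema referring to the Address type by name instead of inlining it:
    String user = """
        {"type":"record","name":"User","fields":[
          {"name":"name","type":"string"},
          {"name":"address","type":"Address"}]}""";

    // Without resolvedReferences the parser would fail on the unknown "Address"
    // type; with them, the referenced schema is parsed first and lookup succeeds.
    AvroSchema schema = new AvroSchema(
        user,
        List.of(new SchemaReference("Address", "address-value", 1)),
        Map.of("Address", address),
        null);

    System.out.println(schema.rawSchema().getField("address").schema().getName()); // Address
  }
}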

@@ -30,8 +30,8 @@ final class JsonSchemaExtractor {
private JsonSchemaExtractor() {
}

static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
Schema schema = new JsonSchema(subject.getSchema()).rawSchema();
static List<DataSetField> extract(JsonSchema jsonSchema, KafkaPath topicOddrn, boolean isKey) {
Schema schema = jsonSchema.rawSchema();
List<DataSetField> result = new ArrayList<>();
result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
extract(
@@ -15,7 +15,6 @@
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.Value;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.util.ArrayList;
import java.util.List;
@@ -42,8 +41,8 @@ final class ProtoExtractor {
private ProtoExtractor() {
}

static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
Descriptor schema = new ProtobufSchema(subject.getSchema()).toDescriptor();
static List<DataSetField> extract(ProtobufSchema protobufSchema, KafkaPath topicOddrn, boolean isKey) {
Descriptor schema = protobufSchema.toDescriptor();
List<DataSetField> result = new ArrayList<>();
result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
var rootOddrn = topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value");
@@ -2,6 +2,7 @@

import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaReferenceDTO;
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
import com.provectus.kafka.ui.model.SchemaTypeDTO;
@@ -190,6 +191,58 @@ void shouldCreateNewProtobufSchema() {
Assertions.assertEquals(schema, actual.getSchema());
}


@Test
void shouldCreateNewProtobufSchemaWithRefs() {
NewSchemaSubjectDTO requestBody = new NewSchemaSubjectDTO()
.schemaType(SchemaTypeDTO.PROTOBUF)
.subject(subject + "-ref")
.schema("""
syntax = "proto3";
message MyRecord {
int32 id = 1;
string name = 2;
}
""");

webTestClient
.post()
.uri("/api/clusters/{clusterName}/schemas", LOCAL)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
.exchange()
.expectStatus()
.isOk();

requestBody = new NewSchemaSubjectDTO()
.schemaType(SchemaTypeDTO.PROTOBUF)
.subject(subject)
.schema("""
syntax = "proto3";
import "MyRecord.proto";
message MyRecordWithRef {
int32 id = 1;
MyRecord my_ref = 2;
}
""")
.references(List.of(new SchemaReferenceDTO().name("MyRecord.proto").subject(subject + "-ref").version(1)));

SchemaSubjectDTO actual = webTestClient
.post()
.uri("/api/clusters/{clusterName}/schemas", LOCAL)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
.exchange()
.expectStatus()
.isOk()
.expectBody(SchemaSubjectDTO.class)
.returnResult()
.getResponseBody();

Assertions.assertNotNull(actual);
Assertions.assertEquals(requestBody.getReferences(), actual.getReferences());
}
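
The same pair of schemas the test registers can be parsed locally with Confluent's ProtobufSchema; a sketch showing how the resolved reference satisfies the import (subject names are placeholders, and the map key must match the reference name, i.e. the import path):

import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.util.List;
import java.util.Map;

public class ProtoReferenceDemo {
  public static void main(String[] args) {
    String myRecord = """
        syntax = "proto3";
        message MyRecord {
          int32 id = 1;
          string name = 2;
        }
        """;

    String withRef = """
        syntax = "proto3";
        import "MyRecord.proto";
        message MyRecordWithRef {
          int32 id = 1;
          MyRecord my_ref = 2;
        }
        """;

    // The resolved reference supplies the file content for the import path:
    ProtobufSchema schema = new ProtobufSchema(
        withRef,
        List.of(new SchemaReference("MyRecord.proto", "subject-ref", 1)),
        Map.of("MyRecord.proto", myRecord),
        null,
        null);

    System.out.println(schema.toDescriptor().getFullName()); // MyRecordWithRef
  }
}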

@Test
public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
webTestClient
@@ -278,7 +331,7 @@ public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
void shouldCreateNewSchemaWhenSubjectIncludesNonAsciiCharacters() {
String schema =
"{\"subject\":\"test/test\",\"schemaType\":\"JSON\",\"schema\":"
+ "\"{\\\"type\\\": \\\"string\\\"}\"}";
+ "\"{\\\"type\\\": \\\"string\\\"}\"}";

webTestClient
.post()