Here are all the ways you can configure Micronaut Kafka, both regular applications and streams, to use particular serialisers and deserialisers.
A key thing to remember is that properties are consulted first; the configured serde registries are only used when properties do not specify a serialiser or deserialiser.
(As an aside, I found it helpful to know that a serde is simply an object with both a serialiser and a deserialiser.)
For regular Kafka
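To make the aside concrete, here is a minimal sketch of the serde idea in plain Java. The `Serializer`, `Deserializer` and `Serde` interfaces below are simplified, hypothetical stand-ins written for this example, not the real Kafka interfaces (which also take a topic name and have configure and close methods).

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class SerdeSketch {
    // Hypothetical, simplified stand-ins for Kafka's interfaces of the same names.
    interface Serializer<T> { byte[] serialize(T data); }
    interface Deserializer<T> { T deserialize(byte[] bytes); }

    // A serde simply bundles the two directions for one type.
    interface Serde<T> {
        Serializer<T> serializer();
        Deserializer<T> deserializer();
    }

    // Illustrative UUID serde: writes the canonical string form as UTF-8 bytes.
    static Serde<UUID> uuidSerde() {
        return new Serde<UUID>() {
            @Override
            public Serializer<UUID> serializer() {
                return id -> id.toString().getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public Deserializer<UUID> deserializer() {
                return bytes -> UUID.fromString(new String(bytes, StandardCharsets.UTF_8));
            }
        };
    }

    public static void main(String[] args) {
        Serde<UUID> serde = uuidSerde();
        UUID original = UUID.randomUUID();
        byte[] wire = serde.serializer().serialize(original);
        UUID roundTripped = serde.deserializer().deserialize(wire);
        System.out.println(original.equals(roundTripped)); // prints true
    }
}
```

A producer only needs the serialiser half and a consumer only needs the deserialiser half, which is why the properties in the following sections configure them separately, while Kafka Streams (which both reads and writes) is configured with whole serdes.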
If you want all consumers and producers to have the same config
    kafka:
      key:
        serializer: org.apache.kafka.common.serialization.UUIDSerializer
        deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
      value:
        serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer
        deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer

If you want all producers to have the same config
    kafka:
      producers:
        default:
          key.serializer: org.apache.kafka.common.serialization.UUIDSerializer
          value.serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer

If you want just consumers to have the same config
    kafka:
      consumers:
        default:
          key.deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
          value.deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer

If you want just one consumer (with group id “my-consumer-group”) to be configured
    kafka:
      consumers:
        my-consumer-group:
          key.deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
          value.deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer

The same goes for producers: you can use their id to configure them specifically.
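The sketch below shows one way to think about how a `default` block and a group-specific block combine. It assumes the specific entries are laid over the defaults and win on conflict, which is the behaviour the conclusion of this post relies on; the class and method names are made up for illustration and are not Micronaut APIs.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerConfigSketch {
    // Assumption: group-specific settings are overlaid on the defaults,
    // so a key present in both takes the group-specific value.
    static Map<String, String> effectiveConfig(Map<String, String> defaults,
                                               Map<String, String> specific) {
        Map<String, String> merged = new HashMap<>(defaults);
        merged.putAll(specific);
        return merged;
    }

    public static void main(String[] args) {
        // Equivalent of kafka.consumers.default.*
        Map<String, String> defaults = Map.of(
                "key.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer",
                "value.deserializer", "io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer");

        // Equivalent of kafka.consumers.my-consumer-group.*
        Map<String, String> myConsumerGroup = Map.of(
                "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Map<String, String> effective = effectiveConfig(defaults, myConsumerGroup);
        System.out.println(effective.get("key.deserializer"));   // the String deserializer
        System.out.println(effective.get("value.deserializer")); // still the Avro default
    }
}
```

Under that assumption, the one consumer only has to state what differs from the defaults.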
If you want to configure the serialisers and deserialisers in code
Create an implementation of io.micronaut.configuration.kafka.serde.SerdeRegistry:
    @Singleton
    public class AvroSerdeRegistry implements SerdeRegistry {

        @Inject
        private SchemaRegistryClient schemaRegistryClient;

        @SuppressWarnings("unchecked")
        @Override
        public <T> Serde<T> getSerde(Class<T> type) {
            // Only handle Avro-generated classes; return null so other registries can try.
            if (Arrays.asList(type.getInterfaces()).contains(SpecificRecord.class)) {
                SpecificAvroSerde serde = new SpecificAvroSerde(schemaRegistryClient);
                Map<String, String> config = Map.of(
                        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true",
                        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "");
                serde.configure(config, false);
                return (Serde<T>) serde;
            }
            return null;
        }

        @Override
        public int getOrder() {
            // Before JSON Serde
            return -1;
        }
    }

The order is important: this class will be picked up by the CompositeSerdeRegistry, and each SerdeRegistry will be tried in order until one returns a non-null Serde.
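The lookup described above can be sketched in plain Java as follows. This is an illustration of the idea, not Micronaut's actual CompositeSerdeRegistry: the `Registry` interface and its methods are hypothetical, serdes are represented by plain names, and it assumes lower order values are tried first (which is what the "Before JSON Serde" comment implies) and that a failed lookup raises an exception.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class OrderedRegistrySketch {
    // Hypothetical stand-in for a serde registry: returns a serde name,
    // or null when it does not support the requested type.
    interface Registry {
        String serdeFor(Class<?> type);
        int order();
    }

    // Builds a registry that supports one type (and its subtypes) at a given position.
    static Registry registry(int position, Class<?> supported, String serdeName) {
        return new Registry() {
            @Override
            public String serdeFor(Class<?> type) {
                return supported.isAssignableFrom(type) ? serdeName : null;
            }

            @Override
            public int order() {
                return position;
            }
        };
    }

    // Try each registry in ascending order; the first non-null answer wins.
    static String resolve(List<Registry> registries, Class<?> type) {
        List<Registry> sorted = new ArrayList<>(registries);
        sorted.sort(Comparator.comparingInt(Registry::order));
        for (Registry candidate : sorted) {
            String serde = candidate.serdeFor(type);
            if (serde != null) {
                return serde;
            }
        }
        // Assumption: an unresolved type is an error rather than a silent null.
        throw new IllegalArgumentException("No serde found for " + type.getName());
    }

    public static void main(String[] args) {
        List<Registry> registries = List.of(
                registry(0, Object.class, "json"),         // catch-all fallback
                registry(-1, CharSequence.class, "avro")); // CharSequence stands in for SpecificRecord

        System.out.println(resolve(registries, String.class));  // prints avro
        System.out.println(resolve(registries, Integer.class)); // prints json
    }
}
```

Returning null for unsupported types is what lets the specialised registry sit in front of a catch-all one without taking over every type.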
Option 2 for code configuration
If you want more control than ordered serde registries give you, or if you want to make sure JsonSerde is never used, replace the CompositeSerdeRegistry:
    @Singleton
    @Replaces(CompositeSerdeRegistry.class)
    public class StringSerdeRegistry implements SerdeRegistry {

        @Override
        public <T> Serde<T> getSerde(Class<T> type) {
            return (Serde<T>) Serdes.String();
        }
    }

This will only ever serialise or deserialise Strings.
For Kafka Streams
By properties, for all streams
    kafka:
      streams:
        # Micronaut default - applies this config to all streams
        default:
          # Kafka streams default
          default:
            key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
            value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

By properties, for just one stream called bank-transfer
    kafka:
      streams:
        bank-transfer:
          default:
            key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
            value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

In code as properties for just one stream
    @Factory
    public class TransferStreamFactory {

        public static final String BANK_TRANSFER = "bank-transfer";
        public static final String INPUT = "transfer-commands";
        public static final String OUTPUT = "transfer-events";

        @Singleton
        @Named(BANK_TRANSFER)
        KStream<UUID, MakeTransfer> bankTransferStream(ConfiguredStreamBuilder builder) {
            // Set the default serdes for this stream only
            Properties props = builder.getConfiguration();
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.UUID().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());

            KStream<UUID, MakeTransfer> source = builder.stream(INPUT);
            source.mapValues(value -> TransferEvent.newBuilder()
                            .setTransferId(UUID.randomUUID())
                            .setSrcAccountId(value.getSrcAccountId())
                            .setDestAccountId(value.getDestAccountId())
                            .setAmount(value.getAmount())
                            .build())
                    .to(OUTPUT);
            return source;
        }
    }

In code for just one stream, with different configuration for input and output
    @Factory
    public class TransferStreamFactory {

        public static final String BANK_TRANSFER = "bank-transfer";
        public static final String INPUT = "transfer-commands";
        public static final String OUTPUT = "transfer-events";

        @Singleton
        @Named(BANK_TRANSFER)
        KStream<UUID, MakeTransfer> bankTransferStream(ConfiguredStreamBuilder builder) {
            Properties props = builder.getConfiguration();
            Map<String, Object> serdeConfig =
                    Map.of(SCHEMA_REGISTRY_URL_CONFIG, props.get(SCHEMA_REGISTRY_URL_CONFIG));

            // Separate value serdes for the input and output topics
            SpecificAvroSerde<MakeTransfer> inputValueSerde = new SpecificAvroSerde<>();
            inputValueSerde.configure(serdeConfig, false);
            SpecificAvroSerde<TransferEvent> outputValueSerde = new SpecificAvroSerde<>();
            outputValueSerde.configure(serdeConfig, false);

            KStream<UUID, MakeTransfer> source =
                    builder.stream(INPUT, Consumed.with(Serdes.UUID(), inputValueSerde));
            source.mapValues(value -> TransferEvent.newBuilder()
                            .setTransferId(UUID.randomUUID())
                            .setSrcAccountId(value.getSrcAccountId())
                            .setDestAccountId(value.getDestAccountId())
                            .setAmount(value.getAmount())
                            .build())
                    .to(OUTPUT, Produced.with(Serdes.UUID(), outputValueSerde));
            return source;
        }
    }

In code
You can also leave the serdes unconfigured in both the properties and the stream code, and let Micronaut pick the serde from the SerdeRegistry. The advice above about adding new serde registries (or replacing existing ones) therefore applies to Kafka Streams too!
Conclusion
There’s lots of flexibility here. The best approach is to configure everything the same way using the default properties, and then use the more specific approaches as the need arises. For example, if all messages have UUID keys apart from one, configure all producers and consumers to use UUID serdes, and then configure a String serde for the one consumer that needs it. This keeps configuration and repetition low. I would also favour properties over class and code configuration, as it means fewer classes to understand when others open the project and less clutter around the business logic.