The Commons Kafka Vertical Event library is an implementation of the vertical event pattern. It can be used to guarantee the sending and receiving of messages across multiple applications using Kafka.
- Enable commons-kafka-vertical-events by adding the `@EnableUnifiedKafkaVerticalEvents` annotation.
- To implement a producer, extend the class `AbstractKafkaProducer`.
package com.czetsuyatech.vertical.events.client.messaging.producers;

import com.czetsuyatech.vertical.events.client.messaging.messages.IamEvent;
import com.czetsuyatech.vertical.events.config.KafkaConfig;
import com.czetsuyatech.vertical.events.messaging.messages.KeyAttributes;
import com.czetsuyatech.vertical.events.messaging.messages.VerticalEventDTO;
import com.czetsuyatech.vertical.events.messaging.messages.VerticalEventDTO.Entity;
import com.czetsuyatech.vertical.events.messaging.messages.VerticalEventDTO.Event;
import com.czetsuyatech.vertical.events.messaging.producers.AbstractKafkaProducer;
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

/**
 * Producer that wraps an {@link IamEvent} in a {@link VerticalEventDTO} and sends it to the IAM
 * vertical topic configured in {@link KafkaConfig}.
 */
@Service
@AllArgsConstructor
@Slf4j
public class IamEventProducer extends AbstractKafkaProducer<IamEvent, VerticalEventDTO> {

  private final KafkaConfig kafkaConfig;
  private final KafkaTemplate<String, VerticalEventDTO> kafkaTemplate;

  /**
   * Sends the message through the {@link KafkaTemplate}, keyed by the event id.
   *
   * @param topic destination topic
   * @param event message to send
   * @return the future returned by {@link KafkaTemplate#send(String, Object, Object)}
   */
  @Override
  protected CompletableFuture<SendResult<String, VerticalEventDTO>> publishMessage(
      String topic, VerticalEventDTO event) {
    final String recordKey = event.getEvent().getEventId();
    return kafkaTemplate.send(topic, recordKey, event);
  }

  /**
   * Logs a failed send at error level.
   * NOTE(review): presumably invoked by the base class when the send future fails; confirm.
   *
   * @param message the message that could not be sent
   * @param throwable the failure cause
   */
  @Override
  protected void publishFailure(VerticalEventDTO message, Throwable throwable) {
    log.error("Message: {} failed to be sent to kafka", message, throwable);
  }

  /**
   * Logs a successful send, including the record offset, at debug level.
   * NOTE(review): presumably invoked by the base class when the send future completes; confirm.
   *
   * @param message the message that was sent
   * @param result the send result carrying the record metadata
   */
  @Override
  protected void publishSuccess(
      VerticalEventDTO message, SendResult<String, VerticalEventDTO> result) {
    final long offset = result.getRecordMetadata().offset();
    log.debug("Message: {} was sent to kafka with offset: {}", message, offset);
  }

  /**
   * Publishes the given event to the IAM vertical topic. Registered as a transactional event
   * listener for the {@code AFTER_COMMIT} phase.
   *
   * @param event the application event to publish
   */
  @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  @Override
  public void publish(IamEvent event) {
    final String iamTopic = kafkaConfig.getTopics().getIamVertical();
    log.debug("Sending to topic={}, event={}", iamTopic, event);
    sendMessage(iamTopic, getVerticalEvent(event));
  }

  /**
   * Maps an {@link IamEvent} to the vertical event message format.
   *
   * @param event the source event
   * @return a message whose event section carries the current timestamp, service name and event
   *     type, and whose entity section carries the entity type and the EID key attribute
   */
  private VerticalEventDTO getVerticalEvent(IamEvent event) {
    final Event eventSection =
        Event.builder()
            .timestamp(OffsetDateTime.now().toString())
            .service(event.getServiceName())
            .eventType(event.getEventType().name())
            .build();

    final Entity entitySection =
        Entity.builder()
            .type(event.getEntityType())
            .keyAttributes(Map.of(KeyAttributes.EID, event.getEid()))
            .build();

    return VerticalEventDTO.builder().event(eventSection).entity(entitySection).build();
  }
}

- To implement a consumer we need to define a bean config that extends
AbstractKafkaBeansConfig. 
package com.czetsuyatech.vertical.events.client.messaging.config;

import com.czetsuyatech.vertical.events.client.messaging.messages.EventType;
import com.czetsuyatech.vertical.events.config.AbstractKafkaBeansConfig;
import com.czetsuyatech.vertical.events.messaging.messages.VerticalEventDTO;
import jakarta.validation.constraints.NotNull;
import java.util.Optional;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

/**
 * Kafka bean configuration for the consumer side. It is active when the property named by
 * {@link AbstractKafkaBeansConfig#BACKOFF_POLICY_ENABLED} is {@code "true"} or is not set.
 */
@Slf4j
@AllArgsConstructor
@Configuration
@EnableKafka
@ConditionalOnProperty(
    value = AbstractKafkaBeansConfig.BACKOFF_POLICY_ENABLED,
    havingValue = "true",
    matchIfMissing = true
)
public class KafkaBeansConfig extends AbstractKafkaBeansConfig {

  /**
   * Builds the filter that decides which records are rejected before they reach the listener.
   *
   * <p>The returned strategy yields {@code true} (reject) when the record has no payload, the
   * payload has no event section, the event type is not recognised by
   * {@link EventType#isPresent}, or the payload has no entity section. It yields {@code false}
   * (accept) otherwise.
   *
   * @return the record filter strategy
   */
  @NotNull
  public RecordFilterStrategy<Object, Object> getRejectRecordFilterStrategy() {
    return consumerRecord -> {
      log.debug("RecordFilterStrategy: {} ", consumerRecord.value());
      VerticalEventDTO verticalEvent = (VerticalEventDTO) consumerRecord.value();

      // Start the chain at the payload itself: a record with a null value (for example a
      // tombstone) is rejected instead of failing with a NullPointerException.
      return Optional.ofNullable(verticalEvent)
          .map(VerticalEventDTO::getEvent)
          .filter(ev -> EventType.isPresent(ev.getEventType()))
          .map(
              ev -> {
                log.info("Consuming bulk manual with eventId={}", ev.getEventId());
                // A known event type without an entity section is still rejected.
                return verticalEvent.getEntity() == null;
              })
          .orElse(Boolean.TRUE);
    };
  }

  /**
   * Supplies this class's logger to the base configuration.
   *
   * @return the SLF4J logger of this class
   */
  @Override
  protected Logger getLogger() {
    return log;
  }
}

- Use the bean we created in step 3 in the concrete consumer.
 
package com.czetsuyatech.vertical.events.client.messaging.consumers; import com.czetsuyatech.vertical.events.client.services.IamEventStrategy; import com.czetsuyatech.vertical.events.messaging.messages.VerticalEventDTO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.DltHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Service; @ConditionalOnProperty(value = "unified.kafka.vertical-events.enabled", havingValue = "true") @Service @Slf4j @RequiredArgsConstructor public class IamEventsConsumer { private static final String CONSUMER_TOPIC = "${unified.kafka.vertical-events.topics.iam-vertical}"; private static final String CONSUMER_GROUP = "${unified.kafka.vertical-events.consumers.iam-group}"; private final IamEventStrategy iamEventStrategy; @KafkaListener( topics = CONSUMER_TOPIC, groupId = CONSUMER_GROUP, containerFactory = "concurrentKafkaListenerContainerFactory") public void handleEvent(VerticalEventDTO verticalEventDTO) { log.debug("Receives event with type={}", verticalEventDTO.getEvent().getEventType()); iamEventStrategy.processEvent(verticalEventDTO); } @DltHandler void handle(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { log.debug("{} from {}", in, topic); } }