| consumer | library | etc |
|---|---|---|
| CustomMessageListener | spring-kafka | dlq, retry |
| CustomMessageLegacyListener | spring-cloud-stream-kafka-binder | dlq |
| CustomMessageFunctionalListener | spring-cloud-stream-kafka-streams-binder | dlq (only deserialization err) |
| CustomMessageRetryableFunctionalListener | spring-cloud-stream-kafka-streams-binder | dlq (only deserialization err), retry |
If you want to run retry-consumers that will automaically registered, change dynamic.kafka.enable value to true.
NOTE
- only support
spring.profiles.active: original - only support when
CustomMessageListenerbean is registered - only support for
spring-kafka
dynamic-kafka: enable: false default: classPath: com.boot.kafa.consumer.dlq.CustomMessageListener methodName: listen retry: history-5m-retry: # topic-name id: history-5m-retry containerFactory: retry5mKafkaListenerContainerFactory groupId: history-5m-retry-group classPath: com.boot.kafa.consumer.dlq.CustomMessageListener methodName: listener5m history-10m-retry: # topic-name id: history-10m-retry containerFactory: retry10mKafkaListenerContainerFactory groupId: history-10m-retry-group history-20-retry: # topic-name id: history-20m-retry containerFactory: retry20mKafkaListenerContainerFactory groupId: history-20m-retry-group dlq: history-deadletter-topic - Select an active profile
- consume-topic:
custom-message-topic - dead-letter-topic:
custom-message-dlq
- consume-topic:
- Run the application
- Trigger message
GET http://localhost:8080/produce - Monitor
custom-message-dlqtopic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic custom-message-dlq
application.yml
spring: profiles: active: functional kafka: bootstrap-servers: localhost:9092 # producer # active profile list # 1. original - [CustomMessageListener] - [spring-kafka] # 2. legacy - [CustomMessageLegacyListener] - [spring-cloud-stream-binder-kafka] # 3. functional - [CustomMessageFunctionalListener] - [spring-cloud-stream-binder-kafka-stream] # 4. functional-retryable - [CustomMessageRetryableFunctionalListener] - [spring-cloud-stream-binder-kafka-stream] # select listener to media.kafka.consumer below # 1. CustomMessageListener (spring-kafka, dlq, retry O) # 2. CustomMessageLegacyListener (spring-cloud-stream-kafka-binder, dlq) # 3. CustomMessageFunctionalListener (spring-cloud-stream-kafka-streams-binder, dlq (only deserialization err)) # 4. CustomMessageRetryableFunctionalListener (spring-cloud-stream-kafka-streams-binder, dlq (only deserialization err), retry O)
Do not confuse spring-cloud-stream-kafka-binder with spring-cloud-stream-kafka-streams-binder.
difference
| spring-cloud-stream-kafka-binder | spring-cloud-stream-kafka-streams-binder |
|---|---|
| spring.cloud.stream.kafka.binder.~ spring.cloud.stream.kafka.binding. <input>.consumer~ | spring.cloud.stream.kafka.streams.binder.~ spring.cloud.stream.kafka.streams.binding. <input>.consumer~ |
~FunctionalListener DLQ
ContentHistoryFunctionalListener, CustomHistoryRetryableFunctionalListener are consist of functional style of spring-cloud-stream-kafka-streams-binder. When an error occurs (e.g. RuntimeException) within the consume logic, stream thread dies and is subsequently shut down.
It only sends DLQ for deserialization error by the setting below, so it needs to be handled separately within consume logic.
spring.cloud.stream.kafka.streams.binding.<input>.consumer.deserializationExceptionHandler: sendToDlq