用Redis Sorted-Set实现的优先级消息队列,可靠消费
<dependency> <groupId>pers.lurker</groupId> <artifactId>redisZSetQ-spring-boot-starter</artifactId> <version>${version}</version> </dependency>根据配置信息自动选择集群类型和连接池类型:
- 集群优先级:sentinel > cluster > standalone
- 连接池优先级:lettuce > jedis > null
spring.redis.host= spring.redis.port= spring.redis.password= spring.redis.database= spring.redis.timeout= # 消费执行超时时间(秒),默认3600 rediszsetq.consumer.timeout=3600 # 消费执行超时检查频率(秒/次),默认5 rediszsetq.consumer.timeout-check-interval=5 # 消费并发线程数,默认1 rediszsetq.consumer.concurrency=1 # 消费每次拉取消息数量,默认2 rediszsetq.consumer.fetch-count=2 # 最大重试次数,-1不限制,0不重试,>0 n次 max-retry-count=5import org.springframework.beans.factory.annotation.Autowired; @Autowired private MessageProducer messageProducer; public void produce() { // 队列名, 消息内容, 优先级 messageProducer.send("queueName", "hello", 0); }继承 MessageListenerAdapter 并在 onMessage 方法上添加 @RedisZSetListener 注解
- value: 队列名,必须
- concurrency: 并发线程数,非必须
- fetchCount: 拉取消息数,非必须,只有接收List生效
- restTimeIfConsumeNull: 消费消息为空时,休眠时间(秒),默认1
import pers.lurker.rediszsetq.consumer.MessageListener; import pers.lurker.rediszsetq.consumer.RedisZSetListener; import pers.lurker.rediszsetq.consumer.Consumer; import pers.lurker.rediszsetq.model.Message; import org.springframework.stereotype.Component; @Component public class StringMessageListener implements MessageListener<String> { @Override @RedisZSetListener("queueName") public void onMessage(Message<String> message, Consumer consumer) { System.out.println(message); consumer.ack(message); } @Override @RedisZSetListener(value = "queueName", concurrency = 2, fetchCount = 2) public void onMessage(List<Message<String>> messages, Consumer consumer) { messages.forEach(message -> System.out.println(message)); consumer.ack(messages); } }