温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎么在Kotlin中使用RocketMQ实现一个延时消息

发布时间:2021-03-24 17:03:24 来源:亿速云 阅读:281 作者:Leah 栏目:移动开发

这期内容当中小编将会给大家带来有关怎么在Kotlin中使用RocketMQ实现一个延时消息,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

一. 延时消息

延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

使用延时消息的典型场景,例如:

  • 在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消。

  • 在电商系统中,用户七天内没有评价商品,则默认好评。

这些场景对应的解决方案,包括:

  • 轮询遍历数据库记录

  • JDK 的 DelayQueue

  • ScheduledExecutorService

  • 基于 Quartz 的定时任务

  • 基于 Redis 的 zset 实现延时队列。

除此之外,还可以使用消息队列来实现延时消息,例如 RocketMQ。

二. RocketMQ

RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ 是2012年阿里巴巴开源的第三代分布式消息中间件。

怎么在Kotlin中使用RocketMQ实现一个延时消息

三. RocketMQ 实现延时消息

3.1 业务背景

我们的系统完成某项操作之后,会推送事件消息到业务方的接口。当我们调用业务方的通知接口返回值为成功时,表示本次推送消息成功;当返回值为失败时,则会多次推送消息,直到返回成功为止(保证至少成功一次)。
当我们推送失败后,虽然会进行多次推送消息,但并不是立即进行。会有一定的延迟,并按照一定的规则进行推送消息。
例如:1小时后尝试推送、3小时后尝试推送、1天后尝试推送、3天后尝试推送等等。因此,考虑使用延时消息实现该功能。

3.2 生产者(Producer)

生产者负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。

首先,定义一个支持延时发送的 AbstractProducer。

abstract class AbstractProducer :ProducerBean() {   var producerId: String? = null   var topic: String? = null   var tag: String?=null   var timeoutMillis: Int? = null   var delaySendTimeMills: Long? = null   val log = LogFactory.getLog(this.javaClass)   open fun sendMessage(messageBody: Any, tag: String) {     val msgBody = JSON.toJSONString(messageBody)     val message = Message(topic, tag, msgBody.toByteArray())     if (delaySendTimeMills != null) {       val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!!       message.startDeliverTime = startDeliverTime       log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}")     }     val logMessageId = buildLogMessageId(message)     try {       val sendResult = send(message)       log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)     } catch (e: Exception) {       log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)     }   }   fun buildLogMessageId(message: Message): String {     return "topic: " + message.topic + "\n" +         "producer: " + producerId + "\n" +         "tag: " + message.tag + "\n" +         "key: " + message.key + "\n"   } }

根据业务需要,增加一个支持重试机制的 Producer

@Component @ConfigurationProperties("mqs.ons.producers.xxx-producer") @Configuration @Data class CleanReportPushEventProducer :AbstractProducer() {   lateinit var delaySecondList:List<Long>   fun sendMessage(messageBody: CleanReportPushEventMessage){     //重试超过次数之后不再发事件     if (delaySecondList!=null) {       if(messageBody.times>=delaySecondList.size){         return       }       val msgBody = JSON.toJSONString(messageBody)       val message = Message(topic, tag, msgBody.toByteArray())       val delayTimeMills = delaySecondList[messageBody.times]*1000L       message.startDeliverTime = System.currentTimeMillis() + delayTimeMills       log.info( "messageBody: " + msgBody+ "startDeliverTime: "+message.startDeliverTime )       val logMessageId = buildLogMessageId(message)       try {         val sendResult = send(message)         log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)       } catch (e: Exception) {         log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)       }     }   } }

在 CleanReportPushEventProducer 中,超过了重试的次数就不会再发送消息了。

每一次延时消息的时间也会不同,因此需要根据重试的次数来获取这个delayTimeMills 。

通过 System.currentTimeMillis() + delayTimeMills 可以设置 message 的 startDeliverTime。然后调用 send(message) 即可发送延时消息。

我们使用商用版的 RocketMQ,因此支持精度为秒级别的延迟消息。在开源版本中,RocketMQ 只支持18个特定级别的延迟消息。:(

3.3 消费者(Consumer)

消费者负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。

定义 Push 类型的 AbstractConsumer:

@Data abstract class AbstractConsumer ():MessageListener{   var consumerId: String? = null   lateinit var subscribeOptions: List<SubscribeOptions>   var threadNums: Int? = null   val log = LogFactory.getLog(this.javaClass)   override fun consume(message: Message, context: ConsumeContext): Action {     val logMessageId = buildLogMessageId(message)     val body = String(message.body)     try {       log.info(logMessageId + " body: " + body)       val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag)))       log.info(logMessageId + " result: " + result.name)       return result     } catch (e: Exception) {       if (message.reconsumeTimes >= 3) {         log.error(logMessageId + " error: " + e.message, e)       }       return Action.ReconsumeLater     }   }   abstract fun getMessageBodyType(tag: String): Type?   abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action   protected fun buildLogMessageId(message: Message): String {     return "topic: " + message.topic + "\n" +         "consumer: " + consumerId + "\n" +         "tag: " + message.tag + "\n" +         "key: " + message.key + "\n" +         "MsgId:" + message.msgID + "\n" +         "BornTimestamp" + message.bornTimestamp + "\n" +         "StartDeliverTime:" + message.startDeliverTime + "\n" +         "ReconsumeTimes:" + message.reconsumeTimes + "\n"   } }

再定义具体的消费者,并且在消费失败之后能够再发送一次消息。

@Configuration @ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer") @Data class CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() {   val logger: Logger = LoggerFactory.getLogger(this.javaClass)   override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action {     if(obj is CleanReportPushEventMessage){       //清除事件       logger.info("consumer clean-report event report_id:${obj.id} ")       //消费失败之后再发送一次消息       if(!cleanReportService.sendCleanReportEvent(obj.id)){         val times = obj.times+1         eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))       }     }     return Action.CommitMessage   }   override fun getMessageBodyType(tag: String): Type? {     return CleanReportPushEventMessage::class.java   } }

其中,cleanReportService 的 sendCleanReportEvent() 会通过 http 的方式调用业务方提供的接口,进行事件消息的推送。如果推送失败了,则会进行下一次的推送。(这里使用了 eventProducer 的 sendMessage() 方法再次投递消息,是因为要根据调用的http接口返回的内容来判断消息是否发送成功。)

最后,定义 ConsumerFactory

@Component class ConsumerFactory(val consumers: List<AbstractConsumer>,val aliyunOnsOptions: AliyunOnsOptions) {   val logger: Logger = LoggerFactory.getLogger(this.javaClass)   @PostConstruct   fun start() {     CompletableFuture.runAsync{       consumers.stream().forEach {         val properties = buildProperties(it.consumerId!!, it.threadNums)         val consumer = ONSFactory.createConsumer(properties)         if (it.subscribeOptions != null && !it.subscribeOptions!!.isEmpty()) {           for (options in it.subscribeOptions!!) {             consumer.subscribe(options.topic, options.tag, it)           }           consumer.start()           val message = "\n".plus(               it.subscribeOptions!!.stream().map{ a -> String.format("topic: %s, tag: %s has been started", a.topic, a.tag)}                   .collect(Collectors.toList<Any>()))           logger.info(String.format("consumer: %s\n", message))         }       }     }   }   private fun buildProperties(consumerId: String,threadNums: Int?): Properties {     val properties = Properties()     properties.put(PropertyKeyConst.ConsumerId, consumerId)     properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey)     properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey)     if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) {       properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr)     } else {       // 测试环境接入RocketMQ       properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress)     }     properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!)     return properties   } }

上述就是小编为大家分享的怎么在Kotlin中使用RocketMQ实现一个延时消息了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI