温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

RocketMQ怎么在springBoot中使用

发布时间:2021-04-15 17:50:41 来源:亿速云 阅读:337 作者:Leah 栏目:编程语言

今天就跟大家聊聊有关RocketMQ怎么在springBoot中使用,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

pom 配置:    

<parent>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-parent</artifactId>  <version>1.5.10.RELEASE</version> </parent> <dependency>   <groupId>org.apache.rocketmq</groupId>   <artifactId>rocketmq-client</artifactId>   <version>4.2.0</version> </dependency>

application.properties  配置:

# 消费者的组名 apache.rocketmq.consumer.PushConsumer=PushConsumer # 生产者的组名 apache.rocketmq.producer.producerGroup=Producer # NameServer地址 apache.rocketmq.namesrvAddr=localhost:9876

java代码:

生产者

package test.config.rocketmq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.util.StopWatch; import javax.annotation.PostConstruct; @Component public class RocketMQClient {   /**    * 生产者的组名    */   @Value("${apache.rocketmq.producer.producerGroup}")   private String producerGroup;   /**    * NameServer 地址    */   @Value("${apache.rocketmq.namesrvAddr}")   private String namesrvAddr;   @PostConstruct   public void defaultMQProducer() {     //生产者的组名     DefaultMQProducer producer = new DefaultMQProducer(producerGroup);     //指定NameServer地址,多个地址以 ; 隔开     producer.setNamesrvAddr(namesrvAddr);     producer.setVipChannelEnabled(false);     try {       /**        * Producer对象在使用之前必须要调用start初始化,初始化一次即可        * 注意:切记不可以在每次发送消息时,都调用start方法        */       producer.start();       //创建一个消息实例,包含 topic、tag 和 消息体       //如下:topic 为 "TopicTest",tag 为 "push"       Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));       StopWatch stop = new StopWatch();       stop.start();       for (int i = 0; i < 1; i++) {         SendResult result = producer.send(message);         System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());       }       stop.stop();       System.out.println("----------------发送一万条消息耗时:" + stop.getTotalTimeMillis());     } catch (Exception e) {       e.printStackTrace();     } finally {       producer.shutdown();     }   } }

消费者: 

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class RocketMQServer {   /**    * 消费者的组名    */   @Value("${apache.rocketmq.consumer.PushConsumer}")   private String consumerGroup;   /**    * NameServer 地址    */   @Value("${apache.rocketmq.namesrvAddr}")   private String namesrvAddr;   @PostConstruct   public void defaultMQPushConsumer() {     //消费者的组名     DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);     //指定NameServer地址,多个地址以 ; 隔开     consumer.setNamesrvAddr(namesrvAddr);     consumer.setVipChannelEnabled(false);     try {       //订阅PushTopic下Tag为push的消息       consumer.subscribe("TopicTest", "push");       //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费       //如果非第一次启动,那么按照上次消费的位置继续消费       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);       consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {         try {           for (MessageExt messageExt : list) {             System.out.println("messageExt: " + messageExt);//输出消息内容             String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);             System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容           }         } catch (Exception e) {           e.printStackTrace();           return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试         }         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功       });       consumer.start();     } catch (Exception e) {       e.printStackTrace();     }   } }

掉坑总结:

1.rocketMQ启动时,命令不是  mqbroker -n 127.0.0.1:9876

         正确应该是:mqbroker -n 127.0.0.1:9876 butiautoCreateTopicEnable=true

         否则会抛出:No route info of this topic, TopicTest

2.客户端连接时抛出异常

        org.apache.rocketmq.client.exception.MQClientException: 

        Send [3] times, still failed, cost [3180]ms, Topic: TopicTest, BrokersSent: \

        [WIN-93CGO0S5G25, WIN-93CGO0S5G25, WIN-93CGO0S5G25]

解决方式两种

1.producer.setVipChannelEnabled(false); 生产者和消费者添加这行代买。

2.降rocketmq版本,降成3.2.6

关于spring.rocketmq.name-server的坑

看下图:

RocketMQ怎么在springBoot中使用

注意:

如果你是SpringBoot2.0+的框架,或者是JDK10。

你需要将你自己的项目配置文件中的,spring.rocketmq.name-server改成

spring.rocketmq.nameServer。注意是nameServer。

不然就会报各种稀奇古怪的bug。

关于启动报内存不足的错

在安装启动Name Server和Broker的时候,一定要修改配置文件,不然内存会爆炸。

Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory 

RocketMQ怎么在springBoot中使用

看完上述内容,你们对RocketMQ怎么在springBoot中使用有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI