温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Springboot如何整合RocketMQ收发消息

发布时间:2021-12-29 12:45:16 来源:亿速云 阅读:255 作者:小新 栏目:开发技术

这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

Springboot 整合 RocketMQ 收发消息

创建springboot项目

pom.xml添加rocketmq-spring-boot-starter依赖。

<dependency>     <groupId>org.apache.rocketmq</groupId>     <artifactId>rocketmq-spring-boot-starter</artifactId>     <version>2.1.0</version> </dependency>

yml 配置

application.yml

rocketmq:   name-server: 192.168.64.141:9876

application-demo1.yml

使用 demo1 profile 指定生产者组组名

rocketmq:   producer:     group: producer-demo1

application-demo2.yml

使用 demo2 profile 指定生产者组组名

rocketmq:   producer:     group: producer-demo2

测试

demo 1

  • 发送普通消息

  • 发送 Spring 的通用 Message 对象

  • 发送异步消息

  • 发送顺序消息

生产者

package cn.tedu.demo2.m1; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component public class Producer {     @Autowired     private RocketMQTemplate t ;     public  void send(){         //发送同步消息         t.convertAndSend("Topic1:TagA", "Hello world! ");         //发送spring的Message         Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();         t.send("Topic1:TagA",message);         //发送异步消息         t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {             @Override             public void onSuccess(SendResult sendResult) {                 System.out.println("发送成功");             }             @Override             public void onException(Throwable throwable) {                 System.out.println("发送失败");             }         });         //发送顺序消息         t.syncSendOrderly("Topic1", "98456237,创建", "98456237");         t.syncSendOrderly("Topic1", "98456237,支付", "98456237");         t.syncSendOrderly("Topic1", "98456237,完成", "98456237");     } }

消费者

package cn.tedu.demo2.m1; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1") public class Consumer  implements RocketMQListener<String> {     @Override     public void onMessage(String s) {         System.out.println("收到"+s);     } }

主类

package cn.tedu.demo2.m1; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Main {     public static void main(String[] args) {         SpringApplication.run(Main.class, args);     } }

测试类

需要放在 test 文件夹

激活 demo1 profile  @ActiveProfiles("demo1")

package cn.tedu.demo2.m1; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; @SpringBootTest @ActiveProfiles("demo1") public class Test1 {     @Autowired     private  Producer producer;     @Test     public void test1(){         producer.send();         try {             Thread.sleep(5000);         } catch (InterruptedException e) {             e.printStackTrace();         }     } }

demo 2

发送事务消息

生产者

package cn.tedu.demo2.m2; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component public class Producer {     @Autowired     private RocketMQTemplate t;     public void send(){         Message<String> message = MessageBuilder.withPayload("Hello world").build();         //一旦发送消息,则执行监听器         t.sendMessageInTransaction("Topic2",message,null);     }     @RocketMQTransactionListener     class Lis implements RocketMQLocalTransactionListener {         @Override         public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {             System.out.println("执行本地事务");             return RocketMQLocalTransactionState.UNKNOWN;         }         @Override         public RocketMQLocalTransactionState checkLocalTransaction(Message message) {             System.out.println("执行事务回查");             return RocketMQLocalTransactionState.COMMIT;         }     } }

消费者

package cn.tedu.demo2.m2; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2") public class Consumer implements RocketMQListener<String> {     @Override     public void onMessage(String s) {         System.out.println("收到"+s);     } }

主类

package cn.tedu.demo2.m2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Main {     public static void main(String[] args) {         SpringApplication.run(Main.class, args);     } }

测试类

package cn.tedu.demo2.m2; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; @SpringBootTest @ActiveProfiles("demo2") public class Test2 {     @Autowired     private  Producer producer;     @Test     public void  test1(){         producer.send();         //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间         try {             Thread.sleep(30000);         } catch (InterruptedException e) {             e.printStackTrace();         }     } }

关于“Springboot如何整合RocketMQ收发消息”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI