
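How to Integrate Pulsar with Spring Boot

Published: 2022-07-02 09:51:49 Source: 亿速云 Views: 632 Author: iii Category: Development

This article walks through integrating Apache Pulsar into a Spring Boot application: the Maven dependencies, a properties class, the client configuration, message listeners, a service class that creates producers and consumers, and a fix for consumers that stop working after a Spring Cloud configuration refresh.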

1. Add the pom.xml dependencies

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.0</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>2.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

2. Pulsar properties class

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:32
 * @Description: Pulsar properties class
 */
@Component
@ConfigurationProperties(prefix = "tdmq.pulsar")
@Data
public class PulsarProperties {

    /**
     * Service URL (access address)
     */
    private String serviceurl;

    /**
     * Namespace (tdc)
     */
    private String tdcNamespace;

    /**
     * Token for the tdc role
     */
    private String tdcToken;

    /**
     * Cluster name
     */
    private String cluster;

    /**
     * Topic map
     */
    private Map<String, String> topicMap;

    /**
     * Subscription map
     */
    private Map<String, String> subMap;

    /**
     * Switch. on: consumers are enabled; off: consumers are disabled (circuit open)
     */
    private String onOff;
}
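For orientation, a configuration fragment that this class could bind might look like the sketch below. The property names follow from the `tdmq.pulsar` prefix and the field names above, and the map keys are the ones the service class in section 5 looks up. Every value is a placeholder assumed for illustration, not taken from the original article.

```yaml
# application.yml (all values are placeholders)
tdmq:
  pulsar:
    serviceurl: pulsar://localhost:6650
    tdc-namespace: my-namespace
    tdc-token: my-token
    cluster: my-cluster
    topic-map:
      comment-publish-topic: comment-publish
      reply-publish-topic: reply-publish
    sub-map:
      comment-publish-topic-test: comment-publish-sub
      reply-publish-topic-test: reply-publish-sub
    # Quoted because a bare on/off is likely to be parsed as a YAML boolean
    on-off: "on"
```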

3. Pulsar configuration class

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:33
 * @Description: Pulsar configuration class
 */
@Configuration
@EnableConfigurationProperties(PulsarProperties.class)
public class PulsarConfig {

    @Autowired
    PulsarProperties pulsarProperties;

    // The bean name defaults to the method name, "getPulsarClient"
    @Bean
    public PulsarClient getPulsarClient() {
        try {
            return PulsarClient.builder()
                    .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))
                    .serviceUrl(pulsarProperties.getServiceurl())
                    .build();
        } catch (PulsarClientException e) {
            // Keep the original exception as the cause so the failure can be diagnosed
            throw new RuntimeException("Failed to initialize the Pulsar client", e);
        }
    }
}

4. Listeners for different message payload types

import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:37
 * @Description: Listener for messages carrying a User payload
 */
@Component
public class UserMessageListener implements MessageListener<User> {

    @Override
    public void received(Consumer<User> consumer, Message<User> msg) {
        try {
            User user = msg.getValue();
            System.out.println(user);
            // Acknowledge only after the message has been processed
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Negative-acknowledge so that the message is redelivered later
            consumer.negativeAcknowledge(msg);
        }
    }
}

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:37
 * @Description: Listener for messages carrying a String payload
 */
@Component
public class StringMessageListener implements MessageListener<String> {

    @Override
    public void received(Consumer<String> consumer, Message<String> msg) {
        try {
            System.out.println(msg.getValue());
            // Acknowledge only after the message has been processed
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Negative-acknowledge so that the message is redelivered later
            consumer.negativeAcknowledge(msg);
        }
    }
}

5. Core Pulsar service class

import com.yibo.pulsar.common.listener.StringMessageListener;
import com.yibo.pulsar.common.listener.UserMessageListener;
import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:35
 * @Description: Core Pulsar service class
 */
@Component
public class PulsarCommon {

    @Autowired
    private PulsarProperties pulsarProperties;

    @Autowired
    private PulsarClient client;

    @Autowired
    private UserMessageListener userMessageListener;

    @Autowired
    private StringMessageListener stringMessageListener;

    /**
     * Creates a producer.
     * @param topic     topic name
     * @param schema    schema used to serialize messages
     * @param <T>       message type
     * @return          the Producer
     */
    public <T> Producer<T> createProducer(String topic, Schema<T> schema) {
        try {
            return client.newProducer(schema)
                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                    .sendTimeout(10, TimeUnit.SECONDS)
                    .blockIfQueueFull(true)
                    .create();
        } catch (PulsarClientException e) {
            throw new RuntimeException("Failed to initialize the Pulsar producer", e);
        }
    }

    /**
     * Creates a consumer.
     * @param topic             topic name
     * @param subscription      subscription name
     * @param messageListener   custom MessageListener implementation
     * @param schema            schema used to deserialize messages
     * @param <T>               message type
     * @return                  the Consumer
     */
    public <T> Consumer<T> createConsumer(String topic, String subscription,
                                          MessageListener<T> messageListener, Schema<T> schema) {
        try {
            return client.newConsumer(schema)
                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
                    .subscriptionName(subscription)
                    .ackTimeout(10, TimeUnit.SECONDS)
                    .subscriptionType(SubscriptionType.Shared)
                    .messageListener(messageListener)
                    .subscribe();
        } catch (PulsarClientException e) {
            throw new RuntimeException("Failed to initialize the Pulsar consumer", e);
        }
    }

    /**
     * Sends a message asynchronously.
     * @param message       message body
     * @param producer      producer instance
     * @param <T>           message type
     */
    public <T> void sendAsyncMessage(T message, Producer<T> producer) {
        producer.sendAsync(message)
                .thenAccept(msgId -> {
                    // Nothing to do on success
                })
                .exceptionally(ex -> {
                    // Report the failure instead of dropping it silently
                    System.err.println("Failed to send message: " + ex);
                    return null;
                });
    }

    /**
     * Sends a message synchronously.
     * @param message       message body
     * @param producer      producer instance
     * @param <T>           message type
     * @throws PulsarClientException if the send fails
     */
    public <T> void sendSyncMessage(T message, Producer<T> producer) throws PulsarClientException {
        MessageId send = producer.send(message);
        System.out.println(send);
    }

    //-----------consumer-----------
    @Bean(name = "comment-publish-topic-consumer")
    public Consumer<String> getCommentPublishTopicConsumer() {
        return this.createConsumer(pulsarProperties.getTopicMap().get("comment-publish-topic"),
                pulsarProperties.getSubMap().get("comment-publish-topic-test"),
                stringMessageListener, Schema.STRING);
    }

    @Bean(name = "reply-publish-topic-consumer")
    public Consumer<User> getReplyPublishTopicConsumer() {
        return this.createConsumer(pulsarProperties.getTopicMap().get("reply-publish-topic"),
                pulsarProperties.getSubMap().get("reply-publish-topic-test"),
                userMessageListener, AvroSchema.of(User.class));
    }

    //-----------producer-----------
    @Bean(name = "comment-publish-topic-producer")
    public Producer<String> getCommentPublishTopicProducer() {
        return this.createProducer(pulsarProperties.getTopicMap().get("comment-publish-topic"), Schema.STRING);
    }

    @Bean(name = "reply-publish-topic-producer")
    public Producer<User> getReplyPublishTopicProducer() {
        return this.createProducer(pulsarProperties.getTopicMap().get("reply-publish-topic"), AvroSchema.of(User.class));
    }
}
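Because `Producer.sendAsync` returns a `CompletableFuture<MessageId>`, a failed send only becomes visible if a failure handler is attached to that future. The sketch below uses only the JDK to illustrate one way to handle both outcomes. `fakeSend` is a hypothetical stand-in for `producer.sendAsync(message)` and `sendWithHandling` is an illustrative helper; neither is part of the Pulsar API or of the original article.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AsyncSendSketch {

    // Hypothetical stand-in for producer.sendAsync(message): completes with a
    // fake message id, or completes exceptionally when the payload is empty.
    static CompletableFuture<String> fakeSend(String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (payload == null || payload.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("empty payload"));
        } else {
            future.complete("msg-id-for-" + payload);
        }
        return future;
    }

    // Maps success and failure to a status string so that neither is dropped.
    static CompletableFuture<String> sendWithHandling(String payload) {
        return fakeSend(payload).handle((msgId, ex) -> {
            if (ex == null) {
                return "sent:" + msgId;
            }
            // Exceptions that pass through dependent stages arrive wrapped
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause()
                    : ex;
            return "failed:" + cause.getMessage();
        });
    }

    public static void main(String[] args) {
        System.out.println(sendWithHandling("hello").join());
        System.out.println(sendWithHandling("").join());
    }
}
```

In a real service the failure branch would more likely log the error or trigger a retry than return a string; the string return value here only keeps the sketch easy to check.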

6. Integrating Pulsar with Spring Cloud

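The code above turned out to have a bug: after the Nacos configuration is updated, the Consumer stops working.
Investigation showed that the cause is the @RefreshScope annotation. A refresh destroys the affected beans, so both the Pulsar Consumer and the Producer are destroyed. The Producer is recreated the next time it is called, but the Consumer is not, because nothing ever calls it. How can this be fixed?

The fix is to hook into the refresh events: listen for the refresh and fetch the beans from the container so that they are recreated.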

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:34
 * @Description: Recreates the Pulsar beans after a configuration refresh
 */
@Component
@Slf4j
public class RefreshPulsarListener implements ApplicationListener<ApplicationEvent> {

    @Autowired
    ApplicationContext applicationContext;

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        // React only to events whose source is "__refreshAll__"
        if ("__refreshAll__".equals(event.getSource())) {
            log.info("Nacos configuration changed, restarting Pulsar ====================================");
            // Fetching each bean forces the container to create it again
            log.info("Restarted PulsarClient, {}", applicationContext.getBean("getPulsarClient"));
            log.info("Restarted PulsarConsumer, {}", applicationContext.getBean("comment-publish-topic-consumer"));
            log.info("Restarted PulsarConsumer, {}", applicationContext.getBean("reply-publish-topic-consumer"));
        }
    }
}
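To see why merely looking the beans up is enough, it can help to model the behaviour described above: instances are cached, a refresh drops them, and nothing is recreated until the next lookup. The sketch below is a deliberately simplified JDK-only model of that idea, not Spring Cloud's actual implementation; `LazyScope` and its methods are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class LazyScopeSketch {

    // Simplified model of a refreshable scope: instances are cached, dropped
    // on refresh, and recreated only when someone asks for them again.
    static class LazyScope {
        private final Map<String, Supplier<Object>> factories = new HashMap<>();
        private final Map<String, Object> cache = new HashMap<>();

        void register(String name, Supplier<Object> factory) {
            factories.put(name, factory);
        }

        // Returns the cached instance, creating it first if necessary.
        Object getBean(String name) {
            return cache.computeIfAbsent(name, n -> factories.get(n).get());
        }

        // Models a refresh: every cached instance is discarded.
        void refreshAll() {
            cache.clear();
        }

        boolean isLive(String name) {
            return cache.containsKey(name);
        }
    }

    public static void main(String[] args) {
        LazyScope scope = new LazyScope();
        scope.register("consumer", Object::new);

        scope.getBean("consumer");   // created on first lookup
        scope.refreshAll();          // the refresh destroys the instance
        System.out.println("live after refresh: " + scope.isLive("consumer"));

        scope.getBean("consumer");   // the lookup a refresh listener would perform
        System.out.println("live after lookup: " + scope.isLive("consumer"));
    }
}
```

A producer is looked up naturally the next time application code sends a message, whereas a listener-driven consumer has no caller, which is why the listener above performs the lookup explicitly.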

That concludes this walkthrough of integrating Pulsar with Spring Boot.
