How to subscribe to and receive messages with the MQTT protocol

Published: 2020-05-28 14:23:32 Source: 亿速云 Views: 2323 Author: 鸽子 Category: Software Technology

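MQTT is used very widely in the industrial Internet of Things because of its low latency and high efficiency. The previous article showed how to send MQTT messages from code; this article builds on it to subscribe to and receive MQTT messages.
Steps: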

  1. Add the required dependencies

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

  2. Configure the MQTT server information in application.yml

    server:
      port: 8090
    mqtt:
      host: tcp://127.0.0.1:1883
      clientinid: mqttinId
      clientoutid: mqttoutid
      topic: virus
      qoslevel: 1
      # MQTT authentication
      username: xxx
      password: xxx
      timeout: 10000
      # 20s
      keepalive: 20

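The `mqtt.host` value above packs the transport scheme, broker address, and port into one URL. As a minimal sketch (plain JDK only, not part of the original project; the class name `BrokerUriParts` and the method `describe` are hypothetical), the parts can be inspected like this, which may help when checking a misconfigured host string:

```java
import java.net.URI;

public class BrokerUriParts {

    // Splits a broker URL such as tcp://127.0.0.1:1883 into scheme, host and port.
    // Hypothetical helper for illustration; the Spring configuration does not need it.
    static String describe(String hostUrl) {
        URI uri = URI.create(hostUrl);
        return uri.getScheme() + " / " + uri.getHost() + " / " + uri.getPort();
    }

    public static void main(String[] args) {
        // Uses the same value as mqtt.host in application.yml.
        System.out.println(describe("tcp://127.0.0.1:1883"));
    }
}
```

Here `tcp` is the scheme, `127.0.0.1` is the broker address, and `1883` is the port from the configuration above.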
  3. Create the MQTT message receiving configuration

    package com.favccxx.mqtt.config;

    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.IntegrationComponentScan;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.core.MessageProducer;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.MessagingException;

    @Slf4j
    @Configuration
    @IntegrationComponentScan
    public class MQTTReceiveConfig {

        @Value("${mqtt.username}")
        private String username;

        @Value("${mqtt.password}")
        private String password;

        @Value("${mqtt.host}")
        private String hostUrl;

        @Value("${mqtt.clientinid}")
        private String clientId;

        @Value("${mqtt.topic}")
        private String defaultTopic;

        // Timeout in milliseconds (mqtt.timeout), passed to setCompletionTimeout below
        @Value("${mqtt.timeout}")
        private int completionTimeout;

        // Connection options for the receiving client
        @Bean
        public MqttConnectOptions getReceiverMqttConnectOptions() {
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setCleanSession(true);
            // Connection timeout in seconds
            mqttConnectOptions.setConnectionTimeout(10);
            mqttConnectOptions.setAutomaticReconnect(true);
            mqttConnectOptions.setUserName(username);
            mqttConnectOptions.setPassword(password.toCharArray());
            mqttConnectOptions.setServerURIs(new String[]{hostUrl});
            // Keep-alive interval in seconds; mqtt.keepalive from application.yml is not wired in here
            mqttConnectOptions.setKeepAliveInterval(2);
            return mqttConnectOptions;
        }

        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getReceiverMqttConnectOptions());
            return factory;
        }

        // Channel that carries the received messages
        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }

        // Configure the client and the topic it listens on
        @Bean
        public MessageProducer inbound() {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory(),
                            defaultTopic);
            adapter.setCompletionTimeout(completionTimeout);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }

        // Read the data from the channel
        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    log.info("Topic: {}, received payload: {}",
                            message.getHeaders().get("mqtt_receivedTopic"), message.getPayload());
                }
            };
        }
    }
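
To make the message flow in the configuration easier to follow, here is a rough plain-Java sketch of the idea: an inbound adapter pushes each message into a channel, and the channel hands it to the handler on the sender's thread. `SimpleChannel`, `InboundFlowSketch`, and `format` are hypothetical stand-ins written for this illustration; they are not Spring Integration classes and leave out everything the real `DirectChannel` and `MqttPahoMessageDrivenChannelAdapter` do beyond this hand-off. Only the header name `mqtt_receivedTopic` and the topic `virus` come from the article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class InboundFlowSketch {

    // Hypothetical, simplified stand-in for a direct channel:
    // send() invokes the subscribed handler synchronously on the caller's thread.
    static class SimpleChannel {
        private BiConsumer<Map<String, Object>, Object> handler;

        void subscribe(BiConsumer<Map<String, Object>, Object> handler) {
            this.handler = handler;
        }

        void send(Map<String, Object> headers, Object payload) {
            if (handler == null) {
                throw new IllegalStateException("no handler subscribed");
            }
            handler.accept(headers, payload);
        }
    }

    // Mirrors what the article's handler logs: the received topic header and the payload.
    static String format(Map<String, Object> headers, Object payload) {
        return "topic: " + headers.get("mqtt_receivedTopic") + ", payload: " + payload;
    }

    public static void main(String[] args) {
        SimpleChannel channel = new SimpleChannel();
        List<String> received = new ArrayList<>();

        // Plays the role of the @ServiceActivator handler.
        channel.subscribe((headers, payload) -> received.add(format(headers, payload)));

        // Plays the role of the inbound adapter delivering one message from the broker.
        Map<String, Object> headers = new HashMap<>();
        headers.put("mqtt_receivedTopic", "virus");
        channel.send(headers, "hello");

        System.out.println(received.get(0));
    }
}
```

In the real configuration the wiring is declarative: `adapter.setOutputChannel(mqttInputChannel())` connects the adapter to the channel, and `@ServiceActivator(inputChannel = "mqttInputChannel")` subscribes the handler to it.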
