温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

springboot如何实现mqtt物联网

发布时间:2021-03-26 10:44:50 来源:亿速云 阅读:289 作者:小新 栏目:开发技术

这篇文章将为大家详细讲解有关springboot如何实现mqtt物联网,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

Springboot整合mybatisPlus+mysql+druid+swaggerUI+mqtt
目录:整合mqtt、整合druid、整合mybatis-plus、完整pom、完整yml、整合swaggerUi、整合log4j、MQTT 物联网系统基本架构、本物联网系列(mqtt)

整合mqtt

 <!--mqtt依赖-->   <dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-integration</artifactId>   </dependency>   <dependency>    <groupId>org.springframework.integration</groupId>    <artifactId>spring-integration-stream</artifactId>   </dependency>   <dependency>    <groupId>org.springframework.integration</groupId>    <artifactId>spring-integration-mqtt</artifactId>   </dependency>

yml

# MQTT settings; the keys below live under the `iot.mqtt` prefix.
iot:
  mqtt:
    # Spring Boot random-value placeholder, resolved when the property is read.
    clientId: ${random.value}
    defaultTopic: topic
    shbykjTopic: shbykj_topic
    url: tcp://127.0.0.1:1883
    username: admin
    password: admin
    completionTimeout: 3000
package com.shbykj.handle.mqtt; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.stereotype.Component; /**  * @Author: wxm  * @Description: mqtt基础配置类  */ @Getter @Setter @Component @IntegrationComponentScan @ConfigurationProperties(prefix = "iot.mqtt") public class BykjMqttConfig {  /*  *  * 服务地址  */  private String url;  /**  * 客户端id  */  private String clientId;  /*  *  * 默认主题  */  private String defaultTopic;  /*  *  * 用户名和密码*/  private String username;  private String password;  /**  * 超时时间  */  private int completionTimeout;  /**  * shbykj自定义主题  */  private String shbykjTopic; }
package com.shbykj.handle.mqtt.producer;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * Messaging gateway for sending MQTT payloads; requests go to the
 * {@code iotMqttInputChannel} channel by default.
 * <p>
 * NOTE(review): in the accompanying configuration classes the
 * {@code MqttPahoMessageHandler} is subscribed to {@code iotMqttOutboundChannel},
 * while {@code iotMqttInputChannel} feeds the receive handlers. Confirm that
 * messages sent through this gateway are really meant to go to that channel.
 *
 * @description MQTT protocol gateway interface (the original note mentions rabbitmq)
 * @date 2020/6/8 18:26
 */
@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel")
public interface IotMqttGateway {

    /**
     * Sends a payload without specifying a topic header.
     *
     * @param data message payload
     */
    void sendMessage2Mqtt(String data);

    /**
     * Sends a payload with the topic supplied as the {@link MqttHeaders#TOPIC} header.
     *
     * @param data  message payload
     * @param topic value of the topic header
     */
    void sendMessage2Mqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * Sends a payload with both topic and QoS supplied as headers.
     *
     * @param topic   value of the {@link MqttHeaders#TOPIC} header
     * @param qos     value of the {@link MqttHeaders#QOS} header
     * @param payload message payload
     */
    void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
package com.shbykj.handle.mqtt; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; @Configuration public class IotMqttProducerConfig {  public final Logger logger = LoggerFactory.getLogger(this.getClass());  @Autowired  private BykjMqttConfig mqttConfig;  /*  *  * MQTT连接器选项  * *  */  @Bean(value = "getMqttConnectOptions")  public MqttConnectOptions getMqttConnectOptions1() {  MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();  // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接  mqttConnectOptions.setCleanSession(true);  // 设置超时时间 单位为秒  mqttConnectOptions.setConnectionTimeout(mqttConfig.getCompletionTimeout());  mqttConnectOptions.setAutomaticReconnect(true);  mqttConnectOptions.setUserName(mqttConfig.getUsername());  mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());  mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getUrl()});  // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制  mqttConnectOptions.setKeepAliveInterval(10);  // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。  //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);  return mqttConnectOptions;  }  /**  * mqtt工厂  *  * @return  */  @Bean  public MqttPahoClientFactory mqttClientFactory() {  DefaultMqttPahoClientFactory factory = new 
DefaultMqttPahoClientFactory(); // factory.setServerURIs(mqttConfig.getServers());  factory.setConnectionOptions(getMqttConnectOptions1());  return factory;  }  @Bean  public MessageChannel iotMqttInputChannel() {  return new DirectChannel();  }  // @Bean // @ServiceActivator(inputChannel = "iotMqttInputChannel") // public MessageHandler mqttOutbound() { // MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getClientId(), mqttClientFactory()); // messageHandler.setAsync(false); // messageHandler.setDefaultQos(2); // messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic()); // return messageHandler; // }  @Bean  @ServiceActivator(inputChannel = "iotMqttInputChannel")  public MessageHandler handlerTest() {  return message -> {   try {   String string = message.getPayload().toString();   System.out.println(string);   } catch (MessagingException ex) {   ex.printStackTrace();   logger.info(ex.getMessage());   }  };  } }
package com.shbykj.handle.mqtt; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; /**  * @Author: xiaofu  * @Description: 消息订阅配置  * @date 2020/6/8 18:24  */ @Configuration public class IotMqttSubscriberConfig {  public final Logger logger = LoggerFactory.getLogger(this.getClass());  @Autowired  private MqttReceiveHandle mqttReceiveHandle;  @Autowired  private BykjMqttConfig mqttConfig;  /*  *  * MQTT连接器选项  * *  */  @Bean(value = "getMqttConnectOptions")  public MqttConnectOptions getMqttConnectOptions1() {  MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();  // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接  mqttConnectOptions.setCleanSession(true);  // 设置超时时间 单位为秒  mqttConnectOptions.setConnectionTimeout(10);  mqttConnectOptions.setAutomaticReconnect(true); // mqttConnectOptions.setUserName(mqttConfig.getUsername()); // mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());  
mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getUrl()});  // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制  mqttConnectOptions.setKeepAliveInterval(10);  // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。  //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);  return mqttConnectOptions;  }  /*  *  *MQTT信息通道(生产者)  **  */  @Bean  public MessageChannel iotMqttOutboundChannel() {  return new DirectChannel();  }  /*  *  *MQTT消息处理器(生产者)  **  */  @Bean  @ServiceActivator(inputChannel = "iotMqttOutboundChannel")  public MessageHandler mqttOutbound() {  MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getClientId(), mqttClientFactory());  messageHandler.setAsync(true);  messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());  return messageHandler;  }  /*  *  *MQTT工厂  **  */  @Bean  public MqttPahoClientFactory mqttClientFactory() {  DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); // factory.setServerURIs(mqttConfig.getServers());  factory.setConnectionOptions(getMqttConnectOptions1());  return factory;  }  /*  *  *MQTT信息通道(消费者)  **  */  @Bean  public MessageChannel iotMqttInputChannel() {  return new DirectChannel();  }  /**  * 配置client,监听的topic  * MQTT消息订阅绑定(消费者)  ***/  @Bean  public MessageProducer inbound() {  MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic(), mqttConfig.getShbykjTopic());  adapter.setCompletionTimeout(mqttConfig.getCompletionTimeout());  adapter.setConverter(new DefaultPahoMessageConverter());  adapter.setQos(2);  adapter.setOutputChannel(iotMqttInputChannel());  return adapter;  }  /**  * @author wxm  * @description 消息订阅  * @date 2020/6/8 18:20  */  @Bean  @ServiceActivator(inputChannel = "iotMqttInputChannel")  public MessageHandler handler() {  return new MessageHandler() {   @Override   public void handleMessage(Message<?> message) throws 
MessagingException {   //处理接收消息   try {    mqttReceiveHandle.handle(message);   } catch (Exception e) {    logger.warn("消息处理异常"+e.getMessage());    e.printStackTrace();   }   }  };  } }
package com.shbykj.handle.mqtt; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.shbykj.handle.common.DataCheck; import com.shbykj.handle.common.RedisKey; import com.shbykj.handle.common.RedisUtils; import com.shbykj.handle.common.constants.Constants; import com.shbykj.handle.common.model.ShbyCSDeviceEntity; import com.shbykj.handle.common.model.sys.SysInstrument; import com.shbykj.handle.resolve.mapper.SysInstrumentMapper; import com.shbykj.handle.resolve.util.DateUtils; import com.shbykj.handle.resolve.util.ShbyCSDeviceUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.BidiMap; import org.apache.commons.collections.bidimap.DualHashBidiMap; import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; /*  *  * mqtt客户端消息处理类  * **/ @Component @Slf4j @Transactional public class MqttReceiveHandle implements MqttCallback {  private static final Logger logger = LoggerFactory.getLogger(MqttReceiveHandle.class);  @Value("${shbykj.checkCrc}")  private boolean checkcrc;  @Autowired  private SysInstrumentMapper sysInstrumentMapper;  @Autowired  private RedisUtils redisUtils;  public static BidiMap bidiMap = new DualHashBidiMap();  //记录bykj协议内容  public static Map<String, Map<String, Object>> devMap = new HashMap();  //记录上限数量 // public static Map<String, ChannelHandlerContext> 
ctxMap = new HashMap();  public void handle(Message<?> message) {  try {   logger.info("{},客户端号:{},主题:{},QOS:{},消息接收到的数据:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), message.getHeaders().get(MqttHeaders.ID), message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getHeaders().get(MqttHeaders.RECEIVED_QOS), message.getPayload());   //处理mqtt数据   this.handle(message.getPayload().toString());  } catch (Exception e) {   e.printStackTrace();   log.error("处理错误" + e.getMessage());  }  }  private void handle(String str) throws Exception {  boolean flag = this.dataCheck(str);  if (flag) {   ShbyCSDeviceEntity shbyCSDeviceEntity = ShbyCSDeviceUtils.convertToSysInstrumentEntity(str);   String deviceNumber = shbyCSDeviceEntity.getPN();   String smpId = shbyCSDeviceEntity.getSMP_ID();   String smpName = shbyCSDeviceEntity.getSMP_NAME();   String smpWt = shbyCSDeviceEntity.getSMP_WT();   if (StringUtils.isEmpty(smpId) || StringUtils.isEmpty(smpName) || StringUtils.isEmpty(smpWt)) {   log.error("过滤无实际作用报文信息", str);   logger.error("过滤无实际作用报文信息", str);   return;   }   //判断设备id是否存在数据库中,存在才进行数据部分处理   //不存在就提醒需要添加设备:   QueryWrapper<SysInstrument> wrapper = new QueryWrapper();   wrapper.eq("number", deviceNumber);   wrapper.eq("is_deleted", Constants.NO);   SysInstrument sysInstrument = sysInstrumentMapper.selectOne(wrapper);   if (null == sysInstrument) {   log.error("碳氧仪不存在或已删除,设备号:{}", deviceNumber);   logger.error("碳氧仪不存在或已删除,设备号:{}", deviceNumber);   return;   }   try {   //增加实时数据   String instrumentId = sysInstrument.getId().toString();   String realDataKey = RedisKey.CSdevice_DATA_KEY + instrumentId;   this.redisUtils.set(realDataKey, shbyCSDeviceEntity);   System.out.println(shbyCSDeviceEntity);   //通讯时间   String onlineTime = "shbykj_mqtt:onlines:" + instrumentId;   this.redisUtils.set(onlineTime, shbyCSDeviceEntity.getDataTime(), (long) Constants.RedisTimeOut.REAL_TIME_OUT);   log.info("实时数据已经更新:设备主键id" + instrumentId);   logger.info("{} 
实时数据已经更新:设备主键id:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),instrumentId);   } catch (Exception var1) {   log.error("redis处理实时报文数据逻辑异常 :" + var1.getMessage());   logger.error("redis处理实时报文数据逻辑异常 :" + var1.getMessage());   }  }  }  private boolean dataCheck(String message) {  boolean flag = DataCheck.receiverCheck(message);  if (!flag) {   return false;  } else {   int i = message.indexOf("QN=");   if (i < 0) {   log.warn("数据包中没有QN号码: " + message);   logger.warn("数据包中没有QN号码: " + message);   return false;   } else {   i = message.indexOf("PN=");   if (i < 0) {    log.warn("数据包中没有PN号码: " + message);    logger.warn("数据包中没有PN号码: " + message);    return false;   } else {    if (this.checkcrc) {    flag = DataCheck.checkCrc(message);    if (!flag) {     log.warn("crc校验失败: " + message);     logger.warn("数据包中没有PN号码: " + message);     return false;    }    }    return true;   }   }  }  }  /**  * 连接丢失  *  * @param throwable  */  @Override  public void connectionLost(Throwable throwable) {  logger.warn("连接丢失-客户端:{},原因:{}", throwable.getMessage());  }  /**  * 消息已到达  *  * @param s  * @param mqttMessage  * @throws Exception  */  @Override  public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {  }  /**  * 完成消息回调  *  * @param iMqttDeliveryToken  */  @Override  public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {  } }

整合druid

pom

 <dependency>   <groupId>com.alibaba</groupId>   <artifactId>druid-spring-boot-starter</artifactId>   <version>1.1.10</version>  </dependency>

druid-bean.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/aop
         http://www.springframework.org/schema/aop/spring-aop.xsd">

  <!-- Druid / Spring monitoring integration: the stat interceptor -->
  <bean id="druid-stat-interceptor"
        class="com.alibaba.druid.support.spring.stat.DruidStatInterceptor"/>

  <!-- Pointcut selecting the intercepted methods by regular expression -->
  <bean id="druid-stat-pointcut"
        class="org.springframework.aop.support.JdkRegexpMethodPointcut"
        scope="prototype">
    <property name="patterns">
      <list>
        <value>com.shbykj.*.service.*.impl.*</value>
      </list>
    </property>
  </bean>

  <!-- Apply the interceptor to the pointcut using class-based proxies -->
  <aop:config proxy-target-class="true">
    <aop:advisor advice-ref="druid-stat-interceptor"
                 pointcut-ref="druid-stat-pointcut"/>
  </aop:config>
</beans>

yml

# spring
spring:
  main:
    allow-bean-definition-overriding: true
  # MySQL database config
  # NOTE(review): the nesting below was reconstructed from a flattened listing.
  # Only `filters` is placed under `druid` here; confirm against a working
  # configuration which of the remaining keys belong under `druid` as well.
  datasource:
    druid:
      filters: stat,wall,log4j2
    continueOnError: true
    type: com.alibaba.druid.pool.DruidDataSource
    url: jdbc:mysql://localhost:3306/mqttdb?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
    # see https://github.com/alibaba/druid
    initialSize: 15
    minIdle: 10
    maxActive: 200
    maxWait: 60000
    timeBetweenEvictionRunsMillis: 60000
    validationQuery: SELECT 1
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    poolPreparedStatements: true
    keepAlive: true
    maxPoolPreparedStatementPerConnectionSize: 50
    connectionProperties:
      druid.stat.mergeSql: true
      druid.stat.slowSqlMillis: 5000

启动类加上注解@ImportResource( locations = {"classpath:druid-bean.xml"} )

springboot如何实现mqtt物联网

整合mybatis-plus

pom

<!-- mybatis-plus -->
<dependency>
  <groupId>com.baomidou</groupId>
  <artifactId>spring-wind</artifactId>
  <version>1.1.5</version>
  <exclusions>
    <exclusion>
      <groupId>com.baomidou</groupId>
      <artifactId>mybatis-plus</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>com.baomidou</groupId>
  <artifactId>mybatis-plus-boot-starter</artifactId>
  <version>3.1.2</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.44</version>
</dependency>
<!-- PageHelper pagination plugin -->
<dependency>
  <groupId>com.github.pagehelper</groupId>
  <artifactId>pagehelper-spring-boot-starter</artifactId>
  <version>1.2.12</version>
</dependency>

yml

# mybatis
mybatis-plus:
  mapper-locations: classpath:/mapper/*.xml
  typeAliasesPackage: org.spring.springboot.entity
  global-config:
    # Primary-key type: 0 = database auto-increment, 1 = user-supplied ID,
    # 2 = globally unique numeric ID, 3 = globally unique UUID
    id-type: 3
    # Field strategy: 0 = no check, 1 = non-NULL check, 2 = non-empty check
    field-strategy: 2
    # camelCase / underscore conversion
    db-column-underline: true
    # Refresh mappers (handy while debugging)
    refresh-mapper: true
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: false

启动类注解@MapperScan({"com.shbykj.handle.resolve.mapper"})

完整pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.1</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.shbykj</groupId>
  <artifactId>handle</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>handle</name>
  <description>Demo project for Spring Boot</description>
  <properties>
    <java.version>1.8</java.version>
  </properties>
  <dependencies>
    <!-- Note: <scope>compile</scope> is used for the production environment to fix a startup error -->
    <!-- IDEA / Spring Boot startup reports SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" -->
    <!-- Reference: https://blog.csdn.net/u010696630/article/details/84991116 -->
    <!-- NOTE(review): spring-boot-starter-log4j2 below presumably brings its own SLF4J binding; confirm slf4j-simple is still needed -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
      <scope>compile</scope>
    </dependency>
    <!-- Log4j2 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!-- Exclude the default logging configuration of spring-boot-starter -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- swagger API documentation UI -->
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger-ui</artifactId>
      <version>2.9.2</version>
    </dependency>
    <!-- Code generator dependency -->
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger2</artifactId>
      <version>2.9.2</version>
      <exclusions>
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- mybatis-plus -->
    <dependency>
      <groupId>com.baomidou</groupId>
      <artifactId>spring-wind</artifactId>
      <version>1.1.5</version>
      <exclusions>
        <exclusion>
          <groupId>com.baomidou</groupId>
          <artifactId>mybatis-plus</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.baomidou</groupId>
      <artifactId>mybatis-plus-boot-starter</artifactId>
      <version>3.1.2</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.44</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid-spring-boot-starter</artifactId>
      <version>1.1.10</version>
    </dependency>
    <!-- PageHelper pagination plugin -->
    <dependency>
      <groupId>com.github.pagehelper</groupId>
      <artifactId>pagehelper-spring-boot-starter</artifactId>
      <version>1.2.12</version>
    </dependency>
    <!-- devtools hot reload -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
      <scope>runtime</scope>
    </dependency>
    <!-- JSON conversion -->
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
    </dependency>
    <!-- redis -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Utilities -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.8.1</version>
    </dependency>
    <!-- google -->
    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>30.0-jre</version>
    </dependency>
    <!-- Utility library -->
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-core</artifactId>
      <version>5.5.0</version>
    </dependency>
    <!-- lombok (enables the logging annotations); declared once, a second declaration of the same artifact was removed -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

完整yml

server:
  port: 8082
# spring
spring:
  devtools:
    restart:
      enabled: true
  main:
    allow-bean-definition-overriding: true
  # MySQL database config
  # NOTE(review): the nesting below was reconstructed from a flattened listing.
  # Only `filters` is placed under `druid` here; confirm against a working
  # configuration which of the remaining keys belong under `druid` as well.
  datasource:
    druid:
      filters: stat,wall,log4j2
    continueOnError: true
    type: com.alibaba.druid.pool.DruidDataSource
    url: jdbc:mysql://localhost:3306/mqttdb?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
    # see https://github.com/alibaba/druid
    initialSize: 15
    minIdle: 10
    maxActive: 200
    maxWait: 60000
    timeBetweenEvictionRunsMillis: 60000
    validationQuery: SELECT 1
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    poolPreparedStatements: true
    keepAlive: true
    maxPoolPreparedStatementPerConnectionSize: 50
    connectionProperties:
      druid.stat.mergeSql: true
      druid.stat.slowSqlMillis: 5000
shbykj:
  checkCrc: false
# mybatis
mybatis-plus:
  mapper-locations: classpath:/mapper/*.xml
  typeAliasesPackage: org.spring.springboot.entity
  global-config:
    # Primary-key type: 0 = database auto-increment, 1 = user-supplied ID,
    # 2 = globally unique numeric ID, 3 = globally unique UUID
    id-type: 3
    # Field strategy: 0 = no check, 1 = non-NULL check, 2 = non-empty check
    field-strategy: 2
    # camelCase / underscore conversion
    db-column-underline: true
    # Refresh mappers (handy while debugging)
    refresh-mapper: true
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: false
# logging
logging:
  config: classpath:log4j2-demo.xml

整合swaggerUi

pom

<!-- swagger API documentation UI -->
<dependency>
  <groupId>io.springfox</groupId>
  <artifactId>springfox-swagger-ui</artifactId>
  <version>2.9.2</version>
</dependency>
<!-- Fixes the error: swagger: Illegal DefaultValue null for parameter type integer. java.lang.NumberFormatException: For input string: "". -->
<!-- In 1.5.21, AbstractSerializableParameter.getExample() gained a check for the empty string -->
<dependency>
  <groupId>io.swagger</groupId>
  <artifactId>swagger-models</artifactId>
  <version>1.5.21</version>
</dependency>
<!-- Code generator dependency -->
<dependency>
  <groupId>io.springfox</groupId>
  <artifactId>springfox-swagger2</artifactId>
  <version>2.9.2</version>
  <exclusions>
    <exclusion>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </exclusion>
  </exclusions>
</dependency>

使用

package com.shbykj.handle.web.wx; import com.baomidou.mybatisplus.core.metadata.IPage; import com.shbykj.handle.common.RetMsgData; import com.shbykj.handle.common.State; import com.shbykj.handle.common.model.sys.SysInstrument; import com.shbykj.handle.h.service.ISysInstrumentService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /**  * 监测点接口  *  * @author  * @date 2021-01-15 16:49  */ @RestController @RequestMapping({"/api/wxapoint"}) @Api(  tags = {"小程序 监测点接口"} ) public class CSDevicesController extends BaseController {  @Autowired  private ISysInstrumentService sysInstrumentService;  public CSDevicesController() {  }  @ApiOperation(   value = "分页查询",   notes = "分页查询站点信息"  )  @ApiImplicitParams({@ApiImplicitParam(   name = "number",   value = "设备编号",   paramType = "query",   dataType = "String"  ), @ApiImplicitParam(   name = "page",   value = "页码 从1开始",   required = false,   dataType = "long",   paramType = "query"  ), @ApiImplicitParam(   name = "size",   value = "页数",   required = false,   dataType = "long",   paramType = "query"  )})  @GetMapping({"/pageByNumber"})  public RetMsgData<IPage<SysInstrument>> pageByNumber(@RequestParam(required = false) String number) {  RetMsgData msg = new RetMsgData();  try {   IPage<SysInstrument> page1 = this.getPage();   page1 = sysInstrumentService.pageByNumber(number, page1);   msg.setData(page1);  } catch (Exception var5) {   msg.setState(State.RET_STATE_SYSTEM_ERROR);   this.logger.error(var5.getMessage());  }  return msg;  } }
package com.shbykj.handle.common.model.sys;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;

import java.io.Serializable;
import java.util.Date;

/**
 * Entity mapped to the {@code instrument} table; each field carries its column
 * mapping and its Swagger model description.
 */
@TableName("instrument")
@ApiModel("仪器配置表字段信息")
public class SysInstrument implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Primary key, mapped with {@code IdType.AUTO}. */
    @TableId(value = "id", type = IdType.AUTO)
    @ApiModelProperty(value = "id", name = "id", required = true)
    private Long id;

    /** Instrument name. */
    @TableField("name")
    @ApiModelProperty(value = "名称 仪器名称", name = "name")
    private String name;

    /** Instrument number (PN). */
    @TableField("number")
    @ApiModelProperty(value = "编号 仪器编号(PN)", name = "number")
    private String number;

    /** Manufacturer. */
    @TableField("manufacturer")
    @ApiModelProperty(value = "生产厂商 生产厂商", name = "manufacturer")
    private String manufacturer;

    /** Creation time. */
    @TableField("gmt_create")
    @ApiModelProperty(value = "创建时间", name = "gmt_create")
    private Date gmtCreate;

    /** Last modification time. */
    @TableField("gmt_modified")
    @ApiModelProperty(value = "更新时间", name = "gmt_modified")
    private Date gmtModified;

    /** Deletion flag; 0 means not deleted (the default). */
    @TableField("is_deleted")
    @ApiModelProperty(value = "表示删除,0 表示未删除 默认0", name = "is_deleted")
    private Integer isDeleted;

    /** Device type (PT). */
    @TableField("device_type")
    @ApiModelProperty(value = "设备类型(PT)", name = "device_type")
    private String deviceType;

    public SysInstrument() {
    }

    public Long getId() {
        return id;
    }

    public void setId(final Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(final String name) {
        this.name = name;
    }

    public String getNumber() {
        return number;
    }

    public void setNumber(final String number) {
        this.number = number;
    }

    public String getManufacturer() {
        return manufacturer;
    }

    public void setManufacturer(final String manufacturer) {
        this.manufacturer = manufacturer;
    }

    public Date getGmtCreate() {
        return gmtCreate;
    }

    public void setGmtCreate(final Date gmtCreate) {
        this.gmtCreate = gmtCreate;
    }

    public Date getGmtModified() {
        return gmtModified;
    }

    public void setGmtModified(final Date gmtModified) {
        this.gmtModified = gmtModified;
    }

    public Integer getIsDeleted() {
        return isDeleted;
    }

    public void setIsDeleted(final Integer isDeleted) {
        this.isDeleted = isDeleted;
    }

    public String getDeviceType() {
        return deviceType;
    }

    public void setDeviceType(final String deviceType) {
        this.deviceType = deviceType;
    }

    /**
     * Renders every property, read through its getter, as
     * {@code SysInstrument(id=..., name=..., ...)}.
     */
    public String toString() {
        StringBuilder text = new StringBuilder("SysInstrument(");
        text.append("id=").append(getId());
        text.append(", name=").append(getName());
        text.append(", number=").append(getNumber());
        text.append(", manufacturer=").append(getManufacturer());
        text.append(", gmtCreate=").append(getGmtCreate());
        text.append(", gmtModified=").append(getGmtModified());
        text.append(", isDeleted=").append(getIsDeleted());
        text.append(", deviceType=").append(getDeviceType());
        text.append(")");
        return text.toString();
    }
}

MQTT 物联网系统基本架构

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.2</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.shbykj</groupId>
  <artifactId>handle_mqtt</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>handle_mqtt</name>
  <description>Demo project for Spring Boot</description>
  <properties>
    <java.version>1.8</java.version>
    <skipTests>true</skipTests>
  </properties>
  <dependencies>
    <!-- MQTT dependencies -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- Note: <scope>compile</scope> is used for the production environment to fix a startup error -->
    <!-- IDEA / Spring Boot startup reports SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" -->
    <!-- Reference: https://blog.csdn.net/u010696630/article/details/84991116 -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
      <scope>compile</scope>
    </dependency>
    <!-- Log4j2 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!-- Exclude the default logging configuration of spring-boot-starter -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- swagger API documentation UI -->
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger-ui</artifactId>
      <version>2.9.2</version>
    </dependency>
    <!-- Fixes the error: swagger: Illegal DefaultValue null for parameter type integer. java.lang.NumberFormatException: For input string: "". -->
    <!-- In 1.5.21, AbstractSerializableParameter.getExample() gained a check for the empty string -->
    <dependency>
      <groupId>io.swagger</groupId>
      <artifactId>swagger-models</artifactId>
      <version>1.5.21</version>
    </dependency>
    <!-- Code generator dependency -->
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger2</artifactId>
      <version>2.9.2</version>
      <exclusions>
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- Other tools -->
    <!-- devtools hot reload -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
      <scope>runtime</scope>
    </dependency>
    <!-- JSON conversion -->
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
    </dependency>
    <!-- redis -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Utilities -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.8.1</version>
    </dependency>
    <!-- google -->
    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>30.0-jre</version>
    </dependency>
    <!-- Utility library -->
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-core</artifactId>
      <version>5.5.0</version>
    </dependency>
    <!-- lombok -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <!-- Utilities -->
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2</version>
    </dependency>
    <dependency>
      <groupId>com.baomidou</groupId>
      <artifactId>spring-wind</artifactId>
      <version>1.1.5</version>
      <exclusions>
        <exclusion>
          <groupId>com.baomidou</groupId>
          <artifactId>mybatis-plus</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.baomidou</groupId>
      <artifactId>mybatis-plus-boot-starter</artifactId>
      <version>3.1.2</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.44</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid-spring-boot-starter</artifactId>
      <version>1.1.10</version>
    </dependency>
    <!-- PageHelper pagination plugin -->
    <dependency>
      <groupId>com.github.pagehelper</groupId>
      <artifactId>pagehelper-spring-boot-starter</artifactId>
      <version>1.2.12</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>repackage</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

yml

# Application configuration for the handle_mqtt service.
# NOTE(review): the published text was flattened onto a single line, so the nesting
# below is reconstructed from the key names; confirm it against the real file,
# in particular which keys sit under spring.datasource.druid.
server:
  port: 8082

# MQTT client settings (prefix iot.mqtt)
iot:
  mqtt:
    clientId: ${random.value}
    defaultTopic: topic
    shbykjTopic: shbykj_topic
    url: tcp://127.0.0.1:1883
    username: admin
    password: admin
    completionTimeout: 3000

# WeChat mini-program parameters
# NOTE(review): the app id/secret and the database password below are committed in
# plain text; consider supplying them from the environment instead.
shbykjWeixinAppid: wxae343ca8948f97c4
shbykjSecret: 9e168c92702efc06cb12fa22680f049a

# spring
spring:
  devtools:
    restart:
      enabled: true
  main:
    allow-bean-definition-overriding: true
  # mysql DATABASE CONFIG
  datasource:
    druid:
      filters: stat,wall,log4j2
      continueOnError: true
      type: com.alibaba.druid.pool.DruidDataSource
      url: jdbc:mysql://localhost:3306/mqttdb?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver
      # see https://github.com/alibaba/druid
      initialSize: 15
      minIdle: 10
      maxActive: 200
      maxWait: 60000
      timeBetweenEvictionRunsMillis: 60000
      validationQuery: SELECT 1
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      keepAlive: true
      maxPoolPreparedStatementPerConnectionSize: 50
      connectionProperties:
        druid.stat.mergeSql: true
        druid.stat.slowSqlMillis: 5000

shbykj:
  checkCrc: false

# mybatis
mybatis-plus:
  mapper-locations: classpath:/mapper/*.xml
  typeAliasesPackage: org.spring.springboot.entity
  # NOTE(review): the four keys directly under global-config look like the
  # MyBatis-Plus 2.x layout; verify that the 3.x starter actually applies them.
  global-config:
    # Primary key type 0: database auto-increment, 1: user-supplied ID,
    # 2: globally unique numeric ID, 3: globally unique UUID
    id-type: 3
    # Field strategy 0: no check, 1: non-NULL check, 2: non-empty check
    field-strategy: 2
    # camelCase <-> underscore conversion
    db-column-underline: true
    # Reload mapper files on change (debugging aid)
    refresh-mapper: true
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: false
    # Print SQL statements to stdout
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

# logging
logging:
  config: classpath:log4j2-demo.xml

关于“springboot如何实现mqtt物联网”这篇文章就分享到这里了,希望以上内容对大家有一定的帮助,使各位可以学到更多知识。如果觉得文章不错,请把它分享出去,让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI