SpringBoot整合EMQX(MQTT协议)
原文:springboot当中使用EMQX(MQTT协议)
1、MQTT协议 1.1、MQTT简介 MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输),是一种基于 发布/订阅 模式的 轻量级物联网消息传输协议。IBM 公司的安迪·斯坦福-克拉克
及 Arcom 公司的阿兰·尼普
于 1999
年撰写了该协议的第一个版本1,之后 MQTT 便以简单易实现、支持 QoS、轻量且省带宽等众多特性逐渐成为了 IoT 通讯的标准。
MQTT 协议每个消息最少仅需 2 个字节 (其中报头仅需 1 个字节,其余字节可以全部作为消息载荷)就可以完成通信,专为那些资源和空间有限、功耗敏感的硬件所打造。
1.2、MQTT 协议基本特点
使用发布/订阅消息模式,提供了一对多的消息分发和应用程序的解耦。
不关心负载内容的消息传输。
提供 3 种消息服务质量等级,满足不同投递需求。
很小的传输消耗和协议数据交换,最大限度减少网络流量。
提供连接异常断开时通知相关各方的机制。
1.3、MQTT 应用行业 MQTT 作为一种低开销,低带宽占用的即时通讯协议,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它适用于硬件资源有限的设备及带宽有限的网络环境。因此,MQTT 协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。
1.4、MQTT 协议原理 基于发布/订阅模式的 MQTT 协议中有三种角色:发布者(Publisher)
、代理(Broker)
、订阅者(Subscriber)
。发布者向代理发布消息,代理向订阅者转发这些消息。通常情况下,客户端的角色是发布者和订阅者,服务器的角色是代理,但实际上,服务器也可能主动发布消息或者订阅主题,客串一下客户端的角色。
为了方便理解,MQTT 传输的消息可以简化为:主题(Topic)
和载荷(Payload)
两部分:
Topic ,消息主题,订阅者向代理订阅主题后,一旦代理收到相应主题的消息,就会向订阅者转发该消息。
Payload ,消息载荷(也可以理解为传输的数据),订阅者在消息中真正关心的部分,通常是业务相关的。
1.5、MQTT 协议基础概念 1.5.1、会话(Session) 每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话可以存在于一个网络连接之间,也可以跨越多个连续的网络连接存在。
1.5.2、订阅(Subscription) 订阅包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。订阅与单个会话(Session)关联。会话可以包含多于一个的订阅。
1.5.3、主题名(Topic Name) 附加在应用消息上的一个标签,被用于匹配服务端已存在的订阅。服务端会向所有匹配订阅的客户端发送此应用消息。
1.5.4、主题过滤器(Topic Filter) 仅在订阅时使用的主题表达式,可以包含通配符,以匹配多个主题名。就是可以通过通配符达到,发一条消息,多个主题能接受到消息的效果。
1.5.5、载荷(Payload) 对于 PUBLISH 报文来说载荷就是业务消息(就是指发送的消息内容),它可以是任意格式(二进制、十六进制、普通字符串、JSON 字符串、Base64)的数据。
1.6、MQTT 协议进阶 1.6.1、消息服务质量(QoS) MQTT 协议提供了 3 种消息服务质量等级(Quality of Service),它保证了在不同的网络环境下消息传递的可靠性。这里有一点要明白,必须先订阅,发布消息才会收到 。假如没订阅,他发送消息了,我再订阅,这时候不管QoS设置几,都是收不到消息的。
QoS 0(最多分发一次) 当 QoS 为 0 时,消息的分发依赖于底层网络的能力。发布者只会发布一次消息,接收者不会应答消息,发布者也不会储存和重发消息。消息在这个等级下具有最高的传输效率,但可能送达一次也可能根本没送达。
Qos 1(至少分发一次)
当 QoS 为 1 时,可以保证消息至少送达一次。MQTT 通过简单的 ACK 机制来保证 QoS 1。
发送者: 发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内没有收到 PUBACK 的应答,发布者会将消息的 DUP 置为1 并重发消息。
接收者: 接收到 QoS 为 1 的消息时应该回应 PUBACK 报文,可能因为网络延迟等原因没有及时发出,这时接收者可能会多次接受同一个消息,无论 DUP标志如何,接收者都会将收到的消息当作一个新的消息并发送 PUBACK 报文应答。
**核心:就是发送消息的时候,接受者需要确认一次,规定时间内没有确认就会重新发。如果使用这种方式,写业务的时候需要保证幂等性
**。
QoS 2(只分发一次)
当 QoS 为 2 时,发布者和订阅者通过两次会话来保证消息只被传递一次,这是最高等级的服务质量,消息丢失和重复都是不可接受的。使用这个服务质量等级会有额外的开销。
发送者: 发布 QoS 为 2 的消息之后,消息储存起来并等待接收者回复 PUBREC 的消息。
接收者: 收到一条 QoS 为 2 的消息时,他会处理此消息并返回一条 PUBREC 进行应答。
发送者: 收到 PUBREC 消息后,丢弃掉之前的发布消息。保存 PUBREC 消息,并应答一个 PUBREL。等待接收者回复 PUBCOMP 消息
接收者: 当接收者收到 PUBREL 消息之后,它会丢弃掉所有已保存的状态,并回复 PUBCOMP。
发送者: 当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。
核心:发送消息的时候,接受者需要确认两次,来保证消息确实已经送到。
无论在传输过程中何时出现丢包,发送端都负责重发上一条消息。不管发送端是 Publisher(发送端) 还是 Broker(服务器),都是如此。因此,接收端也需要对每一条命令消息都进行应答。
1.6.2、QoS 在发布与订阅中的区别 发布时的 QoS 表示消息发送到服务端时使用的 QoS 订阅时的 QoS 表示服务端向自己转发消息时可以使用的最大 QoS
客户端 A 的发布 QoS 大于客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 B 的订阅QoS。
客户端 A 的发布 QoS 小于客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 A 的发布 QoS。
总结:接收端可以设置订阅Qos为2,这样就可以接所有qos等级消息。也就是发布消息qos为多少,那我这边接受消息就是多少。主要以发布消息的qos为准。
1.6.3、如何选择 MQTT QoS 等级 QoS 级别越高,流程越复杂,系统资源消耗越大。 应用程序可以根据自己的网络场景和业务需求,选择合适的 QoS 级别。
以下情况下可以选择 QoS 0
以下情况下可以选择 QoS 1
对系统资源消耗较为关注,希望性能最优化。
消息不能丢失,但能接受并处理重复的消息。
以下情况下可以选择 QoS 2
1.6.4、清除会话(Clean Session) MQTT 客户端向服务器发起 CONNECT 请求时,可以通过 Clean Session 标志设置是否创建全新的会话。
Clean Session 设置为 0 时:
如果存在一个关联此客户标识符的会话,服务端必须基于此会话的状态恢复与客户端的通信。
如果不存在任何关联此客户标识符的会话,服务端必须创建一个新的会话。
Clean Session 设置为 1 时:
客户端和服务端必须丢弃任何已存在的会话,并开始一个新的会话。
总结:监听端建议设置为0,一般监听端,我们都会配置单例,并且项目启动就开始创建连接监听,设置为0,这样可以保证连接的唯一性,和消息的安全性。
1.6.5、保活心跳(Keep Alive) MQTT 客户端向服务器发起 CONNECT 请求时,通过 Keep Alive 参数设置保活周期。
客户端在无报文发送时,按 Keep Alive 周期定时发送 2 字节的 PINGREQ 心跳报文,服务端收到 PINGREQ 报文后,回复 2 字节的 PINGRESP 报文。
服务端在 1.5 个心跳周期内,既没有收到客户端发布订阅报文,也没有收到 PINGREQ 心跳报文时,将断开客户端连接。
1.6.6、保留消息(Retained Message) MQTT 客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息会驻留在消息服务器,后来的订阅者订阅主题时可以接收到最新一条(注意,是只有最近的一条) 保留消息。
1.6.7、遗嘱消息(Will Message) MQTT 客户端向服务端发送 CONNECT 请求时,可以携带遗嘱消息。MQTT 客户端异常下线时(客户端断开前未向服务器发送 DISCONNECT 消息),MQTT 消息服务器会发布遗嘱消息。
在连接的时候通过调用 MqttConnectOptions 实例的 setWill 方法来设定。任何订阅了下面的主题的客户端都可以收到该遗嘱消息。
1 2 3 4 MqttConnectOptions.setWill(MqttTopic topic, byte [] payload, int qos, boolean retained); MqttConnectOptions.setWill(java.lang.String topic, byte [] payload, int qos, boolean retained);
以下情况下会发送 Will Message
:
服务端发生了I/O 错误或者网络失败;
客户端在定义的心跳时期失联;
客户端在发送下线包( DISCONNECT)之前关闭网络连接;
服务端在收到下线包之前关闭网络连接。
总结:发送遗嘱信息可以理解为,创建客户端连接的时候,告诉服务器(mqtt服务器)我挂了之后,给哪些主题发这些消息。当订阅到遗嘱消息之后,他就知道监听端挂了,我不能给他发消息了,遗嘱消息在客户端正常调用 disconnect 方法之后并不会被发送。
高级使用场景: 这里介绍一下如何将 Retained(保留) 消息与Will (遗嘱)消息结合起来进行使用。
客户端 A 遗嘱消息设定为”offline“,该遗嘱主题与一个普通发送状态的主题设定成同一个 A/status;
当客户端 A 连接时,向主题 A/status 发送 “online” 的 Retained 消息,其它客户端订阅主题 A/status的时候,获取 Retained 消息为 “online” ;
当客户端 A 异常断开时,系统自动向主题 A/status 发送”offline“的消息,其它订阅了此主题的客户端会马上收到”offline“消息;如果遗嘱消息被设定了 Retained 的话,这时有新的订阅A/status主题的客户端上线的时候,获取到的消息为“offline”。
2、EMQ X Cloud
官网:https://www.emqx.com/zh/cloud
2.1、EMQ X Cloud简介 通过开放标准的物联网协议 MQTT、MQTT over WebSocket、CoAP/LwM2M 将数以亿计的物联网设备可靠地连接到 EMQ X Cloud。通过 TLS/SSL 和基于 X.509 证书的认证确保安全的双向通信。
在该模型中,EMQ X Cloud 提供的 MQTT 服务不仅为设备与设备、设备与应用间架起桥梁,同时可将需要的数据进行持久化,以便非实时应用在后续对获取的数据加以利用。
2.2、EMQ X Cloud优势 2.2.1、协议支持完整 支持 MQTT v3.1,v3.1.1 与 v5.0 协议版本,是全球首个支持 MQTT 5.0 的公有云服务,支持 MQTT WebSocket 服务,完整支持 QoS0, QoS1 与 QoS2 级别 MQTT 消息。
2.2.2、多种协议接入 支持包含 MQTT、MQTT-SN、CoAP、LwM2M、私有 TCP 协议在内的多种通信协议接入,覆盖各类行业应用;可根据您的特殊使用场景定制私有化功能,充分契合业务需求。
2.2.3、容量预估与伸缩 通过连接数与消息吞吐量自动预估容量,通过紧密的监控来制定伸缩计划,集群大小可随业务规模平滑调整。
2.3、EMQ X 和 RabbitMQ对比 EMQ X 是基于高并发的 Erlang/OTP 语言平台开发,支持百万级连接、分布式集群架构、发布订阅模式的开源 MQTT 消息服务器。开源至今,EMQ X 在全球物联网市场得到了广泛应用。在开源版基础上,还陆续发展了商业版和提供云版本(cloud-hosting)。EMQ X 支持很多插件,具有强大拓展能力,用户依靠插件可以实现更多的功能。
RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器也是基于 Erlang 语言开发的,现在可以通过插件配置的形式,使其支持 MQTT 协议。
2.3.1、测试场景 以下的测试均使用了 QoS 1 的消息。当发送 QoS 1 的消息时,这些消息每次都要作为可持久化的备份保存在硬盘上。所以队列空间的使用也尤为重要。
这次评测使用了一个云主机 M5 large 的实例,每个 MQTT 消息服务器集群由 3 个节点组成,每个节点的配置是双核,8GB 内存。需要强调的是,我们对于 EMQ X 和 RabbitMQ 的测试使用了完全一致的硬件资源以消除变量。
压力测试将会有两个场景,「多对一」 和 「一对多」 。
1.多对一 许多设备作为发布者,如温度传感器或者是压力传感器,发送数据给一个服务器。服务器再将这些数据发送给一个控制器(即订阅者)处理这些数据。
2.一对多 一个控制器作为发布者将消息传送给服务器,再由服务器将这些消息传送给多个作为订阅者的设备。
在每个场景里,「多」的那一方的数量将会从 2000 个逐渐上升到 10000 个。每个场景里,每一秒会发送一条载荷为 256 字节的消息。这样的发布并不会造成过大的吞吐量。仅仅使用 256 字节载荷是为了展示出这两个服务器的工作原理,以及他们的集群模式如何对这些场景作出反应的。
2.3.2、测试结果 左侧Y轴是指 CPU 占用,底部X轴是指「多」侧的客户端数量变化。
1.多对一 从 「多对一」 的结果可以看出,EMQ X 和 RabbitMQ 相比并没有太大差别。
2.一对多 但是从「一对多」的结果来看,RabbitMQ 相比于 EMQ X 确实有很明显的差距。
2.3.3、测试总结 结果表明:在「多对一」 场景中,EMQ X 和 RabbitMQ 相比并没有太大差别;而在「一对多」场景中,RabbitMQ 则较 EMQ X 产生了较为明显的差距。相比较而言,RabbitMQ使用MQTT协议,和 EMQ X使用MQTT协议存在着一定的差距。
2.3.4、注意 使用MQTT的发布-订阅模型不能满足使用要求。可以选择使用AMQP。
3、Eclipse Paho Java Paho Java客户端是用Java编写的MQTT客户端库,用于开发在JVM或其他Java兼容平台(例如Android)上运行的应用程序。 Paho不仅可以对接EMQ X Broker,还可以对接满足符合MQTT协议规范的消息代理服务端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1协议版本基本能满足百分之九十多的接入场景。
4、SpringBoot整合Eclipse Paho Java EMQX是消息服务器,而我们java想要发送消息,和订阅消息都是和服务器打交道,想要和服务器打交道就需要想办法连上他,这时候就需要用到了Eclipse Paho Java客户端,用来在java当中连接EMQX消息服务器。
下面案例是按照我的应用场景来写的,监听单独用了一个客户端存入了内存,使用了static变量,启动项目的时候初始化,发送客户端并没有存入内存,而是发送一条,创建一个客户端。这里有一点需要注意,客户端id一定不要重复,就是对于MQTT服务器来说,clientId一定要保持唯一。
4.1、导入Maven依赖 SpringBoot版本:2.3.9.RELEASE
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-integration</artifactId > </dependency > <dependency > <groupId > org.springframework.integration</groupId > <artifactId > spring-integration-stream</artifactId > </dependency > <dependency > <groupId > org.springframework.integration</groupId > <artifactId > spring-integration-mqtt</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-configuration-processor</artifactId > <optional > true</optional > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > 1.18.22</version > <scope > provided</scope > </dependency >
4.2、配置文件 4.2.1、application.yml 1 2 3 4 5 6 7 8 9 10 11 12 13 mqtt: hostUrl: tcp://192.168.56.103:1883 username: dev password: dev client-id: MQTT-CLIENT-DEV cleanSession: true reconnect: true timeout: 100 keepAlive: 100 defaultTopic: client/dev/report serverTopic: server/dev/report isOpen: true qos: 0
4.2.2、MqttProperties 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 package net.iot.mqtt.client.config;import lombok.Data;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;@Component @ConfigurationProperties("mqtt") @Data public class MqttProperties { private String username; private String password; private String hostUrl; private String clientId; private String defaultTopic; private String serverTopic; private int timeout; private int keepAlive; private Boolean cleanSession; private Boolean reconnect; private Boolean isOpen; private Integer qos; public String getDefaultTopic () { return defaultTopic + "/#" ; } public String getServerTopic (String topic) { return serverTopic + "/" + topic; } }
4.3、添加MQTT接受服务的客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 package net.iot.mqtt.client.config;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Component public class MqttAcceptClient { private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class); @Autowired private MqttAcceptCallback mqttAcceptCallback; @Autowired private MqttProperties mqttProperties; public static MqttClient client; private static MqttClient getClient () { return client; } private static void setClient (MqttClient client) { MqttAcceptClient.client = client; } public void connect () { MqttClient client; try { client = new MqttClient (mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence ()); MqttConnectOptions options = new MqttConnectOptions (); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setAutomaticReconnect(mqttProperties.getReconnect()); options.setCleanSession(mqttProperties.getCleanSession()); MqttAcceptClient.setClient(client); client.setCallback(mqttAcceptCallback); client.connect(options); } catch (Exception e) { logger.error("MqttAcceptClient connect error,message:{}" , e.getMessage()); e.printStackTrace(); } } public void reconnection () { try { client.connect(); } catch (MqttException e) { logger.error("MqttAcceptClient reconnection error,message:{}" , e.getMessage()); e.printStackTrace(); } } public void subscribe (String topic, int qos) { logger.info("========================【开始订阅主题:" + topic + "】========================" ); try { client.subscribe(topic, qos); } catch (MqttException e) { logger.error("MqttAcceptClient subscribe error,message:{}" , e.getMessage()); e.printStackTrace(); } } public void unsubscribe (String topic) { logger.info("========================【取消订阅主题:" + topic + "】========================" ); try { client.unsubscribe(topic); } catch (MqttException e) { logger.error("MqttAcceptClient unsubscribe error,message:{}" , e.getMessage()); e.printStackTrace(); } } }
4.4、添加MQTT接受服务的回调类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 package net.iot.mqtt.client.config;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component public class MqttAcceptCallback implements MqttCallbackExtended { private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class); @Autowired private MqttAcceptClient mqttAcceptClient; @Autowired private MqttProperties mqttProperties; @Override public void connectionLost (Throwable throwable) { logger.info("连接断开,可以重连" ); if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) { logger.info("【emqx重新连接】...................................................." ); mqttAcceptClient.reconnection(); } } @Override public void messageArrived (String topic, MqttMessage mqttMessage) throws Exception { logger.info("【接收消息主题】:" + topic); logger.info("【接收消息Qos】:" + mqttMessage.getQos()); logger.info("【接收消息内容】:" + new String (mqttMessage.getPayload())); } @Override public void deliveryComplete (IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { logger.info("向主题【" + topic + "】发送消息成功!" ); } try { MqttMessage message = token.getMessage(); byte [] payload = message.getPayload(); String s = new String (payload, "UTF-8" ); logger.info("【消息内容】:" + s); } catch (Exception e) { logger.error("MqttAcceptCallback deliveryComplete error,message:{}" , e.getMessage()); e.printStackTrace(); } } @Override public void connectComplete (boolean b, String s) { logger.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!=============================" ); mqttAcceptClient.subscribe(mqttProperties.getDefaultTopic(), 0 ); } }
4.5、添加MQTT发送客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 package net.iot.mqtt.client.config;import org.eclipse.paho.client.mqttv3.*;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.UUID;@Component public class MqttSendClient { private static final Logger logger = LoggerFactory.getLogger(MqttSendClient.class); @Autowired private MqttSendCallBack mqttSendCallBack; @Autowired private MqttProperties mqttProperties; public MqttClient connect () { MqttClient client = null ; try { String uuid = UUID.randomUUID().toString().replaceAll("-" , "" ); client = new MqttClient (mqttProperties.getHostUrl(), uuid, new MemoryPersistence ()); MqttConnectOptions options = new MqttConnectOptions (); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setCleanSession(true ); options.setAutomaticReconnect(false ); client.setCallback(mqttSendCallBack); client.connect(options); } catch (Exception e) { logger.error("MqttSendClient connect error,message:{}" , e.getMessage()); e.printStackTrace(); } return client; } public void publish (boolean retained, String topic, String content) { MqttMessage message = new MqttMessage (); message.setQos(mqttProperties.getQos()); message.setRetained(retained); message.setPayload(content.getBytes()); MqttDeliveryToken token; MqttClient mqttClient = connect(); try { mqttClient.publish(mqttProperties.getServerTopic(topic), message); } catch (MqttException e) { logger.error("MqttSendClient publish error,message:{}" , e.getMessage()); e.printStackTrace(); } finally { disconnect(mqttClient); close(mqttClient); } } public static void disconnect (MqttClient mqttClient) { try { if (mqttClient != null ) mqttClient.disconnect(); } catch (MqttException e) { logger.error("MqttSendClient disconnect error,message:{}" , e.getMessage()); e.printStackTrace(); } } public static void close (MqttClient mqttClient) { try { if (mqttClient != null ) mqttClient.close(); } catch (MqttException e) { logger.error("MqttSendClient close error,message:{}" , e.getMessage()); e.printStackTrace(); } } }
4.6、添加MQTT发送客户端的回调类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 package net.iot.mqtt.client.config;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component public class MqttSendCallBack implements MqttCallbackExtended { private static final Logger logger = LoggerFactory.getLogger(MqttSendCallBack.class); @Override public void connectionLost (Throwable throwable) { logger.info("连接断开,可以重连" ); } @Override public void messageArrived (String topic, MqttMessage mqttMessage) throws Exception { logger.info("【接收消息主题】: " + topic); logger.info("【接收消息Qos】: " + mqttMessage.getQos()); logger.info("【接收消息内容】: " + new String (mqttMessage.getPayload())); } @Override public void deliveryComplete (IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { logger.info("向主题【" + topic + "】发送消息成功!" ); } try { MqttMessage message = token.getMessage(); byte [] payload = message.getPayload(); String s = new String (payload, "UTF-8" ); logger.info("【消息内容】:" + s); } catch (Exception e) { logger.error("MqttSendCallBack deliveryComplete error,message:{}" , e.getMessage()); e.printStackTrace(); } } @Override public void connectComplete (boolean b, String s) { logger.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!=============================" ); } }
4.7、添加配置类 自定义配置,通过这个配置,来控制启动项目的时候是否启动mqtt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package net.iot.mqtt.client.config;import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;import org.springframework.context.annotation.Condition;import org.springframework.context.annotation.ConditionContext;import org.springframework.core.env.Environment;import org.springframework.core.type.AnnotatedTypeMetadata;public class MqttCondition implements Condition { @Override public boolean matches (ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) { ConfigurableListableBeanFactory beanFactory = context.getBeanFactory(); ClassLoader classLoader = context.getClassLoader(); Environment environment = context.getEnvironment(); String isOpen = environment.getProperty("mqtt.isOpen" ); return Boolean.valueOf(isOpen); } }
4.8、启动服务的时候开启监听客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package net.iot.mqtt.client.config;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Conditional;import org.springframework.context.annotation.Configuration;@Configuration public class MqttConfig { @Autowired private MqttAcceptClient mqttAcceptClient; @Conditional(MqttCondition.class) @Bean public MqttAcceptClient getMqttPushClient () { mqttAcceptClient.connect(); return mqttAcceptClient; } }
4.9、测试类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package net.dreamlu.iot.mqtt.client.controller;import net.dreamlu.iot.mqtt.client.config.MqttSendClient;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController @RequestMapping("/mqtt") public class MqttController { @Autowired private MqttSendClient mqttSendClient; @GetMapping(value = "/publishTopic") public String publishTopic (String topic, String sendMessage) { System.out.println("topic:" + topic); System.out.println("message:" + sendMessage); this .mqttSendClient.publish(false , topic, sendMessage); return "topic:" + topic + "\nmessage:" + sendMessage; } }
4.10、发送和接收消息测试 4.10.1、发送消息 访问:http://localhost:8080/mqtt/publishTopic?sendMessage=222
4.10.2、接收消息
4.11、可能出现的问题 问题:循环引用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 *************************** APPLICATION FAILED TO START *************************** Description: The dependencies of some of the beans in the application context form a cycle: ┌─────┐ | mqttAcceptCallback (field private net.iot.mqtt.client.config.MqttAcceptClient net.iot.mqtt.client.config.MqttAcceptCallback.mqttAcceptClient) ↑ ↓ | mqttAcceptClient (field private net.iot.mqtt.client.config.MqttAcceptCallback net.iot.mqtt.client.config.MqttAcceptClient.mqttAcceptCallback) └─────┘ Action: Relying upon circular references is discouraged and they are prohibited by default. Update your application to remove the dependency cycle between beans. As a last resort, it may be possible to break the cycle automatically by setting spring.main.allow-circular-references to true.
解决方法:打开循环引用
1 2 3 spring: main: allow-circular-references: true