侧边栏壁纸
博主头像
洋洋小站博主等级

Stay foolish, stay hungry ...

  • 累计撰写 15 篇文章
  • 累计创建 21 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

物联网消息协议MQTT详解

travis
2025-03-28 / 0 评论 / 0 点赞 / 8 阅读 / 15215 字

1 MQTT 基础

1.1 什么是MQTT

MQTT 协议的全称是 Message Queuing Telemetry Transport,是一种基于客户端-服务器, 发布-订阅模式 的消息传输协议。它具有轻量级、开源、简单、易于实现的特点。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

1.2 MQTT主要特性

MQTT的主要特性包括:

  • 轻量级:MQTT的设计非常轻量,协议头部非常小,传输的数据量很小,适用于带宽有限的网络环境,如低速、高延迟或不稳定的网络。
  • 简单:MQTT的协议规范相对简单,易于实现和部署。它定义了少量的消息类型和协议操作,使得开发人员可以快速上手。
  • 异步通信:MQTT使用异步通信模式,发布者发送消息后,不需要等待接收者的响应,可以继续执行其他操作。这种异步通信模式适合在资源有限的设备和网络中工作。
  • 可靠性:MQTT支持三种不同的消息传递质量( QoS )级别:QoS 0(至多一次),QoS 1(至少一次)和QoS 2(只有一次)。这使得可以根据应用程序的要求选择适当的消息交付保证级别。
  • 网络状况适应性:MQTT可以适应不稳定的网络状况,如网络中断、重连等。它具有断开连接后自动重连的机制,可以确保消息的可靠传输。
  • 安全:MQTT可使用TLS加密消息,使用OAuth进行身份验证

1.3 发布-订阅模式

发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,它将发送消息的客户端(发布者)与接收消息的客户端(订阅者)解耦,使得两者不需要建立直接的联系也不需要知道对方的存在。

MQTT 发布/订阅模式的精髓在于由一个被称为 代理(Broker) 的中间角色 负责所有消息的路由和分发工作,发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。

在 MQTT 中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者,所以只能将消息转发给当前的订阅者, 如果当前不存在任何订阅,那么消息将被直接丢弃。

MQTT 发布/订阅模式有 4 个主要组成部分:发布者、订阅者、代理和主题。

  • 发布者(Publisher) :负责将消息发布到主题上,发布者一次只能向一个主题发送数据,发布者发布消息时也无需关心订阅者是否在线。
  • 订阅者(Subscriber) :订阅者通过订阅主题接收消息,且可一次订阅多个主题。MQTT 还支持通过 共享订阅 的方式在多个订阅者之间实现订阅的负载均衡。
  • 代理(Broker) :负责接收发布者的消息,并将消息转发至符合条件的订阅者。另外,代理也需要负责处理客户端发起的连接、断开连接、订阅、取消订阅等请求。
  • 主题(Topic) :主题是 MQTT 进行消息路由的基础,它类似 URL 路径,使用斜杠 / 进行分层,比如 sensor/1/temperature。一个主题可以有多个订阅者,代理会将该主题下的消息转发给所有订阅者;一个主题也可以有多个发布者,代理将按照消息到达的顺序转发。

PS:发布者和订阅者都是MQTT 客户端 ,它们可以同时发布和订阅。这意味着一个客户端可以向特定主题发布消息,同时也可以订阅其他主题以接收消息。在MQTT中, 发布和订阅是独立的操作,客户端可以根据需要进行发布和订阅

2 MQTT会话

MQTT会话建立的过程主要过程:

  1. MQTT客户端向服务器(Broker)发起连接请求,建立TCP连接;
  2. TCP连接建立后,客户端会发送CONNECT报文,包括协议版本,客户端标识等信息。
  3. Broker接收到CONNECT报文后,验证客户端请求,并返回CONNACK报文

CONNECT报文的主要内容说明如下:

MQTT-Packet: CONNECT
----------------------------------------------------------------------------------------
       clientId: "client-1"             // 客户端ID
   cleanSession: true                   // 是否为持久会话。TRUE:非持久会话;FALSE:持久会话
       username: "hans"                 // 可选值。认证用的用户名
       password: "letmein"              // 可选值。认证用的密码
  lastWillTopic: "/hans/will"           // 可选值。遗嘱消息的Topic
    lastWillQos: 2                      // 可选值。遗嘱消息的QoS
lastWillMessage: "unexpected exit"      // 可选值。遗嘱消息内容
 lastWillRetain: false                  // 可选值。遗嘱消息是否为Retain消息
      keepAlive: 60                     // 心跳检测间隔时间。单位秒。

2.1 客户端ID -- clientID

在MQTT协议中,相同ClientID的客户端重复连接行为是未定义的。这意味着具体的MQTT服务器实现可能会以不同的方式处理这种情况。有时可能会接受第二个连接请求并断开第一个连接,有时可能两个连接都存在,但只有一个客户端能接收消息,等等。

为了避免这种不确定性和潜在的错误行为, 建议每个客户端使用唯一的ClientID

2.2 持久会话和非持久会话 -- cleanSession

持久会话: 持久会话在客户端断开连接后仍然保持,代理服务器会保留会话相关的状态信息,包括订阅关系、QoS级别等。当客户端重新连接时,它可以恢复之前的会话并继续之前的订阅。

持久会话能够保证在订阅客户端离线时,broker能够保留消息,以便客户端重连后接收。PS:Borker只保留QoS=1和QoS=2的消息,不会保留QoS=0的消息

非持久会话: 非持久会话在客户端断开连接时被丢弃,代理服务器不会保留会话状态。当客户端重新连接时,它将启动一个新的会话,并丢失之前的订阅和状态信息。

2.3 keepAlive机制

MQTT协议是承载于TCP协议之上的,而TCP协议以连接为导向,在连接双方之间,提供稳定、有序的字节流功能。但是,在部分情况下,TCP可能出现半连接问题。所谓半连接,是指某一方的连接已经断开或者没有建立,而另外一方的连接却依然维持着。在这种情况下,半连接的一方可能会持续不断地向对端发送数据,而显然这些数据永远到达不了对端。为了避免半连接导致的通信黑洞,MQTT协议提供了Keep Alive机制,使客户端和 MQTT服务器可以判定当前是否存在半连接问题,从而关闭对应连接。

2.3.1 客户端流程

在连接建立后,客户端需要确保,自己 任意两次MQTT协议包的发送间隔不超过 Keep Alive 的值 ,如果客户端当前处于空闲状态,没有可发送的包,则可以发送 PINGREQ协议包。

当客户端发送 PINGREQ 协议包后,Broker必须返回一个 PINGRESP 协议包,如果客户端在一个可靠的时间内,没有收到服务器的PINGRESP协议包,则说明当前存在半连接、或者Broker已经下线、或者出现了网络故障,这个时候,客户端应当关闭当前连接。

2.3.2 Broker流程

在连接建立后,Broker如果没有在 Keep Alive 的1.5倍时间内,收到来自客户端的任何包,则会认为和客户端之间的连接出现了问题,此时Broker便会断开和客户端的连接。

如果Broker收到了来自客户端的PINGREQ协议包,需要回复一个PINGRESP协议包进行确认。

2.3.3 客户端接管机制

当Broker里存在半连接时,如果对应的客户端发起了重连或新的连接,则Broker会启动客户端接管机制:关闭旧的半连接,然后与客户端建立新的连接。

这种机制保证了客户端不会因为Broker里存在的半连接,导致无法进行重连。

3 MQTT消息的发布和订阅

3.1 主题 -- Topic

当客户端发布一条消息时,它会被发送到Broker,然后Broker将消息路由到该主题的所有订阅者。 当客户端订阅一个主题时,它会收到代理转发到该主题的所有消息。

MQTT 主题本质上是一个 UTF- 8 编码的字符串,是 MQTT 协议进行消息路由的基础。MQTT 主题类似URL路径,使用斜杠 / 进行分层

PS:为了避免歧义且易于理解,通常不建议主题以 / 开头或结尾

MQTT Topic 支持两种通配符 + 和 #

3.1.1 单层通配符 +

加号 (“+”) 是用于单个主题层级匹配的通配符。在使用单层通配符时,单层通配符必须占据整个层级

比如,当订阅端订阅主题 /myhome/groundfloor/+/temperature

myhome/groundfloor/living/temperature  -- 匹配
myhome/groundfloor/kitchen/temperature  -- 匹配
myhome/firstfloor/living/temperature  -- 不匹配
myhome/groundfloor/kitchen/fridge/temperature  -- 不匹配

3.1.2 多层通配符 #

井字符号(“#”U+ 0023 )是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级,在使用多层通配符时,它必须占据整个层级并且必须是主题的最后一个字符

#     ---- 有效,匹配所有主题
sensor/#  ---- 有效
sensor/bedroom#   ---- 无效(没有占据整个层级)
sensor/#/temperature   ---- 无效(不是主题最后一个字符)

如果客户端订阅主题 myhome/groundfloor/# ,它能够匹配以下主题的消息:

myhome/groundfloor
myhome/groundfloor/kitchen
myhome/groundfloor/kitchen/temperature
myhome/groundfloor/kitchen/brightness

3.1.3 以 $ 开头的主题 -- 系统主题

开头的主题开头的主题为系统主题,系统主题主要用于获取MQTT服务器自身运行状态、消息统计、客户端上下线事件等数据,通常使用前缀 **`SYS/** 来表示此类信息。目前,MQTT协议暂未明确规定 **$SYS/`** 主题标准,但大多数 MQTT 服务器都遵循该标准建议。

不要向以 $ 开头的Topic发送消息!

3.2 MQTT中的QoS等级

MQTT设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。在这套机制下,提供了三种不同层次QoS(Quality of Service)

  • QoS 0,At most once, 至多一次 ;Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;
  • QoS 1,At least once, 至少一次 ;Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;
  • QoS 2,Exactly once, 确保只有一次 。Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。

这里的Sender和Receive可能是Publisher-Broker,也可能是Broker-Subscribe。

注意,QoS是 Sender和Receiver 之间的协议,而 不是 Publisher和Subscriber之间的协议。换句话说,Publisher发布了一条QoS1的消息,只能保证Broker能至少收到一次这个消息;而对于Subscriber能否至少收到一次这个消息,还要取决于Subscriber在Subscibe的时候和Broker协商的QoS等级。

3.2.1 QoS 0 -- 至多一次

QoS 0下,Sender向Receiver发送一个包含消息数据的PUBLISH包,然后不管结果如何,丢掉已发送的PUBLISH包,一条消息的发送完成。

在三种QoS等级中,QoS0 的效率最高,但可能会丢失消息

3.2.2 QoS 1 -- 至少一次

QoS1要保证消息至少到达一次,所以有一个应答的机制。Sender和Receiver的一次消息的传递流程如下:

1.Sender向Receiver发送一个带有数据的PUBLISH包,并在本地保存这个PUBLISH包;

2.Receiver收到PUBLISH包以后,向Sender发送一个PUBACK数据包,PUBACK数据包没有消息体(Payload),在可变头中有一个包标识(Packet Identifier),和它收到的PUBLISH包中的Packet Identifier一致。

3.Sender收到PUBACK之后,根据PUBACK包中的Packet Identifier找到本地保存的PUBLISH包,然后丢弃掉,一次消息的发送完成。

在QoS 1下,消息可能会出现重传的问题。 如果Sender在一段时间内没有收到PUBLISH包对应的PUBACK,它将该PUBLISH包的DUP标识设为1(代表是重新发送的PUBLISH包),然后重新发送该PUBLISH包。这时,Receiver可能会重复收到消息,需自行去重。

3.2.3 QoS 2 -- 只有一次

相比QoS0和QoS1,QoS2不仅要确保Receiver能收到Sender发送的消息,还需要确保消息不重复。它的重传和应答机制就要复杂一些,同时开销也是最大的。QoS2下,一次消息的传递流程如下所示:

1.Sender发送QoS为2的PUBLISH数据包,数据包 Packet Identifier 为 P,并在本地保存该PUBLISH包;

2.Receiver收到PUBLISH数据包后,在本地保存PUBLISH包的Packet Identifier P,并回复Sender一个PUBREC数据包,PUBREC数据包可变头中的Packet Identifier为P,没有消息体(Payload);

3.当Sender收到PUBREC,它就可以安全的丢弃掉初始Packet Identifier为P的PUBLISH数据包。同时保存该PUBREC数据包,并回复Receiver一个PUBREL数据包,PUBREL数据包可变头中的Packet Identifier为P,没有消息体;

4.当Receiver收到PUBREL数据包,它可以丢掉保存的PUBLISH包的Packet Identifier P,并回复Sender一个可变头中 Packet Identifier 为 P,没有消息体(Payload)的PUBCOMP数据包;

5.当Sender收到PUBCOMP包,那么认为传输已完成,则丢掉对应的PUBREC数据包;

上面是一次完整无误的传输过程,然而传输过程中可能会出现以下 异常情况

  • 情况1:Sender发送PUBLISH数据包给Receiver的时候,发送失败;
  • 情况2:Sender已经成功发送PUBLISH数据包给Receiver了,但是Receiver发送PUBREC数据包失败;
  • 情况3:Sender已经成功收到了PUBREC数据包,但是PUBREL数据包发送失败;
  • 情况4:Receiver已经收到了PUBREL数据包,但是发送PUBCOMP数据包时发送失败

针对上述的问题,较为详细的 处理方法 如下:

  • 不管是情况1还是情况2,因为Sender在一定时间内没有收到PUBREC,那么它会把PUBLISH包的DUP标识设为1,重新发送该PUBLISH数据包;
  • 不管是情况3还是情况4,因为Sender在一定时间内没有收到PUBCOMP包,那么它会重新发送PUBREL数据包;
  • 针对情况2,Receiver可能会收到多个重复的PUBLISH包,更加完善的处理如下:Receiver在收到PUBLISH数据包之后,马上回复一个PUBREC数据包。并会在本地保存PUBLISH包的Packet Identifier P,不管之后因为重传多少次这个Packet Identifier 为P的数据包,Receiver都认为是重复的,丢弃。同时Receiver接收到QoS为2的PUBLISH数据包后, 并不马上投递给上层, 而是在本地做持久化,将消息保存起来(这里需要是持久化而不是保存在内存)。
  • 针对情况4,更加完善的处理如下:Receiver收到PUBREL数据包后,正式将消息递交给上层应用层,投递之后销毁Packet Identifier P,并发送PUBCOMP数据包,销毁之前的持久化消息。之后不管接收到多少个PUBREL数据包,因为没有Packet Identifier P,直接回复PUBCOMP数据包即可。

在三种QoS等级中,QoS 2 的效率是最低的

3.2.4 服务质量降级

如果Publisher到Broker的QoS等级,和Broker到Subscribe的QoS等级不同,服务端会使用较低级别来提供服务。如下图所示,虽然A发送到主题1的消息采用QoS为2,但是服务端发送主题1的消息给B时,采用的QoS为1,那么,这条消息的QoS等级为 QoS 1。

3.3 共享订阅 -- shard subscription

共享订阅是MQTT 5.0 的新特性。MQTT协议基于发布订阅者模式设计,一条消息发布后,所有的订阅者都可以收到。但在有些场景下,用户只希望一个或者几个订阅者可以收到特定消息,共享订阅就适用于这种场景,共享订阅需要订阅者订阅时添加共享订阅标记及订阅组。

在上图中,订阅者C、订阅者D、发布者A都遵循标准流程,订阅 topic:demo/topic 和发布消息 topic:demo/topic 。因为订阅者A、订阅者B使用了共享订阅的能力,所以只会随机发给A或者B其中一个。

共享订阅中,订阅者Subscribe的Topic由三部分组成:

$share/GROUPID/TOPIC
   |      |      +--------------- 3. Topic过滤器,用户定义,支持通配符
   |      +---------------------- 2. 用户自定义分组ID
   +----------------------------- 1. 以 $share 开头

3.3.1 共享订阅实践

如下使用mosquito Borker,验证共享订阅功能。(这里忽略了安装mosquito的步骤)

如下在两个终端创建两个订阅者: client1client2 ,分组名称为: group ,共享订阅的主题为 home/temp

mosquitto_sub -h "localhost" -p 1883  -u shirley -P 123456 -t '$share/group/home/temp' -q 1 -V mqttv5 -i client1

另外打开一个终端,创建订阅者client2

mosquitto_sub -h "localhost" -p 1883 -u shirley -P 123456 -t '$share/group/home/temp' -q 1 -V mqttv5 -i client2

运行如上命令后,会发现命令被block。

另外打开一个终端发布消息。如下用一个循环发布了10条消息

for i in $(seq 1 10); do  mosquitto_pub -h "localhost" -p 1883 -u shirley -P 123456 -t 'home/temp' -q 1 -m "hello $i" -i client-publisher; sleep 1 ; done

消息发布后,可以看到client1 和 client2 共同接收 home/temp 主题的消息,且两者接收的消息不重复

共享订阅的优点包括:

  • 消息负载均衡:通过共享订阅,消息服务器可以将发布到共享主题的消息均匀地分发给所有订阅该主题的客户端,实现消息负载均衡。
  • 故障容错:当某个订阅共享主题的客户端发生故障时,其他客户端仍然可以继续接收到该主题下的消息,保证了系统的可靠性和可用性。
  • 灵活性和扩展性:通过共享订阅,可以灵活地增加或减少订阅该主题的客户端数量,以满足不同场景下的需求。

4 MQTT中几种特殊的消息

4.1 保留消息 -- retained message

Retained 消息是指在 PUBLISH 数据包中 Retain 表示为 1 的消息,Broker 收到消息后,将会为该主题保存该 Retained 消息。当有新的订阅者订阅该主题时,Broker 会将这个消息立即发送给新的订阅者。

Retained 消息存在以下特点:

  • 一个 topic 只能存在 一条 Retained 消息,发布新的 Retained 消息将会 覆盖 旧消息;
  • 若订阅者使用通配符订阅主题,那么该订阅者将会收到所有的匹配主题的 Retained 消息;
  • 只有新的订阅者才能够收到 Retained 消息;

删除retain消息的方法只有一种,发送空的retain消息到Broker。当代理(broker)收到这个特殊的保留消息,它识别为消息的请求,然后删除对应主题的保留消息。因此,新的订阅者将不会收到这个主题之前的保留消息。

4.2 遗嘱消息LWT -- lastWill*

通过遗嘱消息,设备可将自己的 意外掉线情况 及时通知第三方。遗嘱LWT消息通常与Keep Alive结合使用

根据 MQTT 规范,borker 代理 必须在以下情况分发 客户端 LWT 消息:

  • 代理检测到 I/O 错误或者网络故障
  • 客户端无法再定义的 Keep Alive 时间内进行通信
  • 客户端在关闭网络连接之前不会发送 DISCONNECT 数据包
  • 由于协议错误,代理关闭连接

5 MQTT安全

可以通过如下方法增强MQTT安全:

  1. 使用TLS/SSL:通过使用传输层安全协议(TLS)或安全套接层协议(SSL),可以对MQTT通信进行加密,防止数据被窃听和篡改。
  2. 身份认证:在MQTT连接建立时,进行客户端和服务器之间的身份验证,以确保通信双方的合法性。常见的身份认证方式包括用户名/密码认证、客户端证书认证等。
  3. 访问控制:通过配置合适的访问控制策略,限制用户对MQTT Broker的访问权限。例如,可以限制某些主题只允许特定的客户端订阅或发布。
  4. 消息加密:对于特别敏感的数据,可以在发布消息时进行端到端的加密,确保只有预期的接收方能够解密并查看消息内容。
  5. 安全审计:对MQTT通信进行安全审计,记录关键事件和操作,以便监控和追踪潜在的安全问题。

6 MQTT客户端和Broker

常见的开源MQTT broker有:

  • eclipse 的 mosquitto
  • hiveMQ的社区版
  • RabbitMQ中提供的mqtt插件

MQTT有很多客户端,支持Java, C, python等多种语言。

更多软件可参考官网: mqtt.org/software/

7 参考资料

blog.csdn.net/weixin_43

zhuanlan.zhihu.com/p/80

hivemq.com/mqtt/mqtt-5/

thingsboard.io/docs/mqt

7.1 内容所属专栏

8 其他

零散的一些技术资料

物联网 MQTT 消息队列

0

评论区