Files
BR_YKC/4G/code/lib/mqtt.lua
2026-03-31 15:46:04 +08:00

511 lines
17 KiB
Lua
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
--- 模块功能MQTT客户端
-- @module mqtt
-- @author openLuat
-- @license MIT
-- @copyright openLuat
-- @release 2017.10.24
require "log"
require "socket"
require "utils"
module(..., package.seeall)
-- MQTT 指令id
local CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
local CLIENT_COMMAND_TIMEOUT = 60000
local function encodeLen(len)
local s = ""
local digit
repeat
digit = len % 128
len = (len - digit) / 128
if len > 0 then
digit = bit.bor(digit, 0x80)
end
s = s .. string.char(digit)
until (len <= 0)
return s
end
local function encodeUTF8(s)
if not s or #s == 0 then
return ""
else
return pack.pack(">P", s)
end
end
local function packCONNECT(clientId, keepAlive, username, password, cleanSession, will, version)
local content = pack.pack(">PbbHPAAAA",
version == "3.1" and "MQIsdp" or "MQTT",
version == "3.1" and 3 or 4,
(#username == 0 and 0 or 1) * 128 + (#password == 0 and 0 or 1) * 64 + will.retain * 32 + will.qos * 8 + will.flag * 4 + cleanSession * 2,
keepAlive,
clientId,
encodeUTF8(will.topic),
encodeUTF8(will.payload),
encodeUTF8(username),
encodeUTF8(password))
return pack.pack(">bAA",
CONNECT * 16,
encodeLen(string.len(content)),
content)
end
local function packSUBSCRIBE(dup, packetId, topics)
local header = SUBSCRIBE * 16 + dup * 8 + 2
local data = pack.pack(">H", packetId)
for topic, qos in pairs(topics) do
data = data .. pack.pack(">Pb", topic, qos)
end
return pack.pack(">bAA", header, encodeLen(#data), data)
end
local function packUNSUBSCRIBE(dup, packetId, topics)
local header = UNSUBSCRIBE * 16 + dup * 8 + 2
local data = pack.pack(">H", packetId)
for k, topic in pairs(topics) do
data = data .. pack.pack(">P", topic)
end
return pack.pack(">bAA", header, encodeLen(#data), data)
end
local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
local len = 2 + #topic + #payload
if qos > 0 then
return pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
else
return pack.pack(">bAPA", header, encodeLen(len), topic, payload)
end
end
local function packACK(id, dup, packetId)
return pack.pack(">bbH", id * 16 + dup * 8 + (id == PUBREL and 1 or 0) * 2, 0x02, packetId)
end
local function packZeroData(id, dup, qos, retain)
dup = dup or 0
qos = qos or 0
retain = retain or 0
return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
end
local function unpack(s)
if #s < 2 then return end
log.debug("mqtt.unpack", #s, string.toHex(string.sub(s, 1, 50)))
-- read remaining length
local len = 0
local multiplier = 1
local pos = 2
repeat
if pos > #s then return end
local digit = string.byte(s, pos)
len = len + ((digit % 128) * multiplier)
multiplier = multiplier * 128
pos = pos + 1
until digit < 128
if #s < len + pos - 1 then return end
local header = string.byte(s, 1)
local packet = {id = (header - (header % 16)) / 16, dup = ((header % 16) - ((header % 16) % 8)) / 8, qos = bit.band(header, 0x06) / 2, retain = bit.band(header, 0x01)}
local nextpos
if packet.id == CONNACK then
nextpos, packet.ackFlag, packet.rc = pack.unpack(s, "bb", pos)
elseif packet.id == SUBACK then
nextpos, packet.ackFlag, packet.rc = pack.unpack(s, "bb", pos)
if len >= 2 then
nextpos, packet.packetId = pack.unpack(s, ">H", pos)
packet.grantedQos = string.sub(s, nextpos, pos + len - 1)
else
packet.packetId = 0
packet.grantedQos = ""
end
elseif packet.id == PUBLISH then
nextpos, packet.topic = pack.unpack(s, ">P", pos)
if packet.qos > 0 then
nextpos, packet.packetId = pack.unpack(s, ">H", nextpos)
end
packet.payload = string.sub(s, nextpos, pos + len - 1)
elseif packet.id ~= PINGRESP then
if len >= 2 then
nextpos, packet.packetId = pack.unpack(s, ">H", pos)
else
packet.packetId = 0
end
end
return packet, pos + len
end
local mqttc = {}
mqttc.__index = mqttc
--- 创建一个mqtt client实例
-- @string clientId 确保设备唯一性
-- @number[opt=300] keepAlive 心跳间隔(单位为秒)默认300秒
-- @string[opt=""] username 用户名,用户名为空配置为""或者nil
-- @string[opt=""] password 密码,密码为空配置为""或者nil
-- @number[opt=1] cleanSession 1/0
-- @table[opt=nil] will 遗嘱参数,格式为{qos=,retain=,topic=,payload=}
-- @string[opt="3.1.1"] version MQTT版本号仅支持"3.1"和"3.1.1"
-- @return table mqttc client实例
-- @usage
-- mqttc = mqtt.client("clientid-123")
-- mqttc = mqtt.client("clientid-123",200)
-- mqttc = mqtt.client("clientid-123",nil,"user","password")
-- mqttc = mqtt.client("clientid-123",nil,"user","password",nil,{qos=0,retain=0,topic="willTopic",payload="willTopic"},"3.1")
function client(clientId, keepAlive, username, password, cleanSession, will, version)
local o = {}
local packetId = 1
if will then
will.flag = 1
else
will = {flag = 0, qos = 0, retain = 0, topic = "", payload = ""}
end
o.clientId = clientId
o.keepAlive = keepAlive or 300
o.username = username or ""
o.password = password or ""
o.cleanSession = cleanSession or 1
o.version = version or "3.1.1"
o.will = will
o.commandTimeout = CLIENT_COMMAND_TIMEOUT
o.cache = {}-- 接收到的mqtt数据包缓冲
o.inbuf = "" -- 未完成的数据缓冲
o.connected = false
o.getNextPacketId = function()
packetId = packetId == 65535 and 1 or (packetId + 1)
return packetId
end
o.lastOTime = 0
setmetatable(o, mqttc)
return o
end
-- 检测是否需要发送心跳包
function mqttc:checkKeepAlive()
if self.keepAlive == 0 then return true end
if os.time() - self.lastOTime >= self.keepAlive then
if not self:write(packZeroData(PINGREQ)) then
log.info("mqtt.client:", "pingreq send fail")
return false
end
end
return true
end
-- 发送mqtt数据
function mqttc:write(data)
log.debug("mqtt.client:write", string.toHex(string.sub(data, 1, 50)))
local r = self.io:send(data)
if r then self.lastOTime = os.time() end
return r
end
-- 接收mqtt数据包
function mqttc:read(timeout, msg, msgNoResume)
if not self:checkKeepAlive() then
log.warn("mqtt.read checkKeepAlive fail")
return false
end
-- 处理之前缓冲的数据
local packet, nextpos = unpack(self.inbuf)
if packet then
self.inbuf = string.sub(self.inbuf, nextpos)
return true, packet
end
while true do
local recvTimeout
if self.keepAlive == 0 then
recvTimeout = timeout
else
local kaTimeout = (self.keepAlive - (os.time() - self.lastOTime)) * 1000
recvTimeout = kaTimeout > timeout and timeout or kaTimeout
end
local r, s, p = self.io:recv(recvTimeout == 0 and 5 or recvTimeout, msg, msgNoResume)
if r then
self.inbuf = self.inbuf .. s
elseif s == "timeout" then -- 超时,判断是否需要发送心跳包
if not self:checkKeepAlive() then
return false
elseif timeout <= recvTimeout then
return false, "timeout"
else
timeout = timeout - recvTimeout
end
else -- 其他错误直接返回
return r, s, p
end
local packet, nextpos = unpack(self.inbuf)
if packet then
--self.lastIOTime = os.time()
self.inbuf = string.sub(self.inbuf, nextpos)
if packet.id ~= PINGRESP then
return true, packet
end
end
end
end
-- 等待接收指定的mqtt消息
function mqttc:waitfor(id, timeout, msg, msgNoResume)
for index, packet in ipairs(self.cache) do
if packet.id == id then
return true, table.remove(self.cache, index)
end
end
while true do
local insertCache = true
local r, data, param = self:read(timeout, msg, msgNoResume)
if r then
if data.id == PUBLISH then
if data.qos > 0 then
if not self:write(packACK(data.qos == 1 and PUBACK or PUBREC, 0, data.packetId)) then
log.info("mqtt.client:waitfor", "send publish ack failed", data.qos)
return false
end
end
elseif data.id == PUBREC or data.id == PUBREL then
if not self:write(packACK(data.id == PUBREC and PUBREL or PUBCOMP, 0, data.packetId)) then
log.info("mqtt.client:waitfor", "send ack fail", data.id == PUBREC and "PUBREC" or "PUBCOMP")
return false
end
insertCache = false
end
if data.id == id then
return true, data
end
if insertCache then table.insert(self.cache, data) end
else
return false, data, param
end
end
end
--- 连接mqtt服务器
-- @string host 服务器地址
-- @param port string或者number类型服务器端口
-- @string[opt="tcp"] transport "tcp"或者"tcp_ssl"
-- @table[opt=nil] cert table或者nil类型ssl证书当transport为"tcp_ssl"时此参数才有意义。cert格式如下
-- {
-- caCert = "ca.crt", --CA证书文件(Base64编码 X.509格式),如果存在此参数,则表示客户端会对服务器的证书进行校验;不存在则不校验
-- clientCert = "client.crt", --客户端证书文件(Base64编码 X.509格式),服务器对客户端的证书进行校验时会用到此参数
-- clientKey = "client.key", --客户端私钥文件(Base64编码 X.509格式)
-- clientPassword = "123456", --客户端证书文件密码[可选]
-- }
-- @number[opt=120] timeout 可选参数socket连接超时时间单位秒
-- @return result true表示成功false或者nil表示失败
-- @usage mqttc = mqtt.client("clientid-123", nil, nil, false); mqttc:connect("mqttserver.com", 1883, "tcp", 5)
function mqttc:connect(host, port, transport, cert, timeout)
if self.connected then
log.info("mqtt.client:connect", "has connected")
return false
end
if self.io then
self.io:close()
self.io = nil
end
if transport and transport ~= "tcp" and transport ~= "tcp_ssl" then
log.info("mqtt.client:connect", "invalid transport", transport)
return false
end
self.io = socket.tcp(transport == "tcp_ssl" or type(cert) == "table", cert)
if not self.io:connect(host, port, timeout) then
log.info("mqtt.client:connect", "connect host fail")
return false
end
if not self:write(packCONNECT(self.clientId, self.keepAlive, self.username, self.password, self.cleanSession, self.will, self.version)) then
log.info("mqtt.client:connect", "send fail")
return false
end
local r, packet = self:waitfor(CONNACK, self.commandTimeout, nil, true)
-- if not r or packet.rc ~= 0 then
-- log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
-- return false,packet.rc
-- end
if (not r) or (not packet) or packet.rc ~= 0 then
log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
return false, packet and packet.rc or -1
end
self.connected = true
return true
end
--- 订阅主题
-- @param topic string或者table类型一个主题时为string类型多个主题时为table类型主题内容为UTF8编码
-- @param[opt=0] qos number或者niltopic为一个主题时qos为number类型(0/1/2默认0)topic为多个主题时qos为nil
-- @return bool true表示成功false或者nil表示失败
-- @usage
-- mqttc:subscribe("/abc", 0) -- subscribe topic "/abc" with qos = 0
-- mqttc:subscribe({["/topic1"] = 0, ["/topic2"] = 1, ["/topic3"] = 2}) -- subscribe multi topic
function mqttc:subscribe(topic, qos)
if not self.connected then
log.info("mqtt.client:subscribe", "not connected")
return false
end
local topics
if type(topic) == "string" then
topics = {[topic] = qos and qos or 0}
else
topics = topic
end
if not self:write(packSUBSCRIBE(0, self.getNextPacketId(), topics)) then
log.info("mqtt.client:subscribe", "send failed")
return false
end
local r, packet = self:waitfor(SUBACK, self.commandTimeout, nil, true)
if not r then
log.info("mqtt.client:subscribe", "wait ack failed")
return false
end
if not (packet.grantedQos and packet.grantedQos~="" and not packet.grantedQos:match(string.char(0x80))) then
log.info("mqtt.client:subscribe", "suback grant qos error", packet.grantedQos)
return false
end
return true
end
--- 取消订阅主题
-- @param topic string或者table类型一个主题时为string类型多个主题时为table类型主题内容为UTF8编码
-- @return bool true表示成功false或者nil表示失败
-- @usage
-- mqttc:unsubscribe("/abc") -- unsubscribe topic "/abc"
-- mqttc:unsubscribe({"/topic1", "/topic2", "/topic3"}) -- unsubscribe multi topic
function mqttc:unsubscribe(topic)
if not self.connected then
log.info("mqtt.client:unsubscribe", "not connected")
return false
end
local topics
if type(topic) == "string" then
topics = {topic}
else
topics = topic
end
if not self:write(packUNSUBSCRIBE(0, self.getNextPacketId(), topics)) then
log.info("mqtt.client:unsubscribe", "send failed")
return false
end
if not self:waitfor(UNSUBACK, self.commandTimeout, nil, true) then
log.info("mqtt.client:unsubscribe", "wait ack failed")
return false
end
return true
end
--- 发布一条消息
-- @string topic UTF8编码的字符串
-- @string payload 用户自己控制payload的编码mqtt.lua不会对payload做任何编码转换
-- @number[opt=0] qos 0/1/2, default 0
-- @number[opt=0] retain 0或者1
-- @return bool 发布成功返回true失败返回false
-- @usage
-- mqttc = mqtt.client("clientid-123", nil, nil, false)
-- mqttc:connect("mqttserver.com", 1883, "tcp")
-- mqttc:publish("/topic", "publish from luat mqtt client", 0)
function mqttc:publish(topic, payload, qos, retain)
if not self.connected then
log.info("mqtt.client:publish", "not connected")
return false
end
qos = qos or 0
retain = retain or 0
if not self:write(packPUBLISH(0, qos, retain, qos > 0 and self.getNextPacketId() or 0, topic, payload)) then
log.info("mqtt.client:publish", "socket send failed")
return false
end
if qos == 0 then return true end
if not self:waitfor(qos == 1 and PUBACK or PUBCOMP, self.commandTimeout, nil, true) then
log.warn("mqtt.client:publish", "wait ack timeout")
return false
end
return true
end
--- 接收消息
-- @number timeout 接收超时时间,单位毫秒
-- @string[opt=nil] msg 可选参数控制socket所在的线程退出recv阻塞状态
-- @return result 数据接收结果true表示成功false表示失败
-- @return data
-- 如果result为true表示服务器发过来的mqtt包
--
-- 如果result为false超时失败,data为"timeout"
-- 如果result为falsemsg控制退出data为msg的字符串
-- 如果result为falsesocket连接被动断开控制退出data为"CLOSED"
-- 如果result为falsePDP断开连接控制退出data为"IP_ERROR_IND"
--
-- 如果result为falsemqtt不处于连接状态data为nil
-- 如果result为false收到了PUBLISH报文发送PUBACK或者PUBREC报文失败data为nil
-- 如果result为false收到了PUBREC报文发送PUBREL报文失败data为nil
-- 如果result为false收到了PUBREL报文发送PUBCOMP报文失败data为nil
-- 如果result为false发送PINGREQ报文失败data为nil
-- @return param 如果是msg控制退出param的值是msg的参数其余情况无意义为nil
-- @usage
-- true, packet = mqttc:receive(2000)
-- false, error_message = mqttc:receive(2000)
-- false, msg, para = mqttc:receive(2000,"APP_SEND_DATA")
function mqttc:receive(timeout, msg)
if not self.connected then
log.info("mqtt.client:receive", "not connected")
return false
end
return self:waitfor(PUBLISH, timeout, msg)
end
--- 断开与服务器的连接
-- @return nil
-- @usage
-- mqttc = mqtt.client("clientid-123", nil, nil, false)
-- mqttc:connect("mqttserver.com", 1883, "tcp")
-- process data
-- mqttc:disconnect()
function mqttc:disconnect()
if self.io then
if self.connected then self:write(packZeroData(DISCONNECT)) end
self.io:close()
self.io = nil
end
self.cache = {}
self.inbuf = ""
self.connected = false
end