工程提交

This commit is contained in:
2026-03-31 15:46:04 +08:00
parent 75f512a5b4
commit da4e944bca
2841 changed files with 4822938 additions and 1 deletions

510
4G/code/lib/mqtt.lua Normal file
View File

@@ -0,0 +1,510 @@
--- 模块功能MQTT客户端
-- @module mqtt
-- @author openLuat
-- @license MIT
-- @copyright openLuat
-- @release 2017.10.24
require "log"
require "socket"
require "utils"
module(..., package.seeall)
-- MQTT 指令id
local CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
local CLIENT_COMMAND_TIMEOUT = 60000
local function encodeLen(len)
local s = ""
local digit
repeat
digit = len % 128
len = (len - digit) / 128
if len > 0 then
digit = bit.bor(digit, 0x80)
end
s = s .. string.char(digit)
until (len <= 0)
return s
end
local function encodeUTF8(s)
if not s or #s == 0 then
return ""
else
return pack.pack(">P", s)
end
end
local function packCONNECT(clientId, keepAlive, username, password, cleanSession, will, version)
local content = pack.pack(">PbbHPAAAA",
version == "3.1" and "MQIsdp" or "MQTT",
version == "3.1" and 3 or 4,
(#username == 0 and 0 or 1) * 128 + (#password == 0 and 0 or 1) * 64 + will.retain * 32 + will.qos * 8 + will.flag * 4 + cleanSession * 2,
keepAlive,
clientId,
encodeUTF8(will.topic),
encodeUTF8(will.payload),
encodeUTF8(username),
encodeUTF8(password))
return pack.pack(">bAA",
CONNECT * 16,
encodeLen(string.len(content)),
content)
end
local function packSUBSCRIBE(dup, packetId, topics)
local header = SUBSCRIBE * 16 + dup * 8 + 2
local data = pack.pack(">H", packetId)
for topic, qos in pairs(topics) do
data = data .. pack.pack(">Pb", topic, qos)
end
return pack.pack(">bAA", header, encodeLen(#data), data)
end
local function packUNSUBSCRIBE(dup, packetId, topics)
local header = UNSUBSCRIBE * 16 + dup * 8 + 2
local data = pack.pack(">H", packetId)
for k, topic in pairs(topics) do
data = data .. pack.pack(">P", topic)
end
return pack.pack(">bAA", header, encodeLen(#data), data)
end
local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
local len = 2 + #topic + #payload
if qos > 0 then
return pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
else
return pack.pack(">bAPA", header, encodeLen(len), topic, payload)
end
end
local function packACK(id, dup, packetId)
return pack.pack(">bbH", id * 16 + dup * 8 + (id == PUBREL and 1 or 0) * 2, 0x02, packetId)
end
local function packZeroData(id, dup, qos, retain)
dup = dup or 0
qos = qos or 0
retain = retain or 0
return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
end
local function unpack(s)
if #s < 2 then return end
log.debug("mqtt.unpack", #s, string.toHex(string.sub(s, 1, 50)))
-- read remaining length
local len = 0
local multiplier = 1
local pos = 2
repeat
if pos > #s then return end
local digit = string.byte(s, pos)
len = len + ((digit % 128) * multiplier)
multiplier = multiplier * 128
pos = pos + 1
until digit < 128
if #s < len + pos - 1 then return end
local header = string.byte(s, 1)
local packet = {id = (header - (header % 16)) / 16, dup = ((header % 16) - ((header % 16) % 8)) / 8, qos = bit.band(header, 0x06) / 2, retain = bit.band(header, 0x01)}
local nextpos
if packet.id == CONNACK then
nextpos, packet.ackFlag, packet.rc = pack.unpack(s, "bb", pos)
elseif packet.id == SUBACK then
nextpos, packet.ackFlag, packet.rc = pack.unpack(s, "bb", pos)
if len >= 2 then
nextpos, packet.packetId = pack.unpack(s, ">H", pos)
packet.grantedQos = string.sub(s, nextpos, pos + len - 1)
else
packet.packetId = 0
packet.grantedQos = ""
end
elseif packet.id == PUBLISH then
nextpos, packet.topic = pack.unpack(s, ">P", pos)
if packet.qos > 0 then
nextpos, packet.packetId = pack.unpack(s, ">H", nextpos)
end
packet.payload = string.sub(s, nextpos, pos + len - 1)
elseif packet.id ~= PINGRESP then
if len >= 2 then
nextpos, packet.packetId = pack.unpack(s, ">H", pos)
else
packet.packetId = 0
end
end
return packet, pos + len
end
local mqttc = {}
mqttc.__index = mqttc
--- 创建一个mqtt client实例
-- @string clientId 确保设备唯一性
-- @number[opt=300] keepAlive 心跳间隔(单位为秒)默认300秒
-- @string[opt=""] username 用户名,用户名为空配置为""或者nil
-- @string[opt=""] password 密码,密码为空配置为""或者nil
-- @number[opt=1] cleanSession 1/0
-- @table[opt=nil] will 遗嘱参数,格式为{qos=,retain=,topic=,payload=}
-- @string[opt="3.1.1"] version MQTT版本号仅支持"3.1"和"3.1.1"
-- @return table mqttc client实例
-- @usage
-- mqttc = mqtt.client("clientid-123")
-- mqttc = mqtt.client("clientid-123",200)
-- mqttc = mqtt.client("clientid-123",nil,"user","password")
-- mqttc = mqtt.client("clientid-123",nil,"user","password",nil,{qos=0,retain=0,topic="willTopic",payload="willTopic"},"3.1")
function client(clientId, keepAlive, username, password, cleanSession, will, version)
local o = {}
local packetId = 1
if will then
will.flag = 1
else
will = {flag = 0, qos = 0, retain = 0, topic = "", payload = ""}
end
o.clientId = clientId
o.keepAlive = keepAlive or 300
o.username = username or ""
o.password = password or ""
o.cleanSession = cleanSession or 1
o.version = version or "3.1.1"
o.will = will
o.commandTimeout = CLIENT_COMMAND_TIMEOUT
o.cache = {}-- 接收到的mqtt数据包缓冲
o.inbuf = "" -- 未完成的数据缓冲
o.connected = false
o.getNextPacketId = function()
packetId = packetId == 65535 and 1 or (packetId + 1)
return packetId
end
o.lastOTime = 0
setmetatable(o, mqttc)
return o
end
-- 检测是否需要发送心跳包
function mqttc:checkKeepAlive()
if self.keepAlive == 0 then return true end
if os.time() - self.lastOTime >= self.keepAlive then
if not self:write(packZeroData(PINGREQ)) then
log.info("mqtt.client:", "pingreq send fail")
return false
end
end
return true
end
-- 发送mqtt数据
function mqttc:write(data)
log.debug("mqtt.client:write", string.toHex(string.sub(data, 1, 50)))
local r = self.io:send(data)
if r then self.lastOTime = os.time() end
return r
end
-- 接收mqtt数据包
function mqttc:read(timeout, msg, msgNoResume)
if not self:checkKeepAlive() then
log.warn("mqtt.read checkKeepAlive fail")
return false
end
-- 处理之前缓冲的数据
local packet, nextpos = unpack(self.inbuf)
if packet then
self.inbuf = string.sub(self.inbuf, nextpos)
return true, packet
end
while true do
local recvTimeout
if self.keepAlive == 0 then
recvTimeout = timeout
else
local kaTimeout = (self.keepAlive - (os.time() - self.lastOTime)) * 1000
recvTimeout = kaTimeout > timeout and timeout or kaTimeout
end
local r, s, p = self.io:recv(recvTimeout == 0 and 5 or recvTimeout, msg, msgNoResume)
if r then
self.inbuf = self.inbuf .. s
elseif s == "timeout" then -- 超时,判断是否需要发送心跳包
if not self:checkKeepAlive() then
return false
elseif timeout <= recvTimeout then
return false, "timeout"
else
timeout = timeout - recvTimeout
end
else -- 其他错误直接返回
return r, s, p
end
local packet, nextpos = unpack(self.inbuf)
if packet then
--self.lastIOTime = os.time()
self.inbuf = string.sub(self.inbuf, nextpos)
if packet.id ~= PINGRESP then
return true, packet
end
end
end
end
-- 等待接收指定的mqtt消息
function mqttc:waitfor(id, timeout, msg, msgNoResume)
for index, packet in ipairs(self.cache) do
if packet.id == id then
return true, table.remove(self.cache, index)
end
end
while true do
local insertCache = true
local r, data, param = self:read(timeout, msg, msgNoResume)
if r then
if data.id == PUBLISH then
if data.qos > 0 then
if not self:write(packACK(data.qos == 1 and PUBACK or PUBREC, 0, data.packetId)) then
log.info("mqtt.client:waitfor", "send publish ack failed", data.qos)
return false
end
end
elseif data.id == PUBREC or data.id == PUBREL then
if not self:write(packACK(data.id == PUBREC and PUBREL or PUBCOMP, 0, data.packetId)) then
log.info("mqtt.client:waitfor", "send ack fail", data.id == PUBREC and "PUBREC" or "PUBCOMP")
return false
end
insertCache = false
end
if data.id == id then
return true, data
end
if insertCache then table.insert(self.cache, data) end
else
return false, data, param
end
end
end
--- 连接mqtt服务器
-- @string host 服务器地址
-- @param port string或者number类型服务器端口
-- @string[opt="tcp"] transport "tcp"或者"tcp_ssl"
-- @table[opt=nil] cert table或者nil类型ssl证书当transport为"tcp_ssl"时此参数才有意义。cert格式如下
-- {
-- caCert = "ca.crt", --CA证书文件(Base64编码 X.509格式),如果存在此参数,则表示客户端会对服务器的证书进行校验;不存在则不校验
-- clientCert = "client.crt", --客户端证书文件(Base64编码 X.509格式),服务器对客户端的证书进行校验时会用到此参数
-- clientKey = "client.key", --客户端私钥文件(Base64编码 X.509格式)
-- clientPassword = "123456", --客户端证书文件密码[可选]
-- }
-- @number[opt=120] timeout 可选参数socket连接超时时间单位秒
-- @return result true表示成功false或者nil表示失败
-- @usage mqttc = mqtt.client("clientid-123", nil, nil, false); mqttc:connect("mqttserver.com", 1883, "tcp", 5)
function mqttc:connect(host, port, transport, cert, timeout)
if self.connected then
log.info("mqtt.client:connect", "has connected")
return false
end
if self.io then
self.io:close()
self.io = nil
end
if transport and transport ~= "tcp" and transport ~= "tcp_ssl" then
log.info("mqtt.client:connect", "invalid transport", transport)
return false
end
self.io = socket.tcp(transport == "tcp_ssl" or type(cert) == "table", cert)
if not self.io:connect(host, port, timeout) then
log.info("mqtt.client:connect", "connect host fail")
return false
end
if not self:write(packCONNECT(self.clientId, self.keepAlive, self.username, self.password, self.cleanSession, self.will, self.version)) then
log.info("mqtt.client:connect", "send fail")
return false
end
local r, packet = self:waitfor(CONNACK, self.commandTimeout, nil, true)
-- if not r or packet.rc ~= 0 then
-- log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
-- return false,packet.rc
-- end
if (not r) or (not packet) or packet.rc ~= 0 then
log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
return false, packet and packet.rc or -1
end
self.connected = true
return true
end
--- 订阅主题
-- @param topic string或者table类型一个主题时为string类型多个主题时为table类型主题内容为UTF8编码
-- @param[opt=0] qos number或者niltopic为一个主题时qos为number类型(0/1/2默认0)topic为多个主题时qos为nil
-- @return bool true表示成功false或者nil表示失败
-- @usage
-- mqttc:subscribe("/abc", 0) -- subscribe topic "/abc" with qos = 0
-- mqttc:subscribe({["/topic1"] = 0, ["/topic2"] = 1, ["/topic3"] = 2}) -- subscribe multi topic
function mqttc:subscribe(topic, qos)
if not self.connected then
log.info("mqtt.client:subscribe", "not connected")
return false
end
local topics
if type(topic) == "string" then
topics = {[topic] = qos and qos or 0}
else
topics = topic
end
if not self:write(packSUBSCRIBE(0, self.getNextPacketId(), topics)) then
log.info("mqtt.client:subscribe", "send failed")
return false
end
local r, packet = self:waitfor(SUBACK, self.commandTimeout, nil, true)
if not r then
log.info("mqtt.client:subscribe", "wait ack failed")
return false
end
if not (packet.grantedQos and packet.grantedQos~="" and not packet.grantedQos:match(string.char(0x80))) then
log.info("mqtt.client:subscribe", "suback grant qos error", packet.grantedQos)
return false
end
return true
end
--- 取消订阅主题
-- @param topic string或者table类型一个主题时为string类型多个主题时为table类型主题内容为UTF8编码
-- @return bool true表示成功false或者nil表示失败
-- @usage
-- mqttc:unsubscribe("/abc") -- unsubscribe topic "/abc"
-- mqttc:unsubscribe({"/topic1", "/topic2", "/topic3"}) -- unsubscribe multi topic
function mqttc:unsubscribe(topic)
if not self.connected then
log.info("mqtt.client:unsubscribe", "not connected")
return false
end
local topics
if type(topic) == "string" then
topics = {topic}
else
topics = topic
end
if not self:write(packUNSUBSCRIBE(0, self.getNextPacketId(), topics)) then
log.info("mqtt.client:unsubscribe", "send failed")
return false
end
if not self:waitfor(UNSUBACK, self.commandTimeout, nil, true) then
log.info("mqtt.client:unsubscribe", "wait ack failed")
return false
end
return true
end
--- 发布一条消息
-- @string topic UTF8编码的字符串
-- @string payload 用户自己控制payload的编码mqtt.lua不会对payload做任何编码转换
-- @number[opt=0] qos 0/1/2, default 0
-- @number[opt=0] retain 0或者1
-- @return bool 发布成功返回true失败返回false
-- @usage
-- mqttc = mqtt.client("clientid-123", nil, nil, false)
-- mqttc:connect("mqttserver.com", 1883, "tcp")
-- mqttc:publish("/topic", "publish from luat mqtt client", 0)
function mqttc:publish(topic, payload, qos, retain)
if not self.connected then
log.info("mqtt.client:publish", "not connected")
return false
end
qos = qos or 0
retain = retain or 0
if not self:write(packPUBLISH(0, qos, retain, qos > 0 and self.getNextPacketId() or 0, topic, payload)) then
log.info("mqtt.client:publish", "socket send failed")
return false
end
if qos == 0 then return true end
if not self:waitfor(qos == 1 and PUBACK or PUBCOMP, self.commandTimeout, nil, true) then
log.warn("mqtt.client:publish", "wait ack timeout")
return false
end
return true
end
--- 接收消息
-- @number timeout 接收超时时间,单位毫秒
-- @string[opt=nil] msg 可选参数控制socket所在的线程退出recv阻塞状态
-- @return result 数据接收结果true表示成功false表示失败
-- @return data
-- 如果result为true表示服务器发过来的mqtt包
--
-- 如果result为false超时失败,data为"timeout"
-- 如果result为falsemsg控制退出data为msg的字符串
-- 如果result为falsesocket连接被动断开控制退出data为"CLOSED"
-- 如果result为falsePDP断开连接控制退出data为"IP_ERROR_IND"
--
-- 如果result为falsemqtt不处于连接状态data为nil
-- 如果result为false收到了PUBLISH报文发送PUBACK或者PUBREC报文失败data为nil
-- 如果result为false收到了PUBREC报文发送PUBREL报文失败data为nil
-- 如果result为false收到了PUBREL报文发送PUBCOMP报文失败data为nil
-- 如果result为false发送PINGREQ报文失败data为nil
-- @return param 如果是msg控制退出param的值是msg的参数其余情况无意义为nil
-- @usage
-- true, packet = mqttc:receive(2000)
-- false, error_message = mqttc:receive(2000)
-- false, msg, para = mqttc:receive(2000,"APP_SEND_DATA")
function mqttc:receive(timeout, msg)
if not self.connected then
log.info("mqtt.client:receive", "not connected")
return false
end
return self:waitfor(PUBLISH, timeout, msg)
end
--- 断开与服务器的连接
-- @return nil
-- @usage
-- mqttc = mqtt.client("clientid-123", nil, nil, false)
-- mqttc:connect("mqttserver.com", 1883, "tcp")
-- process data
-- mqttc:disconnect()
function mqttc:disconnect()
if self.io then
if self.connected then self:write(packZeroData(DISCONNECT)) end
self.io:close()
self.io = nil
end
self.cache = {}
self.inbuf = ""
self.connected = false
end