12 local CONNECT
, CONNACK
, PUBLISH
, PUBACK
, PUBREC
, PUBREL
, PUBCOMP
, SUBSCRIBE
, SUBACK
, UNSUBSCRIBE
, UNSUBACK
, PINGREQ
, PINGRESP
, DISCONNECT
= 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
13 local CLIENT_COMMAND_TIMEOUT
= 60000
17 local string = _G
.string
18 local encodeLen
= mqttcore
.encodeLen
19 --local encodeUTF8 = mqttcore.encodeUTF8
20 -- local function encodeLen(len)
25 -- len = (len - digit) / 128
27 -- --digit = bit.bor(digit, 0x80)
28 -- digit = digit | 0x80
30 -- s = s .. string.char(digit)
35 local encodeUTF8
= mqttcore
.encodeUTF8
36 -- local function encodeUTF8(s)
37 -- if not s or #s == 0 then
40 -- return pack.pack(">P", s)
44 local packCONNECT
= mqttcore
.packCONNECT
46 -- local function packCONNECT(clientId, keepAlive, username, password, cleanSession, will, version)
47 -- local content = pack.pack(">PbbHPAAAA",
48 -- version == "3.1" and "MQIsdp" or "MQTT",
49 -- version == "3.1" and 3 or 4,
50 -- (#username == 0 and 0 or 1) * 128 + (#password == 0 and 0 or 1) * 64 + will.retain * 32 + will.qos * 8 + will.flag * 4 + cleanSession * 2,
53 -- encodeUTF8(will.topic),
54 -- encodeUTF8(will.payload),
55 -- encodeUTF8(username),
56 -- encodeUTF8(password))
57 -- local mydata = pack.pack(">bAA",
59 -- encodeLen(string.len(content)),
61 -- local tdata = mqttcore.packCONNECT(clientId, keepAlive, username, password, cleanSession, will, version)
62 -- log.info("mqtt", "true", mydata:toHex())
63 -- log.info("mqtt", "false", tdata:toHex())
67 local packSUBSCRIBE
= mqttcore
.packSUBSCRIBE
69 -- local function packSUBSCRIBE(dup, packetId, topics)
70 -- local header = SUBSCRIBE * 16 + dup * 8 + 2
71 -- local data = pack.pack(">H", packetId)
72 -- for topic, qos in pairs(topics) do
73 -- data = data .. pack.pack(">Pb", topic, qos)
75 -- local mydata = pack.pack(">bAA", header, encodeLen(#data), data)
76 -- log.info("mqtt", "true", mydata:toHex())
77 -- local tdata = mqttcore.packSUBSCRIBE(dup, packetId, topics)
78 -- log.info("mqtt", "false", tdata:toHex())
82 local packUNSUBSCRIBE
= mqttcore
.packUNSUBSCRIBE
83 -- local function packUNSUBSCRIBE(dup, packetId, topics)
84 -- local header = UNSUBSCRIBE * 16 + dup * 8 + 2
85 -- local data = pack.pack(">H", packetId)
86 -- for k, topic in pairs(topics) do
87 -- data = data .. pack.pack(">P", topic)
89 -- return pack.pack(">bAA", header, encodeLen(#data), data)
92 local packPUBLISH
= mqttcore
.packPUBLISH
94 -- local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
95 -- local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
96 -- local len = 2 + #topic + #payload
99 -- mydata = pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
101 -- mydata = pack.pack(">bAPA", header, encodeLen(len), topic, payload)
103 -- local tdata = mqttcore.packPUBLISH(dup, qos, retain, packetId, topic, payload)
104 -- log.info("mqtt", "true", mydata:toHex())
105 -- log.info("mqtt", "false", tdata:toHex())
109 local packACK
= mqttcore
.packACK
111 -- local function packACK(id, dup, packetId)
112 -- return pack.pack(">bbH", id * 16 + dup * 8 + (id == PUBREL and 1 or 0) * 2, 0x02, packetId)
115 local packZeroData
= mqttcore
.packZeroData
117 -- local function packZeroData(id, dup, qos, retain)
120 -- retain = retain or 0
121 -- return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
124 local function unpack(s
)
125 if #s
< 2 then return end
126 log.debug("mqtt.unpack", #s
, string.toHex(string.sub(s
, 1, 50)))
128 -- read remaining length
134 if pos
> #s
then return end
135 local digit
= string.byte(s
, pos
)
136 len
= len
+ ((digit
% 128) * multiplier
)
137 multiplier
= multiplier
* 128
141 if #s
< len
+ pos
- 1 then return end
143 local header
= string.byte(s
, 1)
145 --local packet = {id = (header - (header % 16)) / 16, dup = ((header % 16) - ((header % 16) % 8)) / 8, qos = bit.band(header, 0x06) / 2, retain = bit.band(header, 0x01)}
146 local packet
= {id
= (header
- (header
% 16)) >> 4, dup
= ((header
% 16) - ((header
% 16) % 8)) >> 3, qos
= (header
& 0x06) >> 1, retain
= (header
& 0x01)}
149 if packet
.id
== CONNACK
then
150 nextpos
, packet
.ackFlag
, packet
.rc
= pack
.unpack(s
, "bb", pos
)
151 elseif packet
.id
== PUBLISH
then
152 nextpos
, packet
.topic
= pack
.unpack(s
, ">P", pos
)
153 if packet
.qos
> 0 then
154 nextpos
, packet
.packetId
= pack
.unpack(s
, ">H", nextpos
)
156 packet
.payload
= string.sub(s
, nextpos
, pos
+ len
- 1)
157 elseif packet
.id
~= PINGRESP
then
159 nextpos
, packet
.packetId
= pack
.unpack(s
, ">H", pos
)
165 return packet
, pos
+ len
169 mqttc
.__index
= mqttc
171 --- 创建一个mqtt client实例
173 -- @number[opt=300] keepAlive 心跳间隔(单位为秒),默认300秒
174 -- @string[opt=""] username 用户名,用户名为空配置为""或者nil
175 -- @string[opt=""] password 密码,密码为空配置为""或者nil
176 -- @number[opt=1] cleanSession 1/0
177 -- @table[opt=nil] will 遗嘱参数,格式为{qos=, retain=, topic=, payload=}
178 -- @string[opt="3.1.1"] version MQTT版本号
179 -- @return table mqttc client实例
181 -- mqttc = mqtt.client("clientid-123")
182 -- mqttc = mqtt.client("clientid-123",200)
183 -- mqttc = mqtt.client("clientid-123",nil,"user","password")
184 -- mqttc = mqtt.client("clientid-123",nil,"user","password",nil,nil,"3.1")
185 function mqtt
.client(clientId
, keepAlive
, username
, password
, cleanSession
, will
, version
)
192 will
= {flag
= 0, qos
= 0, retain
= 0, topic
= "", payload
= ""}
195 o
.clientId
= clientId
196 o
.keepAlive
= keepAlive
or 300
197 o
.username
= username
or ""
198 o
.password
= password
or ""
199 o
.cleanSession
= cleanSession
or 1
200 o
.version
= version
or "3.1.1"
202 o
.commandTimeout
= CLIENT_COMMAND_TIMEOUT
203 o
.cache
= {}-- 接收到的mqtt数据包缓冲
204 o
.inbuf
= "" -- 未完成的数据缓冲
206 o
.getNextPacketId
= function()
207 packetId
= packetId
== 65535 and 1 or (packetId
+ 1)
213 setmetatable(o
, mqttc
)
219 function mqttc
:checkKeepAlive()
220 if self
.keepAlive
== 0 then return true end
221 if os
.time() - self
.lastOTime
>= self
.keepAlive
then
222 if not self
:write(packZeroData(PINGREQ
)) then
223 log.info("mqtt.client:", "pingreq send fail")
231 function mqttc
:write(data
)
232 log.debug("mqtt.client:write", string.toHex(string.sub(data
, 1, 50)))
233 local r
= self
.io
:send(data
)
234 if r
then self
.lastOTime
= os
.time() end
239 function mqttc
:read(timeout
, msg
, msgNoResume
)
240 if not self
:checkKeepAlive() then
241 log.warn("mqtt.read checkKeepAlive fail")
245 local topic
= "MQTTC_PKG_" .. tostring(self
.io
:id())
246 local result
, data
, param
= sys
.waitUntil({topic
, msg
}, timeout
)
247 --log.debug("mqtt.read", result, data, param)
248 if result
then -- 收到topic消息
249 local pkg
= table.remove(self
.pkgs
, 1)
251 --log.debug("mqtt", "get packet", pkg.id, pkg.packetId)
254 --log.debug("mqtt", "get sys.msg", msg, data)
255 return false, msg
, data
257 if self
.io
:closed() == 1 then
260 return false, "timeout"
265 local function update_resp(_self
, data
)
267 if #_self
.inbuf
> 0 then
268 _self
.inbuf
= _self
.inbuf
.. data
273 --log.debug("mqttc", "data recv to unpack", _self.inbuf:toHex())
274 local packet
, nextpos
= unpack(_self
.inbuf
)
276 log.info("mqttc", "msg unpack ok", packet
.id
)
277 _self
.inbuf
= string.sub(_self
.inbuf
, nextpos
)
278 table.insert(_self
.pkgs
, packet
)
279 sys
.publish("MQTTC_PKG_" .. tostring(_self
.io
:id()))
280 if #_self
.inbuf
> 0 then
281 update_resp(_self
, "")
284 log.info("mqttc", "data not full")
291 function mqttc
:waitfor(id
, timeout
, msg
, msgNoResume
)
292 for index
, packet
in ipairs(self
.cache
) do
293 if packet
.id
== id
then
294 return true, table.remove(self
.cache
, index
)
299 local insertCache
= true
300 local r
, data
, param
= self
:read(timeout
, msg
, msgNoResume
)
302 if data
.id
== PUBLISH
then
304 if not self
:write(packACK(data
.qos
== 1 and PUBACK
or PUBREC
, 0, data
.packetId
)) then
305 log.info("mqtt.client:waitfor", "send publish ack failed", data
.qos
)
309 elseif data
.id
== PUBREC
or data
.id
== PUBREL
then
310 if not self
:write(packACK(data
.id
== PUBREC
and PUBREL
or PUBCOMP
, 0, data
.packetId
)) then
311 log.info("mqtt.client:waitfor", "send ack fail", data
.id
== PUBREC
and "PUBREC" or "PUBCOMP")
317 if data
.id
== id
then
320 if insertCache
then table.insert(self
.cache
, data
) end
322 return false, data
, param
328 -- @string host 服务器地址
329 -- @param port string或者number类型,服务器端口
330 -- @string[opt="tcp"] transport "tcp"或者"tcp_ssl"
331 -- @table[opt=nil] cert,table或者nil类型,ssl证书,当transport为"tcp_ssl"时,此参数才有意义。cert格式如下:
333 -- caCert = "ca.crt", --CA证书文件(Base64编码 X.509格式),如果存在此参数,则表示客户端会对服务器的证书进行校验;不存在则不校验
334 -- clientCert = "client.crt", --客户端证书文件(Base64编码 X.509格式),服务器对客户端的证书进行校验时会用到此参数
335 -- clientKey = "client.key", --客户端私钥文件(Base64编码 X.509格式)
336 -- clientPassword = "123456", --客户端证书文件密码[可选]
338 -- @number timeout, 链接服务器最长超时时间
339 -- @return result true表示成功,false或者nil表示失败
340 -- @usage mqttc = mqtt.client("clientid-123", nil, nil, false); mqttc:connect("mqttserver.com", 1883, "tcp", 5)
341 function mqttc
:connect(host
, port
, transport
, cert
, timeout
)
342 if self
.connected
then
343 log.info("mqtt.client:connect", "has connected")
352 if transport
and transport
~= "tcp" and transport
~= "tcp_ssl" then
353 log.info("mqtt.client:connect", "invalid transport", transport
)
357 self
.io
= socket
.tcp(transport
== "tcp_ssl" or type(cert
) == "table", cert
)
359 if not self
.io
:connect(host
, port
, timeout
) then
360 log.info("mqtt.client:connect", "connect host fail")
364 if not self
:write(packCONNECT(self
.clientId
, self
.keepAlive
, self
.username
, self
.password
, self
.cleanSession
, self
.will
, self
.version
)) then
365 log.info("mqtt.client:connect", "send fail")
369 local r
, packet
= self
:waitfor(CONNACK
, self
.commandTimeout
, nil, true)
370 -- if not r or packet.rc ~= 0 then
371 -- log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
372 -- return false,packet.rc
374 if (not r
) or (not packet
) or packet
.rc
~= 0 then
375 log.info("mqtt.client:connect", "connack error", r
and packet
.rc
or -1)
376 return false, packet
and packet
.rc
or -1
379 self
.connected
= true
385 -- @param topic,string或者table类型,一个主题时为string类型,多个主题时为table类型,主题内容为UTF8编码
386 -- @param[opt=0] qos,number或者nil,topic为一个主题时,qos为number类型(0/1/2,默认0);topic为多个主题时,qos为nil
387 -- @return bool true表示成功,false或者nil表示失败
389 -- mqttc:subscribe("/abc", 0) -- subscribe topic "/abc" with qos = 0
390 -- mqttc:subscribe({["/topic1"] = 0, ["/topic2"] = 1, ["/topic3"] = 2}) -- subscribe multi topic
391 function mqttc
:subscribe(topic
, qos
)
392 if not self
.connected
then
393 log.info("mqtt.client:subscribe", "not connected")
398 if type(topic
) == "string" then
399 topics
= {[topic
] = qos
and qos
or 0}
404 if not self
:write(packSUBSCRIBE(0, self
.getNextPacketId(), topics
)) then
405 log.info("mqtt.client:subscribe", "send failed")
409 local r
, packet
= self
:waitfor(SUBACK
, self
.commandTimeout
, nil, true)
411 log.info("mqtt.client:subscribe", "wait ack failed")
415 if not (packet
.grantedQos
and packet
.grantedQos
~="" and not packet
.grantedQos
:match(string.char(0x80))) then
416 log.info("mqtt.client:subscribe", "suback grant qos error", packet
.grantedQos
)
424 -- @param topic,string或者table类型,一个主题时为string类型,多个主题时为table类型,主题内容为UTF8编码
425 -- @return bool true表示成功,false或者nil表示失败
427 -- mqttc:unsubscribe("/abc") -- unsubscribe topic "/abc"
428 -- mqttc:unsubscribe({"/topic1", "/topic2", "/topic3"}) -- unsubscribe multi topic
429 function mqttc
:unsubscribe(topic
)
430 if not self
.connected
then
431 log.info("mqtt.client:unsubscribe", "not connected")
436 if type(topic
) == "string" then
442 if not self
:write(packUNSUBSCRIBE(0, self
.getNextPacketId(), topics
)) then
443 log.info("mqtt.client:unsubscribe", "send failed")
447 if not self
:waitfor(UNSUBACK
, self
.commandTimeout
, nil, true) then
448 log.info("mqtt.client:unsubscribe", "wait ack failed")
456 -- @string topic UTF8编码的字符串
457 -- @string payload 用户自己控制payload的编码,mqtt.lua不会对payload做任何编码转换
458 -- @number[opt=0] qos 0/1/2, default 0
459 -- @number[opt=0] retain 0或者1
460 -- @return bool 发布成功返回true,失败返回false
462 -- mqttc = mqtt.client("clientid-123", nil, nil, false)
463 -- mqttc:connect("mqttserver.com", 1883, "tcp")
464 -- mqttc:publish("/topic", "publish from luat mqtt client", 0)
465 function mqttc
:publish(topic
, payload
, qos
, retain
)
466 if not self
.connected
then
467 log.info("mqtt.client:publish", "not connected")
474 if not self
:write(packPUBLISH(0, qos
, retain
, qos
> 0 and self
.getNextPacketId() or 0, topic
, payload
)) then
475 log.info("mqtt.client:publish", "socket send failed")
479 if qos
== 0 then return true end
481 if not self
:waitfor(qos
== 1 and PUBACK
or PUBCOMP
, self
.commandTimeout
, nil, true) then
482 log.warn("mqtt.client:publish", "wait ack timeout")
490 -- @number timeout 接收超时时间,单位毫秒
491 -- @string[opt=nil] msg 可选参数,控制socket所在的线程退出recv阻塞状态
492 -- @return result 数据接收结果,true表示成功,false表示失败
494 -- 如果result为true,表示服务器发过来的mqtt包
496 -- 如果result为false,超时失败,data为"timeout"
497 -- 如果result为false,msg控制退出,data为msg的字符串
498 -- 如果result为false,socket连接被动断开控制退出,data为"CLOSED"
499 -- 如果result为false,PDP断开连接控制退出,data为"IP_ERROR_IND"
501 -- 如果result为false,mqtt不处于连接状态,data为nil
502 -- 如果result为false,收到了PUBLISH报文,发送PUBACK或者PUBREC报文失败,data为nil
503 -- 如果result为false,收到了PUBREC报文,发送PUBREL报文失败,data为nil
504 -- 如果result为false,收到了PUBREL报文,发送PUBCOMP报文失败,data为nil
505 -- 如果result为false,发送PINGREQ报文失败,data为nil
506 -- @return param 如果是msg控制退出,param的值是msg的参数;其余情况无意义,为nil
508 -- true, packet = mqttc:receive(2000)
509 -- false, error_message = mqttc:receive(2000)
510 -- false, msg, para = mqttc:receive(2000,"APP_SEND_DATA")
511 function mqttc
:receive(timeout
, msg
)
512 if not self
.connected
then
513 log.info("mqtt.client:receive", "not connected")
517 return self
:waitfor(PUBLISH
, timeout
, msg
)
523 -- mqttc = mqtt.client("clientid-123", nil, nil, false)
524 -- mqttc:connect("mqttserver.com", 1883, "tcp")
526 -- mqttc:disconnect()
527 function mqttc
:disconnect()
529 if self
.connected
then self
:write(packZeroData(DISCONNECT
)) end
535 self
.connected
= false