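-- Provenance (web page header captured together with the source, kept for reference):
--   commit subject: "add: 添加air780epvh的定位演示代码,初始化版本"
--                   (add positioning demo code for air780epvh, initial version)
--   path: LuatOS.git / demo / ril_8266 / mqtt.lua
--   blob: 5aa2d3915725befc848298be091a7a35cec9272d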
--- MQTT client module.
-- @module mqtt
-- @author openLuat
-- @license MIT
-- @copyright openLuat
-- @release 2017.10.24

local mqtt = {}

-- MQTT control packet type ids. The id occupies the upper four bits of the
-- first byte of every packet.
local CONNECT = 1
local CONNACK = 2
local PUBLISH = 3
local PUBACK = 4
local PUBREC = 5
local PUBREL = 6
local PUBCOMP = 7
local SUBSCRIBE = 8
local SUBACK = 9
local UNSUBSCRIBE = 10
local UNSUBACK = 11
local PINGREQ = 12
local PINGRESP = 13
local DISCONNECT = 14

-- Timeout handed to waitfor() while waiting for a command acknowledgement.
-- NOTE(review): presumably milliseconds - confirm against sys.waitUntil.
local CLIENT_COMMAND_TIMEOUT = 60000

-- `sys` is stored as a global, not a local; other scripts may rely on that,
-- so verify before localizing it.
_G.sys = require("sys")
local pack = _G.pack
local string = _G.string

-- Packet encoders are taken from the `mqttcore` global.
-- NOTE(review): presumably a firmware-provided library - confirm.
local encodeLen = mqttcore.encodeLen
local encodeUTF8 = mqttcore.encodeUTF8
local packCONNECT = mqttcore.packCONNECT
local packSUBSCRIBE = mqttcore.packSUBSCRIBE
local packUNSUBSCRIBE = mqttcore.packUNSUBSCRIBE
local packPUBLISH = mqttcore.packPUBLISH
local packACK = mqttcore.packACK
local packZeroData = mqttcore.packZeroData

--[==[
Disabled pure-Lua versions of the encoders above, kept for reference only.
NOTE(review): presumably byte-compatible with the mqttcore functions (the
debug logging inside some of them prints both outputs side by side) - confirm
before re-enabling any of them.

local function encodeLen(len)
    local s = ""
    local digit
    repeat
        digit = len % 128
        len = (len - digit) / 128
        if len > 0 then
            --digit = bit.bor(digit, 0x80)
            digit = digit | 0x80
        end
        s = s .. string.char(digit)
    until (len <= 0)
    return s
end

local function encodeUTF8(s)
    if not s or #s == 0 then
        return ""
    else
        return pack.pack(">P", s)
    end
end

local function packCONNECT(clientId, keepAlive, username, password, cleanSession, will, version)
    local content = pack.pack(">PbbHPAAAA",
        version == "3.1" and "MQIsdp" or "MQTT",
        version == "3.1" and 3 or 4,
        (#username == 0 and 0 or 1) * 128 + (#password == 0 and 0 or 1) * 64 + will.retain * 32 + will.qos * 8 + will.flag * 4 + cleanSession * 2,
        keepAlive,
        clientId,
        encodeUTF8(will.topic),
        encodeUTF8(will.payload),
        encodeUTF8(username),
        encodeUTF8(password))
    local mydata = pack.pack(">bAA",
        CONNECT * 16,
        encodeLen(string.len(content)),
        content)
    local tdata = mqttcore.packCONNECT(clientId, keepAlive, username, password, cleanSession, will, version)
    log.info("mqtt", "true", mydata:toHex())
    log.info("mqtt", "false", tdata:toHex())
    return mydata
end

local function packSUBSCRIBE(dup, packetId, topics)
    local header = SUBSCRIBE * 16 + dup * 8 + 2
    local data = pack.pack(">H", packetId)
    for topic, qos in pairs(topics) do
        data = data .. pack.pack(">Pb", topic, qos)
    end
    local mydata = pack.pack(">bAA", header, encodeLen(#data), data)
    log.info("mqtt", "true", mydata:toHex())
    local tdata = mqttcore.packSUBSCRIBE(dup, packetId, topics)
    log.info("mqtt", "false", tdata:toHex())
    return mydata
end

local function packUNSUBSCRIBE(dup, packetId, topics)
    local header = UNSUBSCRIBE * 16 + dup * 8 + 2
    local data = pack.pack(">H", packetId)
    for k, topic in pairs(topics) do
        data = data .. pack.pack(">P", topic)
    end
    return pack.pack(">bAA", header, encodeLen(#data), data)
end

local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
    local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
    local len = 2 + #topic + #payload
    local mydata = nil
    if qos > 0 then
        mydata = pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
    else
        mydata = pack.pack(">bAPA", header, encodeLen(len), topic, payload)
    end
    local tdata = mqttcore.packPUBLISH(dup, qos, retain, packetId, topic, payload)
    log.info("mqtt", "true", mydata:toHex())
    log.info("mqtt", "false", tdata:toHex())
    return mydata
end

local function packACK(id, dup, packetId)
    return pack.pack(">bbH", id * 16 + dup * 8 + (id == PUBREL and 1 or 0) * 2, 0x02, packetId)
end

local function packZeroData(id, dup, qos, retain)
    dup = dup or 0
    qos = qos or 0
    retain = retain or 0
    return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
end
]==]
--- Decode one MQTT control packet from the front of a byte string.
-- Returns nothing when `s` does not yet hold a complete packet, so the caller
-- can keep buffering and retry later.
-- @string s raw bytes, starting at the first byte of a packet
-- @return table packet with `id`, `dup`, `qos`, `retain` plus per-type fields:
--   CONNACK: `ackFlag`, `rc`; PUBLISH: `topic`, `payload` (and `packetId` when
--   qos > 0); SUBACK: `packetId`, `grantedQos` (raw return-code bytes);
--   PINGRESP: no extra fields; any other type: `packetId` (0 when the packet
--   has fewer than two body bytes)
-- @return number index in `s` of the first byte after the decoded packet
local function unpack(s)
    if #s < 2 then return end
    log.debug("mqtt.unpack", #s, string.toHex(string.sub(s, 1, 50)))

    -- Decode the variable-length "remaining length" field starting at byte 2:
    -- 7 value bits per byte, bit 7 set means another length byte follows.
    local len = 0
    local multiplier = 1
    local pos = 2
    repeat
        if pos > #s then return end -- length field itself is still incomplete
        local digit = string.byte(s, pos)
        len = len + ((digit % 128) * multiplier)
        multiplier = multiplier * 128
        pos = pos + 1
    until digit < 128

    -- `pos` now points at the first body byte; wait until the body is complete.
    if #s < len + pos - 1 then return end

    local header = string.byte(s, 1)
    local packet = {
        id = header >> 4,
        dup = (header >> 3) & 0x01,
        qos = (header & 0x06) >> 1,
        retain = header & 0x01,
    }
    local nextpos

    if packet.id == CONNACK then
        nextpos, packet.ackFlag, packet.rc = pack.unpack(s, "bb", pos)
    elseif packet.id == PUBLISH then
        nextpos, packet.topic = pack.unpack(s, ">P", pos)
        if packet.qos > 0 then
            nextpos, packet.packetId = pack.unpack(s, ">H", nextpos)
        end
        packet.payload = string.sub(s, nextpos, pos + len - 1)
    elseif packet.id ~= PINGRESP then
        if len >= 2 then
            nextpos, packet.packetId = pack.unpack(s, ">H", pos)
            if packet.id == SUBACK then
                -- subscribe() inspects these return-code bytes; without this
                -- field every subscription is reported as failed.
                packet.grantedQos = string.sub(s, nextpos, pos + len - 1)
            end
        else
            packet.packetId = 0
        end
    end

    return packet, pos + len
end
-- Method table for client objects; instances reach it through __index.
local mqttc = {}
mqttc.__index = mqttc

--- Create an MQTT client instance.
-- @string clientId client identifier
-- @number[opt=300] keepAlive keep-alive interval in seconds, 300 when omitted
-- @string[opt=""] username user name; "" or nil when there is none
-- @string[opt=""] password password; "" or nil when there is none
-- @number[opt=1] cleanSession 1/0
-- @table[opt=nil] will last-will settings, {qos=, retain=, topic=, payload=};
--   when given, the table is used as-is and its `flag` field is set to 1
-- @string[opt="3.1.1"] version MQTT protocol version
-- @return table mqttc client instance
-- @usage
-- mqttc = mqtt.client("clientid-123")
-- mqttc = mqtt.client("clientid-123",200)
-- mqttc = mqtt.client("clientid-123",nil,"user","password")
-- mqttc = mqtt.client("clientid-123",nil,"user","password",nil,nil,"3.1")
function mqtt.client(clientId, keepAlive, username, password, cleanSession, will, version)
    -- Last packet id handed out; captured by the getNextPacketId closure.
    local packetId = 1

    if not will then
        will = {flag = 0, qos = 0, retain = 0, topic = "", payload = ""}
    else
        will.flag = 1 -- written into the caller's table
    end

    local o = setmetatable({
        clientId = clientId,
        keepAlive = keepAlive or 300,
        username = username or "",
        password = password or "",
        cleanSession = cleanSession or 1,
        version = version or "3.1.1",
        will = will,
        commandTimeout = CLIENT_COMMAND_TIMEOUT,
        cache = {}, -- decoded packets read while waiting for a different packet type
        inbuf = "", -- received bytes that do not yet form a complete packet
        connected = false,
        lastOTime = 0, -- os.time() of the last successful write
        pkgs = {}, -- decoded packets not yet picked up by read()
    }, mqttc)

    -- Advance and return the packet id; 65535 wraps to 1, so the first id
    -- returned by a new client is 2.
    function o.getNextPacketId()
        if packetId == 65535 then
            packetId = 1
        else
            packetId = packetId + 1
        end
        return packetId
    end

    return o
end
--- Send a PINGREQ when the keep-alive interval has elapsed.
-- The interval is measured with os.time() from the last successful write.
-- A keepAlive of 0 disables the check.
-- @return boolean false only when a PINGREQ was due and could not be written;
--   true in every other case
function mqttc:checkKeepAlive()
    local interval = self.keepAlive
    if interval == 0 then
        return true
    end

    local idle = os.time() - self.lastOTime
    if idle < interval then
        return true
    end

    if self:write(packZeroData(PINGREQ)) then
        return true
    end
    log.info("mqtt.client:", "pingreq send fail")
    return false
end
--- Send raw MQTT bytes through the client's socket.
-- A successful send refreshes `lastOTime`, which restarts the keep-alive timer.
-- @string data encoded packet; its first 50 bytes are logged as hex
-- @return whatever `self.io:send` returned (truthy on success)
function mqttc:write(data)
    log.debug("mqtt.client:write", string.toHex(string.sub(data, 1, 50)))
    local sent = self.io:send(data)
    if not sent then
        return sent
    end
    self.lastOTime = os.time()
    return sent
end
--- Wait for the next decoded MQTT packet.
-- Runs the keep-alive check first, then blocks in sys.waitUntil on the
-- per-socket "MQTTC_PKG_<id>" topic (and on `msg` when given).
-- @param timeout passed straight to sys.waitUntil
-- @string[opt=nil] msg extra topic that may also end the wait
-- @param msgNoResume accepted for interface compatibility; not used here
-- @return true, packet  when the wait ended and a decoded packet was queued
-- @return false, msg, data  when the wait ended but no packet was queued
--   (`data` is the first value delivered with the wake-up)
-- @return false, "timeout"  when the wait timed out and the socket is not closed
-- @return false  when the keep-alive ping failed, or the wait timed out and
--   `self.io:closed()` returned 1
function mqttc:read(timeout, msg, msgNoResume)
    if not self:checkKeepAlive() then
        log.warn("mqtt.read checkKeepAlive fail")
        return false
    end

    local pkgTopic = "MQTTC_PKG_" .. tostring(self.io:id())
    local woken, data = sys.waitUntil({pkgTopic, msg}, timeout)

    if not woken then
        if self.io:closed() == 1 then
            return false
        end
        return false, "timeout"
    end

    -- Woken by a published topic: hand out the oldest queued packet, if any.
    local pkg = table.remove(self.pkgs, 1)
    if pkg ~= nil then
        return true, pkg
    end
    return false, msg, data
end
--- Feed received bytes into a client and queue every complete packet.
-- Appends `data` to the client's `inbuf`, then repeatedly decodes packets from
-- the front of the buffer. Each decoded packet is appended to `pkgs` and
-- announced with one sys.publish on "MQTTC_PKG_<socket id>".
-- NOTE(review): nothing in the visible source calls this function; presumably
-- it is meant to be invoked from the socket receive path - TODO confirm.
-- @param _self client instance (table with `inbuf`, `pkgs`, `io`)
-- @string data newly received bytes; may be ""
-- @return boolean always true
local function update_resp(_self, data)
    if #data > 0 then
        _self.inbuf = _self.inbuf .. data
    end

    repeat
        --log.debug("mqttc", "data recv to unpack", _self.inbuf:toHex())
        local packet, nextpos = unpack(_self.inbuf)
        if not packet then
            -- Leftover bytes (or nothing at all) do not form a whole packet yet.
            log.info("mqttc", "data not full")
            break
        end

        log.info("mqttc", "msg unpack ok", packet.id)
        _self.inbuf = string.sub(_self.inbuf, nextpos)
        table.insert(_self.pkgs, packet)
        sys.publish("MQTTC_PKG_" .. tostring(_self.io:id()))
    until #_self.inbuf == 0

    return true
end
--- Wait for an MQTT packet of a given type.
-- Packets of other types that arrive meanwhile are kept in `self.cache` so a
-- later waitfor() can pick them up. The QoS handshakes are answered here as a
-- side effect: an incoming PUBLISH with qos 1/2 is acknowledged with
-- PUBACK/PUBREC, PUBREC is answered with PUBREL, and PUBREL with PUBCOMP.
-- PUBREC, PUBREL and PINGRESP packets are never cached: nothing consumes them
-- later, so caching PINGRESP would grow the cache with every keep-alive.
-- @number id packet type id to wait for
-- @param timeout passed to read()
-- @string[opt=nil] msg extra topic that may end the wait, passed to read()
-- @param msgNoResume passed to read()
-- @return true, packet  when a packet of type `id` was cached or received
-- @return false  when sending one of the acknowledgements failed
-- @return false, data, param  the failure values of read() otherwise
function mqttc:waitfor(id, timeout, msg, msgNoResume)
    -- Serve from the cache first.
    for index, packet in ipairs(self.cache) do
        if packet.id == id then
            return true, table.remove(self.cache, index)
        end
    end

    while true do
        local r, data, param = self:read(timeout, msg, msgNoResume)
        if not r then
            return false, data, param
        end

        local insertCache = true
        if data.id == PUBLISH then
            if data.qos > 0 then
                if not self:write(packACK(data.qos == 1 and PUBACK or PUBREC, 0, data.packetId)) then
                    log.info("mqtt.client:waitfor", "send publish ack failed", data.qos)
                    return false
                end
            end
        elseif data.id == PUBREC or data.id == PUBREL then
            local ackId = data.id == PUBREC and PUBREL or PUBCOMP
            if not self:write(packACK(ackId, 0, data.packetId)) then
                -- Name the acknowledgement that could not be sent.
                log.info("mqtt.client:waitfor", "send ack fail", ackId == PUBREL and "PUBREL" or "PUBCOMP")
                return false
            end
            insertCache = false
        elseif data.id == PINGRESP then
            insertCache = false
        end

        if data.id == id then
            return true, data
        end
        if insertCache then
            table.insert(self.cache, data)
        end
    end
end
--- Connect to an MQTT server.
-- Any socket left over from an earlier attempt is closed first, and the
-- receive state (`inbuf`, `pkgs`, `cache`) is reset so bytes or packets from
-- the old socket cannot be mixed into the new session.
-- When a step fails after the socket was created, the socket stays in
-- `self.io`; it is closed by the next connect() or by disconnect().
-- @string host server address
-- @param port string or number, server port
-- @string[opt="tcp"] transport "tcp" or "tcp_ssl"
-- @table[opt=nil] cert table or nil, SSL certificate settings; only meaningful
--   for an SSL connection (a table here also enables SSL). Format:
-- {
--     caCert = "ca.crt", -- CA certificate file (Base64 encoded X.509); when present the client verifies the server certificate, otherwise it does not
--     clientCert = "client.crt", -- client certificate file (Base64 encoded X.509), used when the server verifies the client
--     clientKey = "client.key", -- client private key file (Base64 encoded X.509)
--     clientPassword = "123456", -- password of the client certificate file [optional]
-- }
-- @number timeout maximum time to wait for the socket connection; passed
--   unchanged to `self.io:connect` (unit is defined by the socket module)
-- @return result true on success; false on failure
-- @return rc only when the CONNACK step failed: the CONNACK return code, or
--   -1 when no usable CONNACK was received
-- @usage mqttc = mqtt.client("clientid-123"); mqttc:connect("mqttserver.com", 1883, "tcp", nil, 5)
function mqttc:connect(host, port, transport, cert, timeout)
    if self.connected then
        log.info("mqtt.client:connect", "has connected")
        return false
    end

    if self.io then
        self.io:close()
        self.io = nil
    end

    if transport and transport ~= "tcp" and transport ~= "tcp_ssl" then
        log.info("mqtt.client:connect", "invalid transport", transport)
        return false
    end

    -- Start the new session with empty receive state.
    self.cache = {}
    self.pkgs = {}
    self.inbuf = ""

    self.io = socket.tcp(transport == "tcp_ssl" or type(cert) == "table", cert)

    if not self.io:connect(host, port, timeout) then
        log.info("mqtt.client:connect", "connect host fail")
        return false
    end

    if not self:write(packCONNECT(self.clientId, self.keepAlive, self.username, self.password, self.cleanSession, self.will, self.version)) then
        log.info("mqtt.client:connect", "send fail")
        return false
    end

    local r, packet = self:waitfor(CONNACK, self.commandTimeout, nil, true)
    -- On failure waitfor() may return a string such as "timeout" in place of
    -- the packet, so only read `rc` from a real packet table.
    local rc = (r and type(packet) == "table") and packet.rc or -1
    if rc ~= 0 then
        log.info("mqtt.client:connect", "connack error", rc)
        return false, rc
    end

    self.connected = true

    return true
end
--- Subscribe to one or more topics.
-- Sends SUBSCRIBE and waits for SUBACK. The subscription counts as accepted
-- only when the SUBACK packet carries a non-empty `grantedQos` string that
-- contains no 0x80 byte.
-- @param topic string or table: a string for a single topic, a table mapping
--   topic to qos for several topics; topics are UTF-8 encoded
-- @param[opt=0] qos number or nil: qos (0/1/2, default 0) for a single topic;
--   nil when `topic` is a table
-- @return bool true on success, false on failure
-- @usage
-- mqttc:subscribe("/abc", 0) -- subscribe topic "/abc" with qos = 0
-- mqttc:subscribe({["/topic1"] = 0, ["/topic2"] = 1, ["/topic3"] = 2}) -- subscribe multi topic
function mqttc:subscribe(topic, qos)
    if not self.connected then
        log.info("mqtt.client:subscribe", "not connected")
        return false
    end

    local topics = topic
    if type(topic) == "string" then
        topics = {[topic] = qos or 0}
    end

    local sent = self:write(packSUBSCRIBE(0, self.getNextPacketId(), topics))
    if not sent then
        log.info("mqtt.client:subscribe", "send failed")
        return false
    end

    local ok, packet = self:waitfor(SUBACK, self.commandTimeout, nil, true)
    if not ok then
        log.info("mqtt.client:subscribe", "wait ack failed")
        return false
    end

    local granted = packet.grantedQos
    local accepted = granted and granted ~= "" and not granted:match(string.char(0x80))
    if not accepted then
        log.info("mqtt.client:subscribe", "suback grant qos error", granted)
        return false
    end

    return true
end
--- Unsubscribe from one or more topics.
-- Sends UNSUBSCRIBE and waits for UNSUBACK.
-- @param topic string or table: a string for a single topic, an array of
--   topic strings for several topics; topics are UTF-8 encoded
-- @return bool true on success, false on failure
-- @usage
-- mqttc:unsubscribe("/abc") -- unsubscribe topic "/abc"
-- mqttc:unsubscribe({"/topic1", "/topic2", "/topic3"}) -- unsubscribe multi topic
function mqttc:unsubscribe(topic)
    if not self.connected then
        log.info("mqtt.client:unsubscribe", "not connected")
        return false
    end

    local topics = (type(topic) == "string") and {topic} or topic

    local sent = self:write(packUNSUBSCRIBE(0, self.getNextPacketId(), topics))
    if not sent then
        log.info("mqtt.client:unsubscribe", "send failed")
        return false
    end

    local acked = self:waitfor(UNSUBACK, self.commandTimeout, nil, true)
    if not acked then
        log.info("mqtt.client:unsubscribe", "wait ack failed")
        return false
    end

    return true
end
--- Publish one message.
-- With qos 0 the call returns as soon as the packet is written. With qos 1 it
-- waits for PUBACK, with qos 2 for PUBCOMP.
-- @string topic UTF-8 encoded topic
-- @string payload payload bytes; the caller controls the encoding, this module
--   does not convert the payload in any way
-- @number[opt=0] qos 0/1/2, default 0
-- @number[opt=0] retain 0 or 1
-- @return bool true when published, false on failure
-- @usage
-- mqttc = mqtt.client("clientid-123", nil, nil, false)
-- mqttc:connect("mqttserver.com", 1883, "tcp")
-- mqttc:publish("/topic", "publish from luat mqtt client", 0)
function mqttc:publish(topic, payload, qos, retain)
    if not self.connected then
        log.info("mqtt.client:publish", "not connected")
        return false
    end

    qos = qos or 0
    retain = retain or 0

    -- Only qos 1/2 publishes consume a packet id; qos 0 is sent with id 0.
    local packetId = 0
    if qos > 0 then
        packetId = self.getNextPacketId()
    end

    if not self:write(packPUBLISH(0, qos, retain, packetId, topic, payload)) then
        log.info("mqtt.client:publish", "socket send failed")
        return false
    end

    if qos == 0 then
        return true
    end

    local ackId = (qos == 1) and PUBACK or PUBCOMP
    if not self:waitfor(ackId, self.commandTimeout, nil, true) then
        log.warn("mqtt.client:publish", "wait ack timeout")
        return false
    end

    return true
end
--- Receive a message (wait for the next PUBLISH packet).
-- @number timeout receive timeout, in milliseconds
-- @string[opt=nil] msg optional topic that lets another task end the blocking wait
-- @return result true when a packet was received, false otherwise
-- @return data
--   result true: the PUBLISH packet sent by the server
--   result false, wait timed out: "timeout"
--   result false, wait ended without a queued packet (e.g. by `msg`): the value of `msg`
--   result false, client not connected: nil
--   result false, socket reported closed after the wait: nil
--   result false, PUBLISH received but sending PUBACK or PUBREC failed: nil
--   result false, PUBREC received but sending PUBREL failed: nil
--   result false, PUBREL received but sending PUBCOMP failed: nil
--   result false, sending PINGREQ failed: nil
--   NOTE(review): earlier documentation also listed "CLOSED" and
--   "IP_ERROR_IND" as data values; the read() visible in this file never
--   returns them - confirm before relying on either.
-- @return param when the wait ended without a queued packet, the first value
--   delivered with the wake-up; nil in all other cases
-- @usage
-- true, packet = mqttc:receive(2000)
-- false, error_message = mqttc:receive(2000)
-- false, msg, para = mqttc:receive(2000,"APP_SEND_DATA")
function mqttc:receive(timeout, msg)
    if self.connected then
        return self:waitfor(PUBLISH, timeout, msg)
    end

    log.info("mqtt.client:receive", "not connected")
    return false
end
--- Disconnect from the server.
-- Sends DISCONNECT when the session is established (best effort, the result
-- is ignored), closes the socket and clears all receive state. `pkgs` is
-- cleared together with `cache` and `inbuf`: a decoded packet left over from
-- this session would otherwise be the first thing read() returns on the next
-- connection.
-- @return nil
-- @usage
-- mqttc = mqtt.client("clientid-123", nil, nil, false)
-- mqttc:connect("mqttserver.com", 1883, "tcp")
-- process data
-- mqttc:disconnect()
function mqttc:disconnect()
    if self.io then
        if self.connected then
            self:write(packZeroData(DISCONNECT))
        end
        self.io:close()
        self.io = nil
    end

    self.cache = {}
    self.pkgs = {}
    self.inbuf = ""
    self.connected = false
end

return mqtt