Advertisement
Guest User

mqtt.lua

a guest
Mar 6th, 2018
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Lua 29.12 KB | None | 0 0
  1. ---
  2. -- @module paho.mqtt
  3. -- ~~~~~~~~~~~~~~~~
  4. -- Version: 0.3 2014-10-06
  5. -- -------------------------------------------------------------------------- --
  6. -- Copyright (c) 2011-2012 Geekscape Pty. Ltd.
  7. -- All rights reserved. This program and the accompanying materials
  8. -- are made available under the terms of the Eclipse Public License v1.0
  9. -- which accompanies this distribution, and is available at
  10. -- http://www.eclipse.org/legal/epl-v10.html
  11. --
  12. -- Contributors:
  13. --    Andy Gelme    - Initial API and implementation
  14. --    Kevin KIN-FOO - Authentication and rockspec
  15. -- -------------------------------------------------------------------------- --
  16. --
  17. -- Documentation
  18. -- ~~~~~~~~~~~~~
  19. -- Paho MQTT Lua website
  20. --   http://eclipse.org/paho/
  21. --
  22. -- References
  23. -- ~~~~~~~~~~
  24. -- MQTT Community
  25. --   http://mqtt.org
  26.  
  27. -- MQTT protocol specification 3.1
  28. --   https://www.ibm.com/developerworks/webservices/library/ws-mqtt
  29. --   http://mqtt.org/wiki/doku.php/mqtt_protocol   # Clarifications
  30. --
  31. -- Notes
  32. -- ~~~~~
  33. -- - Always assumes MQTT connection "clean session" enabled.
  34. -- - Supports connection last will and testament message.
  35. -- - Supports connection username and password, see MQTT.client.auth().
  36. -- - Fixed message header byte 1, only implements the "message type".
  37. -- - Only supports QOS level 0.
  38. -- - Maximum payload length is 268,435,455 bytes (as per specification).
  39. -- - Publish message doesn't support "message identifier".
  40. -- - Subscribe acknowledgement messages don't check granted QOS level.
  41. -- - Outstanding subscribe acknowledgement messages aren't escalated.
  42. -- - Works on the Sony PlayStation Portable (aka Sony PSP) ...
  43. --     See http://en.wikipedia.org/wiki/Lua_Player_HM
  44. --
  45. -- ToDo
  46. -- ~~~~
  47. -- * Consider when payload needs to be an array of bytes (not characters).
  48. -- * Maintain both "last_activity_out" and "last_activity_in".
  49. -- * - http://mqtt.org/wiki/doku.php/keepalive_for_the_client
  50. -- * Update "last_activity_in" when messages are received.
  51. -- * When a PINGREQ is sent, must check for a PINGRESP, within KEEP_ALIVE_TIME.
  52. --   * Otherwise, fail the connection.
  53. -- * When connecting, wait for CONACK, until KEEP_ALIVE_TIME, before failing.
  54. -- * Should MQTT.client:connect() be asynchronous with a callback ?
  55. -- * Review all public APIs for asynchronous callback behaviour.
  56. -- * Implement parse PUBACK message.
  57. -- * Handle failed subscriptions, i.e no subscription acknowledgement received.
  58. -- * Fix problem when KEEP_ALIVE_TIME is short, e.g. mqtt_publish -k 1
  59. --     MQTT.client:handler(): Message length mismatch
  60. -- - On socket error, optionally try reconnection to MQTT server.
  61. -- - Consider use of assert() and pcall() ?
  62. -- - Only expose public API functions, don't expose internal API functions.
  63. -- - Refactor "if self.connected()" to "self.checkConnected(error_message)".
  64. -- - Maintain and publish messaging statistics.
  65. -- - Memory heap/stack monitoring.
  66. -- - When debugging, why isn't mosquitto sending back CONACK error code ?
  67. -- - Subscription callbacks invoked by topic name (including wildcards).
  68. -- - Implement asynchronous state machine, rather than single-thread waiting.
  69. --   - After CONNECT, expect and wait for a CONACK.
  70. -- - Implement complete MQTT broker (server).
  71. -- - Consider using Copas http://keplerproject.github.com/copas/manual.html
  72. -- ------------------------------------------------------------------------- --
  73.  
  74. function isPsp() return(Socket ~= nil) end
  75.  
  76. if (not isPsp()) then
  77.   require("socket")
  78.   require("io")
  79.   require("ltn12")
  80. --require("ssl")
  81. end
  82.  
  83. local MQTT = {}
  84.  
  85. ---
  86. -- @field [parent = #paho.mqtt] mqtt.utility#utility Utility
  87. --
  88. MQTT.Utility = require "paho.utility"
  89.  
  90. ---
  91. -- @field [parent = #paho.mqtt] #number VERSION
  92. --
  93. MQTT.VERSION = 0x03
  94.  
  95. ---
  96. -- @field [parent = #paho.mqtt] #boolean ERROR_TERMINATE
  97. --
  98. MQTT.ERROR_TERMINATE = false      -- Message handler errors terminate process ?
  99.  
  100. ---
  101. -- @field [parent = #paho.mqtt] #string DEFAULT_BROKER_HOSTNAME
  102. --
  103. MQTT.DEFAULT_BROKER_HOSTNAME = "m2m.eclipse.org"
  104.  
  105. ---
  106. -- An MQTT client
  107. -- @type client
  108.  
  109. ---
  110. -- @field [parent = #paho.mqtt] #client client
  111. --
  112. MQTT.client = {}
  113. MQTT.client.__index = MQTT.client
  114.  
  115. ---
  116. -- @field [parent = #client] #number DEFAULT_PORT
  117. --
  118. MQTT.client.DEFAULT_PORT       = 1883
  119.  
  120. ---
  121. -- @field [parent = #client] #number KEEP_ALIVE_TIME
  122. --
  123. MQTT.client.KEEP_ALIVE_TIME    =   60  -- seconds (maximum is 65535)
  124.  
  125. ---
  126. -- @field [parent = #client] #number MAX_PAYLOAD_LENGTH
  127. --
  128. MQTT.client.MAX_PAYLOAD_LENGTH = 268435455 -- bytes
  129.  
  130. -- MQTT 3.1 Specification: Section 2.1: Fixed header, Message type
  131.  
  132. ---
  133. -- @field [parent = #paho.mqtt] message
  134. --
  135. MQTT.message = {}
  136. MQTT.message.TYPE_RESERVED    = 0x00
  137. MQTT.message.TYPE_CONNECT     = 0x01
  138. MQTT.message.TYPE_CONACK      = 0x02
  139. MQTT.message.TYPE_PUBLISH     = 0x03
  140. MQTT.message.TYPE_PUBACK      = 0x04
  141. MQTT.message.TYPE_PUBREC      = 0x05
  142. MQTT.message.TYPE_PUBREL      = 0x06
  143. MQTT.message.TYPE_PUBCOMP     = 0x07
  144. MQTT.message.TYPE_SUBSCRIBE   = 0x08
  145. MQTT.message.TYPE_SUBACK      = 0x09
  146. MQTT.message.TYPE_UNSUBSCRIBE = 0x0a
  147. MQTT.message.TYPE_UNSUBACK    = 0x0b
  148. MQTT.message.TYPE_PINGREQ     = 0x0c
  149. MQTT.message.TYPE_PINGRESP    = 0x0d
  150. MQTT.message.TYPE_DISCONNECT  = 0x0e
  151. MQTT.message.TYPE_RESERVED    = 0x0f
  152.  
  153. -- MQTT 3.1 Specification: Section 3.2: CONACK acknowledge connection errors
  154. -- http://mqtt.org/wiki/doku.php/extended_connack_codes
  155.  
  156. MQTT.CONACK = {}
  157. MQTT.CONACK.error_message = {          -- CONACK return code used as the index
  158.   "Unacceptable protocol version",
  159.   "Identifer rejected",
  160.   "Server unavailable",
  161.   "Bad user name or password",
  162.   "Not authorized"
  163. --"Invalid will topic"                 -- Proposed
  164. }
  165.  
  166. -- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  167. -- Create an MQTT client instance
  168. -- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  169.  
  170. ---
  171. -- Create an MQTT client instance.
  172. -- @param #string hostname Host name or address of the MQTT broker
  173. -- @param #number port Port number of the MQTT broker (default: 1883)
  174. -- @param #function callback Invoked when subscribed topic messages received
  175. -- @function [parent = #client] create
  176. -- @return #client created client
  177. --
  178. function MQTT.client.create(                                      -- Public API
  179.   hostname,  -- string:   Host name or address of the MQTT broker
  180.   port,      -- integer:  Port number of the MQTT broker (default: 1883)
  181.   callback)  -- function: Invoked when subscribed topic messages received
  182.              -- return:   mqtt_client table
  183.  
  184.   local mqtt_client = {}
  185.  
  186.   setmetatable(mqtt_client, MQTT.client)
  187.  
  188.   mqtt_client.callback = callback  -- function(topic, payload)
  189.   mqtt_client.hostname = hostname
  190.   mqtt_client.port     = port or MQTT.client.DEFAULT_PORT
  191.  
  192.   mqtt_client.connected     = false
  193.   mqtt_client.destroyed     = false
  194.   mqtt_client.last_activity = 0
  195.   mqtt_client.message_id    = 0
  196.   mqtt_client.outstanding   = {}
  197.   mqtt_client.socket_client = nil
  198.  
  199.   return(mqtt_client)
  200. end
  201.  
  202. --------------------------------------------------------------------------------
  203. -- Specify username and password before #client.connect
  204. --
  205. -- If called with empty _username_ or _password_, connection flags will be set
  206. -- but no string will be appended to payload.
  207. --
  208. -- @function [parent = #client] auth
  209. -- @param self
  210. -- @param #string username Name of the user who is connecting. It is recommended
  211. --                         that user names are kept to 12 characters.
  212. -- @param #string password Password corresponding to the user who is connecting.
  213. function MQTT.client.auth(self, username, password)
  214.   -- When no string is provided, remember current call to set flags
  215.   self.username = username or true
  216.   self.password = password or true
  217. end
  218.  
  219. --------------------------------------------------------------------------------
  220. -- Transmit MQTT Client request a connection to an MQTT broker (server).
  221. -- MQTT 3.1 Specification: Section 3.1: CONNECT
  222. -- @param self
  223. -- @param #string identifier MQTT client identifier (maximum 23 characters)
  224. -- @param #string will_topic Last will and testament topic
  225. -- @param #string will_qos Last will and testament Quality Of Service
  226. -- @param #string will_retain Last will and testament retention status
  227. -- @param #string will_message Last will and testament message
  228. -- @function [parent = #client] connect
  229. --
  230. function MQTT.client:connect(                                     -- Public API
  231.   identifier,    -- string: MQTT client identifier (maximum 23 characters)
  232.   will_topic,    -- string: Last will and testament topic
  233.   will_qos,      -- byte:   Last will and testament Quality Of Service
  234.   will_retain,   -- byte:   Last will and testament retention status
  235.   will_message)  -- string: Last will and testament message
  236.                  -- return: nil or error message
  237.  
  238.   if (self.connected) then
  239.     return("MQTT.client:connect(): Already connected")
  240.   end
  241.  
  242.   MQTT.Utility.debug("MQTT.client:connect(): " .. identifier)
  243.  
  244.   self.socket_client = socket.connect(self.hostname, self.port)
  245.  
  246.   if (self.socket_client == nil) then
  247.     return("MQTT.client:connect(): Couldn't open MQTT broker connection")
  248.   end
  249.  
  250.   MQTT.Utility.socket_wait_connected(self.socket_client)
  251.  
  252.   self.connected = true
  253.  
  254. -- Construct CONNECT variable header fields (bytes 1 through 9)
  255. -- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  256.   local payload
  257.   payload = MQTT.client.encode_utf8("MQIsdp")
  258.   payload = payload .. string.char(MQTT.VERSION)
  259.  
  260. -- Connect flags (byte 10)
  261. -- ~~~~~~~~~~~~~
  262. -- bit    7: Username flag =  0  -- recommended no more than 12 characters
  263. -- bit    6: Password flag =  0  -- ditto
  264. -- bit    5: Will retain   =  0
  265. -- bits 4,3: Will QOS      = 00
  266. -- bit    2: Will flag     =  0
  267. -- bit    1: Clean session =  1
  268. -- bit    0: Unused        =  0
  269.  
  270.   local username = self.username and 0x80 or 0
  271.   local password = self.password and 0x40 or 0
  272.   local flags    = username + password
  273.  
  274.   if (will_topic == nil) then
  275.     -- Clean session, no last will
  276.     flags = flags + 0x02
  277.   else
  278.     flags = flags + MQTT.Utility.shift_left(will_retain, 5)
  279.     flags = flags + MQTT.Utility.shift_left(will_qos, 3)
  280.     -- Last will and clean session
  281.     flags = flags + 0x04 + 0x02
  282.   end
  283.   payload = payload .. string.char(flags)
  284.  
  285. -- Keep alive timer (bytes 11 LSB and 12 MSB, unit is seconds)
  286. -- ~~~~~~~~~~~~~~~~~
  287.   payload = payload .. string.char(math.floor(MQTT.client.KEEP_ALIVE_TIME / 256))
  288.   payload = payload .. string.char(MQTT.client.KEEP_ALIVE_TIME % 256)
  289.  
  290. -- Client identifier
  291. -- ~~~~~~~~~~~~~~~~~
  292.   payload = payload .. MQTT.client.encode_utf8(identifier)
  293.  
  294. -- Last will and testament
  295. -- ~~~~~~~~~~~~~~~~~~~~~~~
  296.   if (will_topic ~= nil) then
  297.     payload = payload .. MQTT.client.encode_utf8(will_topic)
  298.     payload = payload .. MQTT.client.encode_utf8(will_message)
  299.   end
  300.  
  301.   -- Username and password
  302.   -- ~~~~~~~~~~~~~~~~~~~~~
  303.   if type(self.username) == 'string' then
  304.     payload = payload .. MQTT.client.encode_utf8(self.username)
  305.   end
  306.   if type(self.password) == 'string' then
  307.     payload = payload .. MQTT.client.encode_utf8(self.password)
  308.   end
  309.  
  310. -- Send MQTT message
  311. -- ~~~~~~~~~~~~~~~~~
  312.   return(self:message_write(MQTT.message.TYPE_CONNECT, payload))
  313. end
  314.  
  315. --- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  316. -- Destroy an MQTT client instance.
  317. -- @param self
  318. -- @function [parent = #client] destroy
  319. --
  320. function MQTT.client:destroy()                                    -- Public API
  321.   MQTT.Utility.debug("MQTT.client:destroy()")
  322.  
  323.   if (self.destroyed == false) then
  324.     self.destroyed = true         -- Avoid recursion when message_write() fails
  325.  
  326.     if (self.connected) then self:disconnect() end
  327.  
  328.     self.callback = nil
  329.     self.outstanding = nil
  330.   end
  331. end
  332.  
  333. --- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  334. -- Transmit MQTT Disconnect message.
  335. -- MQTT 3.1 Specification: Section 3.14: Disconnect notification
  336. -- bytes 1,2: Fixed message header, see MQTT.client:message_write()
  337. -- @param self
  338. -- @function [parent = #client] disconnect
  339. --
  340. function MQTT.client:disconnect()                                 -- Public API
  341.   MQTT.Utility.debug("MQTT.client:disconnect()")
  342.  
  343.   if (self.connected) then
  344.     self:message_write(MQTT.message.TYPE_DISCONNECT, nil)
  345.     self.socket_client:close()
  346.     self.connected = false
  347.   else
  348.     error("MQTT.client:disconnect(): Already disconnected")
  349.   end
  350. end
  351.  
  352. -- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  353. -- Encode a message string using UTF-8 (for variable header)
  354. -- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  355. -- MQTT 3.1 Specification: Section 2.5: MQTT and UTF-8
  356. --
  357. -- byte  1:   String length MSB
  358. -- byte  2:   String length LSB
  359. -- bytes 3-n: String encoded as UTF-8
  360.  
  361. function MQTT.client.encode_utf8(                               -- Internal API
  362.   input)  -- string
  363.  
  364.   local output
  365.   output = string.char(math.floor(#input / 256))
  366.   output = output .. string.char(#input % 256)
  367.   output = output .. input
  368.  
  369.   return(output)
  370. end
  371.  
  372. --- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  373. -- Handle received messages and maintain keep-alive PING messages.
  374. -- This function must be invoked periodically (more often than the
  375. -- `MQTT.client.KEEP_ALIVE_TIME`) which maintains the connection and
  376. -- services the incoming subscribed topic messages
  377. -- @param self
  378. -- @function [parent = #client] handler
  379. --
  380. function MQTT.client:handler()                                    -- Public API
  381.   if (self.connected == false) then
  382.     error("MQTT.client:handler(): Not connected")
  383.   end
  384.  
  385.   MQTT.Utility.debug("MQTT.client:handler()")
  386.  
  387. -- Transmit MQTT PING message
  388. -- ~~~~~~~~~~~~~~~~~~~~~~~~~~
  389. -- MQTT 3.1 Specification: Section 3.13: PING request
  390. --
  391. -- bytes 1,2: Fixed message header, see MQTT.client:message_write()
  392.  
  393.   local activity_timeout = self.last_activity + MQTT.client.KEEP_ALIVE_TIME
  394.  
  395.   if (MQTT.Utility.get_time() > activity_timeout) then
  396.     MQTT.Utility.debug("MQTT.client:handler(): PINGREQ")
  397.  
  398.     self:message_write(MQTT.message.TYPE_PINGREQ, nil)
  399.   end
  400.  
  401. -- Check for available client socket data
  402. -- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  403.   local ready = MQTT.Utility.socket_ready(self.socket_client)
  404.  
  405.   if (ready) then
  406.     local error_message, buffer =
  407.       MQTT.Utility.socket_receive(self.socket_client)
  408.  
  409.     if (error_message ~= nil) then
  410.       self:destroy()
  411.       error_message = "socket_client:receive(): " .. error_message
  412.       MQTT.Utility.debug(error_message)
  413.       return(error_message)
  414.     end
  415.  
  416.     if (buffer ~= nil and #buffer > 0) then
  417.       local index = 1
  418.  
  419.       -- Parse individual messages (each must be at least 2 bytes long)
  420.       -- Decode "remaining length" (MQTT v3.1 specification pages 6 and 7)
  421.  
  422.       while (index < #buffer) do
  423.         local message_type_flags = string.byte(buffer, index)
  424.         local multiplier = 1
  425.         local remaining_length = 0
  426.  
  427.         repeat
  428.           index = index + 1
  429.           local digit = string.byte(buffer, index)
  430.           remaining_length = remaining_length + ((digit % 128) * multiplier)
  431.           multiplier = multiplier * 128
  432.         until digit < 128                              -- check continuation bit
  433.  
  434.         local message = string.sub(buffer, index + 1, index + remaining_length)
  435.  
  436.         if (#message == remaining_length) then
  437.           self:parse_message(message_type_flags, remaining_length, message)
  438.         else
  439.           MQTT.Utility.debug(
  440.             "MQTT.client:handler(): Incorrect remaining length: " ..
  441.             remaining_length .. " ~= message length: " .. #message
  442.           )
  443.         end
  444.  
  445.         index = index + remaining_length + 1
  446.       end
  447.  
  448.       -- Check for any left over bytes, i.e. partial message received
  449.  
  450.       if (index ~= (#buffer + 1)) then
  451.         local error_message =
  452.           "MQTT.client:handler(): Partial message received" ..
  453.           index .. " ~= " .. (#buffer + 1)
  454.  
  455.         if (MQTT.ERROR_TERMINATE) then         -- TODO: Refactor duplicate code
  456.           self:destroy()
  457.           error(error_message)
  458.         else
  459.           MQTT.Utility.debug(error_message)
  460.         end
  461.       end
  462.     end
  463.   end
  464.  
  465.   return(nil)
  466. end
  467.  
  468. -- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  469. -- Transmit an MQTT message
  470. -- ~~~~~~~~~~~~~~~~~~~~~~~~
  471. -- MQTT 3.1 Specification: Section 2.1: Fixed header
  472. --
  473. -- byte  1:   Message type and flags (DUP, QOS level, and Retain) fields
  474. -- bytes 2-5: Remaining length field (between one and four bytes long)
  475. -- bytes m- : Optional variable header and payload
  476.  
  477. function MQTT.client:message_write(                             -- Internal API
  478.   message_type,  -- enumeration
  479.   payload)       -- string
  480.                  -- return: nil or error message
  481.  
  482. -- TODO: Complete implementation of fixed header byte 1
  483.  
  484.   local message = string.char(MQTT.Utility.shift_left(message_type, 4))
  485.  
  486.   if (payload == nil) then
  487.     message = message .. string.char(0)  -- Zero length, no payload
  488.   else
  489.     if (#payload > MQTT.client.MAX_PAYLOAD_LENGTH) then
  490.       return(
  491.         "MQTT.client:message_write(): Payload length = " .. #payload ..
  492.         " exceeds maximum of " .. MQTT.client.MAX_PAYLOAD_LENGTH
  493.       )
  494.     end
  495.  
  496.     -- Encode "remaining length" (MQTT v3.1 specification pages 6 and 7)
  497.  
  498.     local remaining_length = #payload
  499.  
  500.     repeat
  501.       local digit = remaining_length % 128
  502.       remaining_length = math.floor(remaining_length / 128)
  503.       if (remaining_length > 0) then digit = digit + 128 end -- continuation bit
  504.       message = message .. string.char(digit)
  505.     until remaining_length == 0
  506.  
  507.     message = message .. payload
  508.   end
  509.  
  510.   local status, error_message = self.socket_client:send(message)
  511.  
  512.   if (status == nil) then
  513.     self:destroy()
  514.     return("MQTT.client:message_write(): " .. error_message)
  515.   end
  516.  
  517.   self.last_activity = MQTT.Utility.get_time()
  518.   return(nil)
  519. end
  520.  
  521. -- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  522. -- Parse MQTT message
  523. -- ~~~~~~~~~~~~~~~~~~
  524. -- MQTT 3.1 Specification: Section 2.1: Fixed header
  525. --
  526. -- byte  1:   Message type and flags (DUP, QOS level, and Retain) fields
  527. -- bytes 2-5: Remaining length field (between one and four bytes long)
  528. -- bytes m- : Optional variable header and payload
  529. --
  530. -- The message type/flags and remaining length are already parsed and
  531. -- removed from the message by the time this function is invoked.
  532. -- Leaving just the optional variable header and payload.
  533.  
  534. function MQTT.client:parse_message(                             -- Internal API
  535.   message_type_flags,  -- byte
  536.   remaining_length,    -- integer
  537.   message)             -- string: Optional variable header and payload
  538.  
  539.   local message_type = MQTT.Utility.shift_right(message_type_flags, 4)
  540.  
  541. -- TODO: MQTT.message.TYPE table should include "parser handler" function.
  542. --       This would nicely collapse the if .. then .. elseif .. end.
  543.  
  544.   if (message_type == MQTT.message.TYPE_CONACK) then
  545.     self:parse_message_conack(message_type_flags, remaining_length, message)
  546.  
  547.   elseif (message_type == MQTT.message.TYPE_PUBLISH) then
  548.     self:parse_message_publish(message_type_flags, remaining_length, message)
  549.  
  550.   elseif (message_type == MQTT.message.TYPE_PUBACK) then
  551.     print("MQTT.client:parse_message(): PUBACK -- UNIMPLEMENTED --")    -- TODO
  552.  
  553.   elseif (message_type == MQTT.message.TYPE_SUBACK) then
  554.     self:parse_message_suback(message_type_flags, remaining_length, message)
  555.  
  556.   elseif (message_type == MQTT.message.TYPE_UNSUBACK) then
  557.     self:parse_message_unsuback(message_type_flags, remaining_length, message)
  558.  
  559.   elseif (message_type == MQTT.message.TYPE_PINGREQ) then
  560.     self:ping_response()
  561.  
  562.   elseif (message_type == MQTT.message.TYPE_PINGRESP) then
  563.     self:parse_message_pingresp(message_type_flags, remaining_length, message)
  564.  
  565.   else
  566.     local error_message =
  567.       "MQTT.client:parse_message(): Unknown message type: " .. message_type
  568.  
  569.     if (MQTT.ERROR_TERMINATE) then             -- TODO: Refactor duplicate code
  570.       self:destroy()
  571.       error(error_message)
  572.     else
  573.       MQTT.Utility.debug(error_message)
  574.     end
  575.   end
  576. end
  577.  
  578. -- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  579. -- Parse MQTT CONACK message
  580. -- ~~~~~~~~~~~~~~~~~~~~~~~~~
  581. -- MQTT 3.1 Specification: Section 3.2: CONACK Acknowledge connection
  582. --
  583. -- byte 1: Reserved value
  584. -- byte 2: Connect return code, see MQTT.CONACK.error_message[]
  585.  
  586. function MQTT.client:parse_message_conack(                      -- Internal API
  587.   message_type_flags,  -- byte
  588.   remaining_length,    -- integer
  589.   message)             -- string
  590.  
  591.   local me = "MQTT.client:parse_message_conack()"
  592.   MQTT.Utility.debug(me)
  593.  
  594.   if (remaining_length ~= 2) then
  595.     error(me .. ": Invalid remaining length")
  596.   end
  597.  
  598.   local return_code = string.byte(message, 2)
  599.  
  600.   if (return_code ~= 0) then
  601.     local error_message = "Unknown return code"
  602.  
  603.     if (return_code <= table.getn(MQTT.CONACK.error_message)) then
  604.       error_message = MQTT.CONACK.error_message[return_code]
  605.     end
  606.  
  607.     error(me .. ": Connection refused: " .. error_message)
  608.   end
  609. end
  610.  
  611. -- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  612. -- Parse MQTT PINGRESP message
  613. -- ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  614. -- MQTT 3.1 Specification: Section 3.13: PING response
  615.  
  616. function MQTT.client:parse_message_pingresp(                    -- Internal API
  617.   message_type_flags,  -- byte
  618.   remaining_length,    -- integer
  619.   message)             -- string
  620.  
  621.   local me = "MQTT.client:parse_message_pingresp()"
  622.   MQTT.Utility.debug(me)
  623.  
  624.   if (remaining_length ~= 0) then
  625.     error(me .. ": Invalid remaining length")
  626.   end
  627.  
  628. -- ToDo: self.ping_response_outstanding = false
  629. end
  630.  
  631. -- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  632. -- Parse MQTT PUBLISH message
  633. -- ~~~~~~~~~~~~~~~~~~~~~~~~~~
  634. -- MQTT 3.1 Specification: Section 3.3: Publish message
  635. --
  636. -- Variable header ..
  637. -- bytes 1- : Topic name and optional Message Identifier (if QOS > 0)
  638. -- bytes m- : Payload
  639.  
  640. function MQTT.client:parse_message_publish(                     -- Internal API
  641.   message_type_flags,  -- byte
  642.   remaining_length,    -- integer
  643.   message)             -- string
  644.  
  645.   local me = "MQTT.client:parse_message_publish()"
  646.   MQTT.Utility.debug(me)
  647.  
  648.   if (self.callback ~= nil) then
  649.     if (remaining_length < 3) then
  650.       error(me .. ": Invalid remaining length: " .. remaining_length)
  651.     end
  652.  
  653.     local topic_length = string.byte(message, 1) * 256
  654.     topic_length = topic_length + string.byte(message, 2)
  655.     local topic  = string.sub(message, 3, topic_length + 2)
  656.     local index  = topic_length + 3
  657.  
  658. -- Handle optional Message Identifier, for QOS levels 1 and 2
  659. -- TODO: Enable Subscribe with QOS and deal with PUBACK, etc.
  660.  
  661.     local qos = MQTT.Utility.shift_right(message_type_flags, 1) % 3
  662.  
  663.     if (qos > 0) then
  664.       local message_id = string.byte(message, index) * 256
  665.       message_id = message_id + string.byte(message, index + 1)
  666.       index = index + 2
  667.     end
  668.  
  669.     local payload_length = remaining_length - index + 1
  670.     local payload = string.sub(message, index, index + payload_length - 1)
  671.  
  672.     self.callback(topic, payload)
  673.   end
  674. end
  675.  
  676. -- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  677. -- Parse MQTT SUBACK message
  678. -- ~~~~~~~~~~~~~~~~~~~~~~~~~
  679. -- MQTT 3.1 Specification: Section 3.9: SUBACK Subscription acknowledgement
  680. --
  681. -- bytes 1,2: Message Identifier
  682. -- bytes 3- : List of granted QOS for each subscribed topic
  683.  
  684. function MQTT.client:parse_message_suback(                      -- Internal API
  685.   message_type_flags,  -- byte
  686.   remaining_length,    -- integer
  687.   message)             -- string
  688.  
  689.   local me = "MQTT.client:parse_message_suback()"
  690.   MQTT.Utility.debug(me)
  691.  
  692.   if (remaining_length < 3) then
  693.     error(me .. ": Invalid remaining length: " .. remaining_length)
  694.   end
  695.  
  696.   local message_id  = string.byte(message, 1) * 256 + string.byte(message, 2)
  697.   local outstanding = self.outstanding[message_id]
  698.  
  699.   if (outstanding == nil) then
  700.     error(me .. ": No outstanding message: " .. message_id)
  701.   end
  702.  
  703.   self.outstanding[message_id] = nil
  704.  
  705.   if (outstanding[1] ~= "subscribe") then
  706.     error(me .. ": Outstanding message wasn't SUBSCRIBE")
  707.   end
  708.  
  709.   local topic_count = table.getn(outstanding[2])
  710.  
  711.   if (topic_count ~= remaining_length - 2) then
  712.     error(me .. ": Didn't received expected number of topics: " .. topic_count)
  713.   end
  714. end
  715.  
  716. -- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  717. -- Parse MQTT UNSUBACK message
  718. -- ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  719. -- MQTT 3.1 Specification: Section 3.11: UNSUBACK Unsubscription acknowledgement
  720. --
  721. -- bytes 1,2: Message Identifier
  722.  
  723. function MQTT.client:parse_message_unsuback(                    -- Internal API
  724.   message_type_flags,  -- byte
  725.   remaining_length,    -- integer
  726.   message)             -- string
  727.  
  728.   local me = "MQTT.client:parse_message_unsuback()"
  729.   MQTT.Utility.debug(me)
  730.  
  731.   if (remaining_length ~= 2) then
  732.     error(me .. ": Invalid remaining length")
  733.   end
  734.  
  735.   local message_id = string.byte(message, 1) * 256 + string.byte(message, 2)
  736.  
  737.   local outstanding = self.outstanding[message_id]
  738.  
  739.   if (outstanding == nil) then
  740.     error(me .. ": No outstanding message: " .. message_id)
  741.   end
  742.  
  743.   self.outstanding[message_id] = nil
  744.  
  745.   if (outstanding[1] ~= "unsubscribe") then
  746.     error(me .. ": Outstanding message wasn't UNSUBSCRIBE")
  747.   end
  748. end
  749.  
  750. -- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  751. -- Transmit MQTT Ping response message
  752. -- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  753. -- MQTT 3.1 Specification: Section 3.13: PING response
  754.  
  755. function MQTT.client:ping_response()                            -- Internal API
  756.   MQTT.Utility.debug("MQTT.client:ping_response()")
  757.  
  758.   if (self.connected == false) then
  759.     error("MQTT.client:ping_response(): Not connected")
  760.   end
  761.  
  762.   self:message_write(MQTT.message.TYPE_PINGRESP, nil)
  763. end
  764.  
  765. --- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  766. -- Transmit MQTT Publish message.
  767. -- MQTT 3.1 Specification: Section 3.3: Publish message
  768. --
  769. -- * bytes 1,2: Fixed message header, see MQTT.client:message_write()
  770. --            Variable header ..
  771. -- * bytes 3- : Topic name and optional Message Identifier (if QOS > 0)
  772. -- * bytes m- : Payload
  773. -- @param self
  774. -- @param #string topic
  775. -- @param #string payload
  776. -- @function [parent = #client] publish
  777. --
  778. function MQTT.client:publish(                                     -- Public API
  779.   topic,    -- string
  780.   payload)  -- string
  781.  
  782.   if (self.connected == false) then
  783.     error("MQTT.client:publish(): Not connected")
  784.   end
  785.  
  786.   MQTT.Utility.debug("MQTT.client:publish(): " .. topic)
  787.  
  788.   local message = MQTT.client.encode_utf8(topic) .. payload
  789.  
  790.   self:message_write(MQTT.message.TYPE_PUBLISH, message)
  791. end
  792.  
  793. --- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  794. -- Transmit MQTT Subscribe message.
  795. -- MQTT 3.1 Specification: Section 3.8: Subscribe to named topics
  796. --
  797. -- * bytes 1,2: Fixed message header, see MQTT.client:message_write()
  798. --            Variable header ..
  799. -- * bytes 3,4: Message Identifier
  800. -- * bytes 5- : List of topic names and their QOS level
  801. -- @param self
  802. -- @param #string topics table of strings
  803. -- @function [parent = #client] subscribe
  804. --
  805. function MQTT.client:subscribe(                                   -- Public API
  806.   topics)  -- table of strings
  807.  
  808.   if (self.connected == false) then
  809.     error("MQTT.client:subscribe(): Not connected")
  810.   end
  811.  
  812.   self.message_id = self.message_id + 1
  813.  
  814.   local message
  815.   message = string.char(math.floor(self.message_id / 256))
  816.   message = message .. string.char(self.message_id % 256)
  817.  
  818.   for index, topic in ipairs(topics) do
  819.     MQTT.Utility.debug("MQTT.client:subscribe(): " .. topic)
  820.     message = message .. MQTT.client.encode_utf8(topic)
  821.     message = message .. string.char(0)  -- QOS level 0
  822.   end
  823.  
  824.   self:message_write(MQTT.message.TYPE_SUBSCRIBE, message)
  825.  
  826.   self.outstanding[self.message_id] = { "subscribe", topics }
  827. end
  828.  
  829. --- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --
  830. -- Transmit MQTT Unsubscribe message
  831. -- MQTT 3.1 Specification: Section 3.10: Unsubscribe from named topics
  832. --
  833. -- * bytes 1,2: Fixed message header, see MQTT.client:message_write()
  834. --            Variable header ..
  835. -- * bytes 3,4: Message Identifier
  836. -- * bytes 5- : List of topic names
  837. -- @param self
  838. -- @param #string topics table of strings
  839. -- @function [parent = #client] unsubscribe
  840. --
  841. function MQTT.client:unsubscribe(                                 -- Public API
  842.   topics)  -- table of strings
  843.  
  844.   if (self.connected == false) then
  845.     error("MQTT.client:unsubscribe(): Not connected")
  846.   end
  847.  
  848.   self.message_id = self.message_id + 1
  849.  
  850.   local message
  851.   message = string.char(math.floor(self.message_id / 256))
  852.   message = message .. string.char(self.message_id % 256)
  853.  
  854.   for index, topic in ipairs(topics) do
  855.     MQTT.Utility.debug("MQTT.client:unsubscribe(): " .. topic)
  856.     message = message .. MQTT.client.encode_utf8(topic)
  857.   end
  858.  
  859.   self:message_write(MQTT.message.TYPE_UNSUBSCRIBE, message)
  860.  
  861.   self.outstanding[self.message_id] = { "unsubscribe", topics }
  862. end
  863.  
  864. -- For ... MQTT = require 'paho.mqtt'
  865.  
  866. return(MQTT)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement