Advertisement
Guest User

Untitled

a guest
Jul 21st, 2016
111
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 30.64 KB | None | 0 0
  1. /// The MQTT Agent is always started by a <class>Net.MQTT.Client</class> in the background to manage the real communication with the broker.
  2. /// <p>The agent inherits its settings from the Client, which started it and it is responsible for:<ul>
  3. /// <li>Building the TCP connection to the broker.</li>
  4. /// <li>Executing the tasks created by the Client (and the Agent itself) based on the <class>Net.MQTT.Aux.TaskList</class> records.
  5. /// E.g.: sending various messages to the broker.</li>
  6. /// <li>Keeping the connection alive by sending PING requests to the broker when no other message has been sent for a certain amount of time.</li>
  7. /// <li>Listening for incoming messages from the broker and triggering appropriate actions based on the message type.</li>
  8. /// </ul></p>
  9. Class Net.MQTT.Agent Extends Net.MQTT.Client
  10. {
  11.  
  /// Set to 1 by <method>DoCONNECT</method> once the TCP device has been opened and reset to 0 by <method>CloseDev</method>.
  Property connected As %Boolean [ InitialExpression = 0 ];

  /// Name of the TCP device; built by <method>DoCONNECT</method> from the port and the job number, cleared by <method>CloseDev</method>.
  Property device As %String;

  /// Value of $IO captured by <method>DoCONNECT</method>, so the previously current device can be made current again.
  Property saveIODev As %String;

  /// Translation table handed to $ZCVT by <method>GetUTFString</method>; initialised to the table for "UTF-8".
  Property transTable As %String [ InitialExpression = {##class(%IO.I.TranslationDevice).GetCharEncodingTable("UTF-8")} ];

  /// Time ($ZTimeStamp in ODBC format) of the last packet written by <method>PackSendMsg</method>;
  /// <method>Listen</method> compares it with the keep-alive interval to decide when a ping is due.
  Property lastMessage As %TimeStamp [ InitialExpression = {$ZDateTime($ZTimeStamp, 3, 1)} ];

  /// Time of the last PINGREQ sent by <method>SendPINGREQ</method>; cleared by <method>RecvPINGRESP</method>, so a non-empty value means an unanswered ping.
  Property lastPing As %TimeStamp;

  /// When set to 1 the loop in <method>Listen</method> terminates on its next iteration.
  Property fatalError As %Boolean [ InitialExpression = 0 ];
  25.  
  /// Entry point of the Agent; called directly (as a background job) by the <class>Net.MQTT.Client</class>.
  /// <p><var>pInitialState</var> is the XML serialized content of the calling Client object, from which the agent populates its own properties.</p>
  /// <p><var>pUsername</var> and <var>pPassword</var> are the optional credentials to be sent to the broker on connecting.</p>
  /// <p>The Agent builds the TCP connection to the broker, sends a <b><var>CONNECT</var></b> message and then runs <method>Listen</method>.</p>
  /// <p>The method has no return value, so a start-up failure is written to the console log instead of being silently dropped.</p>
  ClassMethod StartListening(pInitialState As %String, pUsername As %String = "", pPassword As %String = "")
  {
      Set ret = ""
      Set reader = ##class(%XML.Reader).%New()
      Do reader.Correlate("root", ..%ClassName(1))
      Set tSC = reader.OpenString(pInitialState)

      // Next() reports parsing problems through its second argument; capture them as well.
      If $$$ISERR(tSC) || 'reader.Next(.agent, .tSC) {
          Set ret = "MQTT Agent cannot start (missing initial state)"
          Set:$$$ISERR(tSC) ret = ret _ ": " _ $System.Status.GetErrorText(tSC)
      }
      Else {
          Set tSC = agent.DoCONNECT(pUsername, pPassword)
          If $$$ISOK(tSC) {
              // Returns only when the Agent stops.
              Do agent.Listen()
          }
          Else {
              Set ret = $System.Status.GetErrorText(tSC)
          }
      }

      // This code runs in a background job: the console log is the only place where the reason can be left.
      If ret '= "" {
          Do ##class(%SYS.System).WriteToConsoleLog(ret, 0, 1)
      }
  }
  49.  
  /// The main loop of the Agent. It runs until <property>fatalError</property> is set and does exactly one of the following in every round:<ul>
  /// <li>If <class>Net.MQTT.Aux.TaskList</class> hands out a pending task for the Agent's <property>connectionId</property>,
  /// it invokes the matching <var>Do&lt;action&gt;</var> method and reports the outcome through <method>SignalTask</method>.</li>
  /// <li>If a PING request is still unanswered after <property>KeepAliveInterval</property> seconds,
  /// it treats the connection to the broker as broken and stops.</li>
  /// <li>If nothing has been sent to the broker for more than 80% of <property>KeepAliveInterval</property>,
  /// it sends a PING request to keep the connection alive.</li>
  /// <li>Otherwise it polls the socket for an incoming message.</li>
  /// </ul>
  /// NOTE(review): <method>RecvMessage</method> polls with a zero timeout, so this loop never blocks;
  /// it also keeps running after a DISCONNECT task has closed the device. Confirm both are intended.
  Method Listen() [ Internal, Private ]
  {
      Use ..device
      $$$MQTTTraceINF("Agent for " _ ..ClientId _ " to <" _ ..Host _ ":" _ ..Port _ "> starts listening")
      Do SetIO^%NLS("RAW")

      For {
          // A fatal error raised in the previous round ends the loop.
          Quit:..fatalError

          Try {
              Set tSC = ##class(Net.MQTT.Aux.TaskList).AcquireNext(..connectionId, .taskId, .contextId, .action)
              If $$$ISOK(tSC) {
                  If taskId '= "" {
                      // Pending task: dispatch to Do<action> and signal the initiator with the result.
                      $$$MQTTTraceINF("Agent for " _ ..ClientId _ " to <" _ ..Host _ ":" _ ..Port _ "> processes task (" _taskId _ ")")
                      Set tSC = $METHOD($this, "Do" _ action, taskId, contextId)
                      Set tSC = ##class(Net.MQTT.Aux.TaskList).SignalTask(..connectionId, taskId, tSC, (''..debugMode))
                  }
                  ElseIf (..lastPing '= "") && ($System.SQL.DATEDIFF("s", ..lastPing, $ZDateTime($ZTimeStamp, 3, 1)) > ..KeepAliveInterval) {
                      // lastPing is cleared by RecvPINGRESP, so a stale value means the ping was never answered.
                      $$$MQTTTraceERR("PINGRESP message has not arrived from <" _ ..Host _ ":" _ ..Port _ "> within the timeout interval")
                      Set ..fatalError = 1
                  }
                  ElseIf $System.SQL.DATEDIFF("s", ..lastMessage, $ZDateTime($ZTimeStamp, 3, 1)) > (..KeepAliveInterval * .8) {
                      // The line has been idle for too long: ping the broker.
                      $$$MQTTTraceINF("Agent for " _ ..ClientId _ " to <" _ ..Host _ ":" _ ..Port _ "> sends ping")
                      Set tSC = ..SendPINGREQ()
                      If $$$ISERR(tSC) {
                          $$$MQTTTraceERR("PINGREQ message cannot be sent to <" _ ..Host _ ":" _ ..Port _ ">")
                          Set ..fatalError = 1
                      }
                  }
                  Else {
                      Set tSC = ..RecvMessage()
                  }
              }
          }
          Catch err {
              Set tSC = err.AsStatus()
          }

          // Whatever went wrong in this round is traced; only fatalError stops the loop.
          If $$$ISERR(tSC) {
              $$$MQTTTraceERR($System.Status.GetErrorText(tSC))
          }
      }

      Do ..CloseDev()
      Use ..saveIODev

      Quit
  }
  108.  
  /// Polls the current device for one incoming message.
  /// <p>First it tries to read the 1 byte MQTT header without waiting; if nothing is available it returns OK immediately.
  /// Then it reads the following 1 to 4 bytes, which encode the remaining length of the message (7 bits per byte, high bit = "more follows").
  /// Finally it reads the remaining part of the message and invokes the <var>Recv&lt;type&gt;</var> method selected by the message type from the header.</p>
  /// <p>The handler is invoked only for a completely and correctly framed message; a framing error is returned to the caller
  /// instead of being overwritten by the handler's status.</p>
  /// <p>A &lt;READ&gt; system error sets <property>fatalError</property>, because it most likely means that the broker closed the connection.</p>
  Method RecvMessage() As %Status
  {
      Set tSC = $$$OK
      Try {
          Read header#1:0 Set timeout = ('$Test)
          If 'timeout {
              $$$MQTTTraceIN("Header", header)
              Set type = ..GetMessageType(header)
              Set typeT = $$$MQTTMsgType(type)
              Set dup = ..IsDuplicate(header)
              Set qos = ..GetQoSLevel(header)
              Set retain = ..IsRetain(header)

              // Decode the variable-length "Remaining Length" field (at most 4 bytes).
              Set multi = 1, length = 0, rl = "", complete = 0
              For i = 1: 1: 4 {
                  Read next#1:..ReadTimeout Set timeout = ('$Test)
                  If timeout {
                      Set tSC = $$$ERROR($$$GeneralError, "Invalid " _ typeT _ " message has arrived from <" _ ..Host _ ":" _ ..Port _ "> (missing Remaining Length)")
                      Quit
                  }
                  Set rl = rl _ next
                  Set next = $Ascii(next)
                  Set length = length + ((next # 128) * multi)
                  // A byte without the continuation bit terminates the field.
                  If next < 128 { Set complete = 1 Quit }
                  Set multi = multi * 128
              }
              // Four bytes that all carry the continuation bit do not form a valid length.
              If $$$ISOK(tSC) && 'complete {
                  Set tSC = $$$ERROR($$$GeneralError, "Invalid " _ typeT _ " message has arrived from <" _ ..Host _ ":" _ ..Port _ "> (malformed Remaining Length)")
              }

              Set content = ""
              $$$MQTTTraceIN("Length", rl)
              If $$$ISOK(tSC) && (length > 0) {
                  Read content#length:..ReadTimeout Set timeout = ('$Test)
                  If timeout || ($Length(content) '= length) {
                      Set tSC = $$$ERROR($$$GeneralError, "Invalid " _ typeT _ " message has arrived from <" _ ..Host _ ":" _ ..Port _ "> (missing content)")
                  }
              }

              // Dispatch only well-formed messages, so that a framing error reaches the caller.
              If $$$ISOK(tSC) {
                  $$$MQTTTraceIN(typeT, header _ rl _ content)
                  Set tSC = $METHOD($this, "Recv" _ typeT, dup, qos, retain, content)
              }
          }
      }
      Catch ex {
          If ex.%IsA("%Exception.SystemException") && (ex.Name = "<READ>") {
              $$$MQTTTraceERR("MQTT broker probably closed TCP connection to <" _ ..Host _ ":" _ ..Port _ "> (READ error)")
              Set ..fatalError = 1
          }
          Set tSC = ex.AsStatus()
      }
      Quit tSC
  }
  162.  
  /// Opens the TCP connection to the MQTT broker (unless it is already open) and sends the <b><var>CONNECT</var></b> message.
  /// <p>The device that is current on entry is remembered in <property>saveIODev</property> and made current again before returning.</p>
  /// <p>Returns an error status if the connection cannot be opened within <property>ConnectTimeout</property>
  /// or if the <b><var>CONNECT</var></b> message cannot be sent; in the latter case the device is closed again.</p>
  Method DoCONNECT(pUsername As %String = "", pPassword As %String = "") As %Status [ Internal, Private ]
  {
      Set tSC = $$$OK
      $$$MQTTTraceINF("Agent for " _ ..ClientId _ " to <" _ ..Host _ ":" _ ..Port _ "> is starting")

      Set ..saveIODev = $IO
      If '..connected {
          // The device name is made unique per process by appending the job number.
          Set ..device = "|TCP|" _ ..Port _ "|" _ $Piece($Job, ":")
          Open ..device:(..Host:..Port::::::):..ConnectTimeout
          Set opened = $Test

          If 'opened {
              $$$MQTTTraceERR("Agent for " _ ..ClientId _ " to <" _ ..Host _ ":" _ ..Port _ "> failed to start TCP connection")
              Quit $$$ERROR($$$GeneralError, "TCP Connection to <" _ ..Host _ ":" _ ..Port _ "> has not succeeded within the timeout interval")
          }

          $$$MQTTTraceINF("Agent for " _ ..ClientId _ " to <" _ ..Host _ ":" _ ..Port _ "> started TCP connection")
          Set ..connected = 1
      }

      // Talk to the broker on the raw socket, then hand the previous device back.
      Use ..device
      Do SetIO^%NLS("RAW")
      Set tSC = ..SendCONNECT(pUsername, pPassword)
      Do:$$$ISERR(tSC) ..CloseDev()
      Use ..saveIODev

      Quit tSC
  }
  194.  
  /// Task handler: sends a <b><var>DISCONNECT</var></b> message to the MQTT broker, then closes the TCP connection
  /// and makes the saved device current again. Both arguments are unused.
  /// <p>Returns the status of sending the message.</p>
  Method DoDISCONNECT(pTaskId As %String, pConnectionId As %String) As %Status [ Internal, Private ]
  {
      Set sendStatus = ..SendDISCONNECT()

      // The connection is torn down whether or not the message could be sent.
      Do ..CloseDev()
      Use ..saveIODev

      Quit sendStatus
  }
  204.  
  /// Task handler: sends a <b><var>SUBSCRIBE</var></b> message to the MQTT broker.
  /// <p><var>pMessageId</var> is the Message Identifier. If it has the form <var>id:n</var>, this is a repeated attempt
  /// (no acknowledge arrived within the timeout period) and a non-zero <var>n</var> sets the DUP flag.</p>
  /// <p>The topics of the subscription are fetched from <class>Net.MQTT.Aux.Subscription</class>.</p>
  Method DoSUBSCRIBE(pTaskId As %String, pMessageId As %String) As %Status [ Internal, Private ]
  {
      // Piece 1 is the whole value when there is no colon.
      Set msgid = $Piece(pMessageId, ":", 1)
      Set dup = $Select($Length(pMessageId, ":") > 1: (''$Piece(pMessageId, ":", 2)), 1: 0)

      Set topics = ##class(Net.MQTT.Aux.Subscription).GetTopicList(..ClientId, msgid, .tSC)
      If $$$ISOK(tSC) {
          Set tSC = ..SendSUBSCRIBE(msgid, topics, dup)
      }

      Quit tSC
  }
  222.  
  /// Task handler: sends an <b><var>UNSUBSCRIBE</var></b> message to the MQTT broker.
  /// <p><var>pMessageId</var> is the Message Identifier. If it has the form <var>id:n</var>, this is a repeated attempt
  /// (no acknowledge arrived within the timeout period) and a non-zero <var>n</var> sets the DUP flag.</p>
  /// <p>The topics to unsubscribe from are fetched from <class>Net.MQTT.Aux.Subscription</class>.</p>
  Method DoUNSUBSCRIBE(pTaskId As %String, pMessageId As %String) As %Status [ Internal, Private ]
  {
      // Piece 1 is the whole value when there is no colon.
      Set msgid = $Piece(pMessageId, ":", 1)
      Set dup = $Select($Length(pMessageId, ":") > 1: (''$Piece(pMessageId, ":", 2)), 1: 0)

      Set topics = ##class(Net.MQTT.Aux.Subscription).GetTopicList(..ClientId, msgid, .tSC)
      If $$$ISOK(tSC) {
          Set tSC = ..SendUNSUBSCRIBE(msgid, topics, dup)
      }

      Quit tSC
  }
  240.  
  /// Task handler: sends a <b><var>PUBLISH</var></b> message to the MQTT broker.
  /// <p><var>pMessageStoreId</var> is the ID of the <class>Net.MQTT.MessageStore</class> object holding the message.
  /// If it has the form <var>id:n</var>, this is a repeated attempt (no acknowledge arrived within the timeout period)
  /// and a non-zero <var>n</var> sets the DUP flag.</p>
  /// <p>After a successful send of a QoS level 1 or 2 message the <class>Net.MQTT.Aux.MessageStatus</class> is moved on via
  /// <method>AcknowledgeMessageOut</method> (QoS 1) or <method>ReceiveMessageOut</method> (QoS 2).</p>
  Method DoPUBLISH(pTaskId As %String, pMessageStoreId As %String) As %Status [ Internal, Private ]
  {
      // Piece 1 is the whole value when there is no colon.
      Set msgid = $Piece(pMessageStoreId, ":", 1)
      Set dup = $Select($Length(pMessageStoreId, ":") > 1: (''$Piece(pMessageStoreId, ":", 2)), 1: 0)

      Set message = ##class(Net.MQTT.MessageStore).%OpenId(msgid, -1, .tSC)
      If $$$ISOK(tSC) {
          Set tSC = ..SendPUBLISH(message, dup)
      }

      If $$$ISOK(tSC) {
          Set level = message.QoSLevel
          If level = 1 {
              Set tSC = ##class(Net.MQTT.Aux.MessageStatus).AcknowledgeMessageOut(..ClientId, message.MessageId)
          }
          ElseIf level = 2 {
              Set tSC = ##class(Net.MQTT.Aux.MessageStatus).ReceiveMessageOut(..ClientId, message.MessageId)
          }
      }

      Quit tSC
  }
  268.  
  /// Task handler: sends a <b><var>PUBACK</var></b> message to the MQTT broker for an incoming, QoS Level 1 message.
  /// <p><var>pMessageId</var> is the Message Identifier of that message.</p>
  /// <p>After a successful send the message is reported to <class>Net.MQTT.Aux.MessageStatus</class> via <method>DoneMessageIn</method>.</p>
  Method DoPUBACK(pTaskId As %String, pMessageId As %String) As %Status [ Internal, Private ]
  {
      Set tSC = ..SendPUBACK(pMessageId)
      If $$$ISOK(tSC) {
          Set tSC = ##class(Net.MQTT.Aux.MessageStatus).DoneMessageIn(..ClientId, pMessageId)
      }

      Quit tSC
  }
  279.  
  /// Task handler: sends a <b><var>PUBREC</var></b> message to the MQTT broker for an incoming, QoS Level 2 message.
  /// <p><var>pMessageId</var> is the Message Identifier of that message.</p>
  /// <p>After a successful send the <class>Net.MQTT.Aux.MessageStatus</class> is moved on via <method>ReleaseMessageIn</method>.</p>
  Method DoPUBREC(pTaskId As %String, pMessageId As %String) As %Status [ Internal, Private ]
  {
      Set tSC = ..SendPUBREC(pMessageId)
      If $$$ISOK(tSC) {
          Set tSC = ##class(Net.MQTT.Aux.MessageStatus).ReleaseMessageIn(..ClientId, pMessageId)
      }

      Quit tSC
  }
  290.  
  /// Task handler: sends a <b><var>PUBREL</var></b> message to the MQTT broker for an outgoing, QoS Level 2 message.
  /// <p><var>pMessageId</var> is the Message Identifier. If it has the form <var>id:n</var>, this is a repeated attempt
  /// (no acknowledge arrived within the timeout period) and a non-zero <var>n</var> sets the DUP flag.</p>
  /// <p>After a successful send the <class>Net.MQTT.Aux.MessageStatus</class> is moved on via <method>CompleteMessageOut</method>.
  /// The status update receives the bare Message Identifier, i.e. without the <var>:n</var> retry suffix,
  /// the same identifier that is put on the wire.</p>
  Method DoPUBREL(pTaskId As %String, pMessageId As %String) As %Status [ Internal, Private ]
  {
      Set dup = 0
      Set msgid = pMessageId
      If $Length(pMessageId, ":") > 1 {
          Set msgid = $Piece(pMessageId, ":", 1)
          Set dup = (''$Piece(pMessageId, ":", 2))
      }

      Set tSC = ..SendPUBREL(msgid, dup)
      // Use the parsed identifier: on a retry pMessageId still carries the ":n" suffix.
      Set:$$$ISOK(tSC) tSC = ##class(Net.MQTT.Aux.MessageStatus).CompleteMessageOut(..ClientId, msgid)

      Quit tSC
  }
  309.  
  /// Task handler: sends a <b><var>PUBCOMP</var></b> message to the MQTT broker for an incoming, QoS Level 2 message.
  /// <p><var>pMessageId</var> is the Message Identifier of that message.</p>
  /// <p>After a successful send the message is reported to <class>Net.MQTT.Aux.MessageStatus</class> via <method>DoneMessageIn</method>.</p>
  Method DoPUBCOMP(pTaskId As %String, pMessageId As %String) As %Status [ Internal, Private ]
  {
      Set tSC = ..SendPUBCOMP(pMessageId)
      If $$$ISOK(tSC) {
          Set tSC = ##class(Net.MQTT.Aux.MessageStatus).DoneMessageIn(..ClientId, pMessageId)
      }

      Quit tSC
  }
  320.  
  /// Creates the variable header and payload of a <b><var>CONNECT</var></b> message and sends the package to the MQTT broker.
  /// <p>The connect flags are assembled from <property>CleanSession</property>, the <property>LastWill</property> object
  /// (used only if both its Topic and Content are non-empty) and the presence of the credentials.
  /// A password without a username is ignored.</p>
  /// <p>The payload consists of the client identifier, followed by the will topic and will content (if any),
  /// the username and the password (if any), each encoded by <method>GetUTFString</method>.</p>
  Method SendCONNECT(pUsername As %String = "", pPassword As %String = "") As %Status [ Internal, Private ]
  {
      If pUsername = "" {
          Set pPassword = ""
      }
      Set will = ($IsObject(..LastWill) && (..LastWill.Topic '= "") && (..LastWill.Content '= ""))

      // ObjectScript evaluates binary operators strictly left to right, so "will && x = 1" means
      // "(will && x) = 1". The will QoS is therefore selected explicitly, inside the "will" branch.
      Set flags = 0
      Set:..CleanSession flags = flags + $$$MQTTCleanSession
      If will {
          Set flags = flags + $$$MQTTWillFlag
          Set flags = flags + $Case(+..LastWill.QoSLevel, 1: $$$MQTTWillQoS1, 2: $$$MQTTWillQoS2, : 0)
          Set:..LastWill.Retain flags = flags + $$$MQTTWillRetain
      }
      Set:(pUsername '= "") flags = flags + $$$MQTTUsernameFlag
      Set:(pPassword '= "") flags = flags + $$$MQTTPasswordFlag

      Set varhdr = ..GetProtocolName() _ ..GetProtocolVersion() _ $Char(flags) _ $$$MQTTEncodeNumber(..KeepAliveInterval)

      Set payload = ..GetUTFString(..ClientId)
      If will {
          Set payload = payload _ ..GetUTFString(..LastWill.Topic) _ ..GetUTFString(..LastWill.Content)
      }
      If pUsername '= "" {
          Set payload = payload _ ..GetUTFString(pUsername)
          If pPassword '= "" {
              Set payload = payload _ ..GetUTFString(pPassword)
          }
      }

      Quit ..PackSendMsg(varhdr_payload, $$$MQTTCONNECT)
  }
  351.  
  /// Sends a <b><var>DISCONNECT</var></b> message (a packet without variable header and payload) to the MQTT broker.
  Method SendDISCONNECT() As %Status [ Internal, Private ]
  {
      Set tSC = ..PackSendMsg("", $$$MQTTDISCONNECT)
      Quit tSC
  }
  357.  
  /// Sends a <b><var>PINGREQ</var></b> message (a packet without variable header and payload) to the MQTT broker.
  /// <p>On success the send time is stored in <property>lastPing</property>.</p>
  Method SendPINGREQ() As %Status [ Internal, Private ]
  {
      Set tSC = ..PackSendMsg("", $$$MQTTPINGREQ)
      If $$$ISOK(tSC) {
          // Start of the period in which the PINGRESP is expected.
          Set ..lastPing = $ZDateTime($ZTimeStamp, 3, 1)
      }

      Quit tSC
  }
  366.  
  /// Creates the variable header and payload of a <b><var>SUBSCRIBE</var></b> message and sends the package to the MQTT broker.
  /// <p>The variable header is the encoded Message Identifier; the payload holds, for every item of <var>pTopics</var>,
  /// the topic encoded by <method>GetUTFString</method> followed by one byte with the requested QoS level.</p>
  /// <p>The packet itself is sent with QoS level 1.</p>
  Method SendSUBSCRIBE(pMessageId As %Integer, pTopics As %ListOfObjects, pDup As %Boolean = 0) As %Status [ Internal, Private ]
  {
      #dim topic As Net.MQTT.Message

      Set payload = "", key = ""
      For {
          Set topic = pTopics.GetNext(.key)
          Quit:(key = "")
          Set qosByte = $Char($Case(topic.QoSLevel, 2: $$$MQTTSubQoS2, 1: $$$MQTTSubQoS1, : 0))
          Set payload = payload _ ..GetUTFString(topic.Topic) _ qosByte
      }

      Set varhdr = $$$MQTTEncodeNumber(+pMessageId)
      Quit ..PackSendMsg(varhdr_payload, $$$MQTTSUBSCRIBE, pDup, 1)
  }
  383.  
  /// Creates the variable header and payload of an <b><var>UNSUBSCRIBE</var></b> message and sends the package to the MQTT broker.
  /// <p>The variable header is the encoded Message Identifier; the payload is the concatenation of the topics of <var>pTopics</var>,
  /// each encoded by <method>GetUTFString</method>.</p>
  /// <p>The packet itself is sent with QoS level 1.</p>
  Method SendUNSUBSCRIBE(pMessageId As %Integer, pTopics As %ListOfObjects, pDup As %Boolean = 0) As %Status [ Internal, Private ]
  {
      #dim topic As Net.MQTT.Message

      Set payload = "", key = ""
      For {
          Set topic = pTopics.GetNext(.key)
          Quit:(key = "")
          Set payload = payload _ ..GetUTFString(topic.Topic)
      }

      Set varhdr = $$$MQTTEncodeNumber(+pMessageId)
      Quit ..PackSendMsg(varhdr_payload, $$$MQTTUNSUBSCRIBE, pDup, 1)
  }
  400.  
  /// Creates the variable header and payload of a <b><var>PUBLISH</var></b> message and sends the package to the MQTT broker.
  /// <p>The variable header is the topic encoded by <method>GetUTFString</method>, followed by the encoded Message Identifier
  /// when the QoS level is greater than 0. The content of <var>pMessage</var> is appended unchanged as the payload.</p>
  Method SendPUBLISH(pMessage As Net.MQTT.MessageStore, pDup As %Boolean = 0) As %Status [ Internal, Private ]
  {
      Set varhdr = ..GetUTFString(pMessage.Topic)
      If pMessage.QoSLevel > 0 {
          // Only QoS 1 and 2 messages carry a Message Identifier.
          Set varhdr = varhdr _ $$$MQTTEncodeNumber(+pMessage.MessageId)
      }

      Quit ..PackSendMsg(varhdr_pMessage.Content, $$$MQTTPUBLISH, pDup, pMessage.QoSLevel, pMessage.Retain)
  }
  409.  
  /// Sends a <b><var>PUBACK</var></b> message to the MQTT broker; its variable header is the encoded Message Identifier.
  Method SendPUBACK(pMessageId As %Integer) As %Status [ Internal, Private ]
  {
      Quit ..PackSendMsg($$$MQTTEncodeNumber(+pMessageId), $$$MQTTPUBACK)
  }
  418.  
  /// Sends a <b><var>PUBREC</var></b> message to the MQTT broker; its variable header is the encoded Message Identifier.
  Method SendPUBREC(pMessageId As %Integer) As %Status [ Internal, Private ]
  {
      Quit ..PackSendMsg($$$MQTTEncodeNumber(+pMessageId), $$$MQTTPUBREC)
  }
  427.  
  /// Sends a <b><var>PUBREL</var></b> message to the MQTT broker; its variable header is the encoded Message Identifier.
  /// <p>The packet is sent with QoS level 1 and with the DUP flag given in <var>pDup</var>.</p>
  Method SendPUBREL(pMessageId As %Integer, pDup As %Boolean = 0) As %Status [ Internal, Private ]
  {
      Quit ..PackSendMsg($$$MQTTEncodeNumber(+pMessageId), $$$MQTTPUBREL, pDup, 1)
  }
  436.  
  /// Sends a <b><var>PUBCOMP</var></b> message to the MQTT broker; its variable header is the encoded Message Identifier.
  Method SendPUBCOMP(pMessageId As %Integer) As %Status [ Internal, Private ]
  {
      Quit ..PackSendMsg($$$MQTTEncodeNumber(+pMessageId), $$$MQTTPUBCOMP)
  }
  445.  
  /// Evaluates a <b><var>CONNACK</var></b> message received from the MQTT broker and signals the named event
  /// <var>^MQTT.Connect(connectionId)</var> via the <class>%SYSTEM.Event</class> API with an empty message on success
  /// or with the error text on failure.
  /// <p>The message must be exactly 2 bytes long; the second byte is the connect return code.
  /// Return code 0 means success, every other value is reported as a failed connection
  /// (codes 1 to 5 with their specific reason, any other code as unknown).</p>
  /// <p><var>pDup</var>, <var>pQoS</var> and <var>pRetain</var> are not used.</p>
  Method RecvCONNACK(pDup As %Boolean, pQoS As %Integer, pRetain As %Boolean, pContent As %String) As %Status [ Internal, Private ]
  {
      Set tSC = $$$OK

      If $Length(pContent) '= 2 {
          Set tSC = $$$ERROR($$$GeneralError, "Invalid CONNACK message has arrived from <" _ ..Host _ ":" _ ..Port _ "> (length: " _ $Length(pContent) _ " <> 2)")
      }
      Else {
          Set ret = +$Ascii($Extract(pContent, 2))
          // Only 0 is success; codes outside the known range must not be mistaken for it.
          Set reason = $Case(ret,
              0: "",
              1: "unacceptable protocol version",
              2: "identifier rejected",
              3: "server unavailable",
              4: "bad username or password",
              5: "not authorized",
              : "unknown return code " _ ret)
          Set:(reason '= "") tSC = $$$ERROR($$$GeneralError, "Connection to <" _ ..Host _ ":" _ ..Port _ "> failed (" _ reason _ ")")
      }

      Do $System.Event.Signal("^MQTT.Connect(""" _ ..connectionId _ """)", $Select($$$ISOK(tSC): "", 1: $System.Status.GetErrorText(tSC)))

      Quit tSC
  }
  469.  
  /// Evaluates a <b><var>PINGRESP</var></b> message received from the MQTT broker.
  /// <p>A valid response has no content; it clears <property>lastPing</property>, which tells <method>Listen</method>
  /// that the pending ping has been answered. A response with content is rejected with an error.</p>
  Method RecvPINGRESP(pDup As %Boolean, pQoS As %Integer, pRetain As %Boolean, pContent As %String) As %Status [ Internal, Private ]
  {
      If pContent '= "" {
          Quit $$$ERROR($$$GeneralError, "Invalid PINGRESP message has arrived from <" _ ..Host _ ":" _ ..Port _ ">")
      }

      Set ..lastPing = ""
      Quit $$$OK
  }
  485.  
  /// Evaluates an <b><var>UNSUBACK</var></b> message received from the MQTT broker.
  /// <p>All the work is delegated to <method>RecvAcknowledge</method>, which signals the acknowledge for the Message Identifier found in the content.</p>
  Method RecvUNSUBACK(pDup As %Boolean, pQoS As %Integer, pRetain As %Boolean, pContent As %String) As %Status [ Internal, Private ]
  {
      Quit ..RecvAcknowledge("UNSUBACK", pDup, pQoS, pRetain, pContent)
  }
  492.  
  /// Evaluates a <b><var>PUBACK</var></b> message received from the MQTT broker for a previously published QoS Level 1 message.
  /// <p>The acknowledge is signalled by <method>RecvAcknowledge</method>; on success the message is then reported to
  /// <class>Net.MQTT.Aux.MessageStatus</class> via <method>DoneMessageOut</method>.</p>
  Method RecvPUBACK(pDup As %Boolean, pQoS As %Integer, pRetain As %Boolean, pContent As %String) As %Status [ Internal, Private ]
  {
      Set tSC = ..RecvAcknowledge("PUBACK", pDup, pQoS, pRetain, pContent, .msgid)
      If $$$ISOK(tSC) {
          Set tSC = ##class(Net.MQTT.Aux.MessageStatus).DoneMessageOut(..ClientId, msgid)
      }

      Quit tSC
  }
  503.  
  /// Evaluates a <b><var>PUBREC</var></b> message received from the MQTT broker for a previously published QoS Level 2 message.
  /// <p>The acknowledge is signalled by <method>RecvAcknowledge</method>; on success the <class>Net.MQTT.Aux.MessageStatus</class>
  /// is moved on via <method>ReleaseMessageOut</method>.</p>
  Method RecvPUBREC(pDup As %Boolean, pQoS As %Integer, pRetain As %Boolean, pContent As %String) As %Status [ Internal, Private ]
  {
      Set tSC = ..RecvAcknowledge("PUBREC", pDup, pQoS, pRetain, pContent, .msgid)
      If $$$ISOK(tSC) {
          Set tSC = ##class(Net.MQTT.Aux.MessageStatus).ReleaseMessageOut(..ClientId, msgid)
      }

      Quit tSC
  }
  514.  
  /// Evaluates a <b><var>PUBREL</var></b> message received from the MQTT broker.
  /// <p>After <method>RecvAcknowledge</method> has accepted the message, the <class>Net.MQTT.Aux.MessageStatus</class> is moved on via
  /// <method>CompleteMessageIn</method> and a new <var>PUBCOMP</var> task (<class>Net.MQTT.Aux.TaskList</class>) is created
  /// for the same Message Identifier. Each step is executed only if the previous one succeeded.</p>
  Method RecvPUBREL(pDup As %Boolean, pQoS As %Integer, pRetain As %Boolean, pContent As %String) As %Status [ Internal, Private ]
  {
      Set tSC = ..RecvAcknowledge("PUBREL", pDup, pQoS, pRetain, pContent, .msgid)
      If $$$ISOK(tSC) {
          Set tSC = ##class(Net.MQTT.Aux.MessageStatus).CompleteMessageIn(..ClientId, msgid)
      }
      If $$$ISOK(tSC) {
          // The status of the task creation comes back through the last argument.
          Set taskid = ##class(Net.MQTT.Aux.TaskList).CreateNewTask(..connectionId, msgid, "PUBCOMP", .tSC)
      }

      Quit tSC
  }
  526.  
  /// Evaluates a <b><var>PUBCOMP</var></b> message received from the MQTT broker for a previously published QoS Level 2 message.
  /// <p>The acknowledge is signalled by <method>RecvAcknowledge</method>; on success the message is then reported to
  /// <class>Net.MQTT.Aux.MessageStatus</class> via <method>DoneMessageOut</method>.</p>
  Method RecvPUBCOMP(pDup As %Boolean, pQoS As %Integer, pRetain As %Boolean, pContent As %String) As %Status [ Internal, Private ]
  {
      Set tSC = ..RecvAcknowledge("PUBCOMP", pDup, pQoS, pRetain, pContent, .msgid)
      If $$$ISOK(tSC) {
          Set tSC = ##class(Net.MQTT.Aux.MessageStatus).DoneMessageOut(..ClientId, msgid)
      }

      Quit tSC
  }
  537.  
  /// Generic evaluation of an acknowledge type message whose content is just a 2 byte Message Identifier.
  /// <p><var>pType</var> is the name of the message type (used in the error text and passed on to <method>SignalAck</method>).
  /// The decoded identifier is returned in <var>pMsgId</var> and the acknowledge is signalled through
  /// <class>Net.MQTT.Aux.TaskList</class>.<method>SignalAck</method>.</p>
  /// <p>If the content is not exactly 2 bytes long an error is returned; in that case nothing is signalled and <var>pMsgId</var> is not set.</p>
  Method RecvAcknowledge(pType As %String, pDup As %Boolean, pQoS As %Integer, pRetain As %Boolean, pContent As %String, Output pMsgId As %Integer) As %Status [ Internal, Private ]
  {
      If $Length(pContent) '= 2 {
          Quit $$$ERROR($$$GeneralError, "Invalid " _ pType _ " message has arrived from <" _ ..Host _ ":" _ ..Port _ "> (length: " _ $Length(pContent) _ " <> 2)")
      }

      Set tSC = $$$OK
      Set pMsgId = $$$MQTTDecodeNumber(pContent)
      Do ##class(Net.MQTT.Aux.TaskList).SignalAck(..connectionId, pMsgId, pType, $Select($$$ISOK(tSC): "", 1: $System.Status.GetErrorText(tSC)))

      Quit tSC
  }
  554.  
  /// Evaluates a <b><var>SUBACK</var></b> message received from the MQTT broker.
  /// <p>The content starts with the 2 byte Message Identifier, followed by one byte per subscribed topic.
  /// The number of topics is taken from <class>Net.MQTT.Aux.Subscription</class>; if it does not match the content length
  /// the message is rejected. Otherwise each byte is stored as the granted QoS level via <method>AckTopic</method>,
  /// stopping at the first failure.</p>
  /// <p>Unless the content is shorter than 2 bytes, the outcome (empty text or the error text) is signalled through
  /// <class>Net.MQTT.Aux.TaskList</class>.<method>SignalAck</method>.</p>
  Method RecvSUBACK(pDup As %Boolean, pQoS As %Integer, pRetain As %Boolean, pContent As %String) As %Status [ Internal, Private ]
  {
      // Without a Message Identifier there is nobody to signal.
      If $Length(pContent) < 2 {
          Quit $$$ERROR($$$GeneralError, "Invalid SUBACK message has arrived from <" _ ..Host _ ":" _ ..Port _ "> (length < 2)")
      }

      Set msgid = $$$MQTTDecodeNumber($Extract(pContent, 1, 2))
      Set cnt = ##class(Net.MQTT.Aux.Subscription).GetTopicCount(..ClientId, msgid, .tSC)
      If $$$ISOK(tSC) {
          If $Length(pContent) = (2 + cnt) {
              For idx = 1: 1: cnt {
                  Set granted = $Case($Ascii($Extract(pContent, 2 + idx)), $$$MQTTSubQoS2: 2, $$$MQTTSubQoS1: 1, : 0)
                  Set tSC = ##class(Net.MQTT.Aux.Subscription).AckTopic(..ClientId, msgid, idx, granted)
                  Quit:$$$ISERR(tSC)
              }
          }
          Else {
              Set tSC = $$$ERROR($$$GeneralError, "Invalid SUBACK message has arrived from <" _ ..Host _ ":" _ ..Port _ "> (# of topics)")
          }
      }

      Set text = $Select($$$ISOK(tSC): "", 1: $System.Status.GetErrorText(tSC))
      Do ##class(Net.MQTT.Aux.TaskList).SignalAck(..connectionId, msgid, "SUBACK", text)

      Quit tSC
  }
  584.  
  /// Evaluates a <b><var>PUBLISH</var></b> message received from the MQTT broker.
  /// <p>The content consists of the 2 byte topic length, the topic, the 2 byte Message Identifier (only if <var>pQoS</var> &gt; 0)
  /// and the payload. A content that is too short for these fields is rejected before anything is stored.</p>
  /// <p>Inside a transaction, and while holding the extent lock of <class>Net.MQTT.Aux.MessageStatus</class>, the method:<ul>
  /// <li>skips a message that is flagged as duplicate and is already registered as an incoming message,</li>
  /// <li>registers the message (QoS &gt; 0), stores it as a <class>Net.MQTT.MessageStore</class> object and
  /// passes it to the message handler,</li>
  /// <li>creates a <var>PUBACK</var> (QoS 1) or <var>PUBREC</var> (QoS 2) task (<class>Net.MQTT.Aux.TaskList</class>) for the next step of the message flow.</li>
  /// </ul>Each step is executed only if the previous one succeeded; the transaction is committed on success and rolled back otherwise.</p>
  Method RecvPUBLISH(pDup As %Boolean, pQoS As %Integer, pRetain As %Boolean, pContent As %String) As %Status [ Internal, Private ]
  {
      Set tSC = $$$OK

      // Validate the framing before decoding anything from the content.
      Set fixedL = 2 + $Select(pQoS > 0: 2, 1: 0)
      If $Length(pContent) < fixedL {
          Quit $$$ERROR($$$GeneralError, "Invalid PUBLISH message has arrived from <" _ ..Host _ ":" _ ..Port _ "> (length: " _ $Length(pContent) _ " < " _ fixedL _ ")")
      }
      Set topicL = $$$MQTTDecodeNumber($Extract(pContent, 1, 2))
      If $Length(pContent) < (fixedL + topicL) {
          Quit $$$ERROR($$$GeneralError, "Invalid PUBLISH message has arrived from <" _ ..Host _ ":" _ ..Port _ "> (topic length: " _ topicL _ ")")
      }

      Set topic = $Extract(pContent, 3, 2 + topicL)
      Set pos = 3 + topicL
      If pQoS > 0 {
          Set msgid = $$$MQTTDecodeNumber($Extract(pContent, pos, pos + 1))
          Set pos = pos + 2
      }
      Else {
          Set msgid = ""
      }

      TStart
      Set locked = 0
      Try {
          Set tSC = ##class(Net.MQTT.Aux.MessageStatus).%LockExtent(0)
          Set locked = $$$ISOK(tSC)

          // Without the lock nothing may be registered or stored; the lock error is returned.
          If locked {
              Set dup = $Select(pQoS > 0: pDup, 1: 0)
              If dup {
                  // The DUP flag alone is not enough: the message counts as duplicate only if it is already registered.
                  Set dup = ##class(Net.MQTT.Aux.MessageStatus).IsRegisteredMessageIn(..ClientId, msgid)
              }
              If 'dup {
                  If pQoS > 0 {
                      Set tSC = ##class(Net.MQTT.Aux.MessageStatus).RegisterMessageIn(..ClientId, msgid, pQoS)
                  }
                  If $$$ISOK(tSC) {
                      Set store = ##class(Net.MQTT.MessageStore).%New()
                      Set store.ClientId = ..ClientId
                      Set store.Direction = "I"
                      Set store.MessageId = msgid
                      Set store.QoSLevel = pQoS
                      Set store.Retain = pRetain
                      Set store.Topic = topic
                      Set store.Content = $Extract(pContent, pos, *)
                      Set tSC = store.%Save()
                  }
                  // The handler is called only for a stored message, so a save error is no longer overwritten.
                  If $$$ISOK(tSC) {
                      Set msg = ##class(Net.MQTT.Message).%New()
                      Set msg.MessageId = msgid
                      Set msg.QoSLevel = pQoS
                      Set msg.Retain = pRetain
                      Set msg.Topic = topic
                      Set msg.Content = $Extract(pContent, pos, *)
                      // NOTE(review): handler class and method are hard-coded; confirm this is intended.
                      Set tSC = $ClassMethod("Net.MQTT.BMClient", "MyMessageHandler", msg, topic)
                      // NOTE(review): looks like a debugging counter; kept because other code may read ^msg1.
                      Set ^msg1($Increment(^msg1)) = "OK"
                  }
                  If $$$ISOK(tSC) && (pQoS > 0) {
                      Set taskid = ##class(Net.MQTT.Aux.TaskList).CreateNewTask(..connectionId, msgid, $Case(pQoS, 2: "PUBREC", : "PUBACK"), .tSC)
                  }
              }
          }
      }
      Catch ex {
          Set tSC = ex.AsStatus()
      }
      Do:locked ##class(Net.MQTT.Aux.MessageStatus).%UnlockExtent()
      If $$$ISOK(tSC) { TCommit }
      Else { TRollback }

      Quit tSC
  }
  651.  
  /// Closes the TCP device if the connection is open, makes the saved device current again
  /// and resets <property>connected</property> and <property>device</property>. Does nothing if the connection is not open.
  Method CloseDev() [ Internal, Private ]
  {
      Quit:'..connected

      $$$MQTTTraceINF("Agent for " _ ..ClientId _ " to <" _ ..Host _ ":" _ ..Port _ "> is stopping TCP connection")
      Close ..device
      Use ..saveIODev
      Set ..connected = 0
      Set ..device = ""
  }
  661.  
  /// Builds a complete MQTT packet around <var>pMessage</var> (variable header + payload) and writes it to the current device.
  /// <p>The first byte is <var>pMessageType</var> plus the DUP, RETAIN and QoS flag values. It is followed by the length of
  /// <var>pMessage</var>, encoded in 7 bit groups starting with the least significant one, where the high bit of a byte
  /// means that a further length byte follows.</p>
  /// <p>After the write <property>lastMessage</property> is set to the current time and the packet is traced. Always returns OK.</p>
  Method PackSendMsg(pMessage As %String, pMessageType As %Integer, pDup As %Boolean = 0, pQoS As %Integer = 0, pRetain As %Boolean = 0) As %Status [ Internal, Private ]
  {
      // Fixed header, byte 1: message type and flags.
      Set flags = pMessageType
      Set:pDup flags = flags + $$$MQTTDup
      Set:pRetain flags = flags + $$$MQTTRetain
      Set flags = flags + $Case(pQoS, 2: $$$MQTTQoS2, 1: $$$MQTTQoS1, : 0)
      Set packet = $Char(flags)

      // Fixed header, following bytes: the remaining length.
      Set remaining = $Length(pMessage)
      For {
          Set digit = remaining # 128
          Set remaining = remaining \ 128
          Set:(remaining > 0) digit = digit + 128
          Set packet = packet _ $Char(digit)
          Quit:(remaining '> 0)
      }

      Set packet = packet_pMessage
      Write packet, !
      Set ..lastMessage = $ZDateTime($ZTimeStamp, 3, 1)
      $$$MQTTTraceOUT($$$MQTTMsgType(pMessageType), packet)

      Quit $$$OK
  }
  688.  
  /// Returns the message type encoded in the header byte <var>pHeader</var>:
  /// the sum of the values (16, 32, 64, 128) of those of bits 5 to 8 that are set. The value is not shifted.
  ClassMethod GetMessageType(pHeader As %String) As %Integer
  {
      Set bits = $Factor($Ascii(pHeader))

      Set type = 0, weight = 16
      For pos = 5: 1: 8 {
          Set:$Bit(bits, pos) type = type + weight
          Set weight = weight * 2
      }

      Quit type
  }
  699.  
  /// Returns 1 if all bits of the DUP flag value are set in the header byte <var>pHeader</var>, otherwise 0.
  ClassMethod IsDuplicate(pHeader As %String) As %Boolean
  {
      Set hdrBits = $Factor($Ascii(pHeader))
      Set flagBits = $Factor($$$MQTTDup)
      // The flag is set when masking the header with the flag leaves the flag intact.
      Set masked = $BitLogic(hdrBits & flagBits)
      Quit (masked = flagBits)
  }
  706.  
  /// Returns the QoS level encoded in the header byte <var>pHeader</var>:
  /// 2 if the QoS 2 flag bits are set, else 1 if the QoS 1 flag bits are set, else 0.
  ClassMethod GetQoSLevel(pHeader As %String) As %Integer
  {
      Set hdrBits = $Factor($Ascii(pHeader))

      // QoS 2 is tested first, so it wins if both flags happen to be set.
      Set level2 = $Factor($$$MQTTQoS2)
      If $BitLogic(hdrBits & level2) = level2 {
          Quit 2
      }

      Set level1 = $Factor($$$MQTTQoS1)
      If $BitLogic(hdrBits & level1) = level1 {
          Quit 1
      }

      Quit 0
  }
  714.  
  /// Returns 1 if all bits of the RETAIN flag value are set in the header byte <var>pHeader</var>, otherwise 0.
  ClassMethod IsRetain(pHeader As %String) As %Boolean
  {
      Set hdrBits = $Factor($Ascii(pHeader))
      Set flagBits = $Factor($$$MQTTRetain)
      // The flag is set when masking the header with the flag leaves the flag intact.
      Set masked = $BitLogic(hdrBits & flagBits)
      Quit (masked = flagBits)
  }
  721.  
  /// Converts <var>pString</var> for output with the translation table in <property>transTable</property>
  /// and returns the converted string prefixed with its encoded length.
  Method GetUTFString(pString As %String) As %String
  {
      Set encoded = $ZConvert(pString, "O", ..transTable)
      // The length prefix counts the converted string, not the original one.
      Quit $$$MQTTEncodeNumber($Length(encoded)) _ encoded
  }
  729.  
  730. Storage Default
  731. {
  732. <Data name="AgentDefaultData">
  733. <Subscript>"Agent"</Subscript>
  734. <Value name="1">
  735. <Value>connected</Value>
  736. </Value>
  737. <Value name="2">
  738. <Value>device</Value>
  739. </Value>
  740. <Value name="3">
  741. <Value>saveIODev</Value>
  742. </Value>
  743. <Value name="4">
  744. <Value>transTable</Value>
  745. </Value>
  746. <Value name="5">
  747. <Value>lastMessage</Value>
  748. </Value>
  749. <Value name="6">
  750. <Value>lastPing</Value>
  751. </Value>
  752. <Value name="7">
  753. <Value>fatalError</Value>
  754. </Value>
  755. </Data>
  756. <DefaultData>AgentDefaultData</DefaultData>
  757. <Type>%Library.CacheStorage</Type>
  758. }
  759.  
  760. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement