Advertisement
Guest User

Untitled

a guest
Feb 26th, 2018
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 12.92 KB | None | 0 0
  1. import usocket as socket
  2. import ustruct as struct
  3. from ubinascii import hexlify
  4.  
  class MQTTException(Exception):
      # Error reported by the broker. MQTTClient raises it with the
      # non-zero CONNACK return code, or with the 0x80 SUBACK failure
      # code, as the single exception argument.
      pass
  7.  
  class MQTTClient:
      # Small blocking MQTT 3.1.1 client (protocol level 4) for MicroPython.
      #
      # Supports CONNECT (optional user/password, last will, TLS),
      # PUBLISH with QoS 0 and 1, SUBSCRIBE, PINGREQ and DISCONNECT.
      # Incoming PUBLISH packets are handed to the callback registered
      # with set_callback() as cb(topic, msg), both as bytes.

      # port=0 selects the default port: 8883 with ssl, 1883 without.
      # keepalive is in seconds (0 leaves the keep-alive field at zero).
      # ssl_params is passed as keyword arguments to ussl.wrap_socket().
      # No network activity happens until connect() is called.
      def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
                   ssl=False, ssl_params=None):
          if port == 0:
              port = 8883 if ssl else 1883
          self.client_id = client_id
          self.sock = None
          self.server = server
          self.port = port
          self.ssl = ssl
          # A fresh dict per instance: a shared `{}` default would be
          # visible to (and mutable by) every client object.
          self.ssl_params = {} if ssl_params is None else ssl_params
          self.pid = 0
          self.cb = None
          self.user = user
          self.pswd = password
          self.keepalive = keepalive
          self.lw_topic = None
          self.lw_msg = None
          self.lw_qos = 0
          self.lw_retain = False

      # Send a length-prefixed field: 2-byte big-endian length, then data.
      def _send_str(self, s):
          self.sock.write(struct.pack("!H", len(s)))
          self.sock.write(s)

      # Read an MQTT "remaining length": 7 bits per byte, least significant
      # group first, bit 7 set on every byte except the last.
      def _recv_len(self):
          n = 0
          sh = 0
          while 1:
              b = self.sock.read(1)[0]
              n |= (b & 0x7f) << sh
              if not b & 0x80:
                  return n
              sh += 7

      # Encode `sz` as an MQTT remaining length into `buf`, starting at
      # index 1 (index 0 holds the packet type byte). Returns the number
      # of bytes of `buf` occupied by type byte plus length field.
      def _put_len(self, buf, sz):
          i = 1
          while sz > 0x7f:
              buf[i] = (sz & 0x7f) | 0x80
              sz >>= 7
              i += 1
          buf[i] = sz
          return i + 1

      # Advance and return the packet identifier. It is sent as an
      # unsigned 16-bit field, so it cycles through 1..65535 (never 0)
      # instead of growing until struct.pack("!H") overflows.
      def _next_pid(self):
          self.pid = self.pid % 0xFFFF + 1
          return self.pid

      # Register f(topic, msg), called for every received PUBLISH.
      def set_callback(self, f):
          self.cb = f

      # Configure the last-will message sent in the CONNECT packet.
      # Must be called before connect(). Raises AssertionError on an
      # empty topic or a qos outside 0..2.
      def set_last_will(self, topic, msg, retain=False, qos=0):
          assert 0 <= qos <= 2
          assert topic
          self.lw_topic = topic
          self.lw_msg = msg
          self.lw_qos = qos
          self.lw_retain = retain

      # Open the TCP (optionally TLS) connection and perform the MQTT
      # handshake. Returns the "session present" bit of the CONNACK.
      # Raises MQTTException(code) when the broker refuses the
      # connection. On any failure the socket is closed before the
      # error propagates, so a retry loop does not leak sockets.
      def connect(self, clean_session=True):
          self.sock = socket.socket()
          try:
              addr = socket.getaddrinfo(self.server, self.port)[0][-1]
              self.sock.connect(addr)
              if self.ssl:
                  import ussl
                  self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
              return self._handshake(clean_session)
          except Exception:
              self.sock.close()
              raise

      # Send CONNECT over the already-open self.sock and check the CONNACK.
      def _handshake(self, clean_session):
          # premsg: packet type, up to 4 length bytes, then the high byte
          # (always 0) of the protocol-name length. msg continues with the
          # low byte (4), "MQTT", level 4, connect flags and keep-alive.
          premsg = bytearray(b"\x10\0\0\0\0\0")
          msg = bytearray(b"\x04MQTT\x04\x02\0\0")

          sz = 10 + 2 + len(self.client_id)
          msg[6] = clean_session << 1
          if self.user is not None:
              sz += 2 + len(self.user)
              msg[6] |= 0x80
              # A user name without a password is valid; only announce
              # and send the password when one was actually given.
              if self.pswd is not None:
                  sz += 2 + len(self.pswd)
                  msg[6] |= 0x40
          if self.keepalive:
              assert self.keepalive < 65536
              msg[7] |= self.keepalive >> 8
              msg[8] |= self.keepalive & 0x00FF
          if self.lw_topic:
              sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
              # Will flag (bit 2), will QoS (bits 3-4), will retain (bit 5).
              msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
              msg[6] |= self.lw_retain << 5

          used = self._put_len(premsg, sz)
          # +1 also emits the zero byte that follows the length field.
          self.sock.write(premsg, used + 1)
          self.sock.write(msg)
          self._send_str(self.client_id)
          if self.lw_topic:
              self._send_str(self.lw_topic)
              self._send_str(self.lw_msg)
          if self.user is not None:
              self._send_str(self.user)
              if self.pswd is not None:
                  self._send_str(self.pswd)
          resp = self.sock.read(4)
          assert resp[0] == 0x20 and resp[1] == 0x02
          if resp[3] != 0:
              raise MQTTException(resp[3])
          return resp[2] & 1

      # Send DISCONNECT and close the socket. The socket is closed even
      # if the write fails (e.g. the link is already down).
      def disconnect(self):
          try:
              self.sock.write(b"\xe0\0")
          finally:
              self.sock.close()

      # Send PINGREQ. The PINGRESP is consumed by wait_msg()/check_msg().
      def ping(self):
          self.sock.write(b"\xc0\0")

      # Publish msg on topic. qos=0 returns once the packet is written;
      # qos=1 blocks until the matching PUBACK arrives (messages received
      # meanwhile are delivered to the callback). qos=2 is not supported
      # and raises AssertionError before anything is sent.
      def publish(self, topic, msg, retain=False, qos=0):
          assert qos in (0, 1), "only QoS 0 and 1 are supported"
          sz = 2 + len(topic) + len(msg)
          if qos > 0:
              sz += 2
          assert sz < 2097152
          pkt = bytearray(b"\x30\0\0\0")
          pkt[0] |= qos << 1 | retain
          self.sock.write(pkt, self._put_len(pkt, sz))
          self._send_str(topic)
          if qos > 0:
              pid = self._next_pid()
              # Reuse the header buffer for the 2-byte packet identifier.
              struct.pack_into("!H", pkt, 0, pid)
              self.sock.write(pkt, 2)
          self.sock.write(msg)
          if qos == 1:
              while 1:
                  op = self.wait_msg()
                  if op == 0x40:
                      sz = self.sock.read(1)
                      assert sz == b"\x02"
                      rcv_pid = self.sock.read(2)
                      rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
                      if pid == rcv_pid:
                          return

      # Subscribe to topic and block until the SUBACK arrives.
      # A callback must have been set first (AssertionError otherwise).
      # Raises MQTTException(0x80) when the broker rejects the request.
      def subscribe(self, topic, qos=0):
          assert self.cb is not None, "Subscribe callback is not set"
          # Type byte, up to 3 length bytes, 2-byte packet identifier.
          # The length is variable-width so topics longer than 122 bytes
          # still produce a valid header.
          pkt = bytearray(b"\x82\0\0\0\0\0")
          pid = self._next_pid()
          used = self._put_len(pkt, 2 + 2 + len(topic) + 1)
          struct.pack_into("!H", pkt, used, pid)
          self.sock.write(pkt, used + 2)
          self._send_str(topic)
          self.sock.write(qos.to_bytes(1, "little"))
          while 1:
              op = self.wait_msg()
              if op == 0x90:
                  # resp: remaining length, pid high, pid low, return code.
                  resp = self.sock.read(4)
                  assert resp[1] == pid >> 8 and resp[2] == pid & 0xFF
                  if resp[3] == 0x80:
                      raise MQTTException(resp[3])
                  return

      # Wait for a single incoming MQTT message and process it.
      # Subscribed messages are delivered to the callback previously
      # set by set_callback(). A PINGRESP is consumed silently. For any
      # other packet type the first header byte is returned and the rest
      # of the packet is left for the caller to read. Returns None when
      # nothing was available on a non-blocking socket, after a PINGRESP,
      # and after a delivered PUBLISH. Raises OSError(-1) when the peer
      # closed the connection.
      def wait_msg(self):
          res = self.sock.read(1)
          # Undo the non-blocking mode check_msg() may have selected.
          self.sock.setblocking(True)
          if res is None:
              return None
          if res == b"":
              raise OSError(-1)
          if res == b"\xd0":  # PINGRESP
              sz = self.sock.read(1)[0]
              assert sz == 0
              return None
          op = res[0]
          if op & 0xf0 != 0x30:
              return op
          sz = self._recv_len()
          topic_len = self.sock.read(2)
          topic_len = (topic_len[0] << 8) | topic_len[1]
          topic = self.sock.read(topic_len)
          sz -= topic_len + 2
          if op & 6:
              # QoS > 0: a packet identifier precedes the payload.
              pid = self.sock.read(2)
              pid = pid[0] << 8 | pid[1]
              sz -= 2
          msg = self.sock.read(sz)
          self.cb(topic, msg)
          if op & 6 == 2:
              # QoS 1: acknowledge with PUBACK carrying the same identifier.
              pkt = bytearray(b"\x40\x02\0\0")
              struct.pack_into("!H", pkt, 2, pid)
              self.sock.write(pkt)
          elif op & 6 == 4:
              # QoS 2 delivery is not implemented.
              assert 0

      # Checks whether a pending message from server is available.
      # If not, returns immediately with None. Otherwise, does
      # the same processing as wait_msg.
      def check_msg(self):
          self.sock.setblocking(False)
          return self.wait_msg()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement