Advertisement
Guest User

Untitled

a guest
Apr 23rd, 2019
129
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.31 KB | None | 0 0
  1. import 'package:mqtt_client/mqtt_client.dart';
  2. import 'package:typed_data/typed_data.dart';
  3. import 'dart:convert';
  4.  
  5. typedef EmitterCallback = void Function(Emitter emitter);
  6. typedef EmitterSubscribeCallback = void Function(String topic);
  7. typedef EmitterKeygenCallback = void Function(dynamic object);
  8. typedef EmitterPresenceCallback = void Function(dynamic object);
  9. typedef EmitterMeCallback = void Function(dynamic object);
  10. typedef EmitterMessageCallback = void Function(EmitterMessage message);
  11.  
  12. class Emitter {
  13. MqttClient _mqtt;
  14.  
  15. EmitterCallback onConnect;
  16. EmitterCallback onDisconnect;
  17. EmitterSubscribeCallback onSubscribed;
  18. EmitterSubscribeCallback onUnsubscribed;
  19. EmitterSubscribeCallback onSubscribeFail;
  20. EmitterKeygenCallback onKeygen;
  21. EmitterPresenceCallback onPresence;
  22. EmitterMeCallback onMe;
  23. EmitterMessageCallback onMessage;
  24.  
  25. Future<bool> connect({
  26. bool secure = false,
  27. String host = "api.emitter.io",
  28. int port = 8080,
  29. int keepalive = 60,
  30. bool logging = false,
  31. }) async {
  32. String brokerUrl = (secure ? "wss://" : "ws://") + host;
  33. _mqtt = new MqttClient(brokerUrl, "");
  34. _mqtt.port = port;
  35. _mqtt.useWebSocket = true;
  36. _mqtt.useAlternateWebSocketImplementation = false;
  37. _mqtt.logging(on: logging);
  38. _mqtt.keepAlivePeriod = keepalive;
  39. _mqtt.onDisconnected = _onDisconnected;
  40. _mqtt.onConnected = _onConnected;
  41. _mqtt.onSubscribed = _onSubscribed;
  42. _mqtt.onUnsubscribed = _onUnsubscribed;
  43. _mqtt.onSubscribeFail = _onSubscribeFail;
  44. _mqtt.pongCallback = _pong;
  45.  
  46. final MqttConnectMessage connMess = MqttConnectMessage()
  47. .keepAliveFor(keepalive)
  48. .startClean()
  49. .withWillQos(MqttQos.atLeastOnce);
  50. _mqtt.connectionMessage = connMess;
  51.  
  52. try {
  53. await _mqtt.connect();
  54. } on Exception catch (e) {
  55. print('EMITTER::client exception - $e');
  56. _mqtt.disconnect();
  57. return false;
  58. }
  59.  
  60. /// Check we are connected
  61. if (_mqtt.connectionStatus.state == MqttConnectionState.connected) {
  62. print('EMITTER::Mosquitto client connected');
  63. } else {
  64. print(
  65. 'EMITTER::ERROR Mosquitto client connection failed - disconnecting, state is ${_mqtt.connectionStatus.state}');
  66. _mqtt.disconnect();
  67. return false;
  68. }
  69.  
  70. _mqtt.updates.listen(_onMessage);
  71.  
  72. return true;
  73. }
  74.  
  75. /*
  76. * Publishes a message to the currently opened endpoint.
  77. */
  78. publish(String key, String channel, String message,
  79. {bool me = true, int ttl = 0}) {
  80. var options = new Map<String, String>();
  81. // The default server's behavior when 'me' is absent, is to send the publisher its own messages.
  82. // To avoid any ambiguity, this parameter is always set here.
  83. if (me) {
  84. options["me"] = "1";
  85. } else {
  86. options["me"] = "0";
  87. }
  88. options["ttl"] = ttl.toString();
  89. var topic = _formatChannel(key, channel, options);
  90. _mqtt.publishMessage(topic, MqttQos.atLeastOnce, _payload(message));
  91. return this;
  92. }
  93.  
  94. /*
  95. * Publishes a message through a link.
  96. */
  97. publishWithLink(String link, String message) {
  98. _mqtt.publishMessage(link, MqttQos.atLeastOnce, _payload(message));
  99. }
  100.  
  101. /*
  102. * Subscribes to a particular channel.
  103. */
  104. subscribe(String key, String channel, {int last = 0}) {
  105. var options = new Map<String, String>();
  106. if (last > 0) options["last"] = last.toString();
  107. var topic = _formatChannel(key, channel, options);
  108. _mqtt.subscribe(topic, MqttQos.atLeastOnce);
  109. return this;
  110. }
  111.  
  112. /*
  113. * Create a link to a particular channel.
  114. */
  115. link(String key, String channel, String name, bool private, bool subscribe,
  116. {bool me = true, int ttl = 0}) {
  117. var options = new Map<String, String>();
  118. // The default server's behavior when 'me' is absent, is to send the publisher its own messages.
  119. // To avoid any ambiguity, this parameter is always set here.
  120. if (me) {
  121. options["me"] = "1";
  122. } else {
  123. options["me"] = "0";
  124. }
  125. options["ttl"] = ttl.toString();
  126.  
  127. String formattedChannel = _formatChannel(null, channel, options);
  128. var request = {
  129. "key": key,
  130. "channel": formattedChannel,
  131. "name": name,
  132. "private": private,
  133. "subscribe": subscribe
  134. };
  135. _mqtt.publishMessage(
  136. "emitter/link/", MqttQos.atLeastOnce, _payload(jsonEncode(request)));
  137. return this;
  138. }
  139.  
  140. /*
  141. * Unsubscribes from a particular channel.
  142. */
  143. unsubscribe(String key, String channel) {
  144. var topic = _formatChannel(key, channel, null);
  145. _mqtt.unsubscribe(topic);
  146. return this;
  147. }
  148.  
  149. /*
  150. * Sends a key generation request to the server.
  151. * type is the type of the key to generate. Possible options include r for read-only, w for write-only,
  152. * p for presence only and rw for read-write keys
  153. * (In addition to rw, you can use any combination of r, w and p for key generation)
  154. * ttl is the time-to-live of the key, in seconds.
  155. */
  156. keygen(String key, String channel, String type, int ttl) {
  157. var request = {"key": key, "channel": channel, "type": type, "ttl": ttl};
  158. _mqtt.publishMessage(
  159. "emitter/keygen/", MqttQos.atLeastOnce, _payload(jsonEncode(request)));
  160. return this;
  161. }
  162.  
  163. /*
  164. * Sends a presence request to the server.
  165. * status: Whether a full status should be sent back in the response.
  166. * changes: Whether we should subscribe this client to presence notification events.
  167. */
  168. presence(String key, String channel, bool status, bool changes) {
  169. var request = {
  170. "key": key,
  171. "channel": channel,
  172. "status": status,
  173. "changes": changes
  174. };
  175. _mqtt.publishMessage("emitter/presence/", MqttQos.atLeastOnce,
  176. _payload(jsonEncode(request)));
  177. return this;
  178. }
  179.  
  180. /*
  181. * Request information about the connection to the server.
  182. */
  183. me() {
  184. _mqtt.publishMessage("emitter/me/", MqttQos.atLeastOnce, _payload(""));
  185. }
  186.  
  187. _formatChannel(String key, String channel, Map<String, String> options) {
  188. var formatted = channel;
  189. // Prefix with the key if any
  190. if (key != null && key.length > 0)
  191. formatted = key.endsWith("/") ? key + channel : key + "/" + channel;
  192. // Add trailing slash
  193. if (!formatted.endsWith("/")) formatted += "/";
  194. // Add options
  195. if (options != null && options.length > 0) {
  196. formatted += "?";
  197. options.forEach((key, value) => {formatted += key + "=" + value + "&"});
  198. }
  199. if (formatted.endsWith("&"))
  200. formatted = formatted.substring(0, formatted.length - 1);
  201. // We're done compiling the channel name
  202. return formatted;
  203. }
  204.  
  205. Uint8Buffer _payload(String message) {
  206. final MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
  207. builder.addString(message);
  208. return builder.payload;
  209. }
  210.  
  211. _onDisconnected() {
  212. print('EMITTER::Disconnected');
  213. if (onDisconnect != null) onDisconnect(this);
  214. }
  215.  
  216. _onConnected() {
  217. print('EMITTER::Connected');
  218. if (onConnect != null) onConnect(this);
  219. }
  220.  
  221. _onSubscribed(String topic) {
  222. print('EMITTER::Subscription confirmed for topic $topic');
  223. if (onSubscribed != null) onSubscribed(topic);
  224. }
  225.  
  226. _onUnsubscribed(String topic) {
  227. print('EMITTER::Unsubscription confirmed for topic $topic');
  228. if (onUnsubscribed != null) onUnsubscribed(topic);
  229. }
  230.  
  231. _onSubscribeFail(String topic) {
  232. print('EMITTER::Subscription failed for topic $topic');
  233. if (onSubscribeFail != null) onSubscribeFail(topic);
  234. }
  235.  
  236. _pong() {
  237. print('EMITTER::Pong');
  238. }
  239.  
  240. _onMessage(List<MqttReceivedMessage<MqttMessage>> c) {
  241. final MqttPublishMessage msg = c[0].payload;
  242. final String topic = c[0].topic;
  243. var message = new EmitterMessage(topic, msg.payload.message);
  244. print('EMITTER::$topic -> ${message.asString()}');
  245. if (topic.startsWith("emitter/keygen")) {
  246. if (onKeygen != null) onKeygen(message.asObject());
  247. } else if (topic.startsWith("emitter/presence")) {
  248. if (onPresence != null) onPresence(message.asObject());
  249. } else if (topic.startsWith("emitter/me")) {
  250. if (onMe != null) onMe(message.asObject());
  251. } else {
  252. if (onMessage != null) onMessage(message);
  253. }
  254. }
  255. }
  256.  
  257. class EmitterMessage {
  258. String channel;
  259. Uint8Buffer binary;
  260.  
  261. EmitterMessage(String topic, Uint8Buffer payload) {
  262. channel = topic;
  263. binary = payload;
  264. }
  265.  
  266. String asString() {
  267. return MqttPublishPayload.bytesToStringAsString(binary);
  268. }
  269.  
  270. dynamic asObject() {
  271. dynamic object;
  272. try {
  273. object = jsonDecode(asString());
  274. } catch (err) {}
  275. return object;
  276. }
  277.  
  278. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement