SHARE
TWEET

Untitled

a guest Apr 23rd, 2019 71 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import 'package:mqtt_client/mqtt_client.dart';
  2. import 'package:typed_data/typed_data.dart';
  3. import 'dart:convert';
  4.  
  /// Signature of the connection lifecycle hooks ([Emitter.onConnect] and
  /// [Emitter.onDisconnect]); receives the client that raised the event.
  typedef EmitterCallback = void Function(Emitter client);

  /// Signature of the subscription hooks ([Emitter.onSubscribed],
  /// [Emitter.onUnsubscribed], [Emitter.onSubscribeFail]); receives the topic
  /// reported by the underlying MQTT client.
  typedef EmitterSubscribeCallback = void Function(String subscribedTopic);

  /// Signature of [Emitter.onKeygen]; receives the JSON-decoded payload of a
  /// message whose topic starts with `emitter/keygen`.
  typedef EmitterKeygenCallback = void Function(dynamic response);

  /// Signature of [Emitter.onPresence]; receives the JSON-decoded payload of a
  /// message whose topic starts with `emitter/presence`.
  typedef EmitterPresenceCallback = void Function(dynamic response);

  /// Signature of [Emitter.onMe]; receives the JSON-decoded payload of a
  /// message whose topic starts with `emitter/me`.
  typedef EmitterMeCallback = void Function(dynamic response);

  /// Signature of [Emitter.onMessage]; receives every message that is not one
  /// of the `emitter/...` responses handled by the other callbacks.
  typedef EmitterMessageCallback = void Function(EmitterMessage received);
  11.  
  /// Client for an Emitter broker, implemented on top of [MqttClient] over a
  /// WebSocket transport.
  ///
  /// Call [connect] first; every other request method publishes through the
  /// client created there. Incoming traffic is routed to the `on...` callbacks,
  /// all of which are optional.
  class Emitter {
    MqttClient _mqtt;

    /// Invoked when the underlying MQTT client reports that it connected.
    EmitterCallback onConnect;

    /// Invoked when the underlying MQTT client reports that it disconnected.
    EmitterCallback onDisconnect;

    /// Invoked with the topic once a subscription has been confirmed.
    EmitterSubscribeCallback onSubscribed;

    /// Invoked with the topic once an unsubscription has been confirmed.
    EmitterSubscribeCallback onUnsubscribed;

    /// Invoked with the topic when a subscription attempt failed.
    EmitterSubscribeCallback onSubscribeFail;

    /// Invoked with the decoded response to a [keygen] request.
    EmitterKeygenCallback onKeygen;

    /// Invoked with the decoded response to a [presence] request.
    EmitterPresenceCallback onPresence;

    /// Invoked with the decoded response to a [me] request.
    EmitterMeCallback onMe;

    /// Invoked for every received message that is not one of the
    /// `emitter/keygen`, `emitter/presence` or `emitter/me` responses.
    EmitterMessageCallback onMessage;

    /// Opens a WebSocket MQTT connection to [host] on [port].
    ///
    /// [secure] selects the `wss://` scheme instead of `ws://`, [keepalive] is
    /// used both as the client keep-alive period and in the connect message,
    /// and [logging] toggles the MQTT client's own logging.
    ///
    /// Returns `true` once the client reports the connected state. Returns
    /// `false` (after disconnecting) when connecting throws an [Exception] or
    /// ends in any other state.
    Future<bool> connect({
      bool secure = false,
      String host = "api.emitter.io",
      int port = 8080,
      int keepalive = 60,
      bool logging = false,
    }) async {
      final brokerUrl = (secure ? 'wss://' : 'ws://') + host;
      final connectMessage = MqttConnectMessage()
          .keepAliveFor(keepalive)
          .startClean()
          .withWillQos(MqttQos.atLeastOnce);

      _mqtt = MqttClient(brokerUrl, '')
        ..port = port
        ..useWebSocket = true
        ..useAlternateWebSocketImplementation = false
        ..logging(on: logging)
        ..keepAlivePeriod = keepalive
        ..onDisconnected = _onDisconnected
        ..onConnected = _onConnected
        ..onSubscribed = _onSubscribed
        ..onUnsubscribed = _onUnsubscribed
        ..onSubscribeFail = _onSubscribeFail
        ..pongCallback = _pong
        ..connectionMessage = connectMessage;

      try {
        await _mqtt.connect();
      } on Exception catch (e) {
        print('EMITTER::client exception - $e');
        _mqtt.disconnect();
        return false;
      }

      // connect() can complete without the client actually being connected,
      // so the final state is checked explicitly.
      if (_mqtt.connectionStatus.state != MqttConnectionState.connected) {
        print(
            'EMITTER::ERROR Mosquitto client connection failed - disconnecting, state is ${_mqtt.connectionStatus.state}');
        _mqtt.disconnect();
        return false;
      }
      print('EMITTER::Mosquitto client connected');

      _mqtt.updates.listen(_onMessage);
      return true;
    }

    /// Publishes [message] to [channel], authorised by [key].
    ///
    /// The `me` option is always sent explicitly (`1` or `0`) so the result
    /// does not depend on the server default, under which a publisher receives
    /// its own messages. [ttl] is sent as the `ttl` channel option.
    ///
    /// Returns this client to allow call chaining.
    Emitter publish(String key, String channel, String message,
        {bool me = true, int ttl = 0}) {
      final topic = _formatChannel(key, channel, _publishOptions(me, ttl));
      _send(topic, message);
      return this;
    }

    /// Publishes [message] through the previously created [link].
    ///
    /// Returns this client to allow call chaining.
    Emitter publishWithLink(String link, String message) {
      _send(link, message);
      return this;
    }

    /// Subscribes to [channel], authorised by [key].
    ///
    /// When [last] is greater than zero it is sent as the `last` channel
    /// option.
    ///
    /// Returns this client to allow call chaining.
    Emitter subscribe(String key, String channel, {int last = 0}) {
      final options = <String, String>{};
      if (last > 0) options['last'] = last.toString();
      _mqtt.subscribe(_formatChannel(key, channel, options), MqttQos.atLeastOnce);
      return this;
    }

    /// Requests a link called [name] for [channel], authorised by [key].
    ///
    /// [private] and [subscribe] are forwarded unchanged in the request; [me]
    /// and [ttl] become channel options exactly as in [publish].
    ///
    /// Returns this client to allow call chaining.
    Emitter link(
        String key, String channel, String name, bool private, bool subscribe,
        {bool me = true, int ttl = 0}) {
      // The key travels in the request body, so it is not prefixed to the
      // channel here.
      final formattedChannel =
          _formatChannel(null, channel, _publishOptions(me, ttl));
      final request = {
        'key': key,
        'channel': formattedChannel,
        'name': name,
        'private': private,
        'subscribe': subscribe
      };
      _send('emitter/link/', jsonEncode(request));
      return this;
    }

    /// Unsubscribes from [channel], authorised by [key].
    ///
    /// Returns this client to allow call chaining.
    Emitter unsubscribe(String key, String channel) {
      _mqtt.unsubscribe(_formatChannel(key, channel, null));
      return this;
    }

    /// Sends a key generation request to the server.
    ///
    /// [type] is the type of key to generate: `r` for read-only, `w` for
    /// write-only, `p` for presence only, `rw` for read-write, or any other
    /// combination of `r`, `w` and `p`. [ttl] is the time-to-live of the key,
    /// in seconds.
    ///
    /// Returns this client to allow call chaining.
    Emitter keygen(String key, String channel, String type, int ttl) {
      final request = {'key': key, 'channel': channel, 'type': type, 'ttl': ttl};
      _send('emitter/keygen/', jsonEncode(request));
      return this;
    }

    /// Sends a presence request to the server.
    ///
    /// [status] is whether a full status should be sent back in the response;
    /// [changes] is whether this client should be subscribed to presence
    /// notification events.
    ///
    /// Returns this client to allow call chaining.
    Emitter presence(String key, String channel, bool status, bool changes) {
      final request = {
        'key': key,
        'channel': channel,
        'status': status,
        'changes': changes
      };
      _send('emitter/presence/', jsonEncode(request));
      return this;
    }

    /// Requests information about the connection to the server.
    ///
    /// Returns this client to allow call chaining.
    Emitter me() {
      _send('emitter/me/', '');
      return this;
    }

    /// Builds the channel options shared by [publish] and [link].
    Map<String, String> _publishOptions(bool me, int ttl) {
      return <String, String>{
        'me': me ? '1' : '0',
        'ttl': ttl.toString(),
      };
    }

    /// Publishes [message] on [topic] with at-least-once delivery.
    void _send(String topic, String message) {
      _mqtt.publishMessage(topic, MqttQos.atLeastOnce, _payload(message));
    }

    /// Composes the topic string `key/channel/?name=value&...`.
    ///
    /// [key] is skipped when null or empty, a trailing slash is always added
    /// after the channel, and the query part is only emitted when [options]
    /// is non-empty.
    ///
    /// Throws an [ArgumentError] when [channel] is null.
    String _formatChannel(
        String key, String channel, Map<String, String> options) {
      ArgumentError.checkNotNull(channel, 'channel');

      var formatted = channel;
      // Prefix with the key if any, without doubling the separator.
      if (key != null && key.isNotEmpty) {
        formatted = key.endsWith('/') ? key + channel : key + '/' + channel;
      }
      // Add trailing slash.
      if (!formatted.endsWith('/')) formatted += '/';
      // Add options; join() places the separators, so nothing needs trimming.
      if (options != null && options.isNotEmpty) {
        final query = options.entries
            .map((option) => option.key + '=' + option.value)
            .join('&');
        formatted += '?' + query;
      }
      return formatted;
    }

    /// Encodes [message] into the byte buffer expected by the MQTT client.
    Uint8Buffer _payload(String message) {
      final builder = MqttClientPayloadBuilder();
      builder.addString(message);
      return builder.payload;
    }

    void _onDisconnected() {
      print('EMITTER::Disconnected');
      if (onDisconnect != null) onDisconnect(this);
    }

    void _onConnected() {
      print('EMITTER::Connected');
      if (onConnect != null) onConnect(this);
    }

    void _onSubscribed(String topic) {
      print('EMITTER::Subscription confirmed for topic $topic');
      if (onSubscribed != null) onSubscribed(topic);
    }

    void _onUnsubscribed(String topic) {
      print('EMITTER::Unsubscription confirmed for topic $topic');
      if (onUnsubscribed != null) onUnsubscribed(topic);
    }

    void _onSubscribeFail(String topic) {
      print('EMITTER::Subscription failed for topic $topic');
      if (onSubscribeFail != null) onSubscribeFail(topic);
    }

    void _pong() {
      print('EMITTER::Pong');
    }

    /// Handles one delivery from the MQTT client's updates stream.
    ///
    /// The stream hands over a list, so every element is processed rather than
    /// only the first. Entries whose payload is not an [MqttPublishMessage]
    /// are skipped because they carry no application payload to dispatch.
    void _onMessage(List<MqttReceivedMessage<MqttMessage>> c) {
      for (final received in c) {
        final packet = received.payload;
        if (packet is MqttPublishMessage) {
          _dispatch(EmitterMessage(received.topic, packet.payload.message));
        }
      }
    }

    /// Routes [message] to the callback that matches its channel prefix.
    void _dispatch(EmitterMessage message) {
      final topic = message.channel;
      print('EMITTER::$topic -> ${message.asString()}');
      if (topic.startsWith('emitter/keygen')) {
        if (onKeygen != null) onKeygen(message.asObject());
      } else if (topic.startsWith('emitter/presence')) {
        if (onPresence != null) onPresence(message.asObject());
      } else if (topic.startsWith('emitter/me')) {
        if (onMe != null) onMe(message.asObject());
      } else {
        if (onMessage != null) onMessage(message);
      }
    }
  }
  256.  
  /// A message received from the broker: the channel it arrived on together
  /// with its raw payload bytes.
  class EmitterMessage {
    /// Topic the message was received on.
    String channel;

    /// Raw payload bytes exactly as delivered by the MQTT client.
    Uint8Buffer binary;

    /// Creates a message for [channel] carrying the payload [binary].
    EmitterMessage(this.channel, this.binary);

    /// Returns the payload converted to a string.
    String asString() {
      return MqttPublishPayload.bytesToStringAsString(binary);
    }

    /// Returns the payload decoded as JSON, or `null` when it is not valid
    /// JSON.
    ///
    /// Only [FormatException] is treated as "not JSON"; any other failure is
    /// a programming error and is allowed to propagate instead of being
    /// silently discarded.
    dynamic asObject() {
      try {
        return jsonDecode(asString());
      } on FormatException {
        return null;
      }
    }
  }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top