Advertisement
Kyngston

Spark Core MQTT IoT demo

Jun 13th, 2015
576
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 15.11 KB | None | 0 0
  1. /*
  2.   PubSubClient.cpp - A simple client for MQTT.
  3.   Original Code - Nicholas O'Leary
  4.   http://knolleary.net
  5.  
  6.   Adapted for Spark Core by Chris Howard - chris@kitard.com
  7.   Based on PubSubClient 1.9.1
  8.  
  9.   Changes
  10.   - Added gcc pragma to avoid warnings being thrown as errors (deprecated conversion from string constant to 'char*')
  11.   - publish_P function removed due to lack of Arduino PROGMEM support on the Spark Core
  12.   - Obvious includes commented out
  13.   - Using Spark TCPClient instead of Arduino EthernetClient
  14.  
  15. */
  16.  
  17. #pragma GCC diagnostic ignored "-Wwrite-strings"
  18.  
  19. // #include "PubSubClient.h"
  20. // #include <Arduino.h>
  21. #define ARDUINO_H
  22. #include <stdint.h>
  23. #include <stddef.h>
  24. #include <stdlib.h>
  25.  
  26. //#include "Client.h"
  27.  
  // MQTT_MAX_PACKET_SIZE : Maximum packet size in bytes. This is the size of the
  // PubSubClient packet buffer, fixed header included.
  #define MQTT_MAX_PACKET_SIZE 128

  // MQTT_KEEPALIVE : keepAlive interval in Seconds
  #define MQTT_KEEPALIVE 15

  #define MQTTPROTOCOLVERSION 3

  // Control packet types, already shifted into the upper nibble of the first
  // header byte. Every expansion is parenthesized so the macros stay correct
  // when used inside a larger expression (e.g. `MQTTPUBLISH + 1`), where the
  // low precedence of `<<` would otherwise regroup the operands.
  #define MQTTCONNECT     (1 << 4)  // Client request to connect to Server
  #define MQTTCONNACK     (2 << 4)  // Connect Acknowledgment
  #define MQTTPUBLISH     (3 << 4)  // Publish message
  #define MQTTPUBACK      (4 << 4)  // Publish Acknowledgment
  #define MQTTPUBREC      (5 << 4)  // Publish Received (assured delivery part 1)
  #define MQTTPUBREL      (6 << 4)  // Publish Release (assured delivery part 2)
  #define MQTTPUBCOMP     (7 << 4)  // Publish Complete (assured delivery part 3)
  #define MQTTSUBSCRIBE   (8 << 4)  // Client Subscribe request
  #define MQTTSUBACK      (9 << 4)  // Subscribe Acknowledgment
  #define MQTTUNSUBSCRIBE (10 << 4) // Client Unsubscribe request
  #define MQTTUNSUBACK    (11 << 4) // Unsubscribe Acknowledgment
  #define MQTTPINGREQ     (12 << 4) // PING Request
  #define MQTTPINGRESP    (13 << 4) // PING Response
  #define MQTTDISCONNECT  (14 << 4) // Client is Disconnecting
  #define MQTTReserved    (15 << 4) // Reserved

  // QoS levels, positioned in bits 1-2 of the first header byte.
  #define MQTTQOS0        (0 << 1)
  #define MQTTQOS1        (1 << 1)
  #define MQTTQOS2        (2 << 1)
  54.  
  55. class PubSubClient {
  56. private:
  57.    //Client* _client;
  58.    TCPClient* _client; // CH 14Jan2014 - changed Client* to TCPClient*
  59.    uint8_t buffer[MQTT_MAX_PACKET_SIZE];
  60.    uint16_t nextMsgId;
  61.    unsigned long lastOutActivity;
  62.    unsigned long lastInActivity;
  63.    bool pingOutstanding;
  64.    void (*callback)(char*,uint8_t*,unsigned int);
  65.    uint16_t readPacket(uint8_t*);
  66.    uint8_t readByte();
  67.    bool write(uint8_t header, uint8_t* buf, uint16_t length);
  68.    uint16_t writeString(char* string, uint8_t* buf, uint16_t pos);
  69.    uint8_t *ip;
  70.    char* domain;
  71.    uint16_t port;
  72. public:
  73.    PubSubClient();
  74.    PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),TCPClient& client); // CH 14Jan2014 - changed Client& to TCPClient&
  75.    PubSubClient(char *, uint16_t, void(*)(char *,uint8_t*,unsigned int),TCPClient&  client); // CH 14Jan2014 - changed Client& to TCPClient&
  76.    //bool connect(const char *);
  77.    bool connect(char *);
  78.    bool connect(char *, char *, char *);
  79.    bool connect(char *, char *, uint8_t, uint8_t, char *);
  80.    bool connect(char *, char *, char *, char *, uint8_t, uint8_t, char *);
  81.    void disconnect();
  82.    bool publish(char *, char *);
  83.    bool publish(char *, uint8_t *, unsigned int);
  84.    bool publish(char *, uint8_t *, unsigned int, bool);
  85.    bool subscribe(char *);
  86.    bool subscribe(char *, uint8_t qos);
  87.    bool unsubscribe(char *);
  88.    bool puback(uint16_t msgId);
  89.    bool loop();
  90.    bool connected();
  91. };
  92.  
  93.  
  94. #include <string.h>
  95.  
  96. PubSubClient::PubSubClient() {
  97.    this->_client = NULL;
  98. }
  99.  
  100. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), TCPClient& client) { // CH 14Jan2014 - Changed Client& to TCPClient&
  101.    this->_client = &client;
  102.    this->callback = callback;
  103.    this->ip = ip;
  104.    this->port = port;
  105.    this->domain = NULL;
  106. }
  107.  
  108. PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), TCPClient& client) { // CH 14Jan2014 - Changed Client& to TCPClient&
  109.    this->_client = &client;
  110.    this->callback = callback;
  111.    this->domain = domain;
  112.    this->port = port;
  113. }
  114.  
  115.  
  116. // CONNECT
  117.  
  118. //bool PubSubClient::connect(const char *id) {
  119. bool PubSubClient::connect(char *id) {
  120.    return connect(id,NULL,NULL,0,0,0,0);
  121. }
  122.  
  123. bool PubSubClient::connect(char *id, char *user, char *pass) {
  124.    return connect(id,user,pass,0,0,0,0);
  125. }
  126.  
  127. bool PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage)
  128. {
  129.    return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
  130. }
  131.  
  132. bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) {
  133.    if (!connected()) {
  134.       int result = 0;
  135.  
  136.       if (domain != NULL) {
  137.         result = _client->connect(this->domain, this->port);
  138.       } else {
  139.         result = _client->connect(this->ip, this->port);
  140.       }
  141.  
  142.       if (result) {
  143.          nextMsgId = 1;
  144.          uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
  145.          // Leave room in the buffer for header and variable length field
  146.          uint16_t length = 5;
  147.          unsigned int j;
  148.          for (j = 0;j<9;j++) {
  149.             buffer[length++] = d[j];
  150.          }
  151.  
  152.          uint8_t v;
  153.          if (willTopic) {
  154.             v = 0x06|(willQos<<3)|(willRetain<<5);
  155.          } else {
  156.             v = 0x02;
  157.          }
  158.  
  159.          if(user != NULL) {
  160.             v = v|0x80;
  161.  
  162.             if(pass != NULL) {
  163.                v = v|(0x80>>1);
  164.             }
  165.          }
  166.  
  167.          buffer[length++] = v;
  168.  
  169.          buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
  170.          buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
  171.          length = writeString(id,buffer,length);
  172.          if (willTopic) {
  173.             length = writeString(willTopic,buffer,length);
  174.             length = writeString(willMessage,buffer,length);
  175.          }
  176.  
  177.          if(user != NULL) {
  178.             length = writeString(user,buffer,length);
  179.             if(pass != NULL) {
  180.                length = writeString(pass,buffer,length);
  181.             }
  182.          }
  183.  
  184.          write(MQTTCONNECT,buffer,length-5);
  185.  
  186.          lastInActivity = lastOutActivity = millis();
  187.  
  188.          while (!_client->available()) {
  189.             unsigned long t = millis();
  190.             if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
  191.                _client->stop();
  192.                return false;
  193.             }
  194.          }
  195.          uint8_t llen;
  196.          uint16_t len = readPacket(&llen);
  197.  
  198.          if (len == 4 && buffer[3] == 0) {
  199.             lastInActivity = millis();
  200.             pingOutstanding = false;
  201.             return true;
  202.          }
  203.       }
  204.       _client->stop();
  205.    }
  206.    return false;
  207. }
  208.  
  209. uint8_t PubSubClient::readByte() {
  210.    while(!_client->available()) {}
  211.    return _client->read();
  212. }
  213.  
  214. uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
  215.    uint16_t len = 0;
  216.    buffer[len++] = readByte();
  217.    uint32_t multiplier = 1;
  218.    uint16_t length = 0;
  219.    uint8_t digit = 0;
  220.    do {
  221.       digit = readByte();
  222.       buffer[len++] = digit;
  223.       length += (digit & 127) * multiplier;
  224.       multiplier *= 128;
  225.    } while ((digit & 128) != 0);
  226.    *lengthLength = len-1;
  227.    for (uint16_t i = 0;i<length;i++)
  228.    {
  229.       if (len < MQTT_MAX_PACKET_SIZE) {
  230.          buffer[len++] = readByte();
  231.       } else {
  232.          readByte();
  233.          len = 0; // This will cause the packet to be ignored.
  234.       }
  235.    }
  236.  
  237.    return len;
  238. }
  239.  
  240. bool PubSubClient::loop() {
  241.    if (connected()) {
  242.       unsigned long t = millis();
  243.       if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
  244.          if (pingOutstanding) {
  245.             _client->stop();
  246.             return false;
  247.          } else {
  248.             buffer[0] = MQTTPINGREQ;
  249.             buffer[1] = 0;
  250.             _client->write(buffer,2);
  251.             lastOutActivity = t;
  252.             lastInActivity = t;
  253.             pingOutstanding = true;
  254.          }
  255.       }
  256.       if (_client->available()) {
  257.          uint8_t llen;
  258.          uint16_t len = readPacket(&llen);
  259.          uint16_t msgId = 0;
  260.          uint8_t *payload;
  261.          if (len > 0) {
  262.             lastInActivity = t;
  263.             uint8_t type = buffer[0]&0xF0;
  264.             if (type == MQTTPUBLISH) {
  265.                if (callback) {
  266.                   uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2];
  267.                   char topic[tl+1];
  268.                   for (uint16_t i=0;i<tl;i++) {
  269.                      topic[i] = buffer[llen+3+i];
  270.                   }
  271.                   topic[tl] = 0;
  272.                   // msgId only present for QOS>0
  273.                   if (buffer[0]&MQTTQOS1) {
  274.                     msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
  275.                     payload = buffer+llen+3+tl+2;
  276.                     callback(topic,payload,len-llen-3-tl-2);
  277.                     puback(msgId);
  278.                   } else {
  279.                     payload = buffer+llen+3+tl;
  280.                     callback(topic,payload,len-llen-3-tl);
  281.                   }
  282.                }
  283.             } else if (type == MQTTPINGREQ) {
  284.                buffer[0] = MQTTPINGRESP;
  285.                buffer[1] = 0;
  286.                _client->write(buffer,2);
  287.             } else if (type == MQTTPINGRESP) {
  288.                pingOutstanding = false;
  289.             }
  290.          }
  291.       }
  292.       return true;
  293.    }
  294.    return false;
  295. }
  296.  
  297.  
  298. // PUBLISH
  299.  
  300. bool PubSubClient::publish(char* topic, char* payload) {
  301.    return publish(topic,(uint8_t*)payload,strlen(payload),false);
  302. }
  303.  
  304. bool PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength) {
  305.    return publish(topic, payload, plength, false);
  306. }
  307.  
  308. bool PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained) {
  309.    if (connected()) {
  310.       // Leave room in the buffer for header and variable length field
  311.       uint16_t length = 5;
  312.       length = writeString(topic,buffer,length);
  313.       uint16_t i;
  314.       for (i=0;i<plength;i++) {
  315.          buffer[length++] = payload[i];
  316.       }
  317.       uint8_t header = MQTTPUBLISH;
  318.       if (retained) {
  319.          header |= 1;
  320.       }
  321.       return write(header,buffer,length-5);
  322.    }
  323.    return false;
  324. }
  325.  
  326. bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
  327.    uint8_t lenBuf[4];
  328.    uint8_t llen = 0;
  329.    uint8_t digit;
  330.    uint8_t pos = 0;
  331.    uint8_t rc;
  332.    uint8_t len = length;
  333.    do {
  334.       digit = len % 128;
  335.       len = len / 128;
  336.       if (len > 0) {
  337.          digit |= 0x80;
  338.       }
  339.       lenBuf[pos++] = digit;
  340.       llen++;
  341.    } while(len>0);
  342.  
  343.    buf[4-llen] = header;
  344.    for (int i=0;i<llen;i++) {
  345.       buf[5-llen+i] = lenBuf[i];
  346.    }
  347.    rc = _client->write(buf+(4-llen),length+1+llen);
  348.  
  349.    lastOutActivity = millis();
  350.    return (rc == 1+llen+length);
  351. }
  352.  
  353. bool PubSubClient::subscribe(char* topic) {
  354.   return subscribe(topic, 0);
  355. }
  356.  
  357. // SUBSCRIBE
  358.  
  359. bool PubSubClient::subscribe(char* topic, uint8_t qos) {
  360.    if (qos < 0 || qos > 1)
  361.      return false;
  362.  
  363.    if (connected()) {
  364.       // Leave room in the buffer for header and variable length field
  365.       uint16_t length = 5;
  366.       nextMsgId++;
  367.       if (nextMsgId == 0) {
  368.          nextMsgId = 1;
  369.       }
  370.       buffer[length++] = (nextMsgId >> 8);
  371.       buffer[length++] = (nextMsgId & 0xFF);
  372.       length = writeString(topic, buffer,length);
  373.       buffer[length++] = qos;
  374.       return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
  375.    }
  376.    return false;
  377. }
  378.  
  379. bool PubSubClient::puback(uint16_t msgId) {
  380.   if(connected()) {
  381.     // Leave room in the buffer for header and variable length field
  382.     uint16_t length = 5;
  383.     buffer[length++] = (msgId >> 8);
  384.     buffer[length++] = (msgId & 0xFF);
  385.     return write(MQTTPUBACK,buffer,length-5);
  386.   }
  387.   return false;
  388. }
  389.  
  390. // HELPERS
  391.  
  392. //bool PubSubClient::unsubscribe(char* topic) {
  393. bool PubSubClient::unsubscribe(char* topic) {
  394.    if (connected()) {
  395.       uint16_t length = 5;
  396.       nextMsgId++;
  397.       if (nextMsgId == 0) {
  398.          nextMsgId = 1;
  399.       }
  400.       buffer[length++] = (nextMsgId >> 8);
  401.       buffer[length++] = (nextMsgId & 0xFF);
  402.       length = writeString(topic, buffer,length);
  403.       return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
  404.    }
  405.    return false;
  406. }
  407.  
  408. void PubSubClient::disconnect() {
  409.    buffer[0] = MQTTDISCONNECT;
  410.    buffer[1] = 0;
  411.    _client->write(buffer,2);
  412.    _client->stop();
  413.    lastInActivity = lastOutActivity = millis();
  414. }
  415.  
  416. uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) {
  417.    char* idp = string;
  418.    uint16_t i = 0;
  419.    pos += 2;
  420.    while (*idp) {
  421.       buf[pos++] = *idp++;
  422.       i++;
  423.    }
  424.    buf[pos-i-2] = (i >> 8);
  425.    buf[pos-i-1] = (i & 0xFF);
  426.    return pos;
  427. }
  428.  
  429.  
  430. bool PubSubClient::connected() {
  431.    bool rc;
  432.    if (_client == NULL ) {
  433.       rc = false;
  434.    } else {
  435.       rc = (int)_client->connected();
  436.       if (!rc) _client->stop();
  437.    }
  438.    return rc;
  439. }
  440.  
  441.  
  442. // MAIN APPLICATION CODE STARTS HERE
  443.  
  444. // Update these with values suitable for your network.
  445. byte ip[] = { 192, 168, 1, 116 };
  446. int LED = D7; // for demo only
  447. int BUTTON = D2;
  448.  
  449. void callback(char* topic, byte* payload, unsigned int length) {
  450.   // handle message arrived - we are only subscribing to one topic so assume all are led related
  451.  
  452.     byte ledOn[] = {0x6F, 0x6E}; // hex for on
  453.     byte ledOff[] = {0x6F, 0x66, 0x66}; // hex for off
  454.     byte ledFlash[] ={0x66, 0x6C, 0x61, 0x73, 0x68}; // hex for flash
  455.  
  456.     if (!memcmp(ledOn, payload, sizeof(ledOn)))
  457.         digitalWrite(LED, HIGH);
  458.  
  459.     if (!memcmp(ledOff, payload, sizeof(ledOff)))
  460.         digitalWrite(LED, LOW);
  461.  
  462.     if (!memcmp(ledFlash, payload, sizeof(ledFlash))) {
  463.         for (int flashLoop=0;flashLoop < 3; flashLoop++) {
  464.             digitalWrite(LED, HIGH);
  465.             delay(250);
  466.             digitalWrite(LED, LOW);
  467.             delay(250);
  468.  
  469.         }
  470.     }
  471. }
  472.  
  473.  
  474. TCPClient tcpClient;
  475. PubSubClient client(ip, 1883, callback, tcpClient);
  476.  
  477.  
  478. // Simple MQTT demo to allow the blue led (D7) to be turned on or off. Send message to topic "led" with payload of "on" or "off"
  479.  
  480. int state = HIGH;       // current state of output pin
  481. int reading;            // current read from the button
  482. int previous = LOW;     // previous reading from the input pin
  483. long t = 0;             // the last time the output was toggled
  484. long debounce = 200;    // the debounce time, increase if the output flickers
  485.  
  486. void setup()
  487. {
  488.     pinMode(LED, OUTPUT); // Use for a simple test of the led on or off by subscribing to a topical called led
  489.     pinMode(BUTTON, INPUT);
  490.    
  491.     if (client.connect("Spark")) { // Anonymous authentication enabled
  492.     //if (client.connect("spark", "userid", "password")) { // uid:pwd based authentication
  493.         client.publish("Spark Status","I'm Alive...");
  494.         client.subscribe("led");
  495.     }
  496. }
  497.  
  498. void loop()
  499. {  
  500.     reading = digitalRead(BUTTON);  // read the button state
  501.    
  502.     // if the input just went from LOW and HIGH and we've waited long enough to ignore
  503.     // any noise on the circuit, toggle the output pin and remember the time
  504.     if (reading == HIGH && previous == LOW && millis() - t > debounce) {
  505.         if (state == LOW) {
  506.             client.publish("led","on");
  507.             state = HIGH;
  508.         } else {
  509.             client.publish("led","off");
  510.             state = LOW;
  511.         }
  512.         t = millis();
  513.     }
  514.     client.loop();
  515.     previous = reading;
  516. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement