Advertisement
rousveiga

Untitled

Jul 15th, 2021
518
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 26.51 KB | None | 0 0
  1. #include "AsyncMqttClient.hpp"
  2.  
  3. AsyncMqttClient::AsyncMqttClient()
  4. : _client()
  5. , _head(nullptr)
  6. , _tail(nullptr)
  7. , _sent(0)
  8. , _state(DISCONNECTED)
  9. , _disconnectReason(AsyncMqttClientDisconnectReason::TCP_DISCONNECTED)
  10. , _lastClientActivity(0)
  11. , _lastServerActivity(0)
  12. , _lastPingRequestTime(0)
  13. , _generatedClientId{0}
  14. , _ip()
  15. , _host(nullptr)
  16. , _useIp(false)
  17. #if ASYNC_TCP_SSL_ENABLED
  18. , _secure(false)
  19. #endif
  20. , _port(0)
  21. , _keepAlive(15)
  22. , _cleanSession(true)
  23. , _clientId(nullptr)
  24. , _username(nullptr)
  25. , _password(nullptr)
  26. , _willTopic(nullptr)
  27. , _willPayload(nullptr)
  28. , _willPayloadLength(0)
  29. , _willQos(0)
  30. , _willRetain(false)
  31. #if ASYNC_TCP_SSL_ENABLED
  32. , _secureServerFingerprints()
  33. #endif
  34. , _onConnectUserCallbacks()
  35. , _onDisconnectUserCallbacks()
  36. , _onSubscribeUserCallbacks()
  37. , _onUnsubscribeUserCallbacks()
  38. , _onMessageUserCallbacks()
  39. , _onPublishUserCallbacks()
  40. , _parsingInformation { .bufferState = AsyncMqttClientInternals::BufferState::NONE }
  41. , _currentParsedPacket(nullptr)
  42. , _remainingLengthBufferPosition(0)
  43. , _remainingLengthBuffer{0}
  44. , _pendingPubRels() {
  45.   _client.onConnect([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onConnect(); }, this);
  46.   _client.onDisconnect([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onDisconnect(); }, this);
  47.   // _client.onError([](void* obj, AsyncClient* c, int8_t error) { (static_cast<AsyncMqttClient*>(obj))->_onError(error); }, this);
  48.   // _client.onTimeout([](void* obj, AsyncClient* c, uint32_t time) { (static_cast<AsyncMqttClient*>(obj))->_onTimeout(); }, this);
  49.   _client.onAck([](void* obj, AsyncClient* c, size_t len, uint32_t time) { (static_cast<AsyncMqttClient*>(obj))->_onAck(len); }, this);
  50.   _client.onData([](void* obj, AsyncClient* c, void* data, size_t len) { (static_cast<AsyncMqttClient*>(obj))->_onData(static_cast<char*>(data), len); }, this);
  51.   _client.onPoll([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onPoll(); }, this);
  52.   _client.setNoDelay(true);  // send small packets immediately (PINGREQ/DISCONN are only 2 bytes)
  53. #ifdef ESP32
  54.   sprintf(_generatedClientId, "esp32-%06llx", ESP.getEfuseMac());
  55.   _xSemaphore = xSemaphoreCreateMutex();
  56. #elif defined(ESP8266)
  57.   sprintf(_generatedClientId, "esp8266-%06x", ESP.getChipId());
  58. #endif
  59.   _clientId = _generatedClientId;
  60.  
  61.   setMaxTopicLength(128);
  62. }
  63.  
  64. AsyncMqttClient::~AsyncMqttClient() {
  65.   delete _currentParsedPacket;
  66.   delete[] _parsingInformation.topicBuffer;
  67.   _clear();
  68.   _pendingPubRels.clear();
  69.   _pendingPubRels.shrink_to_fit();
  70.   _clearQueue(false);  // _clear() doesn't clear session data
  71. #ifdef ESP32
  72.   vSemaphoreDelete(_xSemaphore);
  73. #endif
  74. }
  75.  
  76. AsyncMqttClient& AsyncMqttClient::setKeepAlive(uint16_t keepAlive) {
  77.   _keepAlive = keepAlive;
  78.   return *this;
  79. }
  80.  
  81. AsyncMqttClient& AsyncMqttClient::setClientId(const char* clientId) {
  82.   _clientId = clientId;
  83.   return *this;
  84. }
  85.  
  86. AsyncMqttClient& AsyncMqttClient::setCleanSession(bool cleanSession) {
  87.   _cleanSession = cleanSession;
  88.   return *this;
  89. }
  90.  
  91. AsyncMqttClient& AsyncMqttClient::setMaxTopicLength(uint16_t maxTopicLength) {
  92.   _parsingInformation.maxTopicLength = maxTopicLength;
  93.   delete[] _parsingInformation.topicBuffer;
  94.   _parsingInformation.topicBuffer = new char[maxTopicLength + 1];
  95.   return *this;
  96. }
  97.  
  98. AsyncMqttClient& AsyncMqttClient::setCredentials(const char* username, const char* password) {
  99.   _username = username;
  100.   _password = password;
  101.   return *this;
  102. }
  103.  
  104. AsyncMqttClient& AsyncMqttClient::setWill(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length) {
  105.   _willTopic = topic;
  106.   _willQos = qos;
  107.   _willRetain = retain;
  108.   _willPayload = payload;
  109.   _willPayloadLength = length;
  110.   return *this;
  111. }
  112.  
  113. AsyncMqttClient& AsyncMqttClient::setServer(IPAddress ip, uint16_t port) {
  114.   _useIp = true;
  115.   _ip = ip;
  116.   _port = port;
  117.   return *this;
  118. }
  119.  
  120. AsyncMqttClient& AsyncMqttClient::setServer(const char* host, uint16_t port) {
  121.   _useIp = false;
  122.   _host = host;
  123.   _port = port;
  124.   return *this;
  125. }
  126.  
  127. #if ASYNC_TCP_SSL_ENABLED
  128. AsyncMqttClient& AsyncMqttClient::setSecure(bool secure) {
  129.   _secure = secure;
  130.   return *this;
  131. }
  132.  
  133. AsyncMqttClient& AsyncMqttClient::addServerFingerprint(const uint8_t* fingerprint) {
  134.   std::array<uint8_t, SHA1_SIZE> newFingerprint;
  135.   memcpy(newFingerprint.data(), fingerprint, SHA1_SIZE);
  136.   _secureServerFingerprints.push_back(newFingerprint);
  137.   return *this;
  138. }
  139. #endif
  140.  
  141. AsyncMqttClient& AsyncMqttClient::onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback) {
  142.   _onConnectUserCallbacks.push_back(callback);
  143.   return *this;
  144. }
  145.  
  146. AsyncMqttClient& AsyncMqttClient::onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback) {
  147.   _onDisconnectUserCallbacks.push_back(callback);
  148.   return *this;
  149. }
  150.  
  151. AsyncMqttClient& AsyncMqttClient::onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback) {
  152.   _onSubscribeUserCallbacks.push_back(callback);
  153.   return *this;
  154. }
  155.  
  156. AsyncMqttClient& AsyncMqttClient::onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback) {
  157.   _onUnsubscribeUserCallbacks.push_back(callback);
  158.   return *this;
  159. }
  160.  
  161. AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback) {
  162.   _onMessageUserCallbacks.push_back(callback);
  163.   return *this;
  164. }
  165.  
  166. AsyncMqttClient& AsyncMqttClient::onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback) {
  167.   _onPublishUserCallbacks.push_back(callback);
  168.   return *this;
  169. }
  170.  
  171. void AsyncMqttClient::_freeCurrentParsedPacket() {
  172.   delete _currentParsedPacket;
  173.   _currentParsedPacket = nullptr;
  174. }
  175.  
  176. void AsyncMqttClient::_clear() {
  177.   _lastPingRequestTime = 0;
  178.   _freeCurrentParsedPacket();
  179.   _clearQueue(true);  // keep session data for now
  180.  
  181.   _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE;
  182.  
  183.   _client.setRxTimeout(0);
  184. }
  185.  
  186. /* TCP */
  187. void AsyncMqttClient::_onConnect() {
  188.   log_i("TCP conn, MQTT CONNECT");
  189. #if ASYNC_TCP_SSL_ENABLED
  190.   if (_secure && _secureServerFingerprints.size() > 0) {
  191.     SSL* clientSsl = _client.getSSL();
  192.  
  193.     bool sslFoundFingerprint = false;
  194.     for (std::array<uint8_t, SHA1_SIZE> fingerprint : _secureServerFingerprints) {
  195.       if (ssl_match_fingerprint(clientSsl, fingerprint.data()) == SSL_OK) {
  196.         sslFoundFingerprint = true;
  197.         break;
  198.       }
  199.     }
  200.  
  201.     if (!sslFoundFingerprint) {
  202.       _disconnectReason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT;
  203.       _client.close(true);
  204.       return;
  205.     }
  206.   }
  207. #endif
  208.   AsyncMqttClientInternals::OutPacket* msg =
  209.   new AsyncMqttClientInternals::ConnectOutPacket(_cleanSession,
  210.                                                  _username,
  211.                                                  _password,
  212.                                                  _willTopic,
  213.                                                  _willRetain,
  214.                                                  _willQos,
  215.                                                  _willPayload,
  216.                                                  _willPayloadLength,
  217.                                                  _keepAlive,
  218.                                                  _clientId);
  219.   _addFront(msg);
  220.   _handleQueue();
  221. }
  222.  
  223. void AsyncMqttClient::_onDisconnect() {
  224.   log_i("TCP disconn");
  225.   _state = DISCONNECTED;
  226.  
  227.   _clear();
  228.  
  229.   for (auto callback : _onDisconnectUserCallbacks) callback(_disconnectReason);
  230. }
  231.  
  232. /*
  233. void AsyncMqttClient::_onError(int8_t error) {
  234.   (void)error;
  235.   // _onDisconnect called anyway
  236. }
  237.  
  238. void AsyncMqttClient::_onTimeout() {
  239.   // disconnection will be handled by ping/pong management
  240. }
  241. */
  242.  
  243. void AsyncMqttClient::_onAck(size_t len) {
  244.   log_i("ack %u", len);
  245.   _handleQueue();
  246. }
  247.  
  248. void AsyncMqttClient::_onData(char* data, size_t len) {
  249.   log_i("data rcv (%u)", len);
  250.   size_t currentBytePosition = 0;
  251.   char currentByte;
  252.   _lastServerActivity = millis();
  253.   do {
  254.     switch (_parsingInformation.bufferState) {
  255.       case AsyncMqttClientInternals::BufferState::NONE:
  256.         currentByte = data[currentBytePosition++];
  257.         _parsingInformation.packetType = currentByte >> 4;
  258.         _parsingInformation.packetFlags = (currentByte << 4) >> 4;
  259.         _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::REMAINING_LENGTH;
  260.         switch (_parsingInformation.packetType) {
  261.           case AsyncMqttClientInternals::PacketType.CONNACK:
  262.             log_i("rcv CONNACK");
  263.             _currentParsedPacket = new AsyncMqttClientInternals::ConnAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onConnAck, this, std::placeholders::_1, std::placeholders::_2));
  264.             _client.setRxTimeout(0);
  265.             break;
  266.           case AsyncMqttClientInternals::PacketType.PINGRESP:
  267.             log_i("rcv PINGRESP");
  268.             _currentParsedPacket = new AsyncMqttClientInternals::PingRespPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPingResp, this));
  269.             break;
  270.           case AsyncMqttClientInternals::PacketType.SUBACK:
  271.             log_i("rcv SUBACK");
  272.             _currentParsedPacket = new AsyncMqttClientInternals::SubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onSubAck, this, std::placeholders::_1, std::placeholders::_2));
  273.             break;
  274.           case AsyncMqttClientInternals::PacketType.UNSUBACK:
  275.             log_i("rcv UNSUBACK");
  276.             _currentParsedPacket = new AsyncMqttClientInternals::UnsubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onUnsubAck, this, std::placeholders::_1));
  277.             break;
  278.           case AsyncMqttClientInternals::PacketType.PUBLISH:
  279.             log_i("rcv PUBLISH");
  280.             _currentParsedPacket = new AsyncMqttClientInternals::PublishPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8, std::placeholders::_9), std::bind(&AsyncMqttClient::_onPublish, this, std::placeholders::_1, std::placeholders::_2));
  281.             break;
  282.           case AsyncMqttClientInternals::PacketType.PUBREL:
  283.             log_i("rcv PUBREL");
  284.             _currentParsedPacket = new AsyncMqttClientInternals::PubRelPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRel, this, std::placeholders::_1));
  285.             break;
  286.           case AsyncMqttClientInternals::PacketType.PUBACK:
  287.             log_i("rcv PUBACK");
  288.             _currentParsedPacket = new AsyncMqttClientInternals::PubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubAck, this, std::placeholders::_1));
  289.             break;
  290.           case AsyncMqttClientInternals::PacketType.PUBREC:
  291.             log_i("rcv PUBREC");
  292.             _currentParsedPacket = new AsyncMqttClientInternals::PubRecPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRec, this, std::placeholders::_1));
  293.             break;
  294.           case AsyncMqttClientInternals::PacketType.PUBCOMP:
  295.             log_i("rcv PUBCOMP");
  296.             _currentParsedPacket = new AsyncMqttClientInternals::PubCompPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubComp, this, std::placeholders::_1));
  297.             break;
  298.           default:
  299.             log_i("rcv PROTOCOL VIOLATION");
  300.             disconnect(true);
  301.             break;
  302.         }
  303.         break;
  304.       case AsyncMqttClientInternals::BufferState::REMAINING_LENGTH:
  305.         currentByte = data[currentBytePosition++];
  306.         _remainingLengthBuffer[_remainingLengthBufferPosition++] = currentByte;
  307.         if (currentByte >> 7 == 0) {
  308.           _parsingInformation.remainingLength = AsyncMqttClientInternals::Helpers::decodeRemainingLength(_remainingLengthBuffer);
  309.           _remainingLengthBufferPosition = 0;
  310.           if (_parsingInformation.remainingLength > 0) {
  311.             _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::VARIABLE_HEADER;
  312.           } else {
  313.             // PINGRESP is a special case where it has no variable header, so the packet ends right here
  314.             _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE;
  315.             _onPingResp();
  316.           }
  317.         }
  318.         break;
  319.       case AsyncMqttClientInternals::BufferState::VARIABLE_HEADER:
  320.         _currentParsedPacket->parseVariableHeader(data, len, &currentBytePosition);
  321.         break;
  322.       case AsyncMqttClientInternals::BufferState::PAYLOAD:
  323.         _currentParsedPacket->parsePayload(data, len, &currentBytePosition);
  324.         break;
  325.       default:
  326.         currentBytePosition = len;
  327.     }
  328.   } while (currentBytePosition != len);
  329. }
  330.  
  331. void AsyncMqttClient::_onPoll() {
  332.   // if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections
  333.   if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) {
  334.     log_w("PING t/o, disconnecting");
  335.     disconnect(true);
  336.     return;
  337.   }
  338.   // send ping to ensure the server will receive at least one message inside keepalive window
  339.   if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * 1000 * 0.7)) {
  340.     _sendPing();
  341.   // send ping to verify if the server is still there (ensure this is not a half connection)
  342.   } else if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * 1000 * 0.7)) {
  343.     _sendPing();
  344.   }
  345.   _handleQueue();
  346. }
  347.  
  348. /* QUEUE */
  349.  
  350. void AsyncMqttClient::_insert(AsyncMqttClientInternals::OutPacket* packet) {
  351.   // We only use this for QoS2 PUBREL so there must be a PUBLISH packet present.
  352.   // The queue therefore cannot be empty and _head points to this PUBLISH packet.
  353.   SEMAPHORE_TAKE();
  354.   log_i("new insert #%u", packet->packetType());
  355.   packet->next = _head->next;
  356.   _head->next = packet;
  357.   if (_head == _tail) {  // PUB packet is the only one in the queue
  358.     _tail = packet;
  359.   }
  360.   SEMAPHORE_GIVE();
  361.   _handleQueue();
  362. }
  363.  
  364. void AsyncMqttClient::_addFront(AsyncMqttClientInternals::OutPacket* packet) {
  365.   // This is only used for the CONNECT packet, to be able to establish a connection
  366.   // before anything else. The queue can be empty or has packets from the continued session.
  367.   // In both cases, _head should always point to the CONNECT packet afterwards.
  368.   SEMAPHORE_TAKE();
  369.   log_i("new front #%u", packet->packetType());
  370.   if (_head == nullptr) {
  371.     _tail = packet;
  372.   } else {
  373.     packet->next = _head;
  374.   }
  375.   _head = packet;
  376.   _client.setRxTimeout(10);
  377.   SEMAPHORE_GIVE();
  378.   _handleQueue();
  379. }
  380.  
  381. void AsyncMqttClient::_addBack(AsyncMqttClientInternals::OutPacket* packet) {
  382.   SEMAPHORE_TAKE();
  383.   log_i("new back #%u", packet->packetType());
  384.   if (!_tail) {
  385.     _head = packet;
  386.   } else {
  387.     _tail->next = packet;
  388.   }
  389.   _tail = packet;
  390.   _tail->next = nullptr;
  391.   SEMAPHORE_GIVE();
  392.   _handleQueue();
  393. }
  394.  
  395. void AsyncMqttClient::_handleQueue() {
  396.   SEMAPHORE_TAKE();
  397.   // On ESP32, onDisconnect is called within the close()-call. So we need to make sure we don't lock
  398.   bool disconnect = false;
  399.  
  400.   while (_head && _client.space() > 10) {  // safe but arbitrary value, send at least 10 bytes
  401.     // 1. try to send
  402.     if (_head->size() > _sent) {
  403.       // On SSL the TCP library returns the total amount of bytes, not just the unencrypted payload length.
  404.       // So we calculate the amount to be written ourselves.
  405.       size_t willSend = std::min(_head->size() - _sent, _client.space());
  406.       size_t realSent = _client.add(reinterpret_cast<const char*>(_head->data(_sent)), willSend, ASYNC_WRITE_FLAG_COPY);  // flag is set by LWIP anyway, added for clarity
  407.       _sent += willSend;
  408.       (void)realSent;
  409.       _client.send();
  410.       _lastClientActivity = millis();
  411.       _lastPingRequestTime = 0;
  412.       #if ASYNC_TCP_SSL_ENABLED
  413.       log_i("snd #%u: (tls: %u) %u/%u", _head->packetType(), realSent, _sent, _head->size());
  414.       #else
  415.       log_i("snd #%u: %u/%u", _head->packetType(), _sent, _head->size());
  416.       #endif
  417.       if (_head->packetType() == AsyncMqttClientInternals::PacketType.DISCONNECT) {
  418.         disconnect = true;
  419.       }
  420.     }
  421.  
  422.     // 2. stop processing when we have to wait for an MQTT acknowledgment
  423.     if (_head->size() == _sent) {
  424.       if (_head->released()) {
  425.         log_i("p #%d rel", _head->packetType());
  426.         AsyncMqttClientInternals::OutPacket* tmp = _head;
  427.         _head = _head->next;
  428.         if (!_head) _tail = nullptr;
  429.         delete tmp;
  430.         _sent = 0;
  431.       } else {
  432.         break;  // sending is complete however send next only after mqtt confirmation
  433.       }
  434.     }
  435.   }
  436.  
  437.   SEMAPHORE_GIVE();
  438.   if (disconnect) {
  439.     log_i("snd DISCONN, disconnecting");
  440.     _client.close();
  441.   }
  442. }
  443.  
  444. void AsyncMqttClient::_clearQueue(bool keepSessionData) {
  445.   SEMAPHORE_TAKE();
  446.   AsyncMqttClientInternals::OutPacket* packet = _head;
  447.   _head = nullptr;
  448.   _tail = nullptr;
  449.  
  450.   while (packet) {
  451.     /* MQTT spec 3.1.2.4 Clean Session:
  452.      *  - QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged.
  453.      *  - QoS 2 messages which have been received from the Server, but have not been completely acknowledged.
  454.      * + (unsent PUB messages with QoS > 0)
  455.      *
  456.      * To be kept:
  457.      * - possibly first message (sent to server but not acked)
  458.      * - PUBREC messages (QoS 2 PUB received but not acked)
  459.      * - PUBCOMP messages (QoS 2 PUBREL received but not acked)
  460.      */
  461.     if (keepSessionData) {
  462.       if (packet->qos() > 0 && packet->size() <= _sent) {  // check for qos includes check for PUB-packet type
  463.         reinterpret_cast<AsyncMqttClientInternals::PublishOutPacket*>(packet)->setDup();
  464.         AsyncMqttClientInternals::OutPacket* next = packet->next;
  465.         log_i("keep #%u", packet->packetType());
  466.         SEMAPHORE_GIVE();
  467.         _addBack(packet);
  468.         SEMAPHORE_TAKE();
  469.         packet = next;
  470.       } else if (packet->qos() > 0 ||
  471.                  packet->packetType() == AsyncMqttClientInternals::PacketType.PUBREC ||
  472.                  packet->packetType() == AsyncMqttClientInternals::PacketType.PUBCOMP) {
  473.         AsyncMqttClientInternals::OutPacket* next = packet->next;
  474.         log_i("keep #%u", packet->packetType());
  475.         SEMAPHORE_GIVE();
  476.         _addBack(packet);
  477.         SEMAPHORE_TAKE();
  478.         packet = next;
  479.       } else {
  480.         AsyncMqttClientInternals::OutPacket* next = packet->next;
  481.         delete packet;
  482.         packet = next;
  483.       }
  484.     /* Delete everything when not keeping session data
  485.      */
  486.     } else {
  487.       AsyncMqttClientInternals::OutPacket* next = packet->next;
  488.       delete packet;
  489.       packet = next;
  490.     }
  491.   }
  492.   _sent = 0;
  493.   SEMAPHORE_GIVE();
  494. }
  495.  
  496. /* MQTT */
  497. void AsyncMqttClient::_onPingResp() {
  498.   log_i("PINGRESP");
  499.   _freeCurrentParsedPacket();
  500.   _lastPingRequestTime = 0;
  501. }
  502.  
  503. void AsyncMqttClient::_onConnAck(bool sessionPresent, uint8_t connectReturnCode) {
  504.   log_i("CONNACK");
  505.   _freeCurrentParsedPacket();
  506.  
  507.   if (!sessionPresent) {
  508.     _pendingPubRels.clear();
  509.     _pendingPubRels.shrink_to_fit();
  510.     _clearQueue(false);  // remove session data
  511.   }
  512.  
  513.   if (connectReturnCode == 0) {
  514.     _state = CONNECTED;
  515.     for (auto callback : _onConnectUserCallbacks) callback(sessionPresent);
  516.   } else {
  517.     // Callbacks are handled by the onDisconnect function which is called from the AsyncTcp lib
  518.     _disconnectReason = static_cast<AsyncMqttClientDisconnectReason>(connectReturnCode);
  519.     return;
  520.   }
  521.   _handleQueue();  // send any remaining data from continued session
  522. }
  523.  
  524. void AsyncMqttClient::_onSubAck(uint16_t packetId, char status) {
  525.   log_i("SUBACK");
  526.   _freeCurrentParsedPacket();
  527.   SEMAPHORE_TAKE();
  528.   if (_head && _head->packetId() == packetId) {
  529.     _head->release();
  530.     log_i("SUB released");
  531.   }
  532.   SEMAPHORE_GIVE();
  533.  
  534.   for (auto callback : _onSubscribeUserCallbacks) callback(packetId, status);
  535.  
  536.   _handleQueue();  // subscribe confirmed, ready to send next queued item
  537. }
  538.  
  539. void AsyncMqttClient::_onUnsubAck(uint16_t packetId) {
  540.   log_i("UNSUBACK");
  541.   _freeCurrentParsedPacket();
  542.   SEMAPHORE_TAKE();
  543.   if (_head && _head->packetId() == packetId) {
  544.     _head->release();
  545.     log_i("UNSUB released");
  546.   }
  547.   SEMAPHORE_GIVE();
  548.  
  549.   for (auto callback : _onUnsubscribeUserCallbacks) callback(packetId);
  550.  
  551.   _handleQueue();  // unsubscribe confirmed, ready to send next queued item
  552. }
  553.  
  554. void AsyncMqttClient::_onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId) {
  555.   bool notifyPublish = true;
  556.  
  557.   if (qos == 2) {
  558.     for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) {
  559.       if (pendingPubRel.packetId == packetId) {
  560.         notifyPublish = false;
  561.         break;
  562.       }
  563.     }
  564.   }
  565.  
  566.   if (notifyPublish) {
  567.     AsyncMqttClientMessageProperties properties;
  568.     properties.qos = qos;
  569.     properties.dup = dup;
  570.     properties.retain = retain;
  571.  
  572.     for (auto callback : _onMessageUserCallbacks) callback(topic, payload, properties, len, index, total);
  573.   }
  574. }
  575.  
  576. void AsyncMqttClient::_onPublish(uint16_t packetId, uint8_t qos) {
  577.   AsyncMqttClientInternals::PendingAck pendingAck;
  578.  
  579.   if (qos == 1) {
  580.     pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBACK;
  581.     pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED;
  582.     pendingAck.packetId = packetId;
  583.     AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
  584.     _addBack(msg);
  585.   } else if (qos == 2) {
  586.     pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREC;
  587.     pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED;
  588.     pendingAck.packetId = packetId;
  589.     AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
  590.     _addBack(msg);
  591.  
  592.     bool pubRelAwaiting = false;
  593.     for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) {
  594.       if (pendingPubRel.packetId == packetId) {
  595.         pubRelAwaiting = true;
  596.         break;
  597.       }
  598.     }
  599.  
  600.     if (!pubRelAwaiting) {
  601.       AsyncMqttClientInternals::PendingPubRel pendingPubRel;
  602.       pendingPubRel.packetId = packetId;
  603.       _pendingPubRels.push_back(pendingPubRel);
  604.     }
  605.   }
  606.  
  607.   _freeCurrentParsedPacket();
  608. }
  609.  
  610. void AsyncMqttClient::_onPubRel(uint16_t packetId) {
  611.   _freeCurrentParsedPacket();
  612.  
  613.   AsyncMqttClientInternals::PendingAck pendingAck;
  614.   pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBCOMP;
  615.   pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED;
  616.   pendingAck.packetId = packetId;
  617.   if (_head && _head->packetId() == packetId) {
  618.     AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
  619.     _head->release();
  620.     _insert(msg);
  621.     log_i("PUBREC released");
  622.   }
  623.  
  624.   for (size_t i = 0; i < _pendingPubRels.size(); i++) {
  625.     if (_pendingPubRels[i].packetId == packetId) {
  626.       _pendingPubRels.erase(_pendingPubRels.begin() + i);
  627.       _pendingPubRels.shrink_to_fit();
  628.     }
  629.   }
  630. }
  631.  
  632. void AsyncMqttClient::_onPubAck(uint16_t packetId) {
  633.   _freeCurrentParsedPacket();
  634.   if (_head && _head->packetId() == packetId) {
  635.     _head->release();
  636.     log_i("PUB released");
  637.   }
  638.  
  639.   for (auto callback : _onPublishUserCallbacks) callback(packetId);
  640. }
  641.  
  642. void AsyncMqttClient::_onPubRec(uint16_t packetId) {
  643.   _freeCurrentParsedPacket();
  644.  
  645.   // We will only be sending 1 QoS>0 PUB message at a time (to honor message
  646.   // ordering). So no need to store ACKS in a separate container as it will
  647.   // be stored in the outgoing queue until a PUBCOMP comes in.
  648.   AsyncMqttClientInternals::PendingAck pendingAck;
  649.   pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREL;
  650.   pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED;
  651.   pendingAck.packetId = packetId;
  652.   log_i("snd PUBREL");
  653.  
  654.   AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
  655.   if (_head && _head->packetId() == packetId) {
  656.     _head->release();
  657.     log_i("PUB released");
  658.   }
  659.   _insert(msg);
  660. }
  661.  
  662. void AsyncMqttClient::_onPubComp(uint16_t packetId) {
  663.   _freeCurrentParsedPacket();
  664.  
  665.   // _head points to the PUBREL package
  666.   if (_head && _head->packetId() == packetId) {
  667.     _head->release();
  668.     log_i("PUBREL released");
  669.   }
  670.  
  671.   for (auto callback : _onPublishUserCallbacks) callback(packetId);
  672. }
  673.  
  674. void AsyncMqttClient::_sendPing() {
  675.   log_i("PING");
  676.   _lastPingRequestTime = millis();
  677.   AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PingReqOutPacket;
  678.   _addBack(msg);
  679. }
  680.  
  681. bool AsyncMqttClient::connected() const {
  682.   return _state == CONNECTED;
  683. }
  684.  
  685. void AsyncMqttClient::connect() {
  686.   if (_state != DISCONNECTED) return;
  687.   log_i("CONNECTING");
  688.   _state = CONNECTING;
  689.   _disconnectReason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED;  // reset any previous
  690.  
  691. #if ASYNC_TCP_SSL_ENABLED
  692.   if (_useIp) {
  693.     _client.connect(_ip, _port, _secure);
  694.   } else {
  695.     _client.connect(_host, _port, _secure);
  696.   }
  697. #else
  698.   if (_useIp) {
  699.     _client.connect(_ip, _port);
  700.   } else {
  701.     _client.connect(_host, _port);
  702.   }
  703. #endif
  704. }
  705.  
  706. void AsyncMqttClient::disconnect(bool force) {
  707.   if (_state == DISCONNECTED) return;
  708.   log_i("DISCONNECT (f:%d)", force);
  709.   if (force) {
  710.     _state = DISCONNECTED;
  711.     _client.close(true);
  712.   } else if (_state != DISCONNECTING) {
  713.     _state = DISCONNECTING;
  714.     AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::DisconnOutPacket;
  715.     _addBack(msg);
  716.   }
  717. }
  718.  
  719. uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) {
  720.   if (_state != CONNECTED) return 0;
  721.   log_i("SUBSCRIBE");
  722.  
  723.   AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::SubscribeOutPacket(topic, qos);
  724.   _addBack(msg);
  725.   return msg->packetId();
  726. }
  727.  
  728. uint16_t AsyncMqttClient::unsubscribe(const char* topic) {
  729.   if (_state != CONNECTED) return 0;
  730.   log_i("UNSUBSCRIBE");
  731.  
  732.   AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::UnsubscribeOutPacket(topic);
  733.   _addBack(msg);
  734.   return msg->packetId();
  735. }
  736.  
  737. uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length, bool dup, uint16_t message_id) {
  738.   if (_state != CONNECTED || GET_FREE_MEMORY() < MQTT_MIN_FREE_MEMORY) return 0;
  739.   log_i("PUBLISH");
  740.  
  741.   AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PublishOutPacket(topic, qos, retain, payload, length);
  742.   _addBack(msg);
  743.   return msg->packetId();
  744. }
  745.  
  746. const char* AsyncMqttClient::getClientId() const {
  747.   return _clientId;
  748. }
  749.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement