Advertisement
vitareinforce

MQTTWrapper

May 5th, 2025
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.33 KB | None | 0 0
  1. import kotlinx.coroutines.*
  2. import org.eclipse.paho.client.mqttv3.*
  3.  
  4. class MqttWrapper(
  5. private val brokerUrl: String,
  6. private val clientId: String,
  7. private val callbackScope: CoroutineScope = CoroutineScope(Dispatchers.Main)
  8. ) {
  9.  
  10. private var mqttClient: MqttClient? = null
  11.  
  12. // Variable to hold the latest received MQTT message payload
  13. var latestMessage: String? = null
  14. private set
  15.  
  16. // Callback to notify when a new message arrives
  17. var onMessageReceived: ((topic: String, message: String) -> Unit)? = null
  18.  
  19. fun connect() {
  20. try {
  21. mqttClient = MqttClient(brokerUrl, clientId, null)
  22. val options = MqttConnectOptions()
  23. options.isCleanSession = true
  24.  
  25. mqttClient?.setCallback(object : MqttCallback {
  26. override fun connectionLost(cause: Throwable?) {
  27. // Handle loss of connection if needed
  28. }
  29.  
  30. override fun messageArrived(topic: String?, message: MqttMessage?) {
  31. val msgStr = message?.toString() ?: ""
  32. latestMessage = msgStr
  33. // Invoke the callback on the main thread
  34. callbackScope.launch {
  35. onMessageReceived?.invoke(topic ?: "", msgStr)
  36. }
  37. }
  38.  
  39. override fun deliveryComplete(token: IMqttDeliveryToken?) {
  40. // Optionally handle delivery complete
  41. }
  42. })
  43.  
  44. mqttClient?.connect(options)
  45.  
  46. } catch (e: MqttException) {
  47. e.printStackTrace()
  48. }
  49. }
  50.  
  51. fun subscribe(topic: String, qos: Int = 1) {
  52. try {
  53. mqttClient?.subscribe(topic, qos)
  54. } catch (e: MqttException) {
  55. e.printStackTrace()
  56. }
  57. }
  58.  
  59. fun publish(topic: String, message: String, qos: Int = 1) {
  60. try {
  61. val mqttMessage = MqttMessage(message.toByteArray())
  62. mqttMessage.qos = qos
  63. mqttClient?.publish(topic, mqttMessage)
  64. } catch (e: MqttException) {
  65. e.printStackTrace()
  66. }
  67. }
  68.  
  69. fun disconnect() {
  70. try {
  71. mqttClient?.disconnect()
  72. } catch (e: MqttException) {
  73. e.printStackTrace()
  74. }
  75. }
  76. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement