Advertisement
Guest User

Untitled

a guest
Jun 8th, 2018
100
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  (ns demeter.mqtt-bus
    (:require [clojure.core.async
               :as a
               :refer [>! <! >!! <!! go chan buffer close! thread
                       alts! alts!! timeout poll!]]
              [clojure.string]
              [clojurewerkz.machine-head.client :as mqtt]
              [com.stuartsierra.component :as component]
              [schema.core :as s])
    (:import (clojure.lang ExceptionInfo)))
  10.  
  11. ;; PRIVATE
  12.  
  (defn- safe-connect
    "Connects to the MQTT broker at host:port without throwing on failure.

    The broker URI is built by joining `host` and `port` with a colon.
    NOTE(review): presumably `host` already carries the URI scheme the
    client expects (e.g. tcp://) -- confirm against callers.

    Returns the connection on success, or nil on failure after printing a
    short diagnostic that includes the URI that was tried."
    [host port]
    (let [uri (str host ":" port)]
      (try
        (mqtt/connect uri)
        (catch Exception e
          ;; Best effort by design: report and hand back nil so the caller
          ;; can retry later instead of crashing.
          (println "mqtt connection failed")
          (println "url was " uri)
          nil))))
  25.  
  (defn- publish
    "Publishes `message` on `topic` over the connection `conn` by delegating
    directly to mqtt/publish; returns whatever that call returns."
    [conn topic message]
    (-> conn
        (mqtt/publish topic message)))
  28.  
  (def message-schema
    "Schema a bus message must satisfy before it is sent: a string
    destination, any sender, one of the four known message types and an
    arbitrary payload."
    {:to      s/Str
     :from    s/Any
     :type    (s/enum :info :request :command :auth)
     :payload s/Any})
  33.  
  (defn- validate-message
    "Ensures that `message` has all the fields required by message-schema.

    Delegates to s/validate, so the result (and any exception raised for a
    non-conforming message) is whatever s/validate produces."
    [message]
    (s/validate message-schema message))
  37.  
  (defn- serialize
    "Renders `message` as a string that the reader can turn back into an
    equal value.

    Uses pr-str rather than str so strings and other non-map values are
    printed in readable form, and unbinds *print-length* / *print-level* so a
    REPL or caller setting can never silently truncate the wire payload."
    [message]
    (binding [*print-readably* true
              *print-length*   nil
              *print-level*    nil]
      (pr-str message)))
  40.  
  (defn- unserialize
    "Parses the string `message` back into a Clojure value.

    Read-time evaluation is disabled, so a payload containing a #= form
    makes the reader throw instead of executing code.

    SECURITY NOTE(review): the input appears to come from the network.
    clojure.core/read-string is not intended for untrusted data even with
    *read-eval* off; consider switching to clojure.edn/read-string."
    [message]
    (binding [*read-eval* false]
      (read-string message)))
  43.  
  (defn- make-topic
    "Builds the topic string for a recipient and a message type: the
    recipient, a slash, then the name of `type`."
    [to type]
    (let [suffix (name type)]
      (str to "/" suffix)))
  46.  
  (defn- process-incoming
    "Takes an incoming MQTT message and returns a map of its properties.

    `topic` is the topic string and `payload` the raw message bytes; the
    second argument is ignored. The payload is decoded as UTF-8 (not the
    platform default charset) and parsed with unserialize.

    Returns a map with:
      :from     the :from field of the parsed message
      :type     the second slash-separated segment of `topic`, as a string
                (nil when the topic has no second segment)
      :message  the :payload field of the parsed message"
    [topic _ ^bytes payload]
    (println "processing...")
    (let [message (unserialize (String. payload "UTF-8"))
          ;; Named `kind` so the local does not shadow clojure.core/type.
          kind    (second (clojure.string/split topic #"/"))]
      {:from    (:from message)
       :type    kind
       :message (:payload message)}))
  56.  
  57.  
  58.  
  59.  
  (def subscribe-schema
    "Schema for one subscription handler spec: the message type to listen
    for, the QoS level (0, 1 or 2) and the handler itself."
    {:type    (s/enum :info :request :command :auth)
     :qos     (s/enum 0 1 2)
     :handler s/Any})
  64.  
  (defn- subscribe
    "Subscribes `connection` to one topic per entry in `handlers`.

    Each entry is validated against subscribe-schema, then its topic is
    built from `id` and the entry's :type and subscribed at the entry's
    :qos. When a message arrives, the entry's :handler is called once with
    the third callback argument (the payload); an exception thrown by the
    handler is printed and not propagated.

    Does nothing when `connection` is nil, so callers may pass the result
    of a failed connection attempt. Returns nil."
    [connection id handlers]
    (when connection
      (doseq [spec handlers]
        (s/validate subscribe-schema spec)
        (let [topic    (make-topic id (:type spec))
              callback (:handler spec)]
          (println (str ";; Subscribing to " topic))
          (mqtt/subscribe connection
                          {topic (:qos spec)}
                          ;; A plain fn: the handler's return value must not
                          ;; be invoked, and the handler runs exactly once
                          ;; per message.
                          (fn [_ _ payload]
                            (try
                              (callback payload)
                              (catch Exception e
                                (println e)))))))))
  80.  
  81.  
  ;; MQTT bus component.
  ;;
  ;; Fields: broker `host` and `port`, this node's `id` (used as the topic
  ;; prefix when subscribing), `connection` (an atom holding the current
  ;; connection, or nil inside the atom while disconnected), `monitor` (the
  ;; channel used to stop the reconnect thread) and `handlers` (the handler
  ;; specs passed to subscribe).
  (defrecord Bus [host port id connection monitor handlers]
    component/Lifecycle

    ;; Connects (reusing an existing :connection atom when present),
    ;; subscribes the handlers, and starts a background thread that polls the
    ;; connection every 100 ms and reconnects + resubscribes when it is down.
    ;; Returns the component with :connection and :monitor set.
    (start [component]
      (println ";; Starting MQTT Bus")
      (let [;; Buffered so that stop can signal without blocking even if the
            ;; monitor thread is no longer running.
            stop-chan (chan 1)
            conn      (or (:connection component)
                          (atom (safe-connect host port)))]
        ;; Subscribe before the monitor starts so the two cannot race. When
        ;; the initial connect failed, the monitor subscribes after its first
        ;; successful reconnect.
        (when @conn
          (subscribe @conn id handlers))
        (thread
          (while (not (poll! stop-chan))
            (try
              (when-not (and @conn (mqtt/connected? @conn))
                (println ";; Reconnecting to MQTT Server")
                (reset! conn (safe-connect host port))
                ;; safe-connect yields nil on failure; only subscribe on a
                ;; live connection and retry on the next tick otherwise.
                (when @conn
                  (subscribe @conn id handlers)))
              (catch Exception e
                ;; Keep the monitor alive; a dead monitor would never
                ;; reconnect again.
                (println ";; MQTT monitor error:" (.getMessage e))))
            (Thread/sleep 100)))
        (-> component
            (assoc :connection conn)
            (assoc :monitor stop-chan))))

    ;; Signals the monitor thread to stop, closes the connection if there is
    ;; one, and returns the component with :monitor and :connection cleared.
    ;; Safe to call on a component that was never started.
    (stop [component]
      (println ";; Stopping MQTT bus")
      (when-let [stop-chan (:monitor component)]
        ;; Any value on the channel makes the monitor loop exit; offer! never
        ;; blocks the caller.
        (a/offer! stop-chan :stop)
        ;; Give the monitor one polling interval to observe the signal.
        (Thread/sleep 100))
      (when-let [conn (some-> (:connection component) deref)]
        (try
          (mqtt/disconnect-and-close conn)
          (catch Exception e
            (println ";; MQTT disconnect failed:" (.getMessage e)))))
      (-> component
          (assoc :monitor nil)
          (assoc :connection nil))))
  109.  
  110. ;; PUBLIC
  111.  
  (defn foo
    "Prints the line foo! to *out* and returns nil."
    []
    (println "foo!"))
  113.  
  (defn send
    "Sends `message` over the bus `component`.

    The component's :id is stamped onto the message as :from, the message is
    printed and validated, and everything except :to and :type is serialized
    and published on the topic derived from :to and :type.

    Returns the result of the publish call on success. Any exception
    (validation or transport) is caught: a failure line plus the exception
    message are printed and nil is returned.

    NOTE: this var shadows clojure.core/send inside this namespace."
    [component message]
    (try
      (let [message (assoc message :from (:id component))]
        (println message)
        (validate-message message)
        (let [{:keys [to type]} message
              body (dissoc message :to :type)]
          (publish @(:connection component)
                   (make-topic to type)
                   (serialize body))))
      (catch Exception e
        (println "Message send failed.")
        ;; Surface the reason so schema errors and network errors can be
        ;; told apart.
        (println "Cause:" (.getMessage e)))))
  124.  
  125.  
  (defn bus
    "Creates an unstarted Bus record for the broker at `host`/`port`, with
    node id `id` and the given subscription `handlers`."
    [host port id handlers]
    (let [settings {:host     host
                    :port     port
                    :id       id
                    :handlers handlers}]
      (map->Bus settings)))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement