Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
(ns demeter.mqtt-bus
  (:require [clojure.core.async
             :as a
             :refer [>! <! >!! <!! go chan buffer close! thread
                     alts! alts!! timeout poll!]]
            [clojure.string :as string]
            [clojurewerkz.machine-head.client :as mqtt]
            [com.stuartsierra.component :as component]
            [schema.core :as s])
  (:import (clojure.lang ExceptionInfo)))
- ;; PRIVATE
(defn- safe-connect
  "Connects to the MQTT broker at the given host and port without throwing
  an exception on failure.

  The broker URI is `host` and `port` joined with a colon, so `host` must
  already contain whatever scheme prefix the client expects.

  Returns the connection on success. When `mqtt/connect` throws an
  Exception, prints a short diagnostic (including the URI) and returns nil."
  [host port]
  (let [uri (str host ":" port)]
    (try
      (mqtt/connect uri)
      (catch Exception _
        (println "mqtt connection failed")
        (println "url was " uri)
        nil))))
(defn- publish
  "Publishes `message` on `topic` over the MQTT connection `conn` by
  delegating to `mqtt/publish`, and returns whatever that call returns."
  [conn topic message]
  (mqtt/publish conn
                topic
                message))
(def message-schema
  "Schema for an outgoing bus message: a string :to, any :from, a :type
  that is one of :info, :request, :command or :auth, and any :payload."
  {:to      s/Str
   :from    s/Any
   :type    (s/enum :info :request :command :auth)
   :payload s/Any})
(defn- validate-message
  "Ensures that the given message has all the required fields by checking
  it against `message-schema` with `s/validate`.

  The result (or exception) is whatever `s/validate` produces for the
  message."
  [message]
  (s/validate message-schema message))
(defn- serialize
  "Renders `message` as a string that the Clojure reader can read back.

  Uses `pr-str` rather than `str` so that every value, including a
  top-level string, is printed in readable form. The printer vars are
  pinned so that a caller's `*print-length*` / `*print-level*` settings
  (commonly set in a REPL) cannot truncate the serialized message."
  [message]
  (binding [*print-length*   nil
            *print-level*    nil
            *print-readably* true]
    (pr-str message)))
(defn- unserialize
  "Reads one Clojure form from the string `message` and returns it.

  The text comes from the message broker, so it must be treated as
  untrusted: `*read-eval*` is bound to false so that reader-eval forms
  (`#=`) throw instead of executing code."
  [message]
  ;; NOTE(review): clojure.edn/read-string is the recommended reader for
  ;; untrusted input; switching to it would also reject non-EDN syntax that
  ;; existing senders might rely on, so that change is left for a deliberate
  ;; migration.
  (binding [*read-eval* false]
    (read-string message)))
(defn- make-topic
  "Builds the topic string for a recipient and message type: `to`, a
  slash, then the name of `type` (a keyword, symbol or string)."
  [to type]
  (let [suffix (name type)]
    (str to \/ suffix)))
(defn- process-incoming
  "Takes an incoming MQTT message and returns a map of its properties.

  `topic` is the topic string the message arrived on, the second argument
  is ignored, and `payload` is the message body as a byte array holding a
  serialized map.

  Returns a map with:
    :from    - the :from value of the decoded message
    :type    - the second slash-separated segment of `topic`, as a string
    :message - the :payload value of the decoded message"
  [topic _ ^bytes payload]
  (println "processing...")
  ;; Decode explicitly as UTF-8 so the result does not depend on the JVM's
  ;; platform default charset.
  (let [message  (unserialize (String. payload "UTF-8"))
        msg-type (second (clojure.string/split topic #"/"))]
    {:from    (:from message)
     :type    msg-type
     :message (:payload message)}))
(def subscribe-schema
  "Schema for one subscription entry: a :type that is one of :info,
  :request, :command or :auth, a :qos of 0, 1 or 2, and any :handler."
  {:type    (s/enum :info :request :command :auth)
   :qos     (s/enum 0 1 2)
   :handler s/Any})
(defn- subscribe
  "Subscribes `connection` to one topic per entry in `handlers`.

  Every entry is first validated against `subscribe-schema`. For each
  entry the topic is (make-topic id (:type entry)), subscribed at the
  entry's :qos. The entry's :handler is called with the third callback
  argument (the message payload) of every message delivered on that topic.
  Exceptions thrown by a handler are printed and swallowed so they do not
  propagate into the MQTT client's callback.

  When `connection` is nil (which is what `safe-connect` returns after a
  failed attempt) nothing is subscribed and a notice is printed instead."
  [connection id handlers]
  (doseq [spec handlers]
    (s/validate subscribe-schema spec))
  (if (nil? connection)
    (println ";; Not connected to MQTT server, skipping subscriptions")
    (doseq [{kind :type, qos :qos, on-message :handler} handlers]
      (let [topic (make-topic id kind)]
        (println (str ";; Subscribing to " topic))
        (mqtt/subscribe connection
                        {topic qos}
                        ;; Callback receives three arguments; only the
                        ;; payload is forwarded to the handler.
                        (fn [_topic _meta payload]
                          (try
                            (on-message payload)
                            (catch Exception e
                              (println e)))))))))
;; Component wrapping an MQTT connection.
;;
;; Fields:
;;   host, port - passed to `safe-connect` to reach the broker
;;   id         - this bus's identifier; used as the topic prefix when
;;                subscribing and as the :from of messages sent with `send`
;;   connection - nil until started, then an atom holding the current MQTT
;;                connection (or nil while disconnected)
;;   monitor    - nil until started, then the channel used to tell the
;;                reconnect thread to stop
;;   handlers   - subscription entries, see `subscribe-schema`
(defrecord Bus [host port id connection monitor handlers]
  component/Lifecycle
  (start [component]
    (if monitor
      ;; Already started: starting again would leak a second monitor thread.
      component
      (do
        (println ";; Starting MQTT Bus")
        (let [stop-chan (chan)
              conn      (or connection (atom (safe-connect host port)))]
          ;; Subscribe before the monitor thread exists so the two cannot
          ;; race, and only if the initial connection attempt succeeded.
          (when @conn
            (subscribe @conn id handlers))
          ;; Background loop: every 100 ms check the connection and, when it
          ;; is missing or dropped, reconnect and re-subscribe. Runs until a
          ;; value can be polled from stop-chan.
          (thread
            (while (not (poll! stop-chan))
              (try
                (when-not (and @conn (mqtt/connected? @conn))
                  (println ";; Reconnecting to MQTT Server")
                  (reset! conn (safe-connect host port))
                  ;; safe-connect returns nil on failure; retry next tick.
                  (when @conn
                    (subscribe @conn id handlers)))
                (catch Exception e
                  ;; Keep the monitor alive; a dead monitor never reconnects.
                  (println e)))
              (Thread/sleep 100)))
          (assoc component
                 :connection conn
                 :monitor stop-chan)))))
  (stop [component]
    (println ";; Stopping MQTT bus")
    (when monitor
      ;; Any value on the channel makes the monitor loop exit. put! never
      ;; blocks, so stop cannot hang if the monitor thread is already gone.
      (a/put! monitor :stop)
      ;; Give the monitor one polling interval to notice and exit.
      (Thread/sleep 100))
    (when-let [client (some-> connection deref)]
      (try
        (mqtt/disconnect-and-close client)
        (catch Exception e
          (println e))))
    (assoc component
           :monitor nil
           :connection nil)))
- ;; PUBLIC
(defn foo
  "Prints the line foo! to *out* and returns nil."
  []
  (println "foo!"))
(defn send
  "Publishes `message` through the bus `component`.

  The message is stamped with :from set to the component's :id, printed,
  and validated with `validate-message`. Its :to and :type are used to
  build the topic and are removed from the body, which is then serialized
  and published on the component's current connection.

  Returns the result of `publish` on success. Any Exception (invalid
  message, missing connection, publish failure) is caught: a failure notice
  and the exception itself are printed, and nil is returned."
  [component message]
  (try
    (let [message (assoc message :from (:id component))]
      (println message)
      (validate-message message)
      (let [{:keys [to type]} message
            body (dissoc message :to :type)]
        (publish @(:connection component)
                 (make-topic to type)
                 (serialize body))))
    (catch Exception e
      (println "Message send failed.")
      ;; Print the cause too, so a schema failure can be told apart from a
      ;; connection problem.
      (println e))))
(defn bus
  "Builds an unstarted Bus record from `host`, `port`, `id` and
  `handlers`; the remaining record fields are left nil."
  [host port id handlers]
  (let [fields {:id       id
                :handlers handlers
                :host     host
                :port     port}]
    (map->Bus fields)))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement