Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- (comment
- ;; Deps
- [cheshire "5.4.0"]
- [manifold "0.1.1"]
- [aleph "0.4.1-beta2"]
- [org.clojure/clojure "1.7.0"]
- [org.clojure/core.async "0.2.371"]
- )
(require '[cheshire.core :as json]
         '[clojure.core.async :as a]
         '[manifold.stream :as s])
;; Forwards the subscription (plus keep-alive pings) into an internal channel
;; that backs the response stream. The forwarding loop shuts down and closes
;; both channels as soon as either side is closed.
(defn hystrix-stream
  "Returns a response map (:status 200, event-stream headers) whose :body is a
  manifold stream of server-sent-event frames.

  Values taken from `subscription` are forwarded to an internal channel.
  Whenever 2000 ms elapse without a value, a private sentinel object is
  forwarded instead and rendered as the string \"ping\". Every other value is
  expanded with `metrics->hystrix-data`; each resulting message is
  JSON-encoded and framed as a `data:` line surrounded by newlines.

  The forwarding loop ends when `subscription` is closed (take yields nil) or
  when the put onto the internal channel reports that the channel is closed.
  On exit it closes BOTH the internal channel and `subscription`, so the
  response stream terminates and the producer is not left feeding a channel
  nobody reads.

  NOTE(review): a client disconnect only reaches this loop if whatever
  consumes :body closes the stream and that close propagates to the internal
  channel -- confirm for the aleph/manifold versions in use.

  `req` is accepted but not used."
  [subscription req]
  (let [subscription-with-ping (a/chan)
        ;; Unique sentinel; compared by identity-equality further down.
        ping-result (Object.)
        ;; Return a ping every two seconds when channels are not being filled
        _ (a/go-loop []
            (let [timeout-ch (a/timeout 2000)
                  [v ch] (a/alts! [subscription timeout-ch])
                  res (if (= ch timeout-ch)
                        ping-result
                        v)]
              ;; `res` is nil when `subscription` has been closed; `>!`
              ;; returns false when the target channel has been closed.
              ;; Either condition means there is nothing left to do.
              (if (when res
                    (a/>! subscription-with-ping res))
                (recur)
                (do
                  ;; close! is a no-op on an already-closed channel, so it is
                  ;; safe to close both regardless of which side ended first.
                  (a/close! subscription-with-ping)
                  (a/close! subscription)))))
        s (s/->source subscription-with-ping)
        strm (->> s
                  (s/mapcat (fn [x]
                              (if (= x ping-result)
                                ["ping"]
                                (metrics->hystrix-data x))))
                  (s/map (fn [msg]
                           (str "\ndata:" (json/encode msg) "\n"))))]
    {:status 200
     :body strm
     :headers {"Content-Type" "text/event-stream;charset=UTF-8"
               "Cache-Control" "no-cache, no-store, max-age=0, must-revalidate"
               "Pragma" "no-cache"}}))
- ;; Alternative below should stop when channel is closed, but this doesn't happen
(defn hystrix-stream-alt
  "Returns a response map (:status 200, event-stream headers) whose :body is a
  manifold stream built directly on top of `subscription`.

  A go-loop waits 2000 ms, then puts a private sentinel object onto
  `subscription` itself; it repeats while the put succeeds and prints
  \"I was closed!!\" once the put returns false (per core.async, that happens
  when the channel is closed). The sentinel is rendered as the string
  \"ping\"; every other value is expanded with `metrics->hystrix-data`. Each
  resulting message is JSON-encoded and framed as a `data:` line surrounded
  by newlines.

  `req` is accepted but not used."
  [subscription req]
  (let [ping-marker (Object.)
        ;; Expand one channel value into the sequence of messages to emit.
        expand (fn [item]
                 (if (= item ping-marker)
                   ["ping"]
                   (metrics->hystrix-data item)))
        ;; Wrap one message as a server-sent-event frame.
        frame (fn [msg]
                (str "\ndata:" (json/encode msg) "\n"))]
    ;; Keep-alive producer: started before the stream is built, runs until
    ;; the put onto `subscription` is rejected.
    (a/go-loop []
      (a/<! (a/timeout 2000))
      (if-not (a/>! subscription ping-marker)
        (println "I was closed!!")
        (recur)))
    (let [events (s/map frame
                        (s/mapcat expand
                                  (s/->source subscription)))]
      {:status 200
       :body events
       :headers {"Content-Type" "text/event-stream;charset=UTF-8"
                 "Cache-Control" "no-cache, no-store, max-age=0, must-revalidate"
                 "Pragma" "no-cache"}})))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement