Advertisement
Guest User

Untitled

a guest
Nov 25th, 2015
61
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.53 KB | None | 0 0
  1. (comment
  2. ;; Deps
  3. [cheshire "5.4.0"]
  4. [manifold "0.1.1"]
  5. [aleph "0.4.1-beta2"]
  6. [org.clojure/clojure "1.7.0"]
  7. [org.clojure/core.async "0.2.371"]
  8. )
  9.  
  10. (require '[clojure.core.async :as a]
  11. '[manifold.stream :as s]
  12. '[manifold.stream :as s])
  13.  
  14. ;; With the code below streams and channels are not closed after a client closes the connection
  15. (defn hystrix-stream [subscription req]
  16. (let [subscription-with-ping (a/chan)
  17. ping-result (Object.)
  18. ;; Return a ping every two seconds when channels are not being filled
  19. _ (a/go-loop []
  20. (let [timeout-ch (a/timeout 2000)
  21. [v ch] (a/alts! [subscription timeout-ch])
  22. res (if (= ch timeout-ch)
  23. ping-result
  24. v)]
  25. (when res
  26. (a/>! subscription-with-ping res)
  27. (recur))))
  28. s (s/->source subscription-with-ping)
  29. strm (->> s
  30. (s/mapcat (fn [x]
  31. (if (= x ping-result)
  32. ["ping"]
  33. (metrics->hystrix-data x))))
  34. (s/map (fn [msg]
  35. (str "\ndata:" (json/encode msg) "\n"))))]
  36. {:status 200 :body strm :headers {"Content-Type" "text/event-stream;charset=UTF-8"
  37. "Cache-Control" "no-cache, no-store, max-age=0, must-revalidate"
  38. "Pragma" "no-cache"}}))
  39.  
  40. ;; Alternative below should stop when channel is closed, but this doesn't happen
  41. (defn hystrix-stream-alt [subscription req]
  42. (let [ping-result (Object.)
  43. ;; Return a ping every two seconds when channels are not being filled
  44. _ (a/go-loop []
  45. (a/<! (a/timeout 2000))
  46. (if (a/>! subscription ping-result)
  47. (recur)
  48. (println "I was closed!!")))
  49. s (s/->source subscription)
  50. strm (->> s
  51. (s/mapcat (fn [x]
  52. (if (= x ping-result)
  53. ["ping"]
  54. (metrics->hystrix-data x))))
  55. (s/map (fn [msg]
  56. (str "\ndata:" (json/encode msg) "\n"))))]
  57. {:status 200 :body strm :headers {"Content-Type" "text/event-stream;charset=UTF-8"
  58. "Cache-Control" "no-cache, no-store, max-age=0, must-revalidate"
  59. "Pragma" "no-cache"}}))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement