Advertisement
Guest User

Untitled

a guest
Feb 21st, 2017
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.58 KB | None | 0 0
  1. (ns sandbox.component
  2. (:require [clojure.core.async :refer [chan <! >! go-loop close! >!!]]
  3. [taoensso.timbre :as log]
  4. [mount.core :as mount]
  5. [com.stuartsierra.component :as component]))
  6.  
  7.  
  ;; Component wrapping a core.async channel.
  ;;   capacity - buffer size handed to `chan` on start
  ;;   label    - name used only in log messages
  ;;   ch       - the live channel; nil while the component is stopped
  (defrecord Channel [capacity label ch]
    component/Lifecycle

    (start [this]
      ;; Idempotent: starting an already-started component must not replace
      ;; (and thereby leak) the channel that consumers are already reading.
      (if ch
        this
        (do (log/infof "Starting channel %s" label)
            (assoc this :ch (chan capacity)))))

    (stop [this]
      ;; Idempotent: stopping a component that was never started is a no-op
      ;; rather than a call to close! on nil.
      (if-not ch
        this
        (do (log/infof "Stopping channel %s" label)
            (close! ch)
            ;; Lifecycle methods must return the component. close! returns nil,
            ;; which would otherwise replace this component in the system map.
            (assoc this :ch nil)))))
  18.  
  (defn make-channel
    "Builds an unstarted Channel component with the given buffer `capacity`
    and log `label`. The underlying channel field is nil until the component
    is started."
    [capacity label]
    (->Channel capacity label nil))
  22.  
  ;; Component that, for every value taken from `trigger-ch`, puts a running
  ;; counter (starting at 1) onto `out-ch`.
  ;;   opts       - configuration map supplied at construction time
  ;;   trigger-ch - injected Channel component the loop reads from
  ;;   out-ch     - injected Channel component the loop writes to
  (defrecord Reader [opts trigger-ch out-ch]
    component/Lifecycle

    (start [this]
      (log/info "Starting reader")
      (go-loop [i 1]
        ;; if-some rather than if-let: only nil signals a closed channel, so a
        ;; trigger value of `false` must not terminate the loop.
        (if-some [trigger (<! (:ch trigger-ch))]
          (do (log/infof "Received trigger %s" trigger)
              (>! (:ch out-ch) i)
              (recur (inc i)))
          (log/info "Stopping reader")))
      this)

    ;; The go-loop ends on its own once the trigger channel is closed, so there
    ;; is nothing to tear down here.
    (stop [this] this))
  38.  
  (defn make-reader
    "Builds an unstarted Reader component carrying `opts`. Both channel
    fields are left nil."
    [opts]
    (->Reader opts nil nil))
  42.  
  ;; Component that forwards every value taken from `in-ch` to `out-ch`,
  ;; logging each one as it passes through.
  ;;   in-ch  - injected Channel component the loop reads from
  ;;   out-ch - injected Channel component the loop writes to
  (defrecord Parser [in-ch out-ch]
    component/Lifecycle

    (start [this]
      (log/info "Starting parser")
      (go-loop []
        ;; if-some rather than if-let: only nil signals a closed channel, so a
        ;; `false` value must be forwarded instead of ending the loop.
        (if-some [line (<! (:ch in-ch))]
          (do (log/infof "Parser received %s" line)
              (>! (:ch out-ch) line)
              (recur))
          (log/info "Stopping parser")))
      this)

    ;; The go-loop ends on its own once the input channel is closed.
    (stop [this] this))
  56.  
  (defn make-parser
    "Builds an unstarted Parser component with both channel fields left nil."
    []
    (->Parser nil nil))
  60.  
  (defn push-delta
    "Logs, at info level, that `delta` is being written for `market`."
    [market delta]
    (log/infof
      "Writing %s for market %s"
      delta
      market))
  64.  
  ;; Component that takes values from `in-ch` and hands each one to
  ;; `push-delta` together with the market read from `opts`.
  ;;   opts  - configuration map; the :market key is read on start
  ;;   in-ch - injected Channel component the loop reads from
  (defrecord Writer [opts in-ch]
    component/Lifecycle

    (start [this]
      (let [market (:market opts)]
        (log/infof "Starting writer for market %s" market)
        (go-loop []
          ;; if-some rather than if-let: only nil signals a closed channel, so
          ;; a `false` delta must still be written instead of ending the loop.
          (if-some [delta (<! (:ch in-ch))]
            (do (push-delta market delta)
                (recur))
            (log/info "Stopping writer"))))
      this)

    ;; The go-loop ends on its own once the input channel is closed.
    (stop [this] this))
  79.  
  (defn make-writer
    "Builds an unstarted Writer component carrying `opts`. The input channel
    field is left nil."
    [opts]
    (->Writer opts nil))
  83.  
  (defn make-system
    "Assembles the unstarted system map: three single-slot channels
    (trigger-ch, line-ch, delta-ch) joined by a reader, a parser and a writer.
    `config` is passed to both the reader and the writer."
    [config]
    (let [reader (component/using (make-reader config)
                                  {:trigger-ch :trigger-ch
                                   :out-ch     :line-ch})
          parser (component/using (make-parser)
                                  {:in-ch  :line-ch
                                   :out-ch :delta-ch})
          writer (component/using (make-writer config)
                                  {:in-ch :delta-ch})]
      (component/system-map
        :trigger-ch (make-channel 1 "trigger-ch")
        :reader     reader
        :line-ch    (make-channel 1 "line-ch")
        :parser     parser
        :delta-ch   (make-channel 1 "delta-ch")
        :writer     writer)))
  98.  
  ;; Mount-managed state holding the component system; it is built with a
  ;; fixed {:market "bla"} configuration.
  (mount/defstate system
    :start (-> {:market "bla"}
               make-system
               component/start)
    :stop (-> system
              component/stop))
  102.  
  (defn trigger
    "Performs a blocking put of `trigger` onto the system's trigger channel
    and returns the result of that put."
    [trigger]
    (let [trigger-chan (-> system :trigger-ch :ch)]
      (>!! trigger-chan trigger)))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement