Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- (ns task-scheduler.lib.routine
- (:require
- [clojure.core.async :as a :refer [go go-loop close! chan <! >! put! alt! timeout pipe]])
- (:import [clojure.core.async.impl.channels ManyToManyChannel]))
- ; effects
- ; ----------------------------
(defn effect
  "Submits an effect request on channel `c` and returns the channel on which
  the result will be delivered.

  The request is a map with the keys :name (the effect identifier used for
  dispatch by `handle-effect`), :args (the remaining arguments, or nil when
  none were given) and :res-chan (a fresh unbuffered channel).

  The request is enqueued asynchronously with `put!`, so this function
  never blocks the caller."
  [c name & args]
  (let [res-chan (chan)
        request  {:name name :args args :res-chan res-chan}]
    (put! c request)
    res-chan))
(defmulti handle-effect
  "Performs the side effect described by an effect map, dispatching on the
  value stored under its :name key. Implementations may return either a
  plain value or a channel."
  :name)
;; :call -- the first element of :args is the function to invoke, the
;; remaining elements are passed to it as arguments. Returns whatever the
;; function returns.
(defmethod handle-effect :call [{[f & xs] :args}]
  (apply f xs))
;; :timeout -- the first element of :args is a duration in milliseconds.
;; Returns a core.async timeout channel that closes after that duration.
(defmethod handle-effect :timeout [{[ms] :args}]
  (timeout ms))
(defn closed?
  "Returns a channel that yields true when a take from `c` can complete
  immediately, and false otherwise.

  This is a non-blocking probe: a closed channel always satisfies a take at
  once, so it yields true. Note that a pending value on an open channel
  also satisfies the take (and is consumed), so this is only a reliable
  closed-check for channels nobody ever puts on."
  [c]
  (go
    (alt!
      c        ([_] true)
      :default false)))
(defn ensure-chan
  "Coerces `v` into a channel.

  - A ManyToManyChannel is returned unchanged.
  - nil becomes an already-closed channel (nil cannot be put on a channel).
  - Any other value becomes a channel that yields that value once and is
    then closed."
  [v]
  (cond
    (instance? ManyToManyChannel v)
    v

    :else
    (let [c (chan 1)]
      ;; The 1-slot buffer accepts the value immediately; values already
      ;; buffered remain takeable after the channel is closed.
      (when (some? v)
        (put! c v))
      (close! c)
      c)))
(defn routine-impl
  "Starts a routine and returns its cancellation channel.

  `handler` is called once with a one-argument-or-more function that submits
  effect requests (see `effect`). A background loop then takes requests one
  at a time; for each request it first checks whether the cancellation
  channel has been closed:

  - not closed: the effect is executed via `handle-effect`, its result is
    coerced to a channel and piped into the request's :res-chan, and the
    loop continues;
  - closed: the loop stops without executing the effect and without
    touching :res-chan, so the requester stays parked.

  Cancellation is only observed when the next request arrives; an effect
  that is already in flight is not interrupted.

  Close the returned channel with `close!` to cancel the routine."
  [handler]
  (let [requests (chan)
        cancel   (chan)
        request! (partial effect requests)]
    (handler request!)
    (go
      (loop []
        (when-let [{reply :res-chan :as request} (<! requests)]
          (when-not (<! (closed? cancel))
            ;; pipe closes `reply` once the result channel is exhausted.
            (pipe (ensure-chan (handle-effect request)) reply)
            (recur)))))
    cancel))
(defmacro routine
  "Runs the given body forms inside a `go` block as a cancellable routine and
  returns the routine's cancellation channel (see `routine-impl`).

  The symbol `effect` is deliberately captured (unhygienic) inside the body:
  it is bound to a function that submits an effect request and returns the
  channel carrying its result, e.g. (<! (effect :timeout 1000))."
  [& args]
  (let [body `(go ~@args)]
    `(routine-impl (fn [~'effect] ~body))))
- ;; usage example
;; Holds the cancellation channel returned by `routine`.
(def sample-routine
  (routine
    ;; Runs immediately.
    (<! (effect :call println "I am performing some side-effect"))
    ;; Parks the routine for five seconds.
    (<! (effect :timeout 5000))
    ;; Requested only after the timeout; skipped if the routine was
    ;; cancelled in the meantime.
    (<! (effect :call println "It seems I won't be called if routine gets canceled"))))
;; Cancel the sample routine after three seconds, i.e. while it is still
;; waiting on its five-second timeout effect.
(go
  (<! (timeout 3000))
  (println "canceling routine")
  (close! sample-routine))

;; Block the calling thread long enough for the whole demo to play out.
(Thread/sleep 10000)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement