Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package finance;
- import scalaz.stream._
- import scalaz.concurrent.Task
- import scalaz.stream.async.mutable.Queue;
- import scalaz.stream.{Cause, Util, Process, Sink}
- object gate1 {
-   /** Builds a [[Gate]] from the given inputs and function and returns its `output`.
-     *
-     * @tparam V the value type carried by the input streams
-     * @tparam T the type returned from this gate (the result type of `f`)
-     * @param inputs the input streams handed to the new [[Gate]]
-     * @param f      function from a sequence of input values to a `T`
-     * @return the `output` member of the newly constructed [[Gate]]
-     */
-   def gate[V, T](inputs: Seq[Process[Any, V]], f: Seq[V] => T): Process[Task, T] = {
-     val built = new Gate[V, T](inputs, f)
-     built.output
-   }
- }
- class Gate[V,T](inputs : Seq[Process[Any,V]], f : Seq[V] => T) // Gate over several input streams: V is the input element type, T is the result type of f.
- { self => // NOTE(review): the alias `self` is never referenced inside this class.
- private val queues: Seq[Queue[V]] = inputs.map(stream => { // Creates one bounded queue per input; the queues are kept in input order.
- val queue = async.boundedQueue[V](10) // Bound argument is 10.
- stream to queue.enqueue // NOTE(review): the value of this expression is discarded; if scalaz-stream processes only execute when explicitly run, nothing is ever enqueued - confirm.
- queue
- })
- private val output_q = async.boundedQueue[T](10) // Queue for the results of f; no code in this class dequeues from it.
- def output = Process.suspend { // Body is passed to Process.suspend; presumably evaluated only when the resulting process is run - confirm.
- import scalaz.Scalaz._
- if(queues.forall(v=>{ v.size.discrete.runLast.run match { // Ready only if every queue's size signal yields a last value > 0. NOTE(review): runLast.run blocks the caller, and `discrete` may not terminate while the queue is open - confirm.
- case Some(size) => (size > 0)
- case None => false // No size observed counts as "not ready".
- }}))
- {
- println("all values are here")
- val step_map: Seq[V] = queues.map(in_queue => {in_queue.dequeue.take(1).runLast.run match { // Takes one value from each queue (blocking on run) and collects them in queue order.
- case Some(valu) => valu
- case None => null // NOTE(review): null placeholder when nothing was dequeued; it is passed to f unchecked.
- }})
- output_q.enqueueOne(f(step_map)) // NOTE(review): the result of enqueueOne is discarded; if it is a lazy Task it never runs and output_q stays empty - confirm.
- }
- inputs map {in => in.awaitOption} // Final expression of the suspend body. NOTE(review): this is a Seq built from inputs, while gate1.gate declares Process[Task, T] for `output` - confirm this type-checks.
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement