Advertisement
Guest User

Untitled

a guest
Jul 4th, 2015
193
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.23 KB | None | 0 0
  1. package finance;
  2. import scalaz.stream._
  3. import scalaz.concurrent.Task
  4. import scalaz.stream.async.mutable.Queue;
  5. import scalaz.stream.{Cause, Util, Process, Sink}
  6.  
  7.  
  /** Entry point for building a gate over a group of input streams. */
  object gate1 {

    /**
     * Builds a [[Gate]] over `inputs` and returns its output stream.
     *
     * @tparam V the value type carried by every input stream
     * @tparam T the type returned from this gate
     * @param inputs the streams feeding the gate
     * @param f      turns one group of input values into a result
     * @return the `output` stream of the newly created gate
     */
    def gate[V, T](inputs: Seq[Process[Any, V]], f: Seq[V] => T): Process[Task, T] = {
      val combined = new Gate[V, T](inputs, f)
      combined.output
    }
  }
  16.  
  17.  
  18.  
  19.  
  /**
   * Combines several input streams into one stream of results.
   *
   * The inputs are consumed in lock-step: one value is taken from every input, in the
   * order given by `inputs`, and `f` is applied to that group to produce one output
   * element. No group is passed to `f` until every input has supplied its value.
   *
   * The earlier version copied the inputs into bounded queues, but it only *described*
   * the enqueue/dequeue steps (the resulting `Process`/`Task` values were never run),
   * padded missing values with `null`, and did not yield a `Process` from `output`.
   * Zipping the inputs directly gives the intended "wait for all, then fire" behaviour
   * without any of that machinery.
   *
   * @tparam V the value type carried by every input stream
   * @tparam T the type produced by the gate
   * @param inputs the streams feeding the gate
   * @param f      turns one value from each input (in `inputs` order) into a result
   */
  class Gate[V, T](inputs: Seq[Process[Any, V]], f: Seq[V] => T) {

    // The public signature declares the effect type as `Any`, which cannot be run or
    // combined into a `Process[Task, _]`. The effect parameter is erased at runtime, so
    // this cast never fails by itself; an input that really requests a non-Task effect
    // would only fail once the output is run.
    // NOTE(review): narrowing the parameter to Seq[Process[Task, V]] here and in
    // gate1.gate would remove the cast -- confirm that no caller relies on `Any`.
    private val sources: Seq[Process[Task, Vector[V]]] =
      inputs.map(_.asInstanceOf[Process[Task, V]].map(value => Vector(value)))

    /**
     * The gate's result stream.
     *
     * @return a stream whose n-th element is `f` applied to the n-th element of every
     *         input; the stream is empty when there are no inputs, and ends when the
     *         zipped inputs end (with `zipWith`, as soon as one input is exhausted)
     */
    def output: Process[Task, T] =
      sources.reduceLeftOption((acc, next) => acc.zipWith(next)(_ ++ _)) match {
        case Some(zipped) =>
          zipped.map { values =>
            println("all values are here")
            f(values)
          }
        // Nothing to wait for: emit nothing instead of firing `f` on an empty group forever.
        case None => Process.halt
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement