Guest User

Untitled

a guest
May 27th, 2016
50
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.72 KB | None | 0 0
  1. package com.stripe.scorching
  2.  
  3. import com.twitter.scalding.serialization.{OrderedSerialization, Serialization}
  4.  
  /**
   * Describes a collection of `T` values.
   *
   * Every method in this trait only allocates a node (declared in the companion object)
   * that records this collection together with the supplied arguments; no element is
   * touched here.
   *
   * @tparam T element type (covariant)
   */
  sealed trait Collection[+T] {

    /** Records an element-wise transformation `f` over this collection. */
    def map[U](f: T => U): Collection[U] =
      Collection.Map[T, U](this, f)

    /** Records a transformation in which each element yields zero or more `U` values. */
    def concatMap[U](f: T => TraversableOnce[U]): Collection[U] =
      Collection.ConcatMap[T, U](this, f)

    /**
     * Views this collection of pairs as a [[Grouping]] keyed by `K`.
     *
     * @param ord  ordered serialization for the key type
     * @param vser serialization for the value type
     * @param ev   compile-time evidence that every element is a `(K, V)` pair
     */
    def group[K, V](implicit ord: OrderedSerialization[K],
                    vser: Serialization[V],
                    ev: T <:< (K, V)): Grouping[K, V] = {
      // `ev` proves T <: (K, V) and Collection is covariant in T, so this cast only
      // restates a subtyping fact the compiler cannot derive from `<:<` by itself.
      val pairs = this.asInstanceOf[Collection[(K, V)]]
      Collection.GroupProof[K, V](pairs, ord, vser)
    }

    /** Records the intent to write this collection to `sink`, yielding an [[Action]]. */
    def write(sink: Sink[T]): Action[Unit] =
      Collection.WriteAction[T](this, sink)
  }
  15.  
  /**
   * Describes values of type `V` grouped under keys of type `K`.
   *
   * As with [[Collection]], each method merely wraps this grouping and its arguments in a
   * node declared in `object Collection`.
   *
   * @tparam K key type (invariant)
   * @tparam V value type (covariant)
   */
  sealed trait Grouping[K, +V] {

    /** Drops the grouping structure and exposes the data as a collection of key/value pairs. */
    def ungroup: Collection[(K, V)] =
      Collection.Ungrouped[K, V](this)

    /** Records a per-key transformation: `f` receives a key and an iterator over its values. */
    def mapGroup[U](f: (K, Iterator[V]) => Iterator[U]): Grouping[K, U] =
      Collection.MapGroup[K, V, U](this, f)

    /**
     * Records a co-grouping of this grouping with `that` on the shared key type.
     *
     * @param that the other grouping
     * @param f    receives a key, an `Iterator` over this side's values and an `Iterable`
     *             over the other side's values, and produces the result values for that key
     */
    def cogroup[U, R](that: Grouping[K, U])(
        f: (K, Iterator[V], Iterable[U]) => Iterator[R]): Grouping[K, R] =
      Collection.CoGroup[K, V, U, R](this, that, f)
  }
  22.  
  /**
   * Marker for a destination accepting values of type `T`.
   * Contravariant: a `Sink[Any]` may be used where a `Sink[String]` is expected.
   */
  trait Sink[-T]

  /**
   * Marker for an origin producing values of type `T`.
   * Covariant: a `Source[String]` may be used where a `Source[Any]` is expected.
   */
  trait Source[+T]
  25.  
  /**
   * Describes a step that yields a `T`.
   *
   * The combinators below only build nodes (declared in `object Collection`) that wrap this
   * action and the given argument; nothing is run by calling them.
   *
   * @tparam T result type (covariant)
   */
  sealed trait Action[+T] {

    /** Records a transformation `f` of this action's result. */
    def map[U](f: T => U): Action[U] =
      Collection.MapAction[T, U](this, f)

    /** Records a follow-up action chosen by `next` from this action's result. */
    def flatMap[U](next: T => Action[U]): Action[U] =
      Collection.FlatMapAction[T, U](this, next)

    /** Pairs this action with `that`, yielding both results as a tuple. */
    def zip[U](that: Action[U]): Action[(T, U)] =
      Collection.ZipAction[T, U](this, that)
  }
  32.  
  /**
   * Companion of [[Collection]]: the entry point `read`, an extension method that needs an
   * invariant `T`, and the node classes built by [[Collection]], [[Grouping]] and [[Action]].
   */
  object Collection {

    /** Starts a collection description from `source`. */
    def read[T](source: Source[T]): Collection[T] =
      Read(source)

    /**
     * Extension methods declared here, outside the covariant trait, so that `T` is invariant.
     */
    implicit class InvariantMethods[T](s: Collection[T]) {

      /**
       * Wraps the collection in a `Forced` node that carries `ser`.
       * NOTE(review): presumably marks a materialization point — confirm against whatever
       * consumes these nodes.
       */
      def forceToDisk(implicit ser: Serialization[T]): Collection[T] =
        Forced(s, ser)
    }

    // ---- Collection nodes ----

    /** A collection read from `source`. */
    private[scorching] case class Read[T](source: Source[T]) extends Collection[T]

    /** `s` with `f` recorded as an element-wise transformation. */
    private[scorching] case class Map[A, B](
        s: Collection[A],
        f: A => B) extends Collection[B]

    /** `s` with `f` recorded as a one-to-many transformation. */
    private[scorching] case class ConcatMap[A, B](
        s: Collection[A],
        f: A => TraversableOnce[B]) extends Collection[B]

    /** `s` paired with the serialization supplied to `forceToDisk`. */
    private[scorching] case class Forced[T](
        s: Collection[T],
        ser: Serialization[T]) extends Collection[T]

    /** The key/value pairs of grouping `g`, exposed as a collection. */
    private[scorching] case class Ungrouped[K, V](g: Grouping[K, V]) extends Collection[(K, V)]

    // ---- Grouping nodes ----

    /** A pair collection `c` together with the key and value serializations given to `group`. */
    private[scorching] case class GroupProof[K, V](
        c: Collection[(K, V)],
        k: OrderedSerialization[K],
        v: Serialization[V]) extends Grouping[K, V]

    /** Grouping `g` with the per-key function `f` recorded. */
    private[scorching] case class MapGroup[K, V, W](
        g: Grouping[K, V],
        f: (K, Iterator[V]) => Iterator[W]) extends Grouping[K, W]

    /** Groupings `left` and `right` combined per key by `fn`. */
    private[scorching] case class CoGroup[K, V, W, R](
        left: Grouping[K, V],
        right: Grouping[K, W],
        fn: (K, Iterator[V], Iterable[W]) => Iterator[R]) extends Grouping[K, R]

    // ---- Action nodes ----

    /** Action `first` with `next` recorded as a transformation of its result. */
    private[scorching] case class MapAction[A, B](
        first: Action[A],
        next: A => B) extends Action[B]

    /** Actions `left` and `right` paired into a single action yielding a tuple. */
    private[scorching] case class ZipAction[A, B](
        left: Action[A],
        right: Action[B]) extends Action[(A, B)]

    /** Action `first` followed by the action that `next` derives from its result. */
    private[scorching] case class FlatMapAction[A, B](
        first: Action[A],
        next: A => Action[B]) extends Action[B]

    /** The intent to write collection `c` to `sink`. */
    private[scorching] case class WriteAction[T](
        c: Collection[T],
        sink: Sink[T]) extends Action[Unit]
  }
Add Comment
Please, Sign In to add comment