Advertisement
Guest User

Untitled

a guest
Jun 22nd, 2015
200
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.28 KB | None | 0 0
  /** Inserts `elem` into a sequence, or merges it with the first equivalent element.
    *
    * Elements are copied from `elems` for as long as they compare strictly less than
    * `elem`. The first element that does not compare less decides the outcome:
    *  - if it is equivalent to `elem` under the ordering, it is replaced by
    *    `update(existing, elem)`;
    *  - otherwise `elem` is inserted immediately before it.
    * If no such element exists, `elem` is appended at the end. Everything after the
    * insertion/update point is copied through unchanged (later equivalent elements
    * are not merged).
    *
    * `elems` is expected to be sorted ascending by the implicit ordering; this is not
    * verified. The iterator is fully consumed.
    *
    * @tparam T     element type, with an implicit [[scala.math.Ordering]]
    * @param elems  source elements
    * @param elem   element to insert or merge
    * @param update combines `(existing, elem)` into the replacement element
    * @return an immutable snapshot of the resulting sequence
    */
  def insertSortedOrUpdateNaive[T: Ordering](elems: Iterator[T], elem: T)(update: (T, T) => T): Traversable[T] = {
    val ord = implicitly[Ordering[T]]
    val out = ListBuffer.empty[T]
    // Buffering lets us peek at the next element without consuming it.
    val rest = elems.buffered

    // Copy the prefix that sorts strictly before the new element.
    while (rest.hasNext && ord.lt(rest.head, elem)) {
      out += rest.next()
    }

    // Merge with an equivalent element if one sits at the insertion point,
    // otherwise place the new element there.
    if (rest.hasNext && ord.equiv(rest.head, elem)) {
      out += update(rest.next(), elem)
    } else {
      out += elem
    }

    out ++= rest
    // Hand back an immutable List so callers cannot reach the internal buffer.
    out.toList
  }
  25.  
  /** Adds the row `(bucket, value)` to `reader`, or merges it into the row with the same id.
    *
    * Rows are compared by their first component only. When a row with the same id
    * is found by `Utils.insertSortedOrUpdateNaive`, its value and the new value are
    * wrapped as `OldValue` / `NewValue`, combined with `mergeBy`, and the result's
    * `intValue` becomes the row's value; the existing row id is kept.
    *
    * @param reader  existing `(rowId, value)` rows
    * @param bucket  row id of the row to insert or update
    * @param value   value carried by the new row
    * @param mergeBy how to combine the old and new values (defaults to `Sum`)
    * @return the rows produced by `Utils.insertSortedOrUpdateNaive`
    */
  def insertRowOrUpdateExisting(reader: TraversableOnce[(Int, Int)],
                                bucket: Int,
                                value: Int,
                                mergeBy: MergeFunction = Sum): Traversable[(Int, Int)] = {

    // Only the row id takes part in ordering and equivalence.
    implicit val byRowId: Ordering[(Int, Int)] = Ordering.by[(Int, Int), Int](_._1)

    // Combines an existing row with the incoming one, keeping the existing id.
    def mergeRows(existing: (Int, Int), incoming: (Int, Int)): (Int, Int) = {
      val merged = mergeBy(OldValue(existing._2), NewValue(incoming._2))
      (existing._1, merged.intValue)
    }

    Utils.insertSortedOrUpdateNaive(reader.toIterator, (bucket, value))(mergeRows)
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement