Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- def insertSortedOrUpdateNaive[T: Ordering](elems: Iterator[T], elem: T)(update: (T, T) => T): Traversable[T] = {
- val cmp = Ordering[T]
- val items = ListBuffer[T]()
- var added = false
- while (!added && elems.hasNext) {
- elems.next() match {
- case head if cmp.lt(head, elem) => items += head
- case head if cmp.equiv(head, elem) =>
- items += update(head, elem)
- added = true
- case head =>
- items += elem
- items += head
- added = true
- }
- }
- if (!added) {
- items += elem
- }
- elems.foreach { e =>
- items += e
- }
- items
- }
- def insertRowOrUpdateExisting(reader: TraversableOnce[(Int, Int)],
- bucket: Int,
- value: Int,
- mergeBy: MergeFunction = Sum): Traversable[(Int, Int)] = {
- implicit val orderByRowId: Ordering[(Int, Int)] = Ordering.by(t => t._1)
- val newRow = (bucket, value)
- Utils.insertSortedOrUpdateNaive(reader.toIterator, newRow) {
- case ((rowId, oldValue), (_, newValue)) =>
- val old = OldValue(oldValue)
- val `new` = NewValue(newValue)
- val result = mergeBy(old, `new`)
- (rowId, result.intValue)
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement