Advertisement
Guest User

Untitled

a guest
Jun 18th, 2019
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.78 KB | None | 0 0
  /**
   * Staged open-addressing hash index that maps key rows to dense slot numbers.
   *
   * Keys are stored in insertion order in `keys`; `htable` maps a hash bucket
   * to the index of the key stored there, with `-1` marking an empty bucket.
   * Collisions are resolved by linear probing.
   *
   * `keysSize` and `hashSize` are not defined in this class; presumably they
   * come from `hashDefaults` -- TODO confirm.
   *
   * @param keySchema schema of the key rows held in `keys`
   * @param schema    not used by this class itself; subclasses pass it through
   */
  class HashMapBase(keySchema: Schema, schema: Schema) {
    import hashDefaults._

    // Key rows in insertion order, and the number of keys stored so far.
    val keys = new ArrayBuffer(keysSize, keySchema)
    val keyCount = var_new(0)

    // NOTE(review): `h & hashMask` only covers every bucket when hashSize is a
    // power of two -- confirm against hashDefaults.
    val hashMask = hashSize - 1
    val htable = NewArray[Int](hashSize)
    for (i <- 0 until hashSize :Rep[Range]) { htable(i) = -1 }

    /** Returns the slot index stored for `k`, or `-1` if `k` is not present. */
    def lookup(k: Fields): Rep[Int] = lookupInternal(k, None)

    /**
     * Returns the slot index for `k`, inserting `k` first if it is absent.
     * `init` is invoked with the new slot index only when an insertion happens.
     */
    def lookupOrUpdate(k: Fields)(init: Rep[Int] => Rep[Unit]): Rep[Int] =
      lookupInternal(k, Some(init))

    /**
     * Probes `htable` for `k`, starting at `hash(k) & hashMask`.
     *
     * @param k    key row to search for
     * @param init `None` for a read-only lookup; `Some(f)` to insert a missing
     *             key and call `f` with its newly assigned slot index
     * @return the slot index of `k`; `-1` when `k` is absent and `init` is `None`
     */
    def lookupInternal(k: Fields, init: Option[Rep[Int] => Rep[Unit]]): Rep[Int] =
    comment[Int]("hash_lookup") {
      val h = fieldsHash(k).toInt
      var pos = h & hashMask
      // NOTE(review): this loop has no bound; it does not terminate if every
      // bucket is occupied and `k` is not among the stored keys.
      while (htable(pos) != -1 && !fieldsEqual(keys(htable(pos)),k)) {
        pos = (pos + 1) & hashMask
      }
      // `init` is an ordinary Option, so this match only selects which
      // statements are executed for this call.
      init match {
        case Some(initialize) =>
          if (htable(pos) == -1) {
            val keyPos = keyCount: Rep[Int] // force read
            keys(keyPos) = k
            keyCount += 1
            htable(pos) = keyPos
            initialize(keyPos)
            keyPos
          } else {
            htable(pos)
          }
        case None =>
          htable(pos)
      }
    }
  }
  36.  
  // Hash table for groupBy, storing per-key sums.
  38.  
  /**
   * Hash table for groupBy that keeps one row of running integer sums per key.
   *
   * @param keySchema schema of the grouping key rows
   * @param schema    schema of the aggregated value rows
   */
  class HashMapAgg(keySchema: Schema, schema: Schema) extends HashMapBase(keySchema, schema) {
    import hashDefaults._

    val values = new ArrayBuffer(keysSize, schema) // assuming all summation fields are numeric

    /**
     * Handle returned by `apply`, so that `map(k) += v` adds `v` into the sums
     * stored for `k`. A named class replaces the anonymous structural type so
     * that `+=` is an ordinary method call rather than a reflective one.
     */
    final class Accumulator(k: Fields) {
      /**
       * Adds `v` field by field to the sums stored for `k`. The sums are first
       * set to zero if `k` has not been seen before.
       *
       * Only `RInt` fields are matched; any other field type raises a
       * `MatchError`.
       */
      def +=(v: Fields): Unit = {
        val keyPos = lookupOrUpdate(k) { newPos =>
          values(newPos) = schema.map(_ => RInt(0))
        }
        values(keyPos) = (values(keyPos) zip v) map { case (RInt(sum), RInt(delta)) => RInt(sum + delta) }
      }
    }

    /** Returns the accumulator handle for key `k`. */
    def apply(k: Fields): Accumulator = new Accumulator(k)

    /** Calls `f(key, sums)` for every stored key, in insertion order. */
    def foreach(f: (Fields,Fields) => Rep[Unit]): Rep[Unit] = {
      for (i <- 0 until keyCount) {
        f(keys(i),values(i))
      }
    }

  }
  60.  
  61.  
  62.  
  // Column-oriented array buffer with a row-oriented interface,
  // specialized to the data representation of each column.
  65.  
  /** Storage for a single column of an `ArrayBuffer`. */
  abstract class ColBuffer
  /** Column of integers, backed by one staged array. */
  case class IntColBuffer(data: Rep[Array[Int]]) extends ColBuffer
  /**
   * Column of strings, backed by two parallel staged arrays: `data` holds the
   * strings and `len` holds one integer per string (presumably its length --
   * TODO confirm against `RString`).
   */
  case class StringColBuffer(data: Rep[Array[String]], len: Rep[Array[Int]]) extends ColBuffer
  69.  
  /**
   * Column-oriented buffer with a row-oriented interface: each schema field
   * gets its own column buffer, while rows are read and written as `Fields`.
   *
   * @param dataSize capacity allocated for every column array
   * @param schema   one entry per column; columns for which `isNumericCol`
   *                 holds are stored as ints, all others as strings
   */
  class ArrayBuffer(dataSize: Int, schema: Schema) {
    // One column buffer per schema entry, in schema order.
    val buf = schema.map {
      case hd if isNumericCol(hd) => IntColBuffer(NewArray[Int](dataSize))
      case _ => StringColBuffer(NewArray[String](dataSize), NewArray[Int](dataSize))
    }

    // Index at which `+=` writes the next row.
    var len = 0

    /** Appends row `x` at index `len`, then advances `len` by one. */
    def +=(x: Fields) = {
      this(len) = x
      len += 1
    }

    /**
     * Writes row `x` at index `i`, one field per column.
     *
     * Fields are paired with columns by position. A field whose
     * representation does not match its column (for example an `RString`
     * paired with an `IntColBuffer`) raises a `MatchError`.
     */
    def update(i: Rep[Int], x: Fields): Unit = buf.zip(x).foreach {
      case (IntColBuffer(col), RInt(value)) =>
        col(i) = value
      case (StringColBuffer(col, lens), RString(str, strLen)) =>
        col(i) = str
        lens(i) = strLen
    }

    /** Reads the row at index `i` back as one field per column. */
    def apply(i: Rep[Int]) = buf.map {
      case IntColBuffer(col) => RInt(col(i))
      case StringColBuffer(col, lens) => RString(col(i), lens(i))
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement