Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Open-addressing hash index that maps key tuples to dense slot numbers.
 *
 * Keys are appended to `keys` in first-seen order; `htable` maps a hash bucket
 * to the slot of the key stored there, with -1 marking an empty bucket.
 * Collisions are resolved by linear probing (step 1, wrapped with `hashMask`).
 *
 * @param keySchema schema of the key tuples stored in `keys`
 * @param schema    not used by this class itself; `HashMapAgg` passes it through
 *                  and uses it for its value buffer
 */
class HashMapBase(keySchema: Schema, schema: Schema) {
  import hashDefaults._

  // Key tuples in insertion order; a slot number is an index into this buffer.
  val keys = new ArrayBuffer(keysSize, keySchema)
  // Staged counter of the slots handed out so far (also the next free slot).
  val keyCount = var_new(0)
  // Wrap-around mask for bucket indices.
  // NOTE(review): only a valid modulus if hashSize is a power of two - confirm in hashDefaults.
  val hashMask = hashSize - 1
  // Bucket array: slot number of the key living in each bucket, or -1 if empty.
  val htable = NewArray[Int](hashSize)
  for (bucket <- (0 until hashSize): Rep[Range]) { htable(bucket) = -1 }

  /** Finds the slot of `k`; yields -1 when `k` has not been inserted. */
  def lookup(k: Fields) = lookupInternal(k, None)

  /**
   * Finds the slot of `k`, inserting it first when absent.
   *
   * @param init invoked with the freshly allocated slot, only when `k` was absent
   */
  def lookupOrUpdate(k: Fields)(init: Rep[Int]=>Rep[Unit]) = lookupInternal(k, Some(init))

  /**
   * Shared probe routine behind `lookup` and `lookupOrUpdate`.
   *
   * @param k    key tuple to search for
   * @param init `Some(f)` to insert a missing key and call `f` on its new slot;
   *             `None` for a read-only lookup
   * @return the slot of `k`, or -1 for a read-only lookup of an absent key
   */
  def lookupInternal(k: Fields, init: Option[Rep[Int]=>Rep[Unit]]): Rep[Int] =
    comment[Int]("hash_lookup") {
      val hash = fieldsHash(k).toInt
      var bucket = hash & hashMask
      // Probe until an empty bucket or a bucket holding an equal key is reached.
      // NOTE(review): this loop has no exit when every bucket is occupied and
      // none of them holds `k`.
      while (htable(bucket) != -1 && !fieldsEqual(keys(htable(bucket)),k)) {
        bucket = (bucket + 1) & hashMask
      }
      // `init` is an ordinary Option, so this match is resolved while staging;
      // only the chosen branch is emitted.
      init match {
        case Some(initialize) =>
          if (htable(bucket) == -1) {
            val slot = keyCount: Rep[Int] // force read (before the increment below)
            keys(slot) = k
            keyCount += 1
            htable(bucket) = slot
            initialize(slot)
            slot
          } else {
            htable(bucket)
          }
        case None =>
          htable(bucket)
      }
    }
}
- // hash table for groupBy, storing sums
/**
 * Hash map used for grouping: associates every key tuple with a tuple of
 * running sums, one per column of `schema`.
 *
 * Usage: `agg(key) += row` adds `row` field-wise to the sums stored for `key`,
 * creating an all-zero entry the first time `key` is seen.
 *
 * @param keySchema schema of the grouping keys
 * @param schema    schema of the summed columns
 */
class HashMapAgg(keySchema: Schema, schema: Schema) extends HashMapBase(keySchema, schema) {
  import hashDefaults._

  // Running sums, indexed by the slot numbers that HashMapBase assigns to keys.
  val values = new ArrayBuffer(keysSize, schema) // assuming all summation fields are numeric

  /**
   * Handle returned by `apply`, exposing `+=` for one key.
   *
   * A named class rather than an anonymous `new { def += ... }`, so that `+=`
   * is an ordinary method call instead of a reflective structural-type call.
   */
  final class Accumulator(k: Fields) {
    /**
     * Adds `v` field-wise to the sums stored for this key.
     *
     * Only `RInt` fields are handled; any other field pairing fails the
     * pattern match below with a `MatchError`.
     */
    def +=(v: Fields): Unit = {
      val keyPos = lookupOrUpdate(k) { keyPos =>
        // First occurrence of the key: start every sum at zero.
        values(keyPos) = schema.map(_ => RInt(0))
      }
      values(keyPos) = (values(keyPos) zip v) map { case (RInt(x), RInt(y)) => RInt(x + y) }
    }
  }

  /** Returns the accumulator handle for key `k`; nothing is looked up until `+=` is called. */
  def apply(k: Fields): Accumulator = new Accumulator(k)

  /**
   * Applies `f` to every (key, sums) pair, in slot order `0 until keyCount`.
   *
   * @param f callback receiving the key tuple and its sums
   */
  def foreach(f: (Fields,Fields) => Rep[Unit]): Rep[Unit] = {
    for (i <- 0 until keyCount) {
      f(keys(i),values(i))
    }
  }
}
- // column-oriented array buffer, with a row-oriented interface,
- // specialized to data representation
/** Storage for a single column; the concrete subclass fixes the representation. */
abstract class ColBuffer

/** Column held in one staged `Int` array. */
case class IntColBuffer(
  data: Rep[Array[Int]]
) extends ColBuffer

/**
 * Column held in two parallel staged arrays: `data` carries the `String`
 * component and `len` the `Int` component of each entry.
 */
case class StringColBuffer(
  data: Rep[Array[String]],
  len: Rep[Array[Int]]
) extends ColBuffer
/**
 * Fixed-capacity buffer of rows that is stored column by column.
 *
 * One `ColBuffer` is allocated per schema entry; rows are written and read
 * through `update` / `apply`, which scatter and gather the individual fields.
 *
 * @param dataSize capacity (number of rows) allocated for every column array
 * @param schema   one entry per column; `isNumericCol` selects the storage kind
 */
class ArrayBuffer(dataSize: Int, schema: Schema) {
  // One column store per schema entry: numeric columns get an Int array,
  // all others a String array plus a parallel Int array.
  val buf = schema.map {
    case column if isNumericCol(column) =>
      IntColBuffer(NewArray[Int](dataSize))
    case _ =>
      StringColBuffer(NewArray[String](dataSize), NewArray[Int](dataSize))
  }

  // Index at which `+=` writes the next row.
  var len = 0

  /** Appends row `x` at index `len`, then increments `len`. No capacity check is made. */
  def +=(x: Fields) = {
    update(len, x)
    len += 1
  }

  /**
   * Writes row `x` at row index `i`, one field per column.
   *
   * A column/field pairing other than Int-column/`RInt` or
   * String-column/`RString` fails the pattern match.
   */
  def update(i: Rep[Int], x: Fields) = (buf,x).zipped.foreach { (column, field) =>
    (column, field) match {
      case (IntColBuffer(ints), RInt(value)) =>
        ints(i) = value
      case (StringColBuffer(strs, lens), RString(str, strLen)) =>
        strs(i) = str
        lens(i) = strLen
    }
  }

  /** Reads the row at index `i`, rebuilding one field per column. */
  def apply(i: Rep[Int]) = buf.map {
    case IntColBuffer(ints) =>
      RInt(ints(i))
    case StringColBuffer(strs, lens) =>
      RString(strs(i), lens(i))
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement