Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package scala.lms.tutorial
- import lms.core.stub._
- import lms.core.utils
- import lms.macros.SourceContext
- import lms.core.virtualize
- import scala.collection.{mutable,immutable}
- import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, BlockLocation}
- import org.apache.hadoop.hdfs.DistributedFileSystem;
- import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
- import org.apache.hadoop.hdfs.tools.DFSck;
- import org.apache.hadoop.conf.Configuration;
- @virtualize
- class MPI2Test extends TutorialFunSuite {
- val under = "mpi_"
- /**
- MPI API
- -------
- Using MPI requires a few additional headers and support functions, and programs are
- typically compiled and launched with the `mpicc` and `mpirun` tools. We define a
- subclass of `DslDriver` that contains the necessary infrastructure.
- */
- abstract class MPIDriver[T:Manifest,U:Manifest] extends DslDriverC[T,U] with ScannerLowerExp { q =>
- override val codegen = new DslGenC with CGenScannerLower with Run.CGenPreamble {
- val IR: q.type = q
- }
- codegen.registerHeader("<mpi.h>")
- codegen.registerHeader("<string.h>")
- compilerCommand = "mpicc"
// Compile the staged snippet to /tmp/snippet, then launch it under `mpirun`,
// echoing the child process's stdout line-by-line. Wall-clock time of the
// run (not the compile) is reported under the "eval" label.
override def eval(a: T): Unit = {
  import scala.sys.process._
  import lms.core.utils._
  val f1 = f; // compile! (forces code generation/compilation before timing the run)
  // NOTE(review): ProcessBuilder.lines is deprecated in newer scala.sys.process
  // (renamed lineStream) — confirm against the project's Scala version.
  def f2(a: T) = (s"mpirun /tmp/snippet $a": ProcessBuilder).lines.foreach(Console.println _)
  time("eval")(f2(a))
}
// Staged rank and world-size of the current MPI process; assigned by
// `wrapper` below before the user snippet body runs.
var pid: Rep[Int] = null
var nprocs: Rep[Int] = null
// Bracket the generated snippet with MPI_Init_thread ... MPI_Finalize and
// bind `pid`/`nprocs` from MPI_Comm_rank / MPI_Comm_size. The order of the
// unchecked fragments matters: each is emitted in sequence into the
// generated C, so declarations must precede the calls that use them.
override def wrapper(x: Rep[T]): Rep[U] = {
  // mpirun passes real argc/argv to the binary; these dummies only satisfy
  // MPI_Init_thread's signature in the generated snippet function.
  unchecked[Unit]("int argc = 0; char** argv = (char**)malloc(0); int provided");
  unchecked[Unit]("MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided)")
  var nprocs1 = 0
  unchecked[Unit]("MPI_Comm_size(MPI_COMM_WORLD, &", nprocs1, ")")
  var myrank = 0
  unchecked[Unit]("MPI_Comm_rank(MPI_COMM_WORLD, &", myrank, ")")
  // Single shared request/status slots, reused by MPI_Issend/MPI_Irecv below.
  unchecked[Unit]("MPI_Request req")
  unchecked[Unit]("MPI_Status status")
  pid = readVar(myrank)
  nprocs = readVar(nprocs1)
  val res = super.wrapper(x) // emit the user snippet between init and finalize
  unchecked[Unit]("MPI_Finalize()")
  res
}
// Staged non-blocking MPI send/receive over Int buffers. Both write their
// handle into the single `req` declared in `wrapper`, so only the most
// recent outstanding operation is tracked at a time.
def MPI_Issend(msg: Rep[Array[Int]], off: Rep[Int], len: Rep[Int], dst: Rep[Int]) = unchecked[Unit]("MPI_Issend(",msg," + (",off,"), ",len,", MPI_INT, ",dst,", 0, MPI_COMM_WORLD, &req)")
def MPI_Irecv(msg: Rep[Array[Int]], off: Rep[Int], len: Rep[Int], src: Rep[Int]) = unchecked[Unit]("MPI_Irecv(",msg," + (",off,"), ",len,", MPI_INT, ",src,", 0, MPI_COMM_WORLD, &req)")
// Block until every rank in MPI_COMM_WORLD reaches this point.
def MPI_Barrier() = unchecked[Unit]("MPI_Barrier(MPI_COMM_WORLD)")
// A staged record field (one column value). Concrete variants below:
// RString (pointer + length) and RInt.
abstract class RField {
  def print() // emit code that prints the value (no trailing newline)
  def compare(o: RField): Rep[Boolean] // staged equality against a field of the same variant
  def hash: Rep[Long] // staged hash, combined per-row by fieldsHash
}
// A schema is simply the ordered list of column names.
type Schema = Vector[String]
// A staged string field: a pointer into the scanned character buffer plus an
// explicit length — tokens are slices, not NUL-terminated strings, hence the
// "%.*s" bounded printf format.
case class RString(data: Rep[String], len: Rep[Int]) extends RField {
  def print() = printf("%.*s", len, data)//prints(data)
  // NOTE(review): currently identical to print(); "%.*s" already bounds the
  // output by len, so the distinction is vestigial.
  def printLen() = printf("%.*s", len, data)//printl(data, len)
  // Length check first, then a staged character-by-character scan. The match
  // is deliberately non-exhaustive: comparing against a non-RString field is
  // a MatchError at staging (code-generation) time, not at runtime.
  def compare(o: RField) = o match { case RString(data2, len2) => if (len == len2) {
    // TODO: we may or may not want to inline this (code bloat and icache considerations).
    var i = 0
    while (i < len && data.charAt(i) == data2.charAt(i)) {
      i += 1
    }
    i == len
  } else false }
  def hash = data.HashCode(len)
}
// A staged integer field.
case class RInt(value: Rep[Int]) extends RField {
  def print() = printf("%d",value)
  // Non-exhaustive by design: comparing with a non-RInt is a staging-time MatchError.
  def compare(o: RField) = o match { case RInt(v2) => value == v2 }
  // asInstanceOf here retags the staged type (Int -> Long widening in the
  // generated C); it is not a JVM runtime cast of a value.
  def hash = value.asInstanceOf[Rep[Long]]
}
// A row is an ordered vector of field values, parallel to its Schema.
type Fields = Vector[RField]
// Compare two rows field-by-field, AND-ing all pairwise comparisons.
// BUG FIX: the previous fold ignored its accumulator and returned only the
// LAST field's comparison, so multi-field keys could be treated as equal
// when only their final column matched (single-field keys were unaffected).
def fieldsEqual(a: Fields, b: Fields) = (a zip b).foldLeft(unit(true)) { (acc, p) => acc && (p._1 compare p._2) }
// Order-dependent polynomial row hash (base 41) over all fields.
def fieldsHash(a: Fields) = a.foldLeft(unit(0L)) { _ * 41L + _.hash }
// Schema convention: a leading '#' marks a numeric (Int) column.
def isNumericCol(s: String) = s.startsWith("#")
// A row paired with its schema: `fields` and `schema` are parallel vectors,
// so a column's value sits at the index of its name.
case class Record(fields: Fields, schema: Schema) {
  // Look up a single column value by name.
  def apply(key: String): RField = {
    val idx = schema indexOf key
    fields(idx)
  }
  // Project the record onto a sub-schema, preserving the requested order.
  def apply(keys: Schema): Fields = keys.map(k => this(k))
}
// Column-oriented storage: one staged buffer per schema column.
abstract class ColBuffer
case class IntColBuffer(data: Rep[Array[Int]]) extends ColBuffer
// String columns keep character data and per-entry lengths in parallel arrays.
case class StringColBuffer(data: Rep[Array[String]], len: Rep[Array[Int]]) extends ColBuffer
// Fixed-capacity columnar row buffer: one ColBuffer per schema column,
// numeric ('#'-prefixed) columns stored as Int, everything else as
// (string, length) pairs.
class ArrayBuffer(dataSize: Int, schema: Schema) {
  val buf = schema.map {
    case hd if isNumericCol(hd) => IntColBuffer(NewArray[Int](dataSize))
    case _ => StringColBuffer(NewArray[String](dataSize), NewArray[Int](dataSize))
  }
  // NOTE(review): staging-time (plain Scala) counter — += only advances it
  // once per staging-time call site, not per generated-code iteration.
  // Staged callers (HashMapBase/Agg) index via update/apply with Rep[Int]
  // positions instead; confirm += is never used inside a staged loop.
  var len = 0
  def +=(x: Fields) = {
    this(len) = x
    len += 1
  }
  // Scatter one row across the column buffers; the row's field variants
  // must match the schema (MatchError at staging time otherwise).
  def update(i: Rep[Int], x: Fields) = (buf,x).zipped.foreach {
    case (IntColBuffer(b), RInt(x)) => b(i) = x
    case (StringColBuffer(b,l), RString(x,y)) => b(i) = x; l(i) = y
  }
  // Gather row i back into a Fields vector.
  def apply(i: Rep[Int]) = buf.map {
    case IntColBuffer(b) => RInt(b(i))
    case StringColBuffer(b,l) => RString(b(i),l(i))
  }
}
// Shared sizing constants for the hash tables below.
object hashDefaults {
  // Bucket count; a power of two so `& (hashSize - 1)` works as the modulo mask.
  val hashSize = 1 << 8
  // At most one distinct key per bucket slot.
  val keysSize = hashSize
  // Entries per bucket for multi-valued tables.
  val bucketSize = 1 << 8
  // Total payload capacity across all buckets.
  val dataSize = keysSize * bucketSize
}
// common base class to factor out commonalities of group and join hash tables
// Open-addressing (linear-probing) hash table mapping a key row to a dense
// key index. All arrays are staged, so the probing loop below becomes a
// runtime loop in the generated C code.
class HashMapBase(keySchema: Schema, schema: Schema) {
  import hashDefaults._
  // Dense storage of distinct keys, indexed in insertion order.
  val keys = new ArrayBuffer(keysSize, keySchema)
  val keyCount = var_new(0) // staged count of distinct keys seen so far
  val hashMask = hashSize - 1 // hashSize is a power of two, so & acts as modulo
  val htable = NewArray[Int](hashSize) // bucket -> key index; -1 marks empty
  for (i <- 0 until hashSize :Rep[Range]) { htable(i) = -1 }
  def lookup(k: Fields) = lookupInternal(k,None)
  def lookupOrUpdate(k: Fields)(init: Rep[Int]=>Rep[Unit]) = lookupInternal(k,Some(init))
  // Probe linearly from the key's home bucket until the key or an empty slot
  // is found. With `init` provided, an empty slot claims a fresh key index
  // and runs `init` on it; without `init`, a miss yields -1.
  // NOTE(review): no capacity check — probing never terminates once the
  // table is full; assumes < hashSize distinct keys.
  def lookupInternal(k: Fields, init: Option[Rep[Int]=>Rep[Unit]]): Rep[Int] =
  comment[Int]("hash_lookup") {
    val h = fieldsHash(k).toInt
    var pos = h & hashMask
    while (htable(pos) != -1 && !fieldsEqual(keys(htable(pos)),k)) {
      pos = (pos + 1) & hashMask
    }
    if (init.isDefined) {
      if (htable(pos) == -1) {
        val keyPos = keyCount: Rep[Int] // force read
        keys(keyPos) = k
        keyCount += 1
        htable(pos) = keyPos
        init.get(keyPos)
        keyPos
      } else {
        htable(pos)
      }
    } else {
      htable(pos)
    }
  }
}
// Grouping/aggregation hash table: keeps one running-sum row per distinct
// key. All aggregate columns must be numeric (RInt).
class HashMapAgg(keySchema: Schema, schema: Schema) extends HashMapBase(keySchema: Schema, schema: Schema) {
  import hashDefaults._
  val values = new ArrayBuffer(keysSize, schema) // assuming all summation fields are numeric
  // Usage: hm(key) += row. First sight of a key zero-initializes its
  // accumulator row; each += then adds the row element-wise.
  def apply(k: Fields) = new {
    def +=(v: Fields) = {
      val keyPos = lookupOrUpdate(k) { keyPos =>
        values(keyPos) = schema.map(_ => RInt(0))
      }
      values(keyPos) = (values(keyPos) zip v) map { case (RInt(x), RInt(y)) => RInt(x + y) }
    }
  }
  // Iterate (key row, aggregate row) pairs in key insertion order.
  def foreach(f: (Fields,Fields) => Rep[Unit]): Rep[Unit] = {
    for (i <- 0 until keyCount) {
      f(keys(i),values(i))
    }
  }
}
- }
- /**
- ### Staged and Distributed Implementation
- TODO / Exercise: complete the implementation by writing an mmap-based
- Scanner (assuming each cluster node has access to a common file system)
- and adapting the hash table implementation used for `GroupBy` in the
- query tutorial to the distributed setting with communication along the
- lines used in the character histogram above.
- */
// Sequential staged word count: rank 0 tokenizes a constant string, counts
// words in a HashMapAgg, and prints "word count" lines. Generated C is
// checked against the wordcount_seq snapshot.
test("wordcount_staged_seq") {
  //@virtualize
  val snippet = new MPIDriver[String,Unit] {
    // Staged scanner over a constant input string: yields delimiter-separated
    // slices of the underlying character buffer.
    def StringScanner(input: String) = new {
      val data = uncheckedPure[Array[Char]](unit(input))
      val pos = var_new(0)
      // Return the next token up to (not including) delimiter d, then skip d.
      // NOTE(review): assumes the input ends with the delimiter; otherwise
      // the scan runs past the end of the buffer.
      def next(d: Rep[Char]) = {
        val start: Rep[Int] = pos // force read
        while (data(pos) != d) pos += 1
        val len:Rep[Int] = pos - start
        pos += 1
        RString(stringFromCharArray(data,start,len), len)
      }
      def hasNext = pos < input.length
    }
    // Minimal foreach-able abstraction over the token stream.
    trait DataLoop {
      def foreach(f: RString => Unit): Unit
    }
    // Tokenize on single spaces.
    def parse(str: String) = new DataLoop {
      val sc = StringScanner(str)
      def foreach(f: RString => Unit) = {
        while(sc.hasNext) {
          f(sc.next(' '))
        }
      }
    }
    def snippet(arg: Rep[String]): Rep[Unit] = {
      // Only rank 0 does work; other ranks just init/finalize MPI.
      if (pid == 0) {
        val input = "foo bar baz foo bar foo foo foo boom bang boom boom yum"
        val keySchema = Vector("word")
        val dataSchema = Vector("#count")
        val hm = new HashMapAgg(keySchema, dataSchema)
        // loop through string one word at a time
        parse(input).foreach { word: RString =>
          val key = Vector(word)
          hm(key) += Vector(RInt(1))
        }
        // Print "word count" per distinct word, in first-seen order.
        hm.foreach {
          case (key, v) =>
            key.head.asInstanceOf[RString].printLen() // force cast to RString for printLen
            printf(" ")
            v.head.print()
            printf("\n")
        }
      }
      /*input.foreach { c =>
        histogram(c) += 1
      }
      histogram.exchange()
      histogram.foreach { (c,n) =>
        //if (n != 0)
        printf("%d: '%c' %d\n", pid, c, n)
      }*/
    }
  }
  //val expected = snippet.groupBy(c => c).map { case (c,cs) => s": '$c' ${cs.length}" }.toSet
  val actual = lms.core.utils.captureOut(snippet.eval("ARG")) // drop pid, since we don't know many here
  // NOTE(review): expected == actual makes the assertion vacuous — it only
  // verifies that eval ran without throwing. Replace with a real expected value.
  val expected = actual
  assert { actual == expected }
  check("wordcount_seq", snippet.code, "c")
}
// Parallel word count, work in progress: every rank currently counts the
// whole input; the partitioning (mySize) and the exchange step are still
// commented out. Output is printed rather than asserted.
test("wordcount_staged_par") {
  //@virtualize
  val snippet = new MPIDriver[String,Unit] {
    // Staged scanner over a constant input string (same as the _seq test).
    def StringScanner(input: String) = new {
      val data = uncheckedPure[Array[Char]](unit(input))
      val pos = var_new(0)
      // Next token up to delimiter d; assumes input is delimiter-terminated.
      def next(d: Rep[Char]) = {
        val start: Rep[Int] = pos // force read
        while (data(pos) != d) pos += 1
        val len:Rep[Int] = pos - start
        pos += 1
        RString(stringFromCharArray(data,start,len), len)
      }
      def hasNext = pos < input.length
    }
    trait DataLoop {
      def foreach(f: RString => Unit): Unit
    }
    // Tokenize on single spaces.
    def parse(str: String) = new DataLoop {
      val sc = StringScanner(str)
      def foreach(f: RString => Unit) = {
        while(sc.hasNext) {
          f(sc.next(' '))
        }
      }
    }
    def snippet(arg: Rep[String]): Rep[Unit] = {
      val input = "foo bar baz foo bar foo foo foo boom bang boom boom yum"
      //val size = input.split(" ").length
      //val mySize = size * (pid + 1) / nproc
      val keySchema = Vector("word")
      val dataSchema = Vector("#count")
      val hm = new HashMapAgg(keySchema, dataSchema)
      // loop through string one word at a time
      parse(input).foreach { word: RString =>
        val key = Vector(word)
        hm(key) += Vector(RInt(1))
      }
      // NOTE(review): HashMapAgg.foreach expects a 2-argument function
      // (Fields, Fields) => Rep[Unit], but this lambda declares a single
      // tuple parameter — verify this typechecks in the project's Scala
      // version (Scala 2 does not auto-untuple here).
      hm.foreach{f: (Fields,Fields) =>
        f._1.foreach{s: RField => s.print()}
        f._2.foreach{s: RField => s.print()}
      }
      // Staging-time debug print of the column buffer's runtime class.
      println(hm.values.buf.getClass.getName)
      // This will be the part for exchange
      //HashMapAgg2.exchange() // Not done yet
      /*hm.foreach {
        case (key, v) =>
          key.head.asInstanceOf[RString].printLen()
          printf(": ")
          v.head.print()
          printf("\n")
      }*/
    }
  }
  //val expected = snippet.groupBy(c => c).map { case (c,cs) => s": '$c' ${cs.length}" }.toSet
  val actual = lms.core.utils.captureOut(snippet.eval("ARG")) // drop pid, since we don't know many here
  println(actual)
  //assert { actual == expected }
  //check("wordcount_seq", snippet.code, "c")
}
// Exploratory HDFS metadata test: the snippet body is entirely commented
// out, so this currently only checks that the driver stages an empty
// program. HdfsMeta runs at staging time (plain JVM/Hadoop code).
test("hdfs scanner test"){
  //@virtualize
  val snippet = new MPIDriver[String,Unit] {
    // Thin helper over DistributedFileSystem exposing datanode/dir metadata.
    case class HdfsMeta() extends DistributedFileSystem{
      // Configured data directories (dfs.datanode.data.dir), comma-split.
      def getDataDirs(): Array[String] = {
        val dataDirsParam = getConf().get("dfs.datanode.data.dir");
        dataDirsParam.split(",")
      }
      // NOTE(review): currently returns an array of nulls — the hostname
      // fill loop is disabled; if re-enabled use `until`, not `to`
      // (`0 to length` overruns the array by one).
      def getDataNodes() : Array[String] = {
        val dataNodeStats = getDataNodeStats
        val hosts = new Array[String](dataNodeStats.length)
        println("------hdfs test------")
        println(dataNodeStats.length)
        //for (i <- 0 until dataNodeStats.length)
        //hosts(i) = dataNodeStats(i).getHostName()
        hosts
      }
      // Dump the interesting FileStatus fields for one HDFS path.
      def printFileState(status: FileStatus): Unit = {
        println("Metadata for: " + status.getPath)
        println("Is Directory : " + status.isDirectory)
        println("Is Symlink: " + status.isSymlink)
        println("Encrypted: " + status.isEncrypted)
        println("Length: " + status.getLen)
        println("Replication: " + status.getReplication)
        println("Blocksize: " + status.getBlockSize)
      }
    }
    // Intentionally empty: the HDFS probing below is disabled.
    def snippet(arg: Rep[String]): Rep[Unit] = {
      //val conf = new Configuration()
      //val path = new Path("/user/hadoop/hdfs.test");
      //conf.set( "fs.defaultFS", "hdfs://localhost:8020" )
      //val hdfs = FileSystem.get(conf)
      //println(hdfs.getClass)
      //val dataDirsParam = conf.get("dfs.datanode.data.dir");
      //val hdfsMeta = HdfsMeta()
      //println(dataDirsParam.split(","))
    }
  }
  //val actual = lms.core.utils.captureOut(snippet.eval("ARG"))
  //println(actual)
}
// Placeholder for the mmap-based HDFS scanner exercise: every probe of the
// HDFS block layout (fsck, block locations) is commented out, so the test
// currently captures the output of an empty API block and does nothing.
test("wordcount_mmap_hdfs") {
  val actual = lms.core.utils.captureOut(new API {
    /*val conf = new Configuration()
    val path = new Path("/user/hadoop/test2");
    conf.set( "fs.defaultFS", "hdfs://localhost:8020" )
    val hdfs = FileSystem.get(conf)
    val fsck = new DFSck(conf)
    val cmds = Array("/user/hadoop/test2", "-files", "-blocks", "-locations")
    val fileStatus = hdfs.getFileStatus(path)
    println(fileStatus.getPath)
    println(fileStatus.isDirectory)
    println(fileStatus.isFile)
    println(fileStatus.getLen)
    println(hdfs.getDefaultBlockSize)
    val blockLocations = hdfs.getFileBlockLocations(path, 0, fileStatus.getLen)
    val blockLocation = blockLocations(0)
    blockLocation.getNames.map(s => println(s))
    fsck.run(cmds)*/
    //println(conf.getLocalPath("/usr/local/Cellar/hadoop/hdfs/tmp", "/user/hadoop/test2"))
    //val fsckCmd = "hdfs fsck /user/hadoop/test2 -files -blocks -locations" !;
    //println(fsckCmd)
  })
  //println(actual)
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement