seonghyeoncho96

ImageTest.scala

May 2nd, 2017
150
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.58 KB | None | 0 0
  1. package org.apache.spark.examples.streaming
  2. import java.util.HashMap
  3.  
  4. import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
  5.  
  6. import org.apache.hadoop.conf.Configuration
  7. import org.apache.hadoop.fs.{FileSystem, Path}
  8.  
  9. import org.apache.spark.SparkConf
  10. import org.apache.spark.streaming._
  11. import org.apache.spark.streaming.kafka._
  12.  
  13. import org.apache.log4j.{Level, Logger}
  14. import org.apache.spark.internal.Logging
  15.  
  16. import java.awt.image.BufferedImage
  17. import java.io._
  18. import java.util.Base64
  19. import javax.imageio.ImageIO
  20.  
  21. import scala.reflect.{ClassTag, classTag}
  22.  
  23.  
  /**
   * Spark Streaming job: consumes messages from Kafka via the receiver-based
   * `KafkaUtils.createStream` API, keeps a sliding word count (10-minute window,
   * recomputed every 2 seconds) and writes every batch's counts as text files
   * under an output prefix (HDFS by default).
   */
  object ImageTest {

    /** Output prefix used when no fifth command-line argument is supplied. */
    private val DefaultOutputPrefix = "hdfs://192.168.56.101:9000/csh_test"

    private val Usage =
      "Usage: ImageTest <zkQuorum> <group> <topics> <numThreads> [outputPrefix]"

    /**
     * Validated command-line settings.
     *
     * @param zkQuorum     ZooKeeper quorum handed to the Kafka receiver
     * @param group        Kafka consumer group id
     * @param topics       non-empty list of topic names (blank entries removed)
     * @param numThreads   positive consumer-thread count, applied to every topic
     * @param outputPrefix prefix passed to `saveAsTextFiles`
     */
    final case class JobConfig(
        zkQuorum: String,
        group: String,
        topics: List[String],
        numThreads: Int,
        outputPrefix: String)

    /**
     * Renders a value together with the ClassTag of its static type,
     * e.g. `dump(5)` yields `"5: Int"`.
     *
     * @param t value to render (its `toString` is used)
     * @tparam T static type whose ClassTag is printed
     * @return `"<t>: <ClassTag of T>"`
     */
    def dump[T: ClassTag](t: T): String = s"$t: ${classTag[T]}"

    /**
     * Parses `<zkQuorum> <group> <topics> <numThreads> [outputPrefix]`.
     *
     * @param args raw command-line arguments
     * @return the parsed configuration, or `None` when the argument count is not
     *         4 or 5, `numThreads` is not a positive integer, or `topics`
     *         (comma-separated) contains no non-blank name
     */
    def parseArgs(args: Array[String]): Option[JobConfig] = args match {
      case Array(zkQuorum, group, topicList, threads, rest @ _*) if rest.length <= 1 =>
        val topics = topicList.split(",").map(_.trim).filter(_.nonEmpty).toList
        // Reject non-numeric and non-positive thread counts instead of throwing.
        val numThreads = scala.util.Try(threads.trim.toInt).toOption.filter(_ > 0)
        if (topics.isEmpty) None
        else
          numThreads.map { n =>
            JobConfig(zkQuorum, group, topics, n, rest.headOption.getOrElse(DefaultOutputPrefix))
          }
      case _ => None
    }

    /**
     * Entry point. Prints the usage line to stderr and exits with status 1 when
     * the arguments are invalid; otherwise runs the streaming job until it is
     * terminated.
     */
    def main(args: Array[String]): Unit =
      parseArgs(args) match {
        case Some(config) => run(config)
        case None =>
          System.err.println(Usage)
          System.exit(1)
      }

    /** Builds the streaming pipeline for `config`, starts it and blocks until termination. */
    private def run(config: JobConfig): Unit = {
      println("- a -")

      val sparkConf = new SparkConf().setAppName("ImageTest")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      // Checkpointing is required by the inverse-reduce window below.
      // NOTE(review): relative path, resolved against the default filesystem — confirm.
      ssc.checkpoint("checkpoint")

      println("- b -")

      // topic -> number of consumer threads for the receiver.
      val topicMap = config.topics.map(_ -> config.numThreads).toMap
      // Keep only the second element (the message payload) of each Kafka record pair.
      val lines = KafkaUtils.createStream(ssc, config.zkQuorum, config.group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))

      println("- c -")

      // Incremental window: add counts entering the window, subtract counts leaving
      // it. The filter drops words whose count fell back to 0; without it they
      // would stay in the windowed state (and the output) indefinitely.
      val wordCounts = words
        .map(word => (word, 1L))
        .reduceByKeyAndWindow(
          _ + _,
          _ - _,
          Minutes(10),
          Seconds(2),
          2,
          (kv: (String, Long)) => kv._2 > 0)

      println("tt")

      wordCounts.saveAsTextFiles(config.outputPrefix, "txt")

      println("dump wordcounts ")
      println(dump(wordCounts))

      println("- d -")

      ssc.start()
      ssc.awaitTermination()
    }
  }
Advertisement