Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.apache.spark.examples.streaming
- import java.util.HashMap
- import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.{FileSystem, Path}
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.kafka._
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.internal.Logging
- import java.awt.image.BufferedImage
- import java.io._
- import java.util.Base64
- import javax.imageio.ImageIO
- import scala.reflect.{ClassTag, classTag}
- object ImageTest {
- def dump[T: ClassTag](t: T): String = {
- "%s: %s".format(t, classTag[T])
- }
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: ImageTest <zkQuorum> <group> <topics> <numThreads>")
- System.exit(1)
- }
- println("- a -");
- val Array(zkQuorum, group, topics, numThreads) = args
- val sparkConf = new SparkConf().setAppName("ImageTest")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
- ssc.checkpoint("checkpoint")
- println("- b -");
- val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
- val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
- val words = lines.flatMap(_.split(" "))
- println("- c -");
- val wordCounts = words.map(x => (x, 1L))
- .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
- println("tt")
- val r = scala.util.Random
- //hadoop vars
- val conf = new Configuration()
- val fs = FileSystem.get(conf)
- //each rdd
- // wordCounts.foreachRDD(rdd => {
- // rdd.foreach {
- // record => {
- // println("sz: " + record._1.size)
- // val decoded: Array[Byte] = Base64.getDecoder.decode(record._1)
- // //val in = new ByteArrayInputStream(decoded)
- // //val bImageFromConvert = ImageIO.read(in)
- // val timestamp: Long = System.currentTimeMillis/1000
- // println(timestamp)
- // // ImageIO.write(bImageFromConvert, "jpg", new File("/home/dke242/Desktop/test/"+timestamp+".jpg"))
- // //ImageIO.write(bImageFromConvert, "jpg", new File("/home/csh/Desktop/csh/temp/"+timestamp+".jpg"))
- // //record.saveAsTextFile("hdfs://csh_test")
- // // val os = fs.create(new Path("/csh_test/"+timestamp+".jpg"))
- // // os.write(decoded)
- // }
- // }
- // })
- wordCounts.saveAsTextFiles("hdfs://192.168.56.101:9000/csh_test", "txt");
- println("dump wordcounts ")
- println(dump(wordCounts))
- println("- d -");
- ssc.start()
- ssc.awaitTermination()
- }
- }
Advertisement