- {"Floor_Id":"Shop Floor 1","Timestamp":1550589234000,"HaltRecord":{"HaltReason":"Test1","Severity":"Low","FaultErrorCategory":"Docked","NonFaultErrorCategory":null},"Category":{"Type":"Halt","End_time":1549010152834,"Start_time":1549009072834}}
- {"Floor_Id":"Shop Floor 1","Timestamp":1550589294000,"HaltRecord":{"HaltReason":"Test1","Severity":"Low","FaultErrorCategory":"Docked","NonFaultErrorCategory":null},"Category":{"Type":"Halt","End_time":1549010152834,"Start_time":1549009072834}}
- {"Floor_Id":"Shop Floor 1","Timestamp":1550589354000,"HaltRecord":{"HaltReason":"Test1","Severity":"Low","FaultErrorCategory":"Docked","NonFaultErrorCategory":null},"Category":{"Type":"Halt","End_time":1549010152834,"Start_time":1549009072834}}
- { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 1, "Date_Time" : 1550589295000}}]}
- { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 2, "Date_Time" : 1550589235000}}]}
- { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 3, "Date_Time" : 1550589295000}}]}
- { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 4, "Date_Time" : 1550589355000}}]}
- { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 5, "Date_Time" : 1550589235000}}]}
- { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 1, "Date_Time" : 1550589235000}}]}
- { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 2, "Date_Time" : 1550589295000}}]}
- { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 3, "Date_Time" : 1550589355000}}]}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Accumulable, AccumulableParam, SparkConf, SparkContext}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, from_unixtime, to_timestamp, window}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import scala.collection.mutable.{HashMap => MutableHashMap}

object ErrorCategory extends App {
  // Quiet Spark's and Akka's own logging.
  Logger.getLogger("org").setLevel(Level.ERROR)
  Logger.getLogger("akka").setLevel(Level.ERROR)

  // Kafka endpoints and topics; the angle-bracket placeholders must be filled in.
  val readServer = "<host>:<port>"
  val readTopic = "<topic1>"
  val writeServer = "<host>:<port>"
  val writeTopic = "<topic2>"

  val spark = SparkSession.builder
    .appName("StreamAssetList")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()

  // Factory for the shared accumulator that carries counts across micro-batches.
  val hma = new HashMapAccumulator()
  // Raw stream of halt records from Kafka.
  val streamingInputDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", readServer)
    .option("subscribe", readTopic)
    .load()

  // Schema of the halt-record JSON shown above.
  val schema = new StructType()
    .add("Floor_Id", StringType)
    .add("Category",
      new StructType()
        .add("Type", StringType)
        .add("End_time", LongType)
        .add("Start_time", LongType))
    .add("HaltRecord",
      new StructType()
        .add("HaltReason", StringType)
        .add("Severity", StringType)
        .add("FaultErrorCategory", StringType)
        .add("NonFaultErrorCategory", StringType))
    .add("Timestamp", LongType)
  // Parse the Kafka value as JSON and flatten the fields the job needs.
  val streamingSelectDF = streamingInputDf.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).as("data"))
    .select("data.Floor_Id", "data.Category.Type", "data.Timestamp", "data.HaltRecord.HaltReason")

  println("\n\n\t\t<< Ready to process messages >>\n")
  // streamingSelectDF.printSchema()

  // Keep only halt events from Shop Floor 1 and count them per
  // (floor, halt reason) over one-second tumbling windows.
  val filterDF = streamingSelectDF
    .filter(
      streamingSelectDF("Floor_Id") === "Shop Floor 1" &&
        streamingSelectDF("Type") === "Halt")
    .groupBy(
      streamingSelectDF("Floor_Id"),
      streamingSelectDF("HaltReason"),
      window(to_timestamp(from_unixtime(streamingSelectDF("Timestamp") / 1000)), "1 second", "1 second"))
    .count().writeStream.foreach(new ForeachWriter[Row] {
      // Kafka producer created per partition/epoch in open() and reused in process().
      var producer: KafkaProducer[String, String] = _

      override def open(partitionId: Long, version: Long): Boolean = {
        val kafkaProperties = new Properties()
        kafkaProperties.put("bootstrap.servers", writeServer)
        kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        producer = new KafkaProducer[String, String](kafkaProperties)
        true
      }
      override def process(value: Row): Unit = {
        val dataAcc = hma.getInstance(spark.sparkContext)
        val floorId = value.getAs[String]("Floor_Id")
        val haltReason = value.getAs[String]("HaltReason")
        val count = value.getAs[Long]("count")
        // End timestamp of the aggregation window (field 1 of the window struct).
        val t = value.getAs[Row]("window").getAs[java.sql.Timestamp](1)

        // Skip rows without a floor id before building the accumulator key.
        if (floorId == null || floorId.trim.isEmpty) {
          return
        }
        val key = floorId + ":" + haltReason

        // Fold this window's count into the running total for the key.
        if (dataAcc.value.contains(key)) {
          val prevCount = dataAcc.value(key)
          dataAcc.value(key) = prevCount + count
        } else {
          dataAcc += (key -> count)
        }

        // Reset the accumulated counts when the day rolls over.
        val day = java.util.Calendar.getInstance().get(java.util.Calendar.DATE).toLong
        if (dataAcc.value.contains("Day")) {
          if (dataAcc.value("Day") != day)
            dataAcc.setValue(MutableHashMap.empty[String, Long])
        } else {
          dataAcc += ("Day" -> day)
        }
        // Re-group the flat "floor:category" keys into floor -> (category -> count).
        val oMap = dataAcc.value - "Day"
        val outputMap = new MutableHashMap[String, MutableHashMap[String, Long]]()
        for (key <- oMap.keys) {
          val fid = key.split(":")(0)
          val cat = key.split(":")(1)
          if (outputMap.contains(fid)) {
            val catMap = outputMap(fid)
            if (catMap != null && catMap.contains(cat)) {
              catMap += (cat -> (catMap(cat) + dataAcc.value(key)))
            } else {
              outputMap(fid) += (cat -> dataAcc.value(key))
            }
          } else {
            val catMap = new MutableHashMap[String, Long]()
            catMap += (cat -> dataAcc.value(key))
            outputMap += (fid -> catMap)
          }
        }
- var output = ""
- for (field <- outputMap.keys) {
- output += "{ "Floor_Id": "" + field + "", "Error_Category" : ["
- for (error <- outputMap(field).keys) {
- output += " { "Category" : "" + error + "", "DataPoints" : { "NumberOfErrors": " + outputMap(field)(error) +
- ", "Date_Time" : " + t.getTime + "}},"
- }
- }
- if(output.indexOf("Floor_Id") > 0){
- if(output.endsWith(",")){
- output = output.substring(0,output.lastIndexOf(","))
- }
- output += "]}"
- // println("output > "+output)
- producer.send(new ProducerRecord(writeTopic, output))
- }
- }
      override def close(errorOrNull: Throwable): Unit = {
        producer.close()
      }
    }).outputMode("complete")
    .trigger(Trigger.ProcessingTime("2 seconds"))
    .start.awaitTermination()
}
// Accumulator over the legacy AccumulableParam API: merges (key, count)
// pairs into a mutable HashMap, summing counts for duplicate keys.
class HashMapAccumulator extends AccumulableParam[MutableHashMap[String, Long], (String, Long)] {
  private var accumulator: Accumulable[MutableHashMap[String, Long], (String, Long)] = _

  // Add one (key, count) pair, summing with any existing count for the key.
  def addAccumulator(acc: MutableHashMap[String, Long], elem: (String, Long)): MutableHashMap[String, Long] = {
    val (k1, v1) = elem
    acc += acc.find(_._1 == k1).map {
      case (k2, v2) => k2 -> (v1 + v2)
    }.getOrElse(elem)
    acc
  }

  // Merge two partial maps by folding one into the other.
  def addInPlace(acc1: MutableHashMap[String, Long], acc2: MutableHashMap[String, Long]): MutableHashMap[String, Long] = {
    acc2.foreach(elem => addAccumulator(acc1, elem))
    acc1
  }

  // An empty map with the same runtime type as the initial value.
  def zero(initialValue: MutableHashMap[String, Long]): MutableHashMap[String, Long] = {
    val ser = new JavaSerializer(new SparkConf(false)).newInstance()
    val copy = ser.deserialize[MutableHashMap[String, Long]](ser.serialize(initialValue))
    copy.clear()
    copy
  }

  // Lazily register a single shared accumulator (double-checked locking).
  def getInstance(sc: SparkContext): Accumulable[MutableHashMap[String, Long], (String, Long)] = {
    if (accumulator == null) {
      synchronized {
        if (accumulator == null) {
          accumulator = sc.accumulable(MutableHashMap.empty[String, Long], "ErrorCountAccumulator")(new HashMapAccumulator)
        }
      }
    }
    accumulator
  }
}
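For an end-to-end test, here is a minimal sketch of a standalone producer that publishes one of the sample halt records to the input topic. The broker address and topic name are the same angle-bracket placeholders used in ErrorCategory and must be filled in:

// Sketch only: broker and topic are placeholders, as in ErrorCategory above.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SampleHaltProducer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "<host>:<port>")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val record =
    """{"Floor_Id":"Shop Floor 1","Timestamp":1550589234000,""" +
      """"HaltRecord":{"HaltReason":"Test1","Severity":"Low","FaultErrorCategory":"Docked","NonFaultErrorCategory":null},""" +
      """"Category":{"Type":"Halt","End_time":1549010152834,"Start_time":1549009072834}}"""

  producer.send(new ProducerRecord[String, String]("<topic1>", record))
  producer.close()
}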