  1. {"Floor_Id":"Shop Floor 1","Timestamp":1550589234000,"HaltRecord":{"HaltReason":"Test1","Severity":"Low","FaultErrorCategory":"Docked","NonFaultErrorCategory":null},"Category":{"Type":"Halt","End_time":1549010152834,"Start_time":1549009072834}}
  2. {"Floor_Id":"Shop Floor 1","Timestamp":1550589294000,"HaltRecord":{"HaltReason":"Test1","Severity":"Low","FaultErrorCategory":"Docked","NonFaultErrorCategory":null},"Category":{"Type":"Halt","End_time":1549010152834,"Start_time":1549009072834}}
  3. {"Floor_Id":"Shop Floor 1","Timestamp":1550589354000,"HaltRecord":{"HaltReason":"Test1","Severity":"Low","FaultErrorCategory":"Docked","NonFaultErrorCategory":null},"Category":{"Type":"Halt","End_time":1549010152834,"Start_time":1549009072834}}
  4.  
  5. { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 1, "Date_Time" : 1550589295000}}]}
  6. { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 2, "Date_Time" : 1550589235000}}]}
  7. { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 3, "Date_Time" : 1550589295000}}]}
  8. { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 4, "Date_Time" : 1550589355000}}]}
  9. { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 5, "Date_Time" : 1550589235000}}]}
  10.  
  11. { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 1, "Date_Time" : 1550589235000}}]}
  12. { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 2, "Date_Time" : 1550589295000}}]}
  13. { "Floor_Id": "Shop Floor 1", "Error_Category" : [ { "Category" : "Test1", "DataPoints" : { "NumberOfErrors": 3, "Date_Time" : 1550589355000}}]}
  14.  
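A minimal, self-contained sanity check (not part of the original paste; the SampleRecordCheck object, the local master and the spark.stop() call are assumptions) that parses the first sample record above with the same schema the streaming job uses, just to confirm the field names and types line up:

// Hypothetical standalone check -- not part of the original job.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object SampleRecordCheck extends App {
  val spark = SparkSession.builder.appName("SampleRecordCheck").master("local[*]").getOrCreate()
  import spark.implicits._

  // Same schema as the streaming job below
  val schema = new StructType()
    .add("Floor_Id", StringType)
    .add("Category", new StructType()
      .add("Type", StringType)
      .add("End_time", LongType)
      .add("Start_time", LongType))
    .add("HaltRecord", new StructType()
      .add("HaltReason", StringType)
      .add("Severity", StringType)
      .add("FaultErrorCategory", StringType)
      .add("NonFaultErrorCategory", StringType))
    .add("Timestamp", LongType)

  val sample = Seq(
    """{"Floor_Id":"Shop Floor 1","Timestamp":1550589234000,"HaltRecord":{"HaltReason":"Test1","Severity":"Low","FaultErrorCategory":"Docked","NonFaultErrorCategory":null},"Category":{"Type":"Halt","End_time":1549010152834,"Start_time":1549009072834}}"""
  ).toDS()

  // Prints one row with Floor_Id, Category, HaltRecord and Timestamp columns
  spark.read.schema(schema).json(sample).show(truncate = false)

  spark.stop()
}

Running it should print a single row whose Floor_Id, Category, HaltRecord and Timestamp columns match the JSON above; any field missing from a record would simply come back null.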
import java.util.Properties

import scala.collection.mutable.{HashMap => MutableHashMap}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Accumulable, AccumulableParam, SparkConf, SparkContext}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, from_unixtime, to_timestamp, window}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ErrorCategory extends App {

  Logger.getLogger("org").setLevel(Level.ERROR)
  Logger.getLogger("akka").setLevel(Level.ERROR)

  // Kafka endpoints and topics (placeholders, fill in before running)
  val readServer = "<host>:<port>"
  val readTopic = "<topic1>"
  val writeServer = "<host>:<port>"
  val writeTopic = "<topic2>"

  val spark = SparkSession.builder
    .appName("StreamAssetList")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()

  val hma = new HashMapAccumulator()

  // Note: this DStream StreamingContext is created but never used; the query below runs on Structured Streaming.
  val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

  val streamingInputDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", readServer)
    .option("subscribe", readTopic)
    .load()
  // Schema of the incoming JSON messages (see the sample records at the top of this paste)
  val schema = new StructType()
    .add("Floor_Id", StringType)
    .add("Category",
      new StructType()
        .add("Type", StringType)
        .add("End_time", LongType)
        .add("Start_time", LongType))
    .add("HaltRecord",
      new StructType()
        .add("HaltReason", StringType)
        .add("Severity", StringType)
        .add("FaultErrorCategory", StringType)
        .add("NonFaultErrorCategory", StringType))
    .add("Timestamp", LongType)
  val streamingSelectDF = streamingInputDf.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema = schema) as "data")
    .select("data.Floor_Id", "data.Category.Type", "data.Timestamp", "data.HaltRecord.HaltReason")

  println("\n\n\t\t<< Ready to process messages >> \n")
  // streamingSelectDF.printSchema()

  // Count halts per (Floor_Id, HaltReason) over 1-second windows and push each result row to Kafka.
  val filterDF = streamingSelectDF
    .filter(
      streamingSelectDF("Floor_Id") === "Shop Floor 1" &&
        // streamingSelectDF("IsError") === "y" &&  // IsError is not in the parsed schema above, so this predicate would fail analysis
        streamingSelectDF("Type") === "Halt")
    .groupBy(
      streamingSelectDF("Floor_Id"),
      streamingSelectDF("HaltReason"),
      window(to_timestamp(from_unixtime(streamingSelectDF("Timestamp") / 1000)), "1 second", "1 second"))
    .count().writeStream.foreach(new ForeachWriter[Row] {

      var producer: KafkaProducer[String, String] = _

      override def open(partitionId: Long, version: Long): Boolean = {

        val kafkaProperties = new Properties()

        kafkaProperties.put("bootstrap.servers", writeServer)
        kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        producer = new KafkaProducer[String, String](kafkaProperties)
        true
      }

      override def process(value: Row): Unit = {

        val dataAcc = hma.getInstance(spark.sparkContext)

        val floorId = value.getAs[String]("Floor_Id")
        val haltReason = value.getAs[String]("HaltReason")
        val count = value.getAs[Long]("count")

        // End timestamp of the aggregation window for this row
        val t = value.getAs[Row]("window").getAs[java.sql.Timestamp](1)

        val key = floorId + ":" + haltReason
        if (floorId == null || floorId.trim.length == 0) {
          return
        }
        // Accumulate the running count per "<Floor_Id>:<HaltReason>" key
        if (dataAcc.value.contains(key)) {
          val prevCount = dataAcc.value(key)
          dataAcc.value(key) = prevCount + count
        } else {
          dataAcc += (key -> count)
        }

        // Track the current day so the accumulated counts can be reset on day rollover
        val day = java.util.Calendar.getInstance().get(java.util.Calendar.DATE).toLong

        if (dataAcc.value.contains("Day")) {
          val id = dataAcc.value("Day")
          if (id != day)
            dataAcc.zero
        } else {
          dataAcc += ("Day" -> day)
        }

        val oMap = dataAcc.value - "Day"
        val outputMap = new MutableHashMap[String, MutableHashMap[String, Long]]()

        // Re-group the flat accumulator map into Floor_Id -> (Category -> count)
        for (key <- oMap.keys) {

          val fid = key.split(":")(0)
          val cat = key.split(":")(1)

          if (outputMap.contains(fid)) {
            val catMap = outputMap(fid)

            if (catMap != null && catMap.contains(cat)) {
              catMap += (cat -> (catMap(cat) + dataAcc.value(key)))
            } else {
              outputMap(fid) += (cat -> dataAcc.value(key))
            }
          } else {
            val catMap = new MutableHashMap[String, Long]()
            catMap += (cat -> dataAcc.value(key))
            outputMap += (fid -> catMap)
          }
        }

        // Assemble the output JSON by hand, matching the Error_Category records shown above
        var output = ""

        for (field <- outputMap.keys) {
          output += "{ \"Floor_Id\": \"" + field + "\", \"Error_Category\" : ["
          for (error <- outputMap(field).keys) {
            output += " { \"Category\" : \"" + error + "\", \"DataPoints\" : { \"NumberOfErrors\": " + outputMap(field)(error) +
              ", \"Date_Time\" : " + t.getTime + "}},"
          }
        }

        if (output.indexOf("Floor_Id") > 0) {
          if (output.endsWith(",")) {
            output = output.substring(0, output.lastIndexOf(","))
          }
          output += "]}"
          // println("output > " + output)
          producer.send(new ProducerRecord[String, String](writeTopic, output))
        }

      }

      override def close(errorOrNull: Throwable): Unit = {
        producer.close()
      }
    }).outputMode("complete")
    .trigger(Trigger.ProcessingTime("2 seconds"))
    .start.awaitTermination()

}

// Old accumulator API (Spark 1.x/2.x): merges (key -> count) pairs into a mutable HashMap,
// summing values for keys that already exist.
class HashMapAccumulator extends AccumulableParam[MutableHashMap[String, Long], (String, Long)] {

  private var accumulator: Accumulable[MutableHashMap[String, Long], (String, Long)] = _

  def addAccumulator(acc: MutableHashMap[String, Long], elem: (String, Long)): MutableHashMap[String, Long] = {
    val (k1, v1) = elem
    acc += acc.find(_._1 == k1).map {
      case (k2, v2) => k2 -> (v1 + v2)
    }.getOrElse(elem)

    acc
  }

  def addInPlace(acc1: MutableHashMap[String, Long], acc2: MutableHashMap[String, Long]): MutableHashMap[String, Long] = {
    acc2.foreach(elem => addAccumulator(acc1, elem))
    acc1
  }

  def zero(initialValue: MutableHashMap[String, Long]): MutableHashMap[String, Long] = {
    val ser = new JavaSerializer(new SparkConf(false)).newInstance()
    val copy = ser.deserialize[MutableHashMap[String, Long]](ser.serialize(initialValue))
    copy.clear()
    copy
  }

  // Lazily creates a single named accumulator on the driver and caches it.
  def getInstance(sc: SparkContext): Accumulable[MutableHashMap[String, Long], (String, Long)] = {
    if (accumulator == null) {
      synchronized {
        if (accumulator == null) {
          accumulator = sc.accumulable(MutableHashMap.empty[String, Long], "ErrorCountAccumulator")(new HashMapAccumulator)
        }
      }
    }
    accumulator
  }

}
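Since the job uses the pre-Spark-3 Accumulable / AccumulableParam API together with the Kafka source for Structured Streaming, it needs a Spark 2.x build. A minimal build.sbt sketch of the dependencies it compiles against (the Scala, Spark and kafka-clients versions below are assumptions, not taken from the paste):

// build.sbt -- a sketch; adjust Scala/Spark versions to match your cluster.
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.3.0",
  "org.apache.spark" %% "spark-streaming"      % "2.3.0",   // only needed for the (unused) StreamingContext
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",   // Kafka source for Structured Streaming
  "org.apache.kafka" %  "kafka-clients"        % "0.11.0.2" // KafkaProducer used in the ForeachWriter (also pulled in transitively)
)

Packaged into an assembly jar and launched with spark-submit --class ErrorCategory, the query prints the ready banner and then publishes its accumulated Error_Category JSON strings to writeTopic on every 2-second trigger.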