package org.sec.mvingAvg
//Jai Vekeria
//Import statements for the required libraries
import scala.collection.mutable.{ Map => MMap }
import scala.math._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import kafka.serializer.StringDecoder
import kafka.utils.Time
import java.util.Date
import java.text.SimpleDateFormat
import java.text.DateFormat
import java.util.Calendar
import java.io._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkContext
import org.apache.commons.math3.exception.MathIllegalArgumentException
import org.apache.commons.math3.exception.util.LocalizedFormats
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.log4j.LogManager
import org.slf4j.Logger
import org.apache.log4j.Level
import com.cloudera.sparkts.models._
import com.cloudera.sparkts.TimeSeriesRDD
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import breeze.linalg.{ SparseVector, DenseMatrix, DenseVector }
/*
 * This program accepts 7 arguments -> brokers, topics, sw, lw, sigma, maxSize, features
 * brokers - Kafka broker(s)
 * topics - Kafka topic(s)
 * sw - sliding interval in seconds
 * lw - window duration in seconds
 * sigma - sigma multiplier for the Bollinger band
 * maxSize - minimum number of data points required to make a prediction
 * features - list of features, each followed by its minimum band, e.g. tcp_syn-100
 */
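// Example invocation (the broker, topic, jar name and values below are hypothetical):
//   spark-submit --class org.sec.mvingAvg.multFeat movingAvg.jar \
//     broker1:9092 netflow 60 900 2 30 tcp_syn-100,in_bytes-5000
// -> 15-minute windows sliding every 60 s, 2-sigma bands, and at least 30 data
//    points required before a prediction is made.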
object multFeat {
  def main(args: Array[String]) {
    // Check command line arguments; all 7 are required by the destructuring below
    if (args.length < 7) {
      Static.logger.error(s"""
        |Usage: multFeat <brokers> <topics> <sw> <lw> <sigma> <maxSize> <features>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more Kafka topics to consume from
        |  <sw> sliding interval (s), <lw> window duration (s), <sigma> band width
        |  <maxSize> minimum data points per prediction
        |  <features> comma-separated feature-threshold pairs, e.g. tcp_syn-100
        |
        """.stripMargin)
      System.exit(1)
    }
    /*
     * Here the vector ts is passed through the spark-ts time series library, which
     * auto-fits an ARIMA model based on configurable parameters; the array 'arr' is
     * not actually used and is left over from an older piece of code.
     */
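    // Note (hedged, per the spark-ts API): ARIMA.autoFit searches orders up to the
    // given maximums for (p, d, q), and arimaModel.forecast(ts, 1) is expected to
    // return the fitted 1-step-ahead values for the input series followed by the
    // single forecasted step, i.e. an array of ts.size + 1 entries.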
    def modelPredictionIO(ts: org.apache.spark.mllib.linalg.Vector, arr: Array[Double], modelValue: Int): Array[Double] = {
      val nullArry: Array[Double] = null
      try {
        Static.logger.info("Entering model")
        val arimaModel = ARIMA.autoFit(ts, modelValue, modelValue, modelValue)
        Static.logger.info("Finished ARIMA autofitting")
        val forecast = arimaModel.forecast(ts, 1)
        Static.logger.info("Forecast of next window: " + forecast.toArray.mkString(","))
        return forecast.toArray
      } catch {
        case e: org.apache.commons.math3.linear.SingularMatrixException => Static.logger.warn("Singular matrix error")
        case e: org.apache.commons.math3.exception.TooManyEvaluationsException => Static.logger.warn("Too many evaluations")
        case e: java.lang.IllegalArgumentException => Static.logger.warn("Illegal argument passed to the model")
        case e: java.lang.Exception =>
          Static.logger.warn("Stationarity not achieved")
          return nullArry
      }
      nullArry
    }
    /* Bollinger Band class
     * This class defines the Bollinger band object with three methods. The private
     * calcavg method takes the actual values extracted from the data stream and
     * computes their standard deviation. calcHighBand and calcLowBand take the
     * predicted average and the actual values, call calcavg to obtain the standard
     * deviation, and then compute the band by adding or subtracting
     * sigmaValue * stddev respectively (sigmaValue is a configurable parameter).
     */
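    // Worked example (illustrative numbers): with a predicted average svg = 100.0,
    // actual values Array(95.0, 100.0, 105.0) (population stddev ~ 4.08) and
    // sigmaValue = 2, calcHighBand returns ~108.16 and calcLowBand returns ~91.84.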
    var sigmaValue: Double = 2
    class BollingerBand() {
      // Compute the standard deviation of the actual values (svgBuffer is unused)
      private def calcavg(svgBuffer: Double, origVals: Array[Double]): Double = {
        val count = origVals.size
        val mean = origVals.sum / count
        val devs = origVals.map(score => (score - mean) * (score - mean))
        val stddev = Math.sqrt(devs.sum / count)
        stddev
      }
      // Calculate the high band
      def calcHighBand(svg: Double, arr: Array[Double]): Double = {
        val BollBandStd = calcavg(svg, arr)
        val HighBollingerBand = svg + sigmaValue * BollBandStd
        Static.logger.info("HighBollingerBand: " + HighBollingerBand)
        HighBollingerBand
      }
      // Calculate the low band
      def calcLowBand(svg: Double, arr: Array[Double]): Double = {
        val BollBandStd = calcavg(svg, arr)
        val LowBollingerBand = svg - sigmaValue * BollBandStd
        Static.logger.info("LowBollingerBand: " + LowBollingerBand)
        LowBollingerBand
      }
    }
    // Initialization of variables and the Kafka stream
    /*
     * Here the Kafka stream is initialized and the command line arguments are
     * processed into their respective arrays and variables.
     */
- Static.logger.info("Command Line Arguments:")
- args.foreach { x => Static.logger.info(x) }
- val Array(brokers, topics, sw, lw, sigma, maxSize, features) = args
- Static.logger.info("Arguments " + args.mkString(","))
- sigmaValue = sigma.toDouble
- Static.logger.info("Sigma " + sigmaValue)
- val maxThreshold = new RandomAccessFile(new File("minThresholds.txt"), "rw")
    /*
     * Separate the features from the minimum thresholds entered on the command line,
     * then place the features in newFeatArray and write the thresholds to the text
     * file minThresholds.txt.
     */
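    // Each line written below has the form "<feature>,<threshold>",
    // e.g. "tcp_syn,100" for the command line argument tcp_syn-100.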
- Static.logger.info("Features: " + features)
- val multFeatArray = features.split(",")
- val newFeatArray = new ArrayBuffer[String]
- for (i <- 0 until multFeatArray.size) {
- var tempFeatureArray = multFeatArray(i).split("-")
- maxThreshold.skipBytes(maxThreshold.length().toInt)
- newFeatArray.+=(tempFeatureArray(0))
- maxThreshold.writeChars(tempFeatureArray(0) + "," + tempFeatureArray(1) + "\n")
- }
- maxThreshold.close()
    /*
     * Declaring the Spark context and Kafka stream
     */
    val sparkConf = new SparkConf().setAppName("MovingAverageStream").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet).map(_._2)
    Static.logger.info("This is newFeatArray " + newFeatArray)
    newFeatArray.foreach(x => print("This is an element of the new feature array " + x + "\n"))
    /*
     * Converting the raw JSON netflows into map format
     */
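    // A hypothetical input record (the field placement is an assumption; only the
    // keys referenced in the code below are taken from the source):
    // {"ipv4_src_addr":"10.0.0.1","src_addr_space":"private",
    //  "netflow":{"tcp_syn":3,"in_bytes":1200}}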
    val fil = messages.map { x =>
      {
        val jsonMapper = new ObjectMapper
        jsonMapper.registerModule(DefaultScalaModule)
        lazy val data = {
          try {
            jsonMapper.readValue(x, classOf[Map[String, Any]])
          } catch {
            case e: Exception => { Static.logger.error("\n\nError while processing JSON : Error : " + e + ", Json : " + x); null }
          }
        }
        val in_flag = System.currentTimeMillis()
        // Drop records whose JSON failed to parse (data is null) rather than
        // throwing a NullPointerException here
        if (data != null) data + ("event_received_time" -> in_flag) else null
      }
    }.filter(_ != null)
    // Merge the nested netflow map into the outer map so the processing stream sees
    // no nested maps, just one large flat map per record
    val flatenedMapStream = fil.map(x => {
      val netflowMapOption = x.get("netflow").map { case m2: Map[String, _] => m2 }
      val netflowMap = netflowMapOption.get
      val myMutableMap = collection.mutable.Map() ++ x
      val removedNetflowMap = myMutableMap.-("netflow")
      val flattenedCombinedMap = netflowMap ++ removedNetflowMap
      //println("This is the removedNetflowMap " + removedNetflowMap)
      flattenedCombinedMap
    })
    // Keep only flows whose source address space is private
    val privateFil = flatenedMapStream.filter(x => x.exists(_ == "src_addr_space" -> "private"))
    /*
     * Extract the specified features from the map (e.g. tcp_syn, in_bytes) to form a
     * new map containing only those features. The output is a map with the IP as the
     * key and the features and their values in an inner map as the value,
     * i.e. HashMap[String, HashMap[String, String]]
     */
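    // For example (illustrative values), a record from 10.0.0.1 with tcp_syn = 12
    // and features "tcp_syn,in_bytes" yields:
    //   HashMap("10.0.0.1" -> HashMap("tcp_syn" -> "12", "in_bytes" -> "0.0"))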
    val nestedIpMap = privateFil.map(x => {
      val organizedFeatureArray: HashMap[String, String] = HashMap()
      newFeatArray.foreach(i => {
        if (x.getOrElse(i, "0").toString().toDouble > 0) {
          organizedFeatureArray += (i.toString() -> x.getOrElse(i, "0").toString())
        } else
          organizedFeatureArray += (i.toString() -> "0.0")
      })
      HashMap(x.getOrElse("ipv4_src_addr", "0.0.0.0").toString() -> organizedFeatureArray)
    })
    /*
     * Convert the data structure described above into a tuple
     */
    val newIP = nestedIpMap.map(x => {
      val xi = x.keySet.toArray
      val value: HashMap[String, String] = x.getOrElse(xi(0), null)
      Tuple2(xi(0), value)
    })
    // Aggregate 10 seconds of data and reduce to a single map per IP
    val newStream = newIP.reduceByKey((x, y) => {
      val tempMap = HashMap[String, String]()
      newFeatArray.foreach(i => {
        if (x.contains(i) && y.contains(i))
          tempMap.put(i, (x.getOrElse(i, "NaN").toDouble + y.getOrElse(i, "NaN").toDouble).toString())
      })
      tempMap
    })
    // Write the values out to a CSV every 10 seconds
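    // Each row appended to <ip>_<feature>.csv has the (illustrative) form:
    //   2017/06/01 12:00:00,12.0,N/A
    // where the last column echoes the first line of the matching prediction CSV,
    // or "N/A" when no prediction has been written yet.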
    newStream.foreachRDD(rdd => rdd.foreach(map => {
      map._2.keySet.foreach(key => {
        val predictionFile = new RandomAccessFile(map._1 + "_" + key + "prediction" + ".csv", "rw")
        var str = "N/A"
        if (predictionFile.length() > 1) {
          str = predictionFile.readLine()
        }
        predictionFile.close()
        val dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
        val date = new Date()
        val hshMap = map._2.getOrElse(key, null)
        val writeCharsr = new RandomAccessFile(new File(map._1 + "_" + key + ".csv"), "rw")
        writeCharsr.skipBytes(writeCharsr.length().toInt)
        writeCharsr.writeChars(dateFormat.format(date) + "," + hshMap + "," + str + "\n")
        writeCharsr.close()
      })
    }))
    try {
      /*
       * This is the window declaration. The sliding interval is given by the command
       * line parameter sw and the window duration by lw; e.g. if the duration is
       * 15 min and the sliding interval is 1 min, the window recomputes every minute
       * over the last 15 minutes of data.
       */
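      // For example (illustrative values), lw = "900" and sw = "60" recompute every
      // 60 seconds over the last 900 seconds (15 minutes) of data.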
      val win2 = newStream.window(Seconds(lw.split(",")(0).toLong), Seconds(sw.split(",")(0).toLong))
      // Reduce the window's data into one comma-separated string of values per
      // unique IP and feature for processing
      val window2ProcessingStream = win2.reduceByKey((x, y) => {
        val tempMap = HashMap[String, String]()
        newFeatArray.foreach(i => {
          if (x.contains(i) && y.contains(i)) {
            val tempArray = new ArrayBuffer[String]
            tempArray += x.getOrElse(i, "NaN")
            tempArray += y.getOrElse(i, "NaN")
            val valueString = tempArray.toArray.mkString(",")
            tempMap.put(i, valueString)
          }
        })
        tempMap
      })
      // Pass the data through the prediction and band creation functions
      val finalData = window2ProcessingStream.map(bigMapIn => {
        // Initializing the required arrays used for the calculation
        var movBuffer = new ArrayBuffer[Array[Double]]
        var newHashArray = new ArrayBuffer[Map[String, Map[String, Array[Double]]]]
        val MovingAverageHash = new ArrayBuffer[Map[String, Map[String, Array[Double]]]]
        var movingAverageArray = new ArrayBuffer[Array[Double]]
        var counter = 0
        var featureArray = new ArrayBuffer[HashMap[String, Array[Double]]]
        var ip = ""
        // Place all the values into an array and filter out all the zeros
        try {
          bigMapIn._2.keySet.toArray.foreach(i => {
            var movAvgBuffer = new ArrayBuffer[Double]
            bigMapIn._2.getOrElse(i, "").split(",").map(_.toDouble).foreach(z => movAvgBuffer += z)
            movAvgBuffer = movAvgBuffer.filter { x => x > 0 }
            if (movAvgBuffer.size > 0) {
              Static.logger.info("movAvgBuffer: " + movAvgBuffer)
              movBuffer += movAvgBuffer.toArray
              newHashArray += Map(bigMapIn._1 -> Map(i -> movAvgBuffer.toArray))
            }
          })
        } catch {
          case e: java.lang.IndexOutOfBoundsException => Static.logger.warn("Feature not available")
          case e: java.lang.ArrayIndexOutOfBoundsException => Static.logger.warn("Feature array index out of bounds")
        }
- Static.logger.info("newHashArray: " + newHashArray)
- Static.logger.info("movBuffer size: " + movBuffer.size)
- // create Moving Average Hash
- newHashArray.foreach(u => {
- var fmovAvgBuffer = new ArrayBuffer[Double]
- u.values.foreach(k => {
- k.values.foreach({ n =>
- val temAvg = new ArrayBuffer[Double]
- var avg: Double = 0
- n.foreach(o => {
- temAvg.+=(o)
- avg = temAvg.sum / temAvg.size
- fmovAvgBuffer.+=(avg)
- })
- })
- movingAverageArray.+=(fmovAvgBuffer.toArray)
- val nKeyArray = k.keySet.toArray
- val nKey = nKeyArray(0)
- var keyArr = u.keySet.toArray
- var key = keyArr(0)
- MovingAverageHash.+=(Map(key -> Map(nKey -> fmovAvgBuffer.toArray)))
- })
- })
- Static.logger.info("MovingAverageHash: " + MovingAverageHash + " Size: " + MovingAverageHash.size)
- Static.logger.info("movingAverageArray: " + movingAverageArray.size)
        /*
         * Loop through each map, extracting the array for each feature of each IP,
         * vectorize it and run it through the ARIMA model; the returned values are
         * then run through the Bollinger band class where the bands are calculated.
         */
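        // Each resulting featureArray entry has the (illustrative) shape:
        //   HashMap("tcp_syn" -> Array(meanPrediction, highBand, lowBand))
        // e.g. HashMap("tcp_syn" -> Array(100.0, 108.16, 91.84)) using the numbers
        // from the worked Bollinger example above.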
        for (a <- 0 until MovingAverageHash.size) {
          Static.logger.info("Current MovingAverageHash: " + MovingAverageHash(a))
          MovingAverageHash(a).values.foreach(p => {
            val sKeyArr = p.keySet.toArray
            val sKey = sKeyArr(0)
            Static.logger.info("Feature: " + sKey)
            // Loop through each of the features
            p.values.foreach(v => {
              val AverageArray = movingAverageArray(a)
              val key = MovingAverageHash(a).keySet.toArray
              val keyVal = key(0)
              ip = keyVal
              println("This is the value of v: " + v.mkString(" "))
              // Make sure there is an appropriate amount of data in each array
              // (the minimum is set by the command line parameter maxSize)
              if (v.size > maxSize.toDouble) {
                val vect = Vectors.dense(v)
                var predictionArray: Array[Double] = null
                val modelValue = 1
                Static.logger.info("Vector for model: " + vect)
                Static.logger.info("Mean " + (v.sum / v.size))
                // Make sure the data is not constant
                if ((v.sum / v.size) != v(0)) {
                  predictionArray = modelPredictionIO(vect, movBuffer(a), modelValue)
                }
                println("Doing stuff now")
                val actualValArr = movBuffer(a)
                counter = counter + 1
                // Check that a prediction vector was created, then proceed with the
                // Bollinger band calculation
                if (predictionArray != null) {
                  println("This is the prediction array: " + predictionArray.mkString(" "))
                  val boll = new BollingerBand()
                  val highband = boll.calcHighBand((predictionArray.sum / predictionArray.size), actualValArr)
                  val lowband = boll.calcLowBand((predictionArray.sum / predictionArray.size), actualValArr)
                  val finalHashMap = new HashMap[String, Array[Double]]
                  val exArray = new ArrayBuffer[Double]
                  exArray += predictionArray.sum / predictionArray.size
                  exArray += highband
                  exArray += lowband
                  finalHashMap.put(sKey, exArray.toArray)
                  featureArray += finalHashMap
                  Static.logger.info("Stored prediction and bands for feature " + sKey)
                }
              }
            })
          })
        }
        Tuple2(ip, featureArray)
      })
      // Combine the per-feature maps into one single map and into a tuple
      val editedFinalStream = finalData.map(x => {
        var CombinedMap = new HashMap[String, Array[Double]]
        for (i <- 0 until x._2.size) {
          CombinedMap = x._2(i) ++ CombinedMap
        }
        (x._1, CombinedMap)
      })
      // Update the prediction CSV every sliding window (the minimum threshold check
      // against minThresholds.txt below is currently commented out)
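      // The prediction CSV is overwritten in place each window with a single row of
      // the form "prediction,highBand,lowBand",
      // e.g. "100.0,108.16,91.84" (illustrative values).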
      editedFinalStream.foreachRDD(x => x.foreach(y => {
        y._2.keySet.foreach(map => {
          /*val maxThreshold = new RandomAccessFile(new File("minThresholds.txt"), "rw")
          var maxThresholdBuffer = new ArrayBuffer[String]
          maxThreshold.seek(0)
          for (i <- 1 until maxThreshold.length.toInt)
            maxThresholdBuffer += maxThreshold.readLine()
          println("This is the contents of the maxThresholdBuffer: " + maxThresholdBuffer)
          println("This is the content of first element of Buffer: " + maxThresholdBuffer(0).toString())
          */
          val arr = y._2.getOrElse(map, null)
          Static.logger.info("Printing to csv")
          val writeCharsr = new RandomAccessFile(new File(y._1 + "_" + map + "prediction" + ".csv"), "rw")
          writeCharsr.seek(0)
          writeCharsr.writeChars(arr(0) + "," + arr(1) + "," + arr(2))
          writeCharsr.close()
        })
      }))
    } catch {
      case e: java.io.NotSerializableException => Static.logger.warn("Probably not enough netflow hits in the window")
      case e: java.lang.IndexOutOfBoundsException => Static.logger.warn("Not enough data for processing")
    }
    ssc.start()
    ssc.awaitTermination()
  }
}