package outliers_relevant_attributes

import java.io.PrintWriter

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext}

import scala.collection.JavaConversions._
import scala.collection.immutable.ListMap
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

/**
  * Created by antonio on 12/26/16.
  */
object FeatureSelector {

  var data: DataFrame = null
  var numThread = 1.0
  var label_attribute = ""
  var conta_Globale: MutableInt = 0
  var mappa_Contatori = collection.mutable.Map[String, Integer]()
  var mappa_tot_scambi = collection.mutable.Map[String, Integer]()
  var single_list_attrs_for_switches = List[String]()
  var list_attrs_for_switches = List[List[String]]()
  var diversities = collection.mutable.Map[String, Double]()
  var switchesToDo = collection.mutable.Map[String, Integer]()
  var batch_size = 200 // TODO: remove once this is passed in from the outside
  var gradienti = collection.mutable.Map[String, Double]()
  var batch_progression = collection.mutable.Map[String, Integer]()
  @Deprecated
  implicit class MutableInt(var value: Int) {
    def inc(): Unit = {
      value += 1
    }
  }

  @Deprecated
  def function(s: MutableInt): Unit = {
    s.inc()
  }
  /** Returns the names of all columns except the label attribute. */
  def get_Features_As_List(DF: org.apache.spark.sql.DataFrame): List[String] = {
    var features = List[String]()
    for (p <- DF.schema.fields) {
      if (p.name != label_attribute) {
        features = p.name :: features
      }
    }
    return features
  }
  def create_Var(s1: String, dict: collection.mutable.Map[String, Integer]): Unit = {
    dict.put(s1, 0)
  }

  /** Initializes one zeroed counter per feature. */
  def init_Counters(mappa_contatori: collection.mutable.Map[String, Integer], df: DataFrame): Unit = {
    for (p <- get_Features_As_List(df)) {
      create_Var(p, mappa_contatori)
    }
  }

  def create_g(s1: String, dict: collection.mutable.Map[String, Double]): Unit = {
    dict.put(s1, 0.0)
  }

  /** Initializes one zeroed gradient per feature. */
  def init_Gradienti(gradienti: collection.mutable.Map[String, Double], df: DataFrame): Unit = {
    for (p <- get_Features_As_List(df)) {
      create_g(p, gradienti)
    }
  }

  /** True when no attribute has switches left to perform in the current batch. */
  def isCompleted(switchesToDo: collection.mutable.Map[String, Integer]): Boolean = {
    switchesToDo.values.forall(x => x == 0)
  }
  /**
    * Parses a '#'-separated dependencies file and returns the distinct attribute
    * names involved in dependencies whose strength lies in [95.0, 100.0).
    * Tokens containing a dot are the numeric strength values and are skipped.
    */
  def csv_toSingleList_attr(path: String, sc: SparkContext): List[String] = {
    var text = sc.textFile(path)
    val map1 = text.map(line => line.split("#"))
    // join the fields with single spaces (the original chained replace collapsed repeated spaces)
    val map2 = map1.map(linea => linea.mkString(",").replace(",", " ").replaceAll(" +", " "))
    val map3 = map2.map(row => if (row.split(" ").last != "degrees" && row.split(" ").last.toDouble >= 95.0 && row.split(" ").last.toDouble < 100.0) row.split(" ").mkString(",") else "")
    var res = List[String]()
    for (linea <- map3.collect()) {
      if (linea != "") {
        for (splitt <- linea.split(",")) {
          if (!splitt.contains("."))
            if (!res.contains(splitt))
              res = splitt :: res
        }
      }
    }
    return res
  }

  /**
    * Same parsing as csv_toSingleList_attr, but keeps each dependency's attributes
    * grouped together as one inner list.
    */
  def csv_toList_attr(path: String, sc: SparkContext): List[List[String]] = {
    var text = sc.textFile(path)
    val map1 = text.map(line => line.split("#"))
    val map2 = map1.map(linea => linea.mkString(",").replace(",", " ").replaceAll(" +", " "))
    val map3 = map2.map(row => if (row.split(" ").last != "degrees" && row.split(" ").last.toDouble >= 95.0 && row.split(" ").last.toDouble < 100.0) row.split(" ").mkString(",") else "")
    var list1 = List(List[String]())
    for (linea <- map3.collect()) {
      if (linea != "") {
        var list2 = List[String]()
        for (splitt <- linea.split(",")) {
          if (!splitt.contains("."))
            list2 = splitt :: list2
        }
        list1 = list2 :: list1
      }
    }
    return list1
  }
  @Deprecated
  def over_Soglia(countmap: collection.mutable.Map[String, Integer]): Boolean = {
    for (p <- 0 until countmap.values.size) {
      var cont = 0
      for (pp <- 0 until countmap.values.size) {
        if (countmap.values.toList(p) != countmap.values.toList(pp)) {
          if (countmap.values.toList(p) - countmap.values.toList(pp) >= 200) {
            cont += 1
          }
          if (cont == countmap.size - ((countmap.size / 5) - 1)) return true
        }
      }
    }
    return false
  }
  @Deprecated
  def init_Diversities(diversities: collection.mutable.Map[String, Double], df: DataFrame): Unit = {
    // [old diversities] // IS MITHOSIS ZERO??????
    var mean = df.drop(label_attribute).describe().collectAsList().get(1)
    var stddev = df.drop(label_attribute).describe().collectAsList().get(2)
    val attributi = df.drop(label_attribute).schema.fields
    var list_stddev = List[Double]()
    var list_mean = List[Double]()
    for (s <- 1 until stddev.size) {
      list_stddev = list_stddev :+ stddev(s).toString.toDouble
    }
    for (m <- 1 until mean.size) {
      list_mean = list_mean :+ mean(m).toString.toDouble
    }
    for (i <- 0 until attributi.size) {
      diversities.put(attributi(i).toString().split("\\(")(1).split(",")(0), (list_stddev(i) + list_mean(i)) / (list_stddev(i) * list_mean(i)))
    }
    val max = diversities.values.max
    for (d <- diversities.keys) {
      diversities.put(d, diversities.get(d).get / max)
    }
    // complement to 1:
    // 0 means the attribute never changes, 1 means it always varies
    for (d <- diversities.keys) {
      diversities.put(d, 1 - diversities.get(d).get)
    }
  }
  /** Diversity of an attribute = (#distinct values / #rows), normalized by the maximum over all attributes. */
  def init_Diversities_DistinctCount(diversities: collection.mutable.Map[String, Double], df: DataFrame): Unit = {
    var attributi = get_Features_As_List(df)
    for (a <- attributi) {
      diversities.put(a, df.select(a).distinct().count().toDouble / df.count().toDouble)
    }
    val max = diversities.values.max
    for (d <- diversities.keys) {
      diversities.put(d, diversities.get(d).get / max)
    }
  }
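
  // Worked example (illustrative numbers, not from the original source): with 100
  // rows, a column holding 40 distinct values gets raw diversity 0.40 and a column
  // holding 10 distinct values gets 0.10; after dividing by the maximum (0.40) the
  // normalized diversities become 1.0 and 0.25 respectively.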
  /**
    * An attribute stays eligible for switching unless its diversity is below the
    * threshold AND its counter has already reached soglia_Stop_Counters.
    */
  def variability_check(attributo: String, diversities: collection.mutable.Map[String, Double], mappa_contatori: collection.mutable.Map[String, Integer], soglia: Double, soglia_Stop_Counters: Integer): Boolean = {
    if (diversities.get(attributo).get < soglia && mappa_contatori.get(attributo).get >= soglia_Stop_Counters)
      return false
    return true
  }

  /** Returns the diversity value three quarters of the way up the ascending sorted list (the original nested expression simplifies to size - size/4). */
  def diversities_Treshold(diversities: collection.mutable.Map[String, Double]): Double = {
    return ListMap(diversities.toSeq.sortBy(_._2): _*).values.toList(diversities.size - (diversities.size / 4))
  }
  @Deprecated
  def slice_Batch(switchesToDo: collection.mutable.Map[String, Integer], diversities: collection.mutable.Map[String, Double], batch_size: Double): Unit = {
    for (k <- diversities.keys) {
      var temp = diversities.get(k).get * batch_size
      switchesToDo.put(k, math.ceil(temp).toInt)
    }
  }

  /** Splits the per-thread share of the batch among the attributes proportionally to their diversity. */
  def slice_Batch_V2(switchesToDo: collection.mutable.Map[String, Integer], diversities: collection.mutable.Map[String, Double], batch_size: Double): Unit = {
    var sum = 0.0
    for (k <- diversities.keys) {
      sum += diversities.get(k).get
    }
    for (k <- diversities.keys) {
      var temp = (diversities.get(k).get * (batch_size / numThread)) / sum
      switchesToDo.put(k, math.ceil(temp).toInt)
    }
  }
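
  // Worked example (illustrative numbers): batch_size = 200, numThread = 2, and three
  // attributes with diversities 1.0, 0.5, 0.5 (sum = 2.0). Each thread's share is 100
  // switches, split as ceil(1.0*100/2.0) = 50, ceil(0.5*100/2.0) = 25, and 25.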
  def init(df: org.apache.spark.sql.DataFrame, l_a: String, sc: SparkContext, BATCH_SIZE: Integer): Unit = {
    data = df
    label_attribute = l_a
    init_Counters(mappa_Contatori, df)
    init_Counters(mappa_tot_scambi, df)
    //init_Diversities(diversities,df)
    init_Diversities_DistinctCount(diversities, df)
    //single_list_attrs_for_switches = csv_toSingleList_attr("/home/pierluigi/spark-outlier-explain/dependencies/cl_simplified.csv_dependencies", sc)
    single_list_attrs_for_switches = get_Features_As_List(df) // use the line above once the high-dependency attribute-selection part is ready
    list_attrs_for_switches = csv_toList_attr("/home/pierluigi/spark-outlier-explain/dependencies/cl_simplified.csv_dependencies", sc)
    batch_size = BATCH_SIZE
    slice_Batch_V2(switchesToDo, diversities, batch_size)
    init_Gradienti(gradienti, df)
  }
  /**
    * Swaps the value of attribute `attr` between two single-row DataFrames.
    * The unconditioned join of two one-row frames is a 1x1 cartesian product, which
    * glues the remaining columns of one tuple to the swapped column of the other.
    */
  def switch(tupla1: org.apache.spark.sql.DataFrame, tupla2: org.apache.spark.sql.DataFrame, attr: String): List[org.apache.spark.sql.DataFrame] = {
    val t1 = tupla1.drop(attr).join(tupla2.select(attr))
    val t2 = tupla2.drop(attr).join(tupla1.select(attr))
    val res = List(t1, t2)
    return res
  }

  /** Like switch, but swaps a whole set of attributes at once. */
  def switch_multiple(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, tupla1: org.apache.spark.sql.DataFrame, tupla2: org.apache.spark.sql.DataFrame, attrs: Seq[String]): List[org.apache.spark.sql.DataFrame] = {
    val filteredTupla1 = tupla1.select(outliers.columns
      .filter(colName => !attrs.contains(colName))
      .map(colName => new Column(colName)): _*)
    val filteredTupla2 = tupla2.select(inliers.columns
      .filter(colName => !attrs.contains(colName))
      .map(colName => new Column(colName)): _*)
    val t1 = filteredTupla1.join(tupla2.select(attrs.head, attrs.tail: _*))
    val t2 = filteredTupla2.join(tupla1.select(attrs.head, attrs.tail: _*))
    val res = List(t1, t2)
    return res
  }
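
  // Minimal usage sketch (not part of the original code; the method name and the toy
  // columns are hypothetical). It shows what a single switch produces on two one-row
  // frames, assuming an available SQLContext.
  def example_switch(sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._
    val outlier = Seq((1, 99.0)).toDF("id", "value")
    val inlier = Seq((2, 1.0)).toDF("id", "value")
    // after swapping "value" the two resulting rows are (1, 1.0) and (2, 99.0)
    switch(outlier, inlier, "value").foreach(_.show())
  }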
  /*
   This method tries to swap every possible inlier with every possible outlier
  */
  //EXHAUSTIVE
  def init_Threads_Exhaustive(): (collection.mutable.Map[String, Integer], collection.mutable.Map[String, Integer]) = {
    var mappa_contatori_copy = collection.mutable.Map[String, Integer]()
    var mappa_tot_scambi_copy = collection.mutable.Map[String, Integer]()
    init_Counters(mappa_contatori_copy, data)
    init_Counters(mappa_tot_scambi_copy, data)
    return (mappa_contatori_copy, mappa_tot_scambi_copy)
  }
  /** Launches one future per (outliers, inliers) slice and collects them into a single future. */
  def future_producer_exhaustive_SingleSwitch(lista_lanci: List[(DataFrame, DataFrame)], sqlContext: SQLContext, classifier: Classifier, soglia: Double, printWriter: PrintWriter) = {
    var seq = Seq[Future[Any]]()
    for ((o, i) <- lista_lanci) {
      var fut = Future {
        var temp = select_features_exhaustive_SingleSwitch(o, i, sqlContext, classifier, soglia)
        temp.foreach(t => printWriter.write(t + "\n"))
      }
      seq = seq :+ fut
    }
    Future.sequence(seq)
  }
  /** Splits outliers and inliers into numero_thread random slices and runs the exhaustive search on each slice in parallel. */
  def select_features_exhaustive_SingleSwitch_Parallelism(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Double, numero_thread: Integer, printWriter: PrintWriter): Unit = {
    numThread = numero_thread.toDouble
    var lista_lanci = List[(DataFrame, DataFrame)]()
    var splits = Array.fill(numero_thread)(1.0 / numThread)
    var split_o = outliers.randomSplit(splits)
    var split_i = inliers.randomSplit(splits)
    for ((o, i) <- (split_o zip split_i)) {
      var temp: (DataFrame, DataFrame) = (o, i)
      lista_lanci = lista_lanci :+ temp
    }
    Await.ready(future_producer_exhaustive_SingleSwitch(lista_lanci, sqlContext, classifier, soglia, printWriter), Duration.Inf)
  }
  /**
    * Exhaustive single-switch search: for every (outlier, inlier) pair and every
    * candidate attribute, swaps the attribute value and asks the classifier whether
    * the modified outlier is now classified as an inlier (prediction 0.0).
    */
  def select_features_exhaustive_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, treshold: Double): List[String] = {
    var (mappa_contatori_local, mappa_tot_scambi_local) = init_Threads_Exhaustive()
    var res = List[String]()
    for (i <- outliers.collectAsList()) {
      for (j <- inliers.collectAsList()) {
        if (i != j) {
          println(i)
          println(j)
          var lista1: java.util.List[Row] = List(i)
          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
          var lista2: java.util.List[Row] = List(j)
          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
          for (elem <- single_list_attrs_for_switches) {
            if (df1.select(elem).first() != df2.select(elem).first()) {
              val switch1 = switch(df1, df2, elem.toString)(0)
              var prediction = classifier.classification(outliers, switch1, label_attribute)
              if (prediction != 2.0) {
                // a prediction of 2.0 means the swapped tuple is already present in the dataframe
                if (prediction == 0.0)
                  mappa_contatori_local(elem) += 1
                mappa_tot_scambi_local(elem) += 1
              }
            }
          }
        }
      }
    }
    for (m <- mappa_contatori_local.keys) {
      if ((mappa_contatori_local.get(m).get.toDouble / mappa_tot_scambi_local.get(m).get.toDouble) / (outliers.count().toDouble * inliers.count().toDouble * mappa_contatori_local.size.toDouble) > treshold) {
        res = res :+ m
      }
    }
    return res
  }
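
  // Scoring note (illustrative numbers, not from the original source): with 2
  // outliers, 5 inliers, and 10 attributes, an attribute whose swaps were classified
  // as inliers 30 times out of 60 attempts scores (30/60) / (2*5*10) = 0.005 and is
  // kept only if that exceeds the threshold.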
  //END EXHAUSTIVE
  //VARIABILITY
  def init_Threads_With_Diversities(): (List[String], collection.mutable.Map[String, Integer], collection.mutable.Map[String, Integer]) = {
    var list_attr_copy = get_Features_As_List(data)
    var mappa_contatori_copy = collection.mutable.Map[String, Integer]()
    var mappa_tot_scambi_copy = collection.mutable.Map[String, Integer]()
    init_Counters(mappa_contatori_copy, data)
    init_Counters(mappa_tot_scambi_copy, data)
    return (list_attr_copy, mappa_contatori_copy, mappa_tot_scambi_copy)
  }
  def future_producer_With_Diversities(lista_lanci: List[(DataFrame, DataFrame)], sqlContext: SQLContext, classifier: Classifier, soglia: Integer, printWriter: PrintWriter) = {
    var seq = Seq[Future[Any]]()
    for ((o, i) <- lista_lanci) {
      var fut = Future {
        var temp = select_features_with_Diversities_check_SingleSwitch(o, i, sqlContext, classifier, soglia)
        temp.foreach(t => printWriter.write(t + "\n"))
      }
      seq = seq :+ fut
    }
    Future.sequence(seq)
  }

  def select_features_With_Diversities_Check_Single_Switch_Parallelism(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Integer, numero_thread: Integer, printWriter: PrintWriter): Unit = {
    numThread = numero_thread.toDouble
    var lista_lanci = List[(DataFrame, DataFrame)]()
    var splits = Array.fill(numero_thread)(1.0 / numThread)
    var split_o = outliers.randomSplit(splits)
    var split_i = inliers.randomSplit(splits)
    for ((o, i) <- (split_o zip split_i)) {
      var temp: (DataFrame, DataFrame) = (o, i)
      lista_lanci = lista_lanci :+ temp
    }
    Await.ready(future_producer_With_Diversities(lista_lanci, sqlContext, classifier, soglia, printWriter), Duration.Inf)
  }
  /**
    * Variability-guided search: like the exhaustive version, but an attribute is
    * dropped from the candidate list as soon as variability_check fails for it.
    * Returns the attributes that were removed from the candidate list during the scan.
    */
  def select_features_with_Diversities_check_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Integer): List[String] = {
    var (single_list_attrs_for_switches_local, mappa_Contatori_local, mappa_tot_scambi_local) = init_Threads_With_Diversities()
    val diversities_TH = diversities_Treshold(diversities)
    for (i <- outliers.collectAsList()) {
      for (j <- inliers.collectAsList()) {
        if (i != j) {
          println(i)
          println(j)
          var lista1: java.util.List[Row] = List(i)
          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
          var lista2: java.util.List[Row] = List(j)
          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
          for (elem <- single_list_attrs_for_switches_local) {
            if (df1.select(elem).first() != df2.select(elem).first()) {
              val switch1 = switch(df1, df2, elem.toString)(0)
              var prediction = classifier.classification(outliers, switch1, label_attribute)
              if (prediction != 2.0) {
                if (prediction == 0.0) {
                  if (variability_check(elem, diversities, mappa_Contatori_local, diversities_TH, soglia))
                    mappa_Contatori_local(elem) += 1
                  else
                    single_list_attrs_for_switches_local = single_list_attrs_for_switches_local.filter(_ != elem.toString)
                }
                mappa_tot_scambi_local(elem) += 1
              }
            }
          }
        }
      }
    }
    return get_Features_As_List(data).diff(single_list_attrs_for_switches_local)
  }
  //END VARIABILITY
  //GRADIENT
  def init_Threads_With_Gradient(): (List[String], collection.mutable.Map[String, Integer], collection.mutable.Map[String, Double], collection.mutable.Map[String, Integer], collection.mutable.Map[String, Double], collection.mutable.Map[String, Integer]) = {
    var list_attr_copy = List[String]()
    var diversities_copy = collection.mutable.Map[String, Double]()
    var mappa_contatori_copy = collection.mutable.Map[String, Integer]()
    var mappa_tot_scambi_copy = collection.mutable.Map[String, Integer]()
    var gradienti_copy = collection.mutable.Map[String, Double]()
    var switchesToDo_copy = collection.mutable.Map[String, Integer]()
    list_attr_copy = get_Features_As_List(data)
    init_Diversities_DistinctCount(diversities_copy, data)
    init_Counters(mappa_contatori_copy, data)
    init_Counters(mappa_tot_scambi_copy, data)
    slice_Batch_V2(switchesToDo_copy, diversities_copy, (batch_size / numThread)) // note: slice_Batch_V2 divides by numThread again internally
    init_Gradienti(gradienti_copy, data)
    return (list_attr_copy, switchesToDo_copy, diversities_copy, mappa_contatori_copy, gradienti_copy, mappa_tot_scambi_copy)
  }
  def future_producer_with_Gradient(lista_lanci: List[(DataFrame, DataFrame)], sqlContext: SQLContext, classifier: Classifier, soglia: Double, printWriter: PrintWriter) = {
    var seq = Seq[Future[Any]]()
    for ((o, i) <- lista_lanci) {
      var fut = Future {
        var temp = select_features_with_Gradient_check_SingleSwitch(o, i, sqlContext, classifier, soglia)
        temp.foreach(t => printWriter.write(t + "\n"))
      }
      seq = seq :+ fut
    }
    Future.sequence(seq)
  }

  def select_features_with_Gradient_check_Single_Switch_Parallelism(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Double, numero_thread: Integer, printWriter: PrintWriter): Unit = {
    numThread = numero_thread.toDouble
    var lista_lanci = List[(DataFrame, DataFrame)]()
    var splits = Array.fill(numero_thread)(1.0 / numThread)
    var split_o = outliers.randomSplit(splits)
    var split_i = inliers.randomSplit(splits)
    for ((o, i) <- (split_o zip split_i)) {
      var temp: (DataFrame, DataFrame) = (o, i)
      lista_lanci = lista_lanci :+ temp
    }
    Await.ready(future_producer_with_Gradient(lista_lanci, sqlContext, classifier, soglia, printWriter), Duration.Inf)
  }
  /**
    * Gradient-guided search: switches are performed in batches. After each batch the
    * per-attribute success rate is compared with the previous batch; attributes whose
    * projected rate (current + increment) falls below the threshold are discarded,
    * and the search stops early when every surviving attribute is above the threshold.
    */
  def select_features_with_Gradient_check_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Double): List[String] = {
    var num_batch = 0
    var (single_list_attrs_local, switchesToDo_local, diversities_local, mappa_contatori_local, gradienti_local, mappa_tot_scambi_local) = init_Threads_With_Gradient()
    var res = List[String]()
    var redo_batch = false
    for (i <- outliers.collectAsList()) {
      for (j <- inliers.collectAsList()) {
        redo_batch = false
        if (i != j) {
          println(i)
          println(j)
          var lista1: java.util.List[Row] = List(i)
          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
          var lista2: java.util.List[Row] = List(j)
          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
          for (elem <- single_list_attrs_local) {
            if (switchesToDo_local(elem) > 0) {
              if (df1.select(elem).first() != df2.select(elem).first()) {
                val switch1 = switch(df1, df2, elem.toString)(0)
                var prediction = classifier.classification(outliers, switch1, label_attribute)
                if (prediction != 2.0) {
                  // a prediction of 2.0 means the swapped tuple is already present in the dataframe
                  if (prediction == 0.0)
                    mappa_contatori_local(elem) += 1
                  switchesToDo_local(elem) -= 1
                  mappa_tot_scambi_local(elem) += 1
                }
              }
            }
          }
          if (isCompleted(switchesToDo_local)) {
            num_batch += 1
            println("\n\n~~~~~~~~~~~~~~~~~~~~~~~~~BATCH_" + num_batch + "~~~~~~~~~~~~~~~~~~~~~~~~~\n\n")
            var attributi_sensibili = 0
            // snapshot the keys: entries may be removed from gradienti_local while iterating
            for (m <- gradienti_local.keys.toList) {
              var curr = (mappa_contatori_local.get(m).get.toDouble / mappa_tot_scambi_local.get(m).get.toDouble) * 100
              var old = gradienti_local.get(m).get
              var increment = curr - old
              gradienti_local.put(m, curr)
              if (num_batch != 1) {
                if (curr + increment >= soglia * 100) {
                  println(curr, soglia * 100)
                  redo_batch = true
                }
                else {
                  gradienti_local.remove(m)
                  single_list_attrs_local = single_list_attrs_local.filter(_ != m)
                  switchesToDo_local.remove(m)
                  diversities_local.remove(m)
                }
                if (curr > soglia * 100) {
                  attributi_sensibili += 1
                }
              }
              else
                redo_batch = true
            }
            if (attributi_sensibili == gradienti_local.size) {
              gradienti_local.foreach(println)
              return gradienti_local.keys.toList
            }
            if (redo_batch) {
              slice_Batch_V2(switchesToDo_local, diversities_local, batch_size / numThread)
            }
          }
        }
      }
    }
    return res
  }
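
  // Worked example of the per-batch decision (illustrative numbers): with soglia = 0.5,
  // an attribute at 40% success in the previous batch and 55% now has increment 15, so
  // its projection 55 + 15 = 70 >= 50 keeps it alive for another batch; an attribute
  // that fell from 30% to 20% projects 20 - 10 = 10 < 50 and is discarded.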
  //END GRADIENT

  /** Deduplicates the output file by its first space-separated token and writes the result alongside it. */
  def clean(sparkContext: SparkContext, output_path: String): String = {
    val lines = sparkContext.textFile(output_path)
    val keyed = lines.map(line => line.split(" ")(0) -> line)
    val deduplicated = keyed.reduceByKey((a, b) => a)
    val output_parsed_path = output_path.replace(".txt", "_parsed.txt")
    deduplicated.values.repartition(1).saveAsTextFile(output_parsed_path)
    return output_parsed_path
  }
}
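
// Minimal end-to-end driver sketch (not from the original paste). The object name,
// file paths, and filter expressions are hypothetical; Classifier is this project's
// own type, so a concrete implementation must be plugged in where ??? appears.
object FeatureSelectorDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[4]", "feature-selector-demo")
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("/path/to/data.json") // hypothetical dataset with a "label" column
    val outliers = df.filter("label = 1.0")             // hypothetical outlier/inlier split
    val inliers = df.filter("label = 0.0")
    val classifier: Classifier = ???                    // plug in a concrete Classifier from this project
    val writer = new PrintWriter("/tmp/relevant_attrs.txt")
    FeatureSelector.init(df, "label", sc, 200)
    FeatureSelector.select_features_exhaustive_SingleSwitch_Parallelism(
      outliers, inliers, sqlContext, classifier, 0.01, 4, writer)
    writer.close()
    println(FeatureSelector.clean(sc, "/tmp/relevant_attrs.txt"))
  }
}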