package outliers_relevant_attributes

import java.io.PrintWriter
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext}
import collection.immutable.{ListMap, Map}
import scala.util.control.Breaks._
import scala.collection.JavaConversions._
import Classifier_RandomForest.Classifier_RandomForest
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.types.{StructField, StructType}
import scala.collection.mutable
import scala.io.Source

/**
  * Created by antonio on 12/26/16.
  */
object FeatureSelector {

  var label_attribute = ""
  var conta_Globale: MutableInt = 0
  var mappa_Contatori = collection.mutable.Map[String, Integer]()   // successful swaps per attribute
  var mappa_tot_scambi = collection.mutable.Map[String, Integer]()  // total swaps attempted per attribute
  var single_list_attrs_for_switches = List[String]()
  var list_attrs_for_switches = List[List[String]]()
  var diversities = collection.mutable.Map[String, Double]()   // holds the diversity of each feature [formerly diversities]
  var blacklist = List[String]()                               // holds the features progressively excluded from the computation
  var switchesToDo = collection.mutable.Map[String, Integer]() // holds, per attribute, the discrete acceleration of successful swaps [formerly switchesToDo]
  var batch_size = 200 // TODO: to be removed once it is passed in from the outside
  var gradienti = collection.mutable.Map[String, Double]()
  var batch_progression = collection.mutable.Map[String, Integer]()

  // Wraps an Int so it can be incremented in place; the implicit conversion
  // also lifts plain Int values (e.g. conta_Globale's initialiser) to MutableInt.
  implicit class MutableInt(var value: Int) {
    def inc() = {
      value += 1
    }
  }

  // TODO: ??
  def function(s: MutableInt) {
    s.inc()
  }
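  // A minimal usage sketch (not part of the original flow) of the implicit
  // class above: a plain Int is converted to MutableInt automatically, so a
  // global counter like conta_Globale can be bumped in place via function(...).
  def example_MutableInt_usage(): Unit = {
    val c: MutableInt = 0 // implicit conversion Int -> MutableInt
    function(c)
    function(c)
    println(c.value) // prints 2
  }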
  // Returns every column name of the DataFrame except the label attribute.
  def get_Features_As_List(DF: org.apache.spark.sql.DataFrame): List[String] = {
    var features = List[String]()
    for (p <- DF.schema.fields) {
      if (p.name != label_attribute) {
        features = p.name :: features
      }
    }
    features
  }
  def create_Var(s1: String, dict: collection.mutable.Map[String, Integer]) {
    dict.put(s1, 0)
  }

  def init_Counters(mappa_contatori: collection.mutable.Map[String, Integer], df: DataFrame): Unit = {
    for (p <- get_Features_As_List(df)) {
      create_Var(p, mappa_contatori)
    }
  }

  def create_g(s1: String, dict: collection.mutable.Map[String, Double]) {
    dict.put(s1, 0)
  }

  def init_Gradienti(gradienti: collection.mutable.Map[String, Double], df: DataFrame): Unit = {
    for (p <- get_Features_As_List(df)) {
      create_g(p, gradienti)
    }
  }

  @Deprecated
  def create_bp(s1: String, dict: collection.mutable.Map[String, Integer]) {
    dict.put(s1, 0)
  }

  @Deprecated
  def init_Batch_Progression(batch_progression: collection.mutable.Map[String, Integer], switches_attribute: List[String]): Unit = {
    for (a <- switches_attribute) {
      create_bp(a, batch_progression)
    }
  }

  // True once every attribute has exhausted its per-batch swap budget.
  def isCompleted(switchesToDo: collection.mutable.Map[String, Integer]): Boolean = {
    switchesToDo.values.forall(x => x == 0)
  }
  // Parses the '#'-separated dependencies file and returns the flat list of
  // attribute names whose dependency confidence lies in [95.0, 100.0).
  def csv_toSingleList_attr(path: String, sc: SparkContext): List[String] = {
    val text = sc.textFile(path)
    val map1 = text.map(line => line.split("#"))
    val map2 = map1.map(linea => linea.mkString(",").replace(",", " ").replace("  ", " ")) // collapse double spaces
    // keep only rows whose last token is a confidence in range; "degrees" guards a non-numeric trailing token
    val map3 = map2.map(row => if (row.split(" ").last != "degrees" && row.split(" ").last.toDouble >= 95.0 && row.split(" ").last.toDouble < 100.0) row.split(" ").mkString(",") else "")
    var res = List[String]()
    for (linea <- map3.collect()) {
      if (linea != "") {
        for (splitt <- linea.split(",")) {
          if (!splitt.contains(".")) // tokens containing '.' are confidence values, not attribute names
            if (!res.contains(splitt))
              res = splitt :: res
        }
      }
    }
    res
  }
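  // Hypothetical illustration of the dependencies-file format the parser
  // above (and the one below) assumes, reconstructed from the code rather
  // than taken from the original project: '#'-separated attribute names
  // followed by a confidence value, e.g.
  //   attrA#attrB#97.3
  // Rows whose last token is a confidence in [95.0, 100.0) are kept, and the
  // confidence token itself (the only one containing '.') is dropped, so the
  // line above contributes attrA and attrB to the result.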
  // Same parsing as csv_toSingleList_attr, but keeps one attribute list per dependency row.
  def csv_toList_attr(path: String, sc: SparkContext): List[List[String]] = {
    val text = sc.textFile(path)
    val map1 = text.map(line => line.split("#"))
    val map2 = map1.map(linea => linea.mkString(",").replace(",", " ").replace("  ", " "))
    val map3 = map2.map(row => if (row.split(" ").last != "degrees" && row.split(" ").last.toDouble >= 95.0 && row.split(" ").last.toDouble < 100.0) row.split(" ").mkString(",") else "")
    var list1 = List(List[String]())
    for (linea <- map3.collect()) {
      if (linea != "") {
        var list2 = List[String]()
        for (splitt <- linea.split(",")) {
          if (!splitt.contains("."))
            list2 = splitt :: list2
        }
        list1 = list2 :: list1
      }
    }
    list1
  }
  // Returns true once one counter dominates the others: it must exceed at
  // least size - (size/5 - 1) of them by 200 or more.
  def over_Soglia(countmap: collection.mutable.Map[String, Integer]): Boolean = {
    for (p <- 0 until countmap.values.size) {
      var cont = 0
      for (pp <- 0 until countmap.values.size) {
        if (countmap.values.toList(p) != countmap.values.toList(pp)) {
          if (countmap.values.toList(p) - countmap.values.toList(pp) >= 200) {
            cont += 1
          }
          if (cont == countmap.size - ((countmap.size / 5) - 1)) return true
        }
      }
    }
    false
  }
  // TODO: becomes initDiversities. It should hold a diversity index; I would use distinct/count.
  def init_Diversities(diversities: collection.mutable.Map[String, Double], df: DataFrame): Unit = { // [formerly diversities] // IS MITHOSIS ZERO??????
    val mean = df.drop(label_attribute).describe().collectAsList().get(1)
    val stddev = df.drop(label_attribute).describe().collectAsList().get(2)
    val attributi = df.drop(label_attribute).schema.fields
    var list_stddev = List[Double]()
    var list_mean = List[Double]()
    for (s <- 1 until stddev.size) {
      list_stddev = list_stddev :+ stddev(s).toString.toDouble
    }
    for (m <- 1 until mean.size) {
      list_mean = list_mean :+ mean(m).toString.toDouble
    }
    for (i <- 0 until attributi.size) {
      diversities.put(attributi(i).toString().split("\\(")(1).split(",")(0), (list_stddev(i) + list_mean(i)) / (list_stddev(i) * list_mean(i)))
    }
    val max = diversities.values.max
    for (d <- diversities.keys) {
      diversities.put(d, diversities.get(d).get / max)
    }
    // 1-complement: 0 never changes, 1 always varies
    for (d <- diversities.keys) {
      diversities.put(d, 1 - diversities.get(d).get)
    }
  }
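  // A worked reading of the formula above (hypothetical numbers; the original
  // carries no explanation): (stddev + mean) / (stddev * mean) simplifies to
  // 1/mean + 1/stddev, so with mean = 2.0 and stddev = 0.5 the raw score is
  // 0.5 + 2.0 = 2.5; attributes with a small mean or a small spread score
  // high. After dividing by the max and taking the 1-complement, 0 reads as
  // "never changes" and 1 as "always varies".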
  // Diversity as distinct/count per attribute, rescaled so the maximum is 1.0.
  def init_Diversities_DistinctCount(diversities: collection.mutable.Map[String, Double], df: DataFrame): Unit = {
    val attributi = get_Features_As_List(df)
    for (a <- attributi) {
      diversities.put(a, df.select(a).distinct().count().toDouble / df.count().toDouble)
    }
    val max = diversities.values.max
    for (d <- diversities.keys) {
      diversities.put(d, diversities.get(d).get / max)
    }
  }
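  // A minimal self-contained check of the distinct/count variant above (the
  // DataFrame and column names are hypothetical, and label_attribute is
  // assumed unset so both columns count as features):
  def example_init_Diversities_DistinctCount(sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._
    val df = sqlContext.sparkContext.parallelize(Seq((1, "a"), (1, "b"), (2, "a"), (2, "b"))).toDF("x", "y")
    val d = collection.mutable.Map[String, Double]()
    init_Diversities_DistinctCount(d, df)
    d.foreach(println) // x: 2 distinct over 4 rows -> 0.5, y likewise; both rescale to 1.0
  }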
  // def variability_check(attributo: String, diversities: collection.mutable.Map[String, Double], mappa_contatori: collection.mutable.Map[String, Integer], soglia: Double, soglia_Stop_Counters: Integer): Boolean = {
  //   var attributo_cleaned = attributo.replace("cont_", "")
  //   if (diversities.get(attributo_cleaned).get >= soglia && mappa_contatori.get(attributo).get >= soglia_Stop_Counters)
  //     return false
  //   return true
  // }

  // no longer valid
  // def diversities_Treshold(diversities: collection.mutable.Map[String, Double]): Double = {
  //   return ListMap(diversities.toSeq.sortBy(_._2): _*).values.toList(3)
  // }

  @Deprecated
  def slice_Batch(switchesToDo: collection.mutable.Map[String, Integer], diversities: collection.mutable.Map[String, Double], batch_size: Integer): Unit = {
    for (k <- diversities.keys) {
      val temp = diversities.get(k).get * batch_size
      switchesToDo.put(k, math.ceil(temp).toInt)
    }
  }

  // Splits the batch budget across attributes proportionally to their diversity.
  def slice_Batch_V2(switchesToDo: collection.mutable.Map[String, Integer], diversities: collection.mutable.Map[String, Double], batch_size: Integer): Unit = {
    var sum = 0.0
    for (k <- diversities.keys) {
      sum += diversities.get(k).get
    }
    for (k <- diversities.keys) {
      val temp = (diversities.get(k).get * batch_size) / sum
      switchesToDo.put(k, math.ceil(temp).toInt)
    }
  }
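  // A worked example of the proportional slicing above (hypothetical
  // diversities): each attribute receives diversity_k / sum(diversities) of
  // batch_size, rounded up, so ceil() can push the total slightly over.
  def example_slice_Batch_V2(): Unit = {
    val d = collection.mutable.Map[String, Double]("a" -> 0.5, "b" -> 0.25, "c" -> 0.25)
    val s = collection.mutable.Map[String, Integer]()
    slice_Batch_V2(s, d, 200)
    s.foreach(println) // expected: (a,100), (b,50), (c,50) in some order
  }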
  def init(df: org.apache.spark.sql.DataFrame, l_a: String, sc: SparkContext, BATCH_SIZE: Integer): Unit = {
    label_attribute = l_a
    init_Counters(mappa_Contatori, df)
    init_Counters(mappa_tot_scambi, df)
    //init_Diversities(diversities, df)
    init_Diversities_DistinctCount(diversities, df)
    blacklist = List[String]()
    //single_list_attrs_for_switches = csv_toSingleList_attr("/home/pierluigi/spark-outlier-explain/dependencies/cl_simplified.csv_dependencies", sc)
    single_list_attrs_for_switches = get_Features_As_List(df)
    list_attrs_for_switches = csv_toList_attr("/home/pierluigi/spark-outlier-explain/dependencies/cl_simplified.csv_dependencies", sc)
    batch_size = BATCH_SIZE
    slice_Batch_V2(switchesToDo, diversities, batch_size)
    println("\nDIVERSITIES\n")
    diversities.foreach(println)
    println("\nSWITCHESTODO\n")
    switchesToDo.foreach(println)
    init_Gradienti(gradienti, df)
  }
  // Swaps a single attribute between two one-row DataFrames. The join carries
  // no condition, so this is only correct because each side has exactly one row.
  def switch(tupla1: org.apache.spark.sql.DataFrame, tupla2: org.apache.spark.sql.DataFrame, attr: String): List[org.apache.spark.sql.DataFrame] = {
    val t1 = tupla1.drop(attr).join(tupla2.select(attr))
    val t2 = tupla2.drop(attr).join(tupla1.select(attr))
    List(t1, t2)
  }
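  // Usage sketch for switch() (df1/df2 stand for the one-row DataFrames built
  // in the selection methods below): switch(df1, df2, "someAttr") returns
  //   List(df1 carrying df2's "someAttr", df2 carrying df1's "someAttr")
  // and the callers below only ever consume element (0).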
  // Like switch(), but swaps a whole set of attributes in one shot.
  def switch_multiple(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, tupla1: org.apache.spark.sql.DataFrame, tupla2: org.apache.spark.sql.DataFrame, attrs: Seq[String]): List[org.apache.spark.sql.DataFrame] = {
    val filteredTupla1 = tupla1.select(outliers.columns
      .filter(colName => !attrs.contains(colName))
      .map(colName => new Column(colName)): _*)
    val filteredTupla2 = tupla2.select(inliers.columns
      .filter(colName => !attrs.contains(colName))
      .map(colName => new Column(colName)): _*)
    val t1 = filteredTupla1.join(tupla2.select(attrs.head, attrs.tail: _*))
    val t2 = filteredTupla2.join(tupla1.select(attrs.head, attrs.tail: _*))
    List(t1, t2)
  }
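  // Usage sketch for switch_multiple() (attribute names are hypothetical):
  //   switch_multiple(outliers, inliers, df1, df2, Seq("city", "zip"))(0)
  // yields df1 carrying df2's city and zip together, which keeps dependent
  // columns consistent instead of swapping them one at a time.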
  // TODO: all of these methods must return a set of features.
  // TODO: IF THE TUPLE ALREADY EXISTS AFTER THE SWITCH, exit
  /*
  This method tries to swap every attribute between every possible outlier/inlier pair.
  */
  def select_features_exhaustive_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, treshold: Double): List[String] = {
    var res = List[String]()
    for (i <- outliers.collectAsList()) {
      for (j <- inliers.collectAsList()) {
        if (i != j) {
          val lista1: java.util.List[Row] = List(i)
          val df1 = sqlContext.createDataFrame(lista1, outliers.schema)
          val lista2: java.util.List[Row] = List(j)
          val df2 = sqlContext.createDataFrame(lista2, inliers.schema)
          for (elem <- single_list_attrs_for_switches) {
            if (df1.select(elem).first() != df2.select(elem).first()) {
              val switch1 = switch(df1, df2, elem.toString)(0)
              val prediction = classifier.classification(outliers, switch1, label_attribute)
              if (prediction != 2.0) { // 2.0 means the swapped tuple already exists in the dataframe
                if (prediction == 0.0)
                  mappa_Contatori(elem) += 1
                switchesToDo(elem) -= 1
                mappa_tot_scambi(elem) += 1
              }
            }
          }
        }
      }
    }
    for (m <- mappa_Contatori.keys) {
      if ((mappa_Contatori.get(m).get.toDouble / mappa_tot_scambi.get(m).get.toDouble) / (outliers.count().toDouble * inliers.count().toDouble * mappa_Contatori.size.toDouble) > treshold) {
        res = res :+ m
      }
    }
    res
  }
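  // A hedged end-to-end sketch of how this object appears meant to be driven
  // (Classifier is whatever type provides the classification(...) call used
  // above; it is not defined in this paste, so the names and the threshold
  // value here are assumptions):
  //   FeatureSelector.init(trainingDF, "label", sc, 200)
  //   val relevant = FeatureSelector.select_features_exhaustive_SingleSwitch(
  //     outliersDF, inliersDF, sqlContext, classifier, 0.01)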
  /*
  Like the exhaustive variant above, but returns as soon as over_Soglia detects a dominating counter.
  */
  // def select_features_with_Soglia_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, model: RandomForestClassificationModel, assembler: VectorAssembler, Classifier: Classifier_RandomForest): Unit = {
  //   for (i <- outliers.collectAsList()) {
  //     for (j <- inliers.collectAsList()) {
  //       if (i != j) {
  //         var lista1: java.util.List[Row] = List(i)
  //         var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
  //         var lista2: java.util.List[Row] = List(j)
  //         var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
  //         for (l <- list_attrs_for_switches) {
  //           if (!l.isEmpty) {
  //             for (elem <- l) {
  //               if (df1.select(elem).first() != df2.select(elem).first()) {
  //                 val switch1 = switch(df1, df2, elem.toString)(0)
  //                 // outliers or inliers makes no difference here, it is only needed for the dataframe schema
  //                 var df31RF = Classifier.test_prearrangement(outliers, assembler, label_attribute, switch1)
  //                 if (Classifier.classify(model, df31RF) == 0.0) {
  //                   mappa_Contatori(elem) += 1
  //                 }
  //                 mappa_tot_scambi(elem) += 1
  //               }
  //             }
  //           }
  //         }
  //         if (over_Soglia(mappa_Contatori)) return
  //       }
  //     }
  //   }
  // }
  // def select_features_with_Counters_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, model: RandomForestClassificationModel, assembler: VectorAssembler, Classifier: Classifier_RandomForest, soglia: Integer): Unit = {
  //   var contatore = 0
  //   for (i <- outliers.collectAsList()) {
  //     for (j <- inliers.collectAsList()) {
  //       if (i != j) {
  //         var lista1: java.util.List[Row] = List(i)
  //         var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
  //         var lista2: java.util.List[Row] = List(j)
  //         var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
  //         for (l <- list_attrs_for_switches) {
  //           if (!l.isEmpty) {
  //             for (elem <- l) {
  //               if (df1.select(elem).first() != df2.select(elem).first()) {
  //                 val switch1 = switch(df1, df2, elem.toString)(0)
  //                 // outliers or inliers makes no difference here, it is only needed for the dataframe schema
  //                 var df31RF = Classifier.test_prearrangement(outliers, assembler, label_attribute, switch1)
  //                 if (Classifier.classify(model, df31RF) == 0.0) {
  //                   mappa_Contatori(elem) += 1
  //                   contatore += 1
  //                 }
  //                 mappa_tot_scambi(elem) += 1
  //               }
  //             }
  //           }
  //         }
  //         if (contatore > soglia) return
  //       }
  //     }
  //   }
  // }
  // def select_features_with_Global_Counter_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, model: RandomForestClassificationModel, assembler: VectorAssembler, Classifier: Classifier_RandomForest, soglia: Integer): Unit = {
  //   for (i <- outliers.collectAsList()) {
  //     for (j <- inliers.collectAsList()) {
  //       if (i != j) {
  //         var lista1: java.util.List[Row] = List(i)
  //         var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
  //         var lista2: java.util.List[Row] = List(j)
  //         var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
  //         for (l <- list_attrs_for_switches) {
  //           if (!l.isEmpty) {
  //             for (elem <- l) {
  //               if (df1.select(elem).first() != df2.select(elem).first()) {
  //                 val switch1 = switch(df1, df2, elem.toString)(0)
  //                 // outliers or inliers makes no difference here, it is only needed for the dataframe schema
  //                 var df31RF = Classifier.test_prearrangement(outliers, assembler, label_attribute, switch1)
  //                 if (Classifier.classify(model, df31RF) == 0.0) {
  //                   mappa_Contatori(elem) += 1
  //                   function(conta_Globale)
  //                 }
  //                 mappa_tot_scambi(elem) += 1
  //               }
  //             }
  //           }
  //         }
  //         if (conta_Globale.value > soglia) return
  //       }
  //     }
  //   }
  // }
  // def select_features_with_Diversities_check_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, model: RandomForestClassificationModel, assembler: VectorAssembler, Classifier_RandomForest: Classifier_RandomForest, soglia: Integer): Unit = {
  //   val diversities_TH = diversities_Treshold(diversities)
  //   for (i <- outliers.collectAsList()) {
  //     for (j <- inliers.collectAsList()) {
  //       if (i != j) {
  //         var lista1: java.util.List[Row] = List(i)
  //         var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
  //         var lista2: java.util.List[Row] = List(j)
  //         var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
  //         for (l <- list_attrs_for_switches) {
  //           if (!l.isEmpty) {
  //             for (elem <- l) {
  //               if (!blacklist.contains(elem)) {
  //                 if (df1.select(elem).first() != df2.select(elem).first()) {
  //                   val switch1 = switch(df1, df2, elem.toString)(0)
  //                   // outliers or inliers makes no difference here, it is only needed for the dataframe schema
  //                   var df31RF = Classifier_RandomForest.test_prearrangement(outliers, assembler, label_attribute, switch1)
  //                   if (Classifier_RandomForest.classify(model, df31RF) == 0.0) {
  //                     if (variability_check("cont_" + elem, diversities, mappa_Contatori, diversities_TH, soglia))
  //                       mappa_Contatori("cont_" + elem) += 1
  //                     else
  //                       blacklist = blacklist :+ elem
  //                   }
  //                   mappa_tot_scambi("cont_" + elem) += 1
  //                 }
  //               }
  //             }
  //           }
  //         }
  //       }
  //     }
  //   }
  // }
  // Batched variant: each attribute gets a swap budget (switchesToDo); when a
  // batch completes, attributes whose success rate stays under the threshold
  // are dropped, and the budget is re-sliced for another batch over the rest.
  def select_features_with_Gradient_check_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Double): List[String] = {
    var res = List[String]()
    var redo_batch = false
    for (i <- outliers.collectAsList()) {
      for (j <- inliers.collectAsList()) {
        if (i != j) {
          println(i)
          println(j)
          val lista1: java.util.List[Row] = List(i)
          val df1 = sqlContext.createDataFrame(lista1, outliers.schema)
          val lista2: java.util.List[Row] = List(j)
          val df2 = sqlContext.createDataFrame(lista2, inliers.schema)
          for (elem <- single_list_attrs_for_switches) {
            if (switchesToDo(elem) > 0) {
              if (df1.select(elem).first() != df2.select(elem).first()) {
                val switch1 = switch(df1, df2, elem.toString)(0)
                val prediction = classifier.classification(outliers, switch1, label_attribute)
                if (prediction != 2.0) { // 2.0 means the swapped tuple already exists in the dataframe
                  if (prediction == 0.0)
                    mappa_Contatori(elem) += 1
                  switchesToDo(elem) -= 1
                  mappa_tot_scambi(elem) += 1
                }
              }
            }
          }
          if (isCompleted(switchesToDo)) {
            println("\n\n~~~~~~~~~~~~~~~~~~~~~~~~~BATCH~~~~~~~~~~~~~~~~~~~~~~~~~\n\n")
            var attributi_sensibili = 0
            for (m <- gradienti.keys) {
              val curr = (mappa_Contatori.get(m).get.toDouble / mappa_tot_scambi.get(m).get.toDouble) * 100
              //var old = gradienti.get(m).get
              gradienti.put(m, curr)
              if (curr + curr >= soglia * 100) { // note: the commented-out `old` above suggests this projection may once have used curr + old
                println(curr, soglia * 100)
                redo_batch = true
              }
              else {
                gradienti.remove(m)
                single_list_attrs_for_switches = single_list_attrs_for_switches.filter(_ != m)
                switchesToDo.remove(m)
                diversities.remove(m)
              }
              if (curr > soglia * 100) {
                attributi_sensibili += 1
              }
            }
            if (attributi_sensibili == gradienti.size) {
              gradienti.foreach(println)
              return gradienti.keys.toList
            }
            if (redo_batch) {
              slice_Batch_V2(switchesToDo, diversities, batch_size)
            }
          }
        }
      }
    }
    res // falls through with an empty list if no batch ever converges
  }
  def main(args: Array[String]): Unit = {
    println("It works!")
  }
}