package outliers_relevant_attributes

import java.io.PrintWriter

import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext}

import collection.immutable.{ListMap, Map}
import scala.util.control.Breaks._
import scala.collection.JavaConversions._
import Classifier_RandomForest.Classifier_RandomForest
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.types.{StructField, StructType}

import scala.collection.mutable
import scala.io.Source

/**
  * Created by antonio on 12/26/16.
  */

object FeatureSelector {
  var label_attribute = ""
  var conta_Globale: MutableInt = 0
  var mappa_Contatori = collection.mutable.Map[String, Integer]()
  var mappa_tot_scambi = collection.mutable.Map[String, Integer]()
  var single_list_attrs_for_switches = List[String]()
  var list_attrs_for_switches = List[List[String]]()
  var diversities = collection.mutable.Map[String, Double]() // holds the diversity of each feature [old diversities]
  var blacklist = List[String]() // holds the features that are progressively excluded from the computation
  var switchesToDo = collection.mutable.Map[String, Integer]() // holds, for each attribute, the discrete acceleration of successful swaps [old switchesToDo]
  var batch_size = 200 // TODO: to be removed once it is passed in from outside
  var gradienti = collection.mutable.Map[String, Double]()
  var batch_progression = collection.mutable.Map[String, Integer]()

  // Wrapper around Int so a counter can be incremented in place.
  implicit class MutableInt(var value: Int) {
    def inc() = {
      value += 1
    }
  }

  // Increments the given MutableInt (used for the global counter).
  def function(s: MutableInt) {
    s.inc()
  }

  // Returns the names of all columns except the label attribute.
  def get_Features_As_List(DF: org.apache.spark.sql.DataFrame): List[String] = {
    var features = List[String]()
    for (p <- DF.schema.fields) {
      if (p.name != label_attribute) {
        features = p.name :: features
      }
    }
    return features
  }

  def create_Var(s1: String, dict: collection.mutable.Map[String, Integer]) {
    var x = s1.toString
    dict.put(x, 0)
  }

  // Initializes a counter to 0 for every non-label attribute.
  def init_Counters(mappa_contatori: collection.mutable.Map[String, Integer], df: DataFrame): Unit = {
    for (p <- get_Features_As_List(df)) {
      create_Var(p, mappa_contatori)
    }
  }

  def create_g(s1: String, dict: collection.mutable.Map[String, Double]) {
    var x = s1.toString
    dict.put(x, 0)
  }

  // Initializes a gradient entry to 0 for every non-label attribute.
  def init_Gradienti(gradienti: collection.mutable.Map[String, Double], df: DataFrame): Unit = {
    for (p <- get_Features_As_List(df)) {
      create_g(p, gradienti)
    }
  }

  @Deprecated
  def create_bp(s1: String, dict: collection.mutable.Map[String, Integer]) {
    var x = s1.toString
    dict.put(x, 0)
  }

  @Deprecated
  def init_Batch_Progression(batch_progression: collection.mutable.Map[String, Integer], switches_attribute: List[String]): Unit = {
    for (a <- switches_attribute) {
      create_bp(a, batch_progression)
    }
  }

  // True when no attribute has switches left to do in the current batch.
  def isCompleted(switchesToDo: collection.mutable.Map[String, Integer]): Boolean = {
    switchesToDo.values.forall(x => x == 0)
  }

  // Reads the dependency csv and returns a flat list of attribute names, taken only from
  // rows whose last field parses to a value in [95.0, 100.0) (the header row ending in
  // "degrees" is skipped).
  def csv_toSingleList_attr(path: String, sc: SparkContext): List[String] = {
    var text = sc.textFile(path)
    val map1 = text.map(line => line.split("#"))
    val map2 = map1.map(linea => linea.mkString(",").replace(",", " ").replace(" ", " "))
    val map3 = map2.map(row => if (row.split(" ").last != "degrees" && row.split(" ").last.toDouble >= 95.0 && row.split(" ").last.toDouble < 100.0) row.split(" ").mkString(",") else "")

    var res = List[String]()
    for (linea <- map3.collect()) {
      if (linea != "") {
        for (splitt <- linea.split(",")) {
          if (!splitt.contains("."))
            if (!res.contains(splitt))
              res = splitt :: res
        }
      }
    }
    return res
  }

  // Same as above, but keeps the attributes of each qualifying row grouped together.
  def csv_toList_attr(path: String, sc: SparkContext): List[List[String]] = {
    var text = sc.textFile(path)
    val map1 = text.map(line => line.split("#"))
    val map2 = map1.map(linea => linea.mkString(",").replace(",", " ").replace(" ", " "))
    val map3 = map2.map(row => if (row.split(" ").last != "degrees" && row.split(" ").last.toDouble >= 95.0 && row.split(" ").last.toDouble < 100.0) row.split(" ").mkString(",") else "")

    var list1 = List(List[String]())
    for (linea <- map3.collect()) {
      if (linea != "") {
        var list2 = List[String]()
        for (splitt <- linea.split(",")) {
          if (!splitt.contains("."))
            list2 = splitt :: list2
        }
        list1 = list2 :: list1
      }
    }
    return list1
  }


  // Returns true when some counter exceeds most of the others by at least 200,
  // i.e. one attribute's successful swaps clearly dominate the rest.
  def over_Soglia(countmap: collection.mutable.Map[String, Integer]): Boolean = {
    for (p <- 0 until countmap.values.size) {
      var cont = 0
      for (pp <- 0 until countmap.values.size) {
        if (countmap.values.toList(p) != countmap.values.toList(pp)) {
          if (countmap.values.toList(p) - countmap.values.toList(pp) >= 200) {
            cont += 1
          }
          if (cont == countmap.size - ((countmap.size / 5) - 1)) return true
        }
      }
    }
    return false
  }

  // TODO: becomes initDiversities. It should hold a diversity index; I would use distinct/count.
  def init_Diversities(diversities: collection.mutable.Map[String, Double], df: DataFrame): Unit = { // [old diversities] // IS MITHOSIS ZERO??????
    var mean = df.drop(label_attribute).describe().collectAsList().get(1)
    var stddev = df.drop(label_attribute).describe().collectAsList().get(2)
    val attributi = df.drop(label_attribute).schema.fields
    var list_stddev = List[Double]()
    var list_mean = List[Double]()

    for (s <- 1 until stddev.size) {
      list_stddev = list_stddev :+ stddev(s).toString.toDouble
    }
    for (m <- 1 until mean.size) {
      list_mean = list_mean :+ mean(m).toString.toDouble
    }
    for (i <- 0 until attributi.size) {
      diversities.put(attributi(i).toString().split("\\(")(1).split(",")(0), (list_stddev(i) + list_mean(i)) / (list_stddev(i) * list_mean(i)))
    }
    val max = diversities.values.max
    for (d <- diversities.keys) {
      diversities.put(d, diversities.get(d).get / max)
    }
    // complement to 1:
    // 0 never changes, 1 always varies
    for (d <- diversities.keys) {
      diversities.put(d, 1 - diversities.get(d).get)
    }
  }

  // Diversity index of each attribute: distinct values / total rows, then normalized by the maximum.
  def init_Diversities_DistinctCount(diversities: collection.mutable.Map[String, Double], df: DataFrame): Unit = {
    var attributi = get_Features_As_List(df)
    for (a <- attributi) {
      diversities.put(a, df.select(a).distinct().count().toDouble / df.count().toDouble)
    }
    val max = diversities.values.max
    for (d <- diversities.keys) {
      diversities.put(d, diversities.get(d).get / max)
    }
  }
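
  // Hedged worked example (hypothetical numbers) of the distinct/count diversity index:
  // over a 1000-row DataFrame, a column "city" with 40 distinct values scores 0.04 and a
  // column "id" with 1000 distinct values scores 1.0; after dividing by the maximum (1.0)
  // the normalized diversities stay 0.04 and 1.0, so "id" would receive the largest slice
  // of each batch in slice_Batch_V2 below.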

  //  def variability_check(attributo: String, diversities: collection.mutable.Map[String, Double], mappa_contatori: collection.mutable.Map[String, Integer], soglia: Double, soglia_Stop_Counters: Integer): Boolean = {
  //    var attributo_cleaned = attributo.replace("cont_", "")
  //    if (diversities.get(attributo_cleaned).get >= soglia && mappa_contatori.get(attributo).get >= soglia_Stop_Counters)
  //      return false
  //    return true
  //  }

  // no longer valid
  //  def diversities_Treshold(diversities: collection.mutable.Map[String, Double]): Double = {
  //    return ListMap(diversities.toSeq.sortBy(_._2): _*).values.toList(3)
  //  }
  @Deprecated
  def slice_Batch(switchesToDo: collection.mutable.Map[String, Integer], diversities: collection.mutable.Map[String, Double], batch_size: Integer): Unit = {
    for (k <- diversities.keys) {
      var temp = diversities.get(k).get * batch_size
      switchesToDo.put(k, math.ceil(temp).toInt)
    }
  }

  // Splits the batch among the attributes proportionally to their (normalized) diversity.
  def slice_Batch_V2(switchesToDo: collection.mutable.Map[String, Integer], diversities: collection.mutable.Map[String, Double], batch_size: Integer): Unit = {
    var sum = 0.0
    for (k <- diversities.keys) {
      sum += diversities.get(k).get
    }
    for (k <- diversities.keys) {
      var temp = (diversities.get(k).get * batch_size) / sum
      switchesToDo.put(k, math.ceil(temp).toInt)
    }
  }
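
  // Hedged worked example (hypothetical numbers) of how slice_Batch_V2 splits a batch:
  // with diversities = Map("a" -> 1.0, "b" -> 0.5, "c" -> 0.25) and batch_size = 200,
  // the sum is 1.75 and switchesToDo becomes
  //   "a" -> ceil(1.00 * 200 / 1.75) = 115
  //   "b" -> ceil(0.50 * 200 / 1.75) = 58
  //   "c" -> ceil(0.25 * 200 / 1.75) = 29
  // Because of the ceil(), the slices can add up to slightly more than batch_size.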


  def init(df: org.apache.spark.sql.DataFrame, l_a: String, sc: SparkContext, BATCH_SIZE: Integer): Unit = {
    label_attribute = l_a
    init_Counters(mappa_Contatori, df)
    init_Counters(mappa_tot_scambi, df)
    //init_Diversities(diversities,df)
    init_Diversities_DistinctCount(diversities, df)
    blacklist = List[String]()
    //single_list_attrs_for_switches = csv_toSingleList_attr("/home/pierluigi/spark-outlier-explain/dependencies/cl_simplified.csv_dependencies", sc)
    single_list_attrs_for_switches = get_Features_As_List(df)
    list_attrs_for_switches = csv_toList_attr("/home/pierluigi/spark-outlier-explain/dependencies/cl_simplified.csv_dependencies", sc)
    batch_size = BATCH_SIZE
    slice_Batch_V2(switchesToDo, diversities, batch_size)
    println("\nDIVERSITIES\n")
    diversities.foreach(println)
    println("\nSWITCHESTODO\n")
    switchesToDo.foreach(println)
    init_Gradienti(gradienti, df)
  }

  def switch(tupla1: org.apache.spark.sql.DataFrame, tupla2: org.apache.spark.sql.DataFrame, attr: String): List[org.apache.spark.sql.DataFrame] = {
    val t1 = tupla1.drop(attr).join(tupla2.select(attr))
    val t2 = tupla2.drop(attr).join(tupla1.select(attr))
    val res = List(t1, t2)
    return res
  }

  def switch_multiple(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, tupla1: org.apache.spark.sql.DataFrame, tupla2: org.apache.spark.sql.DataFrame, attrs: Seq[String]): List[org.apache.spark.sql.DataFrame] = {
    val filteredTupla1 = tupla1.select(outliers.columns
      .filter(colName => !attrs.contains(colName))
      .map(colName => new Column(colName)): _*)
    val filteredTupla2 = tupla2.select(inliers.columns
      .filter(colName => !attrs.contains(colName))
      .map(colName => new Column(colName)): _*)
    val t1 = filteredTupla1.join(tupla2.select(attrs.head, attrs.tail: _*))
    val t2 = filteredTupla2.join(tupla1.select(attrs.head, attrs.tail: _*))
    val res = List(t1, t2)
    return res
  }
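
  // Hedged illustration of what `switch` produces, assuming two hypothetical one-row
  // DataFrames: an outlier row o = (age = 95, size = 2, label = 1) and an inlier row
  // i = (age = 40, size = 2, label = 0). Then:
  //   switch(o, i, "age")(0) is o's row carrying i's age (40),
  //   switch(o, i, "age")(1) is i's row carrying o's age (95).
  // Each result is a 1x1 cross join, so it still holds exactly one row (with the
  // swapped attribute appended as the last column).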

  // TODO: all these methods must return a set of features.
  // TODO: IF THE TUPLE ALREADY EXISTS AFTER THE SWITCH, exit
  /*
  This method tries to swap every possible inlier with every possible outlier.
  */
  def select_features_exhaustive_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, treshold: Double): List[String] = {
    var res = List[String]()
    for (i <- outliers.collectAsList()) {
      for (j <- inliers.collectAsList()) {
        if (i != j) {
          var lista1: java.util.List[Row] = List(i)
          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
          var lista2: java.util.List[Row] = List(j)
          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
          for (elem <- single_list_attrs_for_switches) {
            if (df1.select(elem).first() != df2.select(elem).first()) {
              val switch1 = switch(df1, df2, elem.toString)(0)
              var prediction = classifier.classification(outliers, switch1, label_attribute)
              if (prediction != 2.0) { // 2.0 means the tuple produced by the switch is already present in the dataframe
                if (prediction == 0.0)
                  mappa_Contatori(elem) += 1
                switchesToDo(elem) -= 1
                mappa_tot_scambi(elem) += 1
              }
            }
          }
        }
      }
    }
    for (m <- mappa_Contatori.keys) {
      if ((mappa_Contatori.get(m).get.toDouble / mappa_tot_scambi.get(m).get.toDouble) / (outliers.count().toDouble * inliers.count().toDouble * mappa_Contatori.size.toDouble) > treshold) {
        res = res :+ m
      }
    }
    return res
  }
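
  // Hedged worked example (hypothetical numbers) for choosing `treshold` above: with
  // 5 outliers, 20 inliers and 10 attributes, an attribute whose switches turn the
  // outlier into an inlier 30 times out of 60 attempts scores
  //   (30 / 60) / (5 * 20 * 10) = 0.0005,
  // so `treshold` has to be picked on that much smaller scale, not as a plain success ratio.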

  /*
  This method performs single switches and stops as soon as over_Soglia detects that some counters dominate the others.
  */
  //  def select_features_with_Soglia_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, model: RandomForestClassificationModel, assembler: VectorAssembler, Classifier: Classifier_RandomForest): Unit = {
  //    for (i <- outliers.collectAsList()) {
  //      for (j <- inliers.collectAsList()) {
  //        if (i != j) {
  //          var lista1: java.util.List[Row] = List(i)
  //          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
  //          var lista2: java.util.List[Row] = List(j)
  //          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
  //          for (l <- list_attrs_for_switches) {
  //            if (!l.isEmpty) {
  //              for (elem <- l) {
  //                if (df1.select(elem).first() != df2.select(elem).first()) {
  //
  //                  val switch1 = switch(df1, df2, elem.toString)(0)
  //                  // outliers or inliers is the same here, I only need it for the dataframe schema
  //                  var df31RF = Classifier.test_prearrangement(outliers, assembler, label_attribute, switch1)
  //                  if (Classifier.classify(model, df31RF) == 0.0) {
  //                    mappa_Contatori(elem) += 1
  //                  }
  //                  mappa_tot_scambi(elem) += 1
  //                }
  //              }
  //            }
  //          }
  //          if (over_Soglia(mappa_Contatori)) return
  //        }
  //      }
  //    }
  //  }

  //  def select_features_with_Counters_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, model: RandomForestClassificationModel, assembler: VectorAssembler, Classifier: Classifier_RandomForest, soglia: Integer): Unit = {
  //    var contatore = 0
  //    for (i <- outliers.collectAsList()) {
  //      for (j <- inliers.collectAsList()) {
  //        if (i != j) {
  //          var lista1: java.util.List[Row] = List(i)
  //          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
  //          var lista2: java.util.List[Row] = List(j)
  //          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
  //          for (l <- list_attrs_for_switches) {
  //            if (!l.isEmpty) {
  //              for (elem <- l) {
  //                if (df1.select(elem).first() != df2.select(elem).first()) {
  //
  //                  val switch1 = switch(df1, df2, elem.toString)(0)
  //                  // outliers or inliers is the same here, I only need it for the dataframe schema
  //                  var df31RF = Classifier.test_prearrangement(outliers, assembler, label_attribute, switch1)
  //                  if (Classifier.classify(model, df31RF) == 0.0) {
  //                    mappa_Contatori(elem) += 1
  //                    contatore += 1
  //                  }
  //                  mappa_tot_scambi(elem) += 1
  //                }
  //              }
  //            }
  //          }
  //          if (contatore > soglia) return
  //        }
  //      }
  //    }
  //  }

  //  def select_features_with_Global_Counter_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, model: RandomForestClassificationModel, assembler: VectorAssembler, Classifier: Classifier_RandomForest, soglia: Integer): Unit = {
  //    for (i <- outliers.collectAsList()) {
  //      for (j <- inliers.collectAsList()) {
  //        if (i != j) {
  //          var lista1: java.util.List[Row] = List(i)
  //          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
  //          var lista2: java.util.List[Row] = List(j)
  //          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
  //          for (l <- list_attrs_for_switches) {
  //            if (!l.isEmpty) {
  //              for (elem <- l) {
  //                if (df1.select(elem).first() != df2.select(elem).first()) {
  //
  //                  val switch1 = switch(df1, df2, elem.toString)(0)
  //                  // outliers or inliers is the same here, I only need it for the dataframe schema
  //                  var df31RF = Classifier.test_prearrangement(outliers, assembler, label_attribute, switch1)
  //                  if (Classifier.classify(model, df31RF) == 0.0) {
  //                    mappa_Contatori(elem) += 1
  //                    function(conta_Globale)
  //                  }
  //                  mappa_tot_scambi(elem) += 1
  //                }
  //              }
  //            }
  //          }
  //          if (conta_Globale.value > soglia) return
  //        }
  //      }
  //    }
  //  }

  //  def select_features_with_Diversities_check_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, model: RandomForestClassificationModel, assembler: VectorAssembler, Classifier_RandomForest: Classifier_RandomForest, soglia: Integer): Unit = {
  //    val diversities_TH = diversities_Treshold(diversities)
  //    for (i <- outliers.collectAsList()) {
  //      for (j <- inliers.collectAsList()) {
  //        if (i != j) {
  //          var lista1: java.util.List[Row] = List(i)
  //          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
  //          var lista2: java.util.List[Row] = List(j)
  //          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
  //          for (l <- list_attrs_for_switches) {
  //            if (!l.isEmpty) {
  //              for (elem <- l) {
  //                if (!blacklist.contains(elem)) {
  //                  if (df1.select(elem).first() != df2.select(elem).first()) {
  //
  //                    val switch1 = switch(df1, df2, elem.toString)(0)
  //                    // outliers or inliers is the same here, I only need it for the dataframe schema
  //                    var df31RF = Classifier_RandomForest.test_prearrangement(outliers, assembler, label_attribute, switch1)
  //                    if (Classifier_RandomForest.classify(model, df31RF) == 0.0) {
  //                      if (variability_check("cont_" + elem, diversities, mappa_Contatori, diversities_TH, soglia))
  //                        mappa_Contatori("cont_" + elem) += 1
  //                      else
  //                        blacklist = blacklist :+ elem
  //                    }
  //                    mappa_tot_scambi("cont_" + elem) += 1
  //                  }
  //                }
  //              }
  //            }
  //          }
  //        }
  //      }
  //    }
  //  }

  def select_features_with_Gradient_check_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Double): List[String] = {
    var res = List[String]()
    var loop = 0
    var redo_batch = false
    for (i <- outliers.collectAsList()) {
      for (j <- inliers.collectAsList()) {
        if (i != j) {
          println(i)
          println(j)
          var lista1: java.util.List[Row] = List(i)
          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
          var lista2: java.util.List[Row] = List(j)
          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)

          for (elem <- single_list_attrs_for_switches) {
            if (switchesToDo(elem) > 0) {
              if (df1.select(elem).first() != df2.select(elem).first()) {
                val switch1 = switch(df1, df2, elem.toString)(0)
                var prediction = classifier.classification(outliers, switch1, label_attribute)
                if (prediction != 2.0) { // 2.0 means the tuple produced by the switch is already present in the dataframe
                  if (prediction == 0.0)
                    mappa_Contatori(elem) += 1
                  switchesToDo(elem) -= 1
                  mappa_tot_scambi(elem) += 1
                }
              }
            }
          }
          if (isCompleted(switchesToDo)) {
            println("\n\n~~~~~~~~~~~~~~~~~~~~~~~~~BATCH~~~~~~~~~~~~~~~~~~~~~~~~~\n\n")
            var attributi_sensibili = 0
            for (m <- gradienti.keys.toList) { // iterate over a copy: entries may be removed below
              var curr = (mappa_Contatori.get(m).get.toDouble / mappa_tot_scambi.get(m).get.toDouble) * 100
              //var old = gradienti.get(m).get
              gradienti.put(m, curr)
              if (curr + curr >= soglia * 100) {
                println(curr, soglia * 100)
                redo_batch = true
              }
              else {
                gradienti.remove(m)
                single_list_attrs_for_switches = single_list_attrs_for_switches.filter(_ != m)
                switchesToDo.remove(m)
                diversities.remove(m)
              }
              if (curr > soglia * 100) {
                attributi_sensibili += 1
              }
            }
            if (attributi_sensibili == gradienti.size) {
              gradienti.foreach(println)
              return gradienti.keys.toList
            }
            if (redo_batch) {
              slice_Batch_V2(switchesToDo, diversities, batch_size)
            }
          }
        }
      }
    }
    return res
  }


  def main(args: Array[String]): Unit = {
    println("It works!")
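
    // Hedged end-to-end usage sketch (commented out; every name below is a hypothetical
    // placeholder, not something defined in this file): a SparkContext `sc`, an SQLContext
    // `sqlContext`, a full DataFrame `fullDF` with label column "label", its
    // `outliersDF`/`inliersDF` split and a `classifier` would be built elsewhere.
    //   init(fullDF, "label", sc, 200)
    //   val relevant = select_features_with_Gradient_check_SingleSwitch(outliersDF, inliersDF, sqlContext, classifier, 0.5)
    //   relevant.foreach(println)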
  }
}