package outliers_relevant_attributes

import java.io.PrintWriter
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext}
import collection.immutable.{ListMap, Map}
import scala.collection.JavaConversions._
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{StructField, StructType}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent._
import ExecutionContext.Implicits.global

/**
  * Created by antonio on 12/26/16.
  */

object FeatureSelector {

  var data: DataFrame = null
  var numThread = 1.0
  var label_attribute = ""
  var conta_Globale: MutableInt = 0
  var mappa_Contatori = collection.mutable.Map[String, Integer]()  // per attribute: count of switches with prediction 0.0
  var mappa_tot_scambi = collection.mutable.Map[String, Integer]() // per attribute: total switches attempted
  var single_list_attrs_for_switches = List[String]()
  var list_attrs_for_switches = List[List[String]]()
  var diversities = collection.mutable.Map[String, Double]()       // per attribute: normalized diversity score
  var switchesToDo = collection.mutable.Map[String, Integer]()     // per attribute: switches still to perform in the current batch
  var batch_size = 200 //TODO: remove once the batch size is taken from an external parameter
  var gradienti = collection.mutable.Map[String, Double]()         // per attribute: success percentage of the previous batch
  var batch_progression = collection.mutable.Map[String, Integer]()

  @Deprecated
  implicit class MutableInt(var value: Int) {
    def inc() = {
      value += 1
    }
  }

  @Deprecated
  def function(s: MutableInt) {
    s.inc()
  }

  // Returns every column name of the DataFrame except the label attribute.
  def get_Features_As_List(DF: org.apache.spark.sql.DataFrame): List[String] = {
    var features = List[String]()
    for (p <- DF.schema.fields) {
      if (p.name != label_attribute) {
        features = p.name :: features
      }
    }
    return features
  }

  def create_Var(s1: String, dict: collection.mutable.Map[String, Integer]) {
    var x = s1.toString
    dict.put(x, 0)
  }

  def init_Counters(mappa_contatori: collection.mutable.Map[String, Integer], df: DataFrame): Unit = {
    for (p <- get_Features_As_List(df)) {
      create_Var(p, mappa_contatori)
    }
  }

  def create_g(s1: String, dict: collection.mutable.Map[String, Double]) {
    var x = s1.toString
    dict.put(x, 0.0)
  }

  def init_Gradienti(gradienti: collection.mutable.Map[String, Double], df: DataFrame): Unit = {
    for (p <- get_Features_As_List(df)) {
      create_g(p, gradienti)
    }
  }

  // True when no attribute has switches left to perform in the current batch.
  def isCompleted(switchesToDo: collection.mutable.Map[String, Integer]): Boolean = {
    switchesToDo.values.forall(x => x == 0)
  }

  // Reads a "#"-separated dependencies CSV and returns the distinct attribute names
  // appearing in rows whose dependency degree lies in [95.0, 100.0).
  def csv_toSingleList_attr(path: String, sc: SparkContext): List[String] = {
    var text = sc.textFile(path)
    val map1 = text.map(line => line.split("#"))
    val map2 = map1.map(linea => linea.mkString(",").replace(",", " ").replace("  ", " "))
    val map3 = map2.map(row => if (row.split(" ").last != "degrees" && row.split(" ").last.toDouble >= 95.0 && row.split(" ").last.toDouble < 100.0) row.split(" ").mkString(",") else "")

    var res = List[String]()
    for (linea <- map3.collect()) {
      if (linea != "") {
        for (splitt <- linea.split(",")) {
          if (!splitt.contains("."))
            if (!res.contains(splitt))
              res = splitt :: res
        }
      }
    }
    return res
  }

  // Same parsing as above, but keeps one list of attribute names per dependency row.
  def csv_toList_attr(path: String, sc: SparkContext): List[List[String]] = {
    var text = sc.textFile(path)
    val map1 = text.map(line => line.split("#"))
    val map2 = map1.map(linea => linea.mkString(",").replace(",", " ").replace("  ", " "))
    val map3 = map2.map(row => if (row.split(" ").last != "degrees" && row.split(" ").last.toDouble >= 95.0 && row.split(" ").last.toDouble < 100.0) row.split(" ").mkString(",") else "")

    var list1 = List(List[String]())
    for (linea <- map3.collect()) {
      if (linea != "") {
        var list2 = List[String]()
        for (splitt <- linea.split(",")) {
          if (!splitt.contains("."))
            list2 = splitt :: list2
        }
        list1 = list2 :: list1
      }
    }
    return list1
  }

  @Deprecated
  def over_Soglia(countmap: collection.mutable.Map[String, Integer]): Boolean = {
    for (p <- 0 until countmap.values.size) {
      var cont = 0
      var cont2 = 0
      var res = 0
      for (pp <- 0 until countmap.values.size) {
        if (countmap.values.toList(p) != countmap.values.toList(pp)) {
          if (countmap.values.toList(p) - countmap.values.toList(pp) >= 200) {
            cont += 1
          }
          if (cont == countmap.size - ((countmap.size / 5) - 1)) return true
        }
      }
    }
    return false
  }
  @Deprecated
  def init_Diversities(diversities: collection.mutable.Map[String, Double], df: DataFrame): Unit = {
    // [old diversities] // IS MITHOSIS ZERO??????
    var mean = df.drop(label_attribute).describe().collectAsList().get(1)
    var stddev = df.drop(label_attribute).describe().collectAsList().get(2)
    val attributi = df.drop(label_attribute).schema.fields
    var list_stddev = List[Double]()
    var list_mean = List[Double]()

    for (s <- 1 until stddev.size) {
      list_stddev = list_stddev :+ stddev(s).toString.toDouble
    }
    for (m <- 1 until mean.size) {
      list_mean = list_mean :+ mean(m).toString.toDouble
    }
    for (i <- 0 until attributi.size) {
      diversities.put(attributi(i).toString().split("\\(")(1).split(",")(0), (list_stddev(i) + list_mean(i)) / (list_stddev(i) * list_mean(i)))
    }
    val max = diversities.values.max
    for (d <- diversities.keys) {
      diversities.put(d, diversities.get(d).get / max)
    }
    // complement to 1:
    // 0 never changes, 1 always varies
    for (d <- diversities.keys) {
      diversities.put(d, 1 - diversities.get(d).get)
    }
  }

  // Diversity of an attribute = distinct values / total rows, normalized by the maximum diversity.
  def init_Diversities_DistinctCount(diversities: collection.mutable.Map[String, Double], df: DataFrame): Unit = {
    var attributi = get_Features_As_List(df)
    for (a <- attributi) {
      diversities.put(a, df.select(a).distinct().count().toDouble / df.count().toDouble)
    }
    val max = diversities.values.max
    for (d <- diversities.keys) {
      diversities.put(d, diversities.get(d).get / max)
    }
  }

  // An attribute stays in play unless its diversity is below the threshold and its
  // counter has already reached soglia_Stop_Counters.
  def variability_check(attributo: String, diversities: collection.mutable.Map[String, Double], mappa_contatori: collection.mutable.Map[String, Integer], soglia: Double, soglia_Stop_Counters: Integer): Boolean = {
    if (diversities.get(attributo).get < soglia && mappa_contatori.get(attributo).get >= soglia_Stop_Counters)
      return false
    return true
  }

  // Threshold = the diversity value at position size - size/4 of the ascending-sorted diversities
  // (i.e. a cutoff in the upper quarter of the distribution).
  def diversities_Treshold(diversities: collection.mutable.Map[String, Double]): Double = {
    return ListMap(diversities.toSeq.sortBy(_._2): _*).values.toList(diversities.size - (diversities.size / 4))
  }


  @Deprecated
  def slice_Batch(switchesToDo: collection.mutable.Map[String, Integer], diversities: collection.mutable.Map[String, Double], batch_size: Double): Unit = {
    for (k <- diversities.keys) {
      var temp = diversities.get(k).get * batch_size
      switchesToDo.put(k, math.ceil(temp).toInt)
    }
  }

  // Splits the per-thread batch budget among the attributes proportionally to their diversity.
  def slice_Batch_V2(switchesToDo: collection.mutable.Map[String, Integer], diversities: collection.mutable.Map[String, Double], batch_size: Double): Unit = {
    var sum = 0.0
    for (k <- diversities.keys) {
      sum += diversities.get(k).get
    }
    for (k <- diversities.keys) {
      var temp = (diversities.get(k).get * (batch_size / numThread)) / sum
      switchesToDo.put(k, math.ceil(temp).toInt)
    }
  }


  def init(df: org.apache.spark.sql.DataFrame, l_a: String, sc: SparkContext, BATCH_SIZE: Integer): Unit = {
    data = df
    label_attribute = l_a
    init_Counters(mappa_Contatori, df)
    init_Counters(mappa_tot_scambi, df)
    //init_Diversities(diversities,df)
    init_Diversities_DistinctCount(diversities, df)
    //single_list_attrs_for_switches = csv_toSingleList_attr("/home/pierluigi/spark-outlier-explain/dependencies/cl_simplified.csv_dependencies", sc)
    single_list_attrs_for_switches = get_Features_As_List(df) //use the commented line above once the selection of highly dependent attributes is ready
    list_attrs_for_switches = csv_toList_attr("/home/pierluigi/spark-outlier-explain/dependencies/cl_simplified.csv_dependencies", sc)
    batch_size = BATCH_SIZE
    slice_Batch_V2(switchesToDo, diversities, batch_size)
    init_Gradienti(gradienti, df)
  }


  // Swaps the value of `attr` between two single-row DataFrames via a cross join.
  def switch(tupla1: org.apache.spark.sql.DataFrame, tupla2: org.apache.spark.sql.DataFrame, attr: String): List[org.apache.spark.sql.DataFrame] = {
    val t1 = tupla1.drop(attr).join(tupla2.select(attr))
    val t2 = tupla2.drop(attr).join(tupla1.select(attr))
    val res = List(t1, t2)
    return res
  }

  // Swaps a whole set of attributes between two single-row DataFrames.
  def switch_multiple(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, tupla1: org.apache.spark.sql.DataFrame, tupla2: org.apache.spark.sql.DataFrame, attrs: Seq[String]): List[org.apache.spark.sql.DataFrame] = {
    val filteredTupla1 = tupla1.select(outliers.columns
      .filter(colName => !attrs.contains(colName))
      .map(colName => new Column(colName)): _*)
    val filteredTupla2 = tupla2.select(inliers.columns
      .filter(colName => !attrs.contains(colName))
      .map(colName => new Column(colName)): _*)
    val t1 = filteredTupla1.join(tupla2.select(attrs.head, attrs.tail: _*))
    val t2 = filteredTupla2.join(tupla1.select(attrs.head, attrs.tail: _*))
    val res = List(t1, t2)
    return res
  }

  /*
   This method tries to switch every possible inlier with every possible outlier.
  */
  //EXHAUSTIVE
  def init_Threads_Exhaustive(): (collection.mutable.Map[String, Integer], collection.mutable.Map[String, Integer]) = {
    var mappa_contatori_copy = collection.mutable.Map[String, Integer]()
    var mappa_tot_scambi_copy = collection.mutable.Map[String, Integer]()
    init_Counters(mappa_contatori_copy, data)
    init_Counters(mappa_tot_scambi_copy, data)
    return (mappa_contatori_copy, mappa_tot_scambi_copy)
  }

  def future_producer_exhaustive_SingleSwitch(lista_lanci: List[(DataFrame, DataFrame)], sqlContext: SQLContext, classifier: Classifier, soglia: Double, printWriter: PrintWriter) = {
    var seq = Seq[Future[Any]]()
    for ((o, i) <- lista_lanci) {
      var fut = Future {
        var temp = select_features_exhaustive_SingleSwitch(o, i, sqlContext, classifier, soglia)
        temp.foreach(t => printWriter.write(t + "\n"))
      }
      seq = seq :+ fut
    }
    Future.sequence(seq)
  }

  def select_features_exhaustive_SingleSwitch_Parallelism(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Double, numero_thread: Integer, printWriter: PrintWriter): Unit = {
    numThread = numero_thread.toDouble
    var lista_lanci = List[(DataFrame, DataFrame)]()
    var splits = Array.fill(numero_thread){0.0}
    for (i <- 0 until numero_thread) {
      splits.update(i, 1.0 / numThread)
    }
    var split_o = outliers.randomSplit(splits)
    var split_i = inliers.randomSplit(splits)
    for ((o, i) <- (split_o zip split_i)) {
      var temp: (DataFrame, DataFrame) = (o, i)
      lista_lanci = lista_lanci :+ temp
    }
    Await.ready(future_producer_exhaustive_SingleSwitch(lista_lanci, sqlContext, classifier, soglia, printWriter), Duration.Inf)
  }

  def select_features_exhaustive_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, treshold: Double): List[String] = {
    var (mappa_contatori_local, mappa_tot_scambi_local) = init_Threads_Exhaustive()
    var res = List[String]()
    for (i <- outliers.collectAsList()) {
      for (j <- inliers.collectAsList()) {
        if (i != j) {
          println(i)
          println(j)
          var lista1: java.util.List[Row] = List(i)
          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
          var lista2: java.util.List[Row] = List(j)
          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
          for (elem <- single_list_attrs_for_switches) {
            if (df1.select(elem).first() != df2.select(elem).first()) {
              val switch1 = switch(df1, df2, elem.toString)(0)
              var prediction = classifier.classification(outliers, switch1, label_attribute)
              if (prediction != 2.0) {
                // 2.0 means the tuple produced by the switch is already present in the dataframe
                if (prediction == 0.0)
                  mappa_contatori_local(elem) += 1
                mappa_tot_scambi_local(elem) += 1
              }
            }
          }
        }
      }
    }
    for (m <- mappa_contatori_local.keys) {
      if ((mappa_contatori_local.get(m).get.toDouble / mappa_tot_scambi_local.get(m).get.toDouble) / (outliers.count().toDouble * inliers.count().toDouble * mappa_contatori_local.size.toDouble) > treshold) {
        res = res :+ m
      }
    }
    return res
  }
  //END EXHAUSTIVE

  //VARIABILITY
  def init_Threads_With_Diversities(): (List[String], collection.mutable.Map[String, Integer], collection.mutable.Map[String, Integer]) = {
    var list_attr_copy = get_Features_As_List(data)
    var mappa_contatori_copy = collection.mutable.Map[String, Integer]()
    var mappa_tot_scambi_copy = collection.mutable.Map[String, Integer]()
    init_Counters(mappa_contatori_copy, data)
    init_Counters(mappa_tot_scambi_copy, data)
    return (list_attr_copy, mappa_contatori_copy, mappa_tot_scambi_copy)
  }

  def future_producer_With_Diversities(lista_lanci: List[(DataFrame, DataFrame)], sqlContext: SQLContext, classifier: Classifier, soglia: Integer, printWriter: PrintWriter) = {
    var seq = Seq[Future[Any]]()
    for ((o, i) <- lista_lanci) {
      var fut = Future {
        var temp = select_features_with_Diversities_check_SingleSwitch(o, i, sqlContext, classifier, soglia)
        temp.foreach(t => printWriter.write(t + "\n"))
      }
      seq = seq :+ fut
    }
    Future.sequence(seq)
  }

  def select_features_With_Diversities_Check_Single_Switch_Parallelism(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Integer, numero_thread: Integer, printWriter: PrintWriter): Unit = {
    numThread = numero_thread.toDouble
    var lista_lanci = List[(DataFrame, DataFrame)]()
    var splits = Array.fill(numero_thread){0.0}
    for (i <- 0 until numero_thread) {
      splits.update(i, 1.0 / numThread)
    }
    var split_o = outliers.randomSplit(splits)
    var split_i = inliers.randomSplit(splits)
    for ((o, i) <- (split_o zip split_i)) {
      var temp: (DataFrame, DataFrame) = (o, i)
      lista_lanci = lista_lanci :+ temp
    }
    Await.ready(future_producer_With_Diversities(lista_lanci, sqlContext, classifier, soglia, printWriter), Duration.Inf)
  }


  def select_features_with_Diversities_check_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Integer): List[String] = {
    var res = List[String]()
    var (single_list_attrs_for_switches_local, mappa_Contatori_local, mappa_tot_scambi_local) = init_Threads_With_Diversities()
    val diversities_TH = diversities_Treshold(diversities)
    for (i <- outliers.collectAsList()) {
      for (j <- inliers.collectAsList()) {
        if (i != j) {
          println(i)
          println(j)
          var lista1: java.util.List[Row] = List(i)
          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
          var lista2: java.util.List[Row] = List(j)
          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)
          for (elem <- single_list_attrs_for_switches_local) {
            if (df1.select(elem).first() != df2.select(elem).first()) {
              val switch1 = switch(df1, df2, elem.toString)(0)
              var prediction = classifier.classification(outliers, switch1, label_attribute)
              if (prediction != 2.0) {
                if (prediction == 0.0) {
                  if (variability_check(elem, diversities, mappa_Contatori_local, diversities_TH, soglia))
                    mappa_Contatori_local(elem) += 1
                  else
                    single_list_attrs_for_switches_local = single_list_attrs_for_switches_local.filter(_ != elem.toString)
                }
                mappa_tot_scambi_local(elem) += 1
              }
            }

          }
        }
      }
    }
    return get_Features_As_List(data).diff(single_list_attrs_for_switches_local)
  }
  //END VARIABILITY

  //GRADIENT
  def init_Threads_With_Gradient(): (List[String], collection.mutable.Map[String, Integer], collection.mutable.Map[String, Double], collection.mutable.Map[String, Integer], collection.mutable.Map[String, Double], collection.mutable.Map[String, Integer]) = {
    var list_attr_copy = List[String]()
    var diversities_copy = collection.mutable.Map[String, Double]()
    var mappa_contatori_copy = collection.mutable.Map[String, Integer]()
    var mappa_tot_scambi_copy = collection.mutable.Map[String, Integer]()
    var gradienti_copy = collection.mutable.Map[String, Double]()
    var switchesToDo_copy = collection.mutable.Map[String, Integer]()
    list_attr_copy = get_Features_As_List(data)
    init_Diversities_DistinctCount(diversities_copy, data)
    init_Counters(mappa_contatori_copy, data)
    init_Counters(mappa_tot_scambi_copy, data)
    slice_Batch_V2(switchesToDo_copy, diversities_copy, (batch_size / numThread))
    init_Gradienti(gradienti_copy, data)
    return (list_attr_copy, switchesToDo_copy, diversities_copy, mappa_contatori_copy, gradienti_copy, mappa_tot_scambi_copy)
  }

  def future_producer_with_Gradient(lista_lanci: List[(DataFrame, DataFrame)], sqlContext: SQLContext, classifier: Classifier, soglia: Double, printWriter: PrintWriter) = {
    var seq = Seq[Future[Any]]()
    for ((o, i) <- lista_lanci) {
      var fut = Future {
        var temp = select_features_with_Gradient_check_SingleSwitch(o, i, sqlContext, classifier, soglia)
        temp.foreach(t => printWriter.write(t + "\n"))
      }
      seq = seq :+ fut
    }
    Future.sequence(seq)
  }


  def select_features_with_Gradient_check_Single_Switch_Parallelism(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Double, numero_thread: Integer, printWriter: PrintWriter): Unit = {
    numThread = numero_thread.toDouble
    var lista_lanci = List[(DataFrame, DataFrame)]()
    var splits = Array.fill(numero_thread){0.0}
    for (i <- 0 until numero_thread) {
      splits.update(i, 1.0 / numThread)
    }
    var split_o = outliers.randomSplit(splits)
    var split_i = inliers.randomSplit(splits)
    for ((o, i) <- (split_o zip split_i)) {
      var temp: (DataFrame, DataFrame) = (o, i)
      lista_lanci = lista_lanci :+ temp
    }
    Await.ready(future_producer_with_Gradient(lista_lanci, sqlContext, classifier, soglia, printWriter), Duration.Inf)
  }


  def select_features_with_Gradient_check_SingleSwitch(outliers: org.apache.spark.sql.DataFrame, inliers: org.apache.spark.sql.DataFrame, sqlContext: org.apache.spark.sql.SQLContext, classifier: Classifier, soglia: Double): List[String] = {
    var num_batch = 0
    var (single_list_attrs_local, switchesToDo_local, diversities_local, mappa_contatori_local, gradienti_local, mappa_tot_scambi_local) = init_Threads_With_Gradient()
    var res = List[String]()
    var loop = 0
    var redo_batch = false
    for (i <- outliers.collectAsList()) {
      for (j <- inliers.collectAsList()) {
        redo_batch = false
        if (i != j) {
          println(i)
          println(j)
          var lista1: java.util.List[Row] = List(i)
          var df1 = sqlContext.createDataFrame(lista1, outliers.schema)
          var lista2: java.util.List[Row] = List(j)
          var df2 = sqlContext.createDataFrame(lista2, inliers.schema)

          for (elem <- single_list_attrs_local) {
            if (switchesToDo_local(elem) > 0) {
              if (df1.select(elem).first() != df2.select(elem).first()) {
                val switch1 = switch(df1, df2, elem.toString)(0)
                var prediction = classifier.classification(outliers, switch1, label_attribute)
                if (prediction != 2.0) {
                  // 2.0 means the tuple produced by the switch is already present in the dataframe
                  if (prediction == 0.0)
                    mappa_contatori_local(elem) += 1
                  switchesToDo_local(elem) -= 1
                  mappa_tot_scambi_local(elem) += 1
                }
              }
            }
          }
          if (isCompleted(switchesToDo_local)) {
            num_batch += 1
            println("\n\n~~~~~~~~~~~~~~~~~~~~~~~~~BATCH_" + num_batch + "~~~~~~~~~~~~~~~~~~~~~~~~~\n\n")
            var attributi_sensibili = 0
            // snapshot the keys: entries may be removed from gradienti_local inside the loop
            for (m <- gradienti_local.keys.toList) {
              var curr = (mappa_contatori_local.get(m).get.toDouble / mappa_tot_scambi_local.get(m).get.toDouble) * 100
              var old = gradienti_local.get(m).get
              var increment = curr - old
              gradienti_local.put(m, curr)
              if (num_batch != 1) {
                if (curr + increment >= soglia * 100) {
                  println(curr, soglia * 100)
                  redo_batch = true
                }
                else {
                  gradienti_local.remove(m)
                  single_list_attrs_local = single_list_attrs_local.filter(_ != m)
                  switchesToDo_local.remove(m)
                  diversities_local.remove(m)
                }

                if (curr > soglia * 100) {
                  attributi_sensibili += 1
                }
              }
              else
                redo_batch = true
            }
            if (attributi_sensibili == gradienti_local.size) {
              gradienti_local.foreach(println)
              return gradienti_local.keys.toList
            }
            if (redo_batch) {
              slice_Batch_V2(switchesToDo_local, diversities_local, batch_size / numThread)
            }
          }
        }
      }
    }
    return res
  }
  //END GRADIENT

  // Deduplicates the output file by the first token of each line and writes a "_parsed" copy.
  def clean(sparkContext: SparkContext, output_path: String): String = {
    val lines = sparkContext.textFile(output_path)
    val keyed = lines.map(line => line.split(" ")(0) -> line)
    val deduplicated = keyed.reduceByKey((a, b) => a)
    val output_parsed_path = output_path.replace(".txt", "_parsed.txt")
    deduplicated.values.repartition(1).saveAsTextFile(output_parsed_path)
    return output_parsed_path
  }

}
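
// Usage sketch: a minimal, hypothetical driver showing how FeatureSelector might be wired
// together. It assumes a Classifier implementation defined elsewhere in this package, an
// existing SQLContext, and two DataFrames with the same schema; the label column name,
// output path, thread count, and threshold below are illustrative only. Note that init()
// also reads the hard-coded dependencies CSV path, which must exist on the machine.
object FeatureSelectorExample {
  def run(sc: SparkContext, sqlContext: SQLContext,
          outliers: DataFrame, inliers: DataFrame,
          classifier: Classifier): Unit = {
    // Full dataset = outliers plus inliers; "class" is the assumed label column.
    val fullData = outliers.unionAll(inliers)
    FeatureSelector.init(fullData, "class", sc, 200)

    // Gradient-based selection with 4 parallel splits and a 0.5 sensitivity threshold;
    // each Future appends the attributes it finds to the shared writer.
    val writer = new PrintWriter("/tmp/relevant_attributes.txt")
    FeatureSelector.select_features_with_Gradient_check_Single_Switch_Parallelism(
      outliers, inliers, sqlContext, classifier, 0.5, 4, writer)
    writer.close()

    // Deduplicate the per-thread output lines by their first token.
    val parsedPath = FeatureSelector.clean(sc, "/tmp/relevant_attributes.txt")
    println("parsed output written to " + parsedPath)
  }
}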