Advertisement
mrcmoresi

Sci-Hub lab 1

Sep 22nd, 2016
103
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 4.25 KB | None | 0 0
  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.SparkContext
  3. import org.joda.time.LocalDate
  4.  
  5. import au.com.bytecode.opencsv.CSVReader
  6. import java.io.StringReader
  7.  
  8.  
  9.  
  10. /** Carga un archivo en memoria y retorna un RDD.
  11.   *  
  12.   *
  13.   *  @param     file                Path al archivo
  14.   *  @return    RDD[Array[String]]  Cada Array[String] tiene 6 elementos:
  15.   *                                 date, doi, ip_code, country, city, coords
  16.  */
  17. def loadFile(file : String) : RDD[Array[String]] = {
  18.     val resRDD = sc.textFile(file).map(l => l.split("\t"))
  19.     return resRDD
  20. }  
  21.  
  22. /** Carga una lista de archivos en memoria y retorna un unico
  23.   * RDD[Array[String]] con el contenido de todos los archivos.
  24.   *  
  25.   *  @param     files               Lista de paths
  26.   *  @return    RDD[Array[String]]  Cada Array[String] tiene 6 elementos:
  27.   *                                 date, doi, ip_code, country, city, coords
  28. */
  29. def loadDataset(files:List[String]) : RDD[Array[String]] = {
  30.  
  31.     val loaded = files.map(f => loadFile(f)).toArray
  32.     val emptyRDD : RDD[Array[String]] = sc.emptyRDD
  33.     val result = loaded.fold(emptyRDD)(_ ++ _)
  34.     return result
  35. }
  36.  
  37. /** Transforma un RDD[Array(String)] en un RDD[(DateTime,Int)].
  38.   *  
  39.   *  @param     raw                 Dataset "crudo" de sci-hub.
  40.   *  @return    RDD[(LocalDate,Int)]   
  41.  
  42. */
  43. def toDateTuple(raw: RDD[Array[String]]) : RDD[(LocalDate, Int)] = {
  44.    
  45.     val datetupleRDD = raw.map(w => w(0).split(" ")(0))
  46.     val result = datetupleRDD.map(d => (LocalDate.parse(d), 1))
  47.     return result
  48.  
  49. }
  50.  
  51.  
  52.  
  53. def aggregateByDay(data :  RDD[(LocalDate, Int)]) = {
  54.  
  55.      data.reduceByKey((d,v) => d+v)
  56.    
  57. }
  58.  
  59.  
  60. def aggregateByMonth(data :  RDD[(LocalDate, Int)]) : RDD[(String,Int)] = {
  61.     val res = data.map(d => (d._1.toString("MM-yyyy"), d._2))
  62.     val result = res.reduceByKey((m,v) => m+v)
  63.     return result
  64. }
  65.  
  66. def aggregateByWeekDay(data :  RDD[(LocalDate, Int)]) = {
  67.     data.map{case (d,v) => (d.getDayOfWeek, v)}
  68.     data.reduceByKey((d,v) => d+v)
  69.  
  70. }
  71.  
  72.  
  73.  
  74. val file_loaded = loadFile("/home/mam0110/zeppelin-0.6.1-bin-all/doc/dec2015_short.tab")
  75. val dataset_loaded = loadDataset(List("/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/sep2015.tab","/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/oct2015.tab","/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/nov2015.tab","/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/jan2016.tab","/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/feb2016.tab"))
  76.  
  77.  
  78. //"/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/dec2015.tab", <=========> SANITIZAR
  79.  
  80. //file_loaded.collect()
  81. val res = toDateTuple(dataset_loaded).cache()
  82. //val test = aggregateByDay(res)
  83. val downPerDay = aggregateByDay(res).collect()
  84. val downPerMonth = aggregateByMonth(res).collect()
  85. val downPerWeekDay = aggregateByWeekDay(res).collect()
  86.  
  87.  
  88.  
  89. /**      GRAFICOS        **/
  90.  
  91.  
  92. //POR DIA
  93.  
  94. println("%table Date\tDownloads\n" + downPerDay)
  95. downPerDay.foreach{ case(dow,count) => println("\"" + dow + "\"" + "\t" + count)}
  96.  
  97. //POR MES
  98.  
  99. println("%table Date\tDownloads\n" + downPerMonth)
  100. downPerMonth.foreach{ case(month,value) =>  println("\"" + month + "\"" + "\t" + value)}
  101.  
  102. // POR DIA DE LA SEMANA
  103.  
  104. println("%table Date\tDownloads\n" + downPerWeekDay)
  105. downPerWeekDay.foreach{ case(day,value) =>  println("\"" + day + "\"" + "\t" + value)}
  106.  
  107.  
  108.  
  109.  
  110.  
  111. /** Carga un archivo contiene el mapeo prefix -> publisher.
  112. *  
  113. *  @param   file                    Path al archivo.
  114. *  @return  RDD[(String,String)]    
  115. */
  116. def loadPrefixMapping(file : String) : RDD[(String,String)] = {
  117.     val inputRDD = sc.textFile(file)
  118.     val res = inputRDD.map{l => val reader = new CSVReader(new StringReader(l));
  119.         reader.readNext();
  120.     }
  121.    
  122.     val result = res.filter(arr => arr != null).map(line => line.filter(! _.isEmpty)).filter(_(0)(0).isDigit).filter(arr => !arr.isEmpty).map(arr_str => (arr_str(2),arr_str(1)))
  123.     return result
  124. }
  125.  
  126. /** Retorna la metadata asociada a un DOI
  127. *  
  128. *  @param   doi                 DOI
  129. *  @return  ...    
  130. */
  131. def getMetadata(doi:String) = {
  132.    
  133. }
  134.  
  135.  
  136. /**
  137. def aggregateByDoi(...) = {
  138.  
  139. }
  140.  
  141. def aggregateByPrefix(...) = {
  142.  
  143. }
  144.  
  145. def aggregateByCountry(...) = {
  146.  
  147. }
  148. */
  149. val test = loadPrefixMapping("/home/mam0110/zeppelin-0.6.1-bin-all/doc/publisher_DOI_prefixes.csv")
  150.  
  151. test.collect()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement