Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.spark.rdd.RDD
- import org.apache.spark.SparkContext
- import org.joda.time.LocalDate
- import au.com.bytecode.opencsv.CSVReader
- import java.io.StringReader
/** Loads a file into memory and returns an RDD.
  *
  * @param file path to the file
  * @return RDD[Array[String]] where each Array[String] has 6 elements:
  *         date, doi, ip_code, country, city, coords
  */
def loadFile(file: String): RDD[Array[String]] =
  // Records are tab-separated; split each line into its columns.
  // (Removed the non-idiomatic `return` — the last expression is the result.)
  sc.textFile(file).map(_.split("\t"))
/** Loads a list of files into memory and returns a single
  * RDD[Array[String]] with the contents of all the files.
  *
  * @param files list of paths
  * @return RDD[Array[String]] where each Array[String] has 6 elements:
  *         date, doi, ip_code, country, city, coords
  */
def loadDataset(files: List[String]): RDD[Array[String]] =
  // foldLeft over an empty RDD unions all per-file RDDs; this also
  // handles the empty-list case and drops the needless `.toArray`
  // and `return` of the original.
  files.map(loadFile).foldLeft(sc.emptyRDD[Array[String]])(_ ++ _)
/** Transforms an RDD[Array[String]] into an RDD[(LocalDate, Int)].
  *
  * @param raw "raw" sci-hub dataset
  * @return RDD[(LocalDate, Int)] — one (date, 1) pair per record, ready
  *         to be summed by the aggregation functions below
  */
def toDateTuple(raw: RDD[Array[String]]): RDD[(LocalDate, Int)] =
  // Column 0 is a "date time" string; keep only the date part.
  // Fused the original's two map passes into one and removed `return`.
  raw.map(w => (LocalDate.parse(w(0).split(" ")(0)), 1))
/** Sums the per-record counts, yielding total downloads per calendar day. */
def aggregateByDay(data: RDD[(LocalDate, Int)]): RDD[(LocalDate, Int)] =
  data.reduceByKey(_ + _)
/** Sums downloads per month.
  *
  * @param data (date, count) pairs as produced by toDateTuple
  * @return RDD[(String, Int)] keyed by "MM-yyyy"
  */
def aggregateByMonth(data: RDD[(LocalDate, Int)]): RDD[(String, Int)] = {
  // Re-key by month string, then sum. The original's reducer named its
  // arguments (m, v) as if one were a month — both are counts; fixed
  // with `_ + _`, and the `return` was dropped.
  val byMonth = data.map { case (date, count) => (date.toString("MM-yyyy"), count) }
  byMonth.reduceByKey(_ + _)
}
/** Sums downloads per day of the week (Joda-Time: 1 = Monday ... 7 = Sunday).
  *
  * Bug fix: the original computed `data.map{case (d,v) => (d.getDayOfWeek, v)}`
  * but DISCARDED that RDD and then reduced the original `data` keyed by the
  * full date — so it aggregated per day, not per weekday. The map result is
  * now actually used.
  *
  * @param data (date, count) pairs as produced by toDateTuple
  * @return RDD[(Int, Int)] of (dayOfWeek, totalDownloads)
  */
def aggregateByWeekDay(data: RDD[(LocalDate, Int)]): RDD[(Int, Int)] =
  data
    .map { case (date, count) => (date.getDayOfWeek, count) }
    .reduceByKey(_ + _)
// Load a single short sample file (December 2015 extract).
val file_loaded = loadFile("/home/mam0110/zeppelin-0.6.1-bin-all/doc/dec2015_short.tab")
// Load the full multi-month sci-hub download log into one RDD.
// NOTE(review): paths are hard-coded to two different home directories
// (mam0110 above, mrc below) — confirm which environment this runs in.
val dataset_loaded = loadDataset(List("/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/sep2015.tab","/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/oct2015.tab","/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/nov2015.tab","/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/jan2016.tab","/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/feb2016.tab"))
//"/home/mrc/Facultad/BigData/zeppelin-0.6.1-bin-all/doc/scihub_data/dec2015.tab", <=========> NEEDS SANITIZING (excluded from the list above)
//file_loaded.collect()
// Cache: `res` is reused by all three aggregations below.
val res = toDateTuple(dataset_loaded).cache()
//val test = aggregateByDay(res)
// Materialize the three aggregations on the driver for printing.
val downPerDay = aggregateByDay(res).collect()
val downPerMonth = aggregateByMonth(res).collect()
val downPerWeekDay = aggregateByWeekDay(res).collect()
/** CHARTS **/
// Zeppelin %table format: a tab-separated header line, then one row per entry.
// Bug fix: the original concatenated the collected Array onto the header
// string ("%table ...\n" + downPerDay), which printed the array's toString
// (e.g. "[Lscala.Tuple2;@1a2b3c") into the table and corrupted the first row.
// The header is now printed on its own.

// Per day
println("%table Date\tDownloads")
downPerDay.foreach{ case(dow,count) => println("\"" + dow + "\"" + "\t" + count)}
// Per month
println("%table Date\tDownloads")
downPerMonth.foreach{ case(month,value) => println("\"" + month + "\"" + "\t" + value)}
// Per day of the week
println("%table Date\tDownloads")
downPerWeekDay.foreach{ case(day,value) => println("\"" + day + "\"" + "\t" + value)}
/** Loads a file containing the prefix -> publisher mapping.
  *
  * @param file path to the CSV file
  * @return RDD[(String, String)] of (prefix, publisher) pairs
  */
def loadPrefixMapping(file: String): RDD[(String, String)] = {
  // Each input line is parsed as a standalone CSV record.
  val rows = sc.textFile(file).map { line =>
    new CSVReader(new StringReader(line)).readNext()
  }
  // Bug fix: the original applied `.filter(_(0)(0).isDigit)` BEFORE the
  // emptiness check, so a row that became empty after dropping blank
  // fields would throw on indexing. The guards now run before indexing,
  // and a length check protects the row(2)/row(1) access.
  rows
    .filter(_ != null)
    .map(_.filter(!_.isEmpty))                         // drop blank fields
    .filter(row => row.length >= 3 && row(0)(0).isDigit) // keep DOI-prefix rows only
    .map(row => (row(2), row(1)))                      // (prefix, publisher)
}
/** Returns the metadata associated with a DOI.
  *
  * @param doi DOI
  * @return nothing yet — the body is an empty stub (evaluates to Unit)
  */
def getMetadata(doi:String) = {
  // TODO: not implemented — fetch/resolve metadata for the given DOI.
}
- /**
- def aggregateByDoi(...) = {
- }
- def aggregateByPrefix(...) = {
- }
- def aggregateByCountry(...) = {
- }
- */
// Smoke-test the prefix-mapping loader and materialize the result on the driver.
// NOTE(review): hard-coded path under /home/mam0110 — confirm the environment.
val test = loadPrefixMapping("/home/mam0110/zeppelin-0.6.1-bin-all/doc/publisher_DOI_prefixes.csv")
test.collect()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement