SHARE
TWEET

Untitled

a guest Sep 22nd, 2019 105 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package lab1
  2.  
  3. import org.apache.spark.sql._
  4. import org.apache.spark.sql.types._
  5. import org.apache.spark.sql.functions._
  6. import org.apache.spark.sql.SparkSession
  7. import org.apache.spark.sql.expressions.{UserDefinedFunction, Window}
  8. import org.apache.log4j.{Level, Logger}
  9. import java.sql.Date
  10. import scala.collection.mutable.WrappedArray
  11.  
  12.  
  /** Spark job that, for every day in a GDELT GKG extract, lists the names that occur in
    * the most records and writes the result as JSON lines.
    *
    * Usage: `SBD_Lab1_Df <input path> <output path>`
    *
    * The input is read as tab-separated CSV using [[SBD_Lab1_Df.gdeltSchema]]. The output is
    * written as text from a single partition, one JSON document per day, shaped as
    * `{"data": <date>, "result": [{"topic": <name>, "count": <n>}, ...]}`.
    */
  object SBD_Lab1_Df {

      /** Typed view of one input row; field names and order mirror `gdeltSchema`. */
      final case class GdeltData(
          GKGRECORDID: String,
          DATE: Date,
          SourceCollectionIdentifier: Int,
          SourceCommonName: String,
          DocumentIdentifier: String,
          Counts: String,
          V2Counts: String,
          Themes: String,
          V2Themes: String,
          Locations: String,
          V2Locations: String,
          Persons: String,
          V2Persons: String,
          Organizations: String,
          V2Organizations: String,
          V2Tone: String,
          Dates: String,
          GCAM: String,
          SharingImage: String,
          RelatedImages: String,
          SocialImageEmbeds: String,
          SocialVideoEmbeds: String,
          Quotations: String,
          AllNames: String,
          Amounts: String,
          TranslationInfo: String,
          Extras: String
      )

      /** Explicit read schema of the GDELT input; every column is nullable. */
      private val gdeltSchema: StructType = StructType(
          Seq(
              StructField("GKGRECORDID", StringType, nullable = true),
              StructField("DATE", DateType, nullable = true),
              StructField("SourceCollectionIdentifier", IntegerType, nullable = true),
              StructField("SourceCommonName", StringType, nullable = true),
              StructField("DocumentIdentifier", StringType, nullable = true),
              StructField("Counts", StringType, nullable = true),
              StructField("V2Counts", StringType, nullable = true),
              StructField("Themes", StringType, nullable = true),
              StructField("V2Themes", StringType, nullable = true),
              StructField("Locations", StringType, nullable = true),
              StructField("V2Locations", StringType, nullable = true),
              StructField("Persons", StringType, nullable = true),
              StructField("V2Persons", StringType, nullable = true),
              StructField("Organizations", StringType, nullable = true),
              StructField("V2Organizations", StringType, nullable = true),
              StructField("V2Tone", StringType, nullable = true),
              StructField("Dates", StringType, nullable = true),
              StructField("GCAM", StringType, nullable = true),
              StructField("SharingImage", StringType, nullable = true),
              StructField("RelatedImages", StringType, nullable = true),
              StructField("SocialImageEmbeds", StringType, nullable = true),
              StructField("SocialVideoEmbeds", StringType, nullable = true),
              StructField("Quotations", StringType, nullable = true),
              StructField("AllNames", StringType, nullable = true),
              StructField("Amounts", StringType, nullable = true),
              StructField("TranslationInfo", StringType, nullable = true),
              StructField("Extras", StringType, nullable = true)
          )
      )

      /** Highest rank kept per day. */
      private val TopN = 10

      /** Builds the per-day ranking of names.
        *
        * @param records rows read with `gdeltSchema`
        * @return one row per date, ordered by date ascending, with columns `data` (the date)
        *         and `result` (array of `struct<topic: string, count: bigint>`)
        */
      private def topNamesPerDay(records: Dataset[GdeltData]): DataFrame = {
          // De-duplicate inside one record so a name is counted at most once per record.
          val distinctNames = udf((names: Seq[String]) => names.distinct)

          val perDayMostFrequentFirst = Window.partitionBy("DATE").orderBy(col("count").desc)

          // AllNames is a ';'-separated list of "name,offset" entries. Only the ",offset"
          // suffix is removed, so digits that belong to a name are preserved.
          val namesOfRecord = split(regexp_replace(col("AllNames"), ",[0-9]+", ""), ";")

          records
              .filter(col("DATE").isNotNull && col("AllNames").isNotNull)
              .select(col("DATE"), explode(distinctNames(namesOfRecord)).as("AllNames"))
              // Drops names containing this literal text.
              // NOTE(review): presumably a recurring false positive in the data - confirm.
              .filter(!col("AllNames").contains("Type ParentCategory"))
              .groupBy("DATE", "AllNames")
              .count()
              // rank() gives tied counts the same rank, so a day may keep more than TopN names.
              .withColumn("Rank", rank().over(perDayMostFrequentFirst))
              .filter(col("Rank") <= TopN)
              .groupBy("DATE")
              // Collect typed (topic, count) structs directly instead of round-tripping the
              // pairs through strings.
              .agg(collect_list(struct(col("AllNames").as("topic"), col("count"))).as("result"))
              .orderBy(col("DATE").asc)
              .select(col("DATE").as("data"), col("result"))
      }

      /** Entry point.
        *
        * @param args `args(0)` is the input path, `args(1)` is the output path
        * @throws IllegalArgumentException if fewer than two arguments are supplied
        */
      def main(args: Array[String]): Unit = {
          require(args.length >= 2, "usage: SBD_Lab1_Df <input path> <output path>")

          Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

          val spark = SparkSession.builder
              .appName("SBD_Lab1")
              .config("spark.master", "local[*]")
              .getOrCreate()

          try {
              import spark.implicits._

              val ds = spark.read
                  .option("delimiter", "\t")              // input is tab-separated
                  .option("dateFormat", "yyyyMMddHHmmss") // pattern used to parse DATE
                  .schema(gdeltSchema)
                  .csv(args(0))
                  .as[GdeltData]

              topNamesPerDay(ds)
                  .toJSON
                  .coalesce(1)                            // write from a single partition
                  .write
                  .text(args(1))
          } finally {
              // Release the session even when the job fails.
              spark.stop()
          }
      }
  }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Not a member of Pastebin yet?
Sign Up, it unlocks many cool features!
 
Top