Advertisement
Guest User

Nacho

a guest
Oct 26th, 2016
54
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.34 KB | None | 0 0
  1. import org.apache.spark.sql.types._
  2. import org.apache.spark.sql.Row
  3. import java.sql.Timestamp
  4. import org.apache.spark.sql.functions.{lit, when}
  5. import org.apache.spark.sql.SaveMode
  6. import org.apache.spark.sql.expressions.Window
  7.  
  8. var dir : String = "event-log/datos_prueba"
  9. var exp_session : Int= 1
  10.  
  11. // Crea un RDD
  12. val eventsRDD = spark.sparkContext.textFile(dir)
  13.  
  14. // Se encodea el schema en un string auxiliar
  15. val schemaString = "timestamp id_user id_event amount".split(" ")
  16.  
  17. // Se genera el schema según el string
  18. val f_timestamp = StructField(schemaString(0), TimestampType, nullable=false)
  19. val f_id_user = StructField(schemaString(1), StringType, nullable=false)
  20. val f_id_event = StructField(schemaString(2), StringType, nullable=false)
  21. val f_amount = StructField(schemaString(3), StringType, nullable=true)
  22.  
  23. var fields =  Array(f_timestamp, f_id_user, f_id_event, f_amount)
  24. val schema = StructType(fields)
  25.  
  26. // Convierte el RDD de String a Row
  27. val rowRDD = eventsRDD
  28.   .map(_.split("\t"))
  29.   .map(attributes => {
  30.         // Se parsea el string de date a Timestamp
  31.         val date = Timestamp.valueOf(attributes(0))
  32.         Row(date, attributes(1), attributes(2), attributes(3))
  33.         }
  34.     )
  35.  
  36. // Aplica el schema al RDD creando el Dataframe y luego se cambian los valores de amount a tipo Float
  37. val eventsDF = spark.createDataFrame(rowRDD, schema)
  38.                     .withColumn("amount",
  39.                         when($"amount" === "NULL", lit(null))
  40.                         .otherwise($"amount")
  41.                         .cast(FloatType)
  42.                 )
  43.  
  44.  
  45. eventsDF.printSchema()
  46. eventsDF.show(false)
  47.  
  48. // Se guarda el archivo en formato parquet
  49. eventsDF.write.format("parquet").mode(SaveMode.Overwrite).save("event-log/hits.parquet")
  50.  
  51.  
  52. // EJERCICIO 3A
  53. /* ESTA PARTE ESTA ECHA COMO SI EL TIMESTAMP FUERAN ENTEROS
  54.    HABRIA QUE CAMBIAR LA PARTE DE LOS TIMESTAMP-PREV, SACANDO LAS DIFERENCIAS DE TIEMPOS DE LA FORMA:
  55.  
  56.  
  57. val difMiliSeg = fecha2.getTime() - fecha.getTime()
  58. val difSeg = difMiliSeg/1000
  59.  
  60. */
  61.  
  62. val byIdUser = Window.partitionBy('id_user).orderBy('timestamp)
  63. // Se agrega una columna con un 1 los eventos de comienzo de cada sesion, y 0 al resto  
  64. val prev = lag('timestamp, 1).over(byIdUser)
  65. eventsDF.withColumn("start_session", when(prev.isNull, 1).otherwise(when('timestamp-prev > exp_session, 1).otherwise(0))).show(false)
  66.  
  67.  
  68.  
  69.  
  70.  
  71.  
  72.  
  73.  
  74.  
  75. // ASI ESTABA PROBANDO EN SPARK CON TIMESTAMP DE TIPO INT
  76.  
  77. val exp_session = 1
  78. case class Event(timestamp: Int, id_user: String, id_event: String, amount: Double)
  79.  
  80. val eventsDF = Seq(
  81.     Event(1,"user1", "ev1", 0),
  82.     Event(2,"user1","ev1",0),
  83.     Event(4,"user2","ev2",756.70),
  84.     Event(5,"user3","ev2",792.20),
  85.     Event(10,"user1","ev1",0),
  86.     Event(12,"user2","ev1",0),
  87.     Event(13,"user2","ev1",0),
  88.     Event(15,"user1","ev2",323.09),
  89.     Event(15,"user1","ev2",719.26),
  90.     Event(15,"user1","ev2",756.10),
  91.     Event(16,"user3","ev1",0),
  92.     Event(19,"user3","ev1",0)).toDF
  93.  
  94. import org.apache.spark.sql.expressions.Window
  95.  
  96. val byIdUser = Window.partitionBy('id_user).orderBy('timestamp)
  97.  
  98. // Se agrega una columna con un 1 los eventos de comienzo de cada sesion, y 0 al resto  
  99. val prev = lag('timestamp, 1).over(byIdUser)
  100. eventsDF.withColumn("start_session", when(prev.isNull, 1).otherwise(when('timestamp-prev > exp_session, 1).otherwise(0))).show(false)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement