Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.spark.sql.types._
- import org.apache.spark.sql.Row
- import java.sql.Timestamp
- import org.apache.spark.sql.functions.{lit, when}
- import org.apache.spark.sql.SaveMode
- import org.apache.spark.sql.expressions.Window
- var dir : String = "event-log/datos_prueba"
- var exp_session : Int= 1
- // Crea un RDD
- val eventsRDD = spark.sparkContext.textFile(dir)
- // Se encodea el schema en un string auxiliar
- val schemaString = "timestamp id_user id_event amount".split(" ")
- // Se genera el schema según el string
- val f_timestamp = StructField(schemaString(0), TimestampType, nullable=false)
- val f_id_user = StructField(schemaString(1), StringType, nullable=false)
- val f_id_event = StructField(schemaString(2), StringType, nullable=false)
- val f_amount = StructField(schemaString(3), StringType, nullable=true)
- var fields = Array(f_timestamp, f_id_user, f_id_event, f_amount)
- val schema = StructType(fields)
- // Convierte el RDD de String a Row
- val rowRDD = eventsRDD
- .map(_.split("\t"))
- .map(attributes => {
- // Se parsea el string de date a Timestamp
- val date = Timestamp.valueOf(attributes(0))
- Row(date, attributes(1), attributes(2), attributes(3))
- }
- )
- // Aplica el schema al RDD creando el Dataframe y luego se cambian los valores de amount a tipo Float
- val eventsDF = spark.createDataFrame(rowRDD, schema)
- .withColumn("amount",
- when($"amount" === "NULL", lit(null))
- .otherwise($"amount")
- .cast(FloatType)
- )
- eventsDF.printSchema()
- eventsDF.show(false)
- // Se guarda el archivo en formato parquet
- eventsDF.write.format("parquet").mode(SaveMode.Overwrite).save("event-log/hits.parquet")
- // EJERCICIO 3A
- /* ESTA PARTE ESTA ECHA COMO SI EL TIMESTAMP FUERAN ENTEROS
- HABRIA QUE CAMBIAR LA PARTE DE LOS TIMESTAMP-PREV, SACANDO LAS DIFERENCIAS DE TIEMPOS DE LA FORMA:
- val difMiliSeg = fecha2.getTime() - fecha.getTime()
- val difSeg = difMiliSeg/1000
- */
- val byIdUser = Window.partitionBy('id_user).orderBy('timestamp)
- // Se agrega una columna con un 1 los eventos de comienzo de cada sesion, y 0 al resto
- val prev = lag('timestamp, 1).over(byIdUser)
- eventsDF.withColumn("start_session", when(prev.isNull, 1).otherwise(when('timestamp-prev > exp_session, 1).otherwise(0))).show(false)
- // ASI ESTABA PROBANDO EN SPARK CON TIMESTAMP DE TIPO INT
- val exp_session = 1
- case class Event(timestamp: Int, id_user: String, id_event: String, amount: Double)
- val eventsDF = Seq(
- Event(1,"user1", "ev1", 0),
- Event(2,"user1","ev1",0),
- Event(4,"user2","ev2",756.70),
- Event(5,"user3","ev2",792.20),
- Event(10,"user1","ev1",0),
- Event(12,"user2","ev1",0),
- Event(13,"user2","ev1",0),
- Event(15,"user1","ev2",323.09),
- Event(15,"user1","ev2",719.26),
- Event(15,"user1","ev2",756.10),
- Event(16,"user3","ev1",0),
- Event(19,"user3","ev1",0)).toDF
- import org.apache.spark.sql.expressions.Window
- val byIdUser = Window.partitionBy('id_user).orderBy('timestamp)
- // Se agrega una columna con un 1 los eventos de comienzo de cada sesion, y 0 al resto
- val prev = lag('timestamp, 1).over(byIdUser)
- eventsDF.withColumn("start_session", when(prev.isNull, 1).otherwise(when('timestamp-prev > exp_session, 1).otherwise(0))).show(false)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement