Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import scala.collection.JavaConverters._
/**
 * Customer profile columns selected from `ENTITY.PROFILES`.
 *
 * The `OPTIN_*` fields are integer flags; the export maps the value 1 to "opted in" and any
 * other value to "not opted in".
 */
case class Profile(
  IDCLIENT_VLZ: String,
  GENDER: String,
  AGE: String,
  CARD_TYPE: String,
  OPTIN_TEL: Int,
  OPTIN_MAIL: Int,
  OPTIN_PRINT: Int,
  OPTIN_SMS: Int,
  OPTIN_PUSH_APP: Int,
  OPTIN_PUSH_WEB: Int,
  ENSEIGNE: String
)
/**
 * One score row from `PROCESSED.CROSSCANAL`: the score identifier (e.g. `CROSSCANAL_12`)
 * and its value for a client within a given `ENSEIGNE`.
 */
case class CrossCanal(
  IDCLIENT_VLZ: String,
  IDSCORE: String,
  VALUE: String,
  ENSEIGNE: String
)
/**
 * Typed shape of the exported profile rows (`profileValiuz`): identifiers and profile
 * attributes, six boolean opt-in flags, and the 12/24/36-month cross-canal scores.
 */
case class Profile_Vlz(
  DATA_USERIDS_VALUE_VLZ: String,
  DATA_PROFILE_GENDER: String,
  DATA_PROFILE_BIRTHDATE: String,
  DATA_PROFILE_LOYALTY: String,
  DATA_PROFILE_OPTINS_TEL: Boolean,
  DATA_PROFILE_OPTINS_MAIL: Boolean,
  DATA_PROFILE_OPTINS_PRINT: Boolean,
  DATA_PROFILE_OPTINS_SMS: Boolean,
  DATA_PROFILE_OPTINS_APP: Boolean,
  DATA_PROFILE_OPTINS_WEB: Boolean,
  DATA_SCORING_MONTHS_12: String,
  DATA_SCORING_MONTHS_24: String,
  DATA_SCORING_MONTHS_36: String
)
// Staging directory that receives the gzip part files just before they are merged.
val outputFileTMPPath: String = "dbfs:/mnt/solardls/TMPEXPORT/"
// Export target: Spark first writes its CSV output directory here; the merge step later
// replaces that directory with a single concatenated file at the same path.
val ouputFilePathExport: String = s"dbfs:/mnt/solardls/${nameExport}"
// Source profiles, restricted to the columns declared by Profile, and cached.
val PROFILE = {
  val profileColumns = Seq(
    "IDCLIENT_VLZ", "GENDER", "AGE", "CARD_TYPE",
    "OPTIN_TEL", "OPTIN_MAIL", "OPTIN_PRINT", "OPTIN_SMS", "OPTIN_PUSH_APP", "OPTIN_PUSH_WEB",
    "ENSEIGNE"
  )
  spark.table("ENTITY.PROFILES").selectExpr(profileColumns: _*).as[Profile].cache()
}

// Cross-canal scores for ENSEIGNE 'VLZ' only; cached, since the profile pipeline below
// filters it three times (once per score horizon).
val CROSSCANAL = {
  val crossCanalColumns = Seq("IDCLIENT_VLZ", "IDSCORE", "VALUE", "ENSEIGNE")
  spark
    .table("PROCESSED.CROSSCANAL")
    .selectExpr(crossCanalColumns: _*)
    .as[CrossCanal]
    .where("ENSEIGNE = 'VLZ'")
    .cache()
}
/*
 * Export rows: profile attributes, the six opt-in flags, and the 12/24/36-month cross-canal
 * scores. Each score is left-joined on (IDCLIENT_VLZ, ENSEIGNE), so a profile without that
 * score gets a null DATA_SCORING_MONTHS_* value. Several score rows for the same key would
 * multiply the joined rows; dropDuplicates() only removes rows that are identical.
 *
 * NOTE(review): PROFILE is not filtered on ENSEIGNE while CROSSCANAL is restricted to 'VLZ',
 * so non-VLZ profiles are exported with null scores — confirm this is intended.
 */
val profileValiuz = {
  val joinKeys = Seq("IDCLIENT_VLZ", "ENSEIGNE")

  // (IDCLIENT_VLZ, ENSEIGNE, DATA_SCORING_MONTHS_<months>) taken from score CROSSCANAL_<months>.
  def crossCanalScore(months: Int) =
    CROSSCANAL
      .where(s"IDSCORE = 'CROSSCANAL_$months'")
      .selectExpr("IDCLIENT_VLZ", "ENSEIGNE", s"LOWER(VALUE) AS DATA_SCORING_MONTHS_$months")

  PROFILE
    .join(crossCanalScore(12), joinKeys, "left")
    .join(crossCanalScore(24), joinKeys, "left")
    .join(crossCanalScore(36), joinKeys, "left")
    .selectExpr(
      "IDCLIENT_VLZ AS DATA_USERIDS_VALUE_VLZ",
      "LOWER(GENDER) AS DATA_PROFILE_GENDER",
      "AGE AS DATA_PROFILE_BIRTHDATE",
      "LOWER(CARD_TYPE) AS DATA_PROFILE_LOYALTY",
      // Boolean literals, not the strings 'true'/'false': Profile_Vlz declares these fields
      // as Boolean, and .as[Profile_Vlz] rejects a string -> boolean up-cast with an
      // AnalysisException. A NULL or non-1 flag still yields false, and the later cast to
      // string still renders the values as "true"/"false".
      "if(OPTIN_TEL == 1, true, false) AS DATA_PROFILE_OPTINS_TEL",
      "if(OPTIN_MAIL == 1, true, false) AS DATA_PROFILE_OPTINS_MAIL",
      "if(OPTIN_PRINT == 1, true, false) AS DATA_PROFILE_OPTINS_PRINT",
      "if(OPTIN_SMS == 1, true, false) AS DATA_PROFILE_OPTINS_SMS",
      "if(OPTIN_PUSH_APP == 1, true, false) AS DATA_PROFILE_OPTINS_APP",
      "if(OPTIN_PUSH_WEB == 1, true, false) AS DATA_PROFILE_OPTINS_WEB",
      "DATA_SCORING_MONTHS_12",
      "DATA_SCORING_MONTHS_24",
      "DATA_SCORING_MONTHS_36"
    )
    .as[Profile_Vlz]
    .dropDuplicates()
}
// Render every column as text, so the header row (built from the column names) and the
// data rows share one all-string schema and can be unioned.
val dataDF = {
  val asText = profileValiuz.columns.map(name => profileValiuz.col(name).cast("string"))
  profileValiuz.select(asText: _*)
}

// A single row whose values are the column names. The CSV writer's own header option is
// off, presumably so that only this one header line remains once the part files are merged.
val headerDF = {
  val columnNamesRow = Row.fromSeq(dataDF.columns.toSeq)
  spark.createDataFrame(List(columnNamesRow).asJava, dataDF.schema)
}

// Tab-separated, gzip-compressed CSV part files under ouputFilePathExport.
// The writer's csv(...) returns Unit, so finalProfile carries no data.
val finalProfile = headerDF
  .union(dataDF)
  .write
  .options(Map[String, String](
    "codec" -> "org.apache.hadoop.io.compress.GzipCodec",
    "delimiter" -> "\t",
    "header" -> "false",
    "nullValue" -> null
  ))
  .csv(ouputFilePathExport)
// Move the gzip part files Spark wrote under ouputFilePathExport into the staging directory.
// Non-.gz entries (e.g. Spark's commit marker files) are skipped. The Spark output directory
// is then removed, so its path is free for the single merged file.
//
// The staging directory is emptied first: every file left in it gets merged into the
// final export, so leftovers from an earlier failed run would otherwise leak into it.
dbutils.fs.rm(outputFileTMPPath, true)
dbutils.fs.mkdirs(outputFileTMPPath)

val files = dbutils.fs.ls(ouputFilePathExport).filter(_.name.endsWith(".gz"))
files.foreach { file =>
  // outputFileTMPPath already ends with '/'; strip it to avoid a '//' path segment.
  val target = outputFileTMPPath.stripSuffix("/") + "/" + file.name
  dbutils.fs.cp(file.path, target)
}

dbutils.fs.rm(ouputFilePathExport, true)
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.Path
- import org.apache.hadoop.fs.FileSystem
- import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.io.IOUtils

/**
 * Concatenates every regular file directly under `srcDir` into the new file `dstFile`, in
 * file-name order, then deletes `srcDir` recursively.
 *
 * This replaces `FileUtil.copyMerge(fs, srcDir, fs, dstFile, true, conf, null)`, which was
 * removed in Hadoop 3. Name order keeps Spark's part-NNNNN sequence; the header row from
 * headerDF is expected in the first part file. Concatenated gzip files form a valid
 * multi-member gzip stream.
 *
 * @throws java.io.IOException if `dstFile` already exists or a file cannot be read/written
 */
def mergeInto(fs: FileSystem, srcDir: Path, dstFile: Path, conf: Configuration): Unit = {
  val parts = fs.listStatus(srcDir).filter(_.isFile).sortBy(_.getPath.getName)
  val out = fs.create(dstFile, false)
  try {
    parts.foreach { part =>
      val in = fs.open(part.getPath)
      try IOUtils.copyBytes(in, out, conf, false)
      finally in.close()
    }
  } finally out.close()
  fs.delete(srcDir, true)
}

// Copy Spark's Hadoop configuration (it carries the cluster's filesystem settings) instead
// of a bare new Configuration(). Resolve the FileSystem from the dbfs:/ path itself rather
// than from the default filesystem.
val hadoopConfig = new Configuration(spark.sparkContext.hadoopConfiguration)
val hdfs = new Path(outputFileTMPPath).getFileSystem(hadoopConfig)
mergeInto(hdfs, new Path(outputFileTMPPath), new Path(ouputFilePathExport), hadoopConfig)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement