SHARE
TWEET

Untitled

a guest Apr 25th, 2019 73 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  import scala.collection.JavaConverters._

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.FileSystem
  import org.apache.hadoop.fs.FileUtil
  import org.apache.hadoop.fs.Path
  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.Row

  // Notebook script: joins ENTITY.PROFILES with the CROSSCANAL_12/24/36 scores
  // from PROCESSED.CROSSCANAL, renames the columns to the DATA_* export layout
  // and writes the result as one gzip-compressed, tab-separated file at
  // `ouputFilePathExport`.
  //
  // Relies on names provided by the surrounding notebook: `spark`, `dbutils`,
  // `nameExport`, and the Dataset encoders (spark.implicits._).

  /** Columns read from ENTITY.PROFILES. */
  case class Profile(IDCLIENT_VLZ: String, GENDER: String, AGE: String, CARD_TYPE: String, OPTIN_TEL: Int, OPTIN_MAIL: Int, OPTIN_PRINT: Int, OPTIN_SMS: Int, OPTIN_PUSH_APP: Int, OPTIN_PUSH_WEB: Int, ENSEIGNE: String)

  /** Columns read from PROCESSED.CROSSCANAL. */
  case class CrossCanal(IDCLIENT_VLZ: String, IDSCORE: String, VALUE: String, ENSEIGNE: String)

  /** Row layout of the export; field names become the header line of the file. */
  case class Profile_Vlz(DATA_USERIDS_VALUE_VLZ: String, DATA_PROFILE_GENDER: String, DATA_PROFILE_BIRTHDATE: String, DATA_PROFILE_LOYALTY: String, DATA_PROFILE_OPTINS_TEL: Boolean, DATA_PROFILE_OPTINS_MAIL: Boolean, DATA_PROFILE_OPTINS_PRINT: Boolean, DATA_PROFILE_OPTINS_SMS: Boolean, DATA_PROFILE_OPTINS_APP: Boolean, DATA_PROFILE_OPTINS_WEB: Boolean, DATA_SCORING_MONTHS_12: String, DATA_SCORING_MONTHS_24: String, DATA_SCORING_MONTHS_36: String)

  val ouputFilePathExport = s"dbfs:/mnt/solardls/$nameExport"
  val outputFileTMPPath = "dbfs:/mnt/solardls/TMPEXPORT/"

  val PROFILE = spark.table("ENTITY.PROFILES")
    .selectExpr("IDCLIENT_VLZ", "GENDER", "AGE", "CARD_TYPE", "OPTIN_TEL", "OPTIN_MAIL", "OPTIN_PRINT", "OPTIN_SMS", "OPTIN_PUSH_APP", "OPTIN_PUSH_WEB", "ENSEIGNE")
    .as[Profile]
    .cache()

  val CROSSCANAL = spark.table("PROCESSED.CROSSCANAL")
    .selectExpr("IDCLIENT_VLZ", "IDSCORE", "VALUE", "ENSEIGNE")
    .as[CrossCanal]
    .where("ENSEIGNE = 'VLZ'")
    .cache()

  // Columns shared by PROFILE and CROSSCANAL that every join below is keyed on.
  val profileJoinKeys = Seq("IDCLIENT_VLZ", "ENSEIGNE")

  /** CROSSCANAL rows of one IDSCORE, with the lower-cased VALUE exposed as `outputColumn`. */
  def crossCanalScore(idScore: String, outputColumn: String): DataFrame =
    CROSSCANAL
      .where(s"IDSCORE = '$idScore'")
      .selectExpr("IDCLIENT_VLZ", "ENSEIGNE", s"LOWER(VALUE) AS $outputColumn")

  /**
   * SQL expression turning a 0/1 opt-in column into a boolean column.
   *
   * Boolean literals are used (not the strings 'true'/'false') because the
   * result is decoded into Boolean fields by `.as[Profile_Vlz]`, and Spark 3
   * refuses to up-cast a string column to a boolean field. `if(...)` rather
   * than a bare comparison keeps a NULL opt-in mapped to false.
   */
  def optinFlag(sourceColumn: String, outputColumn: String): String =
    s"if($sourceColumn == 1, true, false) AS $outputColumn"

  // Left-join one score column per scoring window so profiles without a score
  // are kept (the score column is then NULL).
  val profileWithScores = Seq("12", "24", "36").foldLeft(PROFILE.toDF()) { (joined, months) =>
    joined.join(
      crossCanalScore(s"CROSSCANAL_$months", s"DATA_SCORING_MONTHS_$months"),
      profileJoinKeys,
      "left")
  }

  val profileValiuz = profileWithScores
    .selectExpr(
      "IDCLIENT_VLZ AS DATA_USERIDS_VALUE_VLZ",
      "LOWER(GENDER) AS DATA_PROFILE_GENDER",
      // NOTE(review): AGE is exported under a BIRTHDATE name - confirm the source column really holds a birth date.
      "AGE AS DATA_PROFILE_BIRTHDATE",
      "LOWER(CARD_TYPE) AS DATA_PROFILE_LOYALTY",
      optinFlag("OPTIN_TEL", "DATA_PROFILE_OPTINS_TEL"),
      optinFlag("OPTIN_MAIL", "DATA_PROFILE_OPTINS_MAIL"),
      optinFlag("OPTIN_PRINT", "DATA_PROFILE_OPTINS_PRINT"),
      optinFlag("OPTIN_SMS", "DATA_PROFILE_OPTINS_SMS"),
      optinFlag("OPTIN_PUSH_APP", "DATA_PROFILE_OPTINS_APP"),
      optinFlag("OPTIN_PUSH_WEB", "DATA_PROFILE_OPTINS_WEB"),
      "DATA_SCORING_MONTHS_12",
      "DATA_SCORING_MONTHS_24",
      "DATA_SCORING_MONTHS_36"
    )
    .as[Profile_Vlz]
    .dropDuplicates()

  // Every column is cast to string so that a row made of the column names can
  // share the data's schema and be unioned in front of it.
  val dataDF = profileValiuz.select(profileValiuz.columns.map(c => profileValiuz.col(c).cast("string")): _*)
  val headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

  // "header" stays false: the header is the unioned row above, so it is written
  // once instead of once per part file.
  // NOTE(review): this assumes the header's part file sorts first when the
  // parts are merged below - confirm on the target runtime.
  // NOTE(review): a null "nullValue" is kept as in the original; confirm the
  // intended rendering of NULL cells before replacing it with a literal.
  val finalProfile = headerDF.union(dataDF)
    .write
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
    .option("delimiter", "\t")
    .option("header", false)
    .option("nullValue", null)
    .csv(ouputFilePathExport)

  // Stage the .gz part files in an empty temp directory. It is cleared first so
  // leftovers from an earlier, interrupted run cannot be merged into this export.
  val stagingDir = outputFileTMPPath.stripSuffix("/")
  dbutils.fs.rm(outputFileTMPPath, true)
  dbutils.fs.mkdirs(outputFileTMPPath)
  dbutils.fs.ls(ouputFilePathExport)
    .filter(_.name.endsWith(".gz"))
    .foreach(part => dbutils.fs.cp(part.path, s"$stagingDir/${part.name}"))

  // Remove the Spark output directory so the merged file can take over its path.
  dbutils.fs.rm(ouputFilePathExport, true)

  // Reuse the session's Hadoop configuration and resolve the filesystem from
  // the export path itself instead of assuming it is the default filesystem.
  val hadoopConfig: Configuration = spark.sparkContext.hadoopConfiguration
  val hdfs: FileSystem = new Path(ouputFilePathExport).getFileSystem(hadoopConfig)

  // Concatenate the staged parts into one file; `true` deletes the staging
  // directory afterwards.
  // NOTE(review): FileUtil.copyMerge only exists in Hadoop 2.x - confirm the cluster's Hadoop version.
  FileUtil.copyMerge(hdfs, new Path(outputFileTMPPath), hdfs, new Path(ouputFilePathExport), true, hadoopConfig, null)
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Not a member of Pastebin yet?
Sign Up, it unlocks many cool features!
 
Top