Advertisement
Guest User

Untitled

a guest
Apr 25th, 2019
103
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.38 KB | None | 0 0
  1. import scala.collection.JavaConverters._
  2.  
  /**
   * Typed view of one customer profile row, as selected from `ENTITY.PROFILES`.
   *
   * Field names are kept in upper snake case on purpose: the Spark encoder
   * binds case-class fields to columns by name.
   */
  case class Profile(
    IDCLIENT_VLZ: String,
    GENDER: String,
    AGE: String,
    CARD_TYPE: String,
    OPTIN_TEL: Int,
    OPTIN_MAIL: Int,
    OPTIN_PRINT: Int,
    OPTIN_SMS: Int,
    OPTIN_PUSH_APP: Int,
    OPTIN_PUSH_WEB: Int,
    ENSEIGNE: String
  )
  /**
   * Typed view of one score row, as selected from `PROCESSED.CROSSCANAL`.
   *
   * `IDSCORE` names the score (the script filters on values such as
   * `CROSSCANAL_12`) and `VALUE` carries its value as text.
   */
  case class CrossCanal(
    IDCLIENT_VLZ: String,
    IDSCORE: String,
    VALUE: String,
    ENSEIGNE: String
  )
  /**
   * Shape of one exported row: profile attributes, the six opt-in flags and
   * the three cross-channel scoring columns (12, 24 and 36 months).
   *
   * Field names must match the aliases produced by the export projection,
   * because the Spark encoder binds fields to columns by name.
   */
  case class Profile_Vlz(
    DATA_USERIDS_VALUE_VLZ: String,
    DATA_PROFILE_GENDER: String,
    DATA_PROFILE_BIRTHDATE: String,
    DATA_PROFILE_LOYALTY: String,
    DATA_PROFILE_OPTINS_TEL: Boolean,
    DATA_PROFILE_OPTINS_MAIL: Boolean,
    DATA_PROFILE_OPTINS_PRINT: Boolean,
    DATA_PROFILE_OPTINS_SMS: Boolean,
    DATA_PROFILE_OPTINS_APP: Boolean,
    DATA_PROFILE_OPTINS_WEB: Boolean,
    DATA_SCORING_MONTHS_12: String,
    DATA_SCORING_MONTHS_24: String,
    DATA_SCORING_MONTHS_36: String
  )
  6.  
  // Final location of the merged export; `nameExport` is supplied outside this snippet.
  // (The identifier's spelling is kept as-is because the rest of the script refers to it.)
  val ouputFilePathExport: String = s"dbfs:/mnt/solardls/${nameExport}"

  // Staging directory used to collect the compressed part files before they are merged.
  val outputFileTMPPath: String = "dbfs:/mnt/solardls/TMPEXPORT/"
  9.  
  // Customer profiles, narrowed to the columns of `Profile` and cached because
  // the dataset is reused by the joins below.
  val PROFILE = {
    val profileColumns = Seq(
      "IDCLIENT_VLZ", "GENDER", "AGE", "CARD_TYPE",
      "OPTIN_TEL", "OPTIN_MAIL", "OPTIN_PRINT", "OPTIN_SMS",
      "OPTIN_PUSH_APP", "OPTIN_PUSH_WEB", "ENSEIGNE"
    )
    spark.table("ENTITY.PROFILES")
      .selectExpr(profileColumns: _*)
      .as[Profile]
      .cache()
  }

  // Cross-channel scores restricted to the 'VLZ' banner, cached because the
  // dataset is filtered three times (once per scoring window) further down.
  val CROSSCANAL = {
    val scoreColumns = Seq("IDCLIENT_VLZ", "IDSCORE", "VALUE", "ENSEIGNE")
    spark.table("PROCESSED.CROSSCANAL")
      .selectExpr(scoreColumns: _*)
      .as[CrossCanal]
      .filter("ENSEIGNE = 'VLZ'")
      .cache()
  }
  // Builds the export dataset: every profile, left-joined with its 12/24/36-month
  // cross-channel score, projected onto the `Profile_Vlz` column names and de-duplicated.
  val profileValiuz = {
    val joinKeys = Seq("IDCLIENT_VLZ", "ENSEIGNE")

    // Score rows for one window (IDSCORE = 'CROSSCANAL_<months>'), with the
    // lower-cased value exposed as DATA_SCORING_MONTHS_<months>.
    def scoring(months: Int) =
      CROSSCANAL
        .where(s"IDSCORE = 'CROSSCANAL_$months'")
        .selectExpr("IDCLIENT_VLZ", "ENSEIGNE", s"LOWER(VALUE) AS DATA_SCORING_MONTHS_$months")

    // Emits a real boolean rather than the strings 'true'/'false': `Profile_Vlz`
    // declares these fields as Boolean, and `.as[Profile_Vlz]` should not depend on an
    // implicit string-to-boolean cast (newer Spark versions reject that up-cast).
    // A flag that is not 1 (including NULL) still maps to false, as before.
    def optin(source: String, target: String): String =
      s"if($source == 1, true, false) AS DATA_PROFILE_OPTINS_$target"

    Seq(12, 24, 36)
      .foldLeft(PROFILE.toDF()) { (joined, months) =>
        // Left join so that profiles without a score for this window are kept.
        joined.join(scoring(months), joinKeys, "left")
      }
      .selectExpr(
        "IDCLIENT_VLZ AS DATA_USERIDS_VALUE_VLZ",
        "LOWER(GENDER) AS DATA_PROFILE_GENDER",
        "AGE AS DATA_PROFILE_BIRTHDATE",
        "LOWER(CARD_TYPE) AS DATA_PROFILE_LOYALTY",
        optin("OPTIN_TEL", "TEL"),
        optin("OPTIN_MAIL", "MAIL"),
        optin("OPTIN_PRINT", "PRINT"),
        optin("OPTIN_SMS", "SMS"),
        optin("OPTIN_PUSH_APP", "APP"),
        optin("OPTIN_PUSH_WEB", "WEB"),
        "DATA_SCORING_MONTHS_12",
        "DATA_SCORING_MONTHS_24",
        "DATA_SCORING_MONTHS_36"
      )
      .as[Profile_Vlz]
      .dropDuplicates()
  }
  38.  
  39.  
  40.  
  41.  
  // Every column is cast to string so that a row holding the column names
  // can share the same schema as the data.
  val dataDF = {
    val stringColumns = profileValiuz.columns.map(name => profileValiuz.col(name).cast("string"))
    profileValiuz.select(stringColumns: _*)
  }

  // Single-row frame whose values are the column names; it stands in for the
  // CSV header, which is why the writer below is told not to emit one.
  val headerDF = {
    val headerRow = Row.fromSeq(dataDF.columns.toSeq)
    spark.createDataFrame(List(headerRow).asJava, dataDF.schema)
  }

  // Writes header + data as gzip-compressed, tab-separated part files under the
  // export path. `csv(...)` returns Unit, so `finalProfile` carries no data.
  val finalProfile = {
    val exportWriter = headerDF
      .union(dataDF)
      .write
      .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
      .option("delimiter", "\t")
      .option("header", false)
      // NOTE(review): a null option value is passed through unchanged here;
      // confirm how the Spark version in use renders NULL cells with it.
      .option("nullValue", null)
    exportWriter.csv(ouputFilePathExport)
  }
  52.  
  // Stage the compressed part files: create the staging directory, copy every
  // ".gz" file out of the Spark output directory, then remove that directory
  // recursively so its path is free for the merged file.
  dbutils.fs.mkdirs(outputFileTMPPath)
  val files = dbutils.fs.ls(ouputFilePathExport).filter(_.name.endsWith(".gz"))
  files.foreach { part =>
    // NOTE(review): outputFileTMPPath already ends with '/', so the target
    // contains a double slash; kept unchanged to preserve behaviour.
    dbutils.fs.cp(part.path, s"${outputFileTMPPath}/${part.name}")
  }
  dbutils.fs.rm(ouputFilePathExport, true)
  59.  
  60. import org.apache.hadoop.conf.Configuration
  61. import org.apache.hadoop.fs.Path
  62. import org.apache.hadoop.fs.FileSystem
  63. import org.apache.hadoop.fs.FileUtil
  64.  
  // Merge the staged part files into a single file at the export path.
  //
  // The session's Hadoop configuration is used instead of a fresh `new Configuration()`
  // so that filesystem settings applied to the Spark session are honoured, and the
  // filesystem is resolved from the export path itself rather than from the default
  // filesystem, so the `dbfs:` scheme of the paths decides which implementation is used.
  val hadoopConfig = spark.sparkContext.hadoopConfiguration
  val hdfs = new Path(ouputFilePathExport).getFileSystem(hadoopConfig)

  // deleteSource = true removes the staging directory once the merge is done;
  // the trailing null means no separator string is added between the parts.
  // NOTE(review): FileUtil.copyMerge is not available in Hadoop 3.x — confirm the
  // cluster's Hadoop version before upgrading.
  FileUtil.copyMerge(hdfs, new Path(outputFileTMPPath), hdfs, new Path(ouputFilePathExport), true, hadoopConfig, null)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement