Advertisement
Guest User

Untitled

a guest
Jul 21st, 2019
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.47 KB | None | 0 0
  1. package template.spark
  2.  
  3. import org.apache.spark.sql.functions._
  4. import SchemaTypes._
  5.  
  /**
    * Marker type for the "country" column.
    *
    * Used purely at the type level in this file: a `Dataset[T]` with
    * `T <: Country` is accepted by methods that read the country column.
    * `Col` is declared outside this file (presumably in `SchemaTypes`).
    */
  sealed trait Country extends Col[Country]

  /** Value-level token passed to `validate` / `getColumnName` to refer to the country column. */
  object Country extends Country
  8.  
  /**
    * Marker type for the "age" column.
    *
    * Used purely at the type level in this file: a `Dataset[T]` with
    * `T <: Age` is accepted by methods that read the age column.
    * `Col` is declared outside this file (presumably in `SchemaTypes`).
    */
  sealed trait Age extends Col[Age]

  /** Value-level token passed to `validate` / `getColumnName` to refer to the age column. */
  object Age extends Age
  11.  
  /**
    * Marker type for the "firstName" column.
    *
    * Used purely at the type level in this file: a `Dataset[T]` with
    * `T <: FirstName` is accepted by methods that read the first-name column.
    * `Col` is declared outside this file (presumably in `SchemaTypes`).
    */
  sealed trait FirstName extends Col[FirstName]

  /** Value-level token passed to `validate` / `getColumnName` to refer to the first-name column. */
  object FirstName extends FirstName
  14.  
  15.  
  /**
    * Demonstrates compile-time tracking of validated columns.
    *
    * Each `process*` method declares, through the upper bound on `T`, which
    * column marker types the incoming `Dataset[T]` must carry. Two pieces of
    * this object are INTENTIONALLY ill-typed to show what the compiler rejects;
    * they are flagged in the comments below.
    *
    * `Dataset`, `validate`, `getColumnName`, `spark` and `close()` come from
    * outside this file (presumably `SchemaTypes` / `InitSpark`).
    */
  object Main extends InitSpark {

    /**
      * Reads the age column of the first row.
      *
      * @param ds dataset whose type carries the [[Age]] marker
      * @return the integer stored in the age column of the first row
      * @note assumes `ds` has at least one row; `.head` on an empty result throws
      */
    def processAge[T <: Age](ds: Dataset[T]): Int = {
      val ageColumnName = ds.getColumnName(Age)
      ds.select(ageColumnName).take(1).head.getInt(0)
    }

    /**
      * Reads the age and country columns of the first row.
      *
      * @param ds dataset whose type carries both the [[Age]] and [[Country]] markers
      * @return the country string immediately followed by the age, with no separator
      * @note assumes `ds` has at least one row; `.head` on an empty result throws
      */
    def processAgeAndCountry[T <: Age with Country](ds: Dataset[T]): String = {
      val ageColumnName = ds.getColumnName(Age)
      val countryColumnName = ds.getColumnName(Country)
      // Selection order is (age, country), so index 0 is the age and index 1 the country.
      val row = ds.select(ageColumnName, countryColumnName).take(1).head
      s"${row.getString(1)}${row.getInt(0)}"
    }

    /**
      * Reads the first-name column of the first row.
      *
      * @param ds dataset whose type carries the [[FirstName]] marker
      * @return the string stored in the first-name column of the first row
      * @note assumes `ds` has at least one row; `.head` on an empty result throws
      */
    def processFirstName[T <: FirstName](ds: Dataset[T]): String = {
      val firstNameColumnName = ds.getColumnName(FirstName)
      val row = ds.select(firstNameColumnName).take(1).head

      row.getString(0)
    }

    // INTENTIONAL COMPILE ERROR (demonstration).
    // Although this method may be called with a Dataset of a suitable type, its
    // implementation reads the FirstName column while its signature only requires
    // Age, so the column lookup below is rejected at compile time.
    def invalidProcessAgeAndFirstName[T <: Age](ds: Dataset[T]): String = {
      val firstNameColumnName = ds.getColumnName(FirstName)
      val row = ds.select(firstNameColumnName).take(1).head

      row.getString(0)
    }

    /**
      * Builds a one-row DataFrame, validates its columns step by step and runs
      * each `process*` method against the resulting datasets.
      *
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
      import spark.implicits._

      // Run close() even when one of the steps below throws.
      try {
        val myDf = Seq(("Felipe", "Martins")).toDF("firstName", "lastName")
        val myDfWithAge = myDf.withColumn("age", lit(29))
        val myDfWithAgeAndCountry = myDfWithAge.withColumn("country", lit("Brazil"))

        val myDataWithAge: Dataset[Age] = myDfWithAgeAndCountry.validate(Age, "age")

        val myDataWithAgeAndCountry = myDataWithAge.validate(Country, "country")
        // Chain from the Age+Country dataset so the result really carries all three
        // validated columns, as the value's name states.
        val myDataWithAgeAndCountryAndFirstName =
          myDataWithAgeAndCountry.validate(FirstName, "firstName")

        println(processAge(myDataWithAge))
        println(processAgeAndCountry(myDataWithAgeAndCountry))
        // INTENTIONAL COMPILE ERROR (demonstration): this Dataset has not
        // validated the Country column, so it does not satisfy `Age with Country`.
        println(processAgeAndCountry(myDataWithAge))
        println(processFirstName(myDataWithAgeAndCountryAndFirstName))
        // INTENTIONAL COMPILE ERROR (demonstration): the passed Dataset has validated
        // the "firstName" column, but the method implementation uses that column
        // without requiring it in its signature.
        println(invalidProcessAgeAndFirstName(myDataWithAgeAndCountryAndFirstName))
      } finally {
        close()
      }
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement