Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package template.spark
- import org.apache.spark.sql.functions._
- import SchemaTypes._
- sealed trait Country extends Col[Country] // marker type for the country column; used below as a type bound (T <: Age with Country). Col is declared outside this file.
- object Country extends Country // value-level token passed to validate(...) and getColumnName(...)
- sealed trait Age extends Col[Age] // marker type for the age column; used below as a type bound (T <: Age)
- object Age extends Age // value-level token passed to validate(...) and getColumnName(...)
- sealed trait FirstName extends Col[FirstName] // marker type for the first-name column; used below as a type bound (T <: FirstName)
- object FirstName extends FirstName // value-level token passed to validate(...) and getColumnName(...)
- // Demo entry point: each process* method states, through the bound on T, which column markers
- // (Age, Country, FirstName) the Dataset it receives must carry. Some calls and one method below are
- // deliberately ill-typed to show the compiler rejecting misuse; they are marked as such.
- // NOTE(review): `Dataset`, `getColumnName` and `validate` are not defined in this file; presumably
- // they are provided by SchemaTypes (or InitSpark) — confirm.
- object Main extends InitSpark {
- // Reads the age column of the first row as an Int. Accepts only Datasets whose type includes Age.
- // NOTE(review): `.take(1).head` presumably throws on an empty Dataset — confirm.
- def processAge[T <: Age](ds: Dataset[T]): Int = {
- val ageColumnName = ds.getColumnName(Age)
- ds.select(ageColumnName).take(1).head.getInt(0)
- }
- // Returns the first row's country immediately followed by its age (no separator).
- // Accepts only Datasets whose type includes both Age and Country.
- def processAgeAndCountry[T <: Age with Country](ds: Dataset[T]): String = {
- val ageColumnName = ds.getColumnName(Age)
- val countryColumnName = ds.getColumnName(Country)
- // Selection order fixes the row layout: index 0 = age, index 1 = country.
- val row = ds.select(ageColumnName, countryColumnName).take(1).head
- s"${row.getString(1)}${row.getInt(0)}"
- }
- // Returns the first-name column of the first row. Accepts only Datasets whose type includes FirstName.
- def processFirstName[T <: FirstName](ds: Dataset[T]): String = {
- val firstNameColumnName = ds.getColumnName(FirstName)
- val row = ds.select(firstNameColumnName).take(1).head
- row.getString(0)
- }
- // INTENTIONALLY DOES NOT COMPILE (kept as a demonstration).
- // This method makes the compilation fail because, although it gets called with a Dataset having the
- // correct type, its implementation uses a column (FirstName) that is not part of the schema required
- // by its signature (T <: Age).
- def invalidProcessAgeAndFirstName[T <: Age](ds: Dataset[T]): String = {
- val firstNameColumnName = ds.getColumnName(FirstName)
- val row = ds.select(firstNameColumnName).take(1).head
- row.getString(0)
- }
- // Builds a one-row DataFrame, validates its columns step by step, and runs the process* methods.
- def main(args: Array[String]): Unit = {
- import spark.implicits._
- val myDf = Seq(("Felipe", "Martins")).toDF("firstName", "lastName")
- val myDfWithAge = myDf.withColumn("age", lit(29))
- val myDfWithAgeAndCountry = myDfWithAge.withColumn("country", lit("Brazil"))
- val myDataWithAge: Dataset[Age] = myDfWithAgeAndCountry.validate(Age, "age")
- val myDataWithAgeAndCountry = myDataWithAge.validate(Country, "country")
- // Chained from the Age+Country Dataset so that, as the name says, all three columns are validated
- // (previously this started from myDataWithAge and silently dropped Country from the type).
- val myDataWithAgeAndCountryAndFirstName = myDataWithAgeAndCountry.validate(FirstName, "firstName")
- println(processAge(myDataWithAge))
- println(processAgeAndCountry(myDataWithAgeAndCountry))
- // INTENTIONALLY DOES NOT COMPILE: this fails because the Dataset hasn't validated the Country column.
- println(processAgeAndCountry(myDataWithAge))
- println(processFirstName(myDataWithAgeAndCountryAndFirstName))
- // INTENTIONALLY DOES NOT COMPILE: although the passed Dataset has validated the "firstName" column,
- // the method implementation uses this column without requiring it in its signature.
- println(invalidProcessAgeAndFirstName(myDataWithAgeAndCountryAndFirstName))
- close()
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement