Guest User

Untitled

a guest
Feb 21st, 2018
72
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.72 KB | None | 0 0
import org.apache.spark.sql.DataFrame

import scala.reflect.runtime.universe._
  2.  
  3. /**
  4. reads csv or parquet from provied path as Dataset of T case class type
  5. **/
  6. def readFile[T <: Product](path: String, spark: SparkSession)(
  7. implicit tag: TypeTag[T]): Dataset[T] = {
  8. import spark.implicits._
  9. readFileWithSchema(path, spark, Encoders.product[T].schema).as[T]
  10. }
  11.  
  12. def readFileWithSchema(path: String,
  13. spark: SparkSession,
  14. schema: StructType) = {
  15. import org.apache.spark.sql.functions._
  16. val cols = schema.map(
  17. (field: StructField) => {
  18. col(field.name) cast field.dataType
  19. }
  20. )
  21.  
  22. val data = if (path.endsWith(".csv")) {
  23. readCsv(path, spark)
  24. } else {
  25. readParquet(path, spark)
  26. }
  27.  
  28. data.select(cols: _*)
  29. }
Add Comment
Please, Sign In to add comment