Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import scala.reflect.runtime.universe._
/**
 * Reads a CSV or Parquet file from the provided path as a `Dataset` of the
 * given case-class type `T`.
 *
 * The file format is chosen from the path extension by
 * [[readFileWithSchema]]: `.csv` is read as CSV, anything else as Parquet.
 *
 * @param path  path to the input file
 * @param spark active SparkSession used to read the data
 * @tparam T    case class describing the expected row schema
 * @return the file contents as a typed `Dataset[T]`
 */
def readFile[T <: Product](path: String, spark: SparkSession)(
    implicit tag: TypeTag[T]): Dataset[T] = {
  // Derive the product encoder once and reuse it both for the target schema
  // and (implicitly) for the .as[T] conversion. The original derived it
  // twice: explicitly via Encoders.product and again via spark.implicits._.
  implicit val encoder = Encoders.product[T]
  readFileWithSchema(path, spark, encoder.schema).as[T]
}
/**
 * Reads `path` as CSV (when the path ends in ".csv") or Parquet (otherwise)
 * and casts every column named in `schema` to its declared data type.
 *
 * NOTE(review): the result contains only the columns listed in `schema` —
 * extra columns in the file are dropped, and a column missing from the file
 * will make the final `select` fail at runtime.
 *
 * @param path   path to the input file
 * @param spark  active SparkSession used to read the data
 * @param schema target schema: column names to keep and the types to cast to
 * @return a DataFrame with exactly the columns of `schema`, cast accordingly
 */
def readFileWithSchema(path: String,
                       spark: SparkSession,
                       schema: StructType): DataFrame = {
  import org.apache.spark.sql.functions._

  // One cast expression per field of the target schema.
  val cols = schema.map(field => col(field.name).cast(field.dataType))

  // Format is inferred from the file extension; everything that is not
  // ".csv" is assumed to be Parquet.
  val data =
    if (path.endsWith(".csv")) readCsv(path, spark)
    else readParquet(path, spark)

  data.select(cols: _*)
}
Add Comment
Please, Sign In to add comment