Untitled

a guest
Jun 15th, 2019
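import org.apache.spark.sql.functions.{col, lit, to_date}

// Distinct calendar dates in update_database_time. Timestamp.getDate returns
// only the day of the month, so convert with to_date before collecting.
val uniqueDates = dataDF
  .select(to_date(col("update_database_time")))
  .distinct
  .collect
  .map(_.getDate(0))

uniqueDates.map(date => {
  // Rows whose timestamp falls on this date (<=> is null-safe equality)
  val resultDF = dataDF.where(to_date(col("update_database_time")) <=> lit(date))
  val pathToSave = s"${dataDir}/${tableNameValue}/${date}"
  // Write this date's rows only (writing dataDF here would save the full table every time)
  resultDF.write
    .format("avro")
    .option("avroSchema", SchemaRegistry.getSchema(
      schemaRegistryConfig.url,
      schemaRegistryConfig.dataSchemaSubject,
      schemaRegistryConfig.dataSchemaVersion))
    .save(s"${hdfsURL}${pathToSave}")
  resultDF
})
.reduce(_.union(_)) // union of the per-date DataFrames; throws if uniqueDates is empty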
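Collecting `elem.getTimestamp(0).getDate` keys each row on the day of the month, not on a calendar date, so different months collapse into one key. A minimal plain-Scala sketch (no Spark needed; the object name `DateKeyDemo` and the sample timestamps are hypothetical) shows the difference: two timestamps a month apart share the key `15` under the deprecated `getDate`, but stay separate once converted to a `LocalDate`, which is roughly what `to_date` gives inside Spark.

```scala
import java.sql.Timestamp
import java.time.LocalDate

// Hypothetical demo: why Timestamp.getDate is not a usable date key.
object DateKeyDemo {
  // Sample values standing in for update_database_time (made-up data).
  val stamps: Seq[Timestamp] = Seq(
    Timestamp.valueOf("2019-05-15 08:30:00"),
    Timestamp.valueOf("2019-06-15 00:00:01"),
    Timestamp.valueOf("2019-06-15 23:59:59")
  )

  // Deprecated java.util.Date.getDate: day of month only (1-31),
  // so May 15 and June 15 both map to 15.
  val dayOfMonthKeys: Seq[Int] = stamps.map(_.getDate).distinct

  // Calendar-date keys: one entry per actual date.
  val calendarDateKeys: Seq[LocalDate] =
    stamps.map(_.toLocalDateTime.toLocalDate).distinct

  def main(args: Array[String]): Unit = {
    println(dayOfMonthKeys)   // List(15)
    println(calendarDateKeys) // List(2019-05-15, 2019-06-15)
  }
}
```

If the goal is simply one output directory per date, Spark's `DataFrameWriter.partitionBy` on a derived date column may be worth considering, since it writes every date in a single job instead of one job per date; note that it names directories `column=value` (for example `update_date=2019-06-15`), so the layout differs from `${date}`.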