Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Splits `dataDF` by the calendar date of `update_database_time`, writes each
// per-date subset as Avro under `${hdfsURL}${dataDir}/${tableNameValue}/<date>`,
// and evaluates to the union of the subsets that were written.

// Truncate to a calendar date *before* taking `distinct`, so every date is
// visited exactly once. (`java.sql.Timestamp#getDate` is the deprecated
// day-of-month accessor, so it must not be used to derive the partition key.)
val dateColumn = to_date(dataDF.col("update_database_time"))
val uniqueDates = dataDF
  .select(dateColumn)
  .distinct
  .collect
  // NOTE(review): rows with a null timestamp produce a null date here; `<=>`
  // below is null-safe, so they are still selected and land under ".../null".
  // Confirm that is the desired handling.
  .map(_.getDate(0))

// The schema does not depend on the date, so resolve it at most once; `lazy`
// avoids the registry lookup entirely when there is nothing to write.
lazy val avroSchema = SchemaRegistry.getSchema(
  schemaRegistryConfig.url,
  schemaRegistryConfig.dataSchemaSubject,
  schemaRegistryConfig.dataSchemaVersion)

uniqueDates
  .map { date =>
    val resultDF = dataDF.where(dateColumn <=> date)
    // `java.sql.Date#toString` renders as yyyy-MM-dd, giving one directory per day.
    val pathToSave = s"${dataDir}/${tableNameValue}/${date}"
    // Persist only this date's rows, not the whole input frame.
    resultDF.write
      .format("avro")
      .option("avroSchema", avroSchema)
      .save(s"${hdfsURL}${pathToSave}")
    resultDF
  }
  // An empty input yields no dates; fall back to an empty frame with the same
  // schema instead of letting `reduce` throw on an empty array.
  .reduceOption(_ union _)
  .getOrElse(dataDF.limit(0))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement