Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Streaming read of the per-day retail CSV files.
// `spark` and `staticSchema` are defined outside this snippet.
val streamingDataFrame = spark.readStream
  .format("csv")
  .schema(staticSchema)
  .option("header", "true")
  .option("maxFilesPerTrigger", 1) // cap on new files picked up per trigger
  .load("/data/retail-data/by-day/*.csv")

// Sum of (UnitPrice * Quantity) per customer, grouped by a "1 day" window
// over InvoiceDate.
// NOTE(review): the name says "PerHour" but the window is "1 day". The name is
// kept because code outside this snippet may reference it; confirm which of
// the two was intended.
val purchaseByCustomerPerHour = streamingDataFrame
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")
  .groupBy($"CustomerId", window($"InvoiceDate", "1 day"))
  .sum("total_cost")

// Start the streaming query and keep the returned handle so the query can be
// stopped, awaited, or inspected later instead of being discarded.
val customerPurchasesQuery = purchaseByCustomerPerHour.writeStream
  .format("memory") // memory = store in-memory table
  .queryName("customer_purchases") // the name of the in-memory table
  .outputMode("complete") // complete = the whole aggregated result is kept in the table
  .start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement