Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Sample input DataFrame with columns id, raw, filtered, score, types.
// `types` is null on some rows; the explicit tuple type makes that column a nullable String
// instead of relying on the compiler to widen `null` against "ST".
val dataSet = {
  val rows: Seq[(Int, String, Seq[String], Double, String)] = Seq(
    (1, "shirt for women", Seq("shirt", "women"), 19.1, "ST"),
    (1, "shirt for women", Seq("shirt", "women"), 10.1, null),
    (1, "shirt for women", Seq("shirt", "women"), 12.1, null),
    (0, "shirt group women", Seq("group", "women"), 15.1, null),
    (0, "shirt group women", Seq("group", "women"), 12.1, null),
    (3, "shirt nmn women", Seq("shirt", "women"), 16.1, "ST"),
    (3, "shirt were women", Seq("shirt", "women"), 13.1, "ST")
  )
  spark
    .createDataFrame(rows)
    .toDF("id", "raw", "filtered", "score", "types")
}
- +---+-----------------+--------------+-----+-----+
- |id |raw |filtered |score|types|
- +---+-----------------+--------------+-----+-----+
- |1 |shirt for women |[shirt, women]|19.1 |ST |
- |1 |shirt for women |[shirt, women]|10.1 |null |
- |1 |shirt for women |[shirt, women]|12.1 |null |
- |0 |shirt group women|[group, women]|15.1 |null |
- |0 |shirt group women|[group, women]|12.1 |null |
- |3 |shirt nmn women |[shirt, women]|16.1 |ST |
- |3 |shirt were women |[shirt, women]|13.1 |ST |
- +---+-----------------+--------------+-----+-----+
- Expected output:
- +---+-----------------+--------------+-----+-----+
- |id |raw |filtered |score|types|
- +---+-----------------+--------------+-----+-----+
- |1 |shirt for women |[shirt, women]|19.1 |ST |
- |1 |shirt for women |[shirt, women]|10.1 |NA |
- |1 |shirt for women |[shirt, women]|12.1 |null |
- |0 |shirt group women|[group, women]|15.1 |null |
- |0 |shirt group women|[group, women]|12.1 |NA |
- |3 |shirt nmn women |[shirt, women]|16.1 |ST |
- |3 |shirt were women |[shirt, women]|13.1 |ST |
- +---+-----------------+--------------+-----+-----+
// For every (id, filtered) group, keep `types` as null on the highest-scoring row whose
// `types` is null, and tag the other null-`types` rows with "NA" through `addReasonsUDF`.
// Rows that already carry a type use their own score as max_score, so they always fall
// through to `.otherwise` and keep their value. If several null-`types` rows tie for the
// top score, all of them keep null.
// NOTE(review): `data` is not defined in this snippet; presumably it is the `dataSet`
// DataFrame built above. Confirm.
data
  .withColumn("max_score",
    when(col("types").isNull,
      // max() ignores nulls, so only scores of null-`types` rows take part here.
      // With plain max("score"), a typed row (e.g. 19.1 "ST") would win the window,
      // and every null row in that group would be tagged "NA".
      max(when(col("types").isNull, col("score")))
        .over(Window.partitionBy("id", "filtered")))
      .otherwise(col("score")))
  .withColumn("type_temp",
    when(col("score") =!= col("max_score"),
      // The column is `types`; the former `col("type")` does not resolve.
      addReasonsUDF(col("types"), lit("NA")))
      .otherwise(col("types")))
  .drop("types", "max_score")
  .withColumnRenamed("type_temp", "types")
- +---+-----------------+--------------+-----+---------+-----+
- |id |raw |filtered |score|max_score|types|
- +---+-----------------+--------------+-----+---------+-----+
- |1 |shirt for women |[shirt, women]|19.1 |19.1 |ST |
- |1 |shirt for women |[shirt, women]|10.1 |19.1 |NA |
- |1 |shirt for women |[shirt, women]|12.1 |19.1 |NA |
- |0 |shirt group women|[group, women]|15.1 |15.1 |null |
- |0 |shirt group women|[group, women]|12.1 |15.1 |NA |
- |3 |shirt nmn women |[shirt, women]|16.1 |16.1 |ST |
- |3 |shirt were women |[shirt, women]|13.1 |13.1 |ST |
- +---+-----------------+--------------+-----+---------+-----+
/** Merges `newreason` into the comma-separated reason list `oldreason`.
  *
  * Emptiness is decided by `checkIfEmpty`, which is defined elsewhere.
  * When both sides are non-empty, each is split on "," and duplicates are removed while
  * keeping first-occurrence order. Re-applying a reason therefore does not repeat it:
  * ("ST,NA", "NA") gives "ST,NA". The previous `Set(old, new)` only compared the two
  * whole strings and produced "ST,NA,NA" for that input.
  *
  * Returns `null` rather than `Option`; presumably this is so the function can back a
  * Spark UDF that yields SQL NULL. NOTE(review): confirm.
  *
  * @param oldreason existing reason(s), possibly comma-separated
  * @param newreason reason(s) to add, possibly comma-separated
  * @return the de-duplicated comma-joined union when both are non-empty; the non-empty
  *         argument unchanged when the other is empty; `null` when both are empty
  */
def addReasons(oldreason: String, newreason: String): String = {
  val oldIsEmpty = checkIfEmpty(oldreason)
  val newIsEmpty = checkIfEmpty(newreason)
  if (oldIsEmpty && newIsEmpty) null
  else if (oldIsEmpty) newreason
  else if (newIsEmpty) oldreason
  else (oldreason.split(",") ++ newreason.split(",")).distinct.mkString(",")
}
// Debug view: prints the max_score column without truncation. For rows whose `types` is
// null it holds the maximum `score` over the row's (id, raw) window. That window includes
// typed rows too, because the aggregate is not filtered. For all other rows it holds the
// row's own score.
// NOTE(review): this partitions by (id, raw), while the tagging query partitions by
// (id, filtered). Confirm which key is intended.
dataSet
  .withColumn(
    "max_score",
    when(col("types").isNull, max(col("score")).over(Window.partitionBy("id", "raw")))
      .otherwise(col("score"))
  )
  .show(false)
- +---+-----------------+--------------+-----+-----+---------+
- |id |raw |filtered |score|types|max_score|
- +---+-----------------+--------------+-----+-----+---------+
- |3 |shirt nmn women |[shirt, women]|16.1 |ST |16.1 |
- |0 |shirt group women|[group, women]|15.1 |null |15.1 |
- |0 |shirt group women|[group, women]|12.1 |null |15.1 |
- |3 |shirt were women |[shirt, women]|13.1 |ST |13.1 |
- |1 |shirt for women |[shirt, women]|19.1 |ST |19.1 |
- |1 |shirt for women |[shirt, women]|10.1 |null |19.1 |
- |1 |shirt for women |[shirt, women]|12.1 |null |19.1 |
- +---+-----------------+--------------+-----+-----+---------+
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement