Advertisement
Guest User

Untitled

a guest
Mar 23rd, 2019
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.78 KB | None | 0 0
  1. public Dataset<Row> buildPairsForM0Prep(Dataset<Row> df, Dataset<Row> dfOld, GlentParams params) {
  2. dfOld = dfOld
  3. .where(col("part").equalTo(params.glentDfM0PrepPartOld)) // limit to previous portion of M0Prep dataframe
  4. .drop(col("part"));
  5.  
  6. Column oldTsCondition = null;
  7. if (dfOld.isEmpty()) {
  8. oldTsCondition = lit(true);
  9. } else {
  10. Row[] oldTsRows = dfOld.agg(max("q1_ts").alias("tsmax")).collect();
  11. int fieldIdx = oldTsRows[0].fieldIndex("q1_ts");
  12. oldTsCondition = col("q2.ts").gt(oldTsRows[0].getInt(fieldIdx)); // latest timestamp of the records in previous portion of M0Prep
  13. }
  14. // to avoid double counting
  15.  
  16. df = df
  17. .withColumn("suggCount", lit(1))
  18. .withColumn("part", lit(params.glentDfM0PrepPartNew))
  19. .where(oldTsCondition.and(col("q1.ts").gt(params.logTsFrom)).and(col("q1.ts").lt(params.logTsTo)))
  20. .select(col("q1.query").alias("q1_query"), col("q1.queryNorm").alias("q1_queryNorm"), col("q1.queryType").alias("q1_queryType"),
  21. col("q1.wikiid").alias("q1_wikiid"), col("q1.lang").alias("q1_lang"), col("q1.source").alias("q1_source"),
  22. col("q2.query").alias("q2_query"), col("q2.queryNorm").alias("q2_queryNorm"), col("q2.queryType").alias("q2_queryType"),
  23. col("q2.wikiid").alias("q2_wikiid"), col("q2.lang").alias("q2_lang"), col("q2.source").alias("q2_source"),
  24. col("q1.ts").alias("q1_ts"), col("q1.hitsTotal").alias("q1_hitsTotal"), col("q2.hitsTotal").alias("q2_hitsTotal"),
  25. col("q1q2LevenDist"), col("suggCount"), col("part"));
  26. if (!dfOld.isEmpty())
  27. df = df.union(dfOld); // add previous M0Prep
  28.  
  29. // creates new M0Prep
  30. return df
  31. .groupBy("q1_query", "q1_queryNorm", "q1_queryType", "q1_wikiid", "q1_source",
  32. "q2_query", "q2_queryNorm", "q2_queryType", "q2_wikiid", "q2_source", "q1q2LevenDist")
  33. .agg(
  34. max("q1_ts").alias("q1_ts"),
  35. max("q1_hitsTotal").alias("q1_hitsTotal"),
  36. max("q2_hitsTotal").alias("q2_hitsTotal"),
  37. sum("suggCount").alias("suggCount"))
  38. .select(col("q1_query"), col("q1_queryNorm"), col("q1_queryType"), col("q1_wikiid"), col("q1_lang"), col("q1_source"),
  39. col("q2_query"), col("q2_queryNorm"), col("q2_queryType"), col("q2_wikiid"), col("q2_lang"), col("q2_source"),
  40. col("q1_ts"), col("q1_hitsTotal"), col("q2_hitsTotal"), col("q1q2LevenDist"), col("suggCount"), col("part"));
  41. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement