Advertisement
Guest User

Untitled

a guest
Mar 21st, 2019
58
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.02 KB | None | 0 0
  1. import pyspark.sql.functions as F
  2. from pyspark.sql import Row
  3. from pyspark.sql import *
  4. from pyspark.sql.functions import *
  5. import random
  6. from random import randint
  7.  
  8. # Produce example data: an ordering key T, a random boolean A, and a value B that is None wherever A is true
  9. T = range(10**6)
  10. A = [random.uniform(0, 1) < 0.5 for _ in T]  # True when the uniform(0, 1) draw is below 0.5
  11. B = [None if a else randint(0, 9) for a in A]  # None where A is true, otherwise a random int in 0..9
  12. df = sc.parallelize([Row(T=t, A=a, B=b) for (t, a, b) in zip(T, A, B)])  # `sc` is not defined in this listing; presumably the shell/notebook SparkContext -- confirm
  13. df = df.toDF().cache()  # rebinds df from the RDD of Rows to a cached DataFrame
  14.  
  15. w = Window.orderBy('T').rowsBetween(Window.currentRow, Window.unboundedFollowing)  # frame runs from the current row to the last row, ordered by T
  16.  
  17. @pandas_udf('long')
  18. def backfill_udf(col):
  19. return col.fillna(method='backfill')
  20.  
  21. %timeit -n1 -r1 df.withColumn('B', F.first(df.B, ignorenulls=True).over(w)).show()
  22.  
  23. %timeit -n1 -r1 df.orderBy('t').withColumn('B', backfill_udf(df.B)).show()
  24.  
  25. # Still works
  26. df.withColumn('B', F.first(df.B, ignorenulls=True).over(w)).filter(df.A).show()
  27.  
  28. # This doesn't work -- all B rows are null
  29. df.orderBy('T').withColumn('B', backfill_udf(df.B)).filter(df.A).show()
  30.  
  31. # Works again
  32. df.orderBy('T').withColumn('B', backfill_udf(df.B)).cache().filter(df.A).show()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement