Dataset: df_ts_list

+--------------------+
|             ts_list|
+--------------------+
|[1477411200, 1477...|
|[1477238400, 1477...|
|[1477022400, 1477...|
|[1477224000, 1477...|
|[1477256400, 1477...|
|[1477346400, 1476...|
|[1476986400, 1477...|
|[1477321200, 1477...|
|[1477306800, 1477...|
|[1477062000, 1477...|
|[1477249200, 1477...|
|[1477040400, 1477...|
|[1477090800, 1477...|
+--------------------+
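
Each ts_list row is an array of Unix timestamps, truncated here by show(). For anyone wanting to reproduce the problem locally, a stand-in frame with the same shape can be built like this (a minimal sketch assuming a Spark 2.x SparkSession named spark; df_demo and its values are made up, not the original data):

>>> from pyspark.sql import Row
>>> df_demo = spark.createDataFrame([
...     Row(ts_list=[1477411200, 1477411500, 1477420000]),
...     Row(ts_list=[1477238400, 1477242000]),
... ])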

PySpark UDF:

>>> def on_time(ts_list):
...     import sys
...     import os
...     sys.path.append('/usr/lib/python2.7/dist-packages')
...     os.system("sudo apt-get install python-numpy -y")  # crude attempt to get numpy onto the executors
...     import numpy as np
...     import datetime
...     import time
...     from datetime import timedelta
...     ts = np.array(ts_list)
...     if ts.size == 0:  # no timestamps: report an empty day
...         count = 0
...         duration = 0
...         st = time.time()  # current time as a Unix timestamp
...         ymd = str(datetime.datetime.fromtimestamp(st).date())
...     else:
...         ts.sort()
...         one_tag = []
...         start = float(ts[0])
...         for i in range(len(ts)):
...             if i == (len(ts)) - 1:  # last timestamp closes the final session
...                 end = float(ts[i])
...                 a_round = [start, end]
...                 one_tag.append(a_round)
...             else:
...                 diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i])))
...                 if abs(diff.total_seconds()) > 3600:  # a gap of more than an hour starts a new session
...                     end = float(ts[i])
...                     a_round = [start, end]
...                     one_tag.append(a_round)
...                     start = float(ts[i+1])
...         one_tag = [u for u in one_tag if u[1] - u[0] > 300]  # keep sessions longer than 300 s
...         count = int(len(one_tag))
...         duration = int(np.diff(one_tag).sum())
...         ymd = str(datetime.datetime.fromtimestamp(time.time()).date())
...     return {'count': count, 'duration': duration, 'ymd': ymd}
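
In words: the UDF sorts the timestamps, splits them into sessions wherever two consecutive values are more than 3600 s apart, keeps only sessions longer than 300 s, and returns the session count, the summed session durations, and the current date. It can be sanity-checked in a plain Python shell before being wrapped as a UDF (the timestamps below are made up):

>>> # Two sessions split at the >1 h gap; both exceed 300 s,
>>> # so this should report count=2 and duration=600+400=1000
>>> # ('ymd' will be whatever date the call runs on).
>>> on_time([1477411200, 1477411500, 1477411800, 1477420000, 1477420400])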

PySpark code:

>>> on_time = udf(on_time, MapType(StringType(), StringType()))
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show()
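
Two side notes on this registration, independent of the crash below. Rebinding the name on_time makes the original Python function unreachable afterwards, and the UDF returns int values for count and duration even though the declared type is MapType(StringType(), StringType()); mismatched value types may come back as null depending on the Spark version. A sketch that avoids both, using a new name on_time_udf (my own, not from the original paste):

>>> on_time_udf = udf(lambda ts: {k: str(v) for k, v in on_time(ts).items()},
...                   MapType(StringType(), StringType()))
>>> df_ts_list.withColumn("one_tag", on_time_udf("ts_list")).select("one_tag").show()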

Error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 27, in on_time
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'
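
The traceback is the giveaway: counting lines in the pasted definition, <stdin> line 27 is the abs(diff.total_seconds()) test, yet the frame beneath it is pyspark/sql/functions.py. So abs inside the UDF is not the Python builtin but pyspark.sql.functions.abs, which most likely leaked into the namespace via from pyspark.sql.functions import * on the driver (consistent with udf, MapType, and StringType being used unqualified above). functions.abs reaches for the JVM through the active SparkContext, and on an executor sc is None, hence the '_jvm' AttributeError. Two possible fixes, sketched under that assumption:

>>> # Fix 1: drop the star import and import only what is needed,
>>> # so the builtin abs is never shadowed:
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import MapType, StringType

>>> # Fix 2: keep the star import but call the builtin explicitly inside
>>> # the UDF (Python 2 spelling; on Python 3, use the builtins module):
>>> import __builtin__
>>> # ...inside on_time:
>>> #     if __builtin__.abs(diff.total_seconds()) > 3600: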