from pyspark.sql.functions import lit, broadcast

attr_df = attr_df.withColumn('SCALE', lit(0.0))
for i in range(iter):
    for attr in attrs:
        attr_mean = get_mean_name(attr, i)
        this_rdd = attr_df.select([attr, rms]).rdd.map(tuple)

        # get the average rms keyed by this attribute
        aTuple = (0, 0)
        avg_rdd = this_rdd.aggregateByKey(aTuple,
                                          lambda a, b: (a[0] + b, a[1] + 1),
                                          lambda a, b: (a[0] + b[0], a[1] + b[1]))
        avg_rdd = avg_rdd.mapValues(lambda v: v[0] / v[1])
        globals()[attr_mean] = avg_rdd.toDF().toDF(attr, attr_mean)

        # estimate the in-memory size of the per-attribute average table
        JavaObj = _to_java_object_rdd(globals()[attr_mean].rdd)
        nbytes = spark.sparkContext._jvm.org.apache.spark.util.SizeEstimator.estimate(JavaObj)
        print(i, attr, nbytes)

        # accumulate the average into 'SCALE'
        attr_df = attr_df.join(broadcast(globals()[attr_mean]), attr)
        attr_df = attr_df.withColumn('SCALE', attr_df['SCALE'] + attr_df[attr_mean])

    for attr in attrs:
        attr_mean = get_mean_name(attr, i)
        # subtract each attribute's mean from rms, then drop the helper column
        attr_df = attr_df.withColumn(rms, attr_df[rms] - attr_df[attr_mean])
        attr_df = attr_df.drop(attr_mean)
from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

def _to_java_object_rdd(rdd):
    """Return a JavaRDD of Object by unpickling.

    It converts each Python object into a Java object via Pyrolite, whether
    or not the RDD is serialized in batches.
    """
    rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    return rdd.ctx._jvm.org.apache.spark.mllib.api.python.SerDe.pythonToJava(rdd._jrdd, True)
# the size-estimation snippet used inside the loop above
JavaObj = _to_java_object_rdd(globals()[attr_mean].rdd)
nbytes = spark.sparkContext._jvm.org.apache.spark.util.SizeEstimator.estimate(JavaObj)
(0, 'OBJ1', 143384392L)
(0, 'OBJ2', 166065800L)
(0, 'OBJ3', 191604456L)
(1, 'OBJ1', 313290816L)
(1, 'OBJ2', 456140152L)
(1, 'OBJ3', 601767488L)
(2, 'OBJ1', 843849656L)
(2, 'OBJ2', 1106849624L)
(2, 'OBJ3', 1372792048L)
(3, 'OBJ1', 1734744936L)
(3, 'OBJ2', 2114804120L)
(3, 'OBJ3', 2460847816L)

Exception in thread "broadcast-exchange-58" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:110)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:75)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:74)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:74)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
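The error message itself names two workarounds. A minimal sketch of applying them to the session used above (the spark-submit value is only illustrative, not taken from this run); note that the explicit broadcast() hint in the join forces a broadcast regardless of the threshold, so it would have to be removed as well:

# Workaround 1: disable automatic broadcast joins so the planner falls back to a
# non-broadcast join instead of building the broadcast table on the driver.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Workaround 2: give the driver more memory. spark.driver.memory must be set
# before the driver JVM starts, e.g. at submit time (8g is an illustrative value):
#   spark-submit --driver-memory 8g my_job.py

# With the threshold disabled, also drop the explicit hint so the join is free
# to use a non-broadcast strategy:
attr_df = attr_df.join(globals()[attr_mean], attr)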