Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
# Iteratively remove per-attribute means from the `rms` column.
# Each pass: for every attribute, compute the mean of `rms` grouped by that
# attribute's values, accumulate those means into 'SCALE', then subtract each
# mean from `rms`.
#
# NOTE(review): `iter`, `attrs`, `rms`, `get_mean_name`, `spark`, `broadcast`,
# and `lit` are defined elsewhere in this script — presumably `attrs` is a list
# of grouping-column names and `rms` a numeric column name; confirm at call site.
attr_df = attr_df.withColumn('SCALE', lit(0.0))
for i in range(iter):  # NOTE(review): `iter` shadows the builtin; defined outside this fragment, so left as-is
    for attr in attrs:
        attr_mean = get_mean_name(attr, i)
        # (key, value) pairs of (attribute value, rms) for this attribute.
        this_rdd = attr_df.select([attr, rms]).rdd.map(tuple)
        # Average rms per attribute value: accumulate (sum, count) then divide.
        aTuple = (0, 0)
        avg_rdd = this_rdd.aggregateByKey(
            aTuple,
            lambda a, b: (a[0] + b, a[1] + 1),
            lambda a, b: (a[0] + b[0], a[1] + b[1]),
        )
        avg_rdd = avg_rdd.mapValues(lambda v: v[0] / v[1])
        # Stored via globals() because later snippets look the DataFrame up by name.
        globals()[attr_mean] = avg_rdd.toDF().toDF(attr, attr_mean)
        JavaObj = _to_java_object_rdd(globals()[attr_mean].rdd)
        nbytes = spark.sparkContext._jvm.org.apache.spark.util.SizeEstimator.estimate(JavaObj)
        print(i, attr, nbytes)
        # Accumulate this attribute's mean into 'SCALE'.
        attr_df = attr_df.join(broadcast(globals()[attr_mean]), attr)
        attr_df = attr_df.withColumn('SCALE', attr_df['SCALE'] + attr_df[attr_mean])
    for attr in attrs:
        attr_mean = get_mean_name(attr, i)
        # Apply the mean to rms, then drop the temporary mean column.
        attr_df = attr_df.withColumn(rms, attr_df[rms] - attr_df[attr_mean])
        attr_df = attr_df.drop(attr_mean)
    # FIX: truncate the lineage after every pass. Without this the logical plan
    # grows with each join/withColumn, which is why the estimated sizes above
    # roughly double per iteration and the broadcast exchange eventually dies
    # with OutOfMemoryError. localCheckpoint() materializes the current data
    # and discards the accumulated plan; the DataFrame contents are unchanged.
    attr_df = attr_df.localCheckpoint()
def _to_java_object_rdd(rdd):
    """Return a JavaRDD of Object obtained by unpickling *rdd*.

    Each Python element is converted to a Java object via Pyrolite,
    regardless of whether the RDD is batch-serialized or not.
    """
    reserialized = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    serde = reserialized.ctx._jvm.org.apache.spark.mllib.api.python.SerDe
    return serde.pythonToJava(reserialized._jrdd, True)
# Measure the estimated on-heap size of the per-attribute mean DataFrame's RDD.
JavaObj = _to_java_object_rdd(globals()[attr_mean].rdd)
size_estimator = spark.sparkContext._jvm.org.apache.spark.util.SizeEstimator
nbytes = size_estimator.estimate(JavaObj)
- (0, 'OBJ1', 143384392L)
- (0, 'OBJ2', 166065800L)
- (0, 'OBJ3', 191604456L)
- (1, 'OBJ1', 313290816L)
- (1, 'OBJ2', 456140152L)
- (1, 'OBJ3', 601767488L)
- (2, 'OBJ1', 843849656L)
- (2, 'OBJ2', 1106849624L)
- (2, 'OBJ3', 1372792048L)
- (3, 'OBJ1', 1734744936L)
- (3, 'OBJ2', 2114804120L)
- (3, 'OBJ3', 2460847816L)
- Exception in thread "broadcast-exchange-58" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
- at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:110)
- at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:75)
- at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
- at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:74)
- at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:74)
- at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
- at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
- at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
- at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
- at java.lang.Thread.run(Thread.java:745)
Add Comment
Please sign in to add a comment.