import csv

with open(myfile, "a") as csv_file:
    writer = csv.writer(csv_file, delimiter='\t')
    # The writer's delimiter already inserts the tabs, so the header
    # fields are passed separately rather than joined by hand.
    writer.writerow(["vertex", "id_source", "id_target", "similarity"])
    for part_id in range(joinDesrdd_df.rdd.getNumPartitions()):
        part_rdd = joinDesrdd_df.rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
        data_from_part_rdd = part_rdd.collect()
        vertex_list = set()
        for row in data_from_part_rdd:
            writer.writerow([....])
        csv_file.flush()
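As a quick sanity check independent of Spark, the tab delimiter can be verified on its own: `csv.writer` with `delimiter='\t'` joins the fields with tabs itself, so embedding extra `"\t"` strings in the row would double the separators. This is a minimal standalone sketch (the `io.StringIO` buffer stands in for the real file):

```python
import csv
import io

# Write the header row into an in-memory buffer instead of a file.
buf = io.StringIO()
writer = csv.writer(buf, delimiter='\t')
writer.writerow(["vertex", "id_source", "id_target", "similarity"])

# csv.writer inserts the tab separators and a trailing '\r\n' terminator.
print(repr(buf.getvalue()))  # → 'vertex\tid_source\tid_target\tsimilarity\r\n'
```
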
In the workers' log:

19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/2 finished with state KILLED exitStatus 143
19/07/22 08:58:57 INFO ExternalShuffleBlockResolver: Application app-20190722085320-0000 removed, cleanupLocalDirs = true
19/07/22 08:58:57 INFO Worker: Cleaning up local directories for application app-20190722085320-0000
19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/1 finished with state KILLED exitStatus 143
19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/14 finished with state KILLED exitStatus 143
...
Traceback (most recent call last):
  File "/project/6008168/tamouze/RWLastVersion2207/module1.py", line 306, in <module>
    for part_id in range(joinDesrdd_df.rdd.getNumPartitions()):
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 88, in rdd
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o528.javaToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(id_source#263, id_target#292, similarity#258, 1024)
+- *(11) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258])