Advertisement
Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- #MACIEJ SASINOWSKI SZYMON PISZCZATOWSKI
- #execute : spark-submit /script 2000 1 1
- from __future__ import print_function
- import sys
- import time
- from pyspark.sql import SparkSession
def getFirst(listofnames):
    """Return the first element of *listofnames*, or None when it is
    empty or None itself. Works for any iterable, not just lists."""
    for element in (listofnames or []):
        return element
    return None
def main():
    """Read grypa.csv via Spark and, for each distinct decision class
    (the last CSV column), print the most frequent value of every
    attribute column together with its occurrence count.

    Side effects: reads /home/r/Pulpit/grypa.csv, prints to stdout,
    starts and stops a SparkSession. Returns None.
    """
    spark = SparkSession \
        .builder \
        .appName("RPJS") \
        .getOrCreate()

    rows = spark.read.csv("/home/r/Pulpit/grypa.csv",
                          header="true", mode="DROPMALFORMED").rdd
    # Every column except the last is an attribute; the last column is
    # the decision class.
    column_names = rows.toDF().schema.names[:-1]
    # Use the last field (x[-1]) rather than the original hard-coded
    # x[4], so the script works for any number of attribute columns.
    unique_decisions = rows.map(lambda x: x[-1]).distinct().collect()

    for decision in unique_decisions:
        # Bind loop variables as lambda defaults so the closures shipped
        # to Spark capture the current value, not the loop variable.
        selected = rows.filter(lambda x, d=decision: x[-1] == d)
        # BUG FIX: the original reset a differently-cased variable
        # (listOfEverything) while appending to listOFEverything, so
        # results accumulated across decision classes instead of being
        # cleared for each one.
        most_common = []
        print(decision)
        for name in column_names:
            # Classic word-count: (value, 1) pairs reduced by key.
            counts = (selected
                      .map(lambda x, n=name: (x[n], 1))
                      .reduceByKey(lambda a, b: a + b))
            # Descending by count; take(1)[0] is the most frequent pair.
            top = counts.sortBy(lambda kv: kv[1], False).take(1)[0]
            most_common.append({"name": name, "value": top})
        print(most_common)

    spark.stop()
# Entry point: run only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement