Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
- from pyspark.sql.types import Row
- from pyspark.sql.functions import explode
- from pyspark.sql.functions import *
- from pyspark.sql.window import Window
- from pyspark.sql import HiveContext
# Debug helper: print a single element (e.g. rdd.foreach(g)).
def g(x):
    """Print *x* to stdout.

    Uses the parenthesized form so it works under both Python 2
    (where it prints the single argument) and Python 3.
    """
    print(x)
# Hive-backed SQL context on the already-running SparkContext (`sc`).
sqlContext = HiveContext(sc)


def _strip_header(rdd):
    # Drop the header line (the RDD's first record) from a text-file RDD.
    first_line = rdd.first()
    return rdd.filter(lambda rec: rec != first_line)


# Read the sample data and the region lookup from HDFS, headers removed.
data = _strip_header(sc.textFile('spark/sample.data'))
lookup = _strip_header(sc.textFile('spark/sample.lookup'))
'''
data records:
2005-10-30,101,sachin:rahul:kallis
2005-10-25,101,sachin:abd::
2006-01-15,102,younis:yuvraj:dhoni
2006-01-18,102,yuvraj:sachin
2006-01-15,102,younis:yuvraj:dhoni
2007-05-20,101,sehwag:sachin:ponting::
2007-05-25,101,hayden:yuvraj:ponting
2007-05-30,101,clarke:sehwag::ponting
2008-02-20,103,jaysurya:sangkara:ganguly::
2008-02-25,103,sachin:yuvraj:attpattu
2008-02-30,103,sachin:sehwag:tharanga
lookup records:
101,india
102,pakistan
103,srilanka
104,australia
105,south_africa
'''
# Second question: identify bad records — an empty player slot ("::"),
# an empty CSV field (",,"), or no ':'-separated player list at all.
def _is_bad(rec):
    return "::" in rec or ",," in rec or ":" not in rec


bad = data.filter(_is_bad)
'''
18/01/12 23:18:34 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/spark/sample.data:0+430
2005-10-25,101,sachin:abd::
2007-05-20,101,sehwag:sachin:ponting::
2007-05-30,101,clarke:sehwag::ponting
2008-02-20,103,jaysurya:sangkara:ganguly::
'''
# *************************************************** Transformations **************************************************
# CSV-split each line, then split the third column into the player list.
records = data.map(lambda line: line.split(","))
with_players = records.map(lambda cols: (cols[0], cols[1], cols[2].split(":")))
# Build a DataFrame. `players` is wrapped in an extra list here, which is why
# TWO explodes are needed below: the first unwraps the outer list, the second
# replicates one row per individual player.
df = with_players.map(
    lambda cols: Row(**{'time': cols[0], 'region_code': str(cols[1]), 'players': [cols[2]]})
).toDF()
temp_df = df.withColumn("players", explode(df.players))
data_df = temp_df.withColumn("players", explode(temp_df.players))
# Drop empty player names (artifacts of trailing '::' in the raw data).
data_df = data_df.filter(data_df.players != '')
'''
+--------+-----------+----------+
| players|region_code|      time|
+--------+-----------+----------+
|  sachin|        101|2005-10-30|
|   rahul|        101|2005-10-30|
|  kallis|        101|2005-10-30|
|  sachin|        101|2005-10-25|
|     abd|        101|2005-10-25|
|  younis|        102|2006-01-15|
|  yuvraj|        102|2006-01-15|
|   dhoni|        102|2006-01-15|
|  yuvraj|        102|2006-01-18|
|  sachin|        102|2006-01-18|
|  younis|        102|2006-01-15|
|  yuvraj|        102|2006-01-15|
|   dhoni|        102|2006-01-15|
|  sehwag|        101|2007-05-20|
|  sachin|        101|2007-05-20|
| ponting|        101|2007-05-20|
|  hayden|        101|2007-05-25|
|  yuvraj|        101|2007-05-25|
| ponting|        101|2007-05-25|
|  clarke|        101|2007-05-30|
|  sehwag|        101|2007-05-30|
| ponting|        101|2007-05-30|
|jaysurya|        103|2008-02-20|
|sangkara|        103|2008-02-20|
| ganguly|        103|2008-02-20|
|  sachin|        103|2008-02-25|
|  yuvraj|        103|2008-02-25|
|attpattu|        103|2008-02-25|
|  sachin|        103|2008-02-30|
|  sehwag|        103|2008-02-30|
|tharanga|        103|2008-02-30|
+--------+-----------+----------+
'''
# ------------------------------------------- Transformation on lookup table data ---------------------------------------------
# Each lookup line is "region_code,country"; split and map straight into Rows.
lookup_df = (
    lookup.map(lambda line: line.split(","))
          .map(lambda cols: Row(**{'region_code': cols[0], 'country': cols[1]}))
          .toDF()
)
''' +------------+-----------+
|     country|region_code|
+------------+-----------+
|       india|        101|
|    pakistan|        102|
|    srilanka|        103|
|   australia|        104|
|south_africa|        105|
+------------+-----------+ '''
# Join match data with the lookup table on region_code.
# NOTE(review): lookup is tiny — join(broadcast(lookup_df), ...) would avoid a
# shuffle; left unchanged to preserve behavior.
df = data_df.join(lookup_df, ["region_code"])
# registerTempTable is the Spark 1.x API (createOrReplaceTempView in 2.x+);
# exposes the joined frame to Spark SQL as "sample".
df.registerTempTable("sample")
df.show()
# (Removed dead code: `grouped = df.groupBy("region_code")` was never referenced,
# and a groupBy without an aggregation/action has no effect.)
# Counts with empty players filtered out, kept for comparison — NOTE(review):
# this commented query groups by a `year` column that "sample" does not have
# (the column is `time`); it would fail as written. Verify before enabling.
#sqlContext.sql("select count(*),country from sample where players != '' group by country,year").show()
'''
+---+--------+
|_c0| country|
+---+--------+
|  9|srilanka|
| 14|   india|
|  8|pakistan|
+---+--------+
'''
# Raw per-country row counts (one row per player appearance).
sqlContext.sql("select count(*),country from sample group by country").show()
'''
+---+--------+
|_c0| country|
+---+--------+
| 11|srilanka|
| 19|   india|
|  8|pakistan|
+---+--------+
'''
# Convert time to date format and fetch year:
# parse `time` ('yyyy-MM-dd') via UNIX_TIMESTAMP -> TIMESTAMP -> DATE, then year().
# NOTE(review): the sample output below shows full dates in match_year (and empty
# player rows that data_df already filtered out) — it looks like output from an
# earlier run of a different query; verify against a fresh run.
date_df = sqlContext.sql("select region_code,players,year(TO_DATE(CAST(UNIX_TIMESTAMP(time,'yyyy-MM-dd') AS TIMESTAMP))) as match_year from sample")
'''
+-----------+-------+----------+
|region_code|players|match_year|
+-----------+-------+----------+
|        101| sachin|2005-10-30|
|        101|  rahul|2005-10-30|
|        101| kallis|2005-10-30|
|        101| sachin|2005-10-25|
|        101|    abd|2005-10-25|
|        101|       |2005-10-25|
|        101|       |2005-10-25|
|        101| sehwag|2007-05-20|
|        101| sachin|2007-05-20|
|        101|ponting|2007-05-20|
|        101|       |2007-05-20|
|        101|       |2007-05-20|
|        101| hayden|2007-05-25|
|        101| yuvraj|2007-05-25|
|        101|ponting|2007-05-25|
|        101| clarke|2007-05-30|
|        101| sehwag|2007-05-30|
|        101|       |2007-05-30|
|        101|ponting|2007-05-30|
|        102| younis|2006-01-15|
'''
# Count each player's appearances per (region_code, match_year).
temp = date_df.select(['players','region_code','match_year']).groupBy(['region_code','match_year','players']).count()
# Re-attach the country name from the lookup table.
df = temp.join(lookup_df,['region_code'])
df.registerTempTable("fin_tbl")
# Option 1: SQL dense_rank.
# NOTE(review): this partitions by country only, while Option 2 partitions by
# (country, match_year) — the two can legitimately return different rows.
sqlContext.sql("select country,match_year,players,count from (select country,match_year,players,count,dense_rank() OVER (PARTITION BY country ORDER BY count desc) as rank from fin_tbl) a where a.rank = 1 group by country,match_year,players,count").show()
# Option 2: DataFrame window API. denseRank() is the Spark 1.x name
# (renamed dense_rank in 2.x+) — kept to match the rest of this script.
ranked = df.withColumn("rank",denseRank().over(Window.partitionBy(["country","match_year"]).orderBy(desc("count"))))
# Fix: the original `ranked.where("rank = 1")` was a lazy transformation with no
# action, so it never executed; .show() actually produces the output below.
ranked.where("rank = 1").show()
'''
+-----------+----------+-------+-----+--------+----+
|region_code|match_year|players|count| country|rank|
+-----------+----------+-------+-----+--------+----+
|        102|      2006| yuvraj|    3|pakistan|   1|
|        101|      2005| sachin|    2|   india|   1|
|        101|      2007|ponting|    3|   india|   1|
|        103|      2008| sachin|    2|srilanka|   1|
+-----------+----------+-------+-----+--------+----+
'''
Add Comment
Please sign in to add a comment.