Guest User

Untitled

a guest
Jan 22nd, 2018
332
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.80 KB | None | 0 0
  1. from pyspark.sql.types import Row
  2. from pyspark.sql.functions import explode
  3. from pyspark.sql.functions import *
  4. from pyspark.sql.window import Window
  5.  
  6. from pyspark.sql import HiveContext
  7.  
  8. # function to print
  9. def g(x):
  10. print x
  11.  
  12. sqlContext = HiveContext(sc)
  13.  
  14. # read sample data into RDD's
  15. data = sc.textFile('spark/sample.data')
  16. lookup = sc.textFile('spark/sample.lookup')
  17.  
  18. # Remove headers:
  19. # Remove header from sample data RDD's
  20. data_header = data.first()
  21. data = data.filter(lambda line: line != data_header)
  22.  
  23. lookup_header = lookup.first()
  24. lookup = lookup.filter(lambda x: x != lookup_header)
  25. '''
  26. data records:
  27. 2005-10-30,101,sachin:rahul:kallis
  28. 2005-10-25,101,sachin:abd::
  29. 2006-01-15,102,younis:yuvraj:dhoni
  30. 2006-01-18,102,yuvraj:sachin
  31. 2006-01-15,102,younis:yuvraj:dhoni
  32. 2007-05-20,101,sehwag:sachin:ponting::
  33. 2007-05-25,101,hayden:yuvraj:ponting
  34. 2007-05-30,101,clarke:sehwag::ponting
  35. 2008-02-20,103,jaysurya:sangkara:ganguly::
  36. 2008-02-25,103,sachin:yuvraj:attpattu
  37. 2008-02-30,103,sachin:sehwag:tharanga
  38.  
  39. lookup records:
  40. 101,india
  41. 102,pakistan
  42. 103,srilanka
  43. 104,australia
  44. 105,south_africa
  45. '''
  46. # Second question: Identify bad records:
  47. bad = data.filter(lambda x: ("::" in x or ",," in x or ":" not in x))
  48.  
  49. '''
  50. 18/01/12 23:18:34 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/spark/sample.data:0+430
  51. 2005-10-25,101,sachin:abd::
  52. 2007-05-20,101,sehwag:sachin:ponting::
  53. 2007-05-30,101,clarke:sehwag::ponting
  54. 2008-02-20,103,jaysurya:sangkara:ganguly::
  55. '''
  56.  
  57.  
  58. # *************************************************** Transformations **************************************************
  59. # Split data by delimiter ','
  60. data_one = data.map(lambda x: (x.split(",")))
  61.  
  62. # Split players column to fetch different players separated by delimiter
  63. data_two = data_one.map(lambda x: (x[0],x[1],x[2].split(":")))
  64.  
  65.  
  66. # Convert RDD to Dataframe
  67. df = data_two.map(lambda x: Row(**{'time':(x[0]),'region_code':str(x[1]),'players':[x[2]]})).toDF()
  68.  
  69.  
  70. # Explode players column to replicate the rows for each player
  71. temp_df = df.withColumn("players", explode(df.players))
  72. data_df = temp_df.withColumn("players", explode(temp_df.players))
  73.  
  74. # data_df.where("players=''")
  75.  
  76. # Filter out unwanted records
  77. data_df = data_df.filter(data_df.players != '')
  78.  
  79. '''
  80. +--------+-----------+----------+
  81. | players|region_code| time|
  82. +--------+-----------+----------+
  83. | sachin| 101|2005-10-30|
  84. | rahul| 101|2005-10-30|
  85. | kallis| 101|2005-10-30|
  86. | sachin| 101|2005-10-25|
  87. | abd| 101|2005-10-25|
  88. | younis| 102|2006-01-15|
  89. | yuvraj| 102|2006-01-15|
  90. | dhoni| 102|2006-01-15|
  91. | yuvraj| 102|2006-01-18|
  92. | sachin| 102|2006-01-18|
  93. | younis| 102|2006-01-15|
  94. | yuvraj| 102|2006-01-15|
  95. | dhoni| 102|2006-01-15|
  96. | sehwag| 101|2007-05-20|
  97. | sachin| 101|2007-05-20|
  98. | ponting| 101|2007-05-20|
  99. | hayden| 101|2007-05-25|
  100. | yuvraj| 101|2007-05-25|
  101. | ponting| 101|2007-05-25|
  102. | clarke| 101|2007-05-30|
  103. | sehwag| 101|2007-05-30|
  104. | ponting| 101|2007-05-30|
  105. |jaysurya| 103|2008-02-20|
  106. |sangkara| 103|2008-02-20|
  107. | ganguly| 103|2008-02-20|
  108. | sachin| 103|2008-02-25|
  109. | yuvraj| 103|2008-02-25|
  110. |attpattu| 103|2008-02-25|
  111. | sachin| 103|2008-02-30|
  112. | sehwag| 103|2008-02-30|
  113. |tharanga| 103|2008-02-30|
  114. +--------+-----------+----------+
  115.  
  116. '''
  117.  
  118. # ------------------------------------------- Transaformation on lookup table data ---------------------------------------------
  119.  
  120. lookup_one = lookup.map(lambda x: (x.split(",")))
  121. lookup_two = lookup_one.map(lambda x: (x[0],x[1]))
  122.  
  123. lookup_df = lookup_two.map(lambda x: Row(**{'region_code':x[0],'country':x[1]})).toDF()
  124.  
  125. ''' +------------+-----------+
  126. | country|region_code|
  127. +------------+-----------+
  128. | india| 101|
  129. | pakistan| 102|
  130. | srilanka| 103|
  131. | australia| 104|
  132. |south_africa| 105|
  133. +------------+-----------+ '''
  134.  
  135. # Join data and lookup table
  136.  
  137. # Would have benefited if lookup was broadcasted
  138. df = data_df.join(lookup_df,["region_code"])
  139. df.registerTempTable("sample")
  140. df.show()
  141.  
  142. grouped = df.groupBy("region_code")
  143.  
  144. #sqlContext.sql("select count(*),country from sample where players != '' group by country,year").show()
  145. '''
  146. +---+--------+
  147. |_c0| country|
  148. +---+--------+
  149. | 9|srilanka|
  150. | 14| india|
  151. | 8|pakistan|
  152. +---+--------+
  153.  
  154. '''
  155. sqlContext.sql("select count(*),country from sample group by country").show()
  156.  
  157. '''
  158. +---+--------+
  159. |_c0| country|
  160. +---+--------+
  161. | 11|srilanka|
  162. | 19| india|
  163. | 8|pakistan|
  164. +---+--------+
  165. '''
  166. # Convert time to date format and fetch year
  167. date_df = sqlContext.sql("select region_code,players,year(TO_DATE(CAST(UNIX_TIMESTAMP(time,'yyyy-MM-dd') AS TIMESTAMP))) as match_year from sample")
  168.  
  169. '''
  170.  
  171. +-----------+-------+----------+
  172. |region_code|players|match_year|
  173. +-----------+-------+----------+
  174. | 101| sachin|2005-10-30|
  175. | 101| rahul|2005-10-30|
  176. | 101| kallis|2005-10-30|
  177. | 101| sachin|2005-10-25|
  178. | 101| abd|2005-10-25|
  179. | 101| |2005-10-25|
  180. | 101| |2005-10-25|
  181. | 101| sehwag|2007-05-20|
  182. | 101| sachin|2007-05-20|
  183. | 101|ponting|2007-05-20|
  184. | 101| |2007-05-20|
  185. | 101| |2007-05-20|
  186. | 101| hayden|2007-05-25|
  187. | 101| yuvraj|2007-05-25|
  188. | 101|ponting|2007-05-25|
  189. | 101| clarke|2007-05-30|
  190. | 101| sehwag|2007-05-30|
  191. | 101| |2007-05-30|
  192. | 101|ponting|2007-05-30|
  193. | 102| younis|2006-01-15|
  194. '''
  195.  
  196. # date_df.groupBy(['region_code','match_year','players']).agg(max('count(players)'))
  197.  
  198. # Group by country,match_year and add a column count
  199. temp = date_df.select(['players','region_code','match_year']).groupBy(['region_code','match_year','players']).count()
  200.  
  201. # Join temp with lookup table
  202.  
  203. df = temp.join(lookup_df,['region_code'])
  204.  
  205. df.registerTempTable("fin_tbl")
  206.  
  207. # Option 1
  208. sqlContext.sql("select country,match_year,players,count from (select country,match_year,players,count,dense_rank() OVER (PARTITION BY country ORDER BY count desc) as rank from fin_tbl) a where a.rank = 1 group by country,match_year,players,count").show()
  209.  
  210. # sqlContext.sql("select *,dense_rank() over(partition by country order by count desc) as rank from fin_tbl").show()
  211.  
  212.  
  213. # Option 2
  214. ranked = df.withColumn("rank",denseRank().over(Window.partitionBy(["country","match_year"]).orderBy(desc("count"))))
  215.  
  216. ranked.where("rank = 1")
  217. '''
  218. +-----------+----------+-------+-----+--------+----+
  219. |region_code|match_year|players|count| country|rank|
  220. +-----------+----------+-------+-----+--------+----+
  221. | 102| 2006| yuvraj| 3|pakistan| 1|
  222. | 101| 2005| sachin| 2| india| 1|
  223. | 101| 2007|ponting| 3| india| 1|
  224. | 103| 2008| sachin| 2|srilanka| 1|
  225. +-----------+----------+-------+-----+--------+----+
  226.  
  227. '''
Add Comment
Please, Sign In to add comment