Guest User

Untitled

a guest
Nov 21st, 2017
114
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.04 KB | None | 0 0
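Sample input file:

QQ
1
2
3
ZZ
b
QQ
4
5
6
ZZ
a
QQ
9
8
23

Desired result (one array per QQ ... ZZ record):

[1,2,3]
[4,5,6]
[9,8]

Note: as pasted, the third QQ record has no closing ZZ line, so the sample input and the desired [9,8] row do not fully agree.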
  21.  
  22. from pyspark.sql.types import *
  23. from pyspark import SparkContext
  24. from pyspark.sql import SQLContext
  25.  
  26. path ="/tmp/Poonam.Raskar/Sample.txt"
  27. sc =SparkContext()
  28. sqlContext = SQLContext(sc)
  29. sc.setLogLevel("ERROR")
  30. textFile = sc.textFile(path)
  31.  
  32. wi = textFile.zipWithIndex()
  33. startPos = wi.filter(lambda x: x[0].startswith('QQ')).map(lambda (key,index) : index).collect()
  34. endPos = wi.filter(lambda x: x[0].startswith('ZZ')).map(lambda (key,index) : index).collect()
  35. finalPos =zip(startPos,endPos)
  36. dtlRow =[]
  37.  
  38. for pos in finalPos:
  39. #print(pos)
  40. #print(wi.filter())
  41. dtlRow1 = [[wi.filter(lambda x: x[1]==1).map(lambda (key,index) : key ,).collect() for i in range(pos[0],pos[1])]] #Required option for collect...program is taking long time while executing this statement
  42. #print(dtlRow1)
  43. dtlRow.append(dtlRow1)
  44.  
  45.  
  46. cSchema = StructType([StructField("DataFromList", ArrayType(StringType()))])
  47. df = sqlContext.createDataFrame(dtlRow,schema=cSchema)
  48. print(df.show())
Add Comment
Please, Sign In to add comment