"""
Created on Thu May 05 16:23:15 2016

@author: bghosh
"""
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, functions as func, Row


sc = SparkContext("local[2]", "realtimeApp")
sqlContext = SQLContext(sc)
# Keep one SQLContext for the whole app: getSqlContextInstance() further down
# returns this same instance, so temp tables registered here remain visible
# when each micro-batch is processed inside foreachRDD.
globals()['sqlContextSingletonInstance'] = sqlContext
ssc = StreamingContext(sc, 10)  # 10-second micro-batches
files = ssc.textFileStream("hdfs://RealTimeInputFolder/")

######## Let's get the data from the db which is relevant for streaming ###

driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
dataurl = "jdbc:sqlserver://devserver:1433"
db = "devDB"
table = "stream_helper"
credential = "dev_credential"

######## basic data for evaluation purpose ########

# Load the reference ("historical") table over JDBC and register it as a
# temp table so the streamed records can be joined against it below.
base_data = sqlContext.read.format("jdbc").options(
    driver=driver,
    url=dataurl,
    database=db,
    user=credential,
    password=credential,
    dbtable=table).load()
base_data.registerTempTable("base_data")

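# Note: this script targets the Spark 1.x API (SQLContext, registerTempTable).
# On Spark 2.x the rough equivalents would be SparkSession.builder.getOrCreate()
# and DataFrame.createOrReplaceTempView(); the paste is left in its 1.x style.
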
######
# Word-count style tokenisation of the HDFS stream (not used further below).
files_count = files.flatMap(lambda file: file.split())

#pattern = '(TranAmount=Decimal.{2})(.[0-9]*.[0-9]*)(\S+ )(TranDescription=u.)([a-zA-z\s]+)([\S\s]+ )(dSc=u.)([A-Z]{2}.[0-9]+)'


tranfiles = "wasb://vanspark01@vanspark01.blob.core.windows.net/RealTimeInputFolder01/"


def getSqlContextInstance(sparkContext):
    # Singleton pattern from the Spark Streaming guide: lazily create (or
    # reuse) a single SQLContext so DataFrames can be built inside foreachRDD.
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']


def preparse(logline):
    #match = re.search(pattern, logline)
    # Split the comma-separated record and pick out the fields we need,
    # counted from the end of the line.
    pre = logline.split(",")
    return Row(
        Customer_id=pre[-1],
        trantype=pre[-4],
        amount=float(pre[-5]))

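# A quick sanity check of preparse() on a made-up record. The real layout of
# the streamed files is not shown in this paste, so the line below is only an
# assumed comma-separated format whose last field is the customer id, with
# the amount and transaction type five and four fields from the end:
#
#   >>> r = preparse("2016-05-05 16:23:15,100.25,DEBIT,GBP,LDN01,CU1001")
#   >>> (r.amount, r.trantype, r.Customer_id)
#   (100.25, 'DEBIT', 'CU1001')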

def parse():
    # Build a DStream of parsed Rows from the transaction file stream.
    parsed_tran = ssc.textFileStream(tranfiles).map(preparse)
    #success = parsed_tran.filter(lambda s: s[1] == 1).map(lambda x: x[0])
    #fail = parsed_tran.filter(lambda s: s[1] == 0).map(lambda x: x[0])
    """if fail.count() > 0:
    print "no of non parsed file : %d", fail.count()
    """
    return parsed_tran  # success


def check_historic(rdd):
    # Check each micro-batch against the historical table.
    if rdd.isEmpty():
        # Nothing arrived in this batch interval; skip it.
        return
    try:
        streamSqlcontext = getSqlContextInstance(rdd.context)
        stream_df = streamSqlcontext.createDataFrame(rdd)
        stream_df.registerTempTable("stream_df")
        result_data_frame = streamSqlcontext.sql(
            "select * from stream_df "
            "LEFT OUTER JOIN base_data on stream_df.Customer_id = base_data.Customer_id")
        result_data_frame.show()
    except Exception as e:
        # Don't let one bad batch kill the streaming job, but report why it was skipped.
        print "skipping batch:", e
    #return result_data_frame.rdd


success = parse()

success.foreachRDD(check_historic)
ssc.start()
ssc.awaitTermination()
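
# To actually run this, the Microsoft SQL Server JDBC driver jar has to be on
# the Spark classpath when the job is launched (for example via spark-submit's
# --jars / --driver-class-path options); the exact jar name depends on the
# driver version installed, so it is not pinned here.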