Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- Created on Thu May 05 16:23:15 2016
- @author: bghosh
- """
- import re
- from pyspark import SparkContext
- from pyspark.streaming import StreamingContext
- from pyspark.sql import SQLContext,functions as func,Row
# Driver-side Spark handles: a local SparkContext with two worker threads, a
# SQLContext for DataFrame work, and a StreamingContext cutting a micro-batch
# every 10 seconds.
sc = SparkContext(master="local[2]", appName="realtimeApp")
sqlContext = SQLContext(sparkContext=sc)
ssc = StreamingContext(sparkContext=sc, batchDuration=10)
# NOTE(review): in this URI "RealTimeInputFolder" is the HDFS namenode *host*,
# not a directory -- confirm whether "hdfs:///RealTimeInputFolder/" was meant.
files = ssc.textFileStream(directory="hdfs://RealTimeInputFolder/")
# ---- Reference ("historical") data each streamed batch is checked against ----
# JDBC connection settings for the SQL Server table used in the per-batch join.
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
dataurl = "jdbc:sqlserver://devserver:1433"
db = "devDB"
table = "stream_helper"
# NOTE(review): this single hard-coded value is sent as BOTH user and password;
# consider reading real credentials from configuration instead of source.
credential = "dev_credential"

# DataFrame backed by the reference table, registered with Spark SQL under the
# name "base_data" so the per-batch SQL query can join against it by name.
base_data = (
    sqlContext.read
    .format("jdbc")
    .options(
        driver=driver,
        url=dataurl,
        database=db,
        user=credential,
        password=credential,
        dbtable=table,
    )
    .load()
)
base_data.registerTempTable("base_data")
# Whitespace-tokenised view of the `files` stream. Despite the name it yields
# individual tokens, not a count, and no output operation consumes it here.
files_count = files.flatMap(lambda line: line.split())

# Azure Blob Storage (wasb) folder watched for new transaction files.
tranfiles = "wasb://vanspark01@vanspark01.blob.core.windows.net/RealTimeInputFolder01/"
def getSqlContextInstance(sparkContext):
    """Return a SQLContext for ``sparkContext``, created at most once.

    ``foreachRDD`` callbacks run on the driver once per batch, so the
    context is cached in this module's globals under
    ``'sqlContextSingletonInstance'`` instead of being rebuilt every batch.

    When ``SQLContext.getOrCreate`` exists (Spark 1.6+), it is used so that
    an already-existing SQLContext is reused. That lets per-batch queries
    see temp tables registered on it (e.g. ``base_data``); a freshly
    constructed SQLContext may have its own, separate temp-table catalog.
    Older Spark releases fall back to constructing a new SQLContext.

    Args:
        sparkContext: the SparkContext to bind to (e.g. ``rdd.context``).

    Returns:
        The cached SQLContext instance.
    """
    instance = globals().get('sqlContextSingletonInstance')
    if instance is None:
        if hasattr(SQLContext, 'getOrCreate'):
            instance = SQLContext.getOrCreate(sparkContext)
        else:
            instance = SQLContext(sparkContext)
        globals()['sqlContextSingletonInstance'] = instance
    return instance
def preparse(logline):
    """Parse one comma-separated transaction line into a ``Row``.

    Fields are addressed from the end of the line; all other fields are
    ignored:

    * last field        -> ``Customer_id`` (string)
    * 4th from the end  -> ``trantype``    (string)
    * 5th from the end  -> ``amount``      (float)

    Every field is stripped of surrounding whitespace. A trailing carriage
    return (CRLF line endings) or a space after a comma would otherwise end
    up inside ``Customer_id`` and make the join on that column miss.

    Raises:
        IndexError: the line has fewer than five comma-separated fields.
        ValueError: the amount field is not a valid float.
    """
    fields = [field.strip() for field in logline.split(",")]
    return Row(
        Customer_id=fields[-1],
        trantype=fields[-4],
        amount=float(fields[-5]),
    )
def parse():
    """Build the DStream of parsed transactions from new files in ``tranfiles``.

    ``preparse`` reads the 5th field from the end of each line. Lines with
    fewer than five comma-separated fields (blank lines included) are
    therefore dropped up front. Otherwise the IndexError raised inside
    ``map`` would fail the Spark job for the entire micro-batch.

    Returns:
        A DStream of ``Row`` objects produced by ``preparse``.
    """
    raw_lines = ssc.textFileStream(tranfiles)
    # four commas == five fields
    parsable = raw_lines.filter(lambda line: line.count(",") >= 4)
    return parsable.map(preparse)
def check_historic(rdd):
    """Left-join one micro-batch of parsed transactions with ``base_data``.

    Intended as a ``foreachRDD`` callback. The batch RDD of ``Row`` objects
    is registered as the temp table ``stream_df``. It is then
    left-outer-joined with the ``base_data`` temp table on ``Customer_id``,
    and the result is printed with ``show()``.

    Empty batches are skipped. ``createDataFrame`` infers the schema from
    the first row and raises on an empty RDD, and a file stream yields an
    empty batch whenever no new file arrived.

    Any error is reported with a traceback but not re-raised, so one bad
    batch does not stop the streaming job.
    """
    import traceback
    try:
        if rdd.isEmpty():
            return
        # getSqlContextInstance needs the SparkContext, not the RDD itself.
        streamSqlcontext = getSqlContextInstance(rdd.context)
        stream_df = streamSqlcontext.createDataFrame(rdd)
        stream_df.registerTempTable("stream_df")
        # NOTE: "base_data" is resolved by name, so streamSqlcontext must share
        # a temp-table catalog with the context that registered that table.
        result_data_frame = streamSqlcontext.sql(
            "select * from stream_df LEFT OUTER JOIN base_data "
            "on stream_df.Customer_id= base_data.Customer_id")
        result_data_frame.show()
    except Exception:
        # Best-effort per batch: keep the stream alive, but surface the cause.
        traceback.print_exc()
# Wire the pipeline: parse each newly arrived file into Rows and hand every
# micro-batch to check_historic for the join against the reference table.
success = parse()
success.foreachRDD(func=check_historic)

# Start scheduling micro-batches, then block the driver until the streaming
# context is stopped or fails.
ssc.start()
ssc.awaitTermination()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement