Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
"""Iteratively map not-yet-processed rows of [DBTable] onto lots.

Rows of [DBTable] (database TESTNAV) are split on ``somefield`` into a
"DoneLot" set (``somefield = 0``) and a "NotDoneLot" set (``somefield <> 0``).
Three passes are then run, alternating the transaction type Out, In, Out and
using lot numbers 2, 3 and 4.  Each pass left-joins the pending rows to the
current reference rows on a computed link column; rows that found a ``RefNo``
are tagged with the lot number and ``'Mapped'`` and appended to ``DoneLot``,
the rest stay pending.  The accumulated ``DoneLot`` frame is finally written
to table ``DoneLot`` in database ConfiguratorNew (overwrite mode).
"""
import csv
import io
import datetime

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

# NOTE(review): credentials are embedded in the JDBC URLs; move them to
# configuration / secret storage before running this anywhere but a sandbox.
SOURCE_JDBC_URL = (
    "jdbc:sqlserver://localhost:1433;databaseName=TESTNAV;"
    "user=sa;password=sa@123"
)
TARGET_JDBC_URL = (
    "jdbc:sqlserver://localhost:1433;databaseName=ConfiguratorNew;"
    "user=sa;password=sa@123"
)
JDBC_DRIVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# Per transaction type: (link column name,
#                        link expression evaluated on the pending rows,
#                        link expression evaluated on the reference rows).
LINK_RULES = {
    'In': (
        "Link_In",
        "Case When Quantity<0 Then 'NA' Else "
        "Case When field1<>0 Then field2 Else field3 End End",
        "Case When field1<>0 Then field4 Else field6 End",
    ),
    'Out': (
        "Link_Out",
        "Case When Quantity<0 Then field6 Else 'NA' End",
        "field7",
    ),
}


def _load_table(sqlctx, subquery):
    """Load the result of a SQL Server subquery as a DataFrame over JDBC."""
    return sqlctx.read.format("jdbc").options(
        url=SOURCE_JDBC_URL,
        dbtable=subquery,
        driver=JDBC_DRIVER).load()


def _map_lot(sqlctx, trn_type, lot_no):
    """Run one mapping pass and return the rows that found a reference.

    Reads the temp tables "NotDoneLot" and "DoneLot", and re-registers
    "NotDoneLot" as the left-joined result (pending rows plus the reference
    columns) so the caller can pick out the still-unmatched rows afterwards.

    trn_type -- 'In' or 'Out'; selects the entry of LINK_RULES to apply.
    lot_no   -- integer stamped on the matched rows as column LotNo.
    """
    link_col, pending_expr, done_expr = LINK_RULES[trn_type]

    pending = sqlctx.sql(
        "SELECT *, %s AS %s From NotDoneLot" % (pending_expr, link_col))
    ref = sqlctx.sql(
        "SELECT distinct `RefNo`, %s AS %s From DoneLot"
        % (done_expr, link_col))

    # NOTE(review): both sides of the join expose a column named link_col,
    # and the joined frame gains a `RefNo` column from `ref`.  If the pending
    # rows already carry `RefNo` (they do from the second pass on, because
    # the joined frame is carried forward), the unqualified `RefNo` filters
    # below and in the caller may be rejected as ambiguous - verify against
    # the real schema.
    joined = pending.join(ref, [pending[link_col] == ref[link_col]], 'left')
    joined.registerTempTable("NotDoneLot")

    return sqlctx.sql(
        "SELECT *, %d AS LotNo, 'Mapped' AS RefFlag "
        "From NotDoneLot WHERE Not ISNULL(RefNo)" % lot_no)


conf = SparkConf().setMaster("local[8]").setAppName("myapp")
sc = SparkContext(conf=conf)
sqlctx = SQLContext(sc)

DoneLot = _load_table(
    sqlctx, "(SELECT * FROM [DBTable] where somefield=0) AS Done")
DoneLot.registerTempTable("DoneLot")

NotDoneLot = _load_table(
    sqlctx, "(SELECT * FROM [DBTable] Where somefield<>0) AS NotDoneLot")
NotDoneLot.registerTempTable("NotDoneLot")

vRows = 1
vLot = 2
vTrnType = 'Out'
while vRows < 4:
    NewMapped = _map_lot(sqlctx, vTrnType, vLot)

    # NOTE(review): the source listing had lost its indentation; the steps
    # below are assumed to run after either branch of the In/Out choice.

    # Only the rows mapped in this pass act as reference rows ("DoneLot"
    # temp table) for the next pass; the DoneLot frame keeps the full history.
    NewMapped.registerTempTable("DoneLot")

    # NOTE(review): unionAll combines columns by position; NewMapped carries
    # the extra link / RefNo / LotNo / RefFlag columns added above, so
    # confirm the two frames really have matching columns.
    DoneLot = DoneLot.unionAll(NewMapped)

    # Rows that found no reference stay pending for the next pass.
    NotDoneLot = sqlctx.sql("SELECT * From NotDoneLot WHERE ISNULL(RefNo)")
    NotDoneLot.registerTempTable("NotDoneLot")

    vRows += 1
    vLot += 1
    # Alternate the transaction type between passes.
    vTrnType = 'Out' if vTrnType == 'In' else 'In'

DoneLot.write.jdbc(url=TARGET_JDBC_URL, table="DoneLot", mode="overwrite")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement