Advertisement
Guest User

Untitled

a guest
Apr 8th, 2020
317
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 11.08 KB | None | 0 0
  1. import sys
  2. import sqlalchemy
  3. from datetime import datetime
  4. from pyspark.sql import SparkSession
  5. from pyspark.sql.types import LongType
  6. from pyspark.sql.functions import col
  7. from decimal import Decimal
  8.  
  9. def dbconnection():
  10.     import sys
  11.     args = sys.argv[1:]
  12.     return sqlalchemy.create_engine(f'postgresql+pg8000://{args[4]}@{args[3]}')
  13.  
  14. def initializeDatabaseRequestParams(sc, request_id, **kwargs):
  15.     import sys
  16.  
  17.     args = sys.argv[1:]
  18.     srvuri=f"jdbc:sqlserver://{args[1]};user={args[2].split(':')[0]};password={args[2].split(':')[1]}"
  19.     connection = sc._jvm.java.sql.DriverManager.getConnection(srvuri)
  20.  
  21.     statement = connection.prepareCall("DELETE FROM TAB_SOLICITACAO_NOVA_OPORTUNIDADE WHERE ID_SOLICITACAO = ?")
  22.     statement.setInt(1, int(request_id))
  23.     statement.execute()
  24.     statement = connection.prepareCall("DELETE FROM TAB_SOLICITACAO_CONVENIOS WHERE ID_SOLICITACAO = ?")
  25.     statement.setInt(1, int(request_id))
  26.     statement.execute()
  27.     statement = connection.prepareCall("DELETE FROM TAB_SOLICITACAO_ESTADOS WHERE ID_SOLICITACAO = ?")
  28.     statement.setInt(1, int(request_id))
  29.     statement.execute()
  30.     statement = connection.prepareCall("DELETE FROM TAB_SOLICITACAO_CIDADES WHERE ID_SOLICITACAO = ?")
  31.     statement.setInt(1, int(request_id))
  32.     statement.execute()
  33.  
  34. def generateResultsFromProc(sc, request, **kwargs):
  35.     import sys
  36.  
  37.     args = sys.argv[1:]
  38.     srvuri=f"jdbc:sqlserver://{args[1]};user={args[2].split(':')[0]};password={args[2].split(':')[1]}"
  39.     connection = sc._jvm.java.sql.DriverManager.getConnection(srvuri)
  40.  
  41.     if request.requestobjective == "adesao" and kwargs['agreement'] == 164:
  42.         statement = connection.prepareCall("{call USP_GERA_BASE_ADESAO_SIAPE(?, ?)}")
  43.         statement.setInt("ID_SOLICITACAO", int(request_id))
  44.         statement.setInt("QTD_NOMES", 100)
  45.         statement.execute()
  46.     if request.requestobjective == "adesao" and kwargs['agreement'] != 164:
  47.         statement = connection.prepareCall("{call USP_GERA_BASE_ADESAO(?, ?)}")
  48.         statement.setInt("ID_SOLICITACAO", int(request_id))
  49.         statement.setInt("QTD_NOMES", 100)
  50.         statement.execute()
  51.         return f"TAB_ADES_{request.id}"
  52.    
  53.     return True
  54.  
  55. args = sys.argv[1:]
  56. request_id = args[0]
  57.  
  58. spark = SparkSession \
  59.     .builder \
  60.     .appName('Database Request') \
  61.     .config("spark.driver.extraClassPath", "/home/ricardo/Documents/projects/CAOS/gaia-bmg/bigdata/spark/driver/sqljdbc41.jar") \
  62.     .config("spark.executor.extraClassPath", "/home/ricardo/Documents/projects/CAOS/gaia-bmg/bigdata/spark/driver/sqljdbc41.jar") \
  63.     .getOrCreate()
  64.  
  65. sc = spark.sparkContext
  66. sc.setLogLevel("INFO")
  67. log4jLogger = sc._jvm.org.apache.log4j
  68. LOGGER = log4jLogger.LogManager.getLogger(__name__)
  69.  
  70. start = datetime.now()
  71. LOGGER.info(f"START TIME: {start}")
  72.  
  73. db = dbconnection()
  74. with db.connect() as conn:
  75.     # DATABASE REQUEST PROCESSING
  76.     request = spark.read.format("jdbc") \
  77.         .option("url", f"jdbc:postgresql://{args[3]}") \
  78.         .option("driver","org.postgresql.Driver") \
  79.         .option("user",args[4].split(':')[0]) \
  80.         .option("password", args[4].split(':')[1]) \
  81.         .option("query", f"SELECT * FROM api_databaserequest WHERE id = {request_id} AND status = 'processing'") \
  82.         .load()
  83.  
  84.     if request.count() > 0:
  85.         conn.execute(f"UPDATE api_databaserequest SET start_time = '{start}' WHERE id = {request_id}")
  86.  
  87.         initializeDatabaseRequestParams(sc, request_id)
  88.  
  89.         agreementsqr = f"""
  90.            SELECT
  91.                databaserequest_id as ID_SOLICITACAO,
  92.                agreement_id as CD_CONV
  93.            FROM api_databaserequest_agreements
  94.            WHERE databaserequest_id = {request_id}"""
  95.         agreements = spark.read.format("jdbc") \
  96.             .option("url", f"jdbc:postgresql://{args[3]}") \
  97.             .option("driver","org.postgresql.Driver") \
  98.             .option("user",args[4].split(':')[0]) \
  99.             .option("password", args[4].split(':')[1]) \
  100.             .option("query", agreementsqr) \
  101.             .load()
  102.         agreements.write \
  103.             .format("jdbc") \
  104.             .mode("append") \
  105.             .option("url", f"jdbc:sqlserver://{args[1]}") \
  106.             .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
  107.             .option("dbtable", "TAB_SOLICITACAO_CONVENIOS") \
  108.             .option("user", args[2].split(':')[0]) \
  109.             .option("password", args[2].split(':')[1]) \
  110.             .save()
  111.  
  112.         statesqr = f"""
  113.            SELECT
  114.                databaserequest_id as ID_SOLICITACAO,
  115.                estados_id as DS_ESTD
  116.            FROM api_databaserequest_states
  117.            WHERE databaserequest_id = {request_id}"""
  118.         states = spark.read.format("jdbc") \
  119.             .option("url", f"jdbc:postgresql://{args[3]}") \
  120.             .option("driver","org.postgresql.Driver") \
  121.             .option("user",args[4].split(':')[0]) \
  122.             .option("password", args[4].split(':')[1]) \
  123.             .option("query", statesqr) \
  124.             .load()
  125.         states.write \
  126.             .format("jdbc") \
  127.             .mode("append") \
  128.             .option("url", f"jdbc:sqlserver://{args[1]}") \
  129.             .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
  130.             .option("dbtable", "TAB_SOLICITACAO_ESTADOS") \
  131.             .option("user", args[2].split(':')[0]) \
  132.             .option("password", args[2].split(':')[1]) \
  133.             .save()
  134.  
  135.         citiesq = f"""
  136.            SELECT
  137.                c.cidade as DS_CIDADE,
  138.                drc.databaserequest_id as ID_SOLICITACAO
  139.            FROM api_databaserequest_cities drc
  140.            JOIN api_cidades c
  141.            ON drc.cidades_id = c.id
  142.            WHERE drc.databaserequest_id = {request_id}"""
  143.         cities = spark.read.format("jdbc") \
  144.             .option("url", f"jdbc:postgresql://{args[3]}") \
  145.             .option("driver","org.postgresql.Driver") \
  146.             .option("user",args[4].split(':')[0]) \
  147.             .option("password", args[4].split(':')[1]) \
  148.             .option("query", citiesq) \
  149.             .load()
  150.         cities.write \
  151.             .format("jdbc") \
  152.             .mode("append") \
  153.             .option("url", f"jdbc:sqlserver://{args[1]}") \
  154.             .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
  155.             .option("dbtable", "TAB_SOLICITACAO_CIDADES") \
  156.             .option("user", args[2].split(':')[0]) \
  157.             .option("password", args[2].split(':')[1]) \
  158.             .save()
  159.  
  160.         opportunityqr = f"""
  161.            SELECT dr.*, o.linked_code
  162.            FROM api_databaserequest dr
  163.            JOIN api_organization o
  164.            ON dr.organization_id = o.id
  165.            WHERE dr.id = {request_id}"""
  166.         newoportunity = spark.read.format("jdbc") \
  167.             .option("url", f"jdbc:postgresql://{args[3]}") \
  168.             .option("driver","org.postgresql.Driver") \
  169.             .option("user",args[4].split(':')[0]) \
  170.             .option("password", args[4].split(':')[1]) \
  171.             .option("query", opportunityqr) \
  172.             .load().first()
  173.         opportunitydataset = {
  174.             "ID_SOLICITACAO": request_id,
  175.             "FG_APTO_SEGURO": newoportunity.apt_insurance,
  176.             "IDADE_INI": None if newoportunity.age == 'empty' else str(newoportunity.age).split(',')[0].replace('[', ''),
  177.             "IDADE_END": None if newoportunity.age == 'empty' else str(newoportunity.age).split(',')[1].replace(')', ''),
  178.             "QTD_EMPR_INI": None if newoportunity.emprestimos == 'empty' else int(str(newoportunity.emprestimos).split(',')[0].replace('[', '')),
  179.             "QTD_EMPR_END": None if newoportunity.emprestimos == 'empty' else int(str(newoportunity.emprestimos).split(',')[1].replace(')', '')),
  180.             "MARG_DISP_INI": None if newoportunity.margin == 'empty' else Decimal(str(newoportunity.margin).split(',')[0].replace('[', '')),
  181.             "MARG_DISP_END": None if newoportunity.margin == 'empty' else Decimal(str(newoportunity.margin).split(',')[1].replace(')', '')),
  182.             "CRED_DISP_INI": None if newoportunity.credit == 'empty' else Decimal(str(newoportunity.credit).split(',')[0].replace('[', '')),
  183.             "CRED_DISP_END": None if newoportunity.credit == 'empty' else Decimal(str(newoportunity.credit).split(',')[1].replace(')', ''))}
  184.         opportunitydataset = {
  185.             key: value for key, value in opportunitydataset.items() if value is not None}
  186.  
  187.         opportunity = spark.createDataFrame([opportunitydataset])
  188.         opportunity.write \
  189.             .format("jdbc") \
  190.             .mode("append") \
  191.             .option("url", f"jdbc:sqlserver://{args[1]}") \
  192.             .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
  193.             .option("dbtable", "TAB_SOLICITACAO_NOVA_OPORTUNIDADE") \
  194.             .option("user", args[2].split(':')[0]) \
  195.             .option("password", args[2].split(':')[1]) \
  196.             .save()
  197.  
  198.         tabname = generateResultsFromProc(sc, newoportunity, agreement=agreements.first().cd_conv)
  199.         returntable = spark.read.format("jdbc") \
  200.             .option("url", f"jdbc:sqlserver://{args[1]}") \
  201.             .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
  202.             .option("dbtable", tabname) \
  203.             .option("user", args[2].split(':')[0]) \
  204.             .option("password", args[2].split(':')[1]) \
  205.             .load()
  206.         fieldsSwap = {
  207.             'NR_TELL_FONE1': 'DS_TELL_FONE1',
  208.             'NR_TELL_FONE2': 'DS_TELL_FONE2',
  209.             'NR_TELL_FONE3': 'DS_TELL_FONE3',
  210.             'NR_TELL_FONE4': 'DS_TELL_FONE4',
  211.             'NR_BENF': 'NR_BENEF',
  212.             'VL_LIMT': 'VL_LIMIT',
  213.             'VL_LIMT_DISP': 'VL_LIMIT_DISP',
  214.             'VL_CRED_DISP': 'VL_LIMIT_DISP',
  215.             'DS_TIPO_BENF': 'NR_BENEF',
  216.             'QT_EMPR_CONSG': 'QT_EMPR_CONSIG',
  217.             'VL_MARG_LIVR': 'VL_MARGM_DISP',
  218.             'DT_BENF': 'DT_BENEF'}
  219.         for origin,source in fieldsSwap.items():
  220.             if origin in returntable.schema.names:
  221.                 returntable = returntable.withColumnRenamed(origin, source)
  222.  
  223.         cpfs = returntable.select("NR_CPF").rdd.flatMap(lambda x: x).collect()
  224.  
  225.         conn.execute(f"""
  226.            DELETE FROM api_databaserequest
  227.            WHERE organization_id = newopportunity.organization_id
  228.            AND NR_CPF IN ({','.join(map(str, cpfs))})
  229.        """)
  230.  
  231.         returntable.withColumn('organization_id', newoportunity.organization_id)
  232.         returntable.write \
  233.             .format("jdbc") \
  234.             .mode("append") \
  235.             .option("url", f"jdbc:postgresql://{args[3]}") \
  236.             .option("driver","org.postgresql.Driver") \
  237.             .option("user",args[4].split(':')[0]) \
  238.             .option("password", args[4].split(':')[1]) \
  239.             .option("dbtable", "api_contact") \
  240.             .load()
  241.         returntable.show()
  242.  
  243.         end = datetime.now()
  244.         LOGGER.info(f"END TIME: {end}")
  245.         conn.execute(f"UPDATE api_databaserequest SET end_time = '{end}' WHERE id = {request_id}")
  246.         spark.stop()
  247.     else:
  248.         spark.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement