import sys
import sqlalchemy
from datetime import datetime
from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
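
# Expected command-line arguments (sys.argv[1:]):
#   args[0]  id of the api_databaserequest row to process
#   args[1]  SQL Server host
#   args[2]  SQL Server credentials as "user:password"
#   args[3]  PostgreSQL host
#   args[4]  PostgreSQL credentials as "user:password"
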
def dbconnection():
    """Returns a SQLAlchemy engine for the PostgreSQL side."""
    args = sys.argv[1:]
    return sqlalchemy.create_engine(f'postgresql+pg8000://{args[4]}@{args[3]}')
def initializeDatabaseRequestParams(sc, request_id, **kwargs):
    """Clears any leftover parameter rows for this request on SQL Server."""
    args = sys.argv[1:]
    srvuri = f"jdbc:sqlserver://{args[1]};user={args[2].split(':')[0]};password={args[2].split(':')[1]}"
    connection = sc._jvm.java.sql.DriverManager.getConnection(srvuri)
    for table in ("TAB_SOLICITACAO_NOVA_OPORTUNIDADE",
                  "TAB_SOLICITACAO_CONVENIOS",
                  "TAB_SOLICITACAO_ESTADOS",
                  "TAB_SOLICITACAO_CIDADES"):
        statement = connection.prepareCall(f"DELETE FROM {table} WHERE ID_SOLICITACAO = ?")
        statement.setInt(1, int(request_id))
        statement.execute()
def generateResultsFromProc(sc, request, **kwargs):
    """Runs the SQL Server stored procedure that builds the result table
    and returns that table's name (None for unsupported objectives)."""
    args = sys.argv[1:]
    srvuri = f"jdbc:sqlserver://{args[1]};user={args[2].split(':')[0]};password={args[2].split(':')[1]}"
    connection = sc._jvm.java.sql.DriverManager.getConnection(srvuri)
    if request.requestobjective == "adesao":
        # Agreement 164 (SIAPE) has a dedicated stored procedure
        proc = "{call USP_GERA_BASE_ADESAO_SIAPE(?, ?)}" if kwargs['agreement'] == 164 \
            else "{call USP_GERA_BASE_ADESAO(?, ?)}"
        statement = connection.prepareCall(proc)
        statement.setInt("ID_SOLICITACAO", int(request.id))
        statement.setInt("QTD_NOMES", 100)
        statement.execute()
        return f"TAB_ADES_{request.id}"
    return None
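
# --- Main flow --------------------------------------------------------------
# Reads a pending request from PostgreSQL, mirrors its parameters into the
# SQL Server staging tables, runs the stored procedure that generates the
# contact base, and copies the result back into PostgreSQL (api_contact).
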
args = sys.argv[1:]
request_id = args[0]

spark = SparkSession \
    .builder \
    .appName('Database Request') \
    .config("spark.driver.extraClassPath", "/home/ricardo/Documents/projects/CAOS/gaia-bmg/bigdata/spark/driver/sqljdbc41.jar") \
    .config("spark.executor.extraClassPath", "/home/ricardo/Documents/projects/CAOS/gaia-bmg/bigdata/spark/driver/sqljdbc41.jar") \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("INFO")

log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)

start = datetime.now()
LOGGER.info(f"START TIME: {start}")

db = dbconnection()
with db.connect() as conn:
    # Fetch the request only if it is still flagged as 'processing'
    request = spark.read.format("jdbc") \
        .option("url", f"jdbc:postgresql://{args[3]}") \
        .option("driver", "org.postgresql.Driver") \
        .option("user", args[4].split(':')[0]) \
        .option("password", args[4].split(':')[1]) \
        .option("query", f"SELECT * FROM api_databaserequest WHERE id = {request_id} AND status = 'processing'") \
        .load()
    if request.count() > 0:
        conn.execute(f"UPDATE api_databaserequest SET start_time = '{start}' WHERE id = {request_id}")
        initializeDatabaseRequestParams(sc, request_id)
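
        # Mirror the request's agreements, states, and cities from PostgreSQL
        # into the SQL Server staging tables read by the stored procedures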
        agreementsqr = f"""
            SELECT
                databaserequest_id as ID_SOLICITACAO,
                agreement_id as CD_CONV
            FROM api_databaserequest_agreements
            WHERE databaserequest_id = {request_id}"""
        agreements = spark.read.format("jdbc") \
            .option("url", f"jdbc:postgresql://{args[3]}") \
            .option("driver", "org.postgresql.Driver") \
            .option("user", args[4].split(':')[0]) \
            .option("password", args[4].split(':')[1]) \
            .option("query", agreementsqr) \
            .load()
        agreements.write \
            .format("jdbc") \
            .mode("append") \
            .option("url", f"jdbc:sqlserver://{args[1]}") \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .option("dbtable", "TAB_SOLICITACAO_CONVENIOS") \
            .option("user", args[2].split(':')[0]) \
            .option("password", args[2].split(':')[1]) \
            .save()
        statesqr = f"""
            SELECT
                databaserequest_id as ID_SOLICITACAO,
                estados_id as DS_ESTD
            FROM api_databaserequest_states
            WHERE databaserequest_id = {request_id}"""
        states = spark.read.format("jdbc") \
            .option("url", f"jdbc:postgresql://{args[3]}") \
            .option("driver", "org.postgresql.Driver") \
            .option("user", args[4].split(':')[0]) \
            .option("password", args[4].split(':')[1]) \
            .option("query", statesqr) \
            .load()
        states.write \
            .format("jdbc") \
            .mode("append") \
            .option("url", f"jdbc:sqlserver://{args[1]}") \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .option("dbtable", "TAB_SOLICITACAO_ESTADOS") \
            .option("user", args[2].split(':')[0]) \
            .option("password", args[2].split(':')[1]) \
            .save()
        citiesq = f"""
            SELECT
                c.cidade as DS_CIDADE,
                drc.databaserequest_id as ID_SOLICITACAO
            FROM api_databaserequest_cities drc
            JOIN api_cidades c
                ON drc.cidades_id = c.id
            WHERE drc.databaserequest_id = {request_id}"""
        cities = spark.read.format("jdbc") \
            .option("url", f"jdbc:postgresql://{args[3]}") \
            .option("driver", "org.postgresql.Driver") \
            .option("user", args[4].split(':')[0]) \
            .option("password", args[4].split(':')[1]) \
            .option("query", citiesq) \
            .load()
        cities.write \
            .format("jdbc") \
            .mode("append") \
            .option("url", f"jdbc:sqlserver://{args[1]}") \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .option("dbtable", "TAB_SOLICITACAO_CIDADES") \
            .option("user", args[2].split(':')[0]) \
            .option("password", args[2].split(':')[1]) \
            .save()
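
        # Build the single-row "new opportunity" parameter record from the
        # request joined with its organization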
        opportunityqr = f"""
            SELECT dr.*, o.linked_code
            FROM api_databaserequest dr
            JOIN api_organization o
                ON dr.organization_id = o.id
            WHERE dr.id = {request_id}"""
        newoportunity = spark.read.format("jdbc") \
            .option("url", f"jdbc:postgresql://{args[3]}") \
            .option("driver", "org.postgresql.Driver") \
            .option("user", args[4].split(':')[0]) \
            .option("password", args[4].split(':')[1]) \
            .option("query", opportunityqr) \
            .load().first()
        # Range columns (age, emprestimos, margin, credit) arrive from
        # PostgreSQL as text like '[lo,hi)' or the literal 'empty'
        opportunitydataset = {
            "ID_SOLICITACAO": request_id,
            "FG_APTO_SEGURO": newoportunity.apt_insurance,
            "IDADE_INI": None if newoportunity.age == 'empty' else str(newoportunity.age).split(',')[0].replace('[', ''),
            "IDADE_END": None if newoportunity.age == 'empty' else str(newoportunity.age).split(',')[1].replace(')', ''),
            "QTD_EMPR_INI": None if newoportunity.emprestimos == 'empty' else int(str(newoportunity.emprestimos).split(',')[0].replace('[', '')),
            "QTD_EMPR_END": None if newoportunity.emprestimos == 'empty' else int(str(newoportunity.emprestimos).split(',')[1].replace(')', '')),
            "MARG_DISP_INI": None if newoportunity.margin == 'empty' else Decimal(str(newoportunity.margin).split(',')[0].replace('[', '')),
            "MARG_DISP_END": None if newoportunity.margin == 'empty' else Decimal(str(newoportunity.margin).split(',')[1].replace(')', '')),
            "CRED_DISP_INI": None if newoportunity.credit == 'empty' else Decimal(str(newoportunity.credit).split(',')[0].replace('[', '')),
            "CRED_DISP_END": None if newoportunity.credit == 'empty' else Decimal(str(newoportunity.credit).split(',')[1].replace(')', ''))}
        # Drop unset bounds so the insert only carries defined columns
        opportunitydataset = {
            key: value for key, value in opportunitydataset.items() if value is not None}
        opportunity = spark.createDataFrame([opportunitydataset])
        opportunity.write \
            .format("jdbc") \
            .mode("append") \
            .option("url", f"jdbc:sqlserver://{args[1]}") \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .option("dbtable", "TAB_SOLICITACAO_NOVA_OPORTUNIDADE") \
            .option("user", args[2].split(':')[0]) \
            .option("password", args[2].split(':')[1]) \
            .save()
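
        # Run the stored procedure and read back the result table it built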
        tabname = generateResultsFromProc(sc, newoportunity, agreement=agreements.first().cd_conv)
        returntable = spark.read.format("jdbc") \
            .option("url", f"jdbc:sqlserver://{args[1]}") \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .option("dbtable", tabname) \
            .option("user", args[2].split(':')[0]) \
            .option("password", args[2].split(':')[1]) \
            .load()
        # Rename legacy result columns to the canonical names used downstream;
        # several old names map to the same target because the procedures do
        # not all emit the same schema
        fieldsSwap = {
            'NR_TELL_FONE1': 'DS_TELL_FONE1',
            'NR_TELL_FONE2': 'DS_TELL_FONE2',
            'NR_TELL_FONE3': 'DS_TELL_FONE3',
            'NR_TELL_FONE4': 'DS_TELL_FONE4',
            'NR_BENF': 'NR_BENEF',
            'VL_LIMT': 'VL_LIMIT',
            'VL_LIMT_DISP': 'VL_LIMIT_DISP',
            'VL_CRED_DISP': 'VL_LIMIT_DISP',
            'DS_TIPO_BENF': 'NR_BENEF',
            'QT_EMPR_CONSG': 'QT_EMPR_CONSIG',
            'VL_MARG_LIVR': 'VL_MARGM_DISP',
            'DT_BENF': 'DT_BENEF'}
        for oldname, newname in fieldsSwap.items():
            if oldname in returntable.schema.names:
                returntable = returntable.withColumnRenamed(oldname, newname)
        # Remove any existing contacts for this organization and CPF set
        # before appending the fresh batch.
        # NOTE: assumes api_contact is the intended target table (it receives
        # these rows below); the original statement referenced
        # api_databaserequest and embedded the Python variable name literally.
        cpfs = returntable.select("NR_CPF").rdd.flatMap(lambda x: x).collect()
        conn.execute(f"""
            DELETE FROM api_contact
            WHERE organization_id = {newoportunity.organization_id}
            AND NR_CPF IN ({','.join(map(str, cpfs))})
        """)
        returntable = returntable.withColumn('organization_id', lit(newoportunity.organization_id))
        returntable.write \
            .format("jdbc") \
            .mode("append") \
            .option("url", f"jdbc:postgresql://{args[3]}") \
            .option("driver", "org.postgresql.Driver") \
            .option("user", args[4].split(':')[0]) \
            .option("password", args[4].split(':')[1]) \
            .option("dbtable", "api_contact") \
            .save()
        returntable.show()
        end = datetime.now()
        LOGGER.info(f"END TIME: {end}")
        conn.execute(f"UPDATE api_databaserequest SET end_time = '{end}' WHERE id = {request_id}")
    # Stop Spark whether or not a matching request was found
    spark.stop()