Advertisement
Guest User

Untitled

a guest
Apr 26th, 2019
160
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.99 KB | None | 0 0
  1. import re
  2. import json
  3. from json import JSONDecodeError
  4.  
  5. import psycopg2
  6. import logging
  7.  
  8. logging.basicConfig(filename='logs.log', level=logging.INFO)
  9.  
  10. FILES = [
  11. 'data/43d3a13021.2019-04-16.22.json',
  12. 'data/43d3a13021.2019-04-17.22.json',
  13. 'data/43d3a13021.2019-04-17.12.json',
  14. 'data/43d3a13021.2019-04-18.22.json'
  15. ]
  16. HOSTNAME = 'localhost'
  17. USERNAME = 'postgres'
  18. PASSWD = 'tracking'
  19. PORT = 5432
  20. DB = 'geru_tracking'
  21. conn = None
  22.  
  23.  
  24. def connect():
  25. global conn
  26. try:
  27. conn = psycopg2.connect(user=USERNAME,
  28. password=PASSWD,
  29. host=HOSTNAME,
  30. port=PORT,
  31. database=DB)
  32. except (Exception, psycopg2.Error) as error:
  33. print("Error while connecting to PostgreSQL", error)
  34.  
  35.  
  36. def run_query(query):
  37. try:
  38. cursor = conn.cursor()
  39. cursor.execute(query)
  40. record = cursor.fetchone()
  41. except (Exception, psycopg2.Error) as error:
  42. print("Error while connecting to PostgreSQL", error)
  43. else:
  44. cursor.close()
  45. return record
  46.  
  47.  
  48. def extract_data(line):
  49. # Pattern to extract the json message
  50. pattern_json = r'(?:{"event":\s")(.+)(?:"})'
  51. # Pattern to extract date from string
  52. pattern_date = r'(?:<Arrow \[)(.+)(?:]>)'
  53. # Pattern to replace date in the string
  54. pattern_replace_date = r'(<Arrow .+]>)'
  55.  
  56. # Get just the payload
  57. match = re.search(pattern_json, line).groups()[0]
  58. # Replace every '' to ""
  59. data = match.strip("'<>()[]\"` ").replace('\'', '\"')
  60.  
  61. # Extract date
  62. extract_date = re.search(pattern_date, data).groups()[0]
  63.  
  64. # Replace arrow object to date string
  65. data = re.sub(pattern_replace_date, '"' + extract_date + '"', data).replace('None', 'null')
  66.  
  67. # Parsing to json
  68. return json.loads(data)
  69.  
  70.  
  71. def make_filters(data):
  72. available_keys = ['occurrence_dt', 'cookie', 'cpf', 'email']
  73. # Create filters using the available keys
  74. filters = [f"{k}='{v}'" for k, v in data.items() if k in available_keys and v]
  75. return " and ".join(filters)
  76.  
  77.  
  78. def is_client(data):
  79. try:
  80. if '+' in data['email']:
  81. return False
  82. except KeyError:
  83. pass
  84. else:
  85. return True
  86.  
  87.  
  88. def save(data, filename):
  89. with open(filename, 'a') as json_file:
  90. dump = json.dumps(data)
  91. json_file.write(dump + "\n")
  92.  
  93.  
  94. def run():
  95. connect()
  96. for _file in FILES:
  97. with open(_file) as json_file:
  98. x = 0
  99. for line in json_file:
  100. parsing = json.loads(line)
  101. if 'message' in parsing['_source'] and '_tag' in parsing['_source'] and 'production' in \
  102. parsing['_source']['_tag'][0]:
  103. try:
  104. tag = parsing['_source']['_tag'][0]
  105. if 'production' not in tag:
  106. raise AttributeError
  107. message = parsing['_source']['message']
  108. extract = extract_data(message)
  109. except (AttributeError, IndexError, KeyError):
  110. pass
  111. except JSONDecodeError:
  112. logging.info(f"file: {_file} Payload error: {line}")
  113. else:
  114. if is_client(extract):
  115. filters = make_filters(extract)
  116. result = run_query(
  117. f"SELECT * FROM event where {filters}")
  118. if not result:
  119. if extract['event'] == 'loan_granted':
  120. save(extract, 'events_granted.json')
  121. else:
  122. save(extract, 'events.json')
  123. logging.info(f"file: {_file} Data does not exist {extract}")
  124. x += 1
  125. logging.info(f"Total: {x}")
  126. json_file.close()
  127. # Close pg connection
  128. conn.close()
  129.  
  130.  
  131. if __name__ == "__main__":
  132. run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement