Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import re
- import json
- from json import JSONDecodeError
- import psycopg2
- import logging
# Append-mode file logging; every run writes INFO-level records to logs.log.
logging.basicConfig(filename='logs.log', level=logging.INFO)

# Input dump files to process (one JSON document per line).
FILES = [
    'data/43d3a13021.2019-04-16.22.json',
    'data/43d3a13021.2019-04-17.22.json',
    'data/43d3a13021.2019-04-17.12.json',
    'data/43d3a13021.2019-04-18.22.json'
]

# PostgreSQL connection settings.
# NOTE(review): credentials are hardcoded in source — consider moving them
# to environment variables or a config file.
HOSTNAME = 'localhost'
USERNAME = 'postgres'
PASSWD = 'tracking'
PORT = 5432
DB = 'geru_tracking'

# Module-level connection handle; populated by connect(), closed in run().
conn = None
def connect():
    """Open the module-level PostgreSQL connection and store it in ``conn``.

    On failure the error is printed and logged, and ``conn`` stays ``None``;
    callers should expect subsequent queries to fail in that case.
    """
    global conn
    try:
        conn = psycopg2.connect(user=USERNAME,
                                password=PASSWD,
                                host=HOSTNAME,
                                port=PORT,
                                database=DB)
    except psycopg2.Error as error:
        # psycopg2.Error is the root of every driver exception, so the
        # original ``(Exception, psycopg2.Error)`` tuple was redundant and
        # needlessly swallowed unrelated errors.
        print("Error while connecting to PostgreSQL", error)
        # Also record the failure in logs.log (logging is already configured).
        logging.error("Error while connecting to PostgreSQL: %s", error)
def run_query(query):
    """Execute *query* on the module connection and return the first row.

    Returns ``None`` when the query fails or yields no rows.
    """
    record = None
    cursor = None
    try:
        # Broad catch is deliberate: ``conn`` may still be None when
        # connect() failed, which raises AttributeError here.
        cursor = conn.cursor()
        cursor.execute(query)
        record = cursor.fetchone()
    except Exception as error:
        print("Error while connecting to PostgreSQL", error)
    finally:
        # Always release the cursor — the original closed it only on the
        # success path and leaked it whenever execute() raised.
        if cursor is not None:
            cursor.close()
    return record
def extract_data(line):
    """Parse one raw log line and return its embedded event payload as a dict.

    The payload is a Python-repr-style dict wrapped inside a
    ``{"event": "..."}`` envelope; quoting is normalised, the ``<Arrow [...]>``
    timestamp repr is flattened to a plain ISO string, and ``None`` becomes
    JSON ``null`` before parsing.
    """
    # Pattern to extract the json message
    json_re = r'(?:{"event":\s")(.+)(?:"})'
    # Pattern to extract date from string
    date_re = r'(?:<Arrow \[)(.+)(?:]>)'
    # Pattern to replace date in the string
    date_sub_re = r'(<Arrow .+]>)'
    # Pull just the payload out of the surrounding envelope.
    payload = re.search(json_re, line).groups()[0]
    # Trim wrapper characters, then normalise every '' to "".
    payload = payload.strip("'<>()[]\"` ").replace('\'', '\"')
    # Lift the timestamp out of the Arrow repr ...
    timestamp = re.search(date_re, payload).groups()[0]
    # ... swap the whole repr for a quoted date string, and map Python
    # None to JSON null so the text becomes valid JSON.
    payload = re.sub(date_sub_re, '"' + timestamp + '"', payload).replace('None', 'null')
    return json.loads(payload)
def make_filters(data):
    """Build a SQL WHERE clause (``k='v' and ...``) from an event payload.

    Only the whitelisted keys with truthy values contribute a condition.
    Single quotes inside values are doubled (standard SQL escaping) so a
    value cannot terminate its string literal — the original interpolated
    values verbatim.
    NOTE(review): this is still string-built SQL; prefer parameterized
    queries if the run_query() call site can be changed.
    """
    available_keys = ('occurrence_dt', 'cookie', 'cpf', 'email')
    filters = []
    for key, value in data.items():
        if key in available_keys and value:
            escaped = str(value).replace("'", "''")
            filters.append(f"{key}='{escaped}'")
    return " and ".join(filters)
def is_client(data):
    """Return True when the payload belongs to a real client.

    Test accounts are flagged with a '+' in the e-mail address.  Payloads
    without a usable e-mail are treated as non-clients: the original
    implicitly returned None when the key was missing, and crashed with an
    uncaught TypeError when the value was None ('+' in None).
    """
    email = data.get('email')
    if email is None:
        return False
    return '+' not in email
def save(data, filename):
    """Append *data* as a single JSON line to *filename* (JSON-lines style)."""
    serialized = json.dumps(data)
    with open(filename, 'a') as out:
        out.write(serialized + "\n")
def run():
    """Entry point: replay every dump file and persist unseen client events.

    For each production-tagged log line, the embedded payload is extracted
    and, when no matching row exists in the ``event`` table, appended to a
    local JSON-lines file (``events_granted.json`` for loan grants,
    ``events.json`` otherwise).
    """
    connect()
    for _file in FILES:
        with open(_file) as json_file:
            # Count of previously-unseen events found in this file.
            x = 0
            for line in json_file:
                parsing = json.loads(line)
                # Process only lines that carry a message and whose first
                # tag mentions 'production'.
                if 'message' in parsing['_source'] and '_tag' in parsing['_source'] and 'production' in \
                        parsing['_source']['_tag'][0]:
                    try:
                        tag = parsing['_source']['_tag'][0]
                        # NOTE(review): redundant — the enclosing if already
                        # verified 'production' is in this tag, so this
                        # branch never fires.
                        if 'production' not in tag:
                            raise AttributeError
                        message = parsing['_source']['message']
                        extract = extract_data(message)
                    except (AttributeError, IndexError, KeyError):
                        # Malformed or non-matching lines are skipped silently.
                        pass
                    except JSONDecodeError:
                        # Payload survived extraction but is not valid JSON.
                        logging.info(f"file: {_file} Payload error: {line}")
                    else:
                        if is_client(extract):
                            filters = make_filters(extract)
                            result = run_query(
                                f"SELECT * FROM event where {filters}")
                            # No matching row in the DB -> record it locally.
                            if not result:
                                if extract['event'] == 'loan_granted':
                                    save(extract, 'events_granted.json')
                                else:
                                    save(extract, 'events.json')
                                logging.info(f"file: {_file} Data does not exist {extract}")
                                x += 1
            logging.info(f"Total: {x}")
            # NOTE(review): redundant — the with-statement already closes
            # the file on exit.
            json_file.close()
    # Close pg connection
    conn.close()
if __name__ == "__main__":
    # Execute only when run as a script, not when imported as a module.
    run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement