Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import requests as requester
- import os
- import xml.etree.ElementTree as et
- import base64
- from Principal.configuracion_lectura import lectura,conf_path
- from Principal.validators import Validator
- from Principal.os_identifier import config_path, path_splitter
- # testing on dev
- # from dependencies.configuracion_lectura import lectura
- # from dependencies.validators import Validator
- # from dependencies.os_identifier import config_path,path_splitter
- # conf_path = config_path()
- class DataHandler(object):
- """
- this object is going to manage the data and validate the data
- based on the established parameters in the forms, also it will be able to concoct
- all values and to return a dictionary of dictionaries which it will held the data
- and this is the one to be modified and the post request is going to be generated
- periodically based on this.
- the different parameters required for the object are:
- tables: a dictionary containing the following data:
- :key: names of the table in the database
- :value: the fields of the given database
- """
- def __init__(self, tables, load_related_models=False):
- self.tables = tables
- self.load_related_models = load_related_models
- self.validator = Validator()
- self.general_stacker = {}
- self.tables_columns = {}
- self.tables_columns_config = {}
- self.set_configuration()
- self.validators = {'string': self.validator.check_str,
- 'float': self.validator.check_float,
- 'integer': self.validator.check_int,
- 'date': self.validator.check_date,
- 'datetime': self.validator.check_datetime,
- 'bool': self.validator.check_bool,
- 'email':self.validator.validate_email,
- 'alpha':self.validator.check_alphanumeric
- }
- self.get_all_data()
- def set_configuration(self):
- """
- Sets all the required values in order to start the connection
- :return: None
- """
- configuration_reader = lectura()
- self.main_url = configuration_reader.url
- self.user_id = configuration_reader.user_id
- self.user = configuration_reader.user
- self.password = configuration_reader.api_pass
- self.owner = configuration_reader.owner
- self.end_points = {
- 'base_server': self.main_url,
- 'login': "%s/site/login" % self.main_url,
- 'get_data': "%s/get/getData" % self.main_url,
- 'post_data': "%s/insert/insertData" % self.main_url,
- 'delete_data': "%s/delete/deleteData" % self.main_url,
- 'get_fields': '%s/site/getTableStructure' % self.main_url,
- }
- authorization = "%s:%s" % (self.owner, self.password)
- self.headers = {"Content-Type": "text/xml", "Authorization": base64.b64encode(bytes(authorization,
- encoding='utf-8'))}
- connected = self.verify_connection()
- if not connected:
- print(connected)
- raise ConnectionError('the connection you\'re trying to establish is not available')
- for table in self.tables:
- conf_data = self.get_tables_fields(table)
- if isinstance(conf_data, list):
- self.tables_columns[table] = [column['name'] for column in conf_data if not isinstance(conf_data,str)]
- self.tables_columns_config[table] = conf_data
- else:
- raise ValueError('there was an error with the tables you\'re trying to read:%s' % conf_data)
- else:
- check_missing_models = [(model, error) for model, error in self.tables_columns.items()
- if isinstance(error, str) or error is None]
- if len(check_missing_models) > 0:
- raise AttributeError("Please verify the given models: %s" % check_missing_models.__str__()) # this will be displayed for the moment.
- def get_tables_fields(self, table):
- """
- Gets the fields and data type of the tables provided
- :return: list of dictionaries
- """
- if not self.verify_connection():
- return None
- url = self.end_points['get_fields']
- returning_fields = []
- if self.user_id is None or self.user_id == '':
- raise AttributeError('The user doesn\'t exist') # this will be displayed for the moment.
- pass_encode = base64.b64encode(self.password.encode('ascii'))
- password = ""
- for x in pass_encode:
- password = password + chr(x)
- servicio = et.Element("serviceName_req", name="get")
- estructura = et.SubElement(servicio, "ws_structure")
- parametros = et.SubElement(estructura, "params")
- mod = et.SubElement(parametros, "model").text = table
- atr = et.SubElement(parametros, "attributes")
- s_atr = et.SubElement(atr, "attribute").text = ""
- rel = et.SubElement(parametros, "relations").text = 'false'
- autorizacion = et.SubElement(servicio, "authorization")
- et.SubElement(autorizacion, "owner_code").text = self.owner
- et.SubElement(autorizacion, "id_user").text = self.user_id
- et.SubElement(autorizacion, "language").text = "es"
- tree = et.ElementTree(servicio)
- tree.write(conf_path+"table_fields.xml", xml_declaration=True, encoding='utf-8', method="xml")
- f = open(conf_path+'table_fields.xml', 'r')
- xml_str = f.read()
- f.close()
- response = requester.post(url, data=xml_str, headers=self.headers)
- # print (response.status_code)
- with open(conf_path+"response_table_fields.xml", "wt") as s:
- s.write(response.content.decode())
- s.close()
- parser = et.XMLParser(encoding="ISO-8859-1")
- tree_resp = et.parse(conf_path+"response_table_fields.xml", parser=parser)
- root_resp = tree_resp.getroot()
- status = root_resp[1].text
- if status == 'true':
- field_count = len([r.tag for x in root_resp[3] for r in x if r.tag == "label"])
- fields = self.data_formatter([(r.tag, r.text) for x in root_resp[3] for r in x], 'label', field_count)
- for record in fields:
- field = {}
- # key = [record.pop(record.index(x)) for x in record if 'name' in x][1]
- for values in record:
- field[values[0]] = values[1]
- else:
- returning_fields.append(field)
- else:
- # final_field_count = len(returning_fields)
- # if final_field_count != field_count:
- # print(field_count, final_field_count, table, returning_fields)
- # exit()
- return returning_fields
- else:
- return "the model doesn't exists or authentication failed."
- def validate_user_and_get_data(self, table):
- """
- makes the connection with the server and validates the user, also gets the data from the server
- :param table:
- :return:
- """
- user_id = self.user_id
- pass_encode = base64.b64encode(self.password.encode('ascii'))
- password = ""
- for x in pass_encode:
- password = password + chr(x)
- servicio = et.Element("serviceName_req", name="get")
- estructura = et.SubElement(servicio, "ws_structure")
- parametros = et.SubElement(estructura, "params")
- mod = et.SubElement(parametros, "model").text = table
- atr = et.SubElement(parametros, "attributes")
- s_atr = et.SubElement(atr, "attribute").text = ""
- rel = et.SubElement(parametros, "relations").text = self.load_related_models
- autorizacion = et.SubElement(servicio, "authorization")
- et.SubElement(autorizacion, "owner_code").text = self.owner
- # et.SubElement(autorizacion, "id_user").text = "1"
- et.SubElement(autorizacion, "id_user").text = user_id
- et.SubElement(autorizacion, "language").text = "es"
- tree = et.ElementTree(servicio)
- tree.write(conf_path+"ServiceGetBasic.xml", xml_declaration=True, encoding='utf-8', method="xml")
- f = open(conf_path+'ServiceGetBasic.xml', 'r')
- xml_str = f.read()
- f.close()
- response = requester.post(self.end_points['get_data'], data=xml_str, headers=self.headers)
- # print (response.status_code)
- with open(conf_path+"responseGetBasic.xml", "wt") as s:
- s.write(response.content.decode())
- s.close()
- parser = et.XMLParser(encoding="ISO-8859-1")
- tree_resp = et.parse(conf_path+"responseGetBasic.xml", parser=parser)
- root_resp = tree_resp.getroot()
- if len(root_resp) != 4:
- return (False, "")
- else:
- transaccion = root_resp[0]
- estatus = root_resp[1]
- mensaje = root_resp[2]
- if estatus.text == 'true':
- return True, mensaje.text
- else:
- if mensaje.text == None:
- mensaje.text = "ERROR de autenticacion"
- return False, mensaje.text
- def verify_connection(self):
- """
- checks if the connection to the database server is alive
- :return: bool
- """
- url = self.end_points['base_server']
- self.active = False
- try:
- response = requester.get(url, headers=self.headers, timeout=10)
- if response.status_code == 200:
- self.active = True
- except Exception as e:
- pass
- finally:
- return self.active
- def get_all_data(self):
- """
- gets all the data from the database and loads it in the dictionary
- in the same order for all the existent tables, in case of an error with one
- of the databases it will return the error in the place of the data
- :return: dict
- """
- if self.verify_connection():
- for table in self.tables:
- data = self.get_data(table)
- self.general_stacker[table] = data
- else:
- raise ConnectionError('The server is not connected, please try again.')
- def get_data(self, table):
- """
- Gets the data from the given table
- :param table: table name to get the data from.
- :return: dict.
- """
- # getter = xml_get()
- status, data = self.validate_user_and_get_data(table)
- if status and 'no arrojo' not in data:
- # print(data,table)
- finished_data = self.read_xml(table)
- return finished_data
- else:
- return status, data
- def read_xml(self, table):
- """
- reads the xml file and give back all the data in a list of dictionaries
- :param table:
- :return:
- """
- parsed_data = []
- parser = et.XMLParser(encoding="ISO-8859-1")
- tree_resp = et.parse(conf_path+"responseGetBasic.xml", parser=parser)
- global root_resp
- root_resp = tree_resp.getroot()
- record_amount = int(root_resp[3][0].text)
- records = self.data_formatter([(r.tag,r.text) for x in root_resp[3][1:] for r in x],
- splitter_label='representing_column',num_records=record_amount)
- columns = [x[0] for x in records[0]]
- equality = self.equality_evaluator(table,gathered_columns=columns)
- if not equality[0]:
- raise AttributeError("La tabla %s tiene el error: %s" % (table, equality[1]))
- for row in records:
- register = {}
- columns = [x[0] for x in row]
- keys = [x for x in self.tables_columns[table]]
- for index in range(len(columns)):
- register[columns[index]] = row[index][1]
- else:
- parsed_data.append(register)
- else:
- return parsed_data
- def equality_evaluator(self, table, gathered_columns):
- """
- evaluates the equality of the columns
- :param table:
- :param gathered_columns:
- :return:
- """
- equality = False
- keys = [x for x in self.tables_columns[table]]
- for key in keys:
- if key not in gathered_columns:
- return equality, "the column %s doesn't exist in the database." % key
- else:
- if len(keys) != len(gathered_columns):
- return equality, "the provided table %s doesn't have the same number of columns in the " \
- "database."
- return True, "No errors"
- def data_formatter(self, data, splitter_label, num_records=0):
- """
- rearrange the given data so it could be processed better
- :param data: non-formatted data
- :param splitter_label: label to use as reference to split the values in
- :param num_records: number of identified rows
- :return: list of lists
- """
- s = 'Splitter'
- formatted_data = []
- for val_index in range(len(data)):
- if splitter_label in data[val_index]:
- data[val_index] = s, val_index
- # print(data)
- if s in data[0] and splitter_label != "label":
- del data[0]
- splitting_indexes = [val[1] for val in data if s in val and 0 not in val]
- last_index = 0
- for index in splitting_indexes:
- formatted_data.append(data[last_index:index])
- last_index = index
- else:
- if num_records > 0 and num_records != len(formatted_data):
- formatted_data.append(data[last_index:])
- for item in range(len(formatted_data)):
- formatted_data[item] = [x for x in formatted_data[item] if s not in x]
- else:
- if num_records == len(formatted_data):
- return formatted_data
- return None
- def post_record(self, table, record, operation, mode='insert'):
- """
- takes a record which belongs to the given table and inserts is in the remote
- database
- :param table: name of the table in the main dictionary
- :param record: data to be inserted into table
- :return: list of dictionaries
- """
- inserter = xml_post()
- validated = self.validate_table_data(table, data=record)
- if len(validated) > 1 and validated[0]:
- # local operations
- if mode == 'update':
- updated,errors = self.update_table(table,record)
- if not updated:
- return errors
- elif mode == 'delete':
- deleted, errors = self.remove_record_table(table,record)
- if not deleted:
- return errors
- else:
- record_local = record.copy()
- if 'id' not in record.keys():
- record_local['id'] = self.get_id(table)
- added, errors = self.add_record_table(table, record_local)
- if not added:
- return errors
- # remote operations
- record['modelo'] = table
- record = [record] if not isinstance(record,list) else record
- status, msg = inserter.valida_credencial(usuario=self.user, id_usuario=self.user_id,
- listaRecords=record, service=operation)
- if not status:
- return msg
- return True
- return validated[1]
- def get_id(self, table):
- """
- genera un nuevo ID para la ruta
- :return: str
- """
- new_id = "1"
- print(self.general_stacker[table])
- if self.general_stacker[table] == [] or isinstance(self.general_stacker[table], tuple):
- return new_id
- # print(self.db_object.db[self.model][-1],self.db_object.db[self.model][-1].keys(),
- # 'id' in self.db_object.db[self.model][-1].keys())
- ids = [int(obj['id']) for obj in self.general_stacker[table][:]]
- new_id = str(max(ids) + 1)
- return new_id
- def add_record_table(self, table, record):
- """
- adds the record locally to the table in order
- :param table: table to append the value
- :param record: record which it should be unique with the id
- :return:tuple: bool, error_msg
- """
- result = False
- error_msg = None
- id_exists = [True for exist in self.general_stacker[table][:] if isinstance(exist,dict) and
- exist['id'] == record['id']]
- if id_exists == [] :
- result = True
- if isinstance(self.general_stacker[table],tuple):
- self.general_stacker[table] = []
- self.general_stacker[table].append(record)
- else:
- error_msg = "the record you're trying to append already exists in the database"
- return result, error_msg
- def update_table(self, table, record):
- """
- updates the given record if found locally
- :param table: table
- :param record: dictionary, record to update
- :return: tuple bool and errors
- """
- result = False
- error = None
- rows_to_update = self.lookup_records(table, id=record['id'], mode='single')
- if rows_to_update != []:
- self.general_stacker[table][rows_to_update[1]] = record
- # print(self.general_stacker[table],'\n')
- result = True
- else:
- error = "Error. No record was found with the specified value."
- return result, error
- def remove_record_table(self, table, record):
- """
- deletes the given record if found locally
- :param table: table
- :param record: dictionary, record to update
- :return: tuple bool and errors
- """
- result = False
- error = None
- rows_to_delete = self.lookup_records(table, id=record['id'], mode='single')
- if rows_to_delete != []:
- self.general_stacker[table].pop(rows_to_delete[1])
- result = True
- else:
- error = "Error. No record was found with the specified value."
- return result, error
- def lookup_records(self, table, **kwargs):
- """
- gets the records in the general_stacker that matches with the given conditions
- for now it's going to work with just the ID
- :param table: string
- :param kwargs: dict
- :return: list of tuples
- """
- table_columns = self.tables_columns[table]
- look_up_filters = [key for key in kwargs.keys() if key in table_columns]
- results = [(dicty, self.general_stacker[table].index(dicty)) for dicty in self.general_stacker[table]
- for lookup in look_up_filters
- if dicty[lookup] == kwargs[lookup]]
- if 'mode' in kwargs.keys() and kwargs['mode'] == 'single':
- return results[0] if results != [] else []
- return results
- def validate_table_data(self, table, data=None):
- """
- validates the data in the given specified table is correct as required
- in case of any error, it will return the error
- the correctness will be validated based on the field the table takes. for example:
- if there's a data missing which is required, it will return the error or if the data structure is not right
- :return: tuple
- """
- if data is None:
- return "ERROR. No Data was passed. Try again"
- error_msgs = []
- validated = False
- # accepted_dtypes_column = [(dtype['name'], dtype['type']) for dtype in self.tables_columns_config[table]]
- passed_columns = [key for key in data.keys()]
- full_validation = []
- for key in passed_columns:
- if key not in self.tables_columns[table]:
- return "ERROR, The following doesn't exist in the predefined columns %s" % key
- dtype = self.get_field_dtype(key,table)
- data_validated = self.validate_data(data_type=dtype, data=data[key])
- full_validation.append([key, data[key], data_validated])
- else:
- # print(full_validation)
- val = [result[2][0] for result in full_validation]
- # print(val,full_validation,self.tables_columns_config[table])
- if False in val and val.count(False) >= 1:
- bad_items = [item for item in full_validation if not item[2][0]]
- for item in bad_items:
- error_msgs.append("Error. The data %s is not appropriated to the field %s because %s" %
- (item[1], item[0], item[2][1]))
- else:
- validated = True
- return validated, error_msgs
- def get_field_dtype(self, field, table):
- """
- gets the data type of a given field
- :param field:
- :return: str: data type of the field.
- """
- dtype = None
- for dicty in self.tables_columns_config[table]:
- if dicty['name'] == field:
- dtype = dicty['type']
- break
- return dtype
- def validate_data(self, data_type, data):
- """
- validates the data with the existing supported data types
- verify the module validators for more information.
- :param data_type: str: name of the data type
- :param data: obj to be evaluated
- :return: bool
- """
- if data_type is None:
- return False
- validation_result = self.validators[data_type](value=data)
- if not validation_result[0] and data_type =='string':
- validation_result = self.validators['email'](data)
- if not validation_result[0]:
- validation_result = self.validators['alpha'](data)
- return validation_result
- class xml_post():
- # I need modify valida_Credencial method for work with several services
- def valida_credencial(self, usuario, id_usuario, listaRecords, service='insert'):
- # variables de conexion tomadas del json de configuracion
- # id de usuario harcodeado hasta que funcione API
- cfg = lectura()
- g_dominio = cfg.url
- g_owner = cfg.owner
- g_api_pass = cfg.api_pass
- servicio = et.Element('serviceName_req', name=service)
- estructura = et.SubElement(servicio, 'ws_structure')
- parametros = et.SubElement(estructura, 'params')
- for records in listaRecords:
- for k, v in records.items():
- if k == 'modelo':
- mod = et.SubElement(parametros, 'model', name=v)
- atr = et.SubElement(mod, 'attributes')
- for records in listaRecords:
- for k, v in records.items():
- if k != 'modelo':
- if k == 'id' and v == 0:
- s_atr = et.SubElement(atr, k).text = None
- else:
- s_atr = et.SubElement(atr, k).text = v
- autorizacion = et.SubElement(servicio, 'authorization')
- et.SubElement(autorizacion, 'owner_code').text = g_owner
- et.SubElement(autorizacion, 'id_user').text = id_usuario
- et.SubElement(autorizacion, 'language').text = 'es'
- tree = et.ElementTree(servicio)
- tree.write(conf_path+'ServicePostBasic.xml', xml_declaration=True, encoding='utf-8',
- method='xml')
- f = open(conf_path + 'ServicePostBasic.xml', 'r')
- xml_str = f.read()
- f.close()
- api = g_owner + ':' + g_api_pass
- cabeceras = {'Content-Type': 'text/xml', 'Authorization': base64.b64encode(api.encode())}
- url = g_dominio + '/insert/insertData' if service == 'insert' else g_dominio + '/delete/deleteData'
- response = requester.post(url, data=xml_str, headers=cabeceras)
- # print (response.status_code)
- with open(conf_path + 'ResponsePostBasic.xml', 'wt') as s:
- s.write(response.content.decode())
- s.close()
- parser = et.XMLParser(encoding='ISO-8859-1') # a evaluar, por codificacion.
- tree_resp = et.parse(conf_path + 'ResponsePostBasic.xml', parser=parser)
- root_resp = tree_resp.getroot()
- if len(root_resp) != 4:
- return (False, '')
- else:
- transaccion = root_resp[0]
- estatus = root_resp[1]
- mensaje = root_resp[2]
- if estatus.text == 'true':
- return (True, mensaje.text)
- else:
- if mensaje.text == None:
- mensaje.text = 'ERROR de autenticacion'
- return (False, mensaje.text)
- def main():
- # api_tester = DataHandler(owner="materer", user='systemcore', user_password="Mate1275",
- # database="MATERER",database_password="MATERER2017",
- # tables=["articulos_productos",
- # "articulos_configuracion",
- # 'articulos',
- # 'articulos_tipo',
- # 'articulos_categorias',
- # 'alicuotas',
- # 'monedas',
- # 'unidades',
- # 'configuracion_articulos',
- # ])
- api_tester = DataHandler(tables=[
- 'empresa_contable','ejercicios_contables'
- ])
- print(api_tester.general_stacker)
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement