Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from sqlalchemy import create_engine
- import json
- import requests
- import logging
- from logging.handlers import RotatingFileHandler
- import sys
- connector_id = "mysql_v1_db_sif_canvas"
- logger = logging.getLogger(connector_id)
- hdlr = RotatingFileHandler('./logs/' + connector_id + '.log', maxBytes=10485760, backupCount=10)
- formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
- hdlr.setFormatter(formatter)
- logger.addHandler(hdlr)
- def error_handling():
- return 'Error: {}. {}, line: {}'.format(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2].tb_lineno)
- def execute_mysql(db_conn, query):
- # db_conn.connect()
- query = db_conn.execute(query)
- # db_conn.close()
- return [dict(zip(tuple(query.keys()), i)) for i in query.cursor]
- def execute(manifest):
- try:
- log_level = manifest["log_level"]
- if log_level == "info" or log_level == "Info" or log_level == "INFO":
- logger.setLevel(logging.INFO)
- elif log_level == "warning" or log_level == "Warning" or log_level == "WARNING" or log_level == "WARN" \
- or log_level == "warn" or log_level == "Warn":
- logger.setLevel(logging.WARNING)
- else:
- logger.setLevel(logging.ERROR)
- db_engine = create_engine('mysql+pymysql://' + manifest["connector_config"]["db_username"]["value"] + ':'
- + manifest["connector_config"]["db_password"]["value"] + '@'
- + manifest["connector_config"]["db_address"]["value"] + '/'
- + manifest["connector_config"]["db_name"]["value"], pool_recycle=3600)
- db_conn = db_engine.connect()
- response = None
- if manifest["job_type"] == 'read':
- fieldmapping = manifest["job_parameters"]["field_mapping"]
- if "unique_id" in manifest["job_parameters"]:
- unique_id = manifest["job_parameters"]["unique_id"]
- else:
- unique_id = ""
- sqlfields = []
- for k in fieldmapping.keys():
- sqlfields.append(" " + k + " AS " + fieldmapping[k] + " ")
- fields = ",".join(sqlfields)
- print(fields)
- filter_list = []
- filter_string = ""
- if "filters" in manifest["job_parameters"].keys():
- filters = manifest["job_parameters"]["filters"]
- andfilter: object
- for andfilter in filters:
- filter_list_or = []
- for orfilter in andfilter:
- filter_list_or.append(
- orfilter["field"] + " " + orfilter["operator"] + " " +
- orfilter["value"].replace("---quote---", "'") + " ")
- if len(filter_list_or) > 0:
- filter_list.append("(" + " or ".join(filter_list_or) + ")")
- if len(filter_list) > 0:
- filter_string = " where " + " and ".join(filter_list)
- if manifest["recordtype"] == 'Users':
- query = "select " + fields + " from " + manifest["job_parameters"]["table_name"] + filter_string
- elif manifest["recordtype"] == 'Courses':
- query = "select " + fields + " from " + manifest["job_parameters"]["table_name"] + filter_string
- elif manifest["recordtype"] == 'Sections':
- query = "select " + fields + " from " + manifest["job_parameters"]["table_name"] + filter_string
- elif manifest["recordtype"] == 'Enrolments':
- query = "select " + fields + " from " + manifest["job_parameters"]["table_name"] + filter_string
- else:
- return {"error": {"connector": "invalid recordtype"}}
- query_result = execute_mysql(db_conn, query)
- records = []
- for item in query_result:
- if unique_id == "":
- record = {"data": item}
- # print(record)
- records.append(record)
- else:
- record = {"data": item, "id": item[unique_id]}
- # print(record)
- records.append(record)
- manifest["records"] = records
- logger.info("connector post manifest: " + str(json.dumps(manifest)))
- response = requests.request("POST", manifest["core_config"]["core_api_url"],
- headers={
- "Content-Type": "application/json",
- "X-Secret": manifest["token"],
- "job_id": str(manifest["job_id"]),
- "tenant_id": manifest["tenant_id"]
- },
- data=json.dumps(manifest)
- )
- else:
- return {"error": {"connector": "invalid job_type"}}
- if response.status_code < 300:
- return {"success": {"core_api_status_code": str(response.status_code)}}
- else:
- return {"error": {"core_api_status_code": str(response.status_code)}}
- except:
- logger.error("exception: " + str(error_handling()))
- return {"error": {"exception: ": str(error_handling())}}
- manifest = {
- "tenant_id": "reciprocity_ppd",
- "log_level": "info",
- "recordtype": "Users",
- "job_type": "read",
- "core_config": {
- "core_api_url": "https://reciprocity.saasyan.com.au/api/v1.0/connector/records"
- },
- "token": "3c6faf72-7f96-4870-a035-35e1435df7f3",
- "job_id": 2729,
- "records": [],
- "end_of_list": 1,
- "job_status": "success",
- "connector_config": {
- "db_name": {
- "type": "string",
- "value": "assure",
- "required": "true",
- "description": "Database Name"
- },
- "db_port": {
- "type": "integer",
- "value": "3306",
- "required": "true",
- "description": "Database Port"
- },
- "db_address": {
- "type": "string",
- "value": "10.12.128.100",
- "required": "true",
- "description": "Database Address"
- },
- "db_password": {
- "type": "password",
- "value": "foQBGUPzTbFiFugJa2sC6abdGuiszbvh",
- "required": "true",
- "description": "Database Password"
- },
- "db_username": {
- "type": "string",
- "value": "root",
- "required": "true",
- "description": "Database Username"
- }
- },
- "job_parameters": {
- "unique_id": "user_id",
- "table_name": "sis_users",
- "field_mapping": {
- "lower(email)": "email",
- "user_id": "user_id",
- "lower(login_id)": "login_id",
- "last_name": "last_name",
- "first_name": "first_name"
- }
- },
- "connector_id": "mysql_v1_db_sif_canvas"
- }
- print(execute(manifest))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement