Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import boto3
- import json
- import os
- import hvac
- import sys
- import mysql.connector
- import base64
- import time
- from threading import Thread
- from mysql.connector import errorcode
- AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
- AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
- s3 = boto3.client('s3',aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
- S3_BUCKET = os.environ['S3_BUCKET']
- MYSQL_ADDR = os.environ['MYSQL_ADDR']
- vault = hvac.Client(url=os.environ['VAULT_ADDR'], token=os.environ['VAULT_TOKEN'])
- action = sys.argv[1]
- def getMySQLcreds(vault):
- mysqlcreds = vault.read('mysql/creds/readonly')
- return mysqlcreds
- def mysqlclient(mysqlcreds, mysql_addr):
- print("Connecting to Mysql as %s" % mysqlcreds['data']['username'])
- try:
- mysqlobj = mysql.connector.connect(user=mysqlcreds['data']['username'], password=mysqlcreds['data']['password'],
- host=mysql_addr,
- database='world')
- except mysqlobj.connector.Error as err:
- if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
- print("Something is wrong with your username or password")
- elif err.errno == errorcode.ER_BAD_DB_ERROR:
- print("Database does not exist")
- else:
- print(err)
- else:
- return mysqlobj
- def iterate_bucket_items(s3, s3_bucket):
- paginator = s3.get_paginator('list_objects_v2')
- page_iterator = paginator.paginate(Bucket=s3_bucket)
- for page in page_iterator:
- for item in page['Contents']:
- yield item
- def importtoS3(vault, s3, s3_bucket):
- mysqlcreds = getMySQLcreds(vault)
- mysqlconn = mysqlclient(mysqlcreds, MYSQL_ADDR)
- cursor = mysqlconn.cursor()
- start_time = time.time()
- cursor.execute("select * from City")
- print("---Time selecting dataset: %s seconds ---" % (time.time() - start_time))
- for (ID, Name, CountryCode, District, Population) in cursor:
- t = Thread(target=makeJson, args=(vault, s3, s3_bucket, ID, Name, CountryCode, District, Population))
- t.start()
- mysqlconn.close()
- def makeJson(vault, s3, s3_bucket, ID, Name, CountryCode, District, Population):
- # Take the starting time for the whole function
- tot_time = time.time()
- # Take the starting time for encryption
- enc_time = time.time()
- # Base64 a single Value
- Nameb64 = base64.b64encode(Name.encode('utf-8'))
- # Encrypt it through Vault
- NameEnc = vault.write('transit/encrypt/world-transit', plaintext=bytes(Nameb64), context=base64.b64encode('world-transit'))
- # Calculate how long it took to encrypt
- eetime = time.time() - enc_time
- # Create the object to persist and convert it to JSON
- Cityobj = { "ID": ID, "Name": NameEnc['data']['ciphertext'], "CountryCode": CountryCode, "District": District, "Population": Population }
- City = json.dumps(Cityobj)
- filename = "%s.json" % ID
- #print("Writing %s" % filename)
- # Take the starting time for persisting it into S3, for comparison
- store_time = time.time()
- # Persist the object
- s3.put_object(Body=City, Bucket=s3_bucket, Key=filename)
- # Calculate how long it took to store it
- sstime = time.time() - store_time
- # Calculate how long it took to run the whole function
- tttime = time.time() - tot_time
- print("%i,%s,%s,%s\n" % (int(ID), str(sstime), str(eetime), str(tttime)))
- def exporttoMySQL(vault, s3, s3_bucket):
- mysqlcreds = getMySQLcreds(vault)
- mysqlconn = mysqlclient(mysqlcreds, MYSQL_ADDR)
- cursor = mysqlconn.cursor()
- cursor.execute('DROP TABLE IF EXISTS `CityDecoded`;')
- cursor.execute('''CREATE TABLE `CityDecoded` (
- `ID` int(11) NOT NULL auto_increment,
- `Name` char(35) NOT NULL default '',
- `CountryCode` char(3) NOT NULL default '',
- `District` char(20) NOT NULL default '',
- `Population` int(11) NOT NULL default '0',
- PRIMARY KEY (`ID`)
- ) ENGINE=MyISAM DEFAULT CHARSET=latin1;''')
- for file in iterate_bucket_items(s3, s3_bucket):
- print("Decoding and storing %s" % file['Key'])
- file = s3.get_object(Key=file['Key'], Bucket=s3_bucket)
- content = json.loads(file['Body'].read())
- NameEnc = content['Name']
- NameDec = vault.write('transit/decrypt/world-transit', ciphertext=NameEnc, context=base64.b64encode('world-transit'))
- Name = base64.b64decode(NameDec['data']['plaintext'])
- print("Inserting row in the table for City %s" % str(Name))
- cursor.execute('INSERT INTO `CityDecoded` VALUES (%i,\'%s\',\'%s\',\'%s\',%i);' % (int(content['ID']), str(Name), content['CountryCode'].encode('utf-8'), content['District'].encode('utf-8'), int(content['Population'])))
- mysqlconn.close()
- def main():
- if action == "import":
- importtoS3(vault,s3,S3_BUCKET)
- elif action == "export":
- exporttoMySQL(vault,s3,S3_BUCKET)
- else:
- print("What should I do? import/export")
- if __name__ == "__main__": main()
Add Comment
Please, Sign In to add comment