Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import psycopg2
- import time
- import selenium
- from selenium import webdriver
- import openpyxl
- from openpyxl import load_workbook
- import subprocess
- from email.mime.text import MIMEText
- from email.mime.application import MIMEApplication
- from email.mime.multipart import MIMEMultipart
- from smtplib import SMTP
- import smtplib
- import datetime
- from PyPDF2 import PdfFileWriter, PdfFileReader
- from time import mktime
- import sys
- import os
- import re
- import redis
- import requests
- class Useful_Functions:
- def __init__(self):
- pass
- def check_cache(cache_key):
- """
- Checks the cache to see if a key exists
- param cache_key: A string representing a key
- type cache_key: string
- """
- db = redis.from_url(os.environ.get("REDIS_URL"))
- if db.get(cache_key):
- print("Found cache data, continuing and then comparing against this.")
- return True
- else:
- return False
- def write_cache(data, cache_key):
- """
- Write data with a key of cache_key to the redis cache
- param data: A python dictionary holding the data you want cached
- param cache_key: A string representing a key
- type data: dict
- type cache_key: string
- """
- db = redis.from_url(os.environ.get("REDIS_URL"))
- try:
- db.set(cache_key, json.dumps(data))
- return True
- except Exception as e:
- print(e)
- print("Failed to write to cache")
- def read_cache(cache_key):
- """
- Read data with a key of cache_key from the redis cache
- param cache_key: A string representing a key
- type cache_key: string
- """
- db = redis.from_url(os.environ.get("REDIS_URL"), decode_responses=True)
- data = json.loads(db.get(cache_key))
- return data
- def send_slack_message(**kwargs):
- """
- Sends a messages to Slack via a Webhook
- param: webhook_url
- param: msg_username
- param: msg_text
- param: msg_channel
- param: icon_url
- type webhook_url: string
- type msg_username: string
- type msg_text: string
- type msg_channel: string
- type icon_url: string
- """
- if not webhook_url:
- post_url = os.environ.get("SLACK_WEBHOOK_URL")
- else:
- post_url = webhook_url
- payload={"text": msg_text),
- "username": msg_username,
- "channel": msg_channel,
- "icon_url": icon_url}
- response = requests.post(post_url, json=payload)
- if response.status_code == 201:
- print("Message sent to Slack")
- return True
- else:
- return False
- def postgres_ref_db_connection():
- """
- Set up the postgres connection
- """
- return psycopg2.connect(host='HOST',
- user='USERNAME',
- password='PASSWORD',
- database='DB_NAME',
- port=5432)
- def insert_records(insert_statement, values):
- """
- Inserts a list of values into a database
- param insert_statement: The SQL query used to insert a row of data
- type insert_statement: string representing a SQL query
- param values: the list of tuples to insert into the database
- type values: list containing a tuple
- """
- if len(values) > 0:
- db = postgres_ref_db_connection()
- cur = db.cursor()
- try:
- if len(values) >= 1:
- cur.executemany(insert_statement, values)
- db.commit()
- print("INSERTED INTO DB")
- except Exception as err:
- print("EXCEPTION")
- print(err)
- db.rollback()
- pass
- finally:
- cur.close()
- db.close()
- def create_firefox_driver(path_to_profile):
- """
- Creates a firefox driver using a specified profile
- param path_to_profile: The filepath of the profile
- type path_to_profile: String representing a filepath
- """
- profile = webdriver.FirefoxProfile(path_to_profile)
- driver = webdriver.Firefox(profile)
- return driver
- def make_pdf_from_xlsx(filename, path_to_edit, path_to_completed, pdf_ready):
- """
- Converts XLSX to PDF
- param filename: The name of the xlsx file to edit
- type filename: String representing a file name
- param path_to_edit: The location of the folder containing a file to edit
- type path_to_edit: String representing a filepath
- param path_to_edit: The location of the folder to place a completed file
- type path_to_completed: String representing a filepath
- param pdf_ready: The location of the folder containing a file to edit
- type pdf_ready: String representing a filepath
- """
- try:
- for i in range(5):
- pdf_name = filename.replace('xlsx', 'pdf')
- try:
- with timeout(45, exception=RuntimeError):
- pdf_command = 'cd /Applications/LibreOffice.app/Contents/MacOS && ./soffice --headless --convert-to pdf --outdir ' + pdf_ready + ' ' + filename
- subprocess.check_output(['bash', '-c', pdf_command])
- break
- except RuntimeError:
- kill_command = 'killall soffice && killall libreoffice && killall libre'
- process = subprocess.Popen(kill_command.split(), stdout=subprocess.PIPE)
- time.sleep(5)
- output, error = process.communicate()
- continue
- except Exception as e:
- print(e)
- print('Fucked up converting to pdf')
- return None
- try:
- to_edit = path_to_edit + pdf_name
- edited = path_to_completed + pdf_name
- edit_pdf(to_edit)
- crop_pdf(to_edit, edited)
- return edited
- except Exception as e:
- print(e)
- print('Fucked up editing the PDF')
- return None
- def send_gmail(attachment, attachment2, recipient, message_text, subject, email_from, reply_to, username, password):
- """
- Sends an email from gmail with up to 2 attachments
- param attachment: The first attachment to the email
- type attachment: String representing a file path
- param attachment2: The second attachment to the email
- type attachment2: String representing a file path
- param recipient: The recipient's email address
- type recipient: String representing an email address
- param message_text: The body text of the email
- type message_text: String representing a message
- param subject: The subject of the email
- type subject: String representing a subject line
- param email_from: The sender email address (your email)
- type email_from: String representing an email address
- param reply_to: The email address that will receive a reply
- type reply_to: String representing an email address
- param username: Your gmail login
- type username: String representing an email address
- param password: Your gmail password
- type password: String representing a password
- """
- recipients = [recipient]
- emaillist = [elem.strip().split(',') for elem in recipients]
- msg = MIMEMultipart()
- msg['Subject'] = subject
- msg['From'] = email_from
- msg['Reply-to'] = reply_to
- msg.preamble = 'Multipart massage.\n'
- part = MIMEText(message_text)
- msg.attach(part)
- part = MIMEApplication(open(str(pdf_file),"rb").read())
- pdf_name = pdf_file
- part.add_header('Content-Disposition', 'attachment', filename=pdf_name)
- msg.attach(part)
- part = MIMEApplication(open(str(xlsx_file),"rb").read())
- part.add_header('Content-Disposition', 'attachment', filename=xlsx_file)
- msg.attach(part)
- server = smtplib.SMTP("smtp.gmail.com:587")
- server.ehlo()
- server.starttls()
- server.login(username, password)
- server.sendmail(msg['From'], emaillist , msg.as_string())
- def clean_text(text):
- """
- Clean up text and remove the nonsense
- param text: review text
- type text: string
- """
- return ' '.join(re.sub("(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", " ", text).split())
- def get_sentiment(review):
- """
- Get sentiment analysis from a review
- param review: An Amazon product review
- type review: string
- """
- analysis = TextBlob(clean_text(review))
- if analysis.sentiment.polarity > 0:
- return 'positive', analysis.sentiment.polarity
- elif analysis.sentiment.polarity == 0:
- return 'neutral', analysis.sentiment.polarity
- else:
- return 'negative', analysis.sentiment.polarity
- def get_2fa_from_pushbullet(api_key, text_to_strip=''):
- """
- Receives a 2fa code from Pushbullet that was sent there from your phone using something like IFTTT or AutoMagic
- param api_key: Your pushbullet api key
- type api_key: String representing an api key
- param text_to_strip: The extraneous text you want to strip from the 2fa message
- type text_to_strip: String
- """
- pb = Pushbullet(api_key)
- for i in range(100):
- pushes = pb.get_pushes()
- latest = pushes[0]['body']
- check = open('2fa.txt', 'r').read()
- if latest == check:
- time.sleep(10)
- else:
- open('2fa.txt', 'w').close()
- with open("2fa.txt", "w") as text_file:
- text_file.write(latest)
- break
- pb.delete_push(pushes[0].get("iden"))
- code = latest.replace(text_to_strip, '')
- return code
Add Comment
Please, Sign In to add comment