Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import mysql.connector
- import reddit_constants as rc
- import logging
- class MysqlManager:
- def __init__(self, database, username, password, logger):
- self.username = username
- self.password = password
- self.database = database
- self.logger = logger
- def __enter__(self):
- try:
- self.create_db()
- self.conn = mysql.connector.connect(
- user=self.username,
- passwd=self.password,
- auth_plugin='mysql_native_password',
- database=self.database
- )
- if self.conn.is_connected():
- self.logger.info('Connected to reddit database')
- return self
- except:
- self.logger.error('Error: Could not connect to reddit database')
- self.conn.close()
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.conn.close()
- def create_db(self):
- mydb = mysql.connector.connect(
- user=self.username,
- passwd=self.password,
- auth_plugin='mysql_native_password'
- )
- cursor = mydb.cursor()
- cursor.execute("CREATE DATABASE IF NOT EXISTS " + self.database)
- mydb.close()
- def create_tables(self):
- if not self.conn.is_connected():
- self.logger.Error('Error in MySqlManager: You must initialize the connection to MySql')
- return
- cursor = self.conn.cursor()
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS subreddit
- (
- subreddit_id INT AUTO_INCREMENT,
- subreddit_name VARCHAR(30) NOT NULL UNIQUE,
- subscriptions INT,
- PRIMARY KEY (subreddit_id)
- )
- """)
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS post (
- post_id INT AUTO_INCREMENT,
- subreddit_id INT NOT NULL,
- post_title VARCHAR(500) NOT NULL,
- post_ref VARCHAR(2084),
- comments_ref VARCHAR(2084),
- username VARCHAR(30),
- created_time DATETIME NOT NULL,
- PRIMARY KEY (post_id),
- FOREIGN KEY (subreddit_id) REFERENCES subreddit(subreddit_id),
- CONSTRAINT UC_post UNIQUE(subreddit_id, post_title, username, created_time)
- )
- """)
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS post_history (
- post_history_id INT AUTO_INCREMENT,
- post_id INT,
- votes INT,
- ranks INT,
- updated_time DATETIME,
- PRIMARY KEY (post_history_id),
- FOREIGN KEY (post_id) references post(post_id)
- )
- """)
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS comment
- (
- comment_id INT AUTO_INCREMENT,
- post_id INT NOT NULL,
- subreddit_id INT NOT NULL,
- username VARCHAR(30) NOT NULL,
- created_time DATETIME NOT NULL,
- PRIMARY KEY (comment_id),
- FOREIGN KEY (subreddit_id) REFERENCES subreddit(subreddit_id),
- FOREIGN KEY (post_id) REFERENCES post(post_id),
- CONSTRAINT UC_comment UNIQUE(subreddit_id, post_id, username, created_time)
- )
- """)
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS comment_history
- (
- comment_history_id INT AUTO_INCREMENT,
- comment_id INT,
- message TEXT,
- votes INT,
- updated_time DATETIME,
- PRIMARY KEY (comment_history_id),
- FOREIGN KEY (comment_id) references comment(comment_id)
- )
- """)
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS youtube_info
- (
- youtube_info_id INT AUTO_INCREMENT,
- post_id INT,
- video_title TEXT,
- publish_date DATETIME,
- view_count INT,
- like_count INT,
- dislike_count INT,
- comment_count INT,
- PRIMARY KEY (youtube_info_id),
- FOREIGN KEY (post_id) REFERENCES post(post_id),
- CONSTRAINT UC_youtube_info UNIQUE(post_id)
- )
- """)
- def insert_subreddits(self, posts):
- if not self.conn.is_connected():
- self.logger.Error('Error in MySqlManager: You must initialize the connection to MySql')
- return
- cursor = self.conn.cursor()
- for post in posts:
- values = (post[rc.SUBREDDIT_KEY], None)
- query = """
- INSERT IGNORE INTO subreddit (subreddit_name, subscriptions)
- VALUES(%s, %s)
- """
- cursor.execute(query, values)
- self.conn.commit()
- new_id = cursor.lastrowid
- if new_id == 0:
- id_query = "SELECT subreddit_id FROM subreddit WHERE subreddit_name = %s"
- id_values = (post[rc.SUBREDDIT_KEY],)
- cursor.execute(id_query, id_values)
- new_id = cursor.next()[0]
- post[rc.SUBREDDIT_ID] = new_id
- self.logger.info(' - Inserted subreddits from page successfully')
- def insert_posts(self, posts):
- if not self.conn.is_connected():
- self.logger.Error('Error in MySqlManager: You must initialize the connection to MySql')
- return
- cursor = self.conn.cursor()
- for post in posts:
- post_values = (post[rc.SUBREDDIT_ID],
- post[rc.POST_TITLE_KEY],
- post[rc.POST_REF_KEY],
- post[rc.COMMENTS_REF_KEY],
- post[rc.USER_KEY],
- post[rc.CREATED_TIMESTAMP_KEY])
- post_query = """
- INSERT IGNORE INTO post (subreddit_id, post_title, post_ref,
- comments_ref, username, created_time)
- VALUES (%s, %s, %s, %s, %s, %s)
- """
- cursor.execute(post_query, post_values)
- self.conn.commit()
- new_id = cursor.lastrowid
- if new_id == 0:
- id_query = """
- SELECT post_id
- FROM post
- WHERE subreddit_id = %s
- AND post_title = %s
- AND username = %s
- AND created_time = %s
- """
- id_values = (post[rc.SUBREDDIT_ID],
- post[rc.POST_TITLE_KEY],
- post[rc.USER_KEY],
- post[rc.CREATED_TIMESTAMP_KEY])
- cursor.execute(id_query, id_values)
- new_id = cursor.next()[0]
- post[rc.POST_ID] = new_id
- post_history_values = (post[rc.POST_ID],
- post[rc.VOTE_KEY],
- post[rc.RANK_KEY])
- post_history_query = """
- INSERT INTO post_history (post_id, votes, ranks, updated_time)
- (SELECT %s, %s, %s, NOW())
- """
- cursor.execute(post_history_query, post_history_values)
- self.conn.commit()
- self.logger.info(' - Inserted posts from page successfully')
- def insert_video_info(self, video_info):
- if not self.conn.is_connected():
- self.logger.Error('MySqlManager: You must initialize the connection to MySql')
- return
- cursor = self.conn.cursor()
- video_info_values = (video_info[rc.POST_ID],
- video_info[rc.YOUTUBE_TITLE_KEY],
- video_info[rc.YOUTUBE_PUBLISHED_KEY],
- video_info[rc.YOUTUBE_VIEW_COUNT_KEY],
- video_info[rc.YOUTUBE_LIKE_KEY],
- video_info[rc.YOUTUBE_DISLIKE_KEY],
- video_info[rc.YOUTUBE_COMMENT_KEY])
- video_info_query = """
- INSERT IGNORE INTO youtube_info (post_id,
- video_title,
- publish_date,
- view_count,
- like_count,
- dislike_count,
- comment_count)
- VALUES (%s, %s, %s, %s, %s, %s, %s)
- """
- cursor.execute(video_info_query, video_info_values)
- self.conn.commit()
- def insert_comments(self, comments, post):
- if not self.conn.is_connected():
- self.logger.Error('Error in MySqlManager: You must initialize the connection to MySql')
- return
- cursor = self.conn.cursor()
- for comment in comments:
- comment_values = (post[rc.POST_ID],
- post[rc.SUBREDDIT_ID],
- comment[rc.USER_KEY],
- comment[rc.CREATED_TIMESTAMP_KEY])
- comment_query = """
- INSERT IGNORE INTO comment (post_id, subreddit_id, username, created_time)
- VALUES (%s, %s, %s, %s)
- """
- cursor.execute(comment_query, comment_values)
- self.conn.commit()
- new_id = cursor.lastrowid
- if new_id == 0:
- id_query = """
- SELECT comment_id
- FROM comment
- WHERE post_id = %s
- AND username = %s
- AND created_time = %s
- """
- id_values = (post[rc.POST_ID],
- comment[rc.USER_KEY],
- comment[rc.CREATED_TIMESTAMP_KEY])
- cursor.execute(id_query, id_values)
- new_id = cursor.next()[0]
- comment[rc.COMMENT_ID] = new_id
- comment_history_values = (comment[rc.COMMENT_ID],
- comment[rc.VOTE_KEY],
- comment[rc.MESSAGE_KEY])
- comment_history_query = """
- INSERT INTO comment_history (comment_id, votes, message, updated_time)
- (SELECT %s, %s, %s, NOW())
- """
- cursor.execute(comment_history_query, comment_history_values)
- self.conn.commit()
- self.logger.info('Inserted comments from {} post successfully'.format(post[rc.POST_REF_KEY]))
- def mysql_test_suite():
- import csv
- with MysqlManager(rc.DATABASE_NAME,
- rc.DB_USERNAME,
- rc.DB_PASSWORD,
- logging.getLogger('test')) as mysql_test:
- with open("testfiles/posts.csv", "r") as f:
- reader = csv.DictReader(f)
- posts = list(reader)
- mysql_test.insert_subreddits(posts)
- mysql_test.insert_posts(posts)
- for post_test in posts:
- assert post_test[rc.SUBREDDIT_ID] > 0
- assert post_test[rc.POST_ID] > 0
- with open("testfiles/comments.csv", "r") as f:
- reader = csv.DictReader(f)
- comments = list(reader)
- mysql_test.insert_comments(comments, posts[0])
- for comment in comments:
- assert comment[rc.COMMENT_ID] > 0
- if __name__ == "__main__":
- mysql_test_suite()
- print("All mysql tests ran successfully")
Add Comment
Please, Sign In to add comment