Guest User

Untitled

a guest
Dec 4th, 2018
119
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 11.17 KB | None | 0 0
  1. import mysql.connector
  2. import reddit_constants as rc
  3. import logging
  4.  
  5.  
  6. class MysqlManager:
  7. def __init__(self, database, username, password, logger):
  8. self.username = username
  9. self.password = password
  10. self.database = database
  11. self.logger = logger
  12.  
  13. def __enter__(self):
  14. try:
  15. self.create_db()
  16. self.conn = mysql.connector.connect(
  17. user=self.username,
  18. passwd=self.password,
  19. auth_plugin='mysql_native_password',
  20. database=self.database
  21. )
  22. if self.conn.is_connected():
  23. self.logger.info('Connected to reddit database')
  24. return self
  25. except:
  26. self.logger.error('Error: Could not connect to reddit database')
  27. self.conn.close()
  28.  
  29. def __exit__(self, exc_type, exc_val, exc_tb):
  30. self.conn.close()
  31.  
  32. def create_db(self):
  33. mydb = mysql.connector.connect(
  34. user=self.username,
  35. passwd=self.password,
  36. auth_plugin='mysql_native_password'
  37. )
  38.  
  39. cursor = mydb.cursor()
  40.  
  41. cursor.execute("CREATE DATABASE IF NOT EXISTS " + self.database)
  42.  
  43. mydb.close()
  44.  
  45. def create_tables(self):
  46. if not self.conn.is_connected():
  47. self.logger.Error('Error in MySqlManager: You must initialize the connection to MySql')
  48. return
  49.  
  50. cursor = self.conn.cursor()
  51. cursor.execute("""
  52. CREATE TABLE IF NOT EXISTS subreddit
  53. (
  54. subreddit_id INT AUTO_INCREMENT,
  55. subreddit_name VARCHAR(30) NOT NULL UNIQUE,
  56. subscriptions INT,
  57. PRIMARY KEY (subreddit_id)
  58. )
  59. """)
  60.  
  61. cursor.execute("""
  62. CREATE TABLE IF NOT EXISTS post (
  63. post_id INT AUTO_INCREMENT,
  64. subreddit_id INT NOT NULL,
  65. post_title VARCHAR(500) NOT NULL,
  66. post_ref VARCHAR(2084),
  67. comments_ref VARCHAR(2084),
  68. username VARCHAR(30),
  69. created_time DATETIME NOT NULL,
  70. PRIMARY KEY (post_id),
  71. FOREIGN KEY (subreddit_id) REFERENCES subreddit(subreddit_id),
  72. CONSTRAINT UC_post UNIQUE(subreddit_id, post_title, username, created_time)
  73. )
  74. """)
  75.  
  76. cursor.execute("""
  77. CREATE TABLE IF NOT EXISTS post_history (
  78. post_history_id INT AUTO_INCREMENT,
  79. post_id INT,
  80. votes INT,
  81. ranks INT,
  82. updated_time DATETIME,
  83. PRIMARY KEY (post_history_id),
  84. FOREIGN KEY (post_id) references post(post_id)
  85. )
  86. """)
  87.  
  88. cursor.execute("""
  89. CREATE TABLE IF NOT EXISTS comment
  90. (
  91. comment_id INT AUTO_INCREMENT,
  92. post_id INT NOT NULL,
  93. subreddit_id INT NOT NULL,
  94. username VARCHAR(30) NOT NULL,
  95. created_time DATETIME NOT NULL,
  96. PRIMARY KEY (comment_id),
  97. FOREIGN KEY (subreddit_id) REFERENCES subreddit(subreddit_id),
  98. FOREIGN KEY (post_id) REFERENCES post(post_id),
  99. CONSTRAINT UC_comment UNIQUE(subreddit_id, post_id, username, created_time)
  100. )
  101. """)
  102.  
  103. cursor.execute("""
  104. CREATE TABLE IF NOT EXISTS comment_history
  105. (
  106. comment_history_id INT AUTO_INCREMENT,
  107. comment_id INT,
  108. message TEXT,
  109. votes INT,
  110. updated_time DATETIME,
  111. PRIMARY KEY (comment_history_id),
  112. FOREIGN KEY (comment_id) references comment(comment_id)
  113. )
  114. """)
  115.  
  116. cursor.execute("""
  117. CREATE TABLE IF NOT EXISTS youtube_info
  118. (
  119. youtube_info_id INT AUTO_INCREMENT,
  120. post_id INT,
  121. video_title TEXT,
  122. publish_date DATETIME,
  123. view_count INT,
  124. like_count INT,
  125. dislike_count INT,
  126. comment_count INT,
  127. PRIMARY KEY (youtube_info_id),
  128. FOREIGN KEY (post_id) REFERENCES post(post_id),
  129. CONSTRAINT UC_youtube_info UNIQUE(post_id)
  130. )
  131. """)
  132.  
  133. def insert_subreddits(self, posts):
  134. if not self.conn.is_connected():
  135. self.logger.Error('Error in MySqlManager: You must initialize the connection to MySql')
  136. return
  137. cursor = self.conn.cursor()
  138.  
  139. for post in posts:
  140. values = (post[rc.SUBREDDIT_KEY], None)
  141. query = """
  142. INSERT IGNORE INTO subreddit (subreddit_name, subscriptions)
  143. VALUES(%s, %s)
  144. """
  145. cursor.execute(query, values)
  146. self.conn.commit()
  147. new_id = cursor.lastrowid
  148. if new_id == 0:
  149. id_query = "SELECT subreddit_id FROM subreddit WHERE subreddit_name = %s"
  150. id_values = (post[rc.SUBREDDIT_KEY],)
  151. cursor.execute(id_query, id_values)
  152. new_id = cursor.next()[0]
  153. post[rc.SUBREDDIT_ID] = new_id
  154. self.logger.info(' - Inserted subreddits from page successfully')
  155.  
  156. def insert_posts(self, posts):
  157. if not self.conn.is_connected():
  158. self.logger.Error('Error in MySqlManager: You must initialize the connection to MySql')
  159. return
  160. cursor = self.conn.cursor()
  161.  
  162. for post in posts:
  163. post_values = (post[rc.SUBREDDIT_ID],
  164. post[rc.POST_TITLE_KEY],
  165. post[rc.POST_REF_KEY],
  166. post[rc.COMMENTS_REF_KEY],
  167. post[rc.USER_KEY],
  168. post[rc.CREATED_TIMESTAMP_KEY])
  169. post_query = """
  170. INSERT IGNORE INTO post (subreddit_id, post_title, post_ref,
  171. comments_ref, username, created_time)
  172. VALUES (%s, %s, %s, %s, %s, %s)
  173. """
  174.  
  175. cursor.execute(post_query, post_values)
  176. self.conn.commit()
  177. new_id = cursor.lastrowid
  178.  
  179. if new_id == 0:
  180. id_query = """
  181. SELECT post_id
  182. FROM post
  183. WHERE subreddit_id = %s
  184. AND post_title = %s
  185. AND username = %s
  186. AND created_time = %s
  187. """
  188. id_values = (post[rc.SUBREDDIT_ID],
  189. post[rc.POST_TITLE_KEY],
  190. post[rc.USER_KEY],
  191. post[rc.CREATED_TIMESTAMP_KEY])
  192. cursor.execute(id_query, id_values)
  193. new_id = cursor.next()[0]
  194. post[rc.POST_ID] = new_id
  195.  
  196. post_history_values = (post[rc.POST_ID],
  197. post[rc.VOTE_KEY],
  198. post[rc.RANK_KEY])
  199.  
  200. post_history_query = """
  201. INSERT INTO post_history (post_id, votes, ranks, updated_time)
  202. (SELECT %s, %s, %s, NOW())
  203. """
  204.  
  205. cursor.execute(post_history_query, post_history_values)
  206. self.conn.commit()
  207. self.logger.info(' - Inserted posts from page successfully')
  208.  
  209. def insert_video_info(self, video_info):
  210. if not self.conn.is_connected():
  211. self.logger.Error('MySqlManager: You must initialize the connection to MySql')
  212. return
  213. cursor = self.conn.cursor()
  214.  
  215. video_info_values = (video_info[rc.POST_ID],
  216. video_info[rc.YOUTUBE_TITLE_KEY],
  217. video_info[rc.YOUTUBE_PUBLISHED_KEY],
  218. video_info[rc.YOUTUBE_VIEW_COUNT_KEY],
  219. video_info[rc.YOUTUBE_LIKE_KEY],
  220. video_info[rc.YOUTUBE_DISLIKE_KEY],
  221. video_info[rc.YOUTUBE_COMMENT_KEY])
  222.  
  223. video_info_query = """
  224. INSERT IGNORE INTO youtube_info (post_id,
  225. video_title,
  226. publish_date,
  227. view_count,
  228. like_count,
  229. dislike_count,
  230. comment_count)
  231. VALUES (%s, %s, %s, %s, %s, %s, %s)
  232. """
  233.  
  234. cursor.execute(video_info_query, video_info_values)
  235. self.conn.commit()
  236.  
  237. def insert_comments(self, comments, post):
  238. if not self.conn.is_connected():
  239. self.logger.Error('Error in MySqlManager: You must initialize the connection to MySql')
  240. return
  241. cursor = self.conn.cursor()
  242.  
  243. for comment in comments:
  244.  
  245. comment_values = (post[rc.POST_ID],
  246. post[rc.SUBREDDIT_ID],
  247. comment[rc.USER_KEY],
  248. comment[rc.CREATED_TIMESTAMP_KEY])
  249. comment_query = """
  250. INSERT IGNORE INTO comment (post_id, subreddit_id, username, created_time)
  251. VALUES (%s, %s, %s, %s)
  252. """
  253.  
  254. cursor.execute(comment_query, comment_values)
  255. self.conn.commit()
  256. new_id = cursor.lastrowid
  257. if new_id == 0:
  258. id_query = """
  259. SELECT comment_id
  260. FROM comment
  261. WHERE post_id = %s
  262. AND username = %s
  263. AND created_time = %s
  264. """
  265. id_values = (post[rc.POST_ID],
  266. comment[rc.USER_KEY],
  267. comment[rc.CREATED_TIMESTAMP_KEY])
  268. cursor.execute(id_query, id_values)
  269. new_id = cursor.next()[0]
  270. comment[rc.COMMENT_ID] = new_id
  271.  
  272. comment_history_values = (comment[rc.COMMENT_ID],
  273. comment[rc.VOTE_KEY],
  274. comment[rc.MESSAGE_KEY])
  275. comment_history_query = """
  276. INSERT INTO comment_history (comment_id, votes, message, updated_time)
  277. (SELECT %s, %s, %s, NOW())
  278. """
  279. cursor.execute(comment_history_query, comment_history_values)
  280. self.conn.commit()
  281. self.logger.info('Inserted comments from {} post successfully'.format(post[rc.POST_REF_KEY]))
  282.  
  283.  
  284. def mysql_test_suite():
  285. import csv
  286. with MysqlManager(rc.DATABASE_NAME,
  287. rc.DB_USERNAME,
  288. rc.DB_PASSWORD,
  289. logging.getLogger('test')) as mysql_test:
  290.  
  291. with open("testfiles/posts.csv", "r") as f:
  292. reader = csv.DictReader(f)
  293. posts = list(reader)
  294.  
  295. mysql_test.insert_subreddits(posts)
  296. mysql_test.insert_posts(posts)
  297.  
  298. for post_test in posts:
  299. assert post_test[rc.SUBREDDIT_ID] > 0
  300. assert post_test[rc.POST_ID] > 0
  301.  
  302. with open("testfiles/comments.csv", "r") as f:
  303. reader = csv.DictReader(f)
  304. comments = list(reader)
  305.  
  306. mysql_test.insert_comments(comments, posts[0])
  307.  
  308. for comment in comments:
  309. assert comment[rc.COMMENT_ID] > 0
  310.  
  311.  
  312. if __name__ == "__main__":
  313. mysql_test_suite()
  314. print("All mysql tests ran successfully")
Add Comment
Please, Sign In to add comment