Advertisement
Guest User

Untitled

a guest
Oct 6th, 2017
107
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.67 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on Fri Jun 30 14:24:35 2017
  4. Wrappers for connecting to database
  5. @author: james.toh
  6. """
  7.  
  8.  
  9. # import sys
  10. class mysql_wrapper:
  11.  
  12. def __init__(self, database, host, user, password):
  13.  
  14. self.db = database
  15. self.host = host
  16. self.user = user
  17. self.password = password
  18. try:
  19. self.pymysql = __import__('pymysql.cursors')
  20. self.pd = __import__('pandas')
  21. except BaseException as e:
  22. raise(e)
  23.  
  24. try:
  25. print("Testing mySQL connection...", end=' ')
  26. self.connect()
  27. except BaseException as e:
  28. print("Error")
  29. raise(e)
  30. else:
  31. print("OK")
  32. self.close()
  33.  
  34. def connect(self, local_infile = False):
  35. try:
  36. self.conn = self.pymysql.connect(host=self.host,
  37. user=self.user,
  38. password=self.password,
  39. db=self.db,
  40. charset='utf8mb4',
  41. cursorclass=self.pymysql.cursors.DictCursor,
  42. local_infile = local_infile)
  43. self.cur = self.conn.cursor()
  44. except BaseException as e:
  45. raise(e)
  46.  
  47. def get_data(self, sql):
  48. return self.getData(sql)
  49.  
  50. def getData(self, sql):
  51. result = None
  52. try:
  53. self.cur.execute(sql)
  54. result = self.cur.fetchall()
  55. except BaseException as e:
  56. raise (e)
  57.  
  58. if len(result) == 0:
  59. return None
  60. return self.pd.DataFrame(result)
  61.  
  62. def execute(self, sql):
  63. try:
  64. self.cur.execute(sql)
  65. except BaseException as e:
  66. raise(e)
  67. return True
  68.  
  69. def close(self):
  70. try:
  71. self.conn.close()
  72. self.cur.close()
  73. except:
  74. pass
  75.  
  76. class gp_wrapper:
  77.  
  78. def __init__(self, database, host, port, user, password):
  79.  
  80. try:
  81. self.psycopg2 = __import__('psycopg2')
  82. self.pd = __import__('pandas')
  83. #self.psql = __import__('pandas.io.sql')
  84. except BaseException as e:
  85. print(e)
  86. raise BaseException("Error importing packages")
  87.  
  88. self.database = database
  89. self.host = host
  90. self.port = port
  91. self.user = user
  92. self.password = password
  93. self.conn_str = """dbname='{database}' user='{user}' host='{host}' port='{port}' password='{password}'""".format(
  94. database=self.database,
  95. host=self.host,
  96. port=self.port,
  97. user=self.user,
  98. password=self.password
  99. )
  100. try:
  101. print("Testing GP connection...", end=' ')
  102. self.connect()
  103. except BaseException as e:
  104. print("Error!")
  105. raise(e)
  106. else:
  107. print("OK")
  108. self.close()
  109.  
  110. def connect(self):
  111. self.conn = self.psycopg2.connect(self.conn_str)
  112. self.cur = self.conn.cursor()
  113.  
  114. def close(self):
  115. self.cur.close()
  116. self.conn.close()
  117.  
  118. def commit(self):
  119. self.conn.commit()
  120.  
  121. def rollback(self):
  122. self.conn.rollback()
  123.  
  124. def get_data_chunk(self, sql_str, chunk_size = 5000):
  125. df_main = None
  126. is_conn_open = True; ## Close the connection later if it is not already opened
  127. if self.conn.closed == 1:
  128. self.connect()
  129. is_conn_open= False;
  130.  
  131. try:
  132. # x = self.psql.read_sql(sql_str, self.conn)
  133. self.cur.execute(sql_str)
  134. df_main = self.pd.DataFrame(self.cur.fetchmany(chunk_size))
  135. df_main.columns = [i[0] for i in self.cur.description]
  136. counter = 1
  137. while True:
  138. df_temp = self.pd.DataFrame(self.cur.fetchmany(chunk_size))
  139. if len(df_temp) == 0:
  140. break;
  141. else:
  142. counter += 1
  143. print("Getting chunk: %s" % str(counter))
  144. df_main = df_main.append(df_temp)
  145. del df_temp
  146.  
  147. except BaseException as e:
  148. raise(e)
  149. finally:
  150. if not is_conn_open:
  151. self.close()
  152. return df_main
  153.  
  154. def get_data(self, sql_str):
  155. return self.getData(sql_str)
  156.  
  157. def getData(self, sql_str):
  158. x = None
  159. is_conn_open = True; ## Close the connection later if it is not already opened
  160. if self.conn.closed == 1:
  161. self.connect()
  162. is_conn_open= False;
  163. try:
  164. # x = self.psql.read_sql(sql_str, self.conn)
  165. self.cur.execute(sql_str)
  166. x = self.cur.fetchall()
  167. x = self.pd.DataFrame(x)
  168. if len(x) == 0:
  169. return None
  170. else:
  171. x.columns = [i[0] for i in self.cur.description]
  172. except BaseException as e:
  173. raise(e)
  174. finally:
  175. if not is_conn_open:
  176. self.close()
  177. return x
  178.  
  179. def execute(self,sql_str):
  180. is_conn_open = True; ## Close the connection later if it is not already opened
  181. if self.conn.closed == 1:
  182. self.connect()
  183. is_conn_open= False;
  184. try:
  185. self.cur.execute(sql_str)
  186. except BaseException as e:
  187. raise BaseException(e)
  188. finally:
  189. if not is_conn_open:
  190. self.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement