class Decoders(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        self.decode()

    def decode(self):
        queue = self.queue
        db = Database()  # each thread opens its own connection
        while queue.qsize() > 0:
            # calling db methods, just an example
            temp = queue.get()
            db.select_records()
            db.insert_record(temp)


# start two decoder threads that share the same work queue
Decoders(queue).start()
Decoders(queue).start()


class Database:
    def __init__(self):
        self.db = connect(host=conf_hostname,
                          database=conf_dbname,
                          user=conf_dbuser,
                          password=conf_dbpass,
                          port=conf_dbport)
        self.db.autocommit = True
        self.cursor = self.db.cursor()

    def select_records(self):
        self.cursor.execute("SELECT ...")  # simple select, placeholder query
        return self.cursor.fetchall()

    def insert_record(self, temp):
        # insert query goes here
        pass

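The snippet above assumes that a populated queue, the conf_* connection settings, and a bare connect() (presumably psycopg2.connect) already exist. A minimal setup sketch, with placeholder values that are not part of the original, might look like this once both classes are defined:

# Hedged setup sketch for the Decoders/Database snippet above.
# The connection settings, the queue contents, and importing
# psycopg2.connect as "connect" are assumptions, not the author's code.
import queue as queue_lib
import threading
from psycopg2 import connect

conf_hostname = 'localhost'   # placeholder values
conf_dbname = 'mydb'
conf_dbuser = 'postgres'
conf_dbpass = 'secret'
conf_dbport = 5432

work_queue = queue_lib.Queue()
for record in ('a', 'b', 'c'):    # whatever needs decoding
    work_queue.put(record)

# one Database (and therefore one connection and cursor) is created
# inside each thread, so the threads never share a psycopg2 connection
workers = [Decoders(work_queue) for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()
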
#!/usr/bin/env python

import time
import threading
from multiprocessing import Process, Queue

import psycopg2
from psycopg2.pool import ThreadedConnectionPool

from config import config

data_queque = Queue()  # the reader side pulls records from this queue

SELECT_QUERY = 'SELECT something FROM some_table LIMIT %s OFFSET %s'

INSERT_QUERY = 'INSERT INTO sometable (col1, col2, col3) VALUES '


# the writer side pushes data onto the queue
class PsqlMultiThreadExample(object):
    _select_conn_count = 10
    _insert_conn_count = 10
    _insert_conn_pool = None
    _select_conn_pool = None

    def __init__(self):
        pass  # the connection pools live on the class, nothing to set up per instance

    def postgres_connection(self):
        """ Connect to the PostgreSQL database server """
        conn = None
        try:
            # read connection parameters
            params = config()

            # connect to the PostgreSQL server
            print('Connecting to the PostgreSQL database...')
            conn = psycopg2.connect(**params)

            # create a cursor
            cur = conn.cursor()

            # execute a statement
            print('PostgreSQL database version:')
            cur.execute('SELECT version()')

            # display the PostgreSQL database server version
            db_version = cur.fetchone()
            print(db_version)

            # close the communication with the PostgreSQL server
            cur.close()
        except (Exception, psycopg2.DatabaseError) as error:
            print(error)
        finally:
            if conn is not None:
                conn.close()
                print('Database connection closed.')

    def check_connection(self):
        """ Check the Postgres database connection """
        conn = None
        try:
            conn = PsqlMultiThreadExample._select_conn_pool.getconn()

            # create a cursor
            cur = conn.cursor()

            # execute a statement
            print('PostgreSQL database version:')
            cur.execute('SELECT version()')

            # display the PostgreSQL database server version
            db_version = cur.fetchone()
            print(db_version)

            # close the communication with the PostgreSQL server
            cur.close()
        except (Exception, psycopg2.DatabaseError) as error:
            print(error)
        finally:
            if conn is not None:
                # return the pooled connection instead of closing it
                PsqlMultiThreadExample._select_conn_pool.putconn(conn)
                print('Database connection returned to the pool.')

    def create_connection_pool(self):
        """ Create the thread-safe threaded Postgres connection pool """

        # calculate the max and min connections required
        max_conn = PsqlMultiThreadExample._insert_conn_count + PsqlMultiThreadExample._select_conn_count
        min_conn = max_conn // 2
        params = config()

        # the read and write sides share one pool of connections
        PsqlMultiThreadExample._insert_conn_pool = PsqlMultiThreadExample._select_conn_pool = \
            ThreadedConnectionPool(min_conn, max_conn, **params)

    def read_data(self):
        """
        Read the data from Postgres and hand each slice of records to a
        separate process, which then works on it with threads.
        The partition value tells each process which slice of the table to read.

        :return:
        """
        partition_value = 805000 // 10  # 805000 is the total record count
        # this marks the starting partition for each process
        start_index = 1
        for pid in range(1, 11):
            # get connections from the connection pools
            select_conn = PsqlMultiThreadExample._select_conn_pool.getconn()
            insert_conn = PsqlMultiThreadExample._insert_conn_pool.getconn()
            # enable autocommit on the insert connection
            insert_conn.autocommit = True
            # insert_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

            # create 10 processes to perform the operation
            ps = Process(target=self.process_data,
                         args=(data_queque, pid, (start_index - 1) * partition_value,
                               start_index * partition_value, select_conn, insert_conn))
            start_index = start_index + 1
            ps.daemon = True
            ps.start()
            _start = time.time()
            ps.join()
            print("Process %s took %s seconds" % (pid, (time.time() - _start)))

    def process_data(self, queue, pid, start_index, end_index, select_conn, insert_conn):
        """
        Split each process's slice of records across 10 threads for processing.

        :param queue:
        :param pid:
        :param start_index:
        :param end_index:
        :param select_conn:
        :param insert_conn:
        :return:
        """
        print("\nStarted processing records from %s to %s" % (start_index, end_index))
        partition_value = (end_index - start_index) // 10
        for tid in range(1, 11):
            ins_cur = insert_conn.cursor()
            worker = threading.Thread(target=self.process_thread, args=(
                queue, pid, tid, start_index, (start_index + partition_value), select_conn.cursor(), ins_cur,
                threading.Lock()))
            start_index = start_index + partition_value
            worker.daemon = True
            worker.start()
            worker.join()

    def process_thread(self, queue, pid, tid, start_index, end_index, sel_cur, ins_cur, lock):
        """
        Each thread reads its slice of records from the database and runs the
        Elasticsearch lookup for experiences with the same data.

        :param queue:
        :param pid:
        :param tid:
        :param start_index:
        :param end_index:
        :param sel_cur:
        :param ins_cur:
        :param lock:
        :return:
        """
        limit = end_index - start_index
        sel_cur.execute(SELECT_QUERY, (limit, start_index,))
        rows = sel_cur.fetchall()

        # build one "(val1, val2, val3)" fragment per row for the batch insert;
        # mogrify returns bytes under Python 3, so decode to str before joining
        records = []
        for row in rows:
            records.append(ins_cur.mogrify("(%s,%s,%s)", (row[0], row[1], row[2])).decode())

        self.write_data(records, ins_cur, lock)

    def write_data(self, records, ins_cur, lock):
        """
        Insert the data with the experience id.

        :param records:
        :param ins_cur:
        :param lock:
        :return:
        """
        with lock:
            if records:
                # join the mogrified value fragments into one multi-row INSERT
                ins_cur.execute(INSERT_QUERY + ','.join(records))


if __name__ == '__main__':
    _start = time.time()
    cmp_clener = PsqlMultiThreadExample()
    # create the database connection pool so connections can be shared across processes
    cmp_clener.create_connection_pool()
    cmp_clener.read_data()
    print('Total processing time: %s seconds' % (time.time() - _start))
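
The script above takes connections out of the pool with getconn() but never hands them back; with psycopg2's ThreadedConnectionPool the usual pattern is to return each connection with putconn() when the work is done. A minimal sketch of that pattern, reusing the same config() helper and with arbitrary pool sizes, might look like this:

# Hedged sketch of the getconn()/putconn() lifecycle for ThreadedConnectionPool.
# config() is the same helper imported above; the pool sizes are example values.
from psycopg2.pool import ThreadedConnectionPool
from config import config

pool = ThreadedConnectionPool(minconn=2, maxconn=10, **config())

conn = pool.getconn()
try:
    with conn.cursor() as cur:
        cur.execute('SELECT version()')
        print(cur.fetchone())
finally:
    # return the connection so other threads can reuse it
    pool.putconn(conn)

# close every connection in the pool at program exit
pool.closeall()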