Advertisement
Guest User

Untitled

a guest
Feb 16th, 2017
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.62 KB | None | 0 0
  1. """
  2. Created By Velocity-plus
  3. Github: https://github.com/Velocity-plus/
  4. """
  5. from threading import Thread
  6. from queue import Queue
  7. from time import time as timestamp, sleep
  8. import pymysql.cursors
  9.  
  10.  
  11. class ThreadedMySQL:
  12.  
  13.     def __init__(self):
  14.         # Is the thread running?
  15.         self.thread_status = False
  16.         # Regular Queue
  17.         self._r_queue = Queue()
  18.         # Prioitized Queue
  19.         self._p_queue = Queue()
  20.  
  21.         self.connection_method = 0
  22.  
  23.         # Show print messages?
  24.         self._debug = True
  25.  
  26.         self.wait = 0
  27.  
  28.     def wait(self, delay):
  29.         """
  30.        If you for some reason want to delay the queue
  31.  
  32.        :param delay: The delay in seconds
  33.        :return:
  34.        """
  35.         self.wait = delay
  36.  
  37.     def execute(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
  38.         """
  39.            This function cannot pass fetched data to the callback!
  40.  
  41.        :param query: The SQL query that you want to execute
  42.        :param args: If the SQL query have any args
  43.        :param callback: The callback for the query
  44.        :param data_pack: If you want to pass special information to the callback
  45.        :param prioritize: If you have large queues, prioritizing the query can make it skip the queue
  46.         before the rest of the queue is finished
  47.        :param get_info: If you want information about the query passed to the callback
  48.         (such as timestamp, query and prioritized)
  49.        :return:
  50.        """
  51.         # We need this later
  52.         query_type = 0
  53.  
  54.         # If callback = None assuming no data returned needed
  55.         if get_info:
  56.             get_info = {'query': query, 'time': timestamp(), 'prioritized': prioritize}
  57.  
  58.         if not prioritize:
  59.             self._r_queue.put([query, args, callback, data_pack, get_info, query_type])
  60.         else:
  61.             self._p_queue.put([query, args, callback, data_pack, get_info, query_type])
  62.  
  63.     def fetchone(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
  64.         """
  65.            This function both execute and fetch data, no need to execute before using this!
  66.  
  67.        :param query: The SQL query that you want to execute
  68.        :param args: If the SQL query have any args
  69.        :param callback: The callback for the query
  70.        :param data_pack: If you want to pass special information to the callback
  71.        :param prioritize: If you have large queues, prioritizing the query can make it skip the queue
  72.         before the rest of the queue is finished
  73.        :param get_info: If you want information about the query passed to the callback
  74.         (such as timestamp, query and prioritized)
  75.        :return:
  76.        """
  77.         query_type = 1
  78.         if get_info:
  79.             get_info = {'query': query, 'time': timestamp(), 'prioritized': prioritize}
  80.  
  81.          # If callback = None assuming no data returned needed
  82.         if not prioritize:
  83.             self._r_queue.put([query, args, callback, data_pack, get_info, query_type])
  84.         else:
  85.             self._p_queue.put([query, args, callback, data_pack, get_info, query_type])
  86.  
  87.     def fetchall(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
  88.         """
  89.          This function both execute and fetch data, no need to execute before using this!
  90.  
  91.        :param query: The SQL query that you want to execute
  92.        :param args: If the SQL query have any args
  93.        :param callback: The callback for the query
  94.        :param data_pack: If you want to pass special information to the callback
  95.        :param prioritize: If you have large queues, prioritizing the query can make it skip the queue
  96.         before the rest of the queue is finished
  97.        :param get_info: If you want information about the query passed to the callback
  98.         (such as timestamp, query and prioritized)
  99.        :return:
  100.        """
  101.         query_type = 2
  102.  
  103.         if get_info:
  104.             get_info = {'query': query, 'time': timestamp(), 'prioritized': prioritize}
  105.  
  106.         # If callback = None assuming no data returned needed
  107.         if not prioritize:
  108.             self._r_queue.put([query, args, callback, data_pack, get_info, query_type])
  109.         else:
  110.             self._p_queue.put([query, args, callback, data_pack, get_info, query_type])
  111.  
  112.     def complete_task(self, worker, prio=None):
  113.         query = worker[0]
  114.         args = worker[1]
  115.         callback = worker[2]
  116.         data_pack = worker[3]
  117.         get_info = worker[4]
  118.         query_type = worker[5]
  119.  
  120.         if get_info:
  121.             get_info['time'] = timestamp() - get_info['time']
  122.  
  123.         if args:
  124.             self.cursor.execute(query, args)
  125.         else:
  126.             self.cursor.execute(query)
  127.  
  128.         if query_type == 0:
  129.             if get_info:
  130.                 if callback:
  131.                     if data_pack:
  132.                         callback(data_pack, get_info)
  133.                     else:
  134.                         callback(get_info)
  135.             else:
  136.                 if callback:
  137.                     if data_pack:
  138.                         callback(data_pack)
  139.                     else:
  140.                         callback()
  141.         if query_type == 1:
  142.             data = self.cursor.fetchone()
  143.             if get_info:
  144.                 if callback:
  145.                     if data_pack:
  146.                         callback(data, data_pack, get_info)
  147.                     else:
  148.                         callback(data, get_info)
  149.             else:
  150.                 if callback:
  151.                     if data_pack:
  152.                         callback(data, data_pack)
  153.                     else:
  154.                         callback(data)
  155.  
  156.         if query_type == 2:
  157.             data = self.cursor.fetchall()
  158.             if get_info:
  159.                 if callback:
  160.                     if data_pack:
  161.                         callback(data, data_pack, get_info)
  162.                     else:
  163.                         callback(data, get_info)
  164.             else:
  165.                 if callback:
  166.                     if data_pack:
  167.                         callback(data, data_pack)
  168.                     else:
  169.                         callback(data)
  170.         if prio:
  171.             self._p_queue.task_done()
  172.         else:
  173.             self._r_queue.task_done()
  174.  
  175.  
  176.  
  177.     def _threader(self):
  178.         while self.thread_status:
  179.             if self.wait:
  180.                 sleep(self.wait)
  181.  
  182.             if self._p_queue.empty():
  183.                 worker = self._r_queue.get()
  184.                 self.complete_task(worker, prio=False)
  185.  
  186.             else:
  187.                 worker = self._p_queue.get()
  188.                 self.complete_task(worker, prio=True)
  189.  
  190.     def _start_thread(self):
  191.         # Creates the thread
  192.         self.t = Thread(target=self._threader)
  193.         self.t.daemon = True
  194.         self.t.start()
  195.  
  196.     def handlequeue_start(self):
  197.         """
  198.        This handles the queue, should be stopped on unload
  199.        :return:
  200.        """
  201.         # Starts the queue
  202.         self.thread_status = True # This must be true before the thread can loop
  203.         self._start_thread()
  204.  
  205.     def handlequeue_stop(self):
  206.         """
  207.        This stops the queue for being processed, while a connection still might be open
  208.         no queries can be executed.
  209.        :return:
  210.        """
  211.         self.thread_status = False
  212.  
  213.     def queue_size(self):
  214.         """
  215.        :return: Returns the size of the queue
  216.        """
  217.         return self._r_queue.qsize() + self._p_queue.qsize()
  218.  
  219.     def connect(self, host, user, password, db, charset, cursorclass=pymysql.cursors.DictCursor):
  220.         """
  221.        Checkout PyMYSQL documentation for complete walkthrough
  222.        """
  223.         try:
  224.             self.connection = pymysql.connect(host=host,
  225.                                               user=user,
  226.                                               password=password,
  227.                                               db=db,
  228.                                               charset=charset,
  229.                                               cursorclass=cursorclass)
  230.             self.cursor = self.connection.cursor()
  231.             if self._debug:
  232.                 print('threaded_mysql: [SUCCES] connection was succesfully established.')
  233.  
  234.             self.connection_method = 1
  235.         except:
  236.             if self._debug:
  237.                 print('threaded_mysql: [ERROR] Not possible to make a connection.')
  238.  
  239.     def connect_use(self, connection):
  240.         """
  241.        If you created your connection elsewhere in your code, you can pass it to Threaded MySQL
  242.        :param connection: Your connection socket
  243.        :return:
  244.        """
  245.         try:
  246.             self.connection = connection
  247.             self.cursor = self.connection.cursor()
  248.             if self._debug:
  249.                 print('threaded_mysql: [SUCCES] Cursor created succesfully for your connection.')
  250.             self.connection_method = 2
  251.         except:
  252.             if self._debug:
  253.                 print('threaded_mysql: [ERROR] Not possible to create cursor.')
  254.  
  255.     def commit(self):
  256.         """
  257.        Regular pymysql commit
  258.        :return:
  259.        """
  260.         self.connection.commit()
  261.  
  262.     def close(self, finish_queue_before_close=False):
  263.         """
  264.        Closes the mysql connection
  265.        :param finish_queue_before_close: Finishes the queue before it terminates the connection
  266.        :return:
  267.        """
  268.         if finish_queue_before_close:
  269.             while self.queue_size() > 0:
  270.                 pass
  271.             else:
  272.                 self.connection.close()
  273.         else: self.connection.close()
  274.  
  275.  
  276. TSQL = ThreadedMySQL()
  277.  
  278. TSQL.connect()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement