Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #
- # Copyright (C) 2016 Jason Gray <jasonlevigray3@gmail.com>
- #
- #This program is free software: you can redistribute it and/or modify it
- #under the terms of the GNU General Public License version 3, as published
- #by the Free Software Foundation.
- #
- #This program is distributed in the hope that it will be useful, but
- #WITHOUT ANY WARRANTY; without even the implied warranties of
- #MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
- #PURPOSE. See the GNU General Public License for more details.
- #
- #You should have received a copy of the GNU General Public License along
- #with this program. If not, see <http://www.gnu.org/licenses/>.
- ### END LICENSE
import functools
import queue
import random
import threading
import time
import traceback

import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GLib, Gio
class Result:
    """Pair a task's return value with the condition its producer waits on.

    The consumer obtains the value through :meth:`release`, which also
    notifies the condition so that a thread waiting on it can continue.
    """

    def __init__(self, condition, result):
        """Store *result* and the *condition* to notify on release."""
        self._condition = condition
        self._result = result

    def release(self):
        """Notify one waiter on the condition and return the stored value."""
        cond = self._condition
        # The condition's lock must be held while notifying.
        cond.acquire()
        try:
            cond.notify()
        finally:
            cond.release()
        return self._result
class Worker(threading.Thread):
    """Daemon thread that runs queued callables one at a time, by priority.

    Tasks are taken from a ``queue.PriorityQueue``; a lower ``priority``
    number runs first and tasks of equal priority run in the order they were
    queued.  The thread starts itself on construction.
    """

    def __init__(self):
        super().__init__()
        self.queue = queue.PriorityQueue()
        # Sequence number stored as the second tuple element of every queue
        # entry: it keeps equal-priority tasks in FIFO order and guarantees
        # the tuple comparison never reaches the (unorderable) callables.
        self.fifo_priority = 0
        self.daemon = True
        self.start()

    def run(self):
        """Process queued tasks forever.

        For each task that is not cancelled, call it and, when an ``on_done``
        callback was supplied, schedule ``on_done(result)`` with
        ``GLib.idle_add`` and block until the callback calls
        ``result.release()``.
        """
        while True:
            priority, _, f, args, kwargs, on_done, cancellable = self.queue.get()
            if cancellable.is_cancelled():
                continue
            try:
                value = f(*args, **kwargs)
            except Exception:
                # A failing task must not kill the worker: report it and
                # keep serving the tasks queued behind it.
                traceback.print_exc()
                continue
            # Skip the callback when there is none, or when the task was
            # cancelled while it was running.
            if on_done is None or cancellable.is_cancelled():
                continue
            condition = threading.Condition()
            result = Result(condition, value)
            # Take the condition's lock *before* scheduling the callback.
            # Result.release() needs the same lock to notify, so the
            # notification cannot fire until wait() has released the lock,
            # i.e. until this thread is actually waiting.
            with condition:
                GLib.idle_add(on_done, result, priority=priority)
                condition.wait()

    def queue_task(self, priority, f, args, kwargs, on_done, cancellable):
        """Queue ``f(*args, **kwargs)`` to be run by this worker.

        ``priority`` orders the task in the queue (lower runs first) and is
        also passed to ``GLib.idle_add`` for the callback.  ``on_done`` may
        be ``None``.  ``cancellable`` must provide ``is_cancelled()``.
        """
        self.fifo_priority += 1
        self.queue.put_nowait(
            (priority, self.fifo_priority, f, args, kwargs, on_done, cancellable)
        )
# Single worker thread shared by every function decorated with
# GLib_async_queue; it is created (and started) when this module is loaded.
worker = Worker()


def GLib_async_queue(on_done=None, priority=GLib.PRIORITY_DEFAULT_IDLE):
    """Decorator factory that runs the decorated function on ``worker``.

    Calling the decorated function does not run it directly: the call is
    queued on the shared worker and a ``Gio.Cancellable`` is returned
    immediately.  Cancelling it before the worker reaches the task keeps the
    task from running.

    Args:
        on_done: Optional callback handed to the worker together with the
            task.  The worker schedules it with ``GLib.idle_add`` and passes
            it a result object; the callback must call ``release()`` on that
            object to obtain the return value and let the worker continue.
        priority: Used both to order the task in the worker queue and as the
            ``GLib.idle_add`` priority of the callback.

    Returns:
        A decorator that turns a function into one returning a
        ``Gio.Cancellable``.
    """
    def wrapper(f):
        # Keep the wrapped function's name and docstring on the wrapper.
        @functools.wraps(f)
        def run(*args, **kwargs):
            cancellable = Gio.Cancellable()
            worker.queue_task(priority, f, args, kwargs, on_done, cancellable)
            return cancellable
        return run
    return wrapper
class QueueTest(Gtk.Window):
    """Demo window that exercises the prioritised background task queue.

    Pressing the button queues ``TASK_COUNT`` tasks of random priority,
    randomly cancels some of them, and lists the order in which the
    remaining ones report back.  The header bar subtitle shows a running
    clock that is advanced once per second.
    """

    # Number of tasks queued per run, how long each task sleeps (seconds),
    # and how many random cancellation attempts follow the queueing.
    TASK_COUNT = 24
    TASK_SECONDS = 0.25
    CANCEL_ATTEMPTS = 12

    # (label, GLib priority) of each kind of task that can be queued.
    TASK_KINDS = (
        ('High', GLib.PRIORITY_HIGH),
        ('Default', GLib.PRIORITY_DEFAULT),
        ('Idle', GLib.PRIORITY_DEFAULT_IDLE),
        ('Low', GLib.PRIORITY_LOW),
    )

    def __init__(self):
        Gtk.Window.__init__(self, title='Queue Test')
        self.time_sec = 0
        self._reset_run_state()

        self.set_default_size(350, -1)
        self.set_resizable(False)
        self.set_border_width(10)

        self.headerbar = Gtk.HeaderBar()
        self.headerbar.set_show_close_button(True)
        self.headerbar.set_title('Queue Test')
        self.headerbar.set_subtitle('00:00:00')
        self.set_titlebar(self.headerbar)

        vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10)
        self.add(vbox)

        self.button = Gtk.Button.new_with_label('Get Queue Test Results')
        self.button.connect('clicked', self.async_queue_test)
        vbox.pack_start(self.button, False, False, 0)

        self.label = Gtk.Label()
        vbox.pack_start(self.label, True, True, 0)

        # Tick the header bar clock once per second.
        GLib.timeout_add(1000, self.update_time)

    def _reset_run_state(self):
        """Reset the counters and collected messages of a test run."""
        self.start_time = None
        self.canceled_call_counter = 0
        self.call_order_counter = 0
        self.return_order_counter = 0
        self.complete_message = []

    def async_queue_test(self, *ignore):
        """Queue the test tasks, cancel some at random, and report results.

        Connected to the button's ``clicked`` signal; the signal arguments
        are ignored.  The button is disabled here and re-enabled by the
        result callback once the expected number of tasks has reported back.
        """
        self.button.set_sensitive(False)
        self._reset_run_state()
        self.label.set_label('')

        def get_message(result):
            # Releasing the result also lets the worker move to the next task.
            priority, order_called = result.release()
            self.return_order_counter += 1
            self.complete_message.append(
                '{} priority: Call order {}, return order {}.'.format(
                    priority, order_called, self.return_order_counter))

            expected_returns = self.TASK_COUNT - self.canceled_call_counter
            if self.return_order_counter == expected_returns:
                should_have_taken = expected_returns * self.TASK_SECONDS
                time_took = time.time() - self.start_time
                done_message = (
                    '\nQueued {} random priority {} sec tasks.'
                    '\nRandomly cancelled {} of them.'
                ).format(self.TASK_COUNT, self.TASK_SECONDS,
                         self.canceled_call_counter)
                time_message = (
                    '{}\nShould have taken about {} secs.'
                    '\nActually took {} secs.'
                ).format(done_message, should_have_taken, time_took)
                self.complete_message.append(time_message)
                self.button.set_sensitive(True)

            self.label.set_label('\n'.join(self.complete_message))

        def make_task(label, priority):
            # A factory function gives each task its own label/priority
            # binding instead of sharing one loop variable.
            @GLib_async_queue(on_done=get_message, priority=priority)
            def task(order_called):
                time.sleep(self.TASK_SECONDS)
                return label, order_called
            return task

        tasks = [make_task(label, priority)
                 for label, priority in self.TASK_KINDS]

        called = []
        self.start_time = time.time()
        for _ in range(self.TASK_COUNT):
            queue_call = random.choice(tasks)
            self.call_order_counter += 1
            called.append(queue_call(self.call_order_counter))

        # The same task may be picked more than once, so only cancellations
        # that actually happen are counted.
        for _ in range(self.CANCEL_ATTEMPTS):
            candidate = random.choice(called)
            if not candidate.is_cancelled():
                self.canceled_call_counter += 1
                candidate.cancel()

    def update_time(self):
        """Advance the clock by one second and refresh the subtitle.

        Returns ``True`` so the GLib timeout keeps firing.
        """
        self.time_sec += 1
        minutes, seconds = divmod(self.time_sec, 60)
        hours, minutes = divmod(minutes, 60)
        self.headerbar.set_subtitle(
            '{:02d}:{:02d}:{:02d}'.format(hours, minutes, seconds))
        return True
if __name__ == '__main__':
    # Build the demo window, show it with all of its children, and leave
    # the Gtk main loop as soon as the window receives a delete event.
    win = QueueTest()
    win.show_all()
    win.connect('delete-event', Gtk.main_quit)
    Gtk.main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement