Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from tkinter import *
- from tkinter import messagebox
- import asyncio
- import random
def do_freezed():
    """Show an info dialog; used to check whether the GUI still reacts."""
    notice = 'Tkinter is reacting.'
    messagebox.showinfo(message=notice)
def do_tasks():
    """Button handler: run the whole do_urls() batch to completion.

    NOTE: run_until_complete() blocks the calling thread, so when this is
    invoked from a Tk button the GUI cannot process events until every
    task has finished.
    """
    # Use a fresh loop per click. Closing the shared default loop in the
    # ``finally`` below would make every later click fail with
    # "RuntimeError: Event loop is closed".
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(do_urls())
    finally:
        loop.close()
async def one_url(url, min_sec=1, max_sec=15):
    """Simulate fetching one URL by sleeping for a random number of seconds.

    Args:
        url: identifier echoed back in the result string.
        min_sec: smallest delay (inclusive) that may be drawn, in seconds.
        max_sec: largest delay (inclusive) that may be drawn, in seconds.

    Returns:
        A tab-separated report of the form ``'url: <url>\\tsec: <delay>'``.
    """
    sec = random.randint(min_sec, max_sec)
    await asyncio.sleep(sec)
    # '\t' separates the two fields; without the backslash the output read
    # "url: 0tsec: 5".
    return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
    """Run ten one_url() tasks concurrently and print one result per line."""
    # asyncio.wait() rejects bare coroutines on Python 3.11+, so wrap each
    # coroutine in a Task first.
    tasks = [asyncio.ensure_future(one_url(url)) for url in range(10)]
    await asyncio.wait(tasks)
    # Read results from the ordered task list rather than the unordered
    # "done" set, so the output follows submission order.
    results = [task.result() for task in tasks]
    print('\n'.join(results))
if __name__ == '__main__':
    # Build the window and both buttons, then lay them out top to bottom.
    root = Tk()
    buttonT = Button(root, text='Asyncio Tasks', command=do_tasks)
    buttonX = Button(root, text='Freezed???', command=do_freezed)
    buttonT.pack()
    buttonX.pack()
    root.mainloop()
- from tkinter import *
- from tkinter import messagebox
- import asyncio
- import threading
- import random
def _asyncio_thread(async_loop):
    """Thread target: drive ``async_loop`` until the do_urls() batch is done."""
    batch = do_urls()
    async_loop.run_until_complete(batch)
def do_tasks(async_loop):
    """Button handler: run the asyncio batch on a worker thread.

    The loop is driven from a separate thread so the calling (GUI) thread
    is not blocked while the tasks sleep.

    Args:
        async_loop: event loop the worker thread runs via _asyncio_thread().
    """
    # run_until_complete() raises RuntimeError on a loop that is already
    # running, so ignore clicks while the previous batch is in flight.
    # Best effort only: a click that lands before the worker thread has
    # actually started the loop can still slip through.
    if async_loop.is_running():
        return
    threading.Thread(target=_asyncio_thread, args=(async_loop,)).start()
async def one_url(url, min_sec=1, max_sec=8):
    """Simulate fetching one URL by sleeping for a random number of seconds.

    Args:
        url: identifier echoed back in the result string.
        min_sec: smallest delay (inclusive) that may be drawn, in seconds.
        max_sec: largest delay (inclusive) that may be drawn, in seconds.

    Returns:
        A tab-separated report of the form ``'url: <url>\\tsec: <delay>'``.
    """
    sec = random.randint(min_sec, max_sec)
    await asyncio.sleep(sec)
    # '\t' separates the two fields; without the backslash the output read
    # "url: 0tsec: 5".
    return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
    """Run ten one_url() tasks concurrently and print one result per line."""
    # asyncio.wait() rejects bare coroutines on Python 3.11+, so wrap each
    # coroutine in a Task first.
    tasks = [asyncio.ensure_future(one_url(url)) for url in range(10)]
    await asyncio.wait(tasks)
    # Read results from the ordered task list rather than the unordered
    # "done" set, so the output follows submission order.
    results = [task.result() for task in tasks]
    print('\n'.join(results))
def do_freezed():
    """Show an info dialog; used to check whether the GUI still reacts."""
    text = 'Tkinter is reacting.'
    messagebox.showinfo(message=text)
def main(async_loop):
    """Build the two-button window and run the Tk main loop until it exits.

    Args:
        async_loop: event loop passed to do_tasks() on every click of the
            "Asyncio Tasks" button.
    """
    root = Tk()
    Button(master=root, text='Asyncio Tasks',
           command=lambda: do_tasks(async_loop)).pack()
    # pack() returns None, so there is nothing useful to bind to a name here.
    Button(master=root, text='Freezed???', command=do_freezed).pack()
    root.mainloop()
if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no current loop exists and raises RuntimeError in that case on
    # Python 3.14+.
    async_loop = asyncio.new_event_loop()
    main(async_loop)
- """Proof of concept: integrate tkinter, asyncio and async iterator.
- Terry Jan Reedy, 2016 July 25
- """
- import asyncio
- from random import randrange as rr
- import tkinter as tk
class App(tk.Tk):
    """Tk root window whose event handling is driven by an asyncio loop.

    Rather than calling ``mainloop()``, two tasks are scheduled on ``loop``:
    one animates a chord on a canvas, the other calls ``update()`` over and
    over so Tk processes its pending events.
    """

    def __init__(self, loop, interval=1/120):
        """Create the window and schedule the animation and update tasks.

        Args:
            loop: asyncio event loop on which the tasks are created.
            interval: seconds to sleep between two ``update()`` calls.
        """
        super().__init__()
        self.loop = loop
        # Route the window-manager close request to our own shutdown code.
        self.protocol("WM_DELETE_WINDOW", self.close)
        self.tasks = [
            loop.create_task(self.rotator(1/60, 2)),
            loop.create_task(self.updater(interval)),
        ]

    async def rotator(self, interval, d_per_tick):
        """Animate a chord, widening it by ``d_per_tick`` degrees per step.

        Args:
            interval: seconds to sleep between two animation steps.
            d_per_tick: degrees added to the chord's extent on every step.
        """
        board = tk.Canvas(self, height=600, width=600)
        board.pack()
        angle, fill = 0, 'black'
        chord = board.create_arc(100, 100, 500, 500, style=tk.CHORD,
                                 start=0, extent=angle, fill=fill)
        # sleep() returns its second argument (True), so the loop only ends
        # when the task is cancelled.
        while True:
            if not await asyncio.sleep(interval, True):
                break
            angle, fill = deg_color(angle, d_per_tick, fill)
            board.itemconfigure(chord, extent=angle, fill=fill)

    async def updater(self, interval):
        """Process pending Tk events every ``interval`` seconds, forever."""
        while True:
            self.update()
            await asyncio.sleep(interval)

    def close(self):
        """Cancel both tasks, stop the asyncio loop and destroy the window."""
        for scheduled in self.tasks:
            scheduled.cancel()
        self.loop.stop()
        self.destroy()
def deg_color(deg, d_per_tick, color):
    """Advance the angle by one tick, picking a new color on wrap-around.

    Args:
        deg: current angle in degrees.
        d_per_tick: degrees to add for this tick.
        color: current fill color.

    Returns:
        ``(new_deg, new_color)``. When the advanced angle reaches 360 or
        more it is reduced modulo 360 and a random ``#rrggbb`` color is
        drawn; otherwise ``color`` is returned unchanged.
    """
    advanced = deg + d_per_tick
    wrapped = 360 <= advanced
    if not wrapped:
        return advanced, color
    # One random byte per channel, drawn in red, green, blue order.
    rgb = tuple(rr(0, 256) for _ in range(3))
    return advanced % 360, '#%02x%02x%02x' % rgb
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# current loop exists and raises RuntimeError in that case on Python 3.14+.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
app = App(loop)
try:
    # Returns once App.close() has stopped the loop.
    loop.run_forever()
finally:
    # Release the loop's resources even if run_forever() is interrupted.
    loop.close()
- from tkinter import *
- from tkinter import messagebox
- import asyncio
- import random
def do_freezed():
    """Show an info dialog; used to check whether the GUI still reacts."""
    reply = 'Tkinter is reacting.'
    messagebox.showinfo(message=reply)
def do_tasks():
    """Button handler: run the whole do_urls() batch to completion.

    NOTE: run_until_complete() blocks the calling thread, so when this is
    invoked from a Tk button the GUI cannot process events until every
    task has finished.
    """
    # Use a fresh loop per click. Closing the shared default loop in the
    # ``finally`` below would make every later click fail with
    # "RuntimeError: Event loop is closed".
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(do_urls())
    finally:
        loop.close()
async def one_url(url):
    """Simulate fetching one URL by sleeping for a random 1-15 seconds.

    Args:
        url: identifier echoed back in the result string.

    Returns:
        A tab-separated report of the form ``'url: <url>\\tsec: <delay>'``.
    """
    sec = random.randint(1, 15)
    # Let Tk run its pending idle callbacks once before this task sleeps.
    # NOTE(review): this happens only once per task, so it presumably does
    # not keep the GUI responsive for the whole batch - confirm.
    root.update_idletasks()
    await asyncio.sleep(sec)
    # '\t' separates the two fields; without the backslash the output read
    # "url: 0tsec: 5".
    return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
    """Run ten one_url() tasks concurrently and print one result per line."""
    # asyncio.wait() rejects bare coroutines on Python 3.11+, so wrap each
    # coroutine in a Task first.
    tasks = [asyncio.ensure_future(one_url(url)) for url in range(10)]
    await asyncio.wait(tasks)
    # Read results from the ordered task list rather than the unordered
    # "done" set, so the output follows submission order.
    results = [task.result() for task in tasks]
    print('\n'.join(results))
if __name__ == '__main__':
    # ``root`` must stay a module-level name: one_url() reads it.
    root = Tk()
    buttonT = Button(root, text='Asyncio Tasks', command=do_tasks)
    buttonX = Button(root, text='Freezed???', command=do_freezed)
    buttonT.pack()
    buttonX.pack()
    root.mainloop()
- import threading
- from functools import partial
- from tkinter import *
- from tkinter import messagebox
- import asyncio
- import random
- # Please wrap all this code in a nice App class, of course
def _run_aio_loop(loop):
    """Thread target: adopt ``loop`` as the current event loop and run it.

    Does not return until something stops the loop.
    """
    # Register the loop for the calling thread before it starts running.
    asyncio.set_event_loop(loop)
    loop.run_forever()
# Background thread that owns and runs the asyncio loop.
aioloop = asyncio.new_event_loop()
# daemon=True is optional; it depends on how the app is meant to shut down.
t = threading.Thread(target=_run_aio_loop, args=(aioloop,), daemon=True)
t.start()
# Placeholder; rebound to the real Button in the __main__ section.
buttonT = None
def do_freezed():
    """Show an info dialog; used to check whether the GUI still reacts."""
    body = 'Tkinter is reacting.'
    messagebox.showinfo(message=body)
def do_tasks():
    """Button handler: disable the button and submit do_urls() to ``aioloop``."""
    # Item assignment on a widget is the same call as configure(state=...).
    buttonT['state'] = DISABLED
    batch = do_urls()
    # Hand the coroutine to the loop running on the background thread.
    asyncio.run_coroutine_threadsafe(batch, aioloop)
async def one_url(url, min_sec=1, max_sec=3):
    """Simulate fetching one URL by sleeping for a random number of seconds.

    Args:
        url: identifier echoed back in the result string.
        min_sec: smallest delay (inclusive) that may be drawn, in seconds.
        max_sec: largest delay (inclusive) that may be drawn, in seconds.

    Returns:
        A tab-separated report of the form ``'url: <url>\\tsec: <delay>'``.
    """
    sec = random.randint(min_sec, max_sec)
    await asyncio.sleep(sec)
    # '\t' separates the two fields; without the backslash the output read
    # "url: 0tsec: 5".
    return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
    """Run three one_url() tasks, print the results, re-enable the button."""
    # asyncio.wait() rejects bare coroutines on Python 3.11+, so wrap each
    # coroutine in a Task first.
    tasks = [asyncio.ensure_future(one_url(url)) for url in range(3)]
    try:
        await asyncio.wait(tasks)
        # Read results from the ordered task list rather than the unordered
        # "done" set, so the output follows submission order.
        results = [task.result() for task in tasks]
        print('\n'.join(results))
    finally:
        # Re-enable the button even when a task failed; otherwise it would
        # stay disabled for good.
        # NOTE(review): this touches a Tk widget from the thread running
        # the asyncio loop - confirm the Tk build in use tolerates that.
        buttonT.configure(state=NORMAL)
if __name__ == '__main__':
    root = Tk()
    # Rebinds the module-level ``buttonT`` that do_tasks()/do_urls() use.
    buttonT = Button(root, text='Asyncio Tasks', command=do_tasks)
    buttonX = Button(root, text='Freezed???', command=do_freezed)
    buttonT.pack()
    buttonX.pack()
    root.mainloop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement