Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import tkinter as tk
- import tkinter.ttk as ttk
- import asyncio
- import operator
- import json
- import logging
- from datetime import datetime as dt, timedelta as td
- DELAULT_TABLE_IDENTITY = 2
- def human_readable(size:str):
- sizing = {
- range(0, 4): ('B', 1),
- range(4, 7): ('KB', 10**3),
- range(7, 10): ('MB', 10**6),
- range(10, 13): ('GB', 10**9)
- }
- lenght = len(str(size))
- for scope, notation in sizing.items():
- if lenght in scope:
- size = size/notation[1]
- size = round(size, 2)
- return '{}{}'.format(size, notation[0])
- class YggdrasilAPI():
- #__slots__ = ['addr', 'stats', '__callbacks']
- def __init__(self, address='localhost', port=9001, unixsock='/var/run/yggdrasil.sock'):
- self.addr = address, port
- self.stats = {}
- self.__init_trackers()
- asyncio.ensure_future(self.getSelf())
- async def _send_request(self, method:str, **kwargs)->dict:
- req = {'request': method, **kwargs}
- reader, writer = await asyncio.open_connection(*self.addr)
- writer.write(json.dumps(req).encode())
- response = json.loads(await reader.read())
- writer.close()
- await writer.wait_closed()
- logging.info('\n[{}] {}:{} {} request'.format(dt.now(), *self.addr, method))
- logging.debug('Response: {}'.format(response))
- if not response['status']=='success':
- raise Exception(response)
- self.stats.update(response['response'])
- return response['response']
- def _preprocess(self, **kwargs):
- if 'bytes_sent' in kwargs:
- kwargs['bytes_sent'] = human_readable(kwargs['bytes_sent'])
- if 'bytes_recvd' in kwargs:
- kwargs['bytes_recvd'] = human_readable(kwargs['bytes_recvd'])
- if 'proto' in kwargs and 'endpoint' in kwargs:
- kwargs['faddr'] = '{}://{}'.format(kwargs['proto'], kwargs['endpoint'])
- if 'uptime' in kwargs:
- sec = round(kwargs['uptime'])
- kwargs['uptime'] = str(td(seconds=sec))
- if 'last_seen' in kwargs:
- sec = round(kwargs['last_seen'])
- date = dt.now() - td(seconds=sec)
- kwargs['last_seen'] = date.strftime("%d/%m/%y %H:%M:%S")
- return kwargs
- #@self.callbacks_caller()
- async def getSelf(self)->dict:
- raw = await self._send_request('getSelf')
- response:dict = raw['self']
- addr, params = tuple(response.items())[0]
- return dict(addr=addr, **params)
- #@self.callbacks_caller()
- async def getPeers(self)->dict:
- raw = await self._send_request('getPeers')
- response = raw['peers']
- return [self._preprocess(addr=addr, **params) for addr, params in response.items()]
- #@self.callbacks_caller()
- async def getDHT(self)->dict:
- raw = await self._send_request('getDHT')
- response = raw['dht']
- return [self._preprocess(addr=addr, **params) for addr, params in response.items()]
- #@self.callbacks_caller()
- async def getSwitchPeers(self)->dict:
- raw = await self._send_request('getSwitchPeers')
- response = raw['switchpeers']
- return [self._preprocess(id=_id, **speer) for _id, speer in response.items()]
- #@self.callbacks_caller()
- async def getSessions(self)->dict:
- raw = await self._send_request('getSessions')
- response = raw['sessions']
- return [self._preprocess(addr=addr, **params) for addr, params in response.items()]
- #@self.callbacks_caller()
- async def getAllowedEncryptionPublicKeys(self)->list:
- raw = await self._send_request('getAllowedEncryptionPublicKeys')
- return raw['allowed_box_pubs']
- #@self.callbacks_caller()
- def getTransitTrafic(self):
- return {
- 'sent': sum(node['bytes_sent'] for node in self.stats['switchpeers'].values()),
- 'recvd': sum(node['bytes_recvd'] for node in self.stats['switchpeers'].values())
- }
- def getNumOfNodes(self):
- return len(self.stats['switchpeers'])
- def add_done_callback(self, method, cb):
- self.__callbacks[method].append[cb]
- return len(self.__callbacks[method]) - 1
- def del_done_callback(self, method, cb=None):
- try:
- self.__callbacks[method].remove(cb)
- except ValueError:
- pass
- def del_done_callback_by_id(self, method, cb_id):
- try:
- del self.__callbacks[method][cb_id]
- except IndexError:
- pass
- def __init_trackers(self):
- methods = ('getAllowedEncryptionPublicKeys', 'getDHT', 'getPeers', 'getSelf', 'getSessions', 'getSwitchPeers')
- self.__callbacks = {method: [] for method in methods}
- for method_name in methods:
- wrapper = self.done_tracker()
- wrapped = wrapper(getattr(self, method_name))
- setattr(self, method_name, wrapped)
- def done_tracker(self):
- def decorator(function):
- def wrapped(*args, **kwargs):
- result = function(*args, **kwargs)
- callbacks = self.__callbacks.get(function, [])
- for callback in callbacks:
- callback(result)
- return result
- return wrapped
- return decorator
- class Table(tk.Frame):
- __slots__ = []
- def __init__(
- self, parent=None, headings=(), unique_heading=None, params=(),
- updater=None, updater_exc_handler=None, update_delay=5,
- identity_lvl=DELAULT_TABLE_IDENTITY,
- ):
- super().__init__(parent)
- table = ttk.Treeview(self, show="headings", selectmode="extended")
- table["columns"]=headings
- table["displaycolumns"]=headings
- for head in headings:
- table.heading(head, text=head, anchor=tk.CENTER, command= self.sort_column(head))
- table.column(head, anchor=tk.CENTER, width=150)
- yscrolltable = tk.Scrollbar(self, command=table.yview)
- xscrolltable = tk.Scrollbar(self, command=table.xview, orient=tk.HORIZONTAL)
- table.configure(yscrollcommand=yscrolltable.set)
- table.configure(xscrollcommand=xscrolltable.set)
- yscrolltable.pack(side=tk.RIGHT, fill=tk.Y)
- xscrolltable.pack(side=tk.BOTTOM, fill=tk.X)
- table.place(relheight=1, relwidth=1, relx=0, rely=0)
- self.tv = table
- self.uniques = {}
- if unique_heading:
- self.uh_index = headings.index(unique_heading)
- self.names = params
- self._paused = False
- self._ident_lvl = identity_lvl
- if updater:
- self._updater = updater
- asyncio.ensure_future(self.updater(updater, update_delay))
- self.upd_exc_handler = updater_exc_handler if updater_exc_handler else lambda *args, **kwargs: None
- def add_row(self, row):
- new_id = self.tv.insert('', tk.END, values=tuple(row))
- self.uniques[row[self.uh_index]] = new_id
- def add_rows(self, rows):
- for row in rows:
- self.add_row(row)
- async def updater(self, updater, delay:int):
- updater = asyncio.coroutine(updater)
- while True:
- while not self._paused:
- try:
- raw = await updater()
- except Exception as e:
- #print('{}: {}'.format(e.__class__.__name__, str(e)))
- self.upd_exc_handler(e)
- else:
- rows = ((item[name] for name in self.names) for item in raw)
- self.update_table(rows, self._ident_lvl)
- finally:
- await asyncio.sleep(delay)
- await asyncio.sleep(delay)
- async def update_nowait(self):
- self.pause()
- raw = await asyncio.coroutine(self._updater())
- rows = ((item[name] for name in self.names) for item in raw)
- self.update_table(rows, self._ident_lvl)
- self.run()
- def pause(self):
- self._paused = True
- def run(self):
- self._paused = False
- def bind_menu(self, menu:tk.Menu):
- def call_menu(event: tk.Event):
- menu.post(event.x_root, event.y_root)
- self.tv.bind("<Button-3>", call_menu)
- def delete_selected(self, event:tk.Event):
- pass
- def clean(self):
- parent = ''
- items = self.tv.get_children(parent)
- self.tv.delete(*items)
- def _similarity(self, one, other):
- return sum(operator.eq(ones, others) for ones, others in zip(one, other))
- def _find_similar(self, row, identity_lvl):
- for iid in self.tv.get_children(''):
- item = self.tv.item(iid)
- similarity = self._similarity(item['values'], row)
- if similarity>=identity_lvl:
- return iid
- def update_table(self, rows, identity_lvl=None):
- if not identity_lvl:
- identity_lvl = len(self.names)
- for row in rows:
- row = tuple(row)
- if row[self.uh_index] in self.uniques:
- self.tv.item(self.uniques[row[self.uh_index]], values=row)
- else:
- self.add_row(row)
- """
- replaced = self._find_similar(row, identity_lvl)
- if replaced:
- self.tv.item(replaced, values=row)
- else:
- self.add_row(row)"""
- if hasattr(self, 'last_sort'):
- self.last_sort()
- def sort_column(self, column, reverse=False):
- def sorter():
- l = [(self.tv.set(k, column), k) for k in self.tv.get_children('')]
- l.sort(reverse=reverse)
- # rearrange items in sorted positions
- for index, (_, k) in enumerate(l):
- self.tv.move(k, '', index)
- self.last_sort = self.sort_column(column, reverse)
- self.tv.heading(column, command= self.sort_column(column, not reverse))
- return sorter
- class VarsUpdater():
- def __init__(self):
- self.groups = {}
- def add(self, var, group_name):
- self.groups[group_name]['vars'].append(var)
- class AutoUpdatingVar():
- __slots__ = ['var', 'type_coercion', 'delay']
- comparison = {
- tk.BooleanVar: bool,
- tk.DoubleVar: float,
- tk.IntVar: int,
- tk.StringVar: str
- }
- def __init__(self, var:tk.Variable, updater, delay=5):
- self.var = var
- self.type_coercion = self.comparison.get(type(var), str)
- self.delay = delay
- asyncio.ensure_future(self.updater(updater))
- async def updater(self, source):
- updater = asyncio.coroutine(source)
- while True:
- upd = self.type_coercion(await updater())
- self.var.set(upd)
- await asyncio.sleep(self.delay)
- class FStringVar(tk.StringVar):
- def __init__(self, *args, template:str, fargs:list, fkwargs:dict, **kwargs):
- self.templ = template
- kwargs['value'] = template.format(*fargs, **fkwargs)
- super().__init__(*args, **kwargs)
- def set(self, *args, **kwargs):
- upd = self.templ.format(*args, **kwargs)
- super().set(upd)
- def get_tables(ygg:YggdrasilAPI):
- return {
- 'Peers': {
- 'updater': ygg.getPeers,
- 'headings': ('IP', 'received', 'sent', 'endpoint', 'uptime'),
- 'params': ('addr', 'bytes_recvd', 'bytes_sent', 'endpoint', 'uptime'),
- 'unique_heading': 'IP',
- },
- 'DHT': {
- 'updater': ygg.getDHT,
- 'headings': ('IP', 'coordinates', 'last seen'),
- 'params': ('addr', 'coords', 'last_seen'),
- 'unique_heading': 'IP',
- 'identity_lvl': 1
- },
- 'Sessions': {
- 'updater': ygg.getSessions,
- 'headings': ('IP', 'received', 'sent', 'uptime', 'MTU', 'MTU fixed', 'coordinates'),
- 'params': ('addr', 'bytes_recvd', 'bytes_sent', 'uptime', 'mtu', 'was_mtu_fixed', 'coords'),
- 'unique_heading': 'IP'
- },
- 'Switch peers': {
- 'updater': ygg.getSwitchPeers,
- 'headings': ('port', 'IP', 'endpoint', 'coordinates', 'received', 'sent'),
- 'params': ('port', 'ip', 'faddr', 'coords', 'bytes_recvd', 'bytes_sent'),
- 'unique_heading': 'IP'
- },
- 'Allowed keys': {
- 'updater': ygg.getAllowedEncryptionPublicKeys,
- 'headings': ('Key',),
- 'params': ('key',),
- 'unique_heading': 'Key'
- }
- }
- class Tk(tk.Tk):
- __slots__ = []
- def __init__(self, *args, delay=1/50, **kwargs):
- super().__init__(*args, **kwargs)
- self._delay = delay
- self._loop = asyncio.get_event_loop()
- asyncio.ensure_future(self.updater(), loop=self._loop)
- async def updater(self):
- while not await asyncio.sleep(self._delay):
- try:
- self.update()
- except tk._tkinter.TclError:
- self._loop.stop()
- except Exception as E:
- info = E.__class__.__name__, str(E)
- print("{}: {}".format(*info))
- self._loop.stop()
- def mainloop(self, n=0):
- self._loop.run_forever()
- if __name__ == "__main__":
- names_priority = ('Overview', 'Peers', 'DHT', 'Sessions', 'Switch peers', 'Allowed keys')
- root = Tk('root')
- root.title('Verdandi')
- root.geometry('800x500')
- notebook = ttk.Notebook(root)
- ygg = YggdrasilAPI()
- tables = get_tables(ygg)
- for name in names_priority:
- frame = ttk.Frame(notebook, name=name.lower())
- notebook.add(frame, text=name)
- if name not in tables:
- continue
- table_params = tables[name]
- table = Table(frame, **table_params)
- table.place(relheight=1, relwidth=5/6, relx=1/6, rely=0)
- notebook.place(relx=0, rely=0, relheight=1, relwidth=1)
- overview = notebook.children['overview']
- addr = tk.StringVar()
- #addr_upd = AutoUpdatingVar(addr, ygg.getSelfAddr)
- addr_label = tk.Label(overview, textvariable=addr)
- subnet_label = tk.Label(overview)
- nodes = tk.Label(overview)
- nodes_traffic = tk.Label(overview)
- summary_traffic = tk.Label(overview)
- active_sessions = tk.Label(overview)
- services_availability = tk.Label(overview)
- root.mainloop()
Add Comment
Please, Sign In to add comment