Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import curses
- import time
- import json
- from redis import StrictRedis
- redis_client = StrictRedis()
- redis_pubsub = redis_client.pubsub(ignore_subscribe_messages=True)
- statuses = {}
- class TerminalColor:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
- def on_warrior_message(message):
- data = json.loads(message["data"])
- status_key = "%s:%s" % (data["host"], data["port"])
- # Split the message
- output_split = data["data"].split(" ")
- # Bad code to priortize uploads on the display.
- if "%" not in data["data"] or "B/s" not in data["data"]:
- if status_key in statuses and ("%" in statuses[status_key] and "B/s" in statuses[status_key]):
- if "100%" not in statuses[status_key]:
- return
- # Change the color depending on status.
- # XXX FUCKING NASTY
- if "=404" in output_split[0]:
- status_color = TerminalColor.FAIL
- elif "=301" in output_split[0]:
- status_color = TerminalColor.WARNING
- elif "%" in data["data"] and "B/s" in data["data"]:
- status_color = TerminalColor.OKBLUE
- else:
- status_color = ""
- statuses[status_key] = "{status_color}{host}:{port}\t {data}".format(
- host=data["host"],
- port=data["port"],
- data=data["data"].strip(),
- status_color=status_color,
- )
- def draw_statuses(console):
- console.clear()
- console.refresh()
- curses.start_color()
- curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLACK)
- curses.init_pair(2, curses.COLOR_CYAN, curses.COLOR_BLACK)
- curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK)
- curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK)
- while True:
- i = 1
- console.addstr(0, 0, "Host\t\t\t URL")
- for host in sorted(statuses):
- values = statuses[host]
- color_pair = curses.color_pair(1)
- if values.startswith(TerminalColor.OKBLUE):
- values = values.lstrip(TerminalColor.OKBLUE)
- color_pair = curses.color_pair(2)
- elif values.startswith(TerminalColor.FAIL):
- values = values.lstrip(TerminalColor.FAIL)
- color_pair = curses.color_pair(3)
- elif values.startswith(TerminalColor.WARNING):
- values = values.lstrip(TerminalColor.WARNING)
- color_pair = curses.color_pair(4)
- console.addstr(i, 0, values, color_pair)
- console.clrtoeol()
- i += 1
- console.refresh()
- time.sleep(0.1)
- redis_pubsub.subscribe(**{"tumblr:warrior": on_warrior_message})
- pubsub_thread = redis_pubsub.run_in_thread(sleep_time=0.001)
- try:
- curses.wrapper(draw_statuses)
- except KeyboardInterrupt:
- pubsub_thread.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement