Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- #
- # statnot - Status and Notifications
- #
- # Lightweight notification-(to-become)-daemon intended to be used
- # with lightweight WMs, like dwm.
- # Receives Desktop Notifications (including libnotify / notify-send)
- # See: http://www.galago-project.org/specs/notification/0.9/index.html
- #
- # Note: VERY early prototype, to get feedback.
- #
- # Copyright (c) 2009 by Henrik Hallberg (halhen@k2h.se)
- # http://code.k2h.se
- # Please report bugs or feature requests by e-mail.
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- #
- import dbus
- import dbus.service
- import dbus.mainloop.glib
- import gobject
- import os
- import re
- import signal
- import subprocess
- import sys
- import thread
- import time
# ===== CONFIGURATION =====

# Timeout applied when the sender asks for a negative timeout or for more
# than MAX_NOTIFY_TIMEOUT.
DEFAULT_NOTIFY_TIMEOUT = 3000 # milliseconds

# Longest timeout a sender may request for a notification.
MAX_NOTIFY_TIMEOUT = 5000 # milliseconds

# Notification text is cut off after this many characters.
NOTIFICATION_MAX_LENGTH = 100 # number of characters

# Pause between two regular status refreshes.
STATUS_UPDATE_INTERVAL = 2.0 # seconds

# Command to fetch status text from. We read from stdout.
# Each argument must be an element in the array
#STATUS_COMMAND = ["/bin/sh", "%s/.stats/formatter.sh" % os.getenv("HOME")]
# Function run to format text
def formatter(notification, stats_dir="/home/kirby/.stats"):
    """Build the complete status line, ending with the pending notification.

    notification -- text of the notification to show; None or '' for none.
    stats_dir    -- directory holding ``cmus.status`` and ``pacmail.status``.
                    Each file must contain exactly two lines: artist and
                    song in the first file, two integers in the second
                    (presumably unread mail and pending updates -- confirm
                    against whatever writes these files).

    Returns the status string; highlighted parts are wrapped in
    ``^fg(colour)`` markup.

    Raises IOError/OSError if a status file cannot be read and ValueError
    if a file does not have the expected two-line / integer layout.
    """
    base_color = "#888888"
    accent_color = "#1793D1"

    def highlight(text):
        # Switch to the accent colour for `text`, then back to the base one.
        return "^fg({0}){1}^fg({2})".format(accent_color, text, base_color)

    # A trailing space separates the notification from whatever follows it.
    notification = notification + " " if notification else ""

    # `with` closes the files deterministically instead of leaking handles.
    with open(os.path.join(stats_dir, "cmus.status")) as status_file:
        artist, song = status_file.read().splitlines()
    with open(os.path.join(stats_dir, "pacmail.status")) as status_file:
        mail, updates = status_file.read().splitlines()

    song = highlight(song)
    # Counters are only shown when they are non-zero.
    mail = " E: {0}".format(highlight(mail)) if int(mail) else ""
    updates = " U: {0}".format(highlight(updates)) if int(updates) else ""

    try:
        # universal_newlines=True makes the pipe deliver text, so the regex
        # below works whether the interpreter hands back str or bytes.
        acpi_output = subprocess.Popen(
            ["acpi"], stdout=subprocess.PIPE,
            universal_newlines=True).communicate()[0]
    except OSError:
        # No acpi binary: leave the battery field empty, exactly as when
        # acpi's output contains no percentage.
        acpi_output = ""
    match = re.search(r"\d+%", acpi_output)
    battery = match.group() if match else ""

    date = time.strftime(r"%a %d")
    clock = highlight(time.strftime(r"%l.%M"))

    return "{0} {1}: {2} {3} {4}{5}{6} {7}".format(
        battery, artist, song, date, clock, mail, updates,
        highlight(notification))
- # update_text(text) is called when the status text should be updated
- # If there is a pending notification to be formatted, it is passed to
- # formatter() and becomes part of the text handed to update_text()
- # Uncomment the appropriate one below, or write your own.
- # dummy (echo text)
- #def update_text(text):
- # print text
- # dwm
- #def update_text(text):
- # subprocess.call(["xsetroot", "-name", text])
# i3
# A single dzen2 process is started when the module loads; update_text()
# feeds it one line per status update through its stdin.
dzen2 = subprocess.Popen(
    ["dzen2", "-p", "-x", "300", "-y", "890", "-ta", "r", "-h", "14"],
    stdin=subprocess.PIPE)

def update_text(text):
    """Send `text` as one line to the running dzen2 process."""
    dzen2.stdin.write(text + "\n")
    # Flush explicitly so the line reaches dzen2 immediately, whatever
    # buffering the pipe object uses.
    dzen2.stdin.flush()
- # wmii
- #def update_text(text):
- # subprocess.Popen(["wmiir", "write", "/rbar/status"], stdin=subprocess.PIPE).communicate(text)
def sighandler(signum, frame):
    """SIGTERM handler: stop the dzen2 child, then leave the interpreter."""
    dzen2.terminate()
    raise SystemExit

signal.signal(signal.SIGTERM, sighandler)
- # ===== CONFIGURATION END =====
# Notifications that have not finished showing yet.
# Each entry is a list: [id, text, timeout in s]
# Entry 0 is the one currently on display, and it may still change:
# replacing a notification happens at add time (see add_notification).
# message_thread only looks at entry 0 to detect changes.
notification_queue = []
# Guards every modification of notification_queue.
notification_queue_lock = thread.allocate_lock()
def add_notification(notif):
    """Queue `notif` ([id, text, timeout]) or update a queued one.

    If an entry with the same id is already queued, its text and timeout
    are overwritten in place; otherwise `notif` goes to the end of the queue.
    """
    with notification_queue_lock:
        for queued in notification_queue:
            if queued[0] == notif[0]:
                # Same id: replace the contents, keep the queue position.
                queued[1:] = notif[1:]
                break
        else:
            notification_queue.append(notif)
def next_notification(pop = False):
    """Return the notification at the head of the queue, or None if empty.

    Leading entries with a zero timeout are dropped for as long as another
    entry is waiting behind them. With pop=True the returned entry is also
    removed from the queue.
    """
    # The emptiness check is deliberately done without the lock: it is the
    # most common case and does not need to be thread safe.
    if notification_queue:
        with notification_queue_lock:
            while len(notification_queue) > 1 and notification_queue[0][2] == 0:
                del notification_queue[0]
            head = notification_queue[0]
            if pop:
                del notification_queue[0]
            return head
    return None
def get_statustext(notification = ''):
    """Return the text to display, built by formatter().

    notification -- pending notification text ('' when there is none).

    If formatter() raises or returns an empty result, a message is written
    to stderr (on failure only) and the bare notification text is returned.
    """
    output = ''
    try:
        output = formatter(notification)
    except Exception:
        # Only ordinary errors are treated as "status unavailable";
        # SystemExit and KeyboardInterrupt must not be swallowed here.
        sys.stderr.write("%s: could not read status message\n"
                         % (sys.argv[0]))

    # Error - formatter() failed or delivered an empty result
    # Fallback to notification only
    if not output:
        output = notification

    return output
def message_thread(dummy):
    """Worker loop that keeps the displayed text up to date; never returns.

    Roughly every 0.1 s it looks at the head of the notification queue and
    refreshes the display when the notification text changed, when a timed
    notification expired, or when STATUS_UPDATE_INTERVAL has passed since
    the last refresh. `dummy` is not used.
    """
    shown_text = ''
    notification_shown_at = 0
    status_refreshed_at = 0

    while True:
        pending = next_notification()
        now = time.time()
        refresh = False

        if pending:
            if pending[1] != shown_text:
                refresh = True
            elif now > notification_shown_at + pending[2]:
                # A zero timeout means the notification stays until a new
                # one arrives or a regular status update clears it, so its
                # expiry alone does not trigger a refresh.
                # This way is a bit risky, but works. Keep an eye on this
                # when changing code
                refresh = pending[2] != 0

                # Drop the expired notification and look at its successor
                next_notification(True)
                pending = next_notification()

            if refresh:
                notification_shown_at = now

        if now > status_refreshed_at + STATUS_UPDATE_INTERVAL:
            refresh = True

        if refresh:
            shown_text = pending[1] if pending else ''
            update_text(get_statustext(shown_text))
            status_refreshed_at = now

        time.sleep(0.1)
class NotificationFetcher(dbus.service.Object):
    """D-Bus object exposing the org.freedesktop.Notifications interface.

    Incoming notifications are turned into [id, text, timeout] entries and
    handed to add_notification().
    """

    # Last notification id handed out by Notify().
    _id = 0

    @dbus.service.method("org.freedesktop.Notifications",
                         in_signature='susssasa{ss}i',
                         out_signature='u')
    def Notify(self, app_name, notification_id, app_icon,
               summary, body, actions, hints, expire_timeout):
        """Queue a notification and return its id.

        A zero `notification_id` gets a fresh id; a non-zero one is kept, so
        a queued notification with that id is replaced by add_notification().
        `expire_timeout` is in milliseconds; negative values and values
        above MAX_NOTIFY_TIMEOUT are replaced by DEFAULT_NOTIFY_TIMEOUT.
        Only `summary` and `body` end up in the displayed text.
        """
        if (expire_timeout < 0) or (expire_timeout > MAX_NOTIFY_TIMEOUT):
            expire_timeout = DEFAULT_NOTIFY_TIMEOUT

        if not notification_id:
            self._id += 1
            notification_id = self._id

        text = ("%s %s" % (summary, body)).strip()
        # The queue stores the timeout in seconds.
        add_notification([notification_id,
                          text[:NOTIFICATION_MAX_LENGTH],
                          int(expire_timeout) / 1000.0])
        return notification_id

    @dbus.service.method("org.freedesktop.Notifications", in_signature='', out_signature='as')
    def GetCapabilities(self):
        """Return the list of supported capabilities."""
        # Must be a real sequence of strings: ("body") is just the string
        # "body", which does not match the 'as' (array of strings) signature.
        return ["body"]

    @dbus.service.signal('org.freedesktop.Notifications', signature='uu')
    def NotificationClosed(self, id_in, reason_in):
        """Signal declaration; the decorator does the emitting."""
        pass

    @dbus.service.method("org.freedesktop.Notifications", in_signature='u', out_signature='')
    def CloseNotification(self, id):
        """Accepted for interface completeness; does nothing."""
        pass
if __name__ == '__main__':
    # Everything runs inside try/finally so the dzen2 child started at
    # module level is stopped on every way out (version print, Ctrl-C,
    # unexpected error). Previously the terminate() call sat behind the
    # infinite loop and could never run.
    try:
        if len(sys.argv) > 1:
            # Any command-line argument just prints the version banner.
            print("%s 0.0.2" % sys.argv[0])
            sys.exit(1)

        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        session_bus = dbus.SessionBus()
        # NOTE(review): `name` and `nf` are never read again; presumably
        # they must stay referenced to keep the bus name and the exported
        # object alive -- confirm before removing.
        name = dbus.service.BusName("org.freedesktop.Notifications", session_bus)
        nf = NotificationFetcher(session_bus, '/org/freedesktop/Notifications')

        # We must use contexts and iterations to run threads
        # http://www.jejik.com/articles/2007/01/python-gstreamer_threading_and_the_main_loop/
        gobject.threads_init()
        context = gobject.MainLoop().get_context()
        thread.start_new_thread(message_thread, (None,))

        while 1:
            context.iteration(True)
    finally:
        try:
            dzen2.terminate()
        except OSError:
            # dzen2 is already gone (e.g. stopped by the SIGTERM handler).
            pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement