#!/usr/bin/env python
#
# statnot - Status and Notifications
#
# Lightweight notification-(to-become)-daemon intended to be used
# with lightweight WMs, like dwm.
# Receives Desktop Notifications (including libnotify / notify-send)
# See: http://www.galago-project.org/specs/notification/0.9/index.html
#
# Note: VERY early prototype, to get feedback.
#
# Copyright (c) 2009 by Henrik Hallberg (halhen@k2h.se)
# http://code.k2h.se
# Please report bugs or feature requests by e-mail.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import dbus
import dbus.service
import dbus.mainloop.glib
import gobject
import os
import re
import signal
import subprocess
import sys
import thread
import time
# ===== CONFIGURATION =====
# Default time a notification is shown, unless specified in the notification.
# Notify() also falls back to this value when the requested timeout is
# negative or larger than MAX_NOTIFY_TIMEOUT.
DEFAULT_NOTIFY_TIMEOUT = 3000 # milliseconds
# Maximum time a notification is allowed to show
MAX_NOTIFY_TIMEOUT = 5000 # milliseconds
# Maximum number of characters in a notification; Notify() truncates the
# combined "summary body" text to this length.
NOTIFICATION_MAX_LENGTH = 100 # number of characters
# Time between regular status updates
STATUS_UPDATE_INTERVAL = 2.0 # seconds
# Command to fetch status text from. We read from stdout.
# Each argument must be an element in the array
#STATUS_COMMAND = ["/bin/sh", "%s/.stats/formatter.sh" % os.getenv("HOME")]
# Function run to format text
# Directory holding the status files read by formatter()
STATS_DIR = "/home/kirby/.stats"


def _read_status_lines(filename):
    """Return the lines of STATS_DIR/<filename>, closing the file afterwards."""
    with open(os.path.join(STATS_DIR, filename)) as status_file:
        return status_file.read().splitlines()


def formatter(notification):
    """Build the complete status line, ending with *notification*.

    The line is assembled from, in order: battery percentage (parsed from
    the output of the `acpi` command), artist and song (cmus.status),
    date, clock, pending mail and update counts (pacmail.status, each shown
    only when non-zero) and finally the notification text. Highlighted
    parts are wrapped in ^fg(...) colour markup.

    notification -- text of the notification to show; a false value
                    (None or '') means there is none.

    Returns the formatted status string.

    Raises IOError if a status file cannot be read, ValueError if a status
    file does not hold exactly two lines or a count is not an integer, and
    OSError if `acpi` cannot be started.
    """
    # Important stuff
    color1 = "#888888"
    color2 = "#1793D1"

    def highlight(x):
        # Switch to the highlight colour for x, then back to the base colour
        return "^fg({0}){1}^fg({2})".format(color2, x, color1)

    if not notification:
        notification = ""
    else:
        notification += " "

    # Each status file must hold exactly two lines
    artist, song = _read_status_lines("cmus.status")
    song = highlight(song)

    mail, updates = _read_status_lines("pacmail.status")
    # Counts of zero are left out of the status line entirely
    mail = " E: {0}".format(highlight(mail)) if int(mail) else ""
    updates = " U: {0}".format(highlight(updates)) if int(updates) else ""

    battery = subprocess.Popen(["acpi"], stdout=subprocess.PIPE).communicate()[0]
    match = re.search(r"\d+%", battery)
    battery = match.group() if match else ""

    date = time.strftime(r"%a %d")
    clock = highlight(time.strftime(r"%l.%M"))

    return "{0} {1}: {2} {3} {4}{5}{6} {7}".format(
        battery, artist, song, date, clock, mail, updates,
        highlight(notification))
# update_text(text) is called when the status text should be updated
# If there is a pending notification to be formatted, it is appended as
# the final argument to the STATUS_COMMAND, e.g. as $1 in default shellscript
# Uncomment the appropriate one below, or write your own.
# dummy (echo text)
#def update_text(text):
# print text
# dwm
#def update_text(text):
# subprocess.call(["xsetroot", "-name", text])
# i3
# dzen2 child process that displays the status text; started once when the
# module is loaded and fed through its stdin by update_text()
dzen2 = subprocess.Popen(
    ["dzen2", "-p", "-x", "300", "-y", "890", "-ta", "r", "-h", "14"],
    stdin=subprocess.PIPE)


def update_text(text):
    """Send *text* to the dzen2 process as a single line.

    text -- the status text. It may be a unicode string (notification text
            received over D-Bus is unicode); it is then encoded as UTF-8 so
            that non-ASCII characters do not raise UnicodeEncodeError when
            written to the pipe.
    """
    if not isinstance(text, bytes):
        text = text.encode("utf-8")
    dzen2.stdin.write(text + b"\n")
    # Make sure the line reaches dzen2 now, whatever the pipe buffering is
    dzen2.stdin.flush()
# wmii
#def update_text(text):
# subprocess.Popen(["wmiir", "write", "/rbar/status"], stdin=subprocess.PIPE).communicate(text)
def sighandler(signum, frame):
    """Signal handler: stop the dzen2 child process, then exit.

    signum, frame -- supplied by the signal module; not used.
    """
    dzen2.terminate()
    sys.exit()


# Run sighandler when the process receives SIGTERM
signal.signal(signal.SIGTERM, sighandler)
# ===== CONFIGURATION END =====
# List of notifications that have not finished showing.
# List of lists: [id, text, timeout in s]
# 0th element is being displayed right now, and may change
# Replacement of a notification happens at add (see add_notification)
# message_thread only checks first element for changes
notification_queue = []
# Held while notification_queue is being modified
notification_queue_lock = thread.allocate_lock()
def add_notification(notif):
    """Queue *notif*, or update the queued entry that carries the same id.

    notif -- list of [id, text, timeout in s]. When an entry with the same
             id is already queued, its text and timeout are overwritten in
             place and nothing is appended.
    """
    with notification_queue_lock:
        for queued in notification_queue:
            if queued[0] != notif[0]:
                continue
            # Same id: replace the contents instead of queueing a duplicate
            queued[1:] = notif[1:]
            return

        notification_queue.append(notif)
def next_notification(pop = False):
    """Return the notification at the head of the queue, or None if empty.

    pop -- when true, the returned notification is also removed from the
           queue; otherwise it is left in place.

    While more than one notification is queued, leading entries with a
    timeout of 0 are dropped first.
    """
    # Checked without taking the lock: the empty queue is the common case
    if not notification_queue:
        return None

    with notification_queue_lock:
        # If there are several pending messages, discard the first 0-timeouts
        while len(notification_queue) > 1:
            if notification_queue[0][2] != 0:
                break
            del notification_queue[0]

        head = notification_queue[0]
        if pop:
            del notification_queue[0]
        return head
def get_statustext(notification = ''):
    """Return the text to display, including *notification* if there is one.

    notification -- text of the pending notification, '' when there is none.

    The text is produced by formatter(notification). If formatter() raises
    or returns nothing, a message is written to stderr (only when it
    raised) and the bare notification text is returned instead.
    """
    output = ''
    try:
        output = formatter(notification)
    except Exception:
        # Best effort: a broken formatter must not stop notifications from
        # showing. Only Exception is caught so that SystemExit and
        # KeyboardInterrupt still propagate.
        sys.stderr.write("%s: could not read status message\n"
                         % (sys.argv[0]))

    # Error - formatter failed or delivered an empty result
    # Fallback to notification only
    if not output:
        output = notification

    return output
def message_thread(dummy):
    """Loop forever, redrawing the status text whenever it is due.

    Every pass (about every 0.1 s) looks at the notification at the head of
    the queue. The status text is redrawn through
    update_text(get_statustext(...)) when that notification's text differs
    from the text currently shown, when a notification with a non-zero
    timeout has expired, or when more than STATUS_UPDATE_INTERVAL seconds
    have passed since the last redraw.

    dummy -- unused.
    """
    # time.time() of the most recent redraw
    last_status_update = 0
    # time.time() of the most recent redraw triggered by a notification
    # appearing, changing or expiring
    last_notification_update = 0
    # Notification text included in the most recent redraw ('' if none)
    current_notification_text = ''

    while 1:
        notif = next_notification()
        current_time = time.time()
        update_status = False

        if notif:
            # New or replaced notification at the head of the queue
            if notif[1] != current_notification_text:
                update_status = True

            # Same text as shown: check whether its timeout (notif[2],
            # seconds) has run out
            elif current_time > last_notification_update + notif[2]:
                # If requested timeout is zero, notification shows until
                # a new notification arrives or a regular status message
                # cleans it
                # This way is a bit risky, but works. Keep an eye on this
                # when changing code
                if notif[2] != 0:
                    update_status = True

                # Pop expired notification
                next_notification(True)
                # Whatever is now at the head (or None) is shown next
                notif = next_notification()

            if update_status == True:
                last_notification_update = current_time

        # Regular refresh, independent of notifications
        if current_time > last_status_update + STATUS_UPDATE_INTERVAL:
            update_status = True

        if update_status:
            if notif:
                current_notification_text = notif[1]
            else:
                current_notification_text = ''

            update_text(get_statustext(current_notification_text))
            last_status_update = current_time

        time.sleep(0.1)
class NotificationFetcher(dbus.service.Object):
    """D-Bus service object for the org.freedesktop.Notifications interface.

    Notifications received through Notify() are handed to
    add_notification() for display.
    """

    # Last notification id handed out by Notify()
    _id = 0

    @dbus.service.method("org.freedesktop.Notifications",
                         in_signature='susssasa{ss}i',
                         out_signature='u')
    def Notify(self, app_name, notification_id, app_icon,
               summary, body, actions, hints, expire_timeout):
        """Queue a notification and return its id.

        notification_id -- id of a notification to replace; a false value
                           (0) makes this method allocate a new id.
        summary, body   -- joined as "summary body", stripped and cut to
                           NOTIFICATION_MAX_LENGTH characters.
        expire_timeout  -- requested display time in milliseconds; negative
                           values and values above MAX_NOTIFY_TIMEOUT are
                           replaced by DEFAULT_NOTIFY_TIMEOUT.

        app_name, app_icon, actions and hints are accepted but not used.
        """
        if (expire_timeout < 0) or (expire_timeout > MAX_NOTIFY_TIMEOUT):
            expire_timeout = DEFAULT_NOTIFY_TIMEOUT

        if not notification_id:
            self._id += 1
            notification_id = self._id

        text = ("%s %s" % (summary, body)).strip()
        # The queue stores the timeout in seconds
        add_notification( [notification_id,
                          text[:NOTIFICATION_MAX_LENGTH],
                          int(expire_timeout) / 1000.0] )
        return notification_id

    @dbus.service.method("org.freedesktop.Notifications", in_signature='', out_signature='as')
    def GetCapabilities(self):
        """Return the list of supported capabilities: only "body"."""
        # Must be a sequence of strings to match out_signature 'as';
        # ("body") without a trailing comma is just the string "body".
        return ["body"]

    @dbus.service.signal('org.freedesktop.Notifications', signature='uu')
    def NotificationClosed(self, id_in, reason_in):
        """Declare the NotificationClosed(id, reason) signal."""
        pass

    @dbus.service.method("org.freedesktop.Notifications", in_signature='u', out_signature='')
    def CloseNotification(self, id):
        """Accept a close request for notification *id*; it is ignored."""
        pass
if __name__ == '__main__':
    try:
        # Any command line argument just prints the version and exits
        if len(sys.argv) > 1:
            print("%s 0.0.2" % sys.argv[0])
            sys.exit(1)

        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        session_bus = dbus.SessionBus()
        name = dbus.service.BusName("org.freedesktop.Notifications", session_bus)
        nf = NotificationFetcher(session_bus, '/org/freedesktop/Notifications')

        # We must use contexts and iterations to run threads
        # http://www.jejik.com/articles/2007/01/python-gstreamer_threading_and_the_main_loop/
        gobject.threads_init()
        context = gobject.MainLoop().get_context()
        thread.start_new_thread(message_thread, (None,))

        while 1:
            context.iteration(True)
    finally:
        # The loop above never ends normally, so cleanup has to live in a
        # finally clause to run on SystemExit, KeyboardInterrupt or errors.
        # Skip it when dzen2 has already exited.
        if dzen2.poll() is None:
            dzen2.terminate()