Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python
- # -*- coding: Latin-1 -*-
- """Module containing classes used for audio visualization.
- """
- from __future__ import division
- import os
- import threading
- import sys
- from struct import unpack
- import cairo
- import gobject
- import gst
- import gtk
- from numpy import abs
- from numpy import fft
class Visualizer(gtk.Window):
    """Base window shared by the specific audio visualizers.

    Subclasses are expected to provide a ``draw(data)`` method that takes a
    sequence of samples and returns a ``cairo.ImageSurface`` to display.
    """

    def __init__(self):
        """Constructor.

        Create a drawing area used to display audio visualizations.
        """
        super(Visualizer, self).__init__()

        # Initialise every attribute the callbacks read *before* connecting
        # any signal, so that no callback can hit a missing attribute
        # regardless of the order in which events arrive.
        self.width = 0
        self.height = 0
        self.surface = None
        self.queued_draw = False
        self.worker = None

        self.connect('delete-event', self.delete_cb)

        darea = gtk.DrawingArea()
        darea.connect('configure-event', self.configure_cb)
        darea.connect('expose-event', self.expose_cb)
        self.add(darea)

        self.show_all()

    def delete_cb(self, window, event):
        """Close the window and quit the mainloop."""
        gtk.main_quit()

    def configure_cb(self, darea, event):
        """Record the new canvas size and allocate a matching surface.

        Keywords:
            darea Drawing area whose allocation changed.
            event Configure event (unused).

        Return:
            True, as in the original handler.
        """
        _, _, self.width, self.height = darea.get_allocation()
        self.surface = cairo.ImageSurface(cairo.FORMAT_ARGB32,
                                          self.width,
                                          self.height)
        return True

    def expose_cb(self, darea, event):
        """Redraw either the whole window or a part of it.

        Keywords:
            darea Drawing area to repaint.
            event Expose event; its ``area`` limits the repainted region.

        Return:
            False, as in the original handler.
        """
        # Nothing to paint until configure_cb or refresh stored a surface.
        if self.surface is not None:
            cr = darea.window.cairo_create()
            cr.rectangle(event.area.x, event.area.y,
                         event.area.width, event.area.height)
            cr.clip()
            cr.set_source_surface(self.surface, 0, 0)
            cr.paint()

        # The pending redraw (if any) has now been served.
        self.queued_draw = False
        return False

    def threaded_handoff_cb(self, fakesink, buff, pad):
        """Call handoff_cb inside a new thread.

        The started thread is kept in ``self.worker``.
        """
        # Thread.start() returns None, so keep the Thread object itself and
        # start it in a separate statement.
        worker = threading.Thread(target=self.handoff_cb,
                                  args=(fakesink, buff, pad))
        self.worker = worker
        worker.start()

    def handoff_cb(self, fakesink, buff, pad):
        """Invoked when the fakesink collected a new buffer of data.

        Decode the buffer as little-endian signed 16-bit samples, let the
        subclass draw them on a new surface, then schedule ``refresh`` with
        that surface through ``gobject.idle_add``.
        """
        draw = getattr(self, 'draw', None)
        if draw is None:
            # The base class alone has nothing to render.
            return

        if self.width <= 0 or self.height <= 0:
            # The canvas has not been sized yet; there is nowhere to draw.
            return

        samples = buff.size // 2
        if samples == 0:
            return

        # Drop a trailing odd byte: unpack() requires the payload length to
        # match the format exactly and would raise struct.error otherwise.
        payload = buff.data[:samples * 2]
        data = unpack("<" + str(samples) + "h", payload)

        gobject.idle_add(self.refresh, draw(data))

    def refresh(self, surface):
        """Refresh the content of the canvas with the given surface.

        Keywords:
            surface Surface to copy on the internal one.
        """
        self.surface = surface
        # Coalesce redraw requests: only queue a new one when the previous
        # request has already been served by expose_cb.
        if not self.queued_draw:
            self.queued_draw = True
            self.queue_draw()
class Analyzer(Visualizer):
    """Display the spectrum analyzer of the input audio signal."""

    def draw(self, data):
        """Render the magnitude spectrum of ``data`` as vertical lines.

        Compute the fft, normalize it by a coefficient of 2/N, then draw a
        vertical line for each component of the first half of the spectrum.
        Magnitudes are scaled so that ``1 << 15`` spans the canvas height.

        Keywords:
            data List containing the values of the input signal.

        Return:
            Surface containing the fft representation. The surface is left
            blank when fewer than two samples are given.
        """
        surface = cairo.ImageSurface(cairo.FORMAT_ARGB32,
                                     self.width,
                                     self.height)

        samples = len(data)
        bins = samples // 2
        if bins == 0:
            # Fewer than two samples: there is no spectrum to show and the
            # divisions below would fail.
            return surface

        cr = cairo.Context(surface)

        # Explicit floats keep the arithmetic correct even without
        # ``from __future__ import division``.
        step = float(self.width) / bins
        normalization = 2.0 / samples
        full_scale = float(1 << 15)
        baseline = self.height - 1

        fft_data = abs(fft.fft(data)[:bins]) * normalization
        for (i, value) in enumerate(fft_data):
            # One sub-path per frequency bin, rising from the bottom edge.
            cr.move_to(i * step, baseline)
            cr.rel_line_to(0, -value / full_scale * self.height)

        # Stroke every sub-path in a single call.
        cr.stroke()
        return surface
class Oscilloscope(Visualizer):
    """Display the shape of the input audio signal."""

    def draw(self, data):
        """Connect the points of the input signal by simple lines.

        Samples are spread evenly over the whole canvas width and scaled so
        that ``1 << 15`` corresponds to half of the canvas height, centred
        on the horizontal middle line.

        Keywords:
            data List containing the values of the input signal.

        Return:
            Surface containing the signal representation. The surface is
            left blank when ``data`` is empty.
        """
        surface = cairo.ImageSurface(cairo.FORMAT_ARGB32,
                                     self.width,
                                     self.height)

        samples = len(data)
        if samples == 0:
            return surface

        cr = cairo.Context(surface)

        # Every sample is plotted, so the step must be derived from the full
        # sample count: the first sample sits at x = 0 and the last one at
        # x = width - 1. (Dividing by samples // 2 pushed the second half of
        # the waveform outside the surface.)
        step = float(self.width - 1) / max(samples - 1, 1)
        middle = self.height // 2
        scale = float(middle) / (1 << 15)

        for (x, value) in enumerate(data):
            # Positive samples go up, i.e. towards smaller y coordinates.
            y = middle - value * scale
            if x == 0:
                cr.move_to(x * step, y)
            else:
                cr.line_to(x * step, y)

        cr.stroke()
        return surface
if __name__ == '__main__':
    # Script entry point: play the given audio file and show its spectrum.
    if len(sys.argv) != 2:
        print("Usage: {0} <file>".format(sys.argv[0]))
        sys.exit(1)

    # Enable thread support before the pipeline starts and before any
    # worker thread gets a chance to call gobject.idle_add().
    gobject.threads_init()

    location = os.path.abspath(sys.argv[1])

    # The file location is set as a property below instead of being
    # interpolated into the launch string, so paths containing spaces or
    # other special characters do not break the pipeline description.
    str_pipe = '''filesrc name=filesrc !
                  decodebin !
                  audioconvert !
                  audio/x-raw-int,
                    channels=1,
                    rate=44100,
                    width=16,
                    depth=16,
                    endianness=1234 !
                  tee name=t !
                  queue !
                  fakesink name=fakesink signal-handoffs=true t. !
                  queue !
                  autoaudiosink
               '''
    pipeline = gst.parse_launch(str_pipe)
    pipeline.get_by_name('filesrc').set_property('location', location)

    vis = Analyzer()

    fakesink = pipeline.get_by_name('fakesink')
    fakesink.connect('handoff', vis.threaded_handoff_cb)

    pipeline.set_state(gst.STATE_PLAYING)
    try:
        gtk.main()
    finally:
        # Stop the pipeline once the main loop ends.
        pipeline.set_state(gst.STATE_NULL)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement