import sys
import wave
import struct
import pyaudio
class AudioAnalyzer:
    """Split a WAV recording on silences and match the pieces to hex digits.

    Typical use: load a file, calibrate the noise band from a stretch
    of known silence, split the recording into audio components, register the
    signatures of components whose digit is known (``pos_num``) and then
    classify every component with ``signature_to_number``.

    All offsets and sizes taken by the methods below are expressed in
    *samples* (indices into ``self.values``); use ``to_samp`` to convert
    from seconds.
    """

    def __init__(self):
        # Sample width in bytes as reported by the WAV header
        # (-1 until a file has been loaded).
        self.sw = -1
        # Sampling rate in samples per second (-1 until a file is loaded).
        self.sps = -1
        # Decoded sample values, in file order.
        self.values = []
        # Amplitude band (exclusive bounds) inside which a sample is
        # considered silence; set by set_silence_sample().
        self.noise = {'max': 0, 'min': 0}
        # Position of a component in the recording -> hex digit (nibble)
        # that component is known to represent.  Used by the caller to
        # build the reference signatures.
        self.pos_num = {0: 0x8, 1: 0x9, 2: 0x5, 3: 0x0,
                        4: 0x4, 5: 0xe, 7: 0x7, 9: 0xd,
                        11: 0xa, 12: 0x1, 31: 0x2, 39: 0xc,
                        58: 0xf, 60: 0x6, 61: 0xb, 95: 0x3}
        # Reference signature (tuple) -> hex digit.
        self.signatures = {}

    def to_samp(self, secs):
        """Convert a duration in seconds to a whole number of samples."""
        return int(self.sps * secs)

    def load_file(self, filename):
        """Read a WAV file and append its samples to ``self.values``.

        Sets ``self.sw`` and ``self.sps`` from the file header.  Samples are
        decoded as little-endian integers of the file's own sample width;
        8-bit samples (unsigned in WAV) are re-centred around zero so that
        every width yields signed values.  Channels are not separated: a
        multi-channel file is appended in its interleaved order.

        filename -- path or file-like object accepted by ``wave.open``.

        Raises ValueError if the sample width is not 1, 2 or 4 bytes.
        """
        sample_codes = {1: 'B', 2: 'h', 4: 'i'}
        wav = wave.open(filename, 'rb')
        try:
            self.sw = wav.getsampwidth()
            self.sps = wav.getframerate()
            if self.sw not in sample_codes:
                raise ValueError(
                    'unsupported sample width: {0} bytes'.format(self.sw))
            code = sample_codes[self.sw]
            while True:
                # Read roughly one second of audio per iteration.
                frames = wav.readframes(self.sps)
                if not frames:
                    break
                count = len(frames) // self.sw
                samples = struct.unpack('<{0}{1}'.format(count, code),
                                        frames[:count * self.sw])
                if self.sw == 1:
                    samples = [sample - 128 for sample in samples]
                self.values.extend(samples)
        finally:
            # Release the file even if decoding fails half-way.
            wav.close()

    def play(self, sample):
        """Play raw frame data through the default output device.

        sample -- bytes of mono audio at the loaded sample width and rate.
        """
        p = pyaudio.PyAudio()
        try:
            stream = p.open(format=p.get_format_from_width(self.sw),
                            channels=1,
                            rate=self.sps,
                            output=True)
            try:
                stream.write(sample)
                stream.stop_stream()
            finally:
                stream.close()
        finally:
            # Release the PortAudio resources held by this instance.
            p.terminate()

    def set_silence_sample(self, offset, size, margin=200):
        """Calibrate the silence band from a stretch of known silence.

        The band is the min/max of the chosen window widened by ``margin``
        on both sides; it is stored in ``self.noise`` and printed.

        offset -- first sample of the window (int, in samples)
        size   -- window length (int, in samples); the window covers
                  ``size + 1`` samples, as it always has
        margin -- estimated error added around the measured extremes

        Raises ValueError if the window contains no samples.
        """
        end = offset + size + 1
        silence_segment = self.values[offset:end]
        if not silence_segment:
            raise ValueError('silence window contains no samples')
        self.noise['max'] = max(silence_segment) + margin
        self.noise['min'] = min(silence_segment) - margin
        print(self.noise)

    def next_silence(self, offset, min_size):
        """Find the next silence run longer than ``min_size`` samples.

        Scans ``self.values`` from ``offset``.  A sample is silent when it
        lies strictly inside the calibrated noise band.  Runs that are not
        longer than ``min_size`` are discarded and scanning continues.

        offset   -- sample index to start scanning from (int)
        min_size -- a run must be strictly longer than this (int, samples)

        Returns ``{'offset': start, 'size': length}`` or None if no
        sufficiently long run exists before the end of the data.
        """
        values = self.values
        total = len(values)
        low = self.noise['min']
        high = self.noise['max']
        start = -1
        length = 0
        index = offset
        # Same range of indices for which values[index] is valid.
        while -total <= index < total:
            if low < values[index] < high:
                if start == -1:
                    start = index
                length += 1
            elif length > min_size:
                # A long enough run has just ended.
                break
            else:
                # Run too short (or none in progress): start over.
                start = -1
                length = 0
            index += 1
        if start != -1 and length > min_size:
            return {'offset': start, 'size': length}
        return None

    def split(self, start_offset, silence_min_size):
        """Return the audio components delimited by silences.

        Each component is the span between the end of one silence and the
        start of the next.  Audio following the last detected silence is
        not reported.

        start_offset     -- sample index to start from (int)
        silence_min_size -- minimum silence length, see next_silence()

        Returns a list of ``{'offset': ..., 'size': ...}`` dicts in samples.
        """
        offset = start_offset
        size = 0
        results = []
        silence = self.next_silence(start_offset, silence_min_size)
        while silence:
            # Skipped only when the data begins with a silence.
            if offset != silence['offset']:
                begin = offset + size
                results.append({'offset': begin,
                                'size': silence['offset'] - begin})
            offset = silence['offset']
            size = silence['size']
            silence = self.next_silence(offset + size, silence_min_size)
        return results

    def get_signature(self, offset, size):
        """Compute a 10-value envelope signature for a component.

        The component is cut into 10 equal intervals; for each one the
        signature holds (mean of the non-negative samples) minus (mean of
        the negative samples).  Means use integer (floor) division, and
        the mean of an empty group counts as 0.

        offset -- first sample of the component (int)
        size   -- length of the component (int, samples)

        Returns a tuple of 10 numbers, usable as a dictionary key.
        """
        signature_len = 10
        segment = self.values[offset:(offset + size)]
        # Floor division: slice bounds must be integers.
        interval_size = size // signature_len
        signature = []
        for index in range(signature_len):
            interval = segment[index * interval_size:
                               (index + 1) * interval_size]
            positives = [v for v in interval if v >= 0]
            negatives = [v for v in interval if v < 0]
            top = sum(positives) // len(positives) if positives else 0
            bottom = sum(negatives) // len(negatives) if negatives else 0
            signature.append(top - bottom)
        return tuple(signature)

    def signature_to_number(self, signature):
        """Return the digit whose reference signature is closest.

        A candidate reference replaces the current best when it is closer
        to ``signature`` on more positions than it is not (ties on a
        position count against the candidate).

        signature -- tuple as produced by get_signature()

        Raises KeyError if no reference signatures are registered.
        """
        best = None
        result = None
        for reference in self.signatures:
            diffs = [abs(a - b) for a, b in zip(signature, reference)]
            if best is None:
                best = diffs
                result = reference
                continue
            points = 0
            for diff, best_diff in zip(diffs, best):
                if diff < best_diff:
                    points += 1
                else:
                    points -= 1
            if points > 0:
                best = diffs
                result = reference
        if result is None:
            raise KeyError('no reference signatures have been registered')
        return self.signatures[result]
if __name__ == '__main__':
    # Command-line driver: decode the hex digits contained in a WAV file
    # and append the resulting bytes to 'result.hex'.
    if len(sys.argv) < 2:
        print('$ {0} <audio_file.wav>'.format(sys.argv[0]))
        sys.exit(-1)

    # Components shorter than this many samples are rejected.
    MIN_SEGMENT_SIZE = 1000

    solution = AudioAnalyzer()
    print('loading file ...')
    solution.load_file(sys.argv[1])

    print('Setting silence sample ...')
    solution.set_silence_sample(solution.to_samp(11.5), solution.to_samp(6.5))
    ns = solution.next_silence(solution.to_samp(18.4), solution.to_samp(0.10))
    if ns is None:
        # next_silence() returns None when no long-enough run is found.
        print('next silence - not found')
    else:
        print('next silence - offset {0} - size {1} - end {2}'.format(
            ns['offset'], ns['size'], (ns['offset'] + ns['size'])))

    print('Spliting audio ...')
    components = solution.split(0, solution.to_samp(0.13))
    # The first two components are discarded before decoding.
    components = components[2:]

    print('Calculating reference signatures ...')
    # 'index' counts accepted components only; pos_num is keyed on it.
    index = 0
    for component in components:
        if component['size'] < MIN_SEGMENT_SIZE:
            print('Bad segment :S')
            continue
        signature = solution.get_signature(component['offset'],
                                           component['size'])
        if index in solution.pos_num:
            solution.signatures[signature] = solution.pos_num[index]
        if index > 95:
            # No known position lies beyond 95.
            break
        index += 1

    numbers = []
    print('------------------------------------------------------')
    index = 0
    for component in components:
        print('({0})'.format(index))
        print('size {0}'.format(component['size']))
        print('beginning {0} - end {1}'.format(
            component['offset'], (component['offset'] + component['size'])))
        if component['size'] < MIN_SEGMENT_SIZE:
            print('Bad segment :S')
            continue
        signature = solution.get_signature(component['offset'],
                                           component['size'])
        print(signature)
        number = solution.signature_to_number(signature)
        numbers.append(number)
        print(number)
        index += 1
        print('------------------------------------------------------')

    if len(numbers) % 2:
        # Two nibbles make one byte; a lone trailing nibble cannot be packed.
        print('Odd number of digits decoded - dropping the last one')

    # Binary append mode: struct.pack() produces bytes, not text.
    with open('result.hex', 'ab') as fd:
        for i in range(0, len(numbers) - 1, 2):
            fd.write(struct.pack('B', (numbers[i] << 4) + numbers[i + 1]))