Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- import time
- import os
- import theano
- import theano.tensor as T
- import lasagne
- from sklearn.preprocessing import LabelEncoder
- from sklearn.metrics import accuracy_score
- import pickle
- from scikits.audiolab import Sndfile
- from scikits.talkbox import segment_axis
def feature_data_and_phoneme_matching(save=True):
    """
    Read the audio files together with their phonetic transcriptions,
    segment each waveform with an overlapping sliding window and stack
    the windows into one feature matrix, while mapping each window
    (row) to the phoneme spoken during it.

    :param save: if True, save the matrix and phoneme array as
        'features.npy' / 'phonemes.npy'; otherwise return them.
    :return: (features, phonemes) when ``save`` is False, else None.
    """
    # Paths to the training corpora.  Ordering is important: msak
    # before fsew, files sorted by name within each corpus.
    corpus_paths = ['/home/gunslinger/Desktop/Msak/',
                    '/home/gunslinger/Desktop/Fsew/']
    # Sliding window of 256 samples with 96 samples of overlap
    # (i.e. a hop of 160 samples).
    window_size = 256
    non_overlap = 160
    overlap = window_size - non_overlap
    # Rate at which the audio files were sampled.
    sample_rate = 16e3
    # One phoneme label per feature row; these become the supervised
    # targets for the classifier on top of the autoencoder.
    phoneme_list = []
    # Per-file segment matrices, vstacked once at the end.  (The
    # original grew the matrix with np.vstack on every file, which
    # re-copies everything and is quadratic in total size.)
    segment_blocks = []
    at_file = 0  # progress counter
    for path in corpus_paths:
        files = sorted(os.listdir(path))
        wav_files = [f for f in files if f.endswith('wav')]  # audio
        lab_files = [f for f in files if f.endswith('lab')]  # transcripts
        # wav_files and lab_files are assumed to pair up one-to-one
        # after sorting.
        for wav_name, lab_name in zip(wav_files, lab_files):
            at_file += 1
            print(at_file)
            wav_file = Sndfile(path + wav_name, 'r')
            # Transcript lines have the form: sec.ms sec.ms phoneme
            # (the original leaked this file handle; close it promptly).
            with open(path + lab_name, 'r') as lab_fh:
                lab_file = lab_fh.read().split()
            # Audio data as a numpy array, segmented with overlap.
            audio_data = wav_file.read_frames(wav_file.nframes)
            segment_blocks.append(
                segment_axis(audio_data, length=window_size,
                             overlap=overlap))
            # Walk the (start, end, phoneme) triples; every window whose
            # left edge lies inside the interval gets that phoneme.
            # start_at remembers where the previous phoneme's windows
            # ended so frames are not re-scanned.
            start_at = 0
            for j in range(0, len(lab_file), 3):
                start_time = float(lab_file[j])
                end_time = float(lab_file[j + 1])
                phoneme = lab_file[j + 2]
                # Sample-frame interval covered by this phoneme.
                start_frame_phoneme = sample_rate * start_time
                end_frame_phoneme = sample_rate * end_time
                for lhs_frame in range(start_at, wav_file.nframes,
                                       non_overlap):
                    rhs_frame = lhs_frame + window_size
                    if lhs_frame >= start_frame_phoneme:
                        phoneme_list.append(phoneme)
                    if rhs_frame >= end_frame_phoneme:
                        # Resume here for the next phoneme.
                        start_at = lhs_frame
                        break
            # NOTE(review): one 'sil' is appended per file here,
            # presumably for the final window not covered by the
            # transcript -- verify row/label alignment against the
            # feature matrix.
            phoneme_list.append('sil')
    features = np.vstack(segment_blocks)
    phonemes = np.asarray(phoneme_list)
    if save:
        np.save('features.npy', features)
        np.save('phonemes.npy', phonemes)
    else:
        return features, phonemes
def to_categorical(y, nb_classes=None):
    """Convert a class vector (integers in [0, nb_classes)) to a binary
    (one-hot) class matrix, for use with categorical_crossentropy.

    :param y: 1-D array-like of non-negative integer class labels.
    :param nb_classes: number of columns; defaults to max(y) + 1.
    :return: float array of shape (len(y), nb_classes).
    """
    y = np.asarray(y, dtype='int32')
    if not nb_classes:
        nb_classes = np.max(y) + 1
    Y = np.zeros((len(y), nb_classes))
    # Vectorized one-hot assignment: one C-level pass instead of a
    # Python-level loop over the rows.
    Y[np.arange(len(y)), y] = 1.
    return Y
def float32(k):
    """Cast *k* (scalar or array) to float32.

    The original used ``np.cast['float32']``, which was removed in
    NumPy 2.0; ``np.float32(k)`` performs the same cast for both
    scalars and arrays.
    """
    return np.float32(k)
def batch_gen_ae(X, y, N):
    """Endless generator of random autoencoder minibatches.

    Each iteration samples N row indices with replacement and yields
    the corresponding (inputs, targets) pair, both cast to float32.
    """
    n_rows = len(y)
    while True:
        picks = np.random.choice(n_rows, N)
        batch_x = X[picks].astype('float32')
        batch_y = y[picks].astype('float32')
        yield batch_x, batch_y
def batch_gen_clf(X, y, N):
    """Endless generator of random classifier minibatches.

    Each iteration samples N row indices with replacement and yields
    float32 inputs with int32 integer-class targets.
    """
    n_rows = len(y)
    while True:
        picks = np.random.choice(n_rows, N)
        batch_x = X[picks].astype('float32')
        batch_y = y[picks].astype('int32')
        yield batch_x, batch_y
def layerwise_model(input_var=None, sizes=None, gaussian_noise=False):
    """Build a one-hidden-layer autoencoder for greedy pre-training.

    Architecture: input -> (optional Gaussian noise, sigma=0.1) ->
    sigmoid hidden layer -> linear output layer.

    :param input_var: Theano variable fed to the input layer.
    :param sizes: array of widths; sizes[0] input, sizes[1] hidden,
        sizes[-1] output.
    :param gaussian_noise: corrupt the input (denoising AE) if True.
    """
    depth = sizes.shape[0]
    net = lasagne.layers.InputLayer(shape=(None, sizes[0]),
                                    input_var=input_var)
    if gaussian_noise:
        net = lasagne.layers.GaussianNoiseLayer(net, sigma=0.1)
    # Encoding layer.
    net = lasagne.layers.DenseLayer(
        net, sizes[1], nonlinearity=lasagne.nonlinearities.sigmoid)
    # Linear reconstruction layer.
    net = lasagne.layers.DenseLayer(
        net, sizes[depth - 1],
        nonlinearity=lasagne.nonlinearities.linear)
    return net
def fine_tuning_model(input_var=None, sizes=None):
    """Build the unrolled (stacked) autoencoder used for fine-tuning.

    The encoder half, up to and including the bottleneck, uses sigmoid
    units; the decoder half is linear -- matching the layer-wise models.

    :param sizes: array of widths, e.g. [256, 100, 70, 50, 70, 100, 256].
    """
    no_layers = sizes.shape[0]
    no_hidden = no_layers - 2
    # BUG FIX: '/' is float division on Python 3, which silently moves
    # the sigmoid/linear split one layer past the bottleneck; '//'
    # restores the original integer-division (Python 2) behavior.
    middle_index = no_layers // 2
    linear = lasagne.nonlinearities.linear
    sigmoid = lasagne.nonlinearities.sigmoid
    # Input layer:
    model = lasagne.layers.InputLayer(shape=(None, sizes[0]),
                                      input_var=input_var, name='input')
    # Hidden layers: sigmoid through the encoder, linear through the decoder.
    for i in range(no_hidden):
        nonlin = sigmoid if i < middle_index else linear
        model = lasagne.layers.DenseLayer(model, num_units=sizes[i + 1],
                                          nonlinearity=nonlin)
    # Linear output (reconstruction) layer:
    model = lasagne.layers.DenseLayer(model, num_units=sizes[no_layers - 1],
                                      nonlinearity=linear)
    return model
def softmax_classifier(input_var=None, input_size=0, output_size=0,
                       hidden_size=246):
    """Two-layer softmax classifier: sigmoid hidden layer followed by a
    softmax output layer.

    :param input_var: Theano variable fed to the input layer.
    :param input_size: width of the input layer.
    :param output_size: number of output classes.
    :param hidden_size: width of the hidden layer; defaults to 246, the
        value previously hard-coded (now a parameter for reuse).
    """
    model = lasagne.layers.InputLayer(input_var=input_var,
                                      shape=(None, input_size))
    model = lasagne.layers.DenseLayer(
        model, num_units=hidden_size,
        nonlinearity=lasagne.nonlinearities.sigmoid)
    model = lasagne.layers.DenseLayer(
        model, num_units=output_size,
        nonlinearity=lasagne.nonlinearities.softmax)
    return model
def train_classifier(X, y, network=None, lr=0.1, batch_size=128,
                     num_epochs=300):
    """Train *network* with categorical cross-entropy and Nesterov
    momentum, reporting per-epoch training/validation loss.

    :param X: 2-D float feature matrix.
    :param y: integer class labels, one per row of X.
    :param network: Lasagne output layer of the classifier.
    :param lr: learning rate.
    :param batch_size: minibatch size.
    :param num_epochs: number of passes over the training data.
    """
    # BUG FIX: the split index must be an int (float slicing raises a
    # TypeError on modern Python/NumPy), and the validation slice must
    # be the *tail* of the data -- the original took the head, which is
    # a subset of the training slice (data leakage).
    split_at = int(X.shape[0] * 0.2)
    X_train, X_test = X[:-split_at], X[-split_at:]
    y_train, y_test = y[:-split_at], y[-split_at:]
    input_var = T.dmatrix()
    target_var = T.ivector()
    # Training loss expression.
    prediction = lasagne.layers.get_output(network, inputs=input_var)
    loss = lasagne.objectives.categorical_crossentropy(prediction,
                                                       target_var)
    loss = loss.mean()
    # Update expression (Nesterov momentum on all trainable params).
    params = lasagne.layers.get_all_params(network, trainable=True)
    updates = lasagne.updates.nesterov_momentum(
        loss, params=params, learning_rate=lr, momentum=0.9)
    # Deterministic validation loss (disables dropout/noise layers).
    test_prediction = lasagne.layers.get_output(network, inputs=input_var,
                                                deterministic=True)
    test_loss = lasagne.objectives.categorical_crossentropy(
        test_prediction, target_var)
    test_loss = test_loss.mean()
    f_train = theano.function([input_var, target_var], loss,
                              updates=updates)
    f_val = theano.function([input_var, target_var], test_loss)
    # Batch size and number of batches per epoch.
    BATCH_SIZE = batch_size
    N_BATCHES = len(X_train) // BATCH_SIZE
    N_VAL_BATCHES = len(X_test) // BATCH_SIZE
    # Minibatch generators.
    train_batches_gen = batch_gen_clf(X_train, y_train, BATCH_SIZE)
    val_batches_gen = batch_gen_clf(X_test, y_test, BATCH_SIZE)
    for epoch in range(num_epochs):
        # Full pass over the training data:
        train_err = 0
        start_time = time.time()
        for _ in range(N_BATCHES):
            inputs, targets = next(train_batches_gen)
            train_err += f_train(inputs, targets)
        # Full pass over the validation data:
        val_err = 0
        for _ in range(N_VAL_BATCHES):
            inputs, targets = next(val_batches_gen)
            val_err += f_val(inputs, targets)
        # Report this epoch's results:
        print("Epoch {} of {} took {:.3f}s".format(
            epoch + 1, num_epochs, time.time() - start_time))
        print("  training loss:\t\t{:.6f}".format(train_err / N_BATCHES))
        print("  validation loss:\t\t{:.6f}".format(
            val_err / N_VAL_BATCHES))
def train_autoenc(X, network=None, lr=0.03, batch_size=128,
                  num_epochs=300):
    """Train an autoencoder *network* to reconstruct X with squared
    error and Nesterov momentum, reporting per-epoch losses.

    :param X: 2-D float feature matrix (inputs and targets alike).
    :param network: Lasagne output layer of the autoencoder.
    :param lr: learning rate.
    :param batch_size: minibatch size.
    :param num_epochs: number of passes over the training data.
    """
    # BUG FIX: the split index must be an int (float slicing raises a
    # TypeError on modern Python/NumPy), and the validation slice must
    # be the *tail* of the data -- the original took the head, which is
    # a subset of the training slice (data leakage).
    split_at = int(X.shape[0] * 0.2)
    X_train, X_test = X[:-split_at], X[-split_at:]
    input_var = T.dmatrix()
    target_var = T.dmatrix()
    # Reconstruction loss expression.
    prediction = lasagne.layers.get_output(network, input_var)
    loss = lasagne.objectives.squared_error(prediction, target_var)
    loss = loss.mean()
    # Retrieve the trainable parameters.
    params = lasagne.layers.get_all_params(network, trainable=True)
    # Gradient of the loss w.r.t. the parameters (nesterov_momentum
    # accepts either a loss or a list of gradients).
    grad = T.grad(loss, params)
    updates = lasagne.updates.nesterov_momentum(grad, params,
                                                learning_rate=lr,
                                                momentum=0.9)
    # Deterministic expression for validation (disables noise layers).
    test_prediction = lasagne.layers.get_output(network, input_var,
                                                deterministic=True)
    test_loss = lasagne.objectives.squared_error(test_prediction,
                                                 target_var)
    test_loss = test_loss.mean()
    # Training function (updates parameters) and validation function
    # (read-only).
    f_train = theano.function([input_var, target_var], loss,
                              updates=updates)
    f_val = theano.function([input_var, target_var], test_loss)
    # Batch size and number of batches per epoch.
    BATCH_SIZE = batch_size
    N_BATCHES = len(X_train) // BATCH_SIZE
    N_VAL_BATCHES = len(X_test) // BATCH_SIZE
    # Minibatch generators; targets equal inputs for an autoencoder.
    train_batches_gen = batch_gen_ae(X_train, X_train, BATCH_SIZE)
    val_batches_gen = batch_gen_ae(X_test, X_test, BATCH_SIZE)
    for epoch in range(num_epochs):
        # Full pass over the training data:
        train_err = 0
        start_time = time.time()
        for _ in range(N_BATCHES):
            inputs, targets = next(train_batches_gen)
            train_err += f_train(inputs, targets)
        # Full pass over the validation data:
        val_err = 0
        for _ in range(N_VAL_BATCHES):
            inputs, targets = next(val_batches_gen)
            val_err += f_val(inputs, targets)
        # Report this epoch's results:
        print("Epoch {} of {} took {:.3f}s".format(
            epoch + 1, num_epochs, time.time() - start_time))
        print("  training loss:\t\t{:.6f}".format(train_err / N_BATCHES))
        print("  validation loss:\t\t{:.6f}".format(
            val_err / N_VAL_BATCHES))
def layerwise_training_f(X=None, no_epochs=0, input_shapes=(),
                         hidden_shapes=(), output_shapes=()):
    """
    Perform greedy layer-wise autoencoder pre-training.

    For each (input, hidden, output) width triple, a one-hidden-layer
    denoising autoencoder is built and trained on X, its parameters are
    saved to 'LasagneLayerParams<i>.npy', and X is replaced by the
    hidden layer's encoding before the next round.

    (Defaults changed from mutable lists to tuples -- same behavior,
    avoids the shared-mutable-default pitfall.)

    :return: None; only the parameters are saved.
    """
    for i, (input_num, hidden_num, output_num) in enumerate(
            zip(input_shapes, hidden_shapes, output_shapes), start=1):
        input_var = T.dmatrix()
        print("Building model for layer " + str(i))
        sizes = np.array([input_num, hidden_num, output_num])
        model = layerwise_model(input_var=input_var, sizes=sizes,
                                gaussian_noise=True)
        print("Training")
        train_autoenc(X=X, network=model, num_epochs=no_epochs)
        print("Saving parameters")
        parameters = lasagne.layers.get_all_param_values(model)
        np.save('LasagneLayerParams' + str(i), parameters)
        # Layer index 2 is the encoding DenseLayer (after the input
        # and GaussianNoiseLayer); its output becomes the training
        # data for the next layer.
        encode_layer = lasagne.layers.get_all_layers(model)[2]
        X = lasagne.layers.get_output(layer_or_layers=encode_layer,
                                      inputs=X).eval()
def disperse(phonemes, no_frames=5):
    """
    Reduce per-frame phoneme labels to one label per group of
    ``no_frames`` consecutive frames: the phoneme occurring most often
    within each group (ties resolved to the first class in label order).

    :param phonemes: per-frame phoneme (target) array.
    :param no_frames: number of consecutive frames grouped together;
        the last group may be shorter if the length is not divisible.
    :return: array of dominant phonemes, one per group.
    """
    # One-hot encode so that summing rows counts phoneme occurrences
    # within a group.
    lab_enc = LabelEncoder()
    phonemes_categorized = to_categorical(lab_enc.fit_transform(phonemes))
    phoneme_classes = lab_enc.classes_
    no_features = phonemes.shape[0]
    dominant_phonemes = []
    for frame_ind in range(0, no_features, no_frames):
        counts = sum(phonemes_categorized[frame_ind:frame_ind + no_frames])
        dominant_phonemes.append(phoneme_classes[np.argmax(counts)])
    return np.asarray(dominant_phonemes)
def proliferate_params(file_names, model=None):
    """
    Load the layer-wise parameter files and set them on the unrolled
    (fine-tuning) model.

    Each file holds [W_enc, b_enc, W_dec, b_dec] for one layer-wise
    autoencoder; encoder parameters are placed from the front of the
    flat parameter list and decoder parameters are mirrored in from
    the back, matching the unrolled architecture.
    """
    no_parameters = len(file_names) * 4
    # Renamed from 'list' to avoid shadowing the builtin.
    params = [None] * no_parameters
    i = 0
    for filename in file_names:
        # NOTE(review): these .npy files contain object arrays (lists
        # of weight matrices), which require allow_pickle=True on
        # NumPy >= 1.16.4.
        parameters = np.load(filename, allow_pickle=True)
        params[i] = parameters[0]                       # encoder weights
        params[i + 1] = parameters[1]                   # encoder biases
        params[no_parameters - i - 1] = parameters[3]   # decoder biases
        params[no_parameters - i - 2] = parameters[2]   # decoder weights
        i += 2
    lasagne.layers.set_all_param_values(model, params)
def main(generate_features=False, layerwise_training=False,
         save_encoded_features=False):
    """End-to-end pipeline: build or load frame features, optionally
    run greedy layer-wise pre-training, fine-tune the stacked
    autoencoder, then train a softmax phoneme classifier on the
    encoded features and report its accuracy.
    """
    if generate_features:
        X, phonemes = feature_data_and_phoneme_matching(save=False)
    else:
        X, phonemes = np.load('features_std.npy'), np.load('phonemes.npy')
    if layerwise_training:
        input_shapes = [256, 100, 70]
        hidden_shapes = [100, 70, 50]
        output_shapes = [256, 100, 70]
        layerwise_training_f(X=X, no_epochs=300, input_shapes=input_shapes,
                             hidden_shapes=hidden_shapes,
                             output_shapes=output_shapes)
    # Fine-tuning of the unrolled autoencoder.
    file_names = np.array(['LasagneLayerParams1.npy',
                           'LasagneLayerParams2.npy',
                           'LasagneLayerParams3.npy'])
    sizes = np.array([256, 100, 70, 50, 70, 100, 256])
    ft_model = fine_tuning_model(sizes=sizes)
    proliferate_params(file_names=file_names, model=ft_model)
    train_autoenc(X=X, network=ft_model)
    # Careful which layer is picked: index 3 is the 50-unit bottleneck
    # (input, 100, 70, 50, ...).
    enc_layer = lasagne.layers.get_all_layers(ft_model)[3]
    # BUG FIX: get_output returns a symbolic expression; .eval() turns
    # it into a numeric array so it can be saved and reshaped below.
    encoded_features = lasagne.layers.get_output(
        layer_or_layers=enc_layer, inputs=X).eval()
    if save_encoded_features:  # BUG FIX: missing ':' was a syntax error
        np.save('encoded_features.npy', encoded_features)
    # Find the dominant phoneme per group of frames, and reshape the
    # features so one row corresponds to one dominant phoneme.
    no_frames = 5
    dispersed_phonemes = disperse(phonemes=phonemes, no_frames=no_frames)
    encoded_features = encoded_features.reshape(
        (encoded_features.shape[0] // no_frames,  # BUG FIX: int division
         encoded_features.shape[1] * no_frames))
    # BUG FIX: train_classifier expects integer targets (T.ivector),
    # not phoneme strings -- encode them first.
    lab_enc = LabelEncoder()
    y = lab_enc.fit_transform(dispersed_phonemes)
    # Instantiate the classifier model.
    input_var = T.dmatrix()
    softmax_clf = softmax_classifier(input_var=input_var,
                                     input_size=no_frames * 50,
                                     output_size=46)
    # Feed data.
    train_classifier(X=encoded_features, y=y, network=softmax_clf)
    # BUG FIX: evaluate via get_output over the whole network and take
    # the argmax class; get_output_for on the top layer alone would
    # need that layer's own input, not the raw features (and the
    # result was symbolic, which accuracy_score cannot consume).
    probs = lasagne.layers.get_output(
        softmax_clf, encoded_features, deterministic=True).eval()
    predicted = probs.argmax(axis=1)
    print("Accuracy score:")
    print(accuracy_score(y, predicted))


if __name__ == '__main__':
    main(generate_features=False, layerwise_training=False,
         save_encoded_features=True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement