Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# --- Training configuration -------------------------------------------------
# Default number of training epochs (the first CLI argument may replace it).
epochs = 15
# Number of examples per training batch.
batch_size = 4

# --- Data handling ----------------------------------------------------------
# Sample count used for decoding audio and for chunking noise files.
sample_rate = 16000
# Fraction of the samples that is held out for validation.
valid_split = 0.1
# Seed shared by the path/label shuffling and the dataset shuffling.
shuffle_seed = 43
# Factor applied to the noise that is mixed into the training audio.
scale = 0.5
- import tensorflow as tf
- import os
- from os.path import isfile, join
- import numpy as np
- import shutil
- from tensorflow import keras
- from pathlib import Path
- import subprocess
- import sys
# Command-line arguments without the script name. When at least one is given,
# the first one replaces the default number of epochs.
argCLI = sys.argv[1:]
if argCLI:
    epochs = int(argCLI[0])
# On-disk layout: speaker folders are gathered under <data_directory>/audio
# and noise folders under <data_directory>/noise.
data_directory = "./Dataset"
audio_folder = "audio"
noise_folder = "noise"
audio_path = os.path.join(data_directory, audio_folder)
noise_path = os.path.join(data_directory, noise_folder)
print(f"Folderul cu fisierele audio:{audio_path}\nFolderul cu fisierele noise:{noise_path}")

# Make sure both target folders exist before anything is moved, so that they
# are present even when no sub-folder ends up being moved into one of them
# (the rest of the script lists both directories unconditionally).
os.makedirs(audio_path, exist_ok=True)
os.makedirs(noise_path, exist_ok=True)

# Sort every other sub-folder of the dataset root into audio/ or noise/.
for folder in os.listdir(data_directory):
    source = os.path.join(data_directory, folder)
    if not os.path.isdir(source):
        continue
    if folder in (audio_folder, noise_folder):
        # The target folders themselves stay where they are.
        continue
    # "other" and "_background_noise_" hold noise; anything else is a speaker.
    if folder in ("other", "_background_noise_"):
        target_root = noise_path
    else:
        target_root = audio_path
    shutil.move(source, os.path.join(target_root, folder))
# Collect the path of every .wav file that sits directly inside a sub-folder
# of the noise directory.
noise_paths = []
for subdir in os.listdir(noise_path):
    subdir_path = Path(noise_path) / subdir
    if not os.path.isdir(subdir_path):
        continue
    noise_paths.extend(
        os.path.join(subdir_path, filepath)
        for filepath in os.listdir(subdir_path)
        if filepath.endswith(".wav")
    )
print(f"{noise_paths}")
# Shell loop (backtick substitution, ffprobe, ffmpeg): for every .wav file in
# every sub-folder of the noise directory, read its sample rate, re-encode
# the file to 16000 Hz when the rate differs, and echo the rate that was read.
command = (
    f"for dir in `ls -1 {noise_path}`; do "
    f"for file in `ls -1 {noise_path}/$dir/*.wav`; do "
    "sample_rate=`ffprobe -hide_banner -loglevel panic -show_streams "
    "$file | grep sample_rate | cut -f2 -d=`; "
    "if [ $sample_rate -ne 16000 ]; then "
    "ffmpeg -hide_banner -loglevel panic -y "
    "-i $file -ar 16000 temp.wav; "
    "mv temp.wav $file; "
    "fi; "
    "echo $sample_rate; "
    "done; done"
)
#sample_rate = int(subprocess.getoutput(command).split('\n')[0].strip())
# Fixed value used on Windows; on Linux the commented-out line above derives
# it from the command output instead.
sample_rate = 16000
# Cell 5
os.system(command)
def load_noise_sample(path):
    """Load a noise WAV file and cut it into chunks of ``sample_rate`` samples.

    Args:
        path: Path of the WAV file to read.

    Returns:
        A list of tensors, each holding ``sample_rate`` consecutive samples of
        the file decoded to one channel; trailing samples that do not fill a
        whole chunk are dropped. ``None`` is returned when the file's sampling
        rate differs from the module-level ``sample_rate`` or when the file
        does not contain a single full chunk.
    """
    sample, sampling_rate = tf.audio.decode_wav(
        tf.io.read_file(path), desired_channels=1
    )
    if not (sampling_rate == sample_rate):
        print("Sampling rate for", path, "is incorrect")
        return None

    slices = sample.shape[0] // sample_rate
    if slices == 0:
        # tf.split cannot produce zero pieces; treat a too-short file like an
        # unusable one so the caller simply skips it.
        print("Noise file", path, "is shorter than one chunk; skipped")
        return None

    return tf.split(sample[: slices * sample_rate], slices)
# Gather the chunks returned by load_noise_sample for every noise file and
# stack them into a single tensor; files that yield nothing are skipped.
noises = []
for path in noise_paths:
    sample = load_noise_sample(path)
    if not sample:
        continue
    noises += sample
noises = tf.stack(noises)
def paths_and_labels_to_dataset(audio_paths, labels):
    """Build a dataset that yields ``(audio, label)`` pairs.

    Args:
        audio_paths: Sequence of WAV file paths; each one is decoded with
            ``path_to_audio``.
        labels: Sequence of labels, aligned element by element with
            ``audio_paths``.

    Returns:
        A ``tf.data.Dataset`` zipping the decoded audio with the labels.
    """
    audio_ds = tf.data.Dataset.from_tensor_slices(audio_paths).map(path_to_audio)
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    paired = (audio_ds, label_ds)
    return tf.data.Dataset.zip(paired)
def path_to_audio(path):
    """Read the WAV file at ``path`` and return its decoded audio tensor.

    The file is decoded to one channel and to ``sample_rate`` samples (the
    module-level value is passed as ``desired_samples``).
    """
    raw_bytes = tf.io.read_file(path)
    waveform, _ = tf.audio.decode_wav(
        raw_bytes, desired_channels=1, desired_samples=sample_rate
    )
    return waveform
def add_noise(audio, noises=None, scale=0.5):
    """Mix a randomly chosen noise clip into every example of a batch.

    Args:
        audio: Batch of audio examples; the first axis indexes the examples.
        noises: Tensor of noise clips to draw from, or ``None`` to return
            ``audio`` unchanged.
        scale: Factor applied to the amplitude-matched noise before adding it.

    Returns:
        ``audio`` with noise added, or ``audio`` itself when ``noises`` is
        ``None``.
    """
    if noises is None:
        return audio

    audio_shape = tf.shape(audio)
    # One random noise index per example in the batch.
    picks = tf.random.uniform(
        (audio_shape[0],), minval=0, maxval=noises.shape[0], dtype=tf.int32
    )
    noise = tf.gather(noises, picks, axis=0)

    # Ratio between the peak of each example and the peak of its noise clip,
    # repeated along axis 1 so it can be multiplied element-wise.
    peak_ratio = tf.math.reduce_max(audio, axis=1) / tf.math.reduce_max(noise, axis=1)
    peak_ratio = tf.repeat(tf.expand_dims(peak_ratio, axis=1), audio_shape[1], axis=1)

    return audio + noise * peak_ratio * scale
def audio_to_mfcc(audio_contents):
    """Turn audio into MFCC features, keeping the first 16 coefficients.

    The pipeline is: STFT -> magnitude -> mel filter bank (128 bins, lower
    edge 80) -> log -> MFCC -> slice ``[..., :16]`` -> squeeze axis 0 ->
    transpose with ``perm=[1, 0, 2]``.

    Args:
        audio_contents: Audio tensor. Presumably a batch shaped
            (batch, samples, 1) as produced by the dataset pipeline in this
            file -- TODO confirm.

    Returns:
        The MFCC tensor after the squeeze/transpose described above.
    """
    waveform = audio_contents
    # NOTE(review): tf.transpose without `perm` reverses all axes and
    # tf.signal.stft frames the innermost axis. For a (batch, samples, 1)
    # input that makes the batch axis the one being framed -- confirm this
    # layout is intended.
    stfts = tf.signal.stft(tf.transpose(waveform), frame_length=1024, frame_step=256, fft_length=1024)
    spectrograms = tf.abs(stfts)
    num_spectrogram_bins = stfts.shape[-1]
    # NOTE(review): waveform.shape[1] is used both for the upper band edge
    # (half of it) and in the sample-rate position of
    # linear_to_mel_weight_matrix. Presumably this relies on every clip being
    # exactly one second long -- confirm.
    lower_edge_hertz, upper_edge_hertz, num_mel_bins = 80, waveform.shape[1] / 2, 128
    linear_to_mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(num_mel_bins, num_spectrogram_bins, waveform.shape[1], lower_edge_hertz, upper_edge_hertz)
    # Project the linear-frequency bins onto the mel bins (contracts the last
    # axis of `spectrograms` with the first axis of the weight matrix).
    mel_spectrograms = tf.tensordot(spectrograms, linear_to_mel_weight_matrix, 1)
    # Restore the static shape: leading axes of the spectrogram plus the
    # mel-bin axis of the weight matrix.
    mel_spectrograms.set_shape(spectrograms.shape[:-1].concatenate(linear_to_mel_weight_matrix.shape[-1:]))
    # The small offset keeps log() away from zero.
    log_mel_spectrograms = tf.math.log(mel_spectrograms + 1e-6)
    mfccs = tf.signal.mfccs_from_log_mel_spectrograms(log_mel_spectrograms)[..., :16]
    # Drop the leading axis (it must have size 1 for this to succeed), then
    # swap the first two remaining axes.
    mfccs = tf.squeeze(mfccs, axis=0)
    mfccs = tf.transpose(mfccs, perm=[1, 0, 2])
    return mfccs
# Every sub-folder of the audio directory is one speaker/class. Sorting makes
# each speaker's label index independent of the order os.listdir returns, and
# plain files are skipped because listing them as directories would fail.
class_names = sorted(
    entry
    for entry in os.listdir(audio_path)
    if os.path.isdir(os.path.join(audio_path, entry))
)
audio_paths = []
labels = []
for label, name in enumerate(class_names):
    print("Speaker:", (name))
    dir_path = Path(audio_path) / name
    speaker_sample_paths = [
        os.path.join(dir_path, filepath)
        for filepath in os.listdir(dir_path)
        if filepath.endswith(".wav")
    ]
    audio_paths += speaker_sample_paths
    labels += [label] * len(speaker_sample_paths)

# Shuffle paths and labels with two identically seeded generators so that
# both lists receive the same permutation and stay aligned.
rng = np.random.RandomState(shuffle_seed)
rng.shuffle(audio_paths)
rng = np.random.RandomState(shuffle_seed)
rng.shuffle(labels)

# Split into training and validation.
num_val_samples = int(valid_split * len(audio_paths))
if num_val_samples == 0:
    # With zero validation samples the slices below would use [:-0], which is
    # an empty training set rather than the full one.
    raise ValueError(
        f"Only {len(audio_paths)} audio files found; valid_split={valid_split} "
        "leaves no validation samples."
    )
train_audio_paths = audio_paths[:-num_val_samples]
train_labels = labels[:-num_val_samples]
valid_audio_paths = audio_paths[-num_val_samples:]
valid_labels = labels[-num_val_samples:]

# Create datasets, one for training and the other for validation.
train_ds = paths_and_labels_to_dataset(train_audio_paths, train_labels)
train_ds = train_ds.shuffle(buffer_size=batch_size * 8, seed=shuffle_seed).batch(batch_size)
valid_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
# The validation set is deliberately not shuffled: its metrics do not depend
# on order, and keeping the order means predictions made on valid_ds line up
# with valid_labels.
valid_ds = valid_ds.batch(32)

# Add noise to the training set only.
train_ds = train_ds.map(
    lambda x, y: (add_noise(x, noises, scale=scale), y),
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)

# Convert the waveforms of both sets to MFCC features with `audio_to_mfcc`.
train_ds = train_ds.map(
    lambda x, y: (audio_to_mfcc(x), y),
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)
train_ds = train_ds.prefetch(tf.data.experimental.AUTOTUNE)
valid_ds = valid_ds.map(
    lambda x, y: (audio_to_mfcc(x), y),
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)
valid_ds = valid_ds.prefetch(tf.data.experimental.AUTOTUNE)
def residual_block(x, filters, conv_num=3, activation="relu"):
    """Apply a 1-D convolutional residual block followed by 2x max pooling.

    Args:
        x: Input tensor of the block.
        filters: Number of filters used by every convolution in the block.
        conv_num: Total number of kernel-size-3 convolutions on the main path.
        activation: Name of the activation applied after the convolutions.

    Returns:
        The block output after ``MaxPool1D(pool_size=2, strides=2)``.
    """
    # Shortcut path: a kernel-size-1 convolution so the channel count matches
    # the main path before the two are added.
    shortcut = keras.layers.Conv1D(filters, 1, padding="same")(x)

    # Main path: (conv_num - 1) conv+activation pairs, then a final
    # convolution whose activation comes after the addition.
    branch = x
    for _ in range(conv_num - 1):
        branch = keras.layers.Conv1D(filters, 3, padding="same")(branch)
        branch = keras.layers.Activation(activation)(branch)
    branch = keras.layers.Conv1D(filters, 3, padding="same")(branch)

    merged = keras.layers.Add()([branch, shortcut])
    merged = keras.layers.Activation(activation)(merged)
    return keras.layers.MaxPool1D(pool_size=2, strides=2)(merged)
- #from tensorflow.keras.layers import Dropout #pt dropout layer
def build_mfcc_model(input_shape, num_classes, dropout_rate=0.2):
    """Build the residual 1-D CNN classifier.

    The network is a stack of five residual blocks, average pooling, and a
    dense head with dropout, ending in a softmax over ``num_classes``.

    Args:
        input_shape: Shape of one input example (without the batch axis).
        num_classes: Number of output classes.
        dropout_rate: Dropout rate applied after every dense layer of the head.

    Returns:
        An uncompiled ``keras.models.Model``.

    Note:
        When the module-level ``argCLI`` holds a second command-line argument,
        it is read as the number of additional Dense(128)+Dropout pairs that
        are appended to the head.
    """
    inputs = keras.layers.Input(shape=input_shape, name="input")

    # Chain the residual blocks: each one consumes the output of the previous
    # one. Feeding `inputs` to every block would leave all but the last block
    # disconnected from the model output.
    x = inputs
    for filters, conv_num in ((16, 2), (32, 2), (64, 3), (128, 3), (128, 3)):
        x = residual_block(x, filters, conv_num)

    x = keras.layers.AveragePooling1D(pool_size=3, strides=3)(x)
    x = keras.layers.Flatten()(x)

    # Dense head; a dropout layer follows each dense layer.
    x = keras.layers.Dense(256, activation="relu")(x)
    x = keras.layers.Dropout(rate=dropout_rate)(x)
    x = keras.layers.Dense(128, activation="relu")(x)
    x = keras.layers.Dropout(rate=dropout_rate)(x)

    # Optional extra dense layers requested through the second CLI argument.
    if len(argCLI) >= 2:
        extra_dense_layers = int(argCLI[1])
        for i in range(extra_dense_layers):
            print(f"i:{i}\n")
            x = keras.layers.Dense(128, activation="relu")(x)
            x = keras.layers.Dropout(rate=dropout_rate)(x)

    outputs = keras.layers.Dense(num_classes, activation="softmax", name="output")(x)
    return keras.models.Model(inputs=inputs, outputs=outputs)
# Build the classifier for inputs of shape (sample_rate, 16) with one output
# per speaker folder, then print its layer summary.
model = build_mfcc_model((sample_rate, 16), len(class_names))
model.summary()
model.compile(
    optimizer="Adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Callbacks: stop early after 10 epochs without improvement (restoring the
# best weights) and keep only the checkpoint with the best validation
# accuracy on disk.
model_save_filename = "model.keras"
earlystopping_cb = keras.callbacks.EarlyStopping(
    patience=10,
    restore_best_weights=True,
)
mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
    model_save_filename,
    monitor="val_accuracy",
    save_best_only=True,
)
training_callbacks = [earlystopping_cb, mdlcheckpoint_cb]

history = model.fit(
    train_ds,
    epochs=epochs,
    validation_data=valid_ds,
    callbacks=training_callbacks,
)

# model.evaluate returns the loss followed by the compiled metrics.
evaluation = model.evaluate(valid_ds)
print("Accuracy of model:", evaluation)
- from sklearn.metrics import confusion_matrix
- import seaborn as sns
- import matplotlib.pyplot as plt
def plot_confusion_matrix(y_true, y_pred, class_names):
    """Plot, save and show the confusion matrix of a classifier.

    The figure is written to ``confusion_matrix.png`` in the current working
    directory and then displayed with ``plt.show()``.

    Args:
        y_true: True labels, given as integer indices into ``class_names``.
        y_pred: Predicted labels, given as integer indices into
            ``class_names``.
        class_names: Class names used as tick labels; entry ``i`` names the
            class with index ``i``.
    """
    # Pin the label set so the matrix always has exactly one row and column
    # per entry of class_names. Without it, a class that appears in neither
    # y_true nor y_pred is dropped and the tick labels no longer match.
    class_indices = list(range(len(class_names)))
    cm = confusion_matrix(y_true, y_pred, labels=class_indices)

    plt.figure(figsize=(10, 8))
    sns.heatmap(
        cm,
        annot=True,
        cmap='Blues',
        fmt='g',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.xlabel('Predicted labels')
    plt.ylabel('True labels')
    plt.title('Confusion Matrix')
    plt.xticks(rotation=45)
    plt.yticks(rotation=45)
    plt.tight_layout()
    plt.savefig('confusion_matrix.png')
    plt.show()
# Collect the true labels and the predictions in the same pass over valid_ds,
# so every prediction is paired with the label of the example it was made
# for, whatever order the dataset yields its batches in.
y_true_batches = []
y_pred_batches = []
for batch_features, batch_labels in valid_ds:
    batch_probabilities = model.predict_on_batch(batch_features)
    y_pred_batches.append(np.argmax(batch_probabilities, axis=1))
    y_true_batches.append(batch_labels.numpy())
y_true = np.concatenate(y_true_batches)
y_pred = np.concatenate(y_pred_batches)
plot_confusion_matrix(y_true, y_pred, class_names)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement