Advertisement
Guest User

Speaker recognition using DNN

a guest
Apr 15th, 2024
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.34 KB | Software | 0 0
  # Training hyper-parameters and audio settings shared by the whole script.
  sample_rate = 16000  # sampling rate, in Hz, that every clip is expected to have
  scale = 0.5  # multiplier applied to the noise mixed into the training audio
  valid_split = 0.1  # fraction of the samples held out for validation
  shuffle_seed = 43  # seed used when shuffling paths, labels and datasets
  batch_size = 4  # batch size of the training dataset
  epochs = 15  # default epoch count; the first CLI argument overrides it
  7.  
  8. import tensorflow as tf
  9. import os
  10. from os.path import isfile, join
  11. import numpy as np
  12. import shutil
  13. from tensorflow import keras
  14. from pathlib import Path
  15. import subprocess
  16. import sys
  17.  
  # Optional positional CLI arguments; when the first one is present it
  # replaces the default number of training epochs.
  argCLI = sys.argv[1:]
  if argCLI:
      epochs = int(argCLI[0])
  21.  
  # Dataset layout: speaker clips are expected under <data_directory>/audio
  # and noise clips under <data_directory>/noise.
  data_directory = "./Dataset"
  audio_folder, noise_folder = "audio", "noise"

  audio_path, noise_path = (
      os.path.join(data_directory, sub) for sub in (audio_folder, noise_folder)
  )

  print(f"Folderul cu fisierele audio:{audio_path}\nFolderul cu fisierele noise:{noise_path}")
  30.  
  # Move every top-level dataset folder into either the noise tree or the
  # audio tree; the two target folders themselves are left where they are.
  for folder in os.listdir(data_directory):
      source = os.path.join(data_directory, folder)
      if not os.path.isdir(source) or folder in [audio_folder, noise_folder]:
          continue
      # "other" and "_background_noise_" hold noise; anything else is a speaker.
      if folder in ["other", "_background_noise_"]:
          target_root = noise_path
      else:
          target_root = audio_path
      shutil.move(source, os.path.join(target_root, folder))
  45.  
  # Collect the path of every .wav file found one level below the noise folder.
  noise_paths = []
  for subdir in os.listdir(noise_path):
      subdir_path = Path(noise_path) / subdir
      if not os.path.isdir(subdir_path):
          continue
      for filepath in os.listdir(subdir_path):
          if filepath.endswith(".wav"):
              noise_paths.append(os.path.join(subdir_path, filepath))

  print(f"{noise_paths}")
  57.  
  # Shell pipeline that walks every sub-folder of the noise directory, probes
  # each .wav with ffprobe, re-encodes it to 16 kHz with ffmpeg when the
  # reported rate differs, and echoes the probed rate of each file.
  command = " ".join(
      [
          f"for dir in `ls -1 {noise_path}`; do",
          f"for file in `ls -1 {noise_path}/$dir/*.wav`; do",
          "sample_rate=`ffprobe -hide_banner -loglevel panic -show_streams",
          "$file | grep sample_rate | cut -f2 -d=`;",
          "if [ $sample_rate -ne 16000 ]; then",
          "ffmpeg -hide_banner -loglevel panic -y",
          "-i $file -ar 16000 temp.wav;",
          "mv temp.wav $file;",
          "fi;",
          "echo $sample_rate;",
          "done; done",
      ]
  )

  # On Linux the rate could instead be read back from the command output:
  # sample_rate = int(subprocess.getoutput(command).split('\n')[0].strip())
  # It is hard-coded here so the script can also be used on Windows.
  sample_rate = 16000

  # Cell 5
  os.system(command)
  def load_noise_sample(path):
      """Load a noise clip and cut it into one-second segments.

      Args:
          path: Path of a WAV file containing noise.

      Returns:
          A list of tensors, each holding ``sample_rate`` consecutive samples
          of the clip decoded with one channel, or ``None`` when the clip's
          sampling rate differs from the module-level ``sample_rate`` or the
          clip is shorter than one second.
      """
      sample, sampling_rate = tf.audio.decode_wav(
          tf.io.read_file(path), desired_channels=1
      )
      if sampling_rate == sample_rate:
          # Only whole seconds are kept; the trailing remainder is dropped.
          slices = int(sample.shape[0] / sample_rate)
          if slices == 0:
              # tf.split cannot produce zero pieces, so a clip shorter than
              # one second is skipped instead of raising.
              print("Noise sample", path, "is shorter than one second")
              return None
          return tf.split(sample[: slices * sample_rate], slices)

      print("Sampling rate for", path, "is incorrect")
      return None
  88.  
  89.  
  # Gather the one-second segments of every usable noise clip.
  noises = []
  for path in noise_paths:
      sample = load_noise_sample(path)
      if sample:
          noises.extend(sample)

  if noises:
      noises = tf.stack(noises)
  else:
      # With no segments there is nothing to sample noise from; None makes
      # add_noise return the audio unchanged instead of failing later on.
      print("No usable noise samples found; noise augmentation is disabled")
      noises = None
  96.  
  97.  
  98.  
  def paths_and_labels_to_dataset(audio_paths, labels):
      """Build a dataset of (decoded audio, label) pairs.

      Args:
          audio_paths: Paths of the audio files to decode with path_to_audio.
          labels: Labels aligned element-by-element with ``audio_paths``.

      Returns:
          A ``tf.data.Dataset`` that zips the decoded audio with its label.
      """
      decoded_audio = tf.data.Dataset.from_tensor_slices(audio_paths).map(
          path_to_audio
      )
      label_stream = tf.data.Dataset.from_tensor_slices(labels)
      return tf.data.Dataset.zip((decoded_audio, label_stream))
  104.  
  def path_to_audio(path):
      """Read a WAV file and decode it as one channel of ``sample_rate`` samples.

      Args:
          path: Path of the WAV file.

      Returns:
          The decoded audio tensor; the sample rate reported by the decoder
          is discarded.
      """
      raw_bytes = tf.io.read_file(path)
      decoded = tf.audio.decode_wav(
          raw_bytes, desired_channels=1, desired_samples=sample_rate
      )
      return decoded[0]
  109.  
  def add_noise(audio, noises=None, scale=0.5):
      """Mix a randomly chosen noise segment into every clip of a batch.

      Args:
          audio: Batch of waveforms, indexed by clip along axis 0.
          noises: Stacked noise segments to draw from, or None to skip mixing.
          scale: Multiplier applied to the amplitude-matched noise.

      Returns:
          ``audio`` plus the scaled noise, or ``audio`` itself when ``noises``
          is None.
      """
      if noises is None:
          return audio

      # One random noise index per clip in the batch.
      picks = tf.random.uniform(
          (tf.shape(audio)[0],), 0, noises.shape[0], dtype=tf.int32
      )
      noise = tf.gather(noises, picks, axis=0)

      # Ratio between the peak of each clip and the peak of its noise,
      # repeated along axis 1 so it lines up with the waveform.
      audio_peak = tf.math.reduce_max(audio, axis=1)
      noise_peak = tf.math.reduce_max(noise, axis=1)
      prop = tf.repeat(
          tf.expand_dims(audio_peak / noise_peak, axis=1),
          tf.shape(audio)[1],
          axis=1,
      )

      return audio + noise * prop * scale
  123.  
  def audio_to_mfcc(audio_contents):
      """Convert waveforms into their first 16 MFCC coefficients.

      Steps: STFT magnitude -> projection onto 128 mel bins starting at
      80 Hz -> logarithm -> MFCCs, of which coefficients [0, 16) are kept.

      NOTE(review): ``tf.transpose`` is called without ``perm``, which
      reverses the axis order before the STFT; for a (batch, samples, 1)
      input the STFT would then run along the batch axis -- confirm this is
      intended.
      NOTE(review): the length of axis 1 is passed as the sample rate (and
      half of it as the upper mel edge); presumably this relies on clips
      being exactly one second long -- confirm.

      Args:
          audio_contents: Waveform tensor with a static size on axis 1.

      Returns:
          The MFCC tensor after squeezing axis 0 and swapping the first two
          remaining axes.
      """
      signal = audio_contents

      stft_out = tf.signal.stft(
          tf.transpose(signal),
          frame_length=1024,
          frame_step=256,
          fft_length=1024,
      )
      magnitudes = tf.abs(stft_out)

      mel_bins = 128
      low_hz = 80
      high_hz = signal.shape[1] / 2
      mel_matrix = tf.signal.linear_to_mel_weight_matrix(
          mel_bins, stft_out.shape[-1], signal.shape[1], low_hz, high_hz
      )
      mel = tf.tensordot(magnitudes, mel_matrix, 1)

      # tensordot loses the static shape; restore it from the two operands.
      mel.set_shape(magnitudes.shape[:-1].concatenate(mel_matrix.shape[-1:]))

      # The small offset keeps the logarithm finite for empty mel bins.
      log_mel = tf.math.log(mel + 1e-6)

      coeffs = tf.signal.mfccs_from_log_mel_spectrograms(log_mel)[..., :16]
      coeffs = tf.squeeze(coeffs, axis=0)
      return tf.transpose(coeffs, perm=[1, 0, 2])
  144.  
  # os.listdir returns entries in arbitrary order, so the speaker folders are
  # sorted to keep the label ids (indices into class_names) stable between
  # runs; stray files are ignored because they cannot be listed as folders.
  class_names = sorted(
      name
      for name in os.listdir(audio_path)
      if os.path.isdir(os.path.join(audio_path, name))
  )

  # audio_paths[i] is a clip and labels[i] the index of its speaker.
  audio_paths = []
  labels = []
  for label, name in enumerate(class_names):
      print("Speaker:", (name))
      dir_path = Path(audio_path) / name
      # Sorted as well, so the seeded shuffle that follows is reproducible.
      speaker_sample_paths = [
          os.path.join(dir_path, filepath)
          for filepath in sorted(os.listdir(dir_path))
          if filepath.endswith(".wav")
      ]
      audio_paths += speaker_sample_paths
      labels += [label] * len(speaker_sample_paths)
  159.  
  # Shuffle paths and labels with two identically seeded generators so both
  # lists receive the same permutation and stay aligned.
  rng = np.random.RandomState(shuffle_seed)
  rng.shuffle(audio_paths)
  rng = np.random.RandomState(shuffle_seed)
  rng.shuffle(labels)

  # Split into training and validation on a forward index. Negative slices
  # would break when num_val_samples is 0: [:-0] is empty and [-0:] is the
  # whole list, which would leave no training data at all.
  num_val_samples = int(valid_split * len(audio_paths))
  split_index = len(audio_paths) - num_val_samples

  train_audio_paths = audio_paths[:split_index]
  train_labels = labels[:split_index]

  valid_audio_paths = audio_paths[split_index:]
  valid_labels = labels[split_index:]
  173.  
  # Both pipelines share the same parallelism / prefetch setting.
  autotune = tf.data.experimental.AUTOTUNE

  # Training pipeline: shuffle, batch, mix in noise, convert the waveforms
  # to MFCCs with `audio_to_mfcc`, then prefetch.
  train_ds = (
      paths_and_labels_to_dataset(train_audio_paths, train_labels)
      .shuffle(buffer_size=batch_size * 8, seed=shuffle_seed)
      .batch(batch_size)
      .map(
          lambda x, y: (add_noise(x, noises, scale=scale), y),
          num_parallel_calls=autotune,
      )
      .map(lambda x, y: (audio_to_mfcc(x), y), num_parallel_calls=autotune)
      .prefetch(autotune)
  )

  # Validation pipeline: same steps without the noise, in batches of 32.
  valid_ds = (
      paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
      .shuffle(buffer_size=32 * 8, seed=shuffle_seed)
      .batch(32)
      .map(lambda x, y: (audio_to_mfcc(x), y), num_parallel_calls=autotune)
      .prefetch(autotune)
  )
  191.  
  def residual_block(x, filters, conv_num = 3, activation = "relu"):
      """Apply a stack of Conv1D layers with a projected skip connection.

      Args:
          x: Input tensor of the block.
          filters: Number of filters of every convolution in the block.
          conv_num: Total number of kernel-size-3 convolutions in the stack.
          activation: Activation applied after each convolution and after
              the skip connection is added.

      Returns:
          The block output after max pooling with size 2 and stride 2.
      """
      # A kernel-size-1 convolution gives the shortcut the same filter count
      # as the main branch so the two can be added.
      shortcut = keras.layers.Conv1D(filters, 1, padding="same")(x)

      main = x
      for _ in range(conv_num - 1):
          main = keras.layers.Conv1D(filters, 3, padding="same")(main)
          main = keras.layers.Activation(activation)(main)

      # The last convolution is activated only after the shortcut is added.
      main = keras.layers.Conv1D(filters, 3, padding="same")(main)
      merged = keras.layers.Add()([main, shortcut])
      merged = keras.layers.Activation(activation)(merged)

      pooling = keras.layers.MaxPool1D(pool_size=2, strides=2)
      return pooling(merged)
  204.  
  205. #from tensorflow.keras.layers import Dropout #pt dropout layer
  206.  
  def build_mfcc_model(input_shape, num_classes, dropout_rate=0.2):
      """Build the residual 1-D convolutional speaker classifier.

      Args:
          input_shape: Shape of one input example, without the batch axis.
          num_classes: Number of speakers, i.e. units of the softmax output.
          dropout_rate: Rate of the Dropout layer that follows each Dense
              layer of the classifier head.

      Returns:
          An uncompiled ``keras.models.Model`` mapping "input" to "output".
      """
      inputs = keras.layers.Input(shape=input_shape, name="input")

      # Each residual block must consume the previous block's output; feeding
      # `inputs` to every block would discard all blocks but the last one.
      x = inputs
      for filters, conv_num in ((16, 2), (32, 2), (64, 3), (128, 3), (128, 3)):
          x = residual_block(x, filters, conv_num)

      x = keras.layers.AveragePooling1D(pool_size=3, strides=3)(x)
      x = keras.layers.Flatten()(x)
      x = keras.layers.Dense(256, activation="relu")(x)
      x = keras.layers.Dropout(rate=dropout_rate)(x)
      x = keras.layers.Dense(128, activation="relu")(x)
      x = keras.layers.Dropout(rate=dropout_rate)(x)

      # Optional second CLI argument: number of extra Dense(128) + Dropout
      # pairs appended to the classifier head.
      if len(argCLI) >= 2:
          DenseLayers = int(argCLI[1])
          for i in range(DenseLayers):
              print(f"i:{i}\n")
              x = keras.layers.Dense(128, activation="relu")(x)
              x = keras.layers.Dropout(rate=dropout_rate)(x)

      outputs = keras.layers.Dense(num_classes, activation="softmax", name="output")(x)

      return keras.models.Model(inputs=inputs, outputs=outputs)
  232.  
  # Build the network for inputs of shape (sample_rate, 16) with one output
  # unit per entry of class_names.
  model = build_mfcc_model((sample_rate, 16), len(class_names))
  model.summary()

  model.compile(
      optimizer="Adam",
      loss="sparse_categorical_crossentropy",
      metrics=["accuracy"],
  )

  model_save_filename = "model.keras"

  # Stop after 10 epochs without improvement and restore the best weights.
  earlystopping_cb = keras.callbacks.EarlyStopping(
      patience=10,
      restore_best_weights=True,
  )

  # Save to model_save_filename only when val_accuracy improves.
  mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
      model_save_filename,
      monitor="val_accuracy",
      save_best_only=True,
  )

  training_callbacks = [earlystopping_cb, mdlcheckpoint_cb]
  history = model.fit(
      train_ds,
      epochs=epochs,
      validation_data=valid_ds,
      callbacks=training_callbacks,
  )

  evaluation = model.evaluate(valid_ds)
  print("Accuracy of model:", evaluation)
  253.  
  254. from sklearn.metrics import confusion_matrix
  255. import seaborn as sns
  256. import matplotlib.pyplot as plt
  257.  
  def plot_confusion_matrix(y_true, y_pred, class_names):
      """Draw, save and show the confusion matrix as an annotated heatmap.

      Args:
          y_true: True labels.
          y_pred: Predicted labels, aligned with ``y_true``.
          class_names: Tick labels used on both axes of the heatmap.

      The figure is written to ``confusion_matrix.png`` before it is shown.
      """
      matrix = confusion_matrix(y_true, y_pred)

      plt.figure(figsize=(10, 8))
      sns.heatmap(
          matrix,
          annot=True,
          cmap='Blues',
          fmt='g',
          xticklabels=class_names,
          yticklabels=class_names,
      )

      plt.title('Confusion Matrix')
      plt.xlabel('Predicted labels')
      plt.ylabel('True labels')

      # Tilt the tick labels on both axes by 45 degrees.
      for rotate_ticks in (plt.xticks, plt.yticks):
          rotate_ticks(rotation=45)

      plt.tight_layout()
      plt.savefig('confusion_matrix.png')
      plt.show()
  270.  
  # valid_ds is shuffled, so its iteration order does not follow valid_labels.
  # Read the labels and the predictions from the same pass over the dataset
  # so that every prediction is paired with its own label.
  y_true = []
  y_pred = []
  for batch_features, batch_labels in valid_ds:
      y_true.extend(batch_labels.numpy())
      y_pred.extend(np.argmax(model.predict_on_batch(batch_features), axis=1))
  y_pred = np.asarray(y_pred)
  plot_confusion_matrix(y_true, y_pred, class_names)
  273.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement