Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ## Importations utiles
- import numpy as np
- import matplotlib.pyplot as plt
- import random
- from scipy.signal import stft
- from scipy.signal import square
- from scipy.io.wavfile import read
- from scipy.io.wavfile import write
- from contextlib import contextmanager
- from matplotlib import cm
- from numpy import NaN
- import math
- from matplotlib import ticker
- from matplotlib.colors import LightSource
- from scipy.signal import istft
- from scipy.signal import correlate
- from mpl_toolkits.mplot3d import Axes3D
- import cmath as cm
- from scipy import stats
- import matplotlib
- from matplotlib.ticker import LinearLocator, FormatStrFormatter
- import time
- import pyroomacoustics as pra
- from scipy.io import wavfile
- ################################################
- ## LI-TEMPROM
- def calc_moyenneTEMP(rapp,M,i):
- moy = 0
- for j in range(0,M):
- moy = moy + rapp[int(i*M/2 + j)]
- moy = 1/M * moy
- return moy
- def calc_varianceTEMP(rapp,M,i): # calcul la variance de rapp pour la ieme fenêtre
- # de longueur M, à la fréquence num f
- moy = 0
- for j in range(0,M):
- moy = moy + rapp[int(i*M/2 + j)]
- moy = 1/M * moy
- var = 0
- for j in range(0,M):
- var = var + abs(rapp[int(i*M/2 + j)] - moy)**2
- var = 1/M * var
- return var
- def varianceTEMP(rapp,M):
- k = 0
- nb_tps = len(rapp)
- nb_fenetre = 0
- while k < (nb_tps - M): # donne le nb de fenêtre temporelles à analyser
- nb_fenetre = nb_fenetre + 1
- k = k + M/2
- res = np.zeros((nb_fenetre,2))
- # pos 0 : num fenêtre
- # pos 1 : variance
- for i in range(0,nb_fenetre):
- res[i][1] = calc_varianceTEMP(rapp,M,i)
- res[i][0] = i
- return res,nb_fenetre
- def rapportTEMP(melange1,melange2):
- long = len(melange1)
- res = np.zeros(long)
- for i in range(long):
- res[i] = NaN
- if melange2[i] != 0:
- res[i] = melange1[i]/melange2[i]
- return res
- def CL(coeff1,signal1,coeff2,signal2): #Effectue le mélange
- res = np.add(coeff1 * signal1,coeff2 * signal2)
- return res
- def rapportbis(melange1,melange2):
- long = len(melange1)
- res = np.zeros(long)
- for i in range(long):
- if melange2[i] != 0:
- res[i] = melange1[i]/melange2[i]
- return res
- def creernuage(rapp_trie,var_max,x1,x2,M):
- x1_res = []
- x2_res = []
- i = 0
- n = len(rapp_trie)
- while i < n and rapp_trie[i][1] < var_max:
- x1_res.append(x1[int(rapp_trie[i][0] * M/2)])
- x2_res.append(x2[int(rapp_trie[i][0] * M/2)])
- i = i + 1
- return x1_res,x2_res
- x1_res,x2_res = creernuage(rapp_trie,1e-14,x1,x2,8)
- ## LI-TIFROM
- # Calculer rapport de deux stft de mélanges
- def rapport(stft_x1,stft_x2,freq_echant):
- nb_freq,nb_tps = stft_x1.shape
- res = np.zeros((nb_freq,nb_tps))
- for f in range(0,nb_freq):
- for n in range(0,nb_tps):
- if stft_x2[f][n] != 0:
- res[f][n] = stft_x1[f][n]/stft_x2[f][n]
- else:
- res[f][n] = NaN
- return res
- #Calculer variance du rapport DE DEUX MELANGES SEULMENT pour chaque fréquence
- # fenêtre temporelle de longueur M, supposé pair
- def calc_moyenne(rapp,M,f,i):
- moy = 0
- for j in range(0,M):
- moy = moy + rapp[f][int(i*M/2 + j)]
- moy = 1/M * moy
- return moy
- def calc_variance(rapp,M,f,i): # calcul la variance de rapp pour la ieme fenêtre
- # de longueur M, à la fréquence num f
- moy = 0
- for j in range(0,M):
- moy = moy + rapp[f][int(i*M/2 + j)]
- moy = 1/M * moy
- var = 0
- for j in range(0,M):
- var = var + np.abs(rapp[f][int(i*M/2 + j)] - moy)**2
- var = 1/M * var
- return var
- # i : INDIQUE LE NUMERO DE LA FENETRE
- def variance(rapp,M): #renvoie une matrice de taille nb_freq * nb_fenetre
- #donnant la variance pour chaque fenêtre,fréquence.
- nb_freq,nb_tps = rapp.shape
- nb_fenetre = 0
- k = 0
- while k < (nb_tps - M): # donne le nb de fenêtre temporelles à analyser
- nb_fenetre = nb_fenetre + 1
- k = k + M/2
- res = np.zeros((nb_freq,nb_fenetre,3),dtype=object)
- # pos 0 : num fréquence (=/= freq réelle)
- # pos 1 : num fenêtre
- # pos 2 : variance
- for i in range(0,nb_fenetre):
- for f in range(0,nb_freq):
- res[f][i][2] = calc_variance(rapp,M,f,i)
- res[f][i][0] = f
- res[f][i][1] = i
- return res,nb_fenetre
- # Tri
- def trier(A): # A = matrice de taille nb_freq * nb_fen * 3
- # Renvoie une liste de longueur nb_freq * nb_fen triée selon la
- # variance croissante
- atrier = np.reshape(A,(len(A)*len(A[0]),3))
- atrier = atrier[np.argsort(atrier[:,2])]
- return atrier # atrier = tableau des variances triées
- # Détection des sources visibles, détection des zones mono-sources
- # rapp : rapport de deux mélanges
- # tabvar : tableau des variances croissantes associé
- def detecter_sources_visibles(rapp,tabvar,variance_max,seuil,M,N):
- res = []
- indicemax_tabvar = 0 # Indice de la zone limite considérée (var assez faible).
- n = len(tabvar)
- while indicemax_tabvar < (n-1):
- if tabvar[indicemax_tabvar + 1][2] < variance_max:
- indicemax_tabvar = indicemax_tabvar + 1
- else:
- break
- rapports_seuls = np.copy(tabvar[0:indicemax_tabvar + 1])
- num_source = np.zeros(indicemax_tabvar + 1,dtype = int)
- # Contient pour chaque rapport le numéro de la source associé
- # (complété au fur et à mesure après)
- # On remplace la variance par les rapports dans rapports_seuls
- for i in range(0,indicemax_tabvar + 1):
- rapports_seuls[i][2] = \
- calc_moyenne(rapp,M,rapports_seuls[i][0],rapports_seuls[i][1])
- res.append([[rapports_seuls[0][0],rapports_seuls[0][1]]]) #Initialisation de res
- for i in range(1,indicemax_tabvar + 1):
- indice_memesource = -1 # Indice où la source seule est identique.
- for k in range(0,i):
- if np.abs(rapports_seuls[i][2] - rapports_seuls[k][2]) < seuil:
- indice_memesource = k
- break
- if indice_memesource != -1:
- if len(res[num_source[indice_memesource]]) < 15:
- res[num_source[indice_memesource]].append([rapports_seuls[i][0],rapports_seuls[i][1]])
- num_source[i] = num_source[indice_memesource]
- else:
- rapports_seuls[i][2] = NaN
- else:
- if len(res) < N:
- res.append([[rapports_seuls[i][0],rapports_seuls[i][1]]])
- num_source[i] = len(res) - 1
- else:
- rapports_seuls[i][2] = NaN
- nb_rapports = 0
- for k in res:
- nb_rapports = nb_rapports + len(k)
- rapports_seuls_new = np.empty((nb_rapports,3), dtype=np.object)
- count = 0
- for k in rapports_seuls:
- if not math.isnan(k[2]):
- rapports_seuls_new[count] = k
- count = count + 1
- return res,indicemax_tabvar,rapports_seuls_new
- # res : matrice de taille nb_sources_vis * ? * 2
- # donnant toutes les zones mono-sources par source.
- # Le "?" change selon la source, mais minimum 1.
- # - Colonne 0 : num fréquence.
- # - Colonne 1 : num fenêtre temporelle.
- # Calcul moyenne du rapport entre deux mélanges en une zone monosource
- #i : num de la fenêtre temporelle
- def calc_moy_monosource(stft_x1,stft_x2,M,f,i):
- moy = 0
- for k in range(0,M):
- moy = moy + stft_x1[f][int(M/2 * i + k)]/stft_x2[f][int(M/2 * i + k)]
- moy = 1/M * moy
- return moy
- # Cas N = Q = P
- # où N = nb de sources ; Q = nb de sources visibles ; P = nb de mélanges.
- #matrice_rapports = le res du résultat précédent
- #melanges = matrice contenant tous les STFT des mélanges 0,..,(N-1).
- def creer_mat_mixage(N,matrice_rapports,rapp,M):
- if N != len(matrice_rapports):
- return False
- else:
- mat_mixage = np.zeros((N,N))
- for j in range(0,N): # 1ere ligne
- mat_mixage[0][j] = 1
- for i in range(1,N):
- for j in range(0,N): # Case i,j
- nb_cij = 0
- for L in matrice_rapports[j]:
- nb_cij = nb_cij + np.mean(rapp[L[0]][int(M/2)*L[1]:int(M/2)*L[1]+M])
- mat_mixage[i][j] = nb_cij/len(matrice_rapports[j])
- return mat_mixage
- def retrouver_sources(mat_mixage,melanges):
- return np.dot(np.linalg.inv(mat_mixage),melanges)
- def wav_sourcesbis(mat_sources_separees,freq_echant):
- N = len(mat_sources_separees)
- for i in range(0,N):
- write("/home/felix/sourceseparee%d.wav" %i,freq_echant,
- mat_sources_separees[i])
- ### LI-TIFROM retardé
- def detecter_sources_visibles_freq(rapp,correlation_max,M,temps_stft,freq_stft,stft_melanges):
- var_freq = variance_freq(rapp,M)
- tabvar = np.reshape(var_freq,(len(var_freq)*len(var_freq[0]),3))
- tabvar = tabvar[np.argsort(tabvar[:,2])]#[::-1]
- # tabvar de longueur nb_fenetre * nb_tps et contenant les [t,fi,var] trié
- # par variance croissante.
- return tabvar
- def variance_freq(rapp,M): #renvoie une matrice de taille nb_fenetre * nb_tps
- #donnant la variance pour chaque fenêtre (à temps CSTE).
- nb_freq,nb_tps = rapp.shape
- nb_fenetre = 0
- k = 0
- while k < (nb_freq - M): # donne le nb de fenêtre fréquentielles à analyser
- nb_fenetre = nb_fenetre + 1
- k = k + M/2
- res = np.zeros((nb_tps,nb_fenetre,3),dtype=object)
- # pos 0 : num temps (=/= temps réel)
- # pos 1 : num fenêtre en fréquence
- # pos 2 : variance
- for t in range(0,nb_tps):
- for fi in range(0,nb_fenetre):
- res[t][fi][2] = calc_variance_freq(rapp,M,fi,t)
- res[t][fi][1] = fi
- res[t][fi][0] = t
- return res
- def calc_variance_freq(rapp,M,fi,t): # calcul la variance de rapp pour la
- # fi-eme fenêtre fréquentielle
- # de longueur M, au temps t
- moy = 0
- for j in range(0,M):
- moy = moy + np.abs(rapp[int(fi*M/2 + j)][t])
- moy = 1/M * moy
- var = 0
- for j in range(0,M):
- var = var + np.abs(np.abs(rapp[int(fi*M/2 + j)][t]) - moy)**2
- var = 1/M * var
- return var
- def calc_moyenne_freq(rapp,M,fi,t):
- moy = 0
- for j in range(0,M):
- moy = moy + np.abs(rapp[int(fi*M/2 + j)][t])
- moy = 1/M * moy
- return moy
- def rapport_freq(stft_x1,stft_x2,freq_echant):
- nb_freq,nb_tps = stft_x1.shape
- res = np.zeros((nb_freq,nb_tps),dtype=complex)
- for f in range(0,nb_freq):
- for n in range(0,nb_tps):
- if stft_x2[f][n] != 0:
- res[f][n] = stft_x1[f][n]/stft_x2[f][n]
- else:
- res[f][n] = NaN
- return res
- def modifier_stft(Zxx,L,M):
- nb_freq,nb_tps = Zxx.shape
- res = np.zeros(np.shape(Zxx),dtype=complex)
- for k in L:
- f = k[0]
- t = int(M/2) * k[1]
- for j in range(M):
- res[f][t+j] = Zxx[f][t+j]
- return res
- ## SIMULATION DE SALLE
- pra.constants.set('c', 345)
- fs, s1 = wavfile.read('/home/felix/Simulation salle/Test_salle_1m/ref_500Hz_pure.wav')
- fs, s2 = wavfile.read('/home/felix/Simulation salle/Test_salle_1m/ref_880Hz_pure.wav')
- room_dim = [5, 5]
- shoebox = pra.ShoeBox(
- room_dim,
- absorption=0.9999,
- fs=fs,
- max_order=0,
- )
- shoebox.add_source([2, 3.01], signal=s1)
- shoebox.add_source([2.02, 2.99], signal=s2)
- R = np.array([[2,2.02],[3,3]])
- bf = pra.MicrophoneArray(R,shoebox.fs)
- shoebox.add_microphone_array(bf)
- shoebox.simulate()
- shoebox.plot()
- plt.grid()
- plt.show()
- audio_reverb = shoebox.mic_array.to_wav('/home/felix/test_reverb.wav', norm=True, bitdepth=np.int16)
- ## Tests en salle
- fs,m1 = wavfile.read('/home/felix/Simulation salle/Test LI-TIFROM retardé/Distance 0.5 bis/m-01.wav')
- fs, m2 = wavfile.read('/home/felix/Simulation salle/Test LI-TIFROM retardé/Distance 0.5 bis/m-02.wav')
- def retarder(m1,n):
- p = len(m1)
- res = np.zeros(p+n)
- for i in range(p):
- res[n+i] = m1[i]
- return res
- def trouver_retard(m1,m2):
- i = 0
- cond = True
- while cond:
- if m1[i] == 0:
- i = i +1
- else:
- cond = False
- j = 0
- cond = True
- while cond:
- if m2[j] == 0:
- j = j +1
- else:
- cond = False
- return j - i,i,j
- ## UN EXEMPLE
- freq_echant, x1 = read("/home/felix/Simulation salle/Test_salle_0.1m/m-01.wav")
- freq_echant, x2 = read("/home/felix/Simulation salle/Test_salle_0.1m/m-02.wav")
- nb_echantillons = len(x1)
- temps = np.linspace(0,nb_echantillons/freq_echant,nb_echantillons)
- freq1,temps_seg1,Zxx_x1 = stft(x1,freq_echant,nperseg = 4000)
- freq2,temps_seg2,Zxx_x2 = stft(x2,freq_echant,nperseg = 4000)
- stft_x1 = np.abs(Zxx_x1)
- stft_x2 = np.abs(Zxx_x2)
- melanges_stft = np.array([np.abs(Zxx_x1),np.abs(Zxx_x2)])
- stft_melanges_comp = np.array([Zxx_x1,Zxx_x2])
- melanges = np.array([x1,x2])
- rapp = rapport(melanges_stft[1],melanges_stft[0],freq_echant)
- var,nb_fenetre = variance(rapp,20)
- var_trie = trier(var)
- visi,indmax,rappseul = detecter_sources_visibles(rapp,var_trie,5e-3,0.2,20,2)
- #rappseul[rappseul[:,2].argsort()]
- L = visi[0]
- #L = visi[1]
- Zxx_m1 = modifier_stft(Zxx_x1,L,20)
- tps1,res1 = istft(Zxx_m1,freq_echant,nperseg = 4000)
- write("/home/felix/1freq_m1.wav",freq_echant,res1)
- Zxx_m2 = modifier_stft(Zxx_x2,L,20)
- tps2,res2 = istft(Zxx_m2,freq_echant,nperseg = 4000)
- write("/home/felix/1freq_m2.wav",freq_echant,res2)
- def correlation(x,y,normalize):
- xfft = np.fft.fft(x, 4*len(x))
- yfft = np.fft.fft(y, 4*len(x))
- R = xfft * np.conjugate(yfft)
- if normalize:
- R = R/np.abs(R)
- c = np.fft.ifft(R)
- return c
- bruit = np.random.normal(0, 0.005, size=len(res1)+40)
- res1_red = res1
- res2_red = res2
- xc = correlation(res1_red,res2_red,False)
- xc_phat = correlation(res1_red,res2_red,True)
- abs = np.arange(-len(res1_red),len(res1_red),1)
- bon1 = np.fft.fftshift(np.real(xc))[len(xc)//2 - len(res1_red):len(xc)//2 + len(res1_red)]
- bon2 = np.fft.fftshift(np.real(xc_phat))[len(xc)//2 - len(res1_red):len(xc)//2 + len(res1_red)]
- bon2[len(res1_red)] = 0
- plt.subplot(2, 1, 1)
- plt.plot(abs, bon1)
- plt.title('Corrélation croisée VS filtre PHAT')
- plt.xlim(xmax=100)
- plt.xlim(xmin=-100)
- plt.grid()
- plt.subplot(2, 1, 2)
- #plt.plot(abs, np.fft.fftshift(np.real(xc_phat)))
- plt.plot(abs, bon2)
- plt.xlim(xmax=100)
- plt.xlim(xmin=-100)
- plt.grid()
- plt.show()
- mat_mixage = creer_mat_mixage(2,visi,rapp,20)
- # mat_sourcesextraites = retrouver_sources(mat_mixage,melanges)
- # (dans le cas sans retards)
- stft_res = np.zeros((2,len(freq1),len(temps_seg1)),dtype=complex)
- for fk in range(len(freq1)):
- mat_mixage_inv = np.linalg.inv(np.array([[1,1],[mat_mixage[1][0]*np.exp(-1j*2*np.pi*(0)*freq1[fk]/freq_echant),mat_mixage[1][1]*np.exp(-1j*2*np.pi*(37)*freq1[fk]/freq_echant)]]))
- L = np.dot(mat_mixage_inv,stft_melanges_comp[:,fk])
- stft_res[0][fk] = L[0]
- stft_res[1][fk] = L[1]
- tps1,res1 = istft(stft_res[0],freq_echant,nperseg = 4000)
- tps2,res2 = istft(stft_res[1],freq_echant,nperseg = 4000)
- write("/home/felix/sourceseparee1_1m.wav",freq_echant,res1)
- write("/home/felix/sourceseparee2_1m.wav",freq_echant,res2)
- ## La surface du démon, des heures et des heures de perdues...
- freqmin = 0
- freqmax = grid_freq[0][-1]
- temps_bis = temps_seg1[:170]
- freq_bis = freq1[:100]
- grid_z = np.zeros((len(temps_bis),len(freq_bis)))
- grid_freq, grid_tps = np.meshgrid(freq_bis,temps_bis)
- for j in range(len(grid_tps[0])): #freq
- for i in range(nb_fenetre): #tps
- a = np.log10(1/var[j][i][2])
- for k in range(10):
- grid_z[10 * i + k][j] = a
- fig = plt.figure()
- ax = fig.gca(projection='3d')
- import matplotlib.ticker as mticker
- def log_tick_formatter(val, pos=None):
- return "{:.0e}".format(10**val)
- ax.zaxis.set_major_formatter(mticker.FuncFormatter(log_tick_formatter))
- ax.yaxis.set_ticks([0,2,4,6])
- light = LightSource(azdeg=-20, altdeg=60)
- #green_surface = light.shade_rgb(rgb * green, grid_z)
- illuminated_surface = light.shade(grid_z, cmap=cm.jet)
- ax.plot_surface(grid_freq,grid_tps, grid_z,rstride=1,cstride=1,antialiased=False, facecolors=illuminated_surface)
- #, cmap=cm.jet
- #
- plt.xlabel('\nfréquence (Hz)', linespacing=0.8)
- plt.ylabel('\ntemps (s)', linespacing=0.8)
- ax.tick_params(axis='z', which='major', pad=11)
- ax.set_zlabel('\n1/variance (échelle log)',linespacing=5)
- #,vmin=np.nanmin(grid_z)-100,vmax=np.nanmax(grid_z)
- #ax.set_xlim(np.log10(freqmin),np.log10(freqmax))
- ax.set_xlim(freqmin,freqmax)
- ax.set_zlim(-2,np.max(grid_z))
- plt.show()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement