Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- '''Simple implementations of connectivity measures.'''
- # Authors : pravsripad@gmail.com
- # daniel.vandevelden@yahoo.de
- import sys
- import numpy as np
- import matplotlib.pyplot as pl
- import matplotlib.mlab as mlab
- con_name = 'coh'
- if con_name == '':
- print 'Please provided the connectivity method to use.'
- sys.exit()
- amp = 1.
- amp2 = 1.
- sfreq = 1000.
- #_________
- nse_amp = 1
- noise_mu = 0.1 # mean phase, "center" of noise distribution
- nse_kappa = 2. # dispersion of noise dispritbution
- phase_mu = np.abs(np.deg2rad(45)) # mean circular phase (ENTER IN DEGREE), "center" of phase distribution
- phase_kappa = 10. # dispersion of phase dispritbution
- #____
- n_epochs = 120
- epoch_length = 1 # make epoch of 1 second each
- times = np.arange(0,epoch_length,1/(sfreq))
- nfft = 512
- #____
- freq1_start = 150
- freq1_end = 220
- freq_steps = 0.#___
- freq2_start = freq1_start
- freq2_end = freq1_end
- #___
- freq_band1 = np.linspace(freq1_start, freq1_end, times.size)
- freq_band2 = np.linspace(freq2_start, freq2_end, times.size)
- """#############################################################################
- #_____________________________Signal generated in epochs_______________________#
- #############################################################################"""
- def simulate_x_y(times, n_epochs, amp, amp2, freq_band1, freq_band2, phase_mu, phase_kappa, nse_amp, noise_mu, nse_kappa):
- x = np.zeros((n_epochs, times.size))
- y = np.zeros((n_epochs, times.size))
- phase_shift_y = np.random.vonmises(phase_mu, phase_kappa, n_epochs)
- for i in range(0, n_epochs):
- print i
- for j in xrange(freq1_start,freq1_end):
- print j
- x[i] = (amp * np.sin(2 * np.pi * freq_band1 * times )+ np.sin(2 * np.pi * j * times)) + nse_amp * np.random.normal(loc=noise_mu, scale= nse_kappa, size=times.size)
- y[i] = (amp2 * np.sin(2 * np.pi * freq_band2 * times + phase_shift_y[i])+ np.sin(2 * np.pi * j * times)) + nse_amp * np.random.normal(loc=noise_mu, scale= nse_kappa, size=times.size)
- return x, y, phase_shift_y
- # return the values from the function is just for the coding process to check every variable
- x, y, phase_shift_y = simulate_x_y(times, n_epochs, amp, amp2, freq_band1, freq_band2, phase_mu, phase_kappa, nse_amp, noise_mu, nse_kappa)
- x0 = x.flatten()
- y0 = y.flatten()
- def compute_mean_psd_csd(x, y, n_epochs, nfft, sfreq):
- '''Computes mean of PSD and CSD for signals.'''
- n_freqs = nfft/2 + 1
- Rxy = np.zeros((n_epochs, n_freqs), dtype=complex)
- Rxx = np.zeros((n_epochs, n_freqs), dtype=complex)
- Ryy = np.zeros((n_epochs, n_freqs), dtype=complex)
- for i in range(n_epochs):
- Rxy[i], freqs = mlab.csd(x[i], y[i], NFFT=nfft, Fs=sfreq)
- Rxx[i], _____ = mlab.psd(x[i], NFFT=nfft, Fs=sfreq)
- Ryy[i], _____ = mlab.psd(y[i], NFFT=nfft, Fs=sfreq)
- Rxy_mean = np.mean(Rxy, axis=0)
- Rxx_mean = np.mean(Rxx, axis=0)
- Ryy_mean = np.mean(Ryy, axis=0)
- return freqs, Rxy, Rxy_mean, np.real(Rxx_mean), np.real(Ryy_mean)
- def my_coherence(n_freqs, Rxy_mean, Rxx_mean, Ryy_mean):
- ''' Computes coherence. '''
- coh = np.zeros((n_freqs))
- for i in range(0, n_freqs):
- coh[i] = np.abs(Rxy_mean[i]) / np.sqrt(Rxx_mean[i] * Ryy_mean[i])
- return coh
- def my_imcoh(n_freqs, Rxy_mean, Rxx_mean, Ryy_mean):
- ''' Computes imaginary coherence. '''
- imcoh = np.zeros((n_freqs))
- for i in range(0, n_freqs):
- imcoh[i] = np.imag(Rxy_mean[i]) / np.sqrt(Rxx_mean[i] * Ryy_mean[i])
- return imcoh
- def my_cohy(n_freqs, Rxy_mean, Rxx_mean, Ryy_mean):
- ''' Computes coherency. '''
- cohy = np.zeros((n_freqs))
- for i in range(0, n_freqs):
- cohy[i] = np.real(Rxy_mean[i]) / np.sqrt(Rxx_mean[i] * Ryy_mean[i])
- return cohy
- def my_plv(n_freqs, Rxy, Rxy_mean):
- ''' Computes PLV. '''
- Rxy_plv = np.zeros((n_epochs, n_freqs), dtype=np.complex)
- for i in range(0, n_epochs):
- Rxy_plv[i] = Rxy[i] / np.abs(Rxy[i])
- plv = np.abs(np.mean(Rxy_plv, axis=0))
- return plv
- def my_pli(n_freqs, Rxy, Rxy_mean):
- ''' Computes PLI. '''
- Rxy_pli = np.zeros((n_epochs, n_freqs), dtype=np.complex)
- for i in range(0, n_epochs):
- Rxy_pli[i] = np.sign(np.imag(Rxy[i]))
- pli = np.abs(np.mean(Rxy_pli, axis=0))
- return pli
- def my_wpli(n_freqs, Rxy, Rxy_mean):
- ''' Computes WPLI. '''
- Rxy_wpli_1 = np.zeros((n_epochs, n_freqs), dtype=complex)
- Rxy_wpli_2 = np.zeros((n_epochs, n_freqs), dtype=complex)
- for i in range(0, n_epochs):
- Rxy_wpli_1[i] = np.imag(Rxy[i])
- Rxy_wpli_2[i] = np.abs(np.imag(Rxy[i]))
- # handle divide by zero
- denom = np.mean(Rxy_wpli_2, axis=0)
- idx_denom = np.where(denom == 0.)
- denom[idx_denom] = 1.
- wpli = np.abs(np.mean(Rxy_wpli_1, axis=0)) / denom
- wpli[idx_denom] = 0.
- return wpli
- def my_con(x, y, n_epochs, nfft, sfreq, con_name='coh'):
- '''Computes connectivity measure mentioned on provided signal pair and its surrogates.'''
- n_freqs = nfft/2 + 1
- freqs, Rxy, Rxy_mean, Rxx_mean, Ryy_mean = compute_mean_psd_csd(x, y, n_epochs, nfft, sfreq)
- # compute surrogates
- x_surr = x.copy()
- y_surr = y.copy()
- np.random.shuffle(x_surr)
- np.random.shuffle(y_surr)
- freqs_surro, Rxy_s, Rxy_s_mean, Rxx_s_mean, Ryy_s_mean = compute_mean_psd_csd(x_surr, y_surr, n_epochs, nfft, sfreq)
- m = {
- 'coh': lambda: (my_coherence(n_freqs, Rxy_mean, Rxx_mean, Ryy_mean), my_coherence(n_freqs, Rxy_s_mean, Rxx_s_mean, Ryy_s_mean)),
- 'imcoh': lambda: (my_imcoh(n_freqs, Rxy_mean, Rxx_mean, Ryy_mean), my_imcoh(n_freqs, Rxy_s_mean, Rxx_s_mean, Ryy_s_mean)),
- 'cohy': lambda: (my_cohy(n_freqs, Rxy_mean, Rxx_mean, Ryy_mean), my_cohy(n_freqs, Rxy_s_mean, Rxx_s_mean, Ryy_s_mean)),
- 'plv': lambda: (my_plv(n_freqs, Rxy, Rxy_mean), my_plv(n_freqs, Rxy_s, Rxy_s_mean)),
- 'pli': lambda: (my_pli(n_freqs, Rxy, Rxy_mean), my_pli(n_freqs, Rxy_s, Rxy_s_mean)),
- 'wpli': lambda: (my_wpli(n_freqs, Rxy, Rxy_mean), my_wpli(n_freqs, Rxy_s, Rxy_s_mean)),
- }
- return m[con_name]() + (freqs, freqs_surro)
- con, con_surro, freqs, freqs_surro = my_con(x, y, n_epochs, nfft, sfreq, con_name)
- #----------------
- # plot results
- fig = pl.figure('Connectivity')
- ax = fig.add_subplot(2, 1, 1)
- pl.plot(freqs, con)
- #pl.plot(freqs_surro, con_surro)
- pl.title("Connectivity Method is %s" %(con_name), color="black", fontsize=20)
- pl.xlabel("Frequency [ Hz ]", color="black", fontsize=14)
- pl.ylabel("Connectivity Value [ ]", color="black", fontsize=14)
- pl.ylim(0,1.1)
- pl.grid()
- pl.legend(['%s'%con_name,'ImCoh','PLV','PLI','WPLI'])
- #-----------------
- ax = fig.add_subplot(2, 1, 2)
- ax = pl.subplot(2, 1, 2, polar=True)
- pl.title("Phase Shift for mu=%srad and kappa= %s" %(phase_mu, phase_kappa), color="black", fontsize=16)
- radii = np.ones((n_epochs))
- pl.grid()
- bars = ax.bar(phase_shift_y, radii, bottom=0., width=0.001)
- ax.set_yticklabels([])
- pl.grid()
- #----------------
- pl.ion()
- pl.show()
- pl.tight_layout()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement