Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from pylsl import StreamInlet, resolve_stream, resolve_byprop
- import numpy as np
- import keras
- import utils
class Classifier_LSTM:
    """LSTM-based binary classifier for time-series windows.

    The network is: Input -> LSTM(100, return_sequences=True) ->
    TimeDistributed(Dense(50)) -> GlobalAveragePooling1D ->
    Dense(1, sigmoid). It is compiled with binary cross-entropy, Adam
    (lr=0.01) and the accuracy metric.
    """

    def __init__(self, output_directory, input_shape, nb_classes, verbose=False):
        """Build the model and save its freshly initialised weights.

        Args:
            output_directory: Directory that receives ``model_init.hdf5``.
            input_shape: Shape passed to ``keras.layers.Input``.
            nb_classes: Forwarded to ``build_model`` (currently unused there).
            verbose: When truthy, print the Keras model summary.
        """
        # Local import so this class does not depend on a file-level change.
        import os

        self.output_directory = output_directory
        self.model = self.build_model(input_shape, nb_classes)
        if verbose:
            self.model.summary()
        self.verbose = verbose
        # os.path.join yields the same path as plain concatenation when
        # output_directory ends with a separator (or is empty), and a correct
        # path when the caller omits the trailing separator.
        self.model.save_weights(
            os.path.join(self.output_directory, 'model_init.hdf5'))

    def build_model(self, input_shape, nb_classes):
        """Create and compile the LSTM network.

        Args:
            input_shape: Shape passed to ``keras.layers.Input``.
            nb_classes: Accepted for interface compatibility but ignored; the
                output layer is always a single sigmoid unit.

        Returns:
            The compiled ``keras.models.Model``.
        """
        input_layer = keras.layers.Input(input_shape)

        # Keep the full output sequence so that a per-timestep Dense layer can
        # be applied before pooling over the sequence.
        lstm = keras.layers.LSTM(units=100, return_sequences=True)(input_layer)
        time1 = keras.layers.TimeDistributed(keras.layers.Dense(50))(lstm)
        time1 = keras.layers.GlobalAveragePooling1D()(time1)
        output_layer = keras.layers.Dense(units=1, activation='sigmoid')(time1)

        # NOTE: an earlier Conv1D/AveragePooling1D/softmax variant and a
        # ModelCheckpoint callback on 'best_model.hdf5' were disabled in the
        # original source and are intentionally not built here.

        model = keras.models.Model(inputs=input_layer, outputs=output_layer)
        # NOTE(review): newer Keras releases spell this argument
        # `learning_rate`; `lr` is kept for the Keras version this script
        # appears to target - confirm before upgrading.
        model.compile(loss='binary_crossentropy',
                      optimizer=keras.optimizers.Adam(lr=0.01),
                      metrics=['accuracy'])
        return model
import gc
import time

import tensorflow as tf

# First resolve an EEG stream and an ECG stream on the lab network.
print("looking for a stream...")
stream_eeg = resolve_byprop('type', 'EEG')
stream_ecg = resolve_byprop('type', 'ECG')

# Create one inlet per stream, using the first match of each type.
inlet_eeg = StreamInlet(stream_eeg[0], max_chunklen=12)
inlet_ecg = StreamInlet(stream_ecg[0], max_chunklen=12)
epoch_eeg = []

# Reset Keras state and select inference mode (learning phase 0) before the
# trained models are loaded.
tf.keras.backend.clear_session()
tf.keras.backend.set_learning_phase(0)

# Timestamp taken just before the models are loaded. time.clock() was removed
# in Python 3.8, so time.perf_counter() is used instead.
time_modelload = time.perf_counter()

# Load the trained classifiers and recompile them with the training settings.
classifier_eeg = keras.models.load_model('best_model_eeg.hdf5')
classifier_eeg.compile(loss='binary_crossentropy',
                       optimizer=keras.optimizers.Adam(lr=0.01),
                       metrics=['accuracy'])
classifier_ecg = keras.models.load_model('best_model_ecg.hdf5')
classifier_ecg.compile(loss='binary_crossentropy',
                       optimizer=keras.optimizers.Adam(lr=0.01),
                       metrics=['accuracy'])

# Number of consecutive positive EEG predictions seen so far.
eeg_inarow_counts = 0
classifs_time = []
epoch_length = 3

info = inlet_eeg.info()
description = info.desc()
buffer_len = 5
print("desc:", description)

# 750 samples per 3-unit epoch -> 250 samples per unit.
# NOTE(review): presumably seconds / Hz - confirm against the stream metadata.
fs = int(750 / 3)

# Rolling buffers holding the most recent 3 * fs samples: 4 EEG columns and
# 1 ECG column.
eeg_buffer = np.zeros((int(fs * 3), 4))
ecg_buffer = np.zeros((int(fs * 3), 1))

# NOTE(review): operator precedence makes this
# buffer_len - (3 / epoch_length) + 1, which evaluates to 5 here. If
# (buffer_len - 3) / epoch_length + 1 was intended, the parentheses are
# misplaced - confirm before changing; the value is kept as in the original.
n_win_test = int(np.floor((buffer_len - 3 / epoch_length + 1)))
# Number of consecutive positive ECG predictions. It must exist before the
# loop: otherwise the first positive ECG prediction raises NameError.
ecg_inarow_counts = 0

# Samples per classified epoch; equals the 750 rows both buffers are built with.
epoch_samples = 3 * fs

while True:
    # Pull whatever arrived on each stream, waiting at most 1 per call
    # (presumably seconds - see the pylsl documentation).
    sample_eeg, timestamp_eeg = inlet_eeg.pull_chunk(timeout=1)
    sample_ecg, timestamp_ecg = inlet_ecg.pull_chunk(timeout=1)

    # pull_chunk can return an empty chunk when nothing arrived in time.
    # Slicing an empty array with [:, 0:4] would raise IndexError, so each
    # stream is only processed when it actually delivered samples.
    if sample_eeg:
        # Keep only the first four columns of every EEG sample.
        sample_eeg = np.array(sample_eeg)[:, 0:4]

        # Presumably appends the new samples to the rolling buffer and returns
        # its most recent rows - verify against utils.
        eeg_buffer, filter_state = utils.update_buffer(
            eeg_buffer, sample_eeg, filter_state=None)
        epoch_eeg = utils.get_last_data(eeg_buffer, epoch_samples)
        print("eeg shape", epoch_eeg.shape)
        epoch_eeg = np.array(epoch_eeg).reshape(1, epoch_samples, 4)

        # The classifier's output is thresholded at 0.5.
        predicted_eeg = classifier_eeg.predict(epoch_eeg) > 0.5
        print("Ictal according to EEG ?: ", predicted_eeg)
        if predicted_eeg.any():
            eeg_inarow_counts = eeg_inarow_counts + 1
        else:
            eeg_inarow_counts = 0

    if sample_ecg:
        # Keep only the first column of every ECG sample.
        sample_ecg = np.array(sample_ecg)[:, 0]

        ecg_buffer, filter_state = utils.update_buffer(
            ecg_buffer, sample_ecg, filter_state=None)
        epoch_ecg = utils.get_last_data(ecg_buffer, epoch_samples)
        print("ecg shape", epoch_ecg.shape)
        epoch_ecg = np.array(epoch_ecg).reshape(1, epoch_samples, 1)

        predicted_ecg = classifier_ecg.predict(epoch_ecg) > 0.5
        print("Ictal according to ECG ?: ", predicted_ecg)
        if predicted_ecg.any():
            ecg_inarow_counts = ecg_inarow_counts + 1
        else:
            ecg_inarow_counts = 0
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement