Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import json
import shutil

import matplotlib.pyplot as plt
import numpy as np
import wfdb
from IPython.display import display
from biosppy.signals import ecg
from keras.layers import Conv1D, MaxPooling1D
from keras.layers import Dense
from keras.layers import Dropout, Flatten
from keras.models import Sequential
from keras.models import model_from_json
from keras.utils import plot_model
from wfdb import processing

import tools as st
# Build the record / label lists from the PTB index file.
# Each line of records.txt is "<patient_dir>/<record_name>"; only the record
# header (comments) is fetched from PhysioNet to decide the class label.
diagnosis = []  # 1 = healthy control, 0 = inferior myocardial infarction
records = []
with open('records.txt', 'r') as index_file:
    for line in index_file:
        line = line.replace("\n", "")
        arr = line.split('/')
        # rdsamp returns (signal, fields); only the header fields are needed.
        _, fields = wfdb.rdsamp(arr[1], pb_dir='ptbdb/' + arr[0] + '/')
        # NOTE(review): assumes comments[4] holds the diagnosis line and
        # comments[5] the infarction localisation — TODO confirm against
        # the PTB header layout.
        if 'Healthy control' in fields['comments'][4]:
            diagnosis.append(1)
            records.append(line)
        if 'Myocardial infarction' in fields['comments'][4]:
            # Keep only inferior-wall infarctions.
            if 'inferior' in fields['comments'][5]:
                diagnosis.append(0)
                records.append(line)
# Persist the filtered record names and their labels, one per line.
with open('upd_records.txt', 'w') as f:
    f.write("\n".join(str(x) for x in records))
with open('diagnosis.txt', 'w') as ff:
    ff.write("\n".join(str(x) for x in diagnosis))
# For every selected record: download three ECG channels, segment each
# channel into heartbeat templates with biosppy, and average the templates
# into one 1800-sample feature vector (3 channels x 600 samples each).
patients = []
with open('upd_records.txt', 'r') as record_list:
    for line in record_list:
        line = line.replace("\n", "")
        arr = line.split('/')
        # Channels 1, 2 and 5 of the PTB record.
        record2 = wfdb.rdrecord(arr[1], pb_dir='ptbdb/' + arr[0] + '/',
                                channels=[1, 2, 5])
        # Dump the signal to a simple text file; the '#' header lines are
        # skipped automatically by np.loadtxt below.
        with open('data.txt', 'w') as f:
            f.write("# Simple Text Format\n")
            f.write("# Sampling Rate (Hz):= 1000.00\n")
            f.write("# Resolution:= 12\n")
            f.write("# Labels:= ECG\n")
            print(np.array(record2.p_signal).shape)
            for x in record2.p_signal:
                f.write(str(x[0]) + " " + str(x[1]) + " " + str(x[2]) + "\n")
        # One pass per channel: segment into beats, then average the beat
        # templates sample-wise (equivalent to the old sum/len loop).
        average_signal = []
        for col in (0, 1, 2):
            signal = np.loadtxt("data.txt", usecols=(col))
            templates = ecg.ecg(signal=signal, sampling_rate=1000.,
                                show=False)["templates"]
            average_signal.extend(templates.mean(axis=0))
        # Keep the per-record averaged beat on disk for inspection.
        with open('temp.txt', 'w') as ff:
            for value in average_signal:
                ff.write(str(value) + "\n")
        patients.append(np.array(average_signal))
# One comma-separated row per patient.
with open('result_data.txt', 'w') as output:
    output.write("\n".join(
        ",".join(str(x) for x in np_arr.tolist()) for np_arr in patients))
def model(train_data, train_result, test_data):
    """Train a small 1-D CNN on the training beats and score one held-out beat.

    Parameters
    ----------
    train_data : array-like, reshapeable to (n_samples, 600, 3)
        Averaged heartbeat templates, one row per patient.
    train_result : array-like, shape (n_samples,)
        Binary labels (per the label-building step: 1 = healthy,
        0 = inferior infarction).
    test_data : array-like
        A single patient's template, reshapeable to (1, 600, 3).

    Returns
    -------
    numpy.ndarray
        Sigmoid probability for the held-out patient, shape (1, 1).
    """
    # The network gets its own name so it no longer shadows this function.
    net = Sequential()
    net.add(Conv1D(filters=15, kernel_size=5, activation='relu',
                   input_shape=(600, 3)))
    net.add(MaxPooling1D(pool_size=2))
    net.add(Dropout(0.2))
    net.add(Conv1D(filters=10, kernel_size=3, activation='relu'))
    net.add(MaxPooling1D(pool_size=2))
    net.add(Dropout(0.2))
    net.add(Flatten())
    net.add(Dense(25, activation='relu'))
    net.add(Dropout(0.5))
    net.add(Dense(1, activation='sigmoid'))
    net.compile(loss="binary_crossentropy", optimizer="adam",
                metrics=['accuracy'])
    net.fit(np.array(train_data), np.array(train_result),
            epochs=45, batch_size=8, verbose=1)
    # Single redundant np.array wrap removed; reshape to one (600, 3) sample.
    pred = net.predict(np.array(test_data).reshape(1, 600, 3), verbose=0)
    print(pred)
    return pred
# Leave-one-out cross-validation: train on all patients but one, predict the
# held-out patient, and collect the hard 0/1 predictions.
with open('result_data.txt') as fi:
    data = [np.array([float(xs) for xs in x.split(",")])
            for x in fi.readlines()]
# -1 lets the row count come from the file instead of a hard-coded 169.
data = np.array(data).reshape((-1, 600, 3))
with open('diagnosis.txt', 'r') as fu:
    result = [[int(x)] for x in fu.readlines()]
n_patients = data.shape[0]
result_predict = []
for i in range(n_patients):
    test_data = data[i].reshape(1, 600, 3)
    # Training set = every patient except i.
    train_data = np.append(data[:i], data[i + 1:]).reshape(
        n_patients - 1, 600, 3)
    train_result = np.append(result[:i], result[i + 1:])
    print(i + 1)  # progress counter, 1-based like the original
    pred = model(train_data, train_result, test_data)
    # Threshold the sigmoid output at 0.5 to get a hard class label.
    pred = 1 if pred > 0.5 else 0
    print(pred)
    result_predict.append(pred)
print(result_predict)
with open('pred.txt', 'w') as fw:
    json.dump(result_predict, fw)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement