import csv
import math

import numpy as np
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score
from sklearn.decomposition import PCA
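

# Pipeline overview (summarizing what the code below actually does):
#  1. PCA reduces the feature space to num_components principal components.
#  2. Each component is dichotomized at an entropy-minimizing cutoff,
#     one ">=" cutoff and one "<=" cutoff per component.
#  3. Logistic regression is fit on the resulting 2 * num_components binary
#     features, so each learned weight maps to one thresholded component.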


class InterpretableModel:
    def __init__(self, num_components=9):
        # number of principal components to extract and dichotomize
        self.num_components = num_components

    def fit(self, x, y):
        num_components = self.num_components
        pca = PCA(n_components=num_components)
        pca.fit(x)
        print(pca.components_)
        print(pca.explained_variance_ratio_)
        components = pca.transform(x)
        data_size = x.shape[0]
        cutoffs1 = np.zeros(num_components)  # ">=" cutoffs
        cutoffs2 = np.zeros(num_components)  # "<=" cutoffs
        # find a cutoff for each principal component
        for k in range(num_components):
            # scan 100 evenly spaced candidate cutoffs over the range of the k-th component
            grid = np.linspace(np.min(components[:, k]), np.max(components[:, k]), 100, endpoint=False)
            min_entropy = None
            optimal_cutoff = None
            for cutoff in grid:
                y_pred = np.where(components[:, k] >= cutoff, 1, 0)
                cm = confusion_matrix(y, y_pred)
                N0 = cm[0, 0] + cm[1, 0]  # number of samples predicted as "0"
                N1 = cm[0, 1] + cm[1, 1]  # number of samples predicted as "1"
                if N1 == 0 or N0 == 0:
                    continue
                p0 = cm[0, 0] / N0  # purity of the predicted-"0" group
                p1 = cm[1, 1] / N1  # purity of the predicted-"1" group
                if p0 == 0 or p1 == 0:
                    continue
                entropy = -(N1 / data_size) * math.log(p1) - (N0 / data_size) * math.log(p0) \
                          + (N1 / data_size) * math.log(N1 / data_size) \
                          + (N0 / data_size) * math.log(N0 / data_size)
                if min_entropy is None or entropy < min_entropy:
                    min_entropy = entropy
                    optimal_cutoff = cutoff
            print("k =", k, " cutoff =", optimal_cutoff)
            cutoffs1[k] = optimal_cutoff
            # repeat the search with the inequality flipped ("<=" direction)
            min_entropy = None
            optimal_cutoff = None
            for cutoff in grid:
                y_pred = np.where(components[:, k] <= cutoff, 1, 0)
                cm = confusion_matrix(y, y_pred)
                N0 = cm[0, 0] + cm[1, 0]  # number of samples predicted as "0"
                N1 = cm[0, 1] + cm[1, 1]  # number of samples predicted as "1"
                if N1 == 0 or N0 == 0:
                    continue
                p0 = cm[0, 0] / N0
                p1 = cm[1, 1] / N1
                if p0 == 0 or p1 == 0:
                    continue
                entropy = -(N1 / data_size) * math.log(p1) - (N0 / data_size) * math.log(p0) \
                          + (N1 / data_size) * math.log(N1 / data_size) \
                          + (N0 / data_size) * math.log(N0 / data_size)
                if min_entropy is None or entropy < min_entropy:
                    min_entropy = entropy
                    optimal_cutoff = cutoff
            print("k =", k, " cutoff =", optimal_cutoff)
            cutoffs2[k] = optimal_cutoff
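
        # Worked example of the cutoff criterion (made-up numbers, not from
        # the data): suppose a candidate cutoff yields the confusion matrix
        #     cm = [[40, 10],
        #           [ 5, 45]]
        # on 100 samples. Then N0 = 45, N1 = 55, p0 = 40/45, p1 = 45/55, and
        # entropy = -(0.55)*ln(45/55) - (0.45)*ln(40/45)
        #           + 0.55*ln(0.55) + 0.45*ln(0.45) ≈ -0.52.
        # The grid point minimizing this quantity becomes the component's cutoff.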
        # fit a logistic regression on the dichotomized principal components
        bin_components = np.zeros((data_size, 2 * num_components))
        for i in range(data_size):
            for k in range(num_components):
                bin_components[i, k] = 1 if components[i, k] >= cutoffs1[k] else 0
                bin_components[i, num_components + k] = 1 if components[i, k] <= cutoffs2[k] else 0
        log_reg = LogisticRegression()
        log_reg.fit(bin_components, y)
        self.pca = pca
        self.cutoffs_upper = cutoffs1
        self.cutoffs_lower = cutoffs2
        self.log_reg = log_reg

    def predict_proba(self, x):
        components = self.pca.transform(x)
        data_size = x.shape[0]
        num_components = self.num_components
        bin_components = np.zeros((data_size, 2 * num_components))
        for i in range(data_size):
            for k in range(num_components):
                bin_components[i, k] = 1 if components[i, k] >= self.cutoffs_upper[k] else 0
                bin_components[i, num_components + k] = 1 if components[i, k] <= self.cutoffs_lower[k] else 0
        return self.log_reg.predict_proba(bin_components)
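
# Note: the per-sample dichotomization loops above could be replaced by NumPy
# broadcasting; a minimal equivalent sketch (assuming the same attributes):
#
#     upper = (components >= self.cutoffs_upper).astype(float)
#     lower = (components <= self.cutoffs_lower).astype(float)
#     bin_components = np.hstack([upper, lower])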


def read_data(file_name, x_num, y_num):
    with open(file_name) as fp:
        reader = csv.reader(fp, delimiter=";")
        next(reader, None)  # skip the header row
        rows = [row for row in reader]
    data = []
    for i, row in enumerate(rows):
        try:
            values = list(map(float, row))
            assert len(values) == x_num + y_num, \
                f"expected {x_num + y_num} columns, got {len(values)}"
        except (ValueError, AssertionError) as e:
            # skip malformed rows instead of leaving empty placeholders behind
            print(f"Error processing row {i + 1}: {e}")
            continue
        data.append(values)
    data = np.array(data)
    data_x = data[:, :x_num]
    data_y = data[:, x_num:]
    return data_x, data_y
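
# Expected file layout (an assumption inferred from read_data's parameters):
# a ';'-separated file with one header row, then x_num feature columns
# followed by y_num target columns, e.g. for x_num = 2, y_num = 1:
#
#     f1;f2;score
#     0.5;1.2;80.0
#     1.1;0.3;60.0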

file_name = 'data0.csv'  # replace with the path to your data file
x_num = 29
y_num = 3
data_x, data_y = read_data(file_name, x_num, y_num)


def k_fold_cross_validation(data_x, data_y, k=5):
    skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)
    accuracy_scores = []
    sensitivity_scores = []
    specificity_scores = []
    auc_scores = []
    # binary target: 1 if the mean of the y-columns is at least 75
    data_z = np.where(np.mean(data_y, axis=1) >= 75, 1, 0)
    for train_indices, test_indices in skf.split(data_x, data_z):
        train_x, test_x = data_x[train_indices], data_x[test_indices]
        train_z, test_z = data_z[train_indices], data_z[test_indices]
        model = InterpretableModel()  # swap in LogisticRegression() for a baseline
        model.fit(train_x, train_z)  # train the model
        predictions = model.predict_proba(test_x)[:, 1]
        accuracy = accuracy_score(test_z, predictions > 0.5)
        accuracy_scores.append(accuracy)
        cm = confusion_matrix(test_z, predictions > 0.5)
        sensitivity = cm[1, 1] / (cm[1, 0] + cm[1, 1])
        specificity = cm[0, 0] / (cm[0, 0] + cm[0, 1])
        sensitivity_scores.append(sensitivity)
        specificity_scores.append(specificity)
        # compute AUC for this fold
        auc = roc_auc_score(test_z, predictions)
        auc_scores.append(auc)
    return accuracy_scores, sensitivity_scores, specificity_scores, auc_scores


def final_test(data_x, data_y):
    data_z = np.where(np.mean(data_y, axis=1) >= 75, 1, 0)
    train_x, test_x, train_z, test_z = train_test_split(data_x, data_z, test_size=0.2, random_state=42)
    model = InterpretableModel()  # swap in LogisticRegression() for a baseline
    model.fit(train_x, train_z)  # train the model
    predictions = model.predict_proba(test_x)[:, 1]
    accuracy = accuracy_score(test_z, predictions > 0.5)
    print(test_z)
    print(predictions)
    # compute AUC for the final test
    final_auc = roc_auc_score(test_z, predictions)
    return accuracy, final_auc


# report cross-validation statistics
accuracy_scores, sensitivity_scores, specificity_scores, auc_scores = k_fold_cross_validation(data_x, data_y)
print(f'Cross-Validation Accuracy: {np.mean(accuracy_scores):.2%} (std: {np.std(accuracy_scores):.2%})')
print(f'Cross-Validation Sensitivity: {np.mean(sensitivity_scores):.2%}')
print(f'Cross-Validation Specificity: {np.mean(specificity_scores):.2%}')
print(f'Cross-Validation AUC: {np.mean(auc_scores):.2%} (std: {np.std(auc_scores):.2%})')

# final hold-out test
final_accuracy, final_auc = final_test(data_x, data_y)
print(f'Final Test Accuracy: {final_accuracy:.2%}')
print(f'Final Test AUC: {final_auc:.2%}')

# refit once more on a fresh split and report AUC
model = InterpretableModel()
data_z = np.where(np.mean(data_y, axis=1) >= 75, 1, 0)
print(data_z)
train_x, test_x, train_z, test_z = train_test_split(data_x, data_z, test_size=0.2, random_state=42)
model.fit(train_x, train_z)
pred = model.predict_proba(test_x)[:, 1]
auc = roc_auc_score(test_z, pred)
print(auc)
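
# A follow-up sketch (not part of the original script): print the fitted
# logistic-regression weights next to their cutoffs. Each weight belongs to
# one thresholded principal component, which is what makes the model readable.
coef = model.log_reg.coef_[0]
for k in range(model.num_components):
    print(f"PC{k}: weight(PC >= {model.cutoffs_upper[k]:.3f}) = {coef[k]:+.3f}, "
          f"weight(PC <= {model.cutoffs_lower[k]:.3f}) = {coef[model.num_components + k]:+.3f}")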