Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- from sklearn import datasets
- import scipy.stats
- import random
- import pandas as pd
- import matplotlib.pyplot as plt
- # https://datascience.stackexchange.com/questions/39142/normalize-matrix-in-python-numpy
- def scale(X, x_min, x_max):
- nom = (X-X.min(axis=0))*(x_max-x_min)
- denom = X.max(axis=0) - X.min(axis=0)
- denom[denom==0] = 1
- return x_min + nom/denom
- class Preprocessor(object):
- def __init__(self, **args):
- self.__dict__.update(args)
- # OPTIONAL TODO
- self.max = None
- self.min = None
- self.standard_deviation = None
- self.median = []
- self.iqr = []
- self.dataframe = None
- self.transformed_y = None
- self.noise_quantile= []
- def fit(self, X, y=None):
- '''
- Recibe los datos de entrada X y opcionalmente los datos de salida esperados y para preparar los modelos necesarios para
- el preprocesado de datos.
- X Matriz bidimiensional con forma (u,v) donde u es el número de tuplas y v el número de variables
- y (opcional) Vector unidimensional con longitud u que contiene las salidas esperadas para cada una de las tuplas de X
- No devuelve nada
- Params:
- Median: insertado en tuplas con valores nulos
- dataframe: copia de X en Pandas Dataframe. Manipulado por datos
- Returns:
- None
- Observaciones: Este método es llamado por transform dado que es necesario para realizar las operaciones con éxito.
- Podría declararse como privado. No se ha hecho por no modificar las bases dadas.
- '''
- self.max = np.amax(X, axis=0)
- self.min = np.amin(X, axis=0)
- self.median = np.median(X, axis=0)
- self.standard_deviation = np.std(X, axis=0)
- self.noise_quantile = np.quantile(X, 0.9, axis=0)
- if X.shape[0] == y.shape[0]:
- self.dataframe = pd.DataFrame(X)
- self.dataframe['target'] = y
- self.transformed_y = pd.DataFrame(y)
- else:
- print("ERROR: Unmatching target and data sizes")
- exit(1)
- def transform(self, X, y=None):
- '''
- Recibe los datos de entrada X y opcionalmente los datos de salida esperados y para preprocesar los datos de entrada.
- X Matriz bidimiensional con forma (u,v) donde u es el número de tuplas y v el número de variables
- y (opcional) Vector unidimensional con longitud u que contiene las salidas esperadas para cada una de las tuplas de X
- Devuelve el conjunto de datos X transformado
- Operaciones:
- Limpieza de datos: Inserción de la mediana en los valores nulos. Boston carece de los mismos pero es necesario para casos generales.
- Reducción del dataset: Matriz de correlación para eliminar las tuplas irrelevantes. En este caso concreto obtenemos una relación
- razonable usando 2 columnas en lugar de 13.
- Idea de usar dataframe.corr: https://towardsdatascience.com/linear-regression-on-boston-housing-dataset-f409b7e4a155
- Returns:
- cor_dataset: X reducido y transformado a pd.Dataset
- transformed_y: y cambiado a tipo pd.Dataset
- Observaciones:
- No se han normalizado los datos. Comentado se presenta un esquema de la implementación pensada para hacerlo que hubiera sido
- normalización robusta. Sin embargo su apliación empeora considerablemente el resultado.
- '''
- self.fit(X,y)
- #Eliminar tuplas null usando dataframes
- if self.dataframe.isnull().sum().sum():
- is_na_dataframe = pd.isna(self.dataframe)
- for row in range(self.dataframe.shape[0]):
- for column in range(self.dataframe.shape[1]):
- if is_na_dataframe[row,column] == True:
- self.dataframe[row,column] = self.median[column]
- self.dataframe = self.dataframe.to_numpy()
- # eliminamos el ruido
- for row in self.dataframe:
- temp = row[1:] - self.noise_quantile
- print("lo que devuelve any")
- print(temp.any() <= 0)
- print(temp)
- if temp.any() <= 0:
- print("borramos la row")
- print(row)
- self.dataframe = np.delete(self.dataframe, row, 0)
- # normalizamos los datos
- self.dataframe = scale(self.dataframe[:,1:], self.min, self.max)
- '''
- # Normalización de los datos mediante normalización robusta
- self.median = np.median(cor_dataset,axis=1)
- self.iqr = scipy.stats.iqr(cor_dataset, axis=1, rng=(25, 75))
- print(self.iqr.shape)
- print(self.median.shape)
- print(cor_dataset.shape)
- for row in range(cor_dataset.shape[0]):
- for column in range(cor_dataset.shape[1]):
- if self.iqr[row] == 0:
- cor_dataset.iloc[row,column] = None
- continue
- cor_dataset.iloc[row,column] = (cor_dataset.iloc[row,column] - self.median[row])/self.iqr[row]
- #Creamos una nueva media para valores normalizados
- mediana_normalizada = np.median(cor_dataset,axis=0)
- # Si algún dato fuera problemático por el valor intercuartílico lo rellenamos con la media devalores normalizados
- if self.dataframe.isnull().sum().sum() != 0:
- is_na_dataframe = pd.isna(self.cor_dataset)
- for row in range(self.dataframe.shape[0]):
- for column in range(self.dataframe.shape[1]):
- if is_na_dataframe[row, column] == True:
- cor_dataset.iloc[row, column] = mediana_normalizada
- '''
- #return cor_dataset,self.transformed_y
- return self.dataframe, self.transformed_y
- class GradientDescent(object):
- def __init__(self, alfa, **args):
- self.alfa = alfa
- self.__dict__.update(args)
- # OPTIONAL TODO
- self.thetas = None
- def fit(self, X, y):
- '''
- Recibe los datos de entrada X y los datos de salida esperados y para entrenar el modelo de descenso de gradiente.
- X Matriz bidimiensional con forma (u,v) donde u es el número de tuplas y v el número de variables
- y Vector unidimensional con longitud u que contiene las salidas esperadas para cada una de las tuplas de X
- No devuelve nada
- Método de entrenamiento de la regresión. Usa el algoritmo de la diapositiva 8 tema 3
- params:
- thetas: Parámetro a optimizar. Es actualizado con cada vuelta del bucle y usado para calcular el mse
- returns:
- None
- cambios: el valor de theta.
- Observaciones: El código comprueba mínimos locales. Podría usarse memoria para guardar mínimos y recorrer el
- espectro completo de alpha pero mirando el coste de computación y error obtenido esta implementación se ha considerado
- óptima.
- '''
- # TODO
- self.thetas = np.ones(X.shape[1])
- #TODO
- print("Untrained mse: ",self.mse(X,y))
- while True:
- old_mse = self.mse(X, y)
- self.fit_step(X, y)
- trained_mse = self.mse(X, y)
- if trained_mse > old_mse :
- break
- print("New mse: " , trained_mse)
- def fit_step(self, X, y):
- '''
- Recibe los datos de entrada X y los datos de salida esperados y para realizar una actualización en batch
- de los valores thetas
- X Matriz bidimiensional con forma (u,v) donde u es el número de tuplas y v el número de variables
- y Vector unidimensional con longitud u que contiene las salidas esperadas para cada una de las tuplas de X
- No devuelve nada
- Llamado únicamente por fit() este método es quien hace los cambios reales sobre thetas.
- Params:
- hypothesis: llamada al método predict() para obtener el resultado obtenido con las thetas actuales.
- summation: equivalente al sumatorio de la fórmula de la regresión (pg 17 tema 3)
- y: Soluciones reales a contrastar con la hipótesis
- theta: solución a la fórmula
- Returns: N/A
- Cambios:
- Actualiza el valor de Theta
- '''
- # TODO
- hypothesis = self.predict(X)
- for column in range(X.shape[1]):
- summation = 0
- theta = 0
- for row in (range(X.shape[0])):
- summation += ((hypothesis[row] - y.iloc[row].item()) * X.iloc[row,column])
- theta = self.thetas[column] - ((self.alfa / X.shape[0]) * summation)
- # Guardamos en nuestro array de thetas el resultado obtenido para cada iteración de i
- self.thetas[column] = theta
- def predict(self, X):
- '''
- Recibe los datos de entrada X para estimar la salida estimada y_ segun el modelo entrenado
- X Matriz bidimiensional con forma (u,v) donde u es el número de tuplas y v el número de variables
- y Vector unidimensional con longitud u que contiene las salidas esperadas para cada una de las tuplas de X
- Devuelve un vector unidimensional y_ de longitud u el cual contiene el conjunto de salidas estimadas para X
- Transponemos thetas (n,) para poder multiplicarlo por X
- '''
- # TODO
- #Multiplicamos cada X por su índice
- return X.dot(self.thetas.T)
- def mse(self, X, y):
- '''
- Recibe los datos de entrada X y los datos de salida esperados y para obtener el error cuadratico medio
- del modelo entrenado
- X Matriz bidimiensional con forma (u,v) donde u es el número de tuplas y v el número de variables
- y Vector unidimensional con longitud u que contiene las salidas esperadas para cada una de las tuplas de X
- Devuelve el error cuadratico medio (float)
- Aplicada fórmula de MSE del tema 2 diapositiva 3
- Params:
- y_pred = hipótesis obtenida por la regresión
- mse = error cuadrático medio a calcular
- m = número de elementos en X (filas)
- Returns:
- mse/m = error cuadrático medio
- '''
- # TODO
- y_pred = self.predict(X)
- mse = 0
- m = X.shape[0]
- i = 0
- for prediction in y_pred:
- mse += ((prediction - y.iloc[i].item()) ** 2)
- i += 1
- return mse / m
- if __name__ == "__main__":
- np.random.seed(1)
- from sklearn import datasets
- from sklearn.preprocessing import MinMaxScaler
- np.random.seed(1)
- X, y = datasets.load_diabetes(return_X_y=True)
- preprocesado = Preprocessor()
- X,y = preprocesado.transform(X,y)
- print("cuantil")
- print(X.shape)
- plt.plot(X[1,])
- plt.show()
- '''
- X, y = datasets.load_boston(return_X_y=True)
- preprocessor = Preprocessor()
- # Dividimos el número total de tuplas en un 70%
- # Podríamos considerar repartir las tuplas de manera aleatoria para mejorar eficiencia
- # Preprocesado de datos (utilizar el modelo de preprocesado creado)
- # TODO
- X,y = preprocessor.transform(X,y)
- num_of_rows = int((X.shape[0]) * 0.8)
- X_train = X.iloc[:num_of_rows] # indexes rows for training data
- X_test = X.iloc[num_of_rows:] # indexes rows for test data
- y_train = y.iloc[:num_of_rows]
- y_test = y.iloc[num_of_rows:]
- # Creación y entrenamiento del modelo de descenso de gradiente
- # TODO
- # Error mínimo conseguido 25.05 con este valor de alfa. Si es mayor salta enormemente los valores óptimos
- GD = GradientDescent(0.01)
- GD.fit(X_train, y_train)
- #Resultado final
- print("MSE for test_subset: ")
- print(GD.mse(X_test, y_test))
- '''
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement