import math

import numpy as np
from tqdm import tqdm


class LogisticRegression:

    def __init__(self, _lambda=0.05,
                 lr=0.001,
                 n_iters=100,
                 tolerance=0.0001,
                 min_iter_tolerance=10,
                 privacy_mechanism=None,
                 epsilon=1e-2,
                 delta=1e-5):
        if min_iter_tolerance < 0:
            raise Exception('Minimum iteration cannot be negative.')
        if privacy_mechanism:
            if isinstance(privacy_mechanism, str):
                if privacy_mechanism.lower() not in ('laplace', 'gaussian'):
                    raise Exception('Differential privacy mechanism should be Laplace or Gaussian.')
                self.differential_privacy_mechanism = privacy_mechanism.lower()
                if delta <= 0 or epsilon <= 0:
                    raise Exception('Privacy parameters delta and epsilon should be positive.')
                self.epsilon = epsilon
                self.delta = delta
            else:
                raise Exception('Privacy mechanism should be a string.')
        else:
            self.differential_privacy_mechanism = privacy_mechanism
        self._lambda = _lambda
        self.lr = lr
        self.n_iters = n_iters
        self.tolerance = tolerance
        self.iterTolerance = min_iter_tolerance
        self.weights = None
        self.J_train = None
        self.J_validation = None
        self.norm_of_gradient_dw = None
        self.max_iteration_required = None
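    # Illustrative constructor calls (an added sketch, not part of the original
    # paste; parameter names match the signature above):
    #   LogisticRegression()                                           # plain L2-regularized model
    #   LogisticRegression(privacy_mechanism='laplace', epsilon=0.5)
    #   LogisticRegression(privacy_mechanism='gaussian', epsilon=1.0, delta=1e-5)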
    def fit(self, X, y, X_validation, y_validation):
        n_samples, n_features = X.shape  # m x n
        # Weights and bias live in one row vector -> [b, W]; the first element is the bias 'b'.
        # (A zero initialization, e.g. np.zeros((1, n_features + 1)), would also work.)
        self.weights = np.reshape(np.random.rand(n_features + 1), (1, n_features + 1))  # [b, W] # 1 x (n+1)
        # Prepend a column of ones so the bias is picked up by the dot product.
        X = np.concatenate((np.ones((n_samples, 1)), X), axis=1)
        X_validation = np.concatenate((np.ones((X_validation.shape[0], 1)), X_validation), axis=1)
        self.J_train = []
        self.J_validation = []
        self.norm_of_gradient_dw = []
        converge_count = 0
        for i in tqdm(range(self.n_iters)):
            y_hat, dw, cost_train = self.calculateGradient(X, y, isTrainData=True)
            y_hat_validation, cost_validation = self.calculateGradient(X_validation, y_validation)
            # Weight and bias update (gradient descent step).
            self.weights = self.weights - (self.lr * dw.T)
            # Norm of the gradient dJ/dw, tracked for diagnostics.
            self.norm_of_gradient_dw.append(np.linalg.norm(dw))
            self.J_train.append(cost_train)             # training cost
            self.J_validation.append(cost_validation)   # validation cost
            print('Training loss = {} and Validation loss = {} after {} iterations'.format(cost_train, cost_validation, i))
            # Stopping condition: once at least self.iterTolerance iterations have run,
            # check whether the training cost has stopped changing (within tolerance).
            if (i != 0
                    and i >= self.iterTolerance
                    and np.abs(self.J_train[i - 1] - self.J_train[i]) <= self.tolerance):
                converge_count += 1
                if converge_count >= 5:
                    # 5 counts means 6 consecutive points within tolerance; the iteration
                    # at which convergence started is i - converge_count + 1 (+1 because
                    # converge_count is incremented before this check).
                    self.max_iteration_required = i - converge_count + 1
                    break
            else:
                converge_count = 0  # reset so only consecutive in-tolerance steps count
    def sigmoid_function(self, var):
        return 1.0 / (1.0 + np.exp(-var))

    def hypothesis(self, X):
        linear_model = np.dot(self.weights, X.T)  # 1 x m (bias is weights[0, 0]; X carries a ones column)
        y_h = self.sigmoid_function(linear_model)
        # Clip away exact 0s and 1s so log(y_h) and log(1 - y_h) stay finite in the cost.
        y_h = np.clip(y_h, 1e-12, 1.0 - 1e-12)
        return y_h
    def calculateGradient(self, X, y, isTrainData=False):
        n_samples = X.shape[0]
        y_hat = self.hypothesis(X)
        cost = self.cost(y, y_hat, n_samples)
        if isTrainData:
            # Gradient of the regularized cross-entropy: dJ/dw, shape (n+1) x 1.
            # The L2 term is applied per weight (summing it to a scalar, as the
            # original did, would add the same constant to every component).
            dw = (1 / n_samples) * np.dot(X.T, (y_hat - y.T).T) + (self._lambda / n_samples) * self.weights.T
            # The bias gradient dJ/db is not regularized.
            dw[0, 0] = (1 / n_samples) * np.sum(y_hat - y.T)
            if self.differential_privacy_mechanism:
                Delta_f = 1 / n_samples  # sensitivity of the averaged gradient
                if self.differential_privacy_mechanism == 'laplace':
                    scale = Delta_f / self.epsilon
                    dw += np.random.laplace(loc=0.0, scale=scale, size=(X.shape[1], 1))  # (n+1) x 1
                elif self.differential_privacy_mechanism == 'gaussian':
                    # np.random.normal takes the standard deviation, so sigma is not squared here.
                    sigma = (Delta_f / self.epsilon) * math.sqrt(2 * math.log(1.25 / self.delta))
                    dw += np.random.normal(loc=0.0, scale=sigma, size=(X.shape[1], 1))
            return y_hat, dw, cost
        else:
            return y_hat, cost
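    # Reference note (added; not in the original paste): the noise scales above
    # follow the standard calibrations from the differential-privacy literature,
    # assuming Delta_f is the L1 (Laplace) / L2 (Gaussian) sensitivity of the gradient:
    #   Laplace mechanism:  scale b = Delta_f / epsilon                      -> epsilon-DP
    #   Gaussian mechanism: sigma = Delta_f * sqrt(2 * ln(1.25/delta)) / epsilon
    #                                                     -> (epsilon, delta)-DP for epsilon < 1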
    def cost(self, y, y_hat, n_samples):
        # Regularized cross-entropy:
        #   J = -(1/m) * sum(y*log(y_hat) + (1-y)*log(1-y_hat)) + (lambda/2m) * ||W||^2
        cost = (-1.0 / n_samples)
        cost *= np.sum((y.T * np.log(y_hat)) + ((1.0 - y.T) * np.log(1.0 - y_hat)))
        # Sum of squared weights, bias excluded to match the gradient. (The original
        # np.dot(weights.T, weights) built an outer product whose sum is (sum w)^2,
        # not ||w||^2.)
        cost += (0.5 * (self._lambda / n_samples)) * np.sum(self.weights[:, 1:] ** 2)
        return cost
    def predict(self, X, weights, threshold=0.5):
        bias = weights[0, 0]
        weights_without_bias = weights[:, 1:]
        linear_model = np.dot(weights_without_bias, X.T) + bias
        y_predict = self.sigmoid_function(linear_model)
        y_predict[y_predict >= threshold] = 1.0  # threshold probabilities to binary labels
        y_predict[y_predict < threshold] = 0.0
        return y_predict
    def prediction_result(self, y_true, y_predict):
        # y_true and y_predict are assumed to be row vectors (1 x m).
        if y_true.shape[1] != y_predict.shape[1]:
            raise Exception("Label and prediction vectors' lengths are not equal.")
        tp = np.sum((y_predict == y_true) & (y_predict == 1.0))  # true value 1, predicted 1
        tn = np.sum((y_predict == y_true) & (y_predict == 0.0))  # true value 0, predicted 0
        fp = np.sum((y_predict != y_true) & (y_predict == 1.0))  # true value 0, predicted 1
        fn = np.sum((y_predict != y_true) & (y_predict == 0.0))  # true value 1, predicted 0
        # precision = tp / (tp + fp); recall (TPR) = tp / (tp + fn); FPR = fp / (tn + fp)
        accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) != 0 else 0
        precision = tp / (tp + fp) if (tp + fp) != 0 else 0
        recall_or_TPR = tp / (tp + fn) if (tp + fn) != 0 else 0
        FPR = fp / (tn + fp) if (tn + fp) != 0 else 0
        return {'acc': accuracy, 'pre': precision, 'tpr': recall_or_TPR, 'fpr': FPR}
    def summary(self):
        return {
            'weights': self.weights,
            'J_train': self.J_train,
            'J_validation': self.J_validation,
            'norm_of_gradient_dw': self.norm_of_gradient_dw,
            'max_iteration_required': self.max_iteration_required
        }
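
# Usage sketch (added; not part of the original paste). A minimal, self-contained
# run on synthetic data, assuming fit() receives labels as a column vector (m x 1)
# and prediction_result() receives row vectors (1 x m), which is what the shapes
# in the methods above imply.
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    X_train = rng.normal(size=(200, 3))
    y_train = (X_train[:, 0] + X_train[:, 1] > 0).astype(float).reshape(-1, 1)  # m x 1
    X_val = rng.normal(size=(50, 3))
    y_val = (X_val[:, 0] + X_val[:, 1] > 0).astype(float).reshape(-1, 1)

    model = LogisticRegression(lr=0.1, n_iters=200,
                               privacy_mechanism='gaussian', epsilon=1.0, delta=1e-5)
    model.fit(X_train, y_train, X_val, y_val)

    y_pred = model.predict(X_val, model.weights)      # 1 x m row vector
    print(model.prediction_result(y_val.T, y_pred))   # {'acc': ..., 'pre': ..., 'tpr': ..., 'fpr': ...}
    print(model.summary()['max_iteration_required'])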