import numpy as np
import pandas as pd
from numpy import random
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.model_selection import train_test_split


class SLogRegOptimizer(BaseEstimator, ClassifierMixin):
    def __init__(self, init_type='zero',
                 max_iter=10000,
                 eps=1e-4,
                 method='SGD',
                 batch_size=100,
                 params={'lr': 0.1, 'C': 0.01},
                 validation=False):
        self.init_type = init_type
        self.max_iter = max_iter
        self.batch_size = batch_size
        self.eps = eps
        self.method = method
        self.params = params
        self.lr = self.params["lr"]
        self.C = self.params["C"]
        self.val = validation
        if self.method == 'momentum':
            self.gamma = self.params["momentum_term"]
            self.v_wt = 0
            self.v_bt = 0
        if self.method == 'adagrad':
            self.alpha = self.params["alpha"]
    def _loss(self, X, y, margin):
        # Mean logistic loss over the sample; margin = -y * (Xw + b).
        return np.log(1 + np.exp(margin)).mean(axis=0)[0]

    def _grad_w(self, X, y, margin):
        # Gradient of the mean logistic loss w.r.t. the weights, plus the L2 penalty term.
        return - (y * X * (1 - 1 / (1 + np.exp(margin)))).mean(axis=0).reshape(-1, 1) + self.C * self.w

    def _grad_b(self, X, y, margin):
        # Gradient of the mean logistic loss w.r.t. the bias (not regularized).
        return - (y * (1 - 1 / (1 + np.exp(margin)))).mean(axis=0)
    def _update_weights_sgd(self, grad_w, grad_b):
        # Plain SGD step; the step actually taken is returned for the convergence check.
        self.w -= self.lr * grad_w
        self.b -= self.lr * grad_b
        return self.lr * grad_w

    def _update_weights_adagrad(self, grad_w, grad_b):
        # AdaGrad step: scale the learning rate by the accumulated squared gradients,
        # using alpha as the smoothing constant in the denominator.
        delta_w = self.lr * grad_w / np.sqrt(self.alpha + self._cum_w)
        delta_b = self.lr * grad_b / np.sqrt(self.alpha + self._cum_b)
        self.w -= delta_w
        self.b -= delta_b
        self._cum_w += grad_w ** 2
        self._cum_b += grad_b ** 2
        return delta_w

    def _update_weights_momentum(self, grad_w, grad_b):
        # Momentum step: exponentially decaying moving average of past update steps.
        self.v_wt = self.v_wt * self.gamma + self.lr * grad_w
        self.v_bt = self.v_bt * self.gamma + self.lr * grad_b
        self.w -= self.v_wt
        self.b -= self.v_bt
        return self.v_wt

    def _gen_batch(self, X, y, max_idx):
        # Sample a mini-batch of rows (with replacement) from the training data.
        idx = random.randint(0, max_idx, size=self.batch_size)
        return X[idx], y[idx]
    def fit(self, X, y):
        self.history = []
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, (pd.DataFrame, pd.Series)):
            y = y.values
        if len(y.shape) == 1:
            y = y.reshape(-1, 1)
        if self.val:
            # Hold out 30% of the data to track validation loss during training.
            X, X_val, y, y_val = train_test_split(X, y, test_size=0.3)
            self.val_history = []
        if self.method == 'momentum':
            updater = self._update_weights_momentum
        if self.method == 'SGD':
            updater = self._update_weights_sgd
        if self.method == 'adagrad':
            updater = self._update_weights_adagrad
        max_batch_idx = X.shape[0]
        weights_dim = X.shape[1]
        if self.init_type == 'zero':
            self.w = np.zeros((weights_dim, 1))
            self.b = 0
        if self.init_type == 'random':
            self.w = random.randn(weights_dim, 1)
            self.b = random.randn(1)
        if self.method == 'adagrad':
            self._cum_w = 0.1
            self._cum_b = 0.1
        iter_num = 0
        w_delta = 1000
        # Iterate until the weight update is smaller than eps or max_iter is reached.
        while np.linalg.norm(w_delta) > self.eps and iter_num != self.max_iter:
            X_batch, y_batch = self._gen_batch(X, y, max_batch_idx)
            margin = - y_batch * (X_batch.dot(self.w) + self.b)
            grad_w = self._grad_w(X_batch, y_batch, margin)
            grad_b = self._grad_b(X_batch, y_batch, margin)
            w_delta = updater(grad_w, grad_b)
            iter_num += 1
            # Track the full-sample (and, if requested, validation) loss after each step.
            margin = - y * (X.dot(self.w) + self.b)
            self.history.append(self._loss(X, y, margin))
            if self.val:
                margin = - y_val * (X_val.dot(self.w) + self.b)
                self.val_history.append(self._loss(X_val, y_val, margin))
        self._conv_iter = iter_num
        return self
    def predict(self, X):
        # Predicted class labels in {-1, +1}: threshold the positive-class probability at 0.5.
        proba = self.predict_proba(X)[:, 1]
        return np.where(proba >= 0.5, 1, -1)

    def predict_proba(self, X):
        # Column 0: P(y = -1), column 1: P(y = +1) under the sigmoid model.
        if isinstance(X, pd.DataFrame):
            X = X.values
        pred = 1 / (1 + np.exp(- (X.dot(self.w) + self.b)))
        return np.concatenate((1 - pred, pred), axis=1)
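

# A minimal usage sketch, not part of the original paste: the synthetic dataset, the
# variable names (X_demo, y_demo, clf), and the chosen hyperparameter values are
# illustrative assumptions. Labels are mapped to {-1, +1}, which is what the margin
# formula -y * (Xw + b) above expects.
if __name__ == '__main__':
    from sklearn.datasets import make_classification
    from sklearn.metrics import accuracy_score

    X_demo, y_demo = make_classification(n_samples=1000, n_features=20, random_state=0)
    y_demo = 2 * y_demo - 1  # map {0, 1} labels to {-1, +1}

    clf = SLogRegOptimizer(method='momentum',
                           params={'lr': 0.1, 'C': 0.01, 'momentum_term': 0.9},
                           validation=True)
    clf.fit(X_demo, y_demo)
    print('iterations run:', clf._conv_iter)
    print('train accuracy:', accuracy_score(y_demo, clf.predict(X_demo)))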