import numpy as np
import matplotlib.pyplot as plt


def _define_minibatches(X, Y, mb_size, seed):
    """Shuffle the columns of X and Y and split them into minibatches of mb_size."""
    m = X.shape[1]                                  # number of examples (columns)
    mini_batches = []
    np.random.seed(seed)
    permutation = list(np.random.permutation(m))
    shuffled_X = X[:, permutation]
    shuffled_Y = Y[:, permutation]
    mb_floor = np.floor(m / mb_size).astype(int)    # number of full minibatches
    for k in range(mb_floor):
        mini_batch_X = shuffled_X[:, mb_size * k:mb_size * (k + 1)]
        mini_batch_Y = shuffled_Y[:, mb_size * k:mb_size * (k + 1)]
        mini_batches.append((mini_batch_X, mini_batch_Y))
    # remaining, smaller minibatch if m is not divisible by mb_size
    if m % mb_size != 0:
        mini_batch_X = shuffled_X[:, (m - m % mb_size):]
        mini_batch_Y = shuffled_Y[:, (m - m % mb_size):]
        mini_batches.append((mini_batch_X, mini_batch_Y))
    return mini_batches

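# Hedged usage sketch (not part of the original paste): splits a toy
# "features x examples" dataset into minibatches; the sizes here are assumptions.
def _demo_minibatches():
    X_toy = np.random.randn(5, 100)                      # 5 features, 100 examples
    Y_toy = np.eye(3)[:, np.random.randint(0, 3, 100)]   # one-hot labels, 3 classes
    batches = _define_minibatches(X_toy, Y_toy, 32, seed=1)
    # three full batches of 32 plus one final batch of 4 examples
    print(len(batches), batches[0][0].shape, batches[-1][0].shape)
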
def init_parameters(shape, method, bias=True):
    """Return a weight matrix of the given shape and an optional zero bias vector."""
    if method == 'he':
        w = np.random.randn(shape[0], shape[1]) * np.sqrt(2 / shape[1])
    elif method == 'rand':
        w = np.random.randn(shape[0], shape[1])
    else:
        raise ValueError('Unknown init method: ' + str(method))
    b = np.zeros((shape[0], 1)) if bias else None
    return (w, b)

class relu:
    @staticmethod
    def activate(Z):
        return np.maximum(0, Z)

    @staticmethod
    def backprop(dA, Z):
        # gradient passes through only where the pre-activation was positive
        return np.multiply(dA, np.int64(Z > 0))

class softmax:
    @staticmethod
    def activate(Z):
        # subtract the column-wise max for numerical stability
        exp_scores = np.exp(Z - np.max(Z, axis=0, keepdims=True))
        return exp_scores / np.sum(exp_scores, axis=0, keepdims=True)

    @staticmethod
    def backprop(dA, Z):
        # with cross-entropy loss the caller passes dA = A - Y, which is already dZ
        return dA

class layer():
    def __init__(self, hidden_units, activation_func, init_method='he'):
        self.hidden_units = hidden_units
        self.activation_func = activation_func
        self.init_method = init_method

    @classmethod
    def check(cls):
        return 'layer'

    def initialize(self, shape):
        hidden_units_prev = shape[0]
        self.W, self.b = init_parameters((self.hidden_units, hidden_units_prev),
                                         method=self.init_method)
        return (self.hidden_units, hidden_units_prev)

    def forward(self, Ap):
        Z = np.dot(self.W, Ap) + self.b
        A = self.activation_func.activate(Z)
        return A, ((Z, self.W), Ap)

    def backward_and_update(self, dA, cache, lr):
        ((Z, w), ap) = cache
        dZ = self.activation_func.backprop(dA, Z)
        m = ap.shape[1]
        d_w = np.dot(dZ, ap.T) * (1 / m)
        d_b = np.sum(dZ, axis=1, keepdims=True) * (1 / m)
        d_a_prev = w.T.dot(dZ)
        assert d_a_prev.shape == ap.shape
        assert d_w.shape == self.W.shape
        assert d_b.shape == self.b.shape
        self.W = self.W - lr * d_w
        self.b = self.b - lr * d_b
        return d_a_prev

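# Hedged sketch (not in the original paste): one forward/backward pass through a
# single fully connected layer on random data, to show the shapes and cache layout.
def _demo_layer():
    np.random.seed(0)
    A_prev = np.random.randn(3, 10)             # 3 features, 10 examples
    l = layer(hidden_units=4, activation_func=relu)
    l.initialize(A_prev.shape)                  # W gets shape (4, 3), b gets (4, 1)
    A, cache = l.forward(A_prev)                # A has shape (4, 10)
    dA = np.random.randn(*A.shape)              # stand-in upstream gradient
    dA_prev = l.backward_and_update(dA, cache, lr=0.01)
    print(A.shape, dA_prev.shape)               # (4, 10) (3, 10)
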
class rnn_layer(layer):
    def initialize(self, shape):
        n_x, m, T_x = shape
        hidden_units_prev = n_x
        self.Wx, self.b = init_parameters((self.hidden_units, hidden_units_prev),
                                          method=self.init_method)
        # init_parameters returns (w, b); with bias=False the second element is None
        self.Wa, _ = init_parameters((self.hidden_units, self.hidden_units),
                                     method=self.init_method,
                                     bias=False)
        self.Aa = np.zeros((self.hidden_units, m, T_x))
        self.A0, _ = init_parameters((self.hidden_units, m),
                                     method=self.init_method,
                                     bias=False)
        return (self.hidden_units, m, T_x)

    def forward(self, Ax, Aa):
        # Ax has shape (n_x, m, T_x); Aa stores the hidden state for every time step
        assert len(Ax.shape) == 3
        Tx = Ax.shape[2]
        a_prev = self.A0                        # initial hidden state
        for t in range(Tx):
            Z = np.dot(self.Wx, Ax[:, :, t]) + np.dot(self.Wa, a_prev) + self.b
            A = self.activation_func.activate(Z)
            Aa[:, :, t] = A
            a_prev = A
        return Aa

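# Hedged sketch (not in the original paste): run the recurrent layer forward on a
# toy batch of sequences; the shapes (2 inputs, 4 examples, 3 steps) are assumptions.
def _demo_rnn_layer():
    np.random.seed(0)
    Ax = np.random.randn(2, 4, 3)               # (n_x, m, T_x)
    r = rnn_layer(hidden_units=5, activation_func=relu)
    r.initialize(Ax.shape)
    Aa = r.forward(Ax, r.Aa)                    # hidden states, shape (5, 4, 3)
    print(Aa.shape)
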
class nnet():
    def __init__(self):
        self._layers = []
        self.parameters = []

    def add(self, layer):
        if layer.check() == 'layer':
            self._layers.append(layer)
        else:
            raise Exception('Unacceptable object')

    def train(self, X, Y, lr=0.01, num_epoch=10000):
        self.costs = []
        if len(self._layers) == 0:
            raise Exception('Layers were not added')
        # initialize parameters; columns of X are examples, rows are features
        prev_shape = X.shape
        for l in self._layers:
            prev_shape = l.initialize(prev_shape)
        print('Parameters are initialized\n')
        seed = 10
        for i in range(num_epoch):
            seed = seed + 1
            epoch_cost = 0
            minibatches = _define_minibatches(X, Y, 64, seed)
            for minibatch in minibatches:
                (minibatch_X, minibatch_Y) = minibatch
                caches = []
                A_prev = minibatch_X
                # forward propagation
                for l in self._layers:
                    A, cache = l.forward(A_prev)
                    A_prev = A
                    caches.append(cache)
                # cross-entropy cost and dAL for the softmax output layer
                epoch_cost += -np.mean(np.sum(minibatch_Y * np.log(A), axis=0))
                dA = A - minibatch_Y
                # backward propagation and parameter updates
                for n, l in reversed(list(enumerate(self._layers))):
                    dA = l.backward_and_update(dA, caches[n], lr)
            if i % 1000 == 0:
                print("Cost after epoch %i: %f" % (i, epoch_cost))
            if i % 100 == 0:
                self.costs.append(epoch_cost)
        plt.plot(self.costs)
        plt.ylabel('cost')
        plt.xlabel('iterations (per hundreds)')
        plt.title("Learning rate = " + str(lr))
        plt.show()
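
# Hedged end-to-end sketch (not in the original paste): train a tiny softmax
# classifier on random data. The dataset sizes and layer widths are assumptions
# chosen only to show the expected "features x examples" / one-hot label layout.
if __name__ == "__main__":
    np.random.seed(0)
    X_demo = np.random.randn(10, 256)           # 10 features, 256 examples
    labels = np.random.randint(0, 3, 256)
    Y_demo = np.eye(3)[:, labels]               # one-hot labels, shape (3, 256)
    net = nnet()
    net.add(layer(hidden_units=16, activation_func=relu))
    net.add(layer(hidden_units=3, activation_func=softmax))
    net.train(X_demo, Y_demo, lr=0.05, num_epoch=2000)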