Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- import pickle
- from mlxtend.data import loadlocal_mnist
- import math as m
- from scipy import signal
- import matplotlib.pyplot as plt
- import time
np.random.seed(1)  # fixed seed so every run draws the same random numbers
np.set_printoptions(suppress=True)  # print numbers as 0.123 rather than 1.23e-1

# `X` holds the training images and `y` the corresponding labels.
# `X.shape` == (60000, 784)  # images are 28x28 px => 28*28 = 784
# `y.shape` == (60000,)      # each value is the digit 0..9 drawn on the image
X, y = loadlocal_mnist(
    images_path="train-images.idx3-ubyte",
    labels_path="train-labels.idx1-ubyte")

# `Xt` holds the test images and `yt` the corresponding labels.
# `Xt.shape` == (10000, 784)  # images are 28x28 px => 28*28 = 784
# `yt.shape` == (10000,)      # each value is the digit 0..9 drawn on the image
Xt, yt = loadlocal_mnist(
    images_path="t10k-images.idx3-ubyte",
    labels_path="t10k-labels.idx1-ubyte")
def sigmoid(x, deriv=False):
    """Logistic function.

    With deriv=True, `x` is assumed to ALREADY be a sigmoid output y, and
    the derivative expressed through it, y * (1 - y), is returned instead.
    """
    return x * (1 - x) if deriv else 1.0 / (1.0 + np.exp(-x))
def convert(y):
    """One-hot encode integer labels 0..9 into a (len(y), 10) float matrix."""
    one_hot = np.zeros((len(y), 10))
    one_hot[np.arange(len(y)), np.asarray(y)] = 1.0
    return one_hot
# Scale pixel intensities from [0, 255] into [0, 1].
X = X * (1 / 255)
Xt = Xt * (1 / 255)

# Hyperparameters:
# NOTE(review): these module-level values are never read — the __main__ block
# relies on the MnistConvModel/train defaults instead; confirm before removing.
lr = 1  # factor the delta is multiplied by at every step
batch = 60  # number of images used for training at every step
epochs = 100  # number of epochs; raise it if progress is visible but more iterations are needed
class MnistConvModel:
    """Minimal CNN for MNIST: one 3x3 convolution (8 filters) -> ReLU ->
    fully-connected layer -> sigmoid, trained by SGD on a squared-error loss.

    Expected input to forward(): float array of shape (batch, 1, 28, 28).
    """

    def __init__(self, lr=0.1, batch=60):
        # lr:    step-size multiplier applied to each gradient update
        # batch: images per training step (stored for callers; not used here)
        self.lr = lr
        self.batch = batch
        self.filters = 8
        # 8 filters of 3x3; a valid convolution of 28x28 gives 26x26 maps,
        # hence the flattened linear input of filters * 26 * 26.
        self.W_conv = np.random.uniform(-0.05, 0.05, (self.filters, 3, 3))
        self.W_linear = np.random.uniform(-0.05, 0.05, (self.filters * 26 * 26, 10))

    def load(self, conv, linear):
        """Load pickled weights from the two given file paths.

        NOTE(review): pickle.load is unsafe on untrusted files — only load
        weight files you produced yourself.
        """
        with open(conv, 'rb') as f:
            self.W_conv = np.array(pickle.load(f)).reshape((self.filters, 3, 3))
        with open(linear, 'rb') as f:
            self.W_linear = np.array(pickle.load(f)).reshape((self.filters * 26 * 26, -1))

    # ---- Linear layer ----
    def linear_forward(self, X):
        """(batch, filters*26*26) -> (batch, 10)."""
        return np.dot(X, self.W_linear)

    def linear_backward(self, e):
        """Gradient w.r.t. the linear layer's input."""
        return np.dot(e, self.W_linear.T)

    # ---- Sigmoid layer ----
    def sigmoid_forward(self, X):
        """Element-wise logistic function."""
        return 1.0 / (1.0 + np.exp(-X))

    def sigmoid_backward(self, e):
        # Derivative expressed through the cached output y: y * (1 - y).
        return e * self.o_sigmoid * (1.0 - self.o_sigmoid)

    # ---- ReLU layer ----
    def relu_forward(self, X):
        """max(X, 0), element-wise."""
        return np.maximum(X, 0)

    def relu_backward(self, e):
        # BUGFIX: the ReLU gradient is the upstream error masked by where the
        # pre-activation was positive. The old code clamped positive error
        # values to 1.0 and passed negative errors through unchanged, which
        # is not a gradient at all.
        return e * (self.o_conv > 0)

    # ---- Convolution layer ----
    def convolution_forward(self, X):
        """Valid true convolution of every input channel with every filter.

        X: (batch, in_channels, H, W) -> (batch, filters, H-2, W-2).
        All input channels share the same per-filter kernel and are summed.
        """
        (out_channels, fh, fw) = self.W_conv.shape
        (batch_size, in_channels, in_rows, in_cols) = X.shape
        res = np.zeros((batch_size, out_channels,
                        in_rows - fh + 1, in_cols - fw + 1))
        for b in range(batch_size):
            for och in range(out_channels):
                for ich in range(in_channels):
                    res[b][och] += signal.convolve2d(
                        X[b][ich], self.W_conv[och], mode='valid')
        return res

    def convolution_backward(self, e):
        """Gradient of the loss w.r.t. the convolution input.

        BUGFIX: for a forward pass done with convolve2d (true convolution),
        the input gradient is the FULL cross-correlation of the error with
        each filter. The old code convolved again (kernel flipped the wrong
        way) and divided by an arbitrary `len(e) / 8` factor.
        """
        (batch_size, out_channels, out_rows, out_cols) = e.shape
        fh, fw = self.W_conv.shape[1:]
        res = np.zeros((batch_size, 1, out_rows + fh - 1, out_cols + fw - 1))
        for b in range(batch_size):
            for och in range(out_channels):
                res[b][0] += signal.correlate2d(
                    e[b][och], self.W_conv[och], mode='full')
        return res

    def forward(self, X):
        """Run the network on X; caches activations needed by backward()."""
        self.X = X
        self.o_conv = self.convolution_forward(X)
        self.o_relu = self.relu_forward(self.o_conv).reshape(len(X), -1)
        self.o_linear = self.linear_forward(self.o_relu)
        self.o_sigmoid = self.sigmoid_forward(self.o_linear)
        return self.o_sigmoid

    def backward(self, e):
        """Propagate the loss gradient e (w.r.t. the sigmoid output) back."""
        self.e_sigmoid = self.sigmoid_backward(e)
        self.e_linear = self.linear_backward(self.e_sigmoid)
        self.e_relu = self.relu_backward(
            self.e_linear.reshape((-1, self.filters, 26, 26)))
        self.e_conv = self.convolution_backward(self.e_relu)

    def calc_gradients(self):
        """Compute weight gradients, averaged over the batch.

        BUGFIX: dW_conv must correlate the INPUT images with the error that
        reaches the convolution output (e_relu). The old code correlated two
        error tensors (e_conv with e_relu), which is not the gradient of
        anything. The result is rotated 180 degrees because the forward pass
        uses true convolution (flipped kernels).
        """
        scaler = 1 / len(self.X)
        self.dW_linear = np.dot(self.o_relu.T, self.e_sigmoid) * scaler
        (batch_size, out_channels, _, _) = self.e_relu.shape
        in_channels = self.X.shape[1]
        dW = np.zeros(self.W_conv.shape)
        for b in range(batch_size):
            for och in range(out_channels):
                for ich in range(in_channels):
                    g = signal.correlate2d(
                        self.X[b][ich], self.e_relu[b][och], mode='valid')
                    dW[och] += g[::-1, ::-1]  # rot180 to match convolve2d
        self.dW_conv = dW * scaler

    def update(self):
        """One SGD step using the gradients from calc_gradients()."""
        self.W_linear -= self.dW_linear * self.lr
        self.W_conv -= self.dW_conv * self.lr
def mse(o, y):
    """Sum of squared errors between prediction `o` and target `y`.

    Note: despite the name, this is a total over all elements, not a mean.
    """
    return np.sum((o - y) ** 2)
def mse_prime(o, y):
    """Gradient of the summed squared error with respect to the prediction `o`."""
    return (o - y) * 2
def validate(model, X, y):
    """Accuracy of `model` on (X, y): the fraction of rows of X whose
    argmax of model.forward(X) equals the integer label in y."""
    predicted = np.argmax(model.forward(X), axis=1)
    return np.count_nonzero(predicted == y) / len(y)
def train(model, X, y, epochs=100, batch_size=100, validation=None):
    """Train `model` on (X, y) with mini-batch SGD on a squared-error loss.

    model:      object exposing forward/backward/calc_gradients/update
    X:          (n, 1, 28, 28) images; y: (n,) integer labels 0..9
    epochs:     number of full passes over the data
    batch_size: images per gradient step
    validation: optional (model, val_X, val_y) tuple; accuracy on it is
                printed after every epoch
    Shows a plot of the sampled loss curve when training finishes.
    """
    batch_count = m.ceil(len(y) / batch_size)
    # One-hot encode the labels into a (n, 10) target matrix.
    t = np.zeros((len(y), 10))
    np.put_along_axis(t, y.reshape((-1, 1)), 1.0, axis=1)
    # BUGFIX: collect losses in a growable list. The old fixed-size
    # np.zeros(epochs * 6) buffer overflowed (IndexError) for any batch size
    # that logged other than exactly 6 losses per epoch.
    losses = []
    for epoch in range(epochs):
        print("Epoch ", epoch + 1)
        # BUGFIX: np.array_split also handles len(y) not divisible by
        # batch_size; np.split raised ValueError there despite the ceil above.
        batches = zip(np.array_split(X, batch_count), np.array_split(t, batch_count))
        for index, (bX, bt) in enumerate(batches):
            res = model.forward(bX)
            model.backward(mse_prime(res, bt))
            model.calc_gradients()
            model.update()
            if index % 100 == 0:  # sample the loss every 100th batch
                losses.append(mse(res, bt))
                print("  Loss: ", losses[-1])
        if validation is not None:
            # Renamed from `model` to avoid shadowing the parameter.
            (val_model, val_X, val_y) = validation
            print("  Accuracy: ", validate(val_model, val_X, val_y))
    plt.title("Loss функция")
    plt.xlabel('')
    plt.ylabel('loss')
    plt.grid()
    plt.plot(np.arange(len(losses)), losses)
    plt.show()
if __name__ == "__main__":
    model = MnistConvModel()
    # Reshape the flat 784-pixel rows into (batch, channel, height, width).
    X = X.reshape((-1, 1, 28, 28))
    Xt = Xt.reshape((-1, 1, 28, 28))
    # The validation tuple re-passes the very model being trained, so the
    # printed accuracy tracks training progress on the held-out test set.
    train(model, X, y, epochs=10, validation=(model, Xt, yt))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement