Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- import pickle
- from mlxtend.data import loadlocal_mnist
- import math as m
- from scipy import signal
- import matplotlib.pyplot as plt
- import time
- np.random.seed(1) # заставим numpy выдавать одинаковые набор случайных чисел для каждого запуска программы
- np.set_printoptions(suppress=True) # выводить числа в формате 0.123 а не 1.23e-1
- # В `X` находятся изображения для обучения, а в `y` значения соответственно
- # `X.shape` == (60000, 784) # изображения имеют размер 28x28 pix => 28*28=784
- # `y.shape` == (60000,) # каждое значение это число от 0 до 9 то что изображено на соответствующем изображении
- X, y = loadlocal_mnist(
- images_path="F:/python/n1/train-images.idx3-ubyte",
- labels_path="F:/python/n1/train-labels.idx1-ubyte")
- # В `Xt` находятся изображения для тестирования, а в `yt` значения соответственно
- # `Xt.shape` == (10000, 784) # изображения имеют размер 28x28 pix => 28*28=784
- # `yt.shape` == (10000,) # каждое значение это число от 0 до 9 то что изображено на соответствующем изображении
- Xt, yt = loadlocal_mnist(
- images_path="F:/python/n1/t10k-images.idx3-ubyte",
- labels_path="F:/python/n1/t10k-labels.idx1-ubyte")
- def sigmoid(x, deriv=False):
- if deriv:
- return x * (1 - x)
- return 1.0 / (1.0 + np.exp(-x))
- def convert(y):
- y_d = np.zeros((len(y), 10))
- for idx, val in enumerate(y):
- y_d[idx, val] = 1.0
- return y_d
- X = X * (1 / 255)
- Xt = Xt * (1 / 255)
- # Параметры:
- lr = 1 # значени на которое будет домножаться дельта на каждом шаге
- batch = 60 # кол-во изображений использованное для обучения на каждом шаге
- epochs = 100 # кол-во эпох. Если видно что прогресс есть, но нужно больше итераций
- class MnistConvModel:
- def __init__(self, lr=0.1, batch=60):
- self.lr = lr
- self.batch = batch
- self.filters = 8
- self.W_conv = np.random.uniform(-0.05, 0.05, (self.filters, 3, 3))
- self.W_linear = np.random.uniform(-0.05, 0.05, (self.filters * 26 * 26, 10))
- def load(self, conv, linear):
- with open(conv, 'rb') as f:
- self.W_conv = np.array(pickle.load(f)).reshape((self.filters, 3, 3))
- with open(linear, 'rb') as f:
- self.W_linear = np.array(pickle.load(f)).reshape((self.filters * 26 * 26, -1))
- # Linear Layer
- def linear_forward(self, X):
- return np.dot(X, self.W_linear)
- def linear_backward(self, e):
- return np.dot(e, self.W_linear.T)
- # Sigmoid Layer
- def sigmoid_forward(self, X):
- return sigmoid(X)
- def sigmoid_backward(self, e):
- return e * sigmoid(self.o_sigmoid, True)
- # ReLU Layer
- def relu_forward(self, X):
- X_o = X.copy()
- X_o[X < 0] = 0
- return X_o
- def relu_backward(self, s):
- res = s.copy()
- res[res > 0] = 1.0
- return res
- # Convolution Layer
- def convolution_backward(self, e):
- res = np.zeros((len(e), len(e[0]), len(e[0][0]) + len(self.W_conv[0]) - 1, len(e[0][0]) + len(self.W_conv[0]) - 1))
- # res = np.zeros((len(e), 1, len(e[0][0]) + len(self.W_conv[0]) - 1, len(e[0][0]) + len(self.W_conv[0]) - 1))
- # print(e.shape, self.W_conv.shape)
- for b in np.arange(len(e)):
- for ch in np.arange(len(e[b])):
- # print(signal.correlate2d(e[b][ch], self.W_conv[ch], mode="full").shape)
- res[b][ch] += signal.correlate2d(e[b][ch], self.W_conv[ch], mode="full")
- # res[b][0] += signal.correlate2d(e[b][ch], self.W_conv[ch], mode="full")
- # print(np.max(e))
- res /= len(e)
- return res # TODO
- def convolution_forward(self, X):
- (filter_channels, filter_height, filter_width) = self.W_conv.shape
- (batch_size, in_channels, in_rows, in_cols) = X.shape
- (out_channels, out_rows, out_cols) = (filter_channels, in_rows - 2, in_cols - 2)
- res = np.zeros((batch_size, out_channels, out_rows, out_cols))
- for batch in range(0, batch_size):
- for och in range(0, out_channels):
- for ich in range(0, in_channels):
- res[batch][och] += signal.convolve2d(X[batch][ich], self.W_conv[och], mode='valid')
- return res
- def forward(self, X):
- self.X = X
- # print("f", self.X.shape)
- self.o_conv = self.convolution_forward(X)
- # print("f", self.o_conv.shape)
- self.o_relu = self.relu_forward(self.o_conv).reshape(len(X), -1)
- # print("f", self.o_relu.shape)
- self.o_linear = self.linear_forward(self.o_relu)
- self.o_sigmoid = self.sigmoid_forward(self.o_linear)
- return self.o_sigmoid
- def backward(self, e):
- self.e_sigmoid = self.sigmoid_backward(e)
- self.e_linear = self.linear_backward(self.e_sigmoid)
- self.e_relu = self.relu_backward(self.e_linear.reshape((-1, self.filters, 26, 26)))
- self.e_conv = self.convolution_backward(self.e_relu)
- def calc_gradients(self):
- scaler = 1 / len(self.X)
- # print(self.W_conv.shape, self.e_conv.shape, self.o_conv.shape)
- # asd
- self.dW_linear = np.dot(self.o_relu.T, self.e_sigmoid) * scaler
- temp = np.zeros(self.W_conv.shape)
- # print(temp.shape)
- for b in np.arange(len(self.e_conv)):
- for ch in np.arange(len(self.W_conv)):
- # print(self.o_conv.shape, self.e_conv[b][ch].shape, self.o_conv[ch].shape)
- # print(self.e_conv.shape)
- temp[ch] += signal.correlate2d(self.e_conv[b][ch], self.e_relu[b][ch], mode="valid")
- temp *= scaler
- # print(np.max(temp))
- self.dW_conv = temp
- # (self.X.reshape(100, 28, 28).T.dot(self.e_conv.reshape(len(self.X), -1)) * scaler)
- # print(self.dW_conv.shape)
- # asd
- def update(self):
- self.W_linear -= self.dW_linear * self.lr
- self.W_conv -= self.dW_conv * self.lr
- def mse(o, y):
- return np.sum(np.square(o - y))
- def mse_prime(o, y):
- return 2 * (o - y)
- def validate(model, X, y):
- tp = model.forward(X)
- return np.sum(y == np.argmax(tp, axis=1)) / len(y)
- def train(model, X, y, epochs=100, batch_size=100, validation=None):
- batch_count = m.ceil(len(y) / batch_size)
- loss = np.zeros((epochs * 6))
- t = np.zeros((len(y), 10))
- np.put_along_axis(t, y.reshape((-1, 1)), 1.0, axis=1)
- lind = 0
- for epoch in range(0, epochs):
- st_time = time.time()
- print("Epoch ", epoch + 1)
- for index, (bX, bt) in enumerate(zip(np.split(X, batch_count), np.split(t, batch_count))):
- res = model.forward(bX)
- error = mse_prime(res, bt)
- model.backward(error)
- model.calc_gradients()
- model.update()
- if index % 100 == 0:
- loss[lind] = mse(res, bt)
- print(" Loss: ", loss[lind])
- lind += 1
- if validation is not None:
- end_time = time.time()
- (model, val_X, val_y) = validation
- print(" Accuracy: ", validate(model, val_X, val_y), " Time: ", end_time - st_time)
- plt.title("Loss функция") # заголовок
- plt.xlabel("эпоха") # ось абсцисс
- plt.ylabel("loss") # ось ординат
- plt.grid() # включение отображение сетки
- # print('h')
- plt.plot(np.arange(epochs * 6), loss)
- plt.show()
- if __name__ == "__main__":
- model = MnistConvModel()
- X = X.reshape((-1, 1, 28, 28))
- Xt = Xt.reshape((-1, 1, 28, 28))
- train(model, X, y, epochs=10, validation=(model, Xt, yt))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement