import cPickle
import gzip

import numpy as np
import theano
from theano import tensor as T
from theano.tensor.nnet import conv
from theano.tensor.signal import downsample
class LeNetConvPoolLayer(object):
    def __init__(self, rng, filter_shape, image_shape, pool_size=(2, 2)):
        # The depth of the images must match the depth of the filters.
        assert image_shape[1] == filter_shape[1]
        self.filter_shape = filter_shape
        self.image_shape = image_shape
        self.pool_size = pool_size
        # Inputs to each unit: depth * filter height * filter width.
        fan_in = np.prod(filter_shape[1:])
        # Outputs from each unit: (kernels * filter area) / pooling reduction.
        fan_out = (filter_shape[0] * np.prod(filter_shape[2:])) / np.prod(pool_size)
        # Glorot/Xavier-style bound for the uniform weight initialization.
        W_bound = np.sqrt(6. / (fan_in + fan_out))
        self.W = theano.shared(
            value=np.asarray(rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                             dtype=theano.config.floatX),
            name='convW', borrow=True)
        self.b = theano.shared(
            value=np.zeros((filter_shape[0],), dtype=theano.config.floatX),
            name='convB', borrow=True)
        self.params = [self.W, self.b]

    def output(self, input):
        conv_out = conv.conv2d(input=input,
                               filters=self.W,
                               filter_shape=self.filter_shape,
                               image_shape=self.image_shape)
        pooled_out = downsample.max_pool_2d(input=conv_out,
                                            ds=self.pool_size,
                                            ignore_border=True)
        # Broadcast the bias so that each feature map gets its own bias, shared
        # across all images in the batch and all pixels within the map.
        return T.tanh(pooled_out + self.b.dimshuffle('x', 0, 'x', 'x'))
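# Illustrative shape check (a sketch, not part of the original pipeline): with
# 5x5 filters over 28x28 MNIST digits and 2x2 max pooling, each feature map
# shrinks to (28 - 5 + 1) / 2 = 12x12, matching the topology comments below:
#
#     rng = np.random.RandomState(1234)
#     layer0 = LeNetConvPoolLayer(rng,
#                                 filter_shape=(32, 1, 5, 5),    # 32 kernels, depth 1
#                                 image_shape=(500, 1, 28, 28))  # batch of 500
#     # layer0.output(images) then has shape (500, 32, 12, 12)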
class HiddenLayer(object):
    def __init__(self, rng, n_in, n_out, activation=T.nnet.sigmoid):
        # Glorot/Xavier-style uniform initialization, as in the conv layer.
        W = theano.shared(
            value=np.asarray(rng.uniform(low=-np.sqrt(6. / (n_in + n_out)),
                                         high=np.sqrt(6. / (n_in + n_out)),
                                         size=(n_in, n_out)),
                             dtype=theano.config.floatX),
            name='fcW', borrow=True)
        b = theano.shared(value=np.zeros((n_out,), dtype=theano.config.floatX),
                          name='fcB', borrow=True)
        self.W = W
        self.b = b
        self.activation = activation
        self.params = [self.W, self.b]

    def output(self, input):
        input = input.flatten(2)
        lin_output = T.dot(input, self.W) + self.b
        return lin_output if self.activation is None else self.activation(lin_output)
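# Note: input.flatten(2) collapses a 4-D conv output of shape
# (batch, maps, height, width) into (batch, maps * height * width), which is
# why the commented-out topology below feeds ("fc", 64 * 4 * 4, 100) after the
# second conv layer. A hedged usage sketch:
#
#     hidden = HiddenLayer(rng, n_in=64 * 4 * 4, n_out=100)
#     h = hidden.output(conv_features)  # shape (batch, 100)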
class LogisticRegression(object):
    def __init__(self, n_in, n_out):
        # The softmax weights can start at zero; the inputs break symmetry.
        self.W = theano.shared(
            value=np.zeros((n_in, n_out), dtype=theano.config.floatX),
            name='softmaxW', borrow=True)
        self.b = theano.shared(
            value=np.zeros((n_out,), dtype=theano.config.floatX),
            name='softmaxB', borrow=True)
        self.params = [self.W, self.b]

    def output(self, input):
        input = input.flatten(2)
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        # argmax yields the most probable class for each example in the batch.
        return T.argmax(self.p_y_given_x, axis=1)
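# Hedged sketch (not in the original paste): a squared error over the argmax
# above is integer-valued and gives gradient descent almost nothing to work
# with, so the usual training cost for this layer is the negative
# log-likelihood of the correct labels, along the lines of:
#
#     def negative_log_likelihood(self, input, y):
#         self.output(input)  # populates self.p_y_given_x
#         return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])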
class MLP(object):
    def __init__(self, topology):
        self.layers = []
        rng = np.random.RandomState(6546)
        for layer in topology:
            layer_type = layer[0]
            if layer_type == "conv":
                filter_shape = layer[1]
                image_shape = layer[2]
                self.layers.append(LeNetConvPoolLayer(rng, filter_shape, image_shape))
            elif layer_type == "fc":
                input_count = layer[1]
                output_count = layer[2]
                self.layers.append(HiddenLayer(rng, input_count, output_count))
            elif layer_type == "softmax":
                input_count = layer[1]
                output_count = layer[2]
                self.layers.append(LogisticRegression(input_count, output_count))
        # Combine parameters from all layers
        self.params = []
        for layer in self.layers:
            self.params += layer.params

    def output(self, x):
        # Feed the input forward through each layer in turn.
        new_x = x
        for layer in self.layers:
            new_x = layer.output(new_x)
        return new_x

    def squared_error(self, x, y):
        # Note: with a softmax output layer this compares integer class labels,
        # which is a poor training signal (see the negative-log-likelihood
        # sketch above for the usual alternative).
        return T.sum((self.output(x) - y) ** 2)
def get_updates(cost, params, learning_rate):
    # Vanilla stochastic gradient descent: step each parameter down its gradient.
    updates = []
    for param in params:
        updates.append((param, param - learning_rate * T.grad(cost, param)))
    return updates
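# Hedged sketch (not in the original paste, and not used below): the
# hyperparameter comment further down mentions momentum, which plain SGD above
# omits. One common formulation keeps a per-parameter velocity in an extra
# shared variable:
def get_momentum_updates(cost, params, learning_rate, momentum=0.9):
    updates = []
    for param in params:
        # Velocity starts at zero, with the same shape/broadcasting as param.
        velocity = theano.shared(param.get_value(borrow=True) * 0.,
                                 broadcastable=param.broadcastable)
        new_velocity = momentum * velocity - learning_rate * T.grad(cost, param)
        updates.append((velocity, new_velocity))
        updates.append((param, param + new_velocity))
    return updates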
def load_data(dataset):
    # mnist.pkl.gz holds three (images, labels) pairs pickled together.
    f = gzip.open(dataset, 'rb')
    train_set, valid_set, test_set = cPickle.load(f)
    f.close()

    def shared_dataset(data_xy, borrow=True):
        # Store the data in shared variables so Theano can keep it on the GPU.
        data_x, data_y = data_xy
        shared_x = theano.shared(np.asarray(data_x, dtype=theano.config.floatX),
                                 borrow=borrow)
        shared_y = theano.shared(np.asarray(data_y, dtype=theano.config.floatX),
                                 borrow=borrow)
        # The labels are needed as integers, so cast the floatX storage back.
        return shared_x, T.cast(shared_y, 'int32')

    test_set_x, test_set_y = shared_dataset(test_set)
    valid_set_x, valid_set_y = shared_dataset(valid_set)
    train_set_x, train_set_y = shared_dataset(train_set)
    return [(train_set_x, train_set_y), (valid_set_x, valid_set_y), (test_set_x, test_set_y)]
batch_size = 500

datasets = load_data('mnist.pkl.gz')
train_set_x, train_set_y = datasets[0]
valid_set_x, valid_set_y = datasets[1]
test_set_x, test_set_y = datasets[2]

n_train_batches = int(train_set_x.get_value(borrow=True).shape[0] / batch_size)
n_valid_batches = int(valid_set_x.get_value(borrow=True).shape[0] / batch_size)
n_test_batches = int(test_set_x.get_value(borrow=True).shape[0] / batch_size)

topology = [
    #("conv", (32, 1, 5, 5), (batch_size, 1, 28, 28)),
    #("conv", (64, 32, 5, 5), (batch_size, 32, 12, 12)),
    #("fc", 64 * 4 * 4, 100),
    ("softmax", 784, 10)
]
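# The commented-out entries above trace the shape bookkeeping for the full
# LeNet-style stack: 28x28 inputs become 12x12 maps after the first conv+pool
# ((28 - 5 + 1) / 2), then 4x4 after the second ((12 - 5 + 1) / 2), so the
# fully connected layer would see 64 * 4 * 4 inputs. With only the softmax
# entry enabled, the network reduces to logistic regression on the raw
# 784-pixel images.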
mlp = MLP(topology)

# Learning rate hyperparameter value; a momentum term could be added via the
# get_momentum_updates sketch above. Again, for non-toy problems these values
# can make a big difference as to whether the network (quickly) converges on
# a good local minimum.
learning_rate = 0.5

# Create Theano variables for the MLP input and the desired output, plus a
# symbolic expression for the cost of the network given an input.
x = T.matrix('x')
x_ = x.reshape((batch_size, 1, 28, 28))
y = T.ivector('y')
cost = mlp.squared_error(x_, y)
minibatch_index = T.lscalar('minibatch_index')

# Create a theano function for training the network
train = theano.function(
    [minibatch_index], cost,
    updates=get_updates(cost, mlp.params, learning_rate),
    givens={
        x: train_set_x[minibatch_index * batch_size: (minibatch_index + 1) * batch_size],
        y: train_set_y[minibatch_index * batch_size: (minibatch_index + 1) * batch_size]
    })

# Create a theano function for computing the MLP's output given some input
mlp_output = theano.function(
    [minibatch_index], mlp.output(x_),
    givens={
        x: train_set_x[minibatch_index * batch_size: (minibatch_index + 1) * batch_size]
    },
    allow_input_downcast=True)

expected_output = theano.function(
    [minibatch_index], y,
    givens={
        y: train_set_y[minibatch_index * batch_size: (minibatch_index + 1) * batch_size]
    })
# Keep track of the number of training iterations performed
iteration = 0
max_iteration = 20
while iteration < max_iteration:
    print("Iteration %i" % iteration)
    for batch_index in xrange(n_train_batches):
        current_cost = train(batch_index)
        current_output = mlp_output(batch_index)
        this_y = expected_output(batch_index)
        print("Error: %i" % current_cost)
        #for a, b in zip(current_output, this_y):
        #    print("Prediction: %i, Actual: %i" % (a, b))
        accuracy = np.mean(np.equal(current_output, this_y))
        #print("Iteration %i: Minibatch %i/%i: Accuracy %f%%" % (iteration, batch_index,
        #                                                        n_train_batches, accuracy * 100))
    iteration += 1
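# Hedged extension (not in the original paste): the validation set loaded above
# is never consulted. One way to check generalization after training, reusing
# the same givens pattern as the functions above:
validate_output = theano.function(
    [minibatch_index], mlp.output(x_),
    givens={
        x: valid_set_x[minibatch_index * batch_size: (minibatch_index + 1) * batch_size]
    },
    allow_input_downcast=True)
validate_labels = theano.function(
    [minibatch_index], y,
    givens={
        y: valid_set_y[minibatch_index * batch_size: (minibatch_index + 1) * batch_size]
    })
validation_accuracy = np.mean([np.mean(np.equal(validate_output(i), validate_labels(i)))
                               for i in xrange(n_valid_batches)])
print("Validation accuracy: %f%%" % (validation_accuracy * 100))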