import numpy as np
import theano
from theano import tensor as T
from theano.tensor.nnet import conv2d

import data_organizer
import normalize
# Default values
BATCH_SIZE = 10
HEIGHT = normalize.FINAL_HEIGHT
WIDTH = normalize.FINAL_WIDTH
RNG_SEED = 1337
ETA = 0.1
LMBDA = 0.005

# Useful constants
VAR_TANH_STD = 0.39429449
VAR_HARD_TANH_STD = 0.51605855
BACK_ADJUST_TANH = 0.46440290
BACK_ADJUST_HARD_TANH = 0.68268949
COMPROMISE_TANH = np.sqrt(VAR_TANH_STD * BACK_ADJUST_TANH)
COMPROMISE_HARD_TANH = np.sqrt(VAR_HARD_TANH_STD * BACK_ADJUST_HARD_TANH)
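
# The four constants above appear to be moments of each activation A and its
# derivative A' under a standard-normal pre-activation Z ~ N(0, 1):
#   VAR_*         ~= Var(A(Z)) = E[A(Z)^2]        (A is odd, so E[A(Z)] = 0)
#   BACK_ADJUST_* ~= E[A'(Z)^2] = var(A'(Z)) + E(A'(Z))^2
# A minimal Monte Carlo sketch to reproduce them (illustrative only; this
# helper is not part of the network code):
def _estimate_activation_moments(act, d_act, n=10 ** 7, seed=0):
    z = np.random.RandomState(seed).normal(size=n)
    return np.mean(act(z) ** 2), np.mean(d_act(z) ** 2)
# For hard tanh (A = clip(z, -1, 1), A' = 1 on |z| < 1, else 0) this gives
# roughly (0.51606, 0.68269), matching the constants above:
#   _estimate_activation_moments(lambda z: np.clip(z, -1, 1),
#                                lambda z: (np.abs(z) < 1).astype(float))
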
class _ConvLayer:
    def __init__(self, rng, input_img, single_input_img,
                 filt_shp, input_shp, activation, var_adjust, has_mask=False):
        # input_shp: (mini-batch size, # input maps, height, width)
        # filt_shp: (# output maps, # input maps, kernel height, kernel width)
        self.input_img = input_img
        self.activation = activation
        fan_in = np.prod(filt_shp[1:])
        fan_out = filt_shp[0] * np.prod(filt_shp[2:])
        W_var = 2.0 / (var_adjust * (fan_in + fan_out))
        # var_adjust ~= var(A(prev_w_in)),
        # var_adjust ~= var(A'(prev_w_in)) + E(A'(prev_w_in))^2
        # W_var ~= 1.0 / (var_adjust * fan_in)
        # W_var ~= 1.0 / (var_adjust * fan_out)
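        # Worked example (ours) for the default first layer,
        # filt_shp = (16, 3, 5, 5) and var_adjust = COMPROMISE_TANH ~= 0.428:
        #   fan_in = 3 * 5 * 5 = 75, fan_out = 16 * 5 * 5 = 400,
        #   W_var = 2.0 / (0.428 * 475) ~= 0.0098, i.e. std ~= 0.099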
        # Feedforward:
        # There are fan_in input activations of m: 0, v: var(A(prev_w_in))
        # Each is multiplied by an independent weight of
        # m: 0, v: 1.0 / (var(A(prev_w_in)) * fan_in)
        # The input activations are correlated, but multiplying by
        # independent weights makes the products independent
        # Therefore the weighted input is of m: 0, v: 1.0
        # Backpropagation:
        # There are fan_out output weighted input gradients of
        # m: 0, v: out_var / plane_size^2
        # Each is multiplied by a weight of
        # m: 0, v: 1.0 / ((var(A'(prev_w_in)) + E(A'(prev_w_in))^2) * fan_out)
        # Finally, the product is multiplied by the gradient term of
        # m: E(A'(prev_w_in)), v: var(A'(prev_w_in))
        # Suppose these values are independent
        # (the last 2 are independent; any other pair is slightly correlated)
        # The output weighted input gradients are correlated, but multiplying
        # by independent weights makes the products independent
        # Therefore the input weighted input gradients are of
        # m: 0, v: out_var / plane_size^2
        # Bias gradients sum up an entire plane of weighted input
        # gradients, which are correlated, making the bias gradients of
        # m: 0, v: out_var
        # Weight gradients multiply each weighted input gradient by
        # A(prev_w_in), which is also correlated, making the
        # weight gradients of m: 0, v: var(A(prev_w_in)) * out_var
        # W: weights ---
        # (# output maps, # input maps, kernel height, kernel width)
        self.W = theano.shared(np.asarray(rng.normal(
            loc=0.0, scale=np.sqrt(W_var), size=filt_shp),
            dtype=theano.config.floatX),
            borrow=True)
        # b: biases --- (# output maps)
        # Broadcast dimensions of size one are added later via dimshuffle
        # so b can be added to a (batch, maps, height, width) tensor
        self.b = theano.shared(np.zeros(filt_shp[0],
                                        dtype=theano.config.floatX),
                               borrow=True)
        # For some reason border_mode='half' fails here, so 'same' padding
        # is emulated by cropping a 'full' convolution
        # (see the shape sketch after this class)
        top = filt_shp[2] // 2
        bot = input_shp[2] + top
        left = filt_shp[3] // 2
        right = input_shp[3] + left
        conv_out = conv2d(input=input_img, filters=self.W,
                          filter_shape=filt_shp, input_shape=input_shp,
                          border_mode='full')
        single_conv_out = conv2d(input=single_input_img, filters=self.W,
                                 filter_shape=filt_shp,
                                 input_shape=(1,) + input_shp[1:],
                                 border_mode='full')
        # conv_out: convolution, or weighted input, of a mini-batch ---
        # (mini-batch size, # output maps, height, width)
        conv_out = conv_out[:, :, top:bot, left:right]
        # single_conv_out: convolution, or weighted input, of one image ---
        # (1, # output maps, height, width)
        single_conv_out = single_conv_out[:, :, top:bot, left:right]
        broadcast_b = self.b.dimshuffle('x', 0, 'x', 'x')
        # output_img: final output ---
        # (mini-batch size, # output maps, height, width)
        self.output_img = self.activation(conv_out + broadcast_b)
        self.single_output_img = \
            self.activation(single_conv_out + broadcast_b)
        # Useful for the backpropagation step
        self.params = [self.W, self.b]
        self.dlsscst_dt_contrib = var_adjust * np.prod(filt_shp) + filt_shp[0]
        if not has_mask:
            self.dregcst_dt_contrib = W_var * np.prod(filt_shp)
            self.sum_square_weights_contrib = (self.W * self.W).sum()
        else:
            # Exclude the mask plane (the last input map) from regularization
            self.dregcst_dt_contrib = W_var * \
                filt_shp[0] * (filt_shp[1] - 1) * np.prod(filt_shp[2:])
            W_unmasked = self.W[:, :-1, :, :]
            self.sum_square_weights_contrib = (W_unmasked * W_unmasked).sum()
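
# Shape sketch (ours, plain numpy) of the 'same'-padding emulation used in
# _ConvLayer: a 'full' convolution of an H x W image with a k_h x k_w kernel
# yields (H + k_h - 1) x (W + k_w - 1); cropping k_h // 2 rows at the top and
# k_w // 2 columns at the left (and the mirror amounts at the other edges)
# recovers H x W, which is what border_mode='half' produces for odd kernels.
def _demo_same_crop(input_shp=(1, 1, 6, 6), filt_shp=(1, 1, 5, 5)):
    full = np.zeros((input_shp[0], filt_shp[0],
                     input_shp[2] + filt_shp[2] - 1,
                     input_shp[3] + filt_shp[3] - 1))
    top, left = filt_shp[2] // 2, filt_shp[3] // 2
    same = full[:, :, top:input_shp[2] + top, left:input_shp[3] + left]
    assert same.shape == input_shp[:1] + filt_shp[:1] + input_shp[2:]
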
class CNN:
    def __init__(self, num_planes=(3, 16, 3), kernel_size=(5, 5),
                 img_shp=(BATCH_SIZE, HEIGHT, WIDTH), has_mask=False,
                 rng_seed=RNG_SEED, eta=ETA, lmbda=LMBDA):
        assert (len(num_planes) >= 2 and
                kernel_size[0] % 2 == 1 and
                kernel_size[1] % 2 == 1)
        rng = np.random.RandomState(rng_seed)
        eta_shared = theano.shared(np.asarray(eta,
                                              dtype=theano.config.floatX))
        lmbda_shared = theano.shared(np.asarray(lmbda,
                                                dtype=theano.config.floatX))
        # input_img: (mini-batch size, # input maps, height, width)
        input_img = T.tensor4('input_img', dtype=theano.config.floatX)
        s_input_img = T.tensor3('s_input_img', dtype=theano.config.floatX)
        s_truth_img = T.tensor3('s_truth_img', dtype=theano.config.floatX)
        s_input_img_reshaped = s_input_img.dimshuffle('x', 0, 1, 2)
        s_truth_img_reshaped = s_truth_img.dimshuffle('x', 0, 1, 2)
        # groundtruth_img: (mini-batch size, # output maps, height, width)
        groundtruth_img = T.tensor4('groundtruth_img',
                                    dtype=theano.config.floatX)
        # Fast, nonlinear and symmetric
        hard_tanh = lambda x: T.clip(x, -1, 1)
        layers = []
        params = []
        sum_square_weights = 0
        prev_output = input_img
        prev_single_output = s_input_img_reshaped
        dlsscst_dt_contrib_sum = 0
        dregcst_dt_contrib_sum = 0
        for i in range(len(num_planes) - 2):
            if i == 0:
                var_adjust = COMPROMISE_TANH
            else:
                var_adjust = COMPROMISE_HARD_TANH
            layer = _ConvLayer(rng, input_img=prev_output,
                               single_input_img=prev_single_output,
                               input_shp=(img_shp[0], num_planes[i],
                                          img_shp[1], img_shp[2]),
                               filt_shp=(num_planes[i + 1], num_planes[i],
                                         kernel_size[0], kernel_size[1]),
                               activation=hard_tanh,
                               var_adjust=var_adjust,
                               has_mask=has_mask and (i == 0))
            prev_output = layer.output_img
            prev_single_output = layer.single_output_img
            params += layer.params
            dlsscst_dt_contrib_sum += layer.dlsscst_dt_contrib
            dregcst_dt_contrib_sum += layer.dregcst_dt_contrib
            sum_square_weights += layer.sum_square_weights_contrib
            layers.append(layer)
        layer = _ConvLayer(rng, input_img=prev_output,
                           single_input_img=prev_single_output,
                           input_shp=(img_shp[0], num_planes[-2],
                                      img_shp[1], img_shp[2]),
                           filt_shp=(num_planes[-1], num_planes[-2],
                                     kernel_size[0], kernel_size[1]),
                           activation=T.tanh,
                           var_adjust=VAR_HARD_TANH_STD)
        output_img = layer.output_img
        s_output_img = layer.single_output_img
        params += layer.params
        dlsscst_dt_contrib_sum += layer.dlsscst_dt_contrib
        dregcst_dt_contrib_sum += layer.dregcst_dt_contrib
        sum_square_weights += layer.sum_square_weights_contrib
        layers.append(layer)
        # To make things floatX instead of the default float64
        dlsscst_dt_contrib_sum = \
            np.asarray(dlsscst_dt_contrib_sum, dtype=theano.config.floatX)
        dregcst_dt_contrib_sum = \
            np.asarray(dregcst_dt_contrib_sum, dtype=theano.config.floatX)
        # To make the user-entered eta and lmbda values more meaningful:
        # Call the time constant 'T' = 1 / user_eta
        # Call the average abs difference across batch and plane 'diff'
        # Change in loss cost in T iterations ~= out_var
        # For practical purposes, out_var ~= diff^2
        # Change in regularization cost in T iterations ~= user_lmbda^2
        # For practical purposes, regularization starts to overpower the
        # loss only when diff <= user_lmbda
        eta_convert = eta_shared / dlsscst_dt_contrib_sum
        lmbda_convert = (lmbda_shared ** 2) * \
            dlsscst_dt_contrib_sum / dregcst_dt_contrib_sum
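        # Worked example (ours) with the defaults num_planes = (3, 16, 3)
        # and 5 x 5 kernels:
        #   layer 1: ~0.428 * (16 * 3 * 5 * 5) + 16 ~= 529.5
        #   layer 2: ~0.516 * (3 * 16 * 5 * 5) + 3  ~= 622.3
        # so dlsscst_dt_contrib_sum ~= 1151.8, and the default eta = 0.1
        # becomes an actual step size of eta_convert ~= 8.7e-5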
        # Cost function for a tanh last layer: the tanh analogue of binary
        # cross-entropy (equal to it up to an additive constant after
        # mapping [-1, 1] to [0, 1]). Note T.log, not np.log, so the
        # expression stays symbolic
        loss_f = lambda x, y: ((1.0 - y) * T.log(1.0 - x) +
                               (1.0 + y) * T.log(1.0 + x)) / (-2.0)
        loss = loss_f(output_img, groundtruth_img)
        s_loss = T.mean(loss_f(s_output_img, s_truth_img_reshaped))
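        # Why this particular loss (a step the comments skip): with
        # x = tanh(z), dloss/dx = (x - y) / (1 - x^2) and dx/dz = 1 - x^2,
        # so the output-layer delta collapses to dloss/dz = x - y, i.e. the
        # gradient does not saturate the way squared error would with tanh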
        # Combining data from batches in a way that assumes complete
        # dependence between losses and weighted input gradients
        # In reality, the gradients scale down with the complexity of the
        # inputs and with the additional complexity of the difference
        # between the ground truth and the output, where, roughly speaking,
        # complexity is the number of uncorrelated regions in a given plane
        loss = T.sum(loss, axis=1)
        loss = T.mean(loss)
        cost = loss + lmbda_convert * sum_square_weights / 2.0
        grads = T.grad(cost, params)
        updates = [(param, param - eta_convert * grad)
                   for param, grad in zip(params, grads)]
        self._arch = {'num_planes': num_planes, 'kernel_size': kernel_size,
                      'img_shp': img_shp, 'has_mask': has_mask}
        self._layers = layers
        self._rng_seed = rng_seed
        self.input_shp = (img_shp[0], num_planes[0], img_shp[1], img_shp[2])
        self.output_shp = (img_shp[0], num_planes[-1],
                           img_shp[1], img_shp[2])
        self.eta = eta_shared
        self.lmbda = lmbda_shared
        self.train_model = theano.function(
            inputs=[input_img, groundtruth_img],
            outputs=loss,
            updates=updates)
        self.get_grads = theano.function(
            inputs=[input_img, groundtruth_img],
            outputs=grads)
        self.get_loss = theano.function(
            inputs=[s_input_img, s_truth_img],
            outputs=s_loss)
        self.feed_forward = theano.function(
            inputs=[s_input_img],
            outputs=s_output_img[0])
    def export_info(self):
        result = []
        for layer in self._layers:
            result.append((layer.W.get_value(), layer.b.get_value()))
        return {'arch': self._arch, 'params': result,
                'rng_seed': self._rng_seed}

def load_info(info, eta=ETA, lmbda=LMBDA):
    # Module-level factory: rebuild a CNN from export_info() output
    neural_net = CNN(
        num_planes=info['arch']['num_planes'],
        kernel_size=info['arch']['kernel_size'],
        img_shp=info['arch']['img_shp'],
        has_mask=info['arch']['has_mask'],
        rng_seed=info['rng_seed'],
        eta=eta,
        lmbda=lmbda)
    for (layer, (W, b)) in zip(neural_net._layers, info['params']):
        layer.W.set_value(W)
        layer.b.set_value(b)
    return neural_net
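
# Minimal usage sketch (ours, not part of the original module): exercise
# training and the export_info / load_info round trip on random data.
# Shapes come from the defaults above; assumes normalize.FINAL_HEIGHT and
# normalize.FINAL_WIDTH are importable as at the top of this file.
if __name__ == '__main__':
    net = CNN()
    demo_rng = np.random.RandomState(0)
    x = demo_rng.uniform(-1, 1, size=net.input_shp) \
        .astype(theano.config.floatX)
    y = demo_rng.uniform(-0.9, 0.9, size=net.output_shp) \
        .astype(theano.config.floatX)
    for step in range(5):
        print('batch loss:', net.train_model(x, y))
    # Weights should survive an export / load round trip
    clone = load_info(net.export_info())
    assert np.allclose(net.feed_forward(x[0]), clone.feed_forward(x[0]))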