Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import tensorflow as tf
- import tensorflow.contrib.slim as slim
- import numpy as np
- import cv2
- import random
- import math
- from utils import *
- import os
- from sklearn.utils import shuffle
- from roi_pooling import ROIPoolingLayer
- os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
def _base_inference(inputs):
    """VGG-16 convolutional backbone.

    Runs the conv1..conv5 stages of VGG-16 (scopes match the vgg_16
    checkpoint naming so pretrained weights can be restored). The final
    pool5 layer is intentionally omitted so the output stride stays 16.
    """
    with slim.arg_scope([slim.conv2d], activation_fn = tf.nn.relu , trainable = False, \
            weights_initializer = tf.truncated_normal_initializer(mean = 0.0, stddev = 0.01), \
            weights_regularizer = slim.l2_regularizer(0.0005)):
        x = inputs
        # (repeat count, channels, conv scope, pool scope) per VGG stage.
        stages = [(2, 64, 'conv1', 'pool1'),
                  (2, 128, 'conv2', 'pool2'),
                  (3, 256, 'conv3', 'pool3'),
                  (3, 512, 'conv4', 'pool4')]
        for n_layers, depth, conv_scope, pool_scope in stages:
            x = slim.repeat(x, n_layers, slim.conv2d, depth, [3, 3], padding = 'SAME', scope=conv_scope)
            x = slim.max_pool2d(x, [2, 2], padding='VALID', scope = pool_scope)
        # Last conv stage; no pool5, keeping the feature-map stride at 16.
        x = slim.repeat(x, 3, slim.conv2d, 512, [3, 3], padding = 'SAME', scope='conv5')
        return x
def _inference(x, k_anchors, name=None, dim_reduce = 16, aspect_ratios = [1, 2, 3], scales = [32, 64, 128], cls_thresh = 0.7):
    """Region Proposal Network head.

    Args:
        x: backbone feature map, shape (batch, H, W, C).
        k_anchors: number of anchors per feature-map cell.
        name, dim_reduce, aspect_ratios, scales, cls_thresh: unused here;
            kept only for interface compatibility with existing callers.

    Returns:
        confidence_list: (H*W*k, 2) raw objectness logits per anchor.
        regression_list: (H*W*k, 4) raw box-regression offsets per anchor.
        probabilities:   (H*W*k,) softmax probability of the "object" class.
    """
    # Fixes vs original: removed the unused `ftmap_shape` local and the
    # six debug print() calls that spammed stdout on every graph build.
    with slim.arg_scope([slim.conv2d], trainable = False, activation_fn = tf.nn.relu, \
            weights_initializer = tf.truncated_normal_initializer(mean = 0.0, stddev = 0.01), \
            weights_regularizer = slim.l2_regularizer(0.0005)):
        # Shared 3x3 intermediate layer over the backbone features.
        itermediate_layer = slim.conv2d(x, 512, [3, 3], padding = 'SAME', scope = 'itermediate_layer')
        # Objectness logits: 2 values (background / object) per anchor.
        RPN_cls = slim.conv2d(itermediate_layer, 2*k_anchors, [1, 1], padding = 'VALID', scope = 'classification', activation_fn = None)
        # Box regression: 4 offsets (tx, ty, tw, th) per anchor.
        RPN_reg = slim.conv2d(itermediate_layer, 4*k_anchors, [1, 1], padding = 'VALID', scope = 'regression', activation_fn = None)
        # Flatten to one row per anchor.
        confidence_list = tf.reshape(RPN_cls, [-1, 2])
        regression_list = tf.reshape(RPN_reg, [-1, 4])
        # Column 1 of the softmax is the "object" probability.
        probabilities = tf.nn.softmax(confidence_list, axis = 1)[:, 1]
        return confidence_list, regression_list, probabilities
def _predict_boxes(probabilities, regression_list, placeholder_anchors, placeholder_anchors_cleaned, cls_thresh = 0.7, top_k = 128, nms_threshold = 0.7):
    """Decode RPN regressions into boxes, threshold by score, and apply NMS.

    Anchors are supplied in (cx, cy, w, h) format; the returned boxes are in
    (x1, y1, x2, y2) topleft-bottomright format.
    """
    # Keep only the anchors that do not cross the image boundary.
    probs = tf.gather(probabilities, placeholder_anchors_cleaned)
    regs = tf.gather(regression_list, placeholder_anchors_cleaned)
    anchors = tf.gather(placeholder_anchors, placeholder_anchors_cleaned)
    # Drop low-confidence anchors.
    keep = tf.where(probs > cls_thresh)
    probs = tf.reshape(tf.gather(probs, keep), [-1])
    regs = tf.reshape(tf.gather(regs, keep), [-1, 4])
    anchors = tf.reshape(tf.gather(anchors, keep), [-1, 4])
    # Decode (tx, ty, tw, th) offsets against the anchor (cx, cy, w, h).
    cx = tf.reshape(anchors[:, 2] * regs[:, 0] + anchors[:, 0], (-1, 1))
    cy = tf.reshape(anchors[:, 3] * regs[:, 1] + anchors[:, 1], (-1, 1))
    w = tf.reshape(anchors[:, 2] * tf.exp(regs[:, 2]), (-1, 1))
    h = tf.reshape(anchors[:, 3] * tf.exp(regs[:, 3]), (-1, 1))
    # Convert center/size to corner coordinates.
    boxes = tf.concat([cx - w/2, cy - h/2, cx + w/2, cy + h/2], axis = -1)
    # Non-maximum suppression keeps at most top_k proposals.
    nms_keep = tf.image.non_max_suppression(boxes, probs, max_output_size = top_k, iou_threshold = nms_threshold)
    nms_boxes = tf.reshape(tf.gather(boxes, nms_keep), [-1, 4])
    nms_probs = tf.reshape(tf.gather(probs, nms_keep), [-1])
    return nms_boxes, nms_probs
def _fastRCNN(feature_map, boxes_proposal, pooled_width = 7, pooled_height = 7, total_class = 1):
    """Fast R-CNN head: ROI-pool each proposal, then two FC layers feeding
    a (total_class+1)-way classifier and a 4-value box regressor."""
    pooled = ROIPoolingLayer(pooled_height, pooled_width)([feature_map, boxes_proposal])[0]
    flat = tf.layers.flatten(pooled)
    with slim.arg_scope([slim.fully_connected], activation_fn = tf.nn.relu, \
            weights_initializer = tf.truncated_normal_initializer(mean = 0.0, stddev = 0.01), \
            weights_regularizer = slim.l2_regularizer(0.0005)):
        hidden = slim.fully_connected(flat, 2048, scope='rcnn/fc_1')
        hidden = slim.fully_connected(hidden, 2048, scope='rcnn/fc_2')
        # +1 output for the background class.
        cls = slim.fully_connected(hidden, total_class+1, scope='rcnn/classification')
        reg = slim.fully_connected(hidden, 4, scope='rcnn/regression')
        return cls, reg
def name_in_checkpoint(v):
    """Map a model variable to its corresponding name in the VGG-16 checkpoint."""
    return f'vgg_16/{v.op.name}'
def fine_turning_vgg16():
    """Build a Saver that restores the backbone conv weights from a
    pretrained VGG-16 checkpoint (variables prefixed 'vgg_16/' there)."""
    restore_map = {}
    for v in slim.get_model_variables():
        # Only the conv* backbone variables exist in the VGG-16 checkpoint.
        if v.name.startswith('conv'):
            restore_map[name_in_checkpoint(v)] = v
    return tf.train.Saver(restore_map)
def rpn_generate_batch_label(gt_boxes, dim_reduce, imageSize, aspect_ratios, scales, batch_size, visualize = False, minibatch_size = 256, repeat_positives = True, \
        positives_threshold = 0.7, negatives_threshold = 0.3):
    """Sample an RPN training minibatch of anchors and build its targets.

    Args:
        gt_boxes: per-image lists of ground-truth boxes in (cx, cy, w, h) format.
        dim_reduce: backbone stride (image pixels per feature-map cell).
        imageSize: (width, height) of the input image.
        aspect_ratios, scales: anchor configuration.
        batch_size: number of images in the batch.
        visualize: unused; kept for interface compatibility.
        minibatch_size: number of anchors sampled for the loss.
        repeat_positives: if True, sample positives with replacement to fill
            half the minibatch.
        positives_threshold / negatives_threshold: IoU cutoffs for labeling an
            anchor as object / background (in-between anchors are ignored).

    Returns:
        (minibatch_indices, A_scores_one_hot, ground_truth_anchors_list):
        sampled anchor indices, their one-hot [background, object] labels, and
        the (tx, ty, tw, th) regression targets against the matched GT boxes.
    """
    # Generate every anchor for this feature-map size; anchors come back in
    # (cx, cy, w, h) format.
    feature_map_width = int(imageSize[0]/dim_reduce)
    feature_map_height = int(imageSize[1]/dim_reduce)
    anchors = rpn_generate_anchor_boxes((feature_map_width, feature_map_height), imageSize, aspect_ratios, scales)
    # Indices of anchors that lie fully inside the image.
    anchors_cleaned = remove_cross_boundaries_anchors(anchors, imageSize[0], imageSize[1])
    # Anchors're in center, width, height format.
    # Convert anchors from center width, height -> topleft bottomright.
    A = format_anchors(anchors)
    n_anchors = len(A)
    batch_anchors = np.reshape(anchors, (-1, 4))
    batch_anchors = np.tile(batch_anchors, (batch_size, 1))
    # Count the ground-truth boxes of each image, then flatten them all so a
    # single IoU matrix can be computed.
    gt_each_img = np.zeros((batch_size), dtype = np.int32)
    for i in range(batch_size):
        gt_each_img[i] = len(gt_boxes[i])
    gt_boxes_flatten = [item for sublist in gt_boxes for item in sublist]
    gt_boxes_flatten = np.reshape(gt_boxes_flatten, (-1, 4))
    # Convert groundtruth boxes from c, w, h -> tl, br.
    gt_boxes_flatten_formated = format_anchors(gt_boxes_flatten)
    total_gt = len(gt_boxes_flatten_formated)
    # IoU matrix: rows = ground truths, columns = anchors.
    IoU = iou_tensorial(gt_boxes_flatten_formated, A)
    idx_of_GT = np.argmax(IoU, axis = 0)   # best-matching GT for each anchor
    A_scores_list = np.max(IoU, axis = 0)  # best IoU achieved by each anchor
    # "Safety" anchors: the best anchor for each GT box, used as a fallback
    # when no anchor clears the positive threshold.
    safety_object = np.argmax(IoU, axis = 1)
    # Relabel IoUs in place: 1 = object, 0 = background, 0.5 = ignored.
    A_scores_list[A_scores_list>positives_threshold] = 1
    A_scores_list[A_scores_list<negatives_threshold] = 0
    A_scores_list[np.logical_and(A_scores_list>0, A_scores_list<1)] = 0.5
    # Only sample from the anchors inside the image boundary.
    A_scores_list_cleaned = A_scores_list[anchors_cleaned]
    object_indices = anchors_cleaned[A_scores_list_cleaned > 0.9]
    non_object_indices = anchors_cleaned[A_scores_list_cleaned < 0.1]
    # Guarantee at least one positive: promote the per-GT best anchors.
    if (len(object_indices) == 0):
        object_indices = np.array(safety_object)
        A_scores_list[safety_object] = 1
        # Drop the promoted anchors from the negative pool.
        idx = np.argwhere(non_object_indices == safety_object)
        non_object_indices = np.delete(non_object_indices, idx)
    assert(len(object_indices) > 0)
    # Sample up to half the minibatch as positives, the rest as negatives.
    if repeat_positives:
        chosen_idx_obj = np.random.choice(object_indices, int(minibatch_size/2),replace = True)
        assert(len(chosen_idx_obj) + len(non_object_indices) >= minibatch_size)
    else:
        assert(len(object_indices) + len(non_object_indices) >= minibatch_size)
        chosen_idx_obj = np.random.choice(object_indices, min(len(object_indices), int(minibatch_size/2)),replace = False)
    chosen_idx_non_obj = np.random.choice(non_object_indices, minibatch_size - len(chosen_idx_obj), replace = False)
    minibatch_indices = np.concatenate((chosen_idx_obj, chosen_idx_non_obj))
    # Regression targets (tx, ty, tw, th) for the sampled minibatch only.
    ground_truth_anchors_list = np.zeros((len(minibatch_indices), 4))
    ground_truth_anchors_list[:,0] = (gt_boxes_flatten[idx_of_GT[minibatch_indices],0] - batch_anchors[minibatch_indices,0]) / batch_anchors[minibatch_indices,2]
    ground_truth_anchors_list[:,1] = (gt_boxes_flatten[idx_of_GT[minibatch_indices],1] - batch_anchors[minibatch_indices,1]) / batch_anchors[minibatch_indices,3]
    ground_truth_anchors_list[:,2] = np.log(gt_boxes_flatten[idx_of_GT[minibatch_indices],2]/batch_anchors[minibatch_indices,2])
    ground_truth_anchors_list[:,3] = np.log(gt_boxes_flatten[idx_of_GT[minibatch_indices],3]/batch_anchors[minibatch_indices,3])
    # One-hot [background, object] labels for the sampled anchors.
    a = A_scores_list[minibatch_indices]
    A_scores_one_hot = np.zeros((a.shape[0], 2))
    A_scores_one_hot[np.arange(a.shape[0]), a.astype(int)] = 1
    return minibatch_indices, A_scores_one_hot, ground_truth_anchors_list
def fastRCNN_generate_batch_labels(boxes_proposal, groundtruth, labels, cls_thresh = 0.5, number_samples = 128, positive_percent = 0.25, n_classes = 1):
    """Sample Fast R-CNN training targets from RPN proposal boxes.

    Args:
        boxes_proposal: array [?, 4] in topleft-bottomright format.
        groundtruth: array [?, 4] in center/width/height format.
        labels: class labels of the ground-truth boxes.
        cls_thresh: IoU threshold separating positive from negative proposals.
        number_samples: total number of proposals to sample.
        positive_percent: fraction of the sample drawn from positives.
        n_classes: number of foreground classes.

    Returns:
        (choosen_positive, choosen_idx, sample_labels_onehot, offsets):
        positive proposal indices, all sampled indices, one-hot class labels
        for the sample, and regression targets for the positives.
    """
    # Match each proposal with its best-overlapping ground truth.
    fgroundtruth = format_anchors(groundtruth)
    iou = iou_tensorial(fgroundtruth, boxes_proposal)
    idx_of_GT = np.argmax(iou, axis = 0)
    bp_labels = labels[idx_of_GT]
    probabilities = np.max(iou, axis = 0)
    # Split proposals into positives/negatives by best IoU.
    positive = np.arange(len(idx_of_GT))[probabilities > cls_thresh]
    negative = np.arange(len(idx_of_GT))[probabilities <= cls_thresh]
    # Sample with replacement (either pool may be smaller than requested).
    choosen_positive = np.random.choice(positive, int(number_samples*positive_percent), replace = True)
    choosen_negative = np.random.choice(negative, number_samples - int(number_samples*positive_percent), replace = True)
    choosen_idx = np.concatenate((choosen_positive, choosen_negative))
    # Class labels: positives keep the class of their matched GT box,
    # negatives are assigned class 0 (background).
    positive_labels = list(bp_labels[choosen_positive])
    sample_labels = positive_labels + list((number_samples - int(number_samples*positive_percent))*[0])
    sample_labels_onehot = np.zeros((len(sample_labels), n_classes+1), dtype = np.uint8)
    sample_labels_onehot[np.arange(len(sample_labels)), sample_labels] = 1
    sample_labels_onehot = np.array(sample_labels_onehot, dtype = np.float32)
    # Regression targets - called 'offsets' - for the positives only,
    # computed against the proposal box in center/width/height form.
    # NOTE(review): these are proposal-minus-GT, while the RPN targets above
    # use GT-minus-anchor — confirm the intended sign convention.
    fboxes_proposal = format_anchors_inv(boxes_proposal)
    offsets = np.zeros((len(choosen_positive), 4))
    offsets[:, 0] = (fboxes_proposal[choosen_positive, 0] - groundtruth[idx_of_GT[choosen_positive], 0]) / fboxes_proposal[choosen_positive, 2]
    offsets[:, 1] = (fboxes_proposal[choosen_positive, 1] - groundtruth[idx_of_GT[choosen_positive], 1]) / fboxes_proposal[choosen_positive, 3]
    offsets[:, 2] = np.log(fboxes_proposal[choosen_positive, 2] / groundtruth[idx_of_GT[choosen_positive], 2])
    offsets[:, 3] = np.log(fboxes_proposal[choosen_positive, 3] / groundtruth[idx_of_GT[choosen_positive], 3])
    return choosen_positive, choosen_idx, sample_labels_onehot, offsets
def rpn_calculate_loss(confidence_list, regression_list, minibatchIndices, confidence_gt, regression_gt, name=None):
    """RPN multi-task loss: softmax cross-entropy on objectness plus a
    10x-weighted smooth-L1 box loss counted only on positive anchors."""
    with tf.name_scope(name, 'RPN_Loss', [confidence_list, regression_list, minibatchIndices, confidence_gt, regression_gt]):
        idx = tf.convert_to_tensor(minibatchIndices)
        labels_onehot = tf.convert_to_tensor(confidence_gt)
        reg_targets = tf.convert_to_tensor(regression_gt)
        # Objectness loss over the sampled minibatch of anchors.
        logits = tf.gather(confidence_list, idx)
        confidence_loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels = labels_onehot, logits = logits), name = 'confidence_loss')
        # Smooth-L1 regression loss; labels_onehot[:, 1] is 1 for object
        # anchors and 0 for background, so negatives contribute nothing.
        delta = tf.abs(tf.gather(regression_list, idx) - reg_targets)
        smooth_l1 = tf.where(tf.less(delta, 1.0), 0.5 * delta * delta, delta - 0.5)
        reg_loss = 10*tf.reduce_mean(labels_onehot[:,1] * tf.reduce_sum(smooth_l1, axis=-1))
        return tf.add(confidence_loss, reg_loss), confidence_loss, reg_loss
def fastRCNN_calculate_loss(cls, reg, positive_idx, choosen_idx, gt_cls_onehot, gt_reg, name = None):
    """Fast R-CNN loss: softmax cross-entropy over all sampled proposals
    plus smooth-L1 box regression over the positive proposals only."""
    with tf.name_scope(name, 'FastRCNN_Loss', [cls, reg, positive_idx, choosen_idx, gt_cls_onehot, gt_reg]):
        pos_idx = tf.convert_to_tensor(positive_idx)
        sample_idx = tf.convert_to_tensor(choosen_idx)
        cls_targets = tf.convert_to_tensor(gt_cls_onehot)
        reg_targets = tf.convert_to_tensor(gt_reg)
        # Classification loss over the full sample (positives + negatives).
        sampled_logits = tf.gather(cls, sample_idx)
        cls_loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels = cls_targets, logits = sampled_logits), name = 'RCNN_loss')
        # Smooth-L1 regression loss on positives.
        delta = tf.abs(tf.gather(reg, pos_idx) - reg_targets)
        smooth_l1 = tf.where(tf.less(delta, 1.0), 0.5 * delta * delta, delta - 0.5)
        reg_loss = tf.reduce_mean(tf.reduce_sum(smooth_l1, axis=-1))
        return tf.add(cls_loss, reg_loss), cls_loss, reg_loss
def get_region_of_interest(ft_shape, boxes_proposal, dim_reduce = 16):
    """Map image-space proposal boxes (x1, y1, x2, y2) to clipped integer
    feature-map cell coordinates for ROI pooling."""
    # Scale from image pixels down to feature-map cells.
    scaled = boxes_proposal/dim_reduce
    x1 = tf.reshape(scaled[:, 0], (-1, 1))
    x1 = tf.clip_by_value(x1, 0.0, x1)  # clamp negatives to 0
    y1 = tf.reshape(scaled[:, 1], (-1, 1))
    y1 = tf.clip_by_value(y1, 0.0, y1)
    x2 = tf.reshape(scaled[:, 2], (-1, 1))
    x2 = tf.clip_by_value(x2, x1, ft_shape[2])  # keep x2 in [x1, feature width]
    y2 = tf.reshape(scaled[:, 3], (-1, 1))
    y2 = tf.clip_by_value(y2, y1, ft_shape[1])  # keep y2 in [y1, feature height]
    # ROI pooling expects integer cell indices.
    return tf.cast(tf.concat([x1, y1, x2, y2], axis = -1), tf.int32)
def calculate_eval(predict_cls, predict_reg, gt_boxes, cls_thresh, reg_iou, imWidth, imHeight):
    """Compute precision/recall of predicted boxes against ground truth.

    Args:
        predict_cls, cls_thresh, imWidth, imHeight: unused; kept for
            interface compatibility with existing callers.
        predict_reg: predicted boxes, (N, 4) in topleft-bottomright format
            (as produced by _predict_boxes).
        gt_boxes: ground-truth boxes in center/width/height format.
        reg_iou: IoU threshold; predictions below it count as wrong, ground
            truths below it count as missed.

    Returns:
        (precision, recall), or (0, 0) when there are no predictions.
    """
    total_gt_boxes = len(gt_boxes)
    total_reg_boxes = len(predict_reg)
    if total_reg_boxes == 0:
        return 0, 0
    # BUG FIX: the original called the builtin format(), which turns the box
    # array into a string and breaks iou_tensorial. Boxes coming out of
    # _predict_boxes are already topleft-bottomright, so no conversion is
    # needed — just ensure we have an ndarray.
    reg_respone = np.asarray(predict_reg)
    gt_boxes_formated = format_anchors(np.array(gt_boxes))
    # IoU matrix: rows = ground truths, columns = predictions.
    iou_matrix = iou_tensorial(gt_boxes_formated, reg_respone)
    precision_respone = np.max(iou_matrix, axis = 0)  # best IoU per prediction
    recall_respone = np.max(iou_matrix, axis = 1)     # best IoU per ground truth
    precision_wrong = np.count_nonzero(precision_respone < reg_iou)
    recall_miss = np.count_nonzero(recall_respone < reg_iou)
    precision = (total_reg_boxes - precision_wrong)/total_reg_boxes
    recall = (total_gt_boxes - recall_miss)/total_gt_boxes
    return precision, recall
class RPN:
    """Faster R-CNN style plate detector: VGG-16 backbone + RPN + Fast R-CNN head.

    Usage: get_data() -> build_net() -> rpn_train() / fastRCNN_train() / predict().
    """
    def __init__(self):
        # Training hyper-parameters.
        self.BATCH_SIZE = 1
        self.lr = 0.0001
        # Image rescale bounds in pixels (passed to the data loader).
        self.max_size = 1024
        self.min_size = 608
        # Anchor configuration: k = |aspect_ratios| * |scales| anchors per cell.
        self.aspect_ratios = [1, 2, 3]
        self.scales = [32, 64, 128] # In pixel
        self.k_anchors = len(self.aspect_ratios)*len(self.scales)
        self.n_classes = 1
        self.max_epoches = 20
        self.epoches_to_save = 5
        self.global_step = 0
        # Fraction of the dataset held out for validation.
        self.valid = 0.1
        # Pretrained ImageNet VGG-16 checkpoint used for fine-tuning.
        self.check_point = 'vgg_16.ckpt'
        # (cls_thresh, IoU) grids evaluated by eval().
        self.eval_iou = [0.5, 0.75]
        self.eval_cls = [0.5, 0.75]
        self.epoch_to_validate = 5
        self.rpn_checkpoint = 'graphs/model.ckpt'
        self.rpn_trained = 'rpn_trained.ckpt'
    def summary(self):
        """Attach TensorBoard scalar summaries; build_net() must run first."""
        with tf.name_scope('summaries'):
            tf.summary.scalar('confidence_loss', self.confidence_loss)
            tf.summary.scalar('regression_loss', self.regression_loss)
            tf.summary.scalar('total_loss', self.total_loss)
            self.summary_op = tf.summary.merge_all()
    def build_net(self):
        """Build the whole graph: backbone, RPN, proposals, Fast R-CNN head,
        both losses, summaries, optimizers, and the session."""
        # Input placeholders.
        self.image_batch_placeholder = tf.placeholder(shape = [None, None, None, 3],dtype='float32')
        self.placeholder_anchors = tf.placeholder(shape = [None, 4], dtype='float32', name = 'PL_anchors')
        self.placeholder_anchors_cleaned = tf.placeholder(shape = [None],dtype='int32', name = 'PL_anchors_cleaned')
        # Inference: backbone -> RPN head -> decoded proposals -> ROI head.
        self.feature_map = _base_inference(self.image_batch_placeholder)
        self.ft_shape = tf.cast(tf.shape(self.feature_map), tf.float32)
        self.confidence_list, self.regression_list, self.pp_probabilities = _inference(self.feature_map, self.k_anchors)
        self.prediction, self.probabilities = _predict_boxes(self.pp_probabilities, self.regression_list, self.placeholder_anchors, self.placeholder_anchors_cleaned)
        self.boxes_proposal_cell = get_region_of_interest(self.ft_shape, self.prediction)
        # Wrap in a list to add the batch dimension expected by ROI pooling.
        self.boxes_proposal_cell = tf.convert_to_tensor([self.boxes_proposal_cell])
        self.cls, self.reg = _fastRCNN(self.feature_map, self.boxes_proposal_cell)
        # RPN loss placeholders and loss.
        self.minibatch_indices = tf.placeholder(dtype='int32', name = 'PL_RPN_indices')
        self.A_scores_one_hot = tf.placeholder(dtype='float32', name = 'PL_RPN_onehot_label')
        self.gt_anchors_list = tf.placeholder(dtype='float32', name = 'PL_RPN_gt')
        self.total_loss, self.confidence_loss, self.regression_loss = rpn_calculate_loss(self.confidence_list, self.regression_list, \
                self.minibatch_indices, \
                self.A_scores_one_hot, self.gt_anchors_list)
        self.saver = tf.train.Saver()
        # Fast R-CNN loss placeholders and loss.
        self.placeholder_choosen_idx = tf.placeholder(shape = [None], dtype='int32', name = 'PL_indices')
        self.placeholder_positive_idx = tf.placeholder(shape = [None], dtype='int32', name = 'PL_positive_indices')
        self.placeholder_gt_reg = tf.placeholder(shape = [None, 4], dtype='float32', name = 'PL_gt_reg')
        self.placeholder_gt_cls = tf.placeholder(shape = [None, self.n_classes+1], dtype='float32', name = 'PL_gt_cls')
        self.rcnn_total_loss, self.rcnn_cls_loss, self.rcnn_reg_loss = fastRCNN_calculate_loss(self.cls, self.reg, self.placeholder_positive_idx, \
                self.placeholder_choosen_idx, self.placeholder_gt_cls, self.placeholder_gt_reg)
        # Summaries, then one Adam optimizer per training stage.
        self.summary()
        self.RPN_trainer = tf.train.AdamOptimizer(learning_rate = self.lr).minimize(self.total_loss, name = 'RPN_trainer')
        self.FastRCNN_trainer = tf.train.AdamOptimizer(learning_rate = self.lr).minimize(self.rcnn_total_loss, name = 'FastRCNN_trainer')
        self.session = tf.Session()
    def get_data(self, visualize = True):
        """Load the plate dataset and split it into train/validation sets.

        Note: the `visualize` argument is not forwarded; get_data is always
        called with visualize=False.
        """
        data_imgs, data_labels, data_coordinates, self.train_sizes = get_data('plate/images', 'plate/labels', self.min_size, self.max_size, visualize = False)
        self.train_imgs, self.train_labels, self.train_coordinates, self.val_imgs, self.val_labels, self.val_coordinates = split_dataset(data_imgs, data_labels, data_coordinates, self.valid)
        print('Train: {} Val: {}'.format(len(self.train_imgs), len(self.val_imgs)))
    def shuffle_data(self):
        """Shuffle training images/labels/coordinates in unison."""
        # NOTE(review): `ind` is computed and shuffled but never used — dead code.
        ind = [i for i in range(len(self.train_labels))]
        ind = shuffle(ind)
        self.train_imgs, self.train_labels, self.train_coordinates = shuffle(self.train_imgs, self.train_labels, self.train_coordinates)
    def rpn_train(self):
        """Train the RPN stage, starting from ImageNet VGG-16 conv weights."""
        max_steps = int(len(self.train_labels) / self.BATCH_SIZE)
        writer = tf.summary.FileWriter('graphs', self.session.graph)
        self.session.run([tf.global_variables_initializer()])
        # Restore the backbone conv weights from the pretrained checkpoint.
        print('Fineturning with VGG16-ImageNet ...')
        self.fturning_saver = fine_turning_vgg16()
        self.fturning_saver.restore(self.session, self.check_point)
        for epoch in range(self.max_epoches):
            self.shuffle_data()
            print('Epoches {} / {} :'.format(epoch, self.max_epoches))
            sumary_total_loss = []
            sumary_confidence_loss = []
            sumary_regression_loss = []
            for step in range(max_steps):
                # BATCH_SIZE is 1, so each step trains on a single image.
                minibatch_imgs, minibatch_labels, minibatch_gt_coors, minibatch_sizes = self.train_imgs[step], self.train_labels[step], self.train_coordinates[step], self.train_sizes[step]
                height, width,_ = minibatch_imgs.shape
                minibatch_imgs = np.array([minibatch_imgs])
                # Sample anchor labels and regression targets for this image.
                minibatch_indices, A_scores_one_hot, ground_truth_anchors_list = rpn_generate_batch_label(minibatch_gt_coors, 16, (width, height), self.aspect_ratios, self.scales, self.BATCH_SIZE)
                _, total_loss, confidence_loss, regression_loss, conf_list, summaries = self.session.run([self.RPN_trainer, self.total_loss, self.confidence_loss, self.regression_loss, self.confidence_list, self.summary_op], \
                        feed_dict = { self.minibatch_indices: minibatch_indices, \
                        self.A_scores_one_hot: A_scores_one_hot, \
                        self.gt_anchors_list: ground_truth_anchors_list, \
                        self.image_batch_placeholder: minibatch_imgs})
                writer.add_summary(summaries, global_step = epoch*max_steps + step)
                sumary_total_loss.append(total_loss)
                sumary_confidence_loss.append(confidence_loss)
                sumary_regression_loss.append(regression_loss)
                print('Step: {}/{} | Conf_loss: {} | Reg_loss: {} | Total_loss: {}'.format(step, max_steps, confidence_loss, regression_loss, total_loss))
            # Report per-epoch mean losses.
            mean_total_loss = np.mean(np.array(sumary_total_loss))
            mean_confidence_loss = np.mean(np.array(sumary_confidence_loss))
            mean_regression_loss = np.mean(np.array(sumary_regression_loss))
            print('-----------------------------------------------------------------------------------------------------------------------------')
            print('Epoch: {} Conf_loss: {} Reg_loss: {} Total_loss: {}'.format(epoch, mean_confidence_loss, mean_regression_loss, mean_total_loss))
            print('-----------------------------------------------------------------------------------------------------------------------------')
        # Save the trained RPN after training completes.
        # NOTE(review): indentation was lost in the pasted source — confirm
        # whether the checkpoint was meant to be saved every epoch instead
        # (an unused `epoches_to_save` attribute suggests periodic saving).
        self.saver.save(self.session, 'graphs/model.ckpt')
        writer.close()
    def fastRCNN_train(self, dim_reduce = 16):
        """Train the Fast R-CNN head on proposals from the trained RPN."""
        max_steps = int(len(self.train_labels) / self.BATCH_SIZE)
        writer = tf.summary.FileWriter('graphs', self.session.graph)
        print('Fast R-CNN training ...')
        self.session.run([tf.global_variables_initializer()])
        # Restore the RPN weights trained in the first stage.
        print('Restore RPN parameters ...')
        self.saver.restore(self.session, self.rpn_trained)
        for epoch in range(self.max_epoches):
            self.shuffle_data()
            print('Epoches {} / {} :'.format(epoch, self.max_epoches))
            sumary_total_loss = []
            sumary_confidence_loss = []
            sumary_regression_loss = []
            for step in range(max_steps):
                minibatch_imgs, minibatch_labels, minibatch_gt_coors, minibatch_sizes = self.train_imgs[step], self.train_labels[step], self.train_coordinates[step], self.train_sizes[step]
                height, width,_ = minibatch_imgs.shape
                minibatch_imgs = np.array([minibatch_imgs])
                minibatch_labels = np.array(minibatch_labels, dtype = np.uint8)
                minibatch_gt_coors = np.array(minibatch_gt_coors)
                # Run the RPN forward pass to obtain proposal boxes.
                anchors = rpn_generate_anchor_boxes((int(width/dim_reduce), int(height/dim_reduce)), (width, height), self.aspect_ratios, self.scales)
                anchors = anchors.reshape((-1, 4))
                anchors_cleaned = remove_cross_boundaries_anchors(anchors, width, height)
                boxes_proposal = self.session.run(self.prediction, \
                        feed_dict = {self.image_batch_placeholder : minibatch_imgs, \
                        self.placeholder_anchors : anchors,\
                        self.placeholder_anchors_cleaned: anchors_cleaned})
                # Sample training targets from the proposals, then take a step.
                choosen_positive, choosen_idx, sample_labels_onehot, offsets = fastRCNN_generate_batch_labels(boxes_proposal, minibatch_gt_coors, minibatch_labels)
                _, total_loss, confidence_loss, regression_loss = self.session.run([self.FastRCNN_trainer, self.rcnn_total_loss, self.rcnn_cls_loss, self.rcnn_reg_loss], \
                        feed_dict = {self.image_batch_placeholder : minibatch_imgs, \
                        self.placeholder_positive_idx: choosen_positive, \
                        self.placeholder_choosen_idx: choosen_idx, \
                        self.placeholder_gt_cls: sample_labels_onehot, \
                        self.placeholder_gt_reg: offsets, \
                        self.placeholder_anchors : anchors,\
                        self.placeholder_anchors_cleaned: anchors_cleaned })
                sumary_total_loss.append(total_loss)
                sumary_confidence_loss.append(confidence_loss)
                sumary_regression_loss.append(regression_loss)
                print('Step: {}/{} | Conf_loss: {} | Reg_loss: {} | Total_loss: {}'.format(step, max_steps, confidence_loss, regression_loss, total_loss))
            # Report per-epoch mean losses (no checkpoint is saved here).
            mean_total_loss = np.mean(np.array(sumary_total_loss))
            mean_confidence_loss = np.mean(np.array(sumary_confidence_loss))
            mean_regression_loss = np.mean(np.array(sumary_regression_loss))
            print('-----------------------------------------------------------------------------------------------------------------------------')
            print('Epoch: {} Conf_loss: {} Reg_loss: {} Total_loss: {}'.format(epoch, mean_confidence_loss, mean_regression_loss, mean_total_loss))
            print('-----------------------------------------------------------------------------------------------------------------------------')
    def eval(self):
        """Report mean precision/recall for each (cls_thresh, IoU) pair."""
        print('Validating ...')
        # NOTE(review): this iterates self.train_imgs, not self.val_imgs —
        # confirm whether validation was intended to use the held-out split.
        total_val_img = len(self.train_imgs)
        list_precision = []
        list_recall = []
        for i in range(total_val_img):
            precision, recall = [], []
            img, labels, gt_coors = self.train_imgs[i], self.train_labels[i], self.train_coordinates[i]
            # NOTE(review): self.img_height / self.img_width are never
            # assigned anywhere in this class — this line raises AttributeError.
            img = img.reshape((1, self.img_height, self.img_width, 3))
            conf_list, reg_list, box_predict = self.session.run([self.confidence_list, self.regression_list, self.prediction], feed_dict = {self.image_batch_placeholder: img})
            # Softmax of the numpy logits is computed in a throwaway session.
            cls_op = tf.nn.softmax(conf_list, axis = 1)
            with tf.Session() as sess:
                proba = sess.run(cls_op)
                proba = proba[:, 1]
            # Evaluate this image at every (cls_thresh, IoU) setting.
            for cs in self.eval_cls:
                for iou in self.eval_iou:
                    tmp1, tmp2 = calculate_eval(proba, box_predict, gt_coors, cs, iou, self.img_width, self.img_height)
                    precision.append(tmp1)
                    recall.append(tmp2)
            list_precision.append(precision)
            list_recall.append(recall)
        # Aggregate across images and print per-setting means.
        list_precision = np.array(list_precision)
        list_recall = np.array(list_recall)
        count = 0
        # NOTE(review): `count` is never incremented, and rows of
        # list_precision are images while columns are settings — the
        # index below likely should be [:, count] with count += 1 per pair.
        for cs in self.eval_cls:
            for iou in self.eval_iou:
                precision = np.mean(list_precision[count, :])
                recall = np.mean(list_recall[count, :])
                print('Cls: {} | IoU: {} | Precision: {} | Recall: {}'.format(cs, iou, precision, recall))
    def predict(self, image, cls_thresh = 0.7, dim_reduce = 16):
        """Run the detector on one image file and display the proposal boxes.

        Note: re-initializes variables and restores 'graphs/model.ckpt' on
        every call; `cls_thresh` is unused here.
        """
        img = read_and_rescale(image)
        height, width, _ = img.shape
        print('Resized: {}x{}'.format(width, height))
        self.session.run([tf.global_variables_initializer()])
        print('Loading pretrained model ...')
        self.saver.restore(self.session, 'graphs/model.ckpt')
        image = np.array([img])
        print('Predict: ')
        # Build the anchors for this image size; only in-bounds anchors score.
        anchors = rpn_generate_anchor_boxes((int(width/dim_reduce), int(height/dim_reduce)), (width, height), self.aspect_ratios, self.scales)
        anchors = anchors.reshape((-1, 4))
        anchors_cleaned = remove_cross_boundaries_anchors(anchors, width, height)
        predict_boxes, probabilities, pp_cell = self.session.run([self.prediction, self.probabilities, self.reg], \
                feed_dict = {self.image_batch_placeholder:image, \
                self.placeholder_anchors: anchors,\
                self.placeholder_anchors_cleaned: anchors_cleaned})
        print(pp_cell)
        # Draw every box whose corners are all strictly positive.
        for i in range(len(predict_boxes)):
            x1, y1, x2, y2 = predict_boxes[i]
            if x1>0 and y1>0 and x2>0 and y2>0:
                img = cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 0, 255), 1)
        cv2.imshow('predict', img)
        cv2.waitKey(0)
if __name__ == '__main__':
    # Entry point: load the plate dataset, build the graph, and train the
    # Fast R-CNN head (assumes an RPN checkpoint already exists on disk).
    net = RPN()
    net.get_data(visualize = False)
    net.build_net()
    net.fastRCNN_train()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement