import numpy as np
import sys
from PIL import Image, ImageDraw, ImageEnhance, ImageColor
class Rpn:
    def __init__(self, tf, resnet_model, image_height, image_width, in_channels, mid_channels, base_anchor_size, anchor_ratios, anchor_scales, subsample_rate):
        """
        in_channels for resnet: 512
        mid_channels: 512
        base_anchor_size: 128
        anchor_ratios: [[1,1],[1,2],[2,1]]
        anchor_scales: [1,2,3,4]
        subsample_rate: image_size // feature_map_size, e.g. 1000 // 32 --> 31
        """
        self.tf = tf
        self.resnet_model = resnet_model
        self.resnet_model_inputs = self.resnet_model.input
        self.resnet_model_feature_map = self.resnet_model.output
        self.image_height = image_height
        self.image_width = image_width
        self.in_channels = in_channels
        self.mid_channels = mid_channels
        self.base_anchor_size = base_anchor_size
        self.anchor_ratios = anchor_ratios
        self.anchor_scales = anchor_scales
        self.subsample_rate = subsample_rate
        self.anchor_boxes = self.generate_anchor_boxes(self.base_anchor_size, self.anchor_ratios, self.anchor_scales)
        self.anchor_boxes_over_image = self.generate_anchor_boxes_over_image(self.anchor_boxes, self.image_height, self.image_width, self.subsample_rate)
        self.model = self.create_model()

    def get_model(self):
        return self.model
    def create_model(self):
        num_anchors = len(self.anchor_ratios) * len(self.anchor_scales)  # 12 with the default ratios/scales above
        rpn_conv = self.tf.keras.layers.SeparableConv2D(filters=self.mid_channels, kernel_size=[3, 3], strides=[1, 1], padding="same", data_format="channels_last", activation="relu")(self.resnet_model.output)
        # Objectness head: 2 scores (background/foreground) per anchor.
        rpn_class_score = self.tf.keras.layers.SeparableConv2D(filters=num_anchors * 2, kernel_size=[1, 1], strides=[1, 1], padding="valid", data_format="channels_last")(rpn_conv)
        rpn_class_score_shape = rpn_class_score.get_shape().as_list()
        rpn_class_score = self.tf.keras.layers.Reshape(rpn_class_score_shape[1:3] + [rpn_class_score_shape[3] // 2, 2])(rpn_class_score)
        # Softmax over each anchor's 2 class channels; applying it on the conv
        # output (before the reshape) would wrongly normalize across all
        # num_anchors * 2 channels at once.
        rpn_class_score = self.tf.keras.layers.Softmax(axis=-1)(rpn_class_score)
        # Box-regression head: 4 deltas per anchor.
        rpn_bbox_pred = self.tf.keras.layers.SeparableConv2D(filters=num_anchors * 4, kernel_size=[1, 1], strides=[1, 1], padding="valid", data_format="channels_last")(rpn_conv)
        rpn_bbox_pred_shape = rpn_bbox_pred.get_shape().as_list()
        rpn_bbox_pred = self.tf.keras.layers.Reshape(rpn_bbox_pred_shape[1:3] + [rpn_bbox_pred_shape[3] // 4, 4])(rpn_bbox_pred)
        rpn_model = self.tf.keras.Model(inputs=self.resnet_model.input, outputs=[rpn_class_score, rpn_bbox_pred])
        self.optimizer = self.tf.keras.optimizers.Nadam(0.001)
        #rpn_model.compile(optimizer=self.optimizer, loss=[self.tf.keras.losses.BinaryCrossentropy(), self.tf.keras.losses.Huber()], metrics=['binary_accuracy', 'MeanSquaredError'])
        return rpn_model
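
    # Shape sketch (an assumption for illustration: a 1024x1024 input and a
    # backbone with total stride 32, i.e. a 32x32 feature map): the model then
    # outputs rpn_class_score with shape (batch, 32, 32, 12, 2) and
    # rpn_bbox_pred with shape (batch, 32, 32, 12, 4), one (background,
    # foreground) pair and one 4-vector of box deltas per anchor per cell.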
    def get_iou(self, anchor_box_predictions, ground_truth_bounding_boxes, giou=False):
        """Compute IoU (or GIoU) between predicted boxes and ground truth boxes."""
        # Predicted boxes: convert (center_x, center_y, width, height) to corner coordinates.
        anchor_box_predictions_center_x, anchor_box_predictions_center_y, anchor_box_predictions_width, anchor_box_predictions_height = anchor_box_predictions[:, :, :, :, 0:1], anchor_box_predictions[:, :, :, :, 1:2], anchor_box_predictions[:, :, :, :, 2:3], anchor_box_predictions[:, :, :, :, 3:4]
        anchor_box_predictions_x1 = anchor_box_predictions_center_x - (anchor_box_predictions_width / 2)
        anchor_box_predictions_x2 = anchor_box_predictions_center_x + (anchor_box_predictions_width / 2)
        anchor_box_predictions_y1 = anchor_box_predictions_center_y - (anchor_box_predictions_height / 2)
        anchor_box_predictions_y2 = anchor_box_predictions_center_y + (anchor_box_predictions_height / 2)
        # For the predicted box, ensure x2 > x1 and y2 > y1.
        anchor_box_predictions_x1, anchor_box_predictions_x2 = np.minimum(anchor_box_predictions_x1, anchor_box_predictions_x2), np.maximum(anchor_box_predictions_x1, anchor_box_predictions_x2)
        anchor_box_predictions_y1, anchor_box_predictions_y2 = np.minimum(anchor_box_predictions_y1, anchor_box_predictions_y2), np.maximum(anchor_box_predictions_y1, anchor_box_predictions_y2)
        # Concatenate the per-image ground truth boxes into one flat array that broadcasts against the predictions.
        concat = np.concatenate(ground_truth_bounding_boxes, axis=0)
        ground_truth_bounding_boxes_center_x, ground_truth_bounding_boxes_center_y, ground_truth_bounding_boxes_width, ground_truth_bounding_boxes_height = (concat[:, 0:1]).reshape((1, 1, 1, 1, -1)), (concat[:, 1:2]).reshape((1, 1, 1, 1, -1)), (concat[:, 2:3]).reshape((1, 1, 1, 1, -1)), (concat[:, 3:4]).reshape((1, 1, 1, 1, -1))
        ground_truth_bounding_boxes_x1 = ground_truth_bounding_boxes_center_x - (ground_truth_bounding_boxes_width / 2)
        ground_truth_bounding_boxes_x2 = ground_truth_bounding_boxes_center_x + (ground_truth_bounding_boxes_width / 2)
        ground_truth_bounding_boxes_y1 = ground_truth_bounding_boxes_center_y - (ground_truth_bounding_boxes_height / 2)
        ground_truth_bounding_boxes_y2 = ground_truth_bounding_boxes_center_y + (ground_truth_bounding_boxes_height / 2)
        # Areas of both sets of boxes.
        anchor_box_predictions_area = (anchor_box_predictions_x2 - anchor_box_predictions_x1) * (anchor_box_predictions_y2 - anchor_box_predictions_y1)
        ground_truth_bounding_boxes_area = (ground_truth_bounding_boxes_x2 - ground_truth_bounding_boxes_x1) * (ground_truth_bounding_boxes_y2 - ground_truth_bounding_boxes_y1)
        # Intersection rectangle between prediction boxes and ground truth boxes.
        x1_intersection, x2_intersection = np.maximum(anchor_box_predictions_x1, ground_truth_bounding_boxes_x1), np.minimum(anchor_box_predictions_x2, ground_truth_bounding_boxes_x2)
        y1_intersection, y2_intersection = np.maximum(anchor_box_predictions_y1, ground_truth_bounding_boxes_y1), np.minimum(anchor_box_predictions_y2, ground_truth_bounding_boxes_y2)
        # Intersection area (zero when the boxes do not overlap).
        intersection = np.where(np.logical_and(x2_intersection > x1_intersection, y2_intersection > y1_intersection), (x2_intersection - x1_intersection) * (y2_intersection - y1_intersection), 0)
        # Coordinates and area of the smallest enclosing box (needed for GIoU; this is not the union).
        x1_enclosing, x2_enclosing = np.minimum(anchor_box_predictions_x1, ground_truth_bounding_boxes_x1), np.maximum(anchor_box_predictions_x2, ground_truth_bounding_boxes_x2)
        y1_enclosing, y2_enclosing = np.minimum(anchor_box_predictions_y1, ground_truth_bounding_boxes_y1), np.maximum(anchor_box_predictions_y2, ground_truth_bounding_boxes_y2)
        enclosing_area = (x2_enclosing - x1_enclosing) * (y2_enclosing - y1_enclosing)
        # IoU = intersection / union; GIoU = IoU - (enclosing - union) / enclosing.
        union = anchor_box_predictions_area + ground_truth_bounding_boxes_area - intersection
        iou = intersection / union
        Giou = iou - ((enclosing_area - union) / enclosing_area)
        # Start/end offsets of each image's boxes inside the concatenated array.
        indices = [len(bb) for bb in ground_truth_bounding_boxes]
        indices_end = [np.sum(indices[:i + 1]) for i in range(len(indices))]
        indices_start = np.insert(indices_end, 0, 0)[:-1]
        # For each batch image, keep the best IoU/GIoU against its own ground truth boxes only, plus the best-matching box itself.
        iou_true = np.zeros(iou.shape[:-1])
        Giou_true = np.zeros(Giou.shape[:-1])
        bounding_boxes_true = np.zeros(anchor_box_predictions.shape)
        for index, (s, e) in enumerate(zip(indices_start, indices_end)):
            iou_true[index] = np.amax(iou[index, :, :, :, s:e], axis=3)
            Giou_true[index] = np.amax(Giou[index, :, :, :, s:e], axis=3)
            if giou:
                bounding_boxes_true[index] = (ground_truth_bounding_boxes[index][np.argmax(Giou[index, :, :, :, s:e], axis=3)])[:, :, :, :4]
            else:
                bounding_boxes_true[index] = (ground_truth_bounding_boxes[index][np.argmax(iou[index, :, :, :, s:e], axis=3)])[:, :, :, :4]
        return (Giou_true, bounding_boxes_true) if giou else (iou_true, bounding_boxes_true)
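
    # Worked example (illustrative numbers, not from the original): two unit
    # squares in (cx, cy, w, h) form, [0.5, 0.5, 1, 1] and [1.0, 0.5, 1, 1],
    # overlap in a 0.5 x 1 strip, so intersection = 0.5, union = 1 + 1 - 0.5
    # = 1.5, and IoU = 0.5 / 1.5 = 1/3. Their smallest enclosing box is
    # 1.5 x 1 = 1.5, which equals the union here, so
    # GIoU = 1/3 - (1.5 - 1.5) / 1.5 = 1/3.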
    def train_model(self, inputs, ground_truth_bounding_boxes):
        batch_size = inputs.shape[0]
        predictions = self.model.predict(inputs)
        object_predictions = predictions[0]
        anchor_box_change_predictions = predictions[1]
        # Apply the predicted deltas to the fixed anchor grid.
        anchor_box_predictions = self.anchor_boxes_over_image + anchor_box_change_predictions
        anchor_boxes_giou, anchor_boxes_nearest_bounding_box = self.get_iou(anchor_box_predictions, ground_truth_bounding_boxes, giou=True)
        # Label anchors with GIoU > 0.5 against any ground truth box as foreground.
        object_prediction_labels = np.zeros(object_predictions.shape[:-1])
        object_prediction_labels[anchor_boxes_giou > 0.5] = 1
        mini_batch_sample_size = 256 * batch_size
        # Keep only the foreground probability (channel 1 here) so the flattened
        # predictions line up one-to-one with the labels; object_predictions has
        # 2 class channels per anchor.
        object_predictions_flat = object_predictions[..., 1].flatten()
        object_prediction_labels_flat = object_prediction_labels.flatten()
        anchor_box_predictions_flat = anchor_box_predictions.reshape((-1, 4))
        anchor_boxes_nearest_bounding_box_flat = anchor_boxes_nearest_bounding_box.reshape((-1, 4))
        # Shuffle the flattened predictions and labels in the same order.
        random_indices = np.random.choice(object_predictions_flat.shape[0], object_predictions_flat.shape[0], replace=False)
        object_predictions_flat = object_predictions_flat[random_indices]
        object_prediction_labels_flat = object_prediction_labels_flat[random_indices]
        anchor_box_predictions_flat = anchor_box_predictions_flat[random_indices]
        anchor_boxes_nearest_bounding_box_flat = anchor_boxes_nearest_bounding_box_flat[random_indices]
        # Sort in ascending label order: background first, foreground last.
        ind_sorted = np.argsort(object_prediction_labels_flat)
        object_predictions_flat = object_predictions_flat[ind_sorted]
        object_prediction_labels_flat = object_prediction_labels_flat[ind_sorted]
        anchor_box_predictions_flat = anchor_box_predictions_flat[ind_sorted]
        anchor_boxes_nearest_bounding_box_flat = anchor_boxes_nearest_bounding_box_flat[ind_sorted]
        # Take 128 background anchors and 128 foreground anchors per image and merge them into a single mini-batch.
        split_amount = mini_batch_sample_size // 2
        # Background
        background_object_predictions_flat = object_predictions_flat[:split_amount]
        background_object_prediction_labels_flat = object_prediction_labels_flat[:split_amount]
        # Foreground
        foreground_object_predictions_flat = object_predictions_flat[-split_amount:]
        foreground_object_prediction_labels_flat = object_prediction_labels_flat[-split_amount:]
        foreground_anchor_box_predictions_flat = anchor_box_predictions_flat[-split_amount:]
        foreground_anchor_boxes_nearest_bounding_box_flat = anchor_boxes_nearest_bounding_box_flat[-split_amount:]
        # Merge
        final_object_predictions_flat = np.concatenate((background_object_predictions_flat, foreground_object_predictions_flat))
        final_object_prediction_labels_flat = np.concatenate((background_object_prediction_labels_flat, foreground_object_prediction_labels_flat))
        # The regression loss only uses foreground anchors and their closest ground truth boxes.
        final_anchor_box_predictions_flat = foreground_anchor_box_predictions_flat[np.nonzero(foreground_object_prediction_labels_flat)]
        final_anchor_boxes_nearest_bounding_box_flat = foreground_anchor_boxes_nearest_bounding_box_flat[np.nonzero(foreground_object_prediction_labels_flat)]
        # ==== TODO: turn the losses below into an actual gradient update; see the train_step sketch after this method. ====
        class_binary_loss = self.tf.keras.losses.BinaryCrossentropy()
        huber = self.tf.keras.losses.Huber(reduction=self.tf.keras.losses.Reduction.NONE)
        anchor_box_regression_loss = huber(final_anchor_boxes_nearest_bounding_box_flat, final_anchor_box_predictions_flat)
        object_classification_loss = class_binary_loss(final_object_prediction_labels_flat, final_object_predictions_flat)
        # Note: these losses are computed on NumPy arrays taken from model.predict(),
        # so they carry no gradient history; optimizer.minimize() and
        # tf.keras.backend.gradients() cannot backpropagate through them.
        return object_classification_loss, anchor_box_regression_loss
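
    def train_step(self, inputs, object_labels, foreground_mask, bbox_targets):
        """
        A minimal training-step sketch, not the original author's finished
        method: because model.predict() returns plain NumPy arrays with no
        gradient history, the forward pass must be re-run inside a
        GradientTape for the optimizer to update the weights.
        Assumed argument shapes (hypothetical, chosen for illustration):
        object_labels (batch, H, W, A) with 0/1 anchor labels,
        foreground_mask a boolean array of the same shape, and
        bbox_targets (batch, H, W, A, 4) regression targets.
        """
        bce = self.tf.keras.losses.BinaryCrossentropy()
        huber = self.tf.keras.losses.Huber()
        with self.tf.GradientTape() as tape:
            class_scores, bbox_deltas = self.model(inputs, training=True)
            # Classification loss over all anchors (foreground probability channel).
            class_loss = bce(object_labels, class_scores[..., 1])
            # Regression loss over foreground anchors only.
            box_loss = huber(self.tf.boolean_mask(bbox_targets, foreground_mask),
                             self.tf.boolean_mask(bbox_deltas, foreground_mask))
            total_loss = class_loss + box_loss
        gradients = tape.gradient(total_loss, self.model.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_weights))
        return total_loss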
    def generate_anchor_boxes(self, base_anchor_size, anchor_ratios, anchor_scales):
        """
        Every (scale, ratio) pair yields a distinct anchor box.
        Number of anchor boxes: len(anchor_ratios) * len(anchor_scales)
        """
        anchor_boxes = []
        for scale in anchor_scales:
            for aspect_ratio in anchor_ratios:
                height = base_anchor_size * scale * aspect_ratio[0]
                width = base_anchor_size * scale * aspect_ratio[1]
                anchor_box = [width, height]
                anchor_boxes.append(anchor_box)
        return anchor_boxes
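
    # Example (illustrative, computed from the defaults in the __init__
    # docstring): base_anchor_size=128, anchor_ratios=[[1,1],[1,2],[2,1]],
    # anchor_scales=[1,2,3,4] gives 12 [width, height] boxes:
    #   scale 1 -> [128, 128], [256, 128], [128, 256]
    #   scale 2 -> [256, 256], [512, 256], [256, 512]
    # and so on for scales 3 and 4.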
    def generate_anchor_boxes_over_image(self, anchor_boxes, image_height, image_width, subsample_rate):
        """
        Returns an array of all the anchor boxes tiled over the feature-map grid,
        each stored as [anchor_box_center_x, anchor_box_center_y, width, height].
        """
        anchor_boxes_over_image = np.zeros((image_height // subsample_rate, image_width // subsample_rate, len(anchor_boxes), 4))
        for row in range(anchor_boxes_over_image.shape[0]):
            for col in range(anchor_boxes_over_image.shape[1]):
                for anchor_idx in range(anchor_boxes_over_image.shape[2]):
                    # center_x tracks the column and center_y the row, matching
                    # the (x, y, w, h) unpacking order in get_iou.
                    anchor_boxes_over_image[row, col, anchor_idx] = [(col + 1) * subsample_rate, (row + 1) * subsample_rate, anchor_boxes[anchor_idx][0], anchor_boxes[anchor_idx][1]]
        return anchor_boxes_over_image
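
# Minimal usage sketch (the backbone construction here is an assumption; the
# original paste does not show how resnet_model is built):
if __name__ == "__main__":
    import tensorflow as tf
    # Any fully-convolutional backbone works; ResNet50 without its top gives a
    # total stride of 32, so a 1024x1024 input yields a 32x32 feature map,
    # matching subsample_rate=32 below. Its output has 2048 channels.
    backbone = tf.keras.applications.ResNet50(include_top=False, input_shape=(1024, 1024, 3))
    rpn = Rpn(tf, backbone, image_height=1024, image_width=1024,
              in_channels=2048, mid_channels=512, base_anchor_size=128,
              anchor_ratios=[[1, 1], [1, 2], [2, 1]], anchor_scales=[1, 2, 3, 4],
              subsample_rate=32)
    rpn.get_model().summary()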