import cv2
import os
from Dataset_Utils.dataset_tools import findRelevantFace, enclosing_square
from Dataset_Utils.facedetect_vggface2.face_detector import FaceDetector
from Dataset_Utils.facedetect_vggface2.face_aligner import FaceAligner
import dlib
from tqdm import tqdm
import numpy as np
import joblib
from keras.models import model_from_json
from Dataset.Dataset_Utils.dataset_tools import findFaceOnSide, cut
from openface_model import create_model

BASE_PATH = os.path.dirname(os.path.abspath(__file__))
cache_p = 'Aff_Wild_Cache/'
input_p_ds = 'AFF-Wild/VA'


def findCosineDistance(source_representation, test_representation):
    a = np.matmul(np.transpose(source_representation), test_representation)
    b = np.sum(np.multiply(source_representation, source_representation))
    c = np.sum(np.multiply(test_representation, test_representation))
    return 1 - (a / (np.sqrt(b) * np.sqrt(c)))


def l2_normalize(x, axis=-1, epsilon=1e-10):
    output = x / np.sqrt(np.maximum(np.sum(np.square(x), axis=axis, keepdims=True), epsilon))
    return output


def findEuclideanDistance(source_representation, test_representation):
    euclidean_distance = source_representation - test_representation
    euclidean_distance = np.sum(np.multiply(euclidean_distance, euclidean_distance))
    euclidean_distance = np.sqrt(euclidean_distance)
    # euclidean_distance = l2_normalize(euclidean_distance)
    return euclidean_distance
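

# Hedged illustration (not part of the original script): how the distance helpers
# above are typically used on a pair of face embeddings. The 128-d vectors and the
# 0.5 threshold below are assumptions made only for this example.
def _demo_embedding_distances():
    rng = np.random.default_rng(0)
    emb_a = rng.standard_normal(128)
    emb_b = rng.standard_normal(128)
    cosine = findCosineDistance(emb_a, emb_b)  # 0 = same direction, 2 = opposite
    euclidean = findEuclideanDistance(l2_normalize(emb_a), l2_normalize(emb_b))
    print("cosine distance:", cosine)
    print("euclidean distance (L2-normalized):", euclidean)
    return cosine < 0.5  # assumed threshold for treating the pair as the same face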


class AffWild_Dataset:
    gen = None
    partition = None

    def __init__(self, partition='Training', input_path=input_p_ds, cache_path=cache_p, target_shape=(224, 224, 3),
                 shuffle_samples=True, augment=True, custom_augmentation=None, debug_max_num_samples=None, split=False,
                 split_len=16, cast_to_imgs=False, number_frames_for_cast=4):
        self.target_shape = target_shape
        self.custom_augmentation = custom_augmentation
        self.augment = augment
        self.split = split
        self.info = list()
        self.partition = partition
        import tensorflow as tf
        print("Loading OpenFace model")
        self.model = create_model()
        self.model.load_weights("Dataset_Utils/openface_weights.h5")
        print('Loading data...')
        cache_path = os.path.join(cache_p, partition)
        cache_file_name = '%s.%s.info' % ("aff_wild", partition)
        # if it exists, read the dataset info from the cache file
        try:
            with open(os.path.join(cache_path, cache_file_name), 'rb') as f:
                self.info = joblib.load(f)
            if debug_max_num_samples is not None and debug_max_num_samples < len(self.info):
                self.info = self.info[:debug_max_num_samples]
            print("Data loaded. %d samples, from cache" % (len(self.info)))
        except FileNotFoundError:
            print('Cache file not found, creating it...')
            # build the dataset info starting from the original videos
            # check if the cache folder exists
            if not os.path.isdir(cache_path):
                os.makedirs(cache_path)
            # if not, read all dataset info:
            # read and process all videos into the cache
            print('Files not found, creating...')
            self._create_map(partition, input_path)
            # save the dataset info as a cache file
            print("Backing up info on the cache file")
            with open(os.path.join(cache_path, cache_file_name), 'wb') as f:
                joblib.dump(self.info, f)

    def _create_map(self, partition, input_path_ds):
        fd = FaceDetector()
        fa = FaceAligner()
        annotation_path = input_path_ds + '/annotation' + '/' + partition
        video_path = input_path_ds + "/videos/" + partition
        # iterate over all videos in the annotation directory
        for filename in tqdm(os.listdir(annotation_path)):
            print(filename)
            try:
                self._process_video(path=(annotation_path + "/" + filename), face_detector=fd, face_aligner=fa,
                                    time_step=16, video_dir_path=video_path)
            except Exception as e:
                # keep processing the remaining videos even if one of them fails
                print(e)

    def _get_annotations(self, path):
        video_name = path.split('/')[-1].replace('.txt', '')
        annotations = {'normal': [None, True], 'right': [None, False], 'left': [None, False]}
        if '_right' in video_name:
            video_name = video_name.replace('_right', '')
            annotations['normal'][1] = False
            annotations['right'][1] = True
        elif '_left' in video_name:
            video_name = video_name.replace('_left', '')
            annotations['normal'][1] = False
            annotations['left'][1] = True
        annotations_path = os.path.dirname(path)
        if os.path.isfile(annotations_path + "/" + video_name + '.txt'):
            annotations['normal'][0] = np.loadtxt(annotations_path + "/" + video_name + '.txt', delimiter=',',
                                                  skiprows=1)
        if os.path.isfile(annotations_path + "/" + video_name + '_right.txt'):
            annotations['right'][0] = np.loadtxt(annotations_path + "/" + video_name + '_right.txt', delimiter=',',
                                                 skiprows=1)
        if os.path.isfile(annotations_path + "/" + video_name + '_left.txt'):
            annotations['left'][0] = np.loadtxt(annotations_path + "/" + video_name + '_left.txt', delimiter=',',
                                                skiprows=1)
        return annotations
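
    # Hedged note (assumption about the dataset layout, not verified): each annotation
    # .txt file read above is expected to contain a one-line header followed by one
    # comma-separated valence/arousal pair per frame, e.g.
    #
    #     valence,arousal
    #     0.12,-0.30
    #     0.15,-0.28
    #
    # which is what delimiter=',' and skiprows=1 imply; the loaded array is later
    # indexed as annotation[frame_counter][0] and [1] in _get_video_info.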

    def _process_video(self, path='', face_detector=None, face_aligner=None, time_step=16, video_dir_path=''):
        video_name = path.split('/')[-1].replace('.txt', '')
        mode = 'normal'
        annotations = self._get_annotations(path)
        if "_right" in video_name:
            video_name = video_name.replace('_right', '')
        elif "_left" in video_name:
            video_name = video_name.replace('_left', '')
        video_path = video_dir_path + "/" + video_name
        if os.path.isfile(video_path + '.mp4'):
            video_path = video_path + '.mp4'
        if os.path.isfile(video_path + '.avi'):
            video_path = video_path + '.avi'
        cv2video = cv2.VideoCapture(video_path)
        extra_annotation = None
        ann = None
        for k, v in annotations.items():
            if v[1]:
                ann = [k, v[0]]
            elif v[0] is not None:
                extra_annotation = [k, v[0]]
        info = self._init_map(video_name, cv2video, time_step, ann[1])
        if not cv2video.isOpened():
            print("Error opening video stream or file")
        info['roi'], info['landmarks'], info['corrupted_frames'], info['indices'] = self._get_video_info(
            cv2video, face_detector, face_aligner, info, mode, time_step, ann, extra_annotation)
        self.info.append(info)

    def _init_map(self, video_name, cv2video, time_step, ann):
        print("Init info map")
        info = {
            'video_name': video_name,
            'total_frames': cv2video.get(cv2.CAP_PROP_FRAME_COUNT),
            'fps': cv2video.get(cv2.CAP_PROP_FPS),
            'height': cv2video.get(cv2.CAP_PROP_FRAME_HEIGHT),
            'width': cv2video.get(cv2.CAP_PROP_FRAME_WIDTH),
            'time_step': time_step,
            'indices': list(),
            'annotations': ann,
            'duration': cv2video.get(cv2.CAP_PROP_FRAME_COUNT) / cv2video.get(cv2.CAP_PROP_FPS),
            'landmarks': list(),
            'roi': list(),
            'corrupted_frames': list(),
        }
        print("End Init info map")
        return info

    def _get_video_info(self, cv2video, face_detector, face_aligner, info, mode, time_step, current_annotation,
                        extra_annotation):
        frame_counter = 0
        rois = list()
        landmarks = list()
        corrupted_frames = list()
        indices = list()
        first = True
        curr_bound = 0
        last_corr = 0
        print("l1: ", len(current_annotation[1]))
        if extra_annotation is not None:
            print("l2: ", len(extra_annotation[1]))
        print("tot:", info['total_frames'])
        # read until the video is completed
        print("Processing video")
        # pbar = tqdm(total=info['total_frames'] + 1)
        prec_face = None
        while frame_counter < info['total_frames']:
            # Capture frame-by-frame
            ret, frame = cv2video.read()
            if ret:
                faces = face_detector.detect(frame)
                if extra_annotation is None:
                    f = findRelevantFace(faces, frame.shape[1], frame.shape[0])
                else:
                    if len(faces) > 1:
                        if current_annotation[0] in ('right', 'left') and extra_annotation[0] in ('right', 'left'):
                            # both annotations (right and left) are present
                            if current_annotation[0] == 'right':
                                f = findFaceOnSide(faces, True, frame.shape[1])
                            else:
                                f = findFaceOnSide(faces, False, frame.shape[1])
                        elif current_annotation[0] == 'normal' or extra_annotation[0] == 'normal':
                            if current_annotation[0] == 'normal':
                                if extra_annotation[0] == 'right':
                                    f = findFaceOnSide(faces, False, frame.shape[1], True)
                                else:
                                    f = findFaceOnSide(faces, True, frame.shape[1], True)
                            else:
                                if current_annotation[0] == 'right':
                                    f = findFaceOnSide(faces, True, frame.shape[1])
                                else:
                                    f = findFaceOnSide(faces, False, frame.shape[1])
                    elif len(faces) == 1:
                        # an all-zero annotation row is treated as "no label for this face in this frame"
                        current_frame_annotation = not (current_annotation[1][frame_counter][0] == 0 and
                                                        current_annotation[1][frame_counter][1] == 0)
                        current_extra_frame_annotation = not (extra_annotation[1][frame_counter][0] == 0 and
                                                              extra_annotation[1][frame_counter][1] == 0)
                        # print(current_extra_frame_annotation)
                        if current_extra_frame_annotation is False or current_frame_annotation is True:
                            f = findRelevantFace(faces, frame.shape[1], frame.shape[0])
                        else:
                            f = None
                    else:
                        f = None
                if (f is not None) and (f['img'].size != 0):
                    tmp_roi = enclosing_square(f['roi'])
                    tmp_img = cut(frame, tmp_roi)
                    tmp_img = cv2.resize(tmp_img, (96, 96))
                    if prec_face is None:
                        prec_face = tmp_img
                    if frame_counter > 0:
                        if current_annotation[0] == 'right':
                            cv2.imshow("fimg", f['img'])
                            cv2.imshow("tmp", tmp_img)
                            cv2.imshow("prec", prec_face)
                            cv2.waitKey(0)
                        # compare the OpenFace embedding of the current crop with the previous one
                        img1_representation = self.model.predict(np.expand_dims(prec_face, axis=0))[0, :]
                        img2_representation = self.model.predict(np.expand_dims(tmp_img, axis=0))[0, :]
                        cosine = findCosineDistance(img1_representation, img2_representation)
                        euclidean = findEuclideanDistance(img1_representation, img2_representation)
                        prec_face = tmp_img
                        print("cosine: ", cosine)
                        print("eucl: ", euclidean)
                    f['roi'] = enclosing_square(f['roi'])
                    # f['roi'] = add_margin(f['roi'], 0.2)
                    detections = dlib.full_object_detections()
                    detections.append(
                        face_aligner.get_shape_detections(frame, dlib.rectangle(f['roi'][0], f['roi'][1],
                                                                                f['roi'][0] + f['roi'][2],
                                                                                f['roi'][1] + f['roi'][3])))
                    rois.append(f['roi'])
                    landmarks.append(detections)
                    # cv2.imshow("left", f['img'])
                    # cv2.waitKey(0)
                else:
                    # no usable face in this frame: mark it as corrupted
                    corrupted_frames.append(frame_counter)
                    rois.append(None)
                    landmarks.append(None)
            else:
                print("ret False")
                corrupted_frames.append(frame_counter)
                rois.append(None)
                landmarks.append(None)
            if frame_counter - len(corrupted_frames) > 0:
                if (frame_counter - len(corrupted_frames)) % time_step == 0:
                    if (frame_counter - curr_bound) >= len(corrupted_frames) - last_corr + time_step:
                        indices.append((curr_bound, frame_counter - 1))
                        last_corr = len(corrupted_frames)
                        curr_bound = frame_counter
                    else:
                        last_corr = len(corrupted_frames)
                        curr_bound = frame_counter
            frame_counter += 1
            # pbar.update(1)
        # pbar.close()
        # When everything is done, release the video capture object
        cv2video.release()
        # Close all the frames
        cv2.destroyAllWindows()
        return (rois, landmarks, corrupted_frames, indices)
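

# Hedged sketch (a simplified re-statement for illustration, not the exact bookkeeping
# used in _get_video_info above): the indices returned by that method are (start, end)
# frame ranges, each meant to hold time_step usable (non-corrupted) frames. The helper
# below reproduces that idea on a plain list of per-frame "usable" flags; its name,
# signature and the example values are assumptions for this sketch only, e.g.
# _demo_clip_indices([True] * 20 + [False] * 4 + [True] * 12) == [(0, 15), (16, 35)]
def _demo_clip_indices(usable_flags, time_step=16):
    indices = []
    curr_bound = 0
    usable_in_window = 0
    for frame_counter, usable in enumerate(usable_flags):
        if usable:
            usable_in_window += 1
        if usable_in_window == time_step:
            indices.append((curr_bound, frame_counter))
            curr_bound = frame_counter + 1
            usable_in_window = 0
    return indices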


if __name__ == "__main__":
    AffWild_Dataset()
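    # Hedged usage examples (argument values are assumptions, not taken from the original):
    # AffWild_Dataset(debug_max_num_samples=8)
    # AffWild_Dataset(partition='Validation', split=True, split_len=16)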