import csv
import glob
import os
import pathlib
import random
import shutil
import subprocess

import cv2
import numpy as np
from mtcnn.mtcnn import MTCNN
from tqdm import tqdm

import Models.Config.SEResNet50_config as config

base_path = os.path.dirname(os.path.abspath(__file__))

### AFEW utils
def _read_dataset(partition, input_path_ds, output_path_cache=base_path + '/CacheFrameProcessing',
                  debug_max_num_samples=None, cache_p=None):
    """Read a partition of the dataset."""
    print("Init reading from video files")
    data = []
    if not os.path.isdir(output_path_cache):
        os.makedirs(output_path_cache)
    # iterate over the partitions
    for set_dir in list_dirs(input_path_ds):
        if partition == os.path.basename(set_dir):
            print("Processing partition: ", partition)
            # for this partition, extract all video frames
            for class_dir in tqdm(list_dirs(set_dir)):
                print("Processing class: ", os.path.basename(class_dir))
                label = os.path.basename(class_dir)
                # extract the frames of every video in this class
                openface_fdir, _ = extract_frames_from_video_folder(class_dir, output_path_cache,
                                                                    debug_max_num_samples, cache_p, partition)
                # preprocess every video frame by detecting and aligning faces
                returned_sequences, map_infos = pre_process_video(openface_fdir, output_path_cache,
                                                                  cache_p, partition)
                # append the processed data
                data += process_data(returned_sequences, map_infos, label)
    # check dataset integrity and collect statistics
    data = check_data(data, output_path_cache, cache_p, partition, input_path_ds)
    # flush the frame cache
    shutil.rmtree(output_path_cache)
    return data

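# Example usage, a minimal sketch with hypothetical paths (AFEW is assumed to be
# laid out as <root>/<Partition>/<Class>/*.avi):
#   train_data = _read_dataset('Train', '/datasets/AFEW', cache_p='/tmp/afew_cache')
#   val_data = _read_dataset('Val', '/datasets/AFEW', cache_p='/tmp/afew_cache')
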
def recover_data(input_path_ds, output_cache_path, cache_p, partition, failed_sequences):
    print("Recovering failed videos")
    recovered = []
    recover_path = cache_p + "/to_recover"
    if not os.path.isdir(recover_path):
        os.makedirs(recover_path)
    # copy every failed video of this partition into the recovery folder
    for set_dir in list_dirs(input_path_ds):
        if partition == os.path.basename(set_dir):
            for class_dir in tqdm(list_dirs(set_dir)):
                file_list = sorted(glob.glob('{}/*.avi'.format(class_dir)))
                for file_path in file_list:
                    avi_name = os.path.splitext(os.path.basename(file_path))[0]
                    for item in failed_sequences:
                        if avi_name in item:
                            shutil.copy(file_path, recover_path)
    openface_fdir, _ = extract_frames_from_video_folder(recover_path, recover_path, None, cache_p, partition)
    # generate the bounding boxes for the failed videos with our own detector
    fd = MTCNN()
    bbox_dir = get_bbox(recover_path, fd)
    # preprocess every video frame, using our bounding boxes to align the faces
    returned_sequences, map_infos = pre_process_video(openface_fdir, output_cache_path, cache_p,
                                                      partition, bbox=bbox_dir)
    for i, video in enumerate(returned_sequences):
        new_seq = []
        new_map_info = []
        for item in failed_sequences:
            if map_infos[i]['video_name'] == item[0]:
                label = item[1]
                new_seq.append(returned_sequences[i])
                new_map_info.append(map_infos[i])
                recovered += process_data(new_seq, new_map_info, label)
    print("End recovering failed data")
    return recovered

def get_bbox(recover_path, fd):
    # stub in the original paste: presumably meant to run the MTCNN detector `fd`
    # over the extracted frames in `recover_path` and write bbox files for OpenFace
    pass

def process_data(sequences, infos, label):
    """Pack aligned frame sequences, their metadata and the class label into examples."""
    data = []
    for i in range(len(sequences)):
        example = {
            'frames': sequences[i],
            'label': label,
            'info': infos[i],
        }
        data.append(example)
    return data

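# Each returned example is a dict of this shape (values hypothetical; the 'info'
# keys are the ones filled in by pre_process_video below):
#   {'frames': [<400x400x3 ndarray>, ...],
#    'label': 'Angry',
#    'info': {'video_name': '012345678', 'total_frames': 147, 'discarded_frames': 3,
#             'face_present_percentage': 0.98, 'detections_info': {...}}}
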
def check_data(data, output_cache_path, cache_p, partition, input_path_ds):
    """Check video integrity, filter out bad sequences, and store a statistics log."""
    total_frames = 0                        # total frames in data
    total_frames_discarded = 0              # frames without a face or with a wrong prediction
    total_faces_recognized_percentage = []  # per-video face detection/alignment success rate
    total_failed_sequences = []             # names of the videos that failed preprocessing
    print("Checking data integrity")
    # open the statistics file
    csv.register_dialect('mydialect', delimiter=';', quotechar='"', lineterminator='\r\n',
                         quoting=csv.QUOTE_MINIMAL)
    with open(os.path.join(cache_p, 'dataset_' + partition + '_statistics.csv'), 'w', newline='') as stats_file:
        print("Stats log file opened")
        writer = csv.writer(stats_file, dialect='mydialect')
        writer.writerow(["Video", "Label", "Total frames", "Discarded frames", "face_presence_percentage"])
        # iterate over all items, keeping only the videos with at least one usable frame
        # (removing items from `data` while iterating over it would skip elements)
        kept = []
        for item in data:
            info = item['info']
            if info['total_frames'] - info['discarded_frames'] > 0:
                writer.writerow([info['video_name'], item['label'], info['total_frames'],
                                 info['discarded_frames'], info['face_present_percentage']])
                # update the global statistics
                total_frames += info['total_frames']
                total_frames_discarded += info['discarded_frames']
                total_faces_recognized_percentage.append(info['face_present_percentage'])
                kept.append(item)
            else:
                total_failed_sequences.append((info['video_name'], item['label']))
        data = kept
        # recover the failed sequences, if any
        if len(total_failed_sequences) > 0:
            writer.writerow([' '])
            writer.writerow(['Recovered videos'])
            writer.writerow(["Video", "Label", "Total frames", "Discarded frames", "face_presence_percentage"])
            recovered = recover_data(input_path_ds, output_cache_path, cache_p, partition,
                                     total_failed_sequences)
            # update the statistics with the recovered videos
            for item in recovered:
                info = item['info']
                writer.writerow([info['video_name'], item['label'], info['total_frames'],
                                 info['discarded_frames'], info['face_present_percentage']])
                total_frames += info['total_frames']
                total_frames_discarded += info['discarded_frames']
                total_faces_recognized_percentage.append(info['face_present_percentage'])
            data += recovered
        # write the dataset-level statistics
        writer.writerow([' '])
        writer.writerow(['Dataset statistics'])
        writer.writerow(["Total frames", "Total discarded frames", "face_presence_percentage_mean",
                         "Failed sequences"])
        writer.writerow([total_frames, total_frames_discarded,
                         np.mean(total_faces_recognized_percentage),
                         '\r\n'.join(name for name, _ in total_failed_sequences)])
    print("End check data integrity")
    return data

def list_dirs(directory):
    """Return all directories in a given directory."""
    return [f for f in pathlib.Path(directory).iterdir() if f.is_dir()]

def _get_video_confidence():
    """Hand-tuned per-video confidence thresholds for problematic clips."""
    # still unresolved:
    # 005543160 casino
    # 020913240 django
    # 011029550 harry potter, friend
    # 001522614 snape
    confidences = {
        '002818854': 0.15,
        '004312720': 0.15,
        '012256040': 0.15,
        '015407880': 0.25,
        '001143440': 0.25,
        '002021400': 0.15,
        '004305440': 0.15,
        '003044960': 0.15,
        '023340360': 0.24,
        '004827807': 0.162,
        '014928240': 0.15,
        '004513880': 0.17,
    }
    return confidences

def extract_frames_from_video_folder(input_avi, output_path_cache, debug_max_num_samples, cache_p, partition):
    """Extract the frames of every video in a folder (one folder per class)."""
    file_list = sorted(glob.glob('{}/*.avi'.format(input_avi)))
    error_video = []    # videos that ffmpeg failed to decode
    openface_fdir = []  # one output folder per successfully extracted video
    print("Init Frames Extraction")
    current_num_samples = 0
    for file_path in file_list:
        avi_name = os.path.splitext(os.path.basename(file_path))[0]
        try:
            # create the per-video output folder
            save_path = '{}/{}'.format(output_path_cache, avi_name)
            if not os.path.isdir(save_path):
                os.makedirs(save_path)
            output = '{}/{}-%3d_frame.png'.format(save_path, avi_name)
            # get the output resolution
            asr = get_output_size(file_path)
            # extract all frames of the video
            extract_frames(file_path, output, asr, cache_p, partition)
            openface_fdir.append(save_path)
            if debug_max_num_samples is not None and current_num_samples == debug_max_num_samples - 1:
                break
        except Exception:
            # count the lost videos
            error_video.append(avi_name)
            print(avi_name + ' ffmpeg failed' + '\n')
        current_num_samples += 1
    print("End Frames Extraction")
    return openface_fdir, error_video

def extract_frames(src, dest, asr, cache_p, partition):
    """Call ffmpeg and save all frames in the destination folder."""
    print("Calling FFMPEG on video: ", os.path.basename(src))
    command = ['ffmpeg', '-loglevel', 'info', '-hide_banner', '-nostats',
               '-i', src, '-s', asr, '-q:a', '1', dest]
    try:
        with open(os.path.join(cache_p, 'FFMPEG_output_' + partition + '.log'), "a") as log_file:
            subprocess.Popen(command, stdout=log_file, stderr=log_file).wait()
    except Exception as e:
        print(e)

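# For reference, with src='<cache>/012345678.avi' and asr='720x480' (values
# hypothetical) the command above expands to:
#   ffmpeg -loglevel info -hide_banner -nostats -i <cache>/012345678.avi \
#          -s 720x480 -q:a 1 <cache>/012345678/012345678-%3d_frame.png
# where '%3d' makes ffmpeg number the output frames 001, 002, ...
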
def pre_process_video(openface_fdir, frames_dir, cache_p, partition, resize_shape=(224, 224), bbox=None):
    """Align faces in the extracted frames with OpenFace and collect per-video detection info."""
    aligned_videos = []
    all_maps = []
    print("Init pre processing")
    # build the OpenFace command (hardcoded path to the FeatureExtraction binary)
    command = ['/Users/dp.alex/OpenFace/build/bin/FeatureExtraction']
    for _dir in openface_fdir:
        command.append("-fdir")
        command.append(_dir)
    if bbox is not None:
        command.append("-bboxdir")
        command.append(bbox)
    # OpenFace output size and scale (these override the resize_shape parameter)
    resize_shape = 400
    scale = 1.46
    command += ['-out_dir', frames_dir, '-simsize', str(resize_shape), '-simscale', str(scale),
                '-format_aligned', 'png', '-nomask', '-multiview', '1', '-simalign', '-wild', '-nobadaligned']
    try:
        print("Calling OpenFace")
        with open(os.path.join(cache_p, 'OpenFace_output_' + partition + '.log'), "a") as log_file:
            subprocess.Popen(command, stdout=log_file, stderr=log_file).wait()
        print("End OpenFace")
    except Exception as e:
        print(e)
    # threshold for filtering out badly detected faces
    threshold_detection = 0.1
    # keep the needed info from the OpenFace CSV output
    for filename in os.listdir(frames_dir):
        if filename.endswith(".csv"):
            aligned_frames = []
            filename = filename[:-4]
            aligned_frames_dir = frames_dir + "/" + filename + "_aligned"
            with open(frames_dir + "/" + filename + ".csv", mode='r') as csv_file:
                # note: OpenFace writes its CSV headers with a leading space
                csv_reader = csv.DictReader(csv_file, delimiter=',')
                map_info = {'video_name': filename}
                map_frame = {}
                read_frames = 0
                discarded_frames = 0
                for row in csv_reader:
                    if int(row[' success']) == 1 and float(row[' confidence']) > threshold_detection:
                        aligned_frame = '{}/frame_det_00_{:06d}.png'.format(aligned_frames_dir, int(row['frame']))
                        aligned_frames.append(cv2.imread(aligned_frame))
                        map_frame[row['frame']] = row[' confidence']
                    else:
                        discarded_frames += 1
                    read_frames = int(row['frame'])
            map_info['total_frames'] = read_frames
            map_info['discarded_frames'] = discarded_frames
            # guard against CSVs with no rows
            map_info['face_present_percentage'] = (
                np.round((read_frames - discarded_frames) / read_frames, 2) if read_frames > 0 else 0.0)
            map_info['detections_info'] = map_frame
            all_maps.append(map_info)
            aligned_videos.append(aligned_frames)
            # once everything is done, flush the per-video working directories
            shutil.rmtree(frames_dir + "/" + filename)
            shutil.rmtree(frames_dir + "/" + filename + "_aligned")
            os.remove(frames_dir + "/" + filename + ".csv")
    print("End pre processing")
    return aligned_videos, all_maps

def get_output_size(path, fixed=True, w=720, h=480):
    """Given the input path of a video, return its output size as a 'WxH' string."""
    width, height = w, h
    if not fixed:
        cap = cv2.VideoCapture(path)
        if cap.isOpened():
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        cap.release()
    return '{}x{}'.format(width, height)

def split_video(item=None, split_len=16, partition='Train'):
    """Split a video into fixed-length clips, padding the tail by repeating the last frame."""
    splitted_video = []
    video = item['frames']
    label = item['label']
    len_video = len(video)
    steps = len_video // split_len
    rest = len_video % split_len
    i = 0
    # if the video is longer than split_len
    if steps > 0:
        # take every full window
        while i < steps:
            start = i * split_len
            stop = start + split_len
            actual = np.array(video[start:stop])
            item = {
                'frames': actual,
                'label': label,
            }
            splitted_video.append(item)
            i += 1
        pads = []
        # pad the tail if enough frames are left (training partitions only)
        if 'val' not in partition.lower():
            print('Padding on train gen video')
            if rest >= (split_len / 2):
                for i in range(split_len - rest):
                    pads.append(video[-1])
                start = stop
                last = np.concatenate((video[start:], pads), axis=0)
                item = {
                    'frames': np.array(last),
                    'label': label,
                }
                splitted_video.append(item)
    # pad the whole video if it is shorter than split_len
    elif steps == 0:
        rest = split_len - len_video
        pads = []
        for i in range(rest):
            pads.append(video[-1])
        last = np.concatenate((video, pads), axis=0)
        item = {
            'frames': np.array(last),
            'label': label,
        }
        splitted_video.append(item)
    return splitted_video

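# Sanity check of the splitting arithmetic (hypothetical clip): with len_video=40
# and split_len=16, steps=2 and rest=8, so we get two full windows (frames 0-15
# and 16-31) plus, outside the validation partition, one padded window holding
# frames 32-39 followed by frame 39 repeated 8 more times (since 8 >= 16/2).
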
def top_left(f):
    return (f['roi'][0], f['roi'][1])


def bottom_right(f):
    return (f['roi'][0] + f['roi'][2], f['roi'][1] + f['roi'][3])

def enclosing_square(rect):
    """Return the square, centered on the input rect, whose side is the rect's long side."""
    def _to_wh(s, l, ss, ll, width_is_long):
        if width_is_long:
            return l, s, ll, ss
        else:
            return s, l, ss, ll

    def _to_long_short(rect):
        x, y, w, h = rect
        if w > h:
            l, s, ll, ss = x, y, w, h
            width_is_long = True
        else:
            s, l, ss, ll = x, y, w, h
            width_is_long = False
        return s, l, ss, ll, width_is_long

    s, l, ss, ll, width_is_long = _to_long_short(rect)
    hdiff = (ll - ss) // 2
    s -= hdiff
    ss = ll
    return _to_wh(s, l, ss, ll, width_is_long)

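# Worked example: enclosing_square((10, 20, 50, 30)) -> (10, 10, 50, 50);
# the 50x30 box is widened symmetrically along its short side into a square
# with the same center and the same long side.
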
def add_margin(roi, qty):
    return (
        roi[0] - qty,
        roi[1] - qty,
        roi[2] + 2 * qty,
        roi[3] + 2 * qty)

def cut(frame, roi):
    """Crop a ROI from the frame, zero-padding where the ROI exceeds the image."""
    pA = (int(roi[0]), int(roi[1]))
    pB = (int(roi[0] + roi[2] - 1), int(roi[1] + roi[3] - 1))  # pB is an internal point
    W, H = frame.shape[1], frame.shape[0]
    A0 = pA[0] if pA[0] >= 0 else 0
    A1 = pA[1] if pA[1] >= 0 else 0
    data = frame[A1:pB[1], A0:pB[0]]
    if pB[0] < W and pB[1] < H and pA[0] >= 0 and pA[1] >= 0:
        return data
    w, h = int(roi[2]), int(roi[3])
    img = np.zeros((h, w, frame.shape[2]), dtype=np.uint8)
    offX = int(-roi[0]) if roi[0] < 0 else 0
    offY = int(-roi[1]) if roi[1] < 0 else 0
    np.copyto(img[offY:offY + data.shape[0], offX:offX + data.shape[1]], data)
    return img

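# Example (hypothetical values): for a 224x224 frame, cut(frame, (-10, -10, 50, 50))
# returns a 50x50 patch in which the band falling outside the image is left
# zero-filled instead of raising an indexing error.
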
def cut_centered(frame, shape=(224, 224), random_crop=True, random_values=None,
                 max_change_fraction=0.045, only_narrow=False):
    """Center-crop a frame to `shape` (height, width), optionally jittering the crop at random."""
    top = int((frame.shape[0] - shape[0]) / 2)
    bottom = int((frame.shape[0] + shape[0]) / 2)
    left = int((frame.shape[1] - shape[1]) / 2)
    right = int((frame.shape[1] + shape[1]) / 2)
    if random_crop:
        if random_values is None:
            sigma = shape[0] * max_change_fraction
            xy = _random_normal_crop(2, sigma, mean=-sigma / 5).astype(int)
            wh = _random_normal_crop(2, sigma * 2, mean=sigma / 2, positive=only_narrow).astype(int)
        else:
            xy, wh = random_values
    else:
        xy = [0, 0]
        wh = [0, 0]
    return frame[(top + wh[0]):(bottom + wh[0]), (left + xy[0]):(right + xy[0]), :]

def pad(img):
    """Zero-pad an image to a square, anchored at the top-left corner."""
    h, w, c = img.shape
    if h == w:
        return img
    size = max(h, w)
    out = np.zeros((size, size, c), dtype=img.dtype)
    np.copyto(out[0:h, 0:w], img)
    return out

def findRelevantFace(objs, W, H):
    """Return the detected face whose bounding-box center is closest to the image center."""
    mindistcenter = None
    minobj = None
    for o in objs:
        cx = o['roi'][0] + (o['roi'][2] / 2)
        cy = o['roi'][1] + (o['roi'][3] / 2)
        distcenter = (cx - (W / 2)) ** 2 + (cy - (H / 2)) ** 2
        if mindistcenter is None or distcenter < mindistcenter:
            mindistcenter = distcenter
            minobj = o
    return minobj

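# Example (hypothetical detections) for a 200x200 frame:
#   faces = [{'roi': (0, 0, 40, 40)}, {'roi': (80, 80, 40, 40)}]
#   findRelevantFace(faces, 200, 200)  # -> the second dict, centered at (100, 100)
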
# precompute the design matrix for the illumination plane fit
tmp_A = []
FIT_PLANE_SIZ = 16
for y in np.linspace(0, 1, FIT_PLANE_SIZ):
    for x in np.linspace(0, 1, FIT_PLANE_SIZ):
        tmp_A.append([y, x, 1])
Amatrix = np.matrix(tmp_A)

def _fit_plane(im):
    original_shape = im.shape
    if len(im.shape) > 2 and im.shape[2] > 1:
        im = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
    im = cv2.resize(im, (FIT_PLANE_SIZ, FIT_PLANE_SIZ))
    if im.dtype == np.uint8:
        im = im.astype(float)
    # do the least-squares fit
    A = Amatrix
    tmp_b = []
    for y in range(FIT_PLANE_SIZ):
        for x in range(FIT_PLANE_SIZ):
            tmp_b.append(im[y, x])
    b = np.matrix(tmp_b).T
    fit = (A.T * A).I * A.T * b
    # rescale the plane coefficients from the unit square to pixel coordinates
    fit[0] /= original_shape[0]
    fit[1] /= original_shape[1]

    def LR(x, y):
        return np.repeat(fit[0] * x, len(y), axis=0).T + np.repeat(fit[1] * y, len(x), axis=0) + fit[2]

    xaxis = np.array(range(original_shape[1]))
    yaxis = np.array(range(original_shape[0]))
    imest = LR(yaxis, xaxis)
    return np.array(imest)

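# The fit above is ordinary least squares via the normal equations,
#   fit = (A^T A)^{-1} A^T b,
# modelling the downscaled brightness as the plane z(y, x) = fit[0]*y + fit[1]*x + fit[2];
# LR() then re-evaluates that plane over the full-resolution pixel grid.
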
def linear_balance_illumination(im):
    if im.dtype == np.uint8:
        im = im.astype(float)
    if len(im.shape) == 2:
        im = np.expand_dims(im, 2)
    if im.shape[2] > 1:
        im = cv2.cvtColor(im, cv2.COLOR_BGR2YUV)
    imout = im.copy()
    # fit a brightness plane to the luma channel and subtract it, preserving the mean
    imest = _fit_plane(im[:, :, 0])
    imout[:, :, 0] = im[:, :, 0] - imest + np.mean(imest)
    if im.shape[2] > 1:
        imout = cv2.cvtColor(imout, cv2.COLOR_YUV2BGR)
    return imout.reshape(im.shape)

def mean_std_normalize(inp):
    """Normalize to zero mean and unit variance, clamping tiny stds to avoid division by zero."""
    std = inp.flatten().std()
    if std < 0.001:
        std = 0.001
    return (inp - inp.flatten().mean()) / std

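# Example: mean_std_normalize(np.array([1.0, 2.0, 3.0]))
#   -> approximately [-1.2247, 0.0, 1.2247]  (mean 2.0, population std ~0.8165)
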
def _random_normal_crop(n, maxval, positive=False, mean=0):
    gauss = np.random.normal(mean, maxval / 2, (n, 1)).reshape((n,))
    gauss = np.clip(gauss, mean - maxval, mean + maxval)
    if positive:
        return np.abs(gauss)
    else:
        return gauss

def random_change_image(img, random_values=None):
    """Randomly change brightness and contrast, and possibly flip the image horizontally."""
    # draw the random values per call: a default argument would be evaluated
    # only once, at import time, yielding the same "random" change on every call
    if random_values is None:
        random_values = (_random_normal_crop(1, 0.5, mean=1)[0],
                         _random_normal_crop(1, 48)[0],
                         random.randint(0, 1))
    # brightness and contrast
    a, b, flip = random_values
    img = (img - 128.0) * a + 128.0 + b
    img = np.clip(img, 0, 255)
    img = img.astype(np.uint8)
    # flip
    if flip:
        img = np.fliplr(img)
    return img

def random_change_roi(roi, max_change_fraction=0.045, only_narrow=False, random_values=None):
    # random crop, with higher probability mass near 0 (Gaussian)
    sigma = roi[3] * max_change_fraction
    if random_values is None:
        xy = _random_normal_crop(2, sigma, mean=-sigma / 5).astype(int)
        wh = _random_normal_crop(2, sigma * 2, mean=sigma / 2, positive=only_narrow).astype(int)
    else:
        xy, wh = random_values
    print("orig roi: %s" % str(roi))
    print("rand changes -> xy:%s, wh:%s" % (str(xy), str(wh)))
    roi2 = (roi[0] + xy[0], roi[1] + xy[1], roi[2] - wh[0], roi[3] - wh[1])
    print("new roi: %s" % str(roi2))
    return roi2

def roi_center(roi):
    return (roi[0] + roi[2] // 2, roi[1] + roi[3] // 2)

def random_image_rotate(img, rotation_center, random_angle_deg=None):
    """Rotate the image around `rotation_center` by a small random angle."""
    if random_angle_deg is None:
        # drawn per call; a default argument would be drawn only once, at import time
        random_angle_deg = _random_normal_crop(1, 10)[0]
    M = cv2.getRotationMatrix2D(rotation_center, random_angle_deg, 1.0)
    # cv2 dsize is (width, height)
    nimg = cv2.warpAffine(img, M, dsize=(img.shape[1], img.shape[0]))
    return nimg.reshape(img.shape)


def random_image_skew(img, rotation_center, random_skew=None):
    """Apply a small random shear to the image."""
    if random_skew is None:
        random_skew = _random_normal_crop(2, 0.1, positive=True)
    s = random_skew
    M = np.array([[1, s[0], 1], [s[1], 1, 1]], dtype=np.float32)
    nimg = cv2.warpAffine(img, M, dsize=(img.shape[1], img.shape[0]))
    return nimg.reshape(img.shape)

def equalize_hist(img):
    if len(img.shape) > 2 and img.shape[2] > 1:
        # equalize only the luma channel so that hue is preserved
        img_yuv = cv2.cvtColor(img, cv2.COLOR_BGR2YUV)
        img_yuv[:, :, 0] = cv2.equalizeHist(img_yuv[:, :, 0])
        return cv2.cvtColor(img_yuv, cv2.COLOR_YUV2BGR)
    else:
        return cv2.equalizeHist(img)

def draw_emotion(y, w, h):
    """Render the per-class emotion scores as a text overlay image."""
    EMOTIONS = config.CLASSES
    COLORS = [(120, 120, 120), (50, 50, 255), (0, 255, 255), (255, 0, 0), (0, 0, 140),
              (0, 200, 0), (42, 42, 165), (100, 100, 200), (170, 170, 170), (80, 80, 80)]
    emotionim = np.zeros((h, w, 3), dtype=np.uint8)
    barh = h // len(EMOTIONS)
    MAXEMO = np.sum(y)
    for i, yi in enumerate(y):
        p1, p2 = (0, i * barh), (int(yi * w // MAXEMO), (i + 1) * barh)
        # cv2.rectangle(emotionim, p1, p2, COLORS[i], cv2.FILLED)
        cv2.putText(emotionim, "%s: %.1f" % (EMOTIONS[i], yi), (0, i * 20 + 14),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255))
    return emotionim

def show_frame(frame, text):
    font = cv2.FONT_HERSHEY_SIMPLEX
    position = (10, 20)
    fontScale = 0.3
    fontColor = (255, 255, 255)
    lineType = 1
    cv2.putText(frame, text, position, font, fontScale, fontColor, lineType)
    cv2.imshow('frame', frame)
    cv2.waitKey(0)