Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import argparse
import json
import math
import os
import random
import subprocess
from typing import List, Set, Union

import imutils
import imutils.feature.factories as kp_factory
import numpy as np
from cv2 import cv2
from matplotlib import pyplot as plt
from PIL import Image
from vidstab import VidStab, vidstab_utils
# Keypoint-detection scheme name recorded in the .trf header below.
kp_method = 'DENSE'
# stabilizer = VidStab(kp_method=kp_method)
# TODO: 1. Kalman filter
# 2. Decomposition followed by rolling-shutter removal
# 3. Phantom-frame detection
# 4. Inpainting. Idea for inpainting: use PTTools
# 5. Background removal
# 6. Need to decide what to do with phantom frames
# Expected input frame size in pixels — assumes 1920x1080 sources (TODO confirm);
# referenced by the commented-out center filter in write_keypoints_to_ffmpeg_vidstab.
width = 1920
height = 1080
# stabilizer.gen_transforms('sample2.mp4')
# kp_detector = kp_factory.FeatureDetector_create('DENSE', step=22, radius=0.5)
# kp_detector = kp_factory.FeatureDetector_create('DENSE', step=15, radius=0.5)
# kp_detector = kp_factory.FeatureDetector_create('BRISK', thresh=10)
# kp_detector = kp_factory.FeatureDetector_create('FAST')
def probabilistic_round(x):
    """Round *x* to an int stochastically.

    The fractional part of *x* is the probability of rounding up rather
    than down, so repeated rounding is unbiased on average.
    """
    jitter = random.random()  # uniform in [0, 1)
    return int(math.floor(x + jitter))
def split_into_blocks(array, nrows, ncols):
    """Split a 2D image into non-overlapping (nrows, ncols) tiles.

    Returns an array of shape (h*w // (nrows*ncols), nrows, ncols); the image
    dimensions must divide evenly by the tile size.
    See https://stackoverflow.com/a/51914911, https://stackoverflow.com/a/16858283
    """
    h, w = array.shape
    assert h % nrows == 0
    assert w % ncols == 0
    # Fold rows into tile-rows, swap so each tile becomes contiguous, flatten.
    tiled = array.reshape(h // nrows, nrows, -1, ncols).swapaxes(1, 2)
    return tiled.reshape(-1, nrows, ncols)
def is_duplicate_frame_ffmpeg(prev_frame_gray, cur_frame_gray, thresh_lo = 64 * 5, thresh_hi = 64 * 12, frac = 0.33):
    """Duplicate-frame detector ported from ffmpeg's mpdecimate filter.

    Differences are summed over 8x8 blocks; the frame is a duplicate when no
    block differs by more than thresh_hi AND fewer than *frac* of the blocks
    differ by more than thresh_lo.
    """
    diff = cv2.absdiff(prev_frame_gray, cur_frame_gray)
    block_sums = np.sum(split_into_blocks(diff, 8, 8), axis=(1, 2))
    lo_count = np.count_nonzero(block_sums > thresh_lo)
    hi_count = np.count_nonzero(block_sums > thresh_hi)
    lo_fraction = lo_count / len(block_sums)
    return hi_count == 0 and lo_fraction < frac
def find_all_duplicates(input_path: str):
    """Yield (frame_index, frame) for every frame that duplicates its predecessor.

    Frame indices are 1-based; the first frame can never be a duplicate.
    Fixes vs. the original: the frame counter was never incremented (every hit
    reported index 1), and the first iteration passed None into
    is_duplicate_frame_ffmpeg (cv2.absdiff would fail). The capture is now
    released even if the consumer abandons the generator.
    """
    vid = cv2.VideoCapture(input_path)
    try:
        prev_frame_gray = None
        i = 1
        while True:
            retval, frame = vid.read()
            if not retval:
                break
            frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # Skip the comparison on the very first frame: nothing to compare against.
            if prev_frame_gray is not None and is_duplicate_frame_ffmpeg(prev_frame_gray, frame_gray):
                yield i, frame
            prev_frame_gray = frame_gray
            i += 1
    finally:
        vid.release()
- # def gen_mid_frames(input_path: str):
- # for i, frame in find_all_duplicates(input_path):
def write_keypoints_to_ffmpeg_vidstab(prev_kps, cur_kps, scale=1.0):
    """Serialize matched keypoint pairs as one vid.stab local-motion list.

    Each pair becomes '(LM dx dy x y ...)' where (dx, dy) is the motion vector
    (optionally scaled) and (x, y) is the current-frame position; the trailing
    '112 0.5 0.5' fields are fixed — presumably field size and match/contrast
    scores expected by vidstabtransform (TODO confirm against the vid.stab format).
    """
    entries = []
    for kp_prev, kp_cur in zip(prev_kps, cur_kps):
        x0, y0 = kp_prev[0], kp_prev[1]
        x1, y1 = kp_cur[0], kp_cur[1]
        entries.append('(LM {0} {1} {2} {3} 112 0.5 0.5)'.format(
            probabilistic_round((x1 - x0) * scale),
            probabilistic_round((y1 - y0) * scale),
            probabilistic_round(x1),
            probabilistic_round(y1)))
    return '(List {0} [{1}])'.format(len(entries), ','.join(entries))
# prev_kps_raw = None
# Leftover module-level pipeline state; gen_motion_vectors keeps its own copies.
prev_kps_high_contrast = None
# When True, motion data is written in ffmpeg vid.stab .trf format.
write_ffmpeg_vidstab = True
trf_file = None
- # class VideoWrapper:
- # def __init__(self, video):
- # self._video = video
- # self._framebuf = []
- # self._curFrameIdx = 0
- # self._frames = 0
- # def getFrame(self, idx):
- # cvvideo.next()
- # def nextFrame(self):
- # self._curFrameIdx += 1
class FrameInfo:
    """Per-frame stabilization metadata collected by gen_motion_vectors."""

    def __init__(self, transform, is_bad: bool, is_dup: bool):
        """
        @is_bad Bad frame; it must not appear in the output stream
        """
        self.transform = transform      # 2x3 affine camera-motion estimate
        self.is_bad_frame = is_bad
        self.is_duplicate = is_dup
def detect_kps(frame_gray, dense_step=30):
    """Detect keypoints on a grayscale frame with a DENSE grid detector.

    Returns (raw_keypoints, coords): the detector's KeyPoint objects and their
    positions as a float32 array of shape (N, 1, 2), the layout expected by
    cv2.calcOpticalFlowPyrLK.
    """
    detector = kp_factory.FeatureDetector_create('DENSE', step=dense_step, radius=0.5)
    raw = detector.detect(frame_gray)
    coords = np.array([kp.pt for kp in raw], dtype='float32').reshape(-1, 1, 2)
    return raw, coords
# TODO: this code is a mess; it all needs refactoring.
class TransInfo:
    """Estimates the 2x3 affine camera transform between two consecutive
    grayscale frames from sparse optical flow.

    When too few inliers are found (< 350), the estimation is retried on
    histogram-equalized frames and with denser keypoint grids; the best retry
    (by inlier count, with a 1.2x preference factor) replaces this estimate.
    `noinl=True` marks a retry instance and suppresses further equalize retries.
    """

    def count_inliers(self):
        """Return the number of inlier keypoint matches."""
        # TODO: why doesn't count_nonzero() work here?
        i = 0
        for kpp0, kpp1, is_inlier in zip(self.prev_inlier_keypoints, self.cur_inlier_keypoints, self.inliers):
            if is_inlier:
                i += 1
        return i

    def __init__(self, frame_gray, prev_frame_gray, skaters, dense_step=30, noinl=False, win_size=30):
        # Keypoints are detected on the PREVIOUS frame and tracked forward.
        self.kps_raw, self.kps = detect_kps(prev_frame_gray, dense_step=dense_step)
        # Drop keypoints inside skater bounding boxes — skaters move independently
        # of the camera and would corrupt the estimate.
        self.kps = filter_keypoints(self.kps, skaters)
        if len(self.kps) > 0:
            self.optical_flow = cv2.calcOpticalFlowPyrLK(prev_frame_gray, frame_gray, self.kps, None, winSize=(win_size, win_size), maxLevel=5)
            self.prev_matched_keypoints, self.cur_matched_keypoints = vidstab_utils.match_keypoints(self.optical_flow, self.kps)
        if len(self.kps) == 0 or len(self.prev_matched_keypoints) < 5:
            # Too few matches: fall back to the identity transform.
            self.transform = [[1, 0, 0], [0, 1, 0]]
            self.inliers = np.array([])
            self.cur_inlier_keypoints = np.array([])
            self.prev_inlier_keypoints = np.array([])
            self.bbox = [0, 0, 0, 0]
        else:
            self.transform, self.inliers = cv2.estimateAffine2D(
                np.array(self.prev_matched_keypoints), np.array(self.cur_matched_keypoints))
            self.cur_inlier_keypoints = np.array(self.cur_matched_keypoints)[np.array(self.inliers) == 1]
            self.prev_inlier_keypoints = np.array(self.prev_matched_keypoints)[np.array(self.inliers) == 1]
            # Bounding box of the inlier cloud, for debug overlay.
            self.bbox = cv2.boundingRect(np.float32(self.cur_inlier_keypoints))
        another_trans = None
        inlier_cnt = self.count_inliers()
        # self.scheme = f'[DENSE {1}]'
        # print(inlier_cnt, noinl)
        # Retry 1: histogram equalization (helps on low-contrast frames).
        if inlier_cnt < 350 and not noinl:
            eq_hist_trans = TransInfo(cv2.equalizeHist(frame_gray), cv2.equalizeHist(prev_frame_gray), skaters=skaters,
                                      dense_step=dense_step, noinl=True, win_size=40)
            if eq_hist_trans.count_inliers() * 1.2 > inlier_cnt:
                print (f"yes, eq hist is better {eq_hist_trans.count_inliers()} > {inlier_cnt}")
                another_trans = eq_hist_trans
                inlier_cnt = another_trans.count_inliers()
        # Retry 2 and 3: denser keypoint grids (step 15, then 10).
        # NOTE(review): original indentation was lost in the paste; these retries
        # are reconstructed at __init__ level — confirm against a working copy.
        if inlier_cnt < 350 and dense_step > 15:
            dense_trans = TransInfo(frame_gray, prev_frame_gray, skaters=skaters, dense_step=15, noinl=True, win_size=40)
            if dense_trans.count_inliers() * 1.2 > inlier_cnt:
                print (f"yes, high density is better {dense_trans.count_inliers()} > {inlier_cnt}")
                another_trans = dense_trans
                inlier_cnt = another_trans.count_inliers()
        if inlier_cnt < 350 and dense_step > 10:
            dense_trans = TransInfo(frame_gray, prev_frame_gray, skaters=skaters, dense_step=10, noinl=True, win_size=40)
            if dense_trans.count_inliers() * 1.2 > inlier_cnt:
                print (f"yes, very high density is better {dense_trans.count_inliers()} > {inlier_cnt}")
                another_trans = dense_trans
        if another_trans:
            # Adopt the better retry's estimate wholesale.
            # NOTE(review): cur_matched_keypoints is NOT copied here, unlike its
            # prev_ counterpart — looks like an oversight, confirm before relying on it.
            self.inliers = another_trans.inliers
            self.kps = another_trans.kps
            self.kps_raw = another_trans.kps_raw
            self.optical_flow = another_trans.optical_flow
            self.cur_inlier_keypoints = another_trans.cur_inlier_keypoints
            self.prev_inlier_keypoints = another_trans.prev_inlier_keypoints
            self.prev_matched_keypoints = another_trans.prev_matched_keypoints
            self.transform = another_trans.transform
            self.bbox = another_trans.bbox
class TrfWriter:
    """Placeholder: a dedicated .trf writer was apparently planned but never
    implemented; gen_motion_vectors writes the file directly instead."""
    def __init__(self):
        pass
def detect_skaters(image):
    """Detect people (skaters) with OpenCV's default HOG pedestrian detector.

    The frame is downscaled to 960 px wide for speed; boxes are scaled back
    by 2x at the end — assumes the input is 1920 px wide (TODO confirm).
    Optionally a fixed central rectangle is always treated as a detection,
    since the skater is expected near the frame center.
    Returns (rects, weights) with rects as (x, y, w, h).
    """
    PADDING = 80
    CENTRAL_RECT = (300, 300)
    EXCLUDE_CENTRAL_RECT = True

    image = imutils.resize(image, width=960)
    hog = cv2.HOGDescriptor()
    hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
    (rects, weights) = hog.detectMultiScale(image, winStride=(4, 4), padding=(PADDING, PADDING), scale=1.15)
    if EXCLUDE_CENTRAL_RECT:
        central_rect = np.array([(960 / 2 - CENTRAL_RECT[0] / 2, 540 / 2 - CENTRAL_RECT[1] / 2, CENTRAL_RECT[0], CENTRAL_RECT[1])], dtype=int)
        rects = central_rect if len(rects) == 0 else np.concatenate((rects, central_rect))
    # Map boxes back to the original resolution.
    return rects * 2, weights
def filter_keypoints(kps, skaters):
    """Drop keypoints that fall strictly inside any skater bounding box.

    kps: array of shape (N, 1, 2); skaters: iterable of (x, y, w, h) boxes.
    Returns the surviving keypoints as an ndarray.
    """
    kept = []
    for kp in list(kps):
        x, y = kp[0, 0], kp[0, 1]
        inside_any = any(
            sk[0] < x < sk[0] + sk[2] and sk[1] < y < sk[1] + sk[3]
            for sk in skaters)
        if not inside_any:
            kept.append(kp)
    return np.array(kept)
def gen_motion_vectors(input_path: str, write_ffmpeg_vidstab: bool, trf_path: str, duplicate_set: Set[int]):
    """Walk the video, estimating per-frame camera motion with TransInfo.

    Optionally writes a vid.stab .trf file for ffmpeg's vidstabtransform.
    Frames listed in duplicate_set (1-based), or detected as duplicates by
    is_duplicate_frame_ffmpeg, get an identity transform and an empty motion list.
    Returns (camera_trajectory, stats) where camera_trajectory is a list of
    FrameInfo and stats currently holds only 'duplicate_count'.

    NOTE(review): indentation was reconstructed from a flattened paste —
    verify nesting against a working copy before modifying logic.
    """
    vid = cv2.VideoCapture(input_path)
    prev_frame_gray = None
    frame_idx = 0
    prev_kps = None
    camera_trajectory = []
    num_dropped = 0          # extra frames inserted for dropped/phantom frames
    input_width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
    input_height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
    input_fps = vid.get(cv2.CAP_PROP_FPS)
    framecount = vid.get(cv2.CAP_PROP_FRAME_COUNT)
    if write_ffmpeg_vidstab:
        # trf_path = input_path + '.trf'
        # NOTE(review): trf_file is never explicitly closed/flushed — relies on
        # interpreter exit; confirm this is acceptable.
        trf_file = open(trf_path, 'w')
        trf_file.writelines(['VID.STAB 1\n# generated by python vidstab, keypoints = {0}\nFrame 1 (List 0 [])\n'.format(kp_method)])
    # 59.94
    if False:  # flip to True to write an annotated debug video
        debug_vid = FfmpegVideoWriter(input_path + '.debug.mp4', fps=input_fps, quality=20, filters='',
                                      w=input_width, h=input_height, duration_s=framecount / input_fps, audio_file=None)
    else:
        debug_vid = None
    old_skaters = []
    prev_was_duplicate = False
    duplicate_count = 0
    while True:
        # duplicate_set uses 1-based frame numbers.
        is_duplicate = duplicate_set and frame_idx != 0 and frame_idx + 1 in duplicate_set
        retval, frame = vid.read()
        if not retval:
            break
        if not is_duplicate:
            frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # Commented-out contrast-normalization experiments:
            # frame_gray = cv2.convertScaleAbs(frame_gray, alpha=5, beta=-1000)
            # frame_gray = cv2.adaptiveThreshold(frame_gray,255,cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY,11,2)
            # frame_gray = cv2.equalizeHist(frame_gray)
            # frame_gray = cv2.normalize(frame_gray, None, 0, 255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U)
            # frame_gray = np.min(np.max((frame_gray.astype(np.float32) - 127) * 6, 255.0), 0.0).astype(np.uint8)
            # frame_gray = (frame_gray - 150) * 10
        if frame_idx == 0:
            # First frame: no previous frame to compare against.
            # prev_kps_raw, prev_kps = detect_kps(frame_gray)
            pass
        else:
            # Duplicates need special handling.
            is_duplicate = is_duplicate or is_duplicate_frame_ffmpeg(prev_frame_gray, frame_gray)
            if is_duplicate:
                duplicate_count += 1
            # is_dropped = True if frame_idx == 80 else False
            # is_duplicate = False
            is_dropped = False
            if is_duplicate:
                print ('DUPLICATE')
                if write_ffmpeg_vidstab:
                    # Duplicates contribute zero motion.
                    trf_file.writelines(['Frame {0} (List 0 [])\n'.format(num_dropped + frame_idx + 1)])
                camera_trajectory.append(FrameInfo([[1, 0, 0], [0, 1, 0]], is_bad=is_dropped, is_dup=is_duplicate))
            if not is_duplicate:
                # If no skaters are found, reuse those from the previous frame.
                skaters, _ = detect_skaters(frame_gray)
                if len(skaters) == 0 and len(old_skaters) != 0:
                    skaters = old_skaters
                else:
                    old_skaters = skaters
                ti_lowcontrast = TransInfo(frame_gray, prev_frame_gray, skaters)
                ti = ti_lowcontrast
                if write_ffmpeg_vidstab:
                    # TODO: rework this inlier re-filtering.
                    prev_inlier_keypoints1 = []
                    cur_inlier_keypoints1 = []
                    for kpp0, kpp1, is_inlier in zip(ti.prev_inlier_keypoints, ti.cur_inlier_keypoints, ti.inliers):
                        if is_inlier:
                            prev_inlier_keypoints1.append(kpp0)
                            cur_inlier_keypoints1.append(kpp1)
                    # Half-strength motion right after a dropped/duplicate frame.
                    scale = 1.0 if (not is_dropped and not prev_was_duplicate) else 0.5
                    points_str = write_keypoints_to_ffmpeg_vidstab(prev_inlier_keypoints1, cur_inlier_keypoints1, scale)
                    # If a phantom dropped frame was detected, insert two frames
                    # each carrying half of the motion.
                    trf_file.writelines(['Frame {0} {1}\n'.format(num_dropped + frame_idx + 1, points_str)])
                    if is_dropped:
                        trf_file.writelines(['Frame {0} {1}\n'.format(num_dropped + frame_idx + 2, points_str)])
                if is_dropped:
                    camera_trajectory.append(FrameInfo([[1, 0, 0], [0, 1, 0]], is_bad=is_dropped, is_dup=is_duplicate))
                camera_trajectory.append(FrameInfo(ti.transform, is_bad=False, is_dup=False))
                print (f'frame={frame_idx} inliers={ti.count_inliers()}')
                # if ti.count_inliers() < 400: # 100
                # matched_raw_kps = [kpp for kpp, is_matched in zip(kps_raw, optical_flow[1]) if is_matched]
                # kpps_inliers = [kpp for kpp, is_inlier in zip(matched_raw_kps, inliers) if is_inlier]
                if debug_vid:
                    # Debug overlay: keypoints, skater boxes, inlier bbox, flow lines.
                    outImg = np.zeros(frame_gray.shape, np.uint8,order='C').ravel()
                    outImg = cv2.drawKeypoints(frame_gray, ti.kps_raw, outImg)
                    for sk in skaters:
                        cv2.rectangle(outImg, (sk[0], sk[1]), (sk[2] + sk[0], sk[3] + sk[1]), (255, 255, 255), thickness=5)
                    cv2.rectangle(outImg, (ti.bbox[0], ti.bbox[1]), (ti.bbox[0] + ti.bbox[2], ti.bbox[1] + ti.bbox[3]), (0, 255, 0), 3)
                    for kpp0, kpp1, is_inlier in zip(ti.prev_inlier_keypoints, ti.cur_inlier_keypoints, ti.inliers):
                        color = (0, 255, 0) if is_inlier else (0, 0, 255)
                        cv2.line(outImg, (kpp0[0], kpp0[1]), (kpp1[0], kpp1[1]), (0,0,255), 1)
                    cv2.putText(outImg, f'FRAME {frame_idx}', (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
                    debug_vid.write_frame(outImg)
                    # cv2.imwrite('out1/frame{0}.png'.format(frame_idx), outImg)
                    # cv2.imwrite('out1/frame_clean{0}.png'.format(frame_idx), frame_gray)
                # prev_kps = ti.kps
                # prev_kps_raw = kps_raw
                if is_dropped:
                    num_dropped += 1
        # print(frame_idx)
        prev_was_duplicate = is_duplicate
        # print()
        frame_idx += 1
        prev_frame_gray = frame_gray
    vid.release()
    if debug_vid:
        debug_vid.release()
    stats = {
        'duplicate_count': duplicate_count
    }
    return camera_trajectory, stats
def trajectory_smoother(frames):
    """Identity smoother placeholder: returns the trajectory unchanged."""
    return frames
# Path/name of the ffmpeg binary used by FfmpegVideoWriter.
ffmpeg_executable = 'ffmpeg'
def fps_to_ffmpeg_fps(fps: float):
    """Convert a float frame rate to an exact ffmpeg rate string.

    NTSC-family rates (x * 1000/1001) are mapped to their exact rational form
    — the original handled only 59.94; this generalizes to 23.976, 29.97 and
    119.88 as well. Integral rates become a bare integer; anything else is
    passed through as str(fps).
    """
    centi = round(fps * 100)
    ntsc = {
        2398: '24000/1001',
        2997: '30000/1001',
        5994: '60000/1001',
        11988: '120000/1001',
    }
    if centi in ntsc:
        return ntsc[centi]
    if centi % 100 == 0:
        return str(int(fps))
    return str(fps)
def dnxhd_bitrate(w: int, h: int, fps: float):
    """Return (pix_fmt, bitrate, profile) for DNxHD/DNxHR encoding.

    Supported: 1080p at ~30/50/60 fps (classic DNxHD, no profile) and
    2160p at ~30/50/60 fps (DNxHR HQX, 10-bit). fps is rounded to the
    nearest integer, so 59.94 maps to the 60 fps entry.
    Raises Exception for any other (w, h, fps) combination.
    """
    table = {
        (1920, 1080, 60): ('yuv422p', '440M', None),
        (1920, 1080, 50): ('yuv422p', '365M', None),
        (1920, 1080, 30): ('yuv422p', '220M', None),
        (3840, 2160, 60): ('yuv422p10le', '1760M', 'dnxhr_hqx'),
        (3840, 2160, 50): ('yuv422p10le', '1460M', 'dnxhr_hqx'),
        (3840, 2160, 30): ('yuv422p10le', '880M', 'dnxhr_hqx'),
    }
    try:
        return table[(w, h, int(round(fps)))]
    except KeyError:
        raise Exception(f'fps {fps} incompatible with dnxhd') from None
class FfmpegVideoWriter:
    """Streams BMP-encoded frames into an ffmpeg subprocess via stdin.

    quality is either an x264 CRF value (0..50) or the string 'dnxhd' for
    DNxHD/DNxHR output in a MOV container. An optional audio_file is muxed in.
    """

    def __init__(self, path: str, quality: Union[int, str], w: int, h: int, duration_s: float, fps: float, filters: str = None, preset: str = 'slow', audio_file: str = None):
        assert isinstance(quality, str) or quality >= 0 and quality < 51
        fps_str = format(fps, '.2f')
        # Input side: raw BMP images piped on stdin, duration capped with -t.
        args = [ffmpeg_executable, '-y', '-f', 'image2pipe', '-vcodec', 'bmp', '-t', '00:{0}:{1}'.format(int(duration_s // 60), duration_s % 60), '-video_size', '{0}x{1}'.format(int(w), int(h)) , '-r', fps_str, '-i', '-']
        if audio_file:
            # args += ['-i', audio_file]
            # Map video from the pipe and audio from the second input.
            args += ['-i', audio_file, '-map', '0:v:0', '-map', '1:a:0']
        if filters:
            args += ['-vf', filters]
        res_str = f'{w}x{h}'
        if quality == 'dnxhd':
            pix_fmt, bitrate, profile = dnxhd_bitrate(w, h, fps)
            args += ['-vcodec', 'dnxhd', '-acodec', 'pcm_s16le', '-s', res_str, '-b:v', bitrate, '-s', res_str, '-r', fps_to_ffmpeg_fps(fps), '-pix_fmt', pix_fmt]
            if profile:
                args += ['-profile:v', profile]
            args += ['-f', 'mov', path]
        else:
            # args += ['-vcodec', 'dnxhd', '-acodec', 'pcm_s16le', '-s', '1920x1080', '-b:v', '365M', '-s', '1920x1080', '-r', '50', '-pix_fmt', 'yuv422p', '-f', 'mov', path]
            args += ['-vcodec', 'libx264', '-crf', str(quality), '-preset', preset, '-r', fps_str, path]
        print(args)
        self.p = subprocess.Popen(args, stdin=subprocess.PIPE)

    def write_frame(self, cvframe):
        # OpenCV frames are BGR; PIL expects RGB. Encode as BMP onto ffmpeg's stdin.
        rgbimg = cv2.cvtColor(np.array(cvframe, dtype=np.uint8), cv2.COLOR_BGR2RGB)
        img = Image.fromarray(rgbimg)
        img.save(self.p.stdin, 'BMP')

    def release(self):
        # Closing stdin signals EOF so ffmpeg finalizes the file; then wait.
        self.p.stdin.close()
        self.p.wait()
- # import bright
- # def remove_black_frames(input_path, output_path, frames_list, audio_input):
- # i = 0
def render(input_path: str, output_path: str, trf_path: str, trajectory, quality, smoothness=20, bright_correct=False):
    """Render the stabilized output via ffmpeg's vidstabtransform filter.

    trajectory is the FrameInfo list from gen_motion_vectors. Bad frames are
    replaced with black frames and later removed by a select filter; duplicate
    frames are consumed from the input but not written.

    NOTE(review): bright_correct=True references the `bright` module whose
    import is commented out at module level — this path will raise NameError.
    NOTE(review): indentation reconstructed from a flattened paste — verify
    nesting against a working copy before modifying logic.
    """
    vid = cv2.VideoCapture(input_path)
    if bright_correct:
        vid = bright.create_brightness_from_video_capture(vid)
    baddies = []
    dups = []
    for frame_idx, frame_info in enumerate(trajectory):
        if frame_info.is_bad_frame:
            baddies.append(frame_idx)
        if frame_info.is_duplicate:
            dups.append(frame_idx)
    # :maxangle=0
    vidstab_filter = 'vidstabtransform=input={0}:zoom=0:smoothing={1}:interpol=bicubic'.format(trf_path, smoothness) # :crop=black
    unsharp_filter = 'unsharp=5:5:0.4:3:3:0.2'
    additional_filters = 'eq=brightness=0.05:contrast=1.1:gamma=1.05:saturation=1.1'
    # filters = [vidstab_filter, unsharp_filter, additional_filters]
    filters = [vidstab_filter, unsharp_filter]
    # filters = [unsharp_filter]
    # # the select variant doesn't work for some reason; use an intermediate file
    # if len(baddies) > 0:
    #     tmp_path = output_path + '.tmp.mov'
    #     output_vid_path = tmp_path
    # else:
    #     output_vid_path = output_path
    baddies_s = []
    for i, bad in enumerate(baddies):
        baddies_s.append(i + bad)
    if len(baddies) > 0 or len(dups):
        # Drop bad and duplicate frames from the output stream.
        good_selector = '*'.join(['not(eq(n\\,{0}))'.format(bad_idx) for bad_idx in sorted(baddies + dups)])
        select_only_good_filter = 'select={0}'.format(good_selector)
        filters.append(select_only_good_filter)
    if len(baddies) > 0:
        # Re-number timestamps after dropping frames.
        setpts_filter = 'setpts=N/FRAME_RATE/TB'
        # setpts_filter = 'setpts=\'PTS-(1/FR/TB)*({0})\''.format('+'.join(['gte(N\\,{0})'.format(bad_idx) for bad_idx in baddies_s]))
        filters.append(setpts_filter)
    filters_str = ','.join(filters)
    input_width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
    input_height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
    input_fps = vid.get(cv2.CAP_PROP_FPS)
    framecount = vid.get(cv2.CAP_PROP_FRAME_COUNT)
    print(input_fps)
    print(filters_str)
    # 59.94
    outputvid = FfmpegVideoWriter(output_path, fps=input_fps, quality=quality, filters=filters_str, w=input_width, h=input_height,
                                  duration_s=(framecount + len(baddies)) / input_fps, audio_file=input_path)
    for frame_idx, frame_info in enumerate(trajectory):
        if frame_info.is_bad_frame:
            # Placeholder black frame; removed later by the select filter.
            black_frame = np.zeros((input_height, input_width, 3), np.uint8)
            outputvid.write_frame(black_frame)
        # TODO: remove — this looks like a mistake; duplicates should not affect rendering
        elif frame_info.is_duplicate:
            retval, frame = vid.read()
            if not retval:
                print ('Ошибка при чтение входного потока')
                break
            print ('DUPLICATE')
            pass
        else:
            retval, frame = vid.read()
            if not retval:
                print ('Ошибка при чтение входного потока')
                break
            if bright_correct:
                curve = bright.calc_curve_from_buffered_vid(vid)
                frame = bright.apply_curve_simple(frame, curve)
            outputvid.write_frame(frame)
    # while True:
    #     retval, frame = vid.read()
    #     if not retval:
    #         break
    #     outputvid.write_frame(frame)
    outputvid.release()
    vid.release()
- import re
class TrfLocalMotion:
    """A single vid.stab local motion: displacement (dx, dy) measured at point (x, y)."""

    def __init__(self, dx, dy, x, y):
        self.dx, self.dy = dx, dy
        self.x, self.y = x, y
class TrfFrame:
    """One parsed 'Frame' record of a .trf file: index plus its local motions."""

    def __init__(self, idx, local_motions=None):
        self.idx = idx
        # Falsy input (None or empty) yields a fresh list, never a shared default.
        self.local_motions = local_motions or []
        # self.is_phantom = is_phantom
class TrfFrameCompressed:
    """Memory-compact TrfFrame: local motions packed into an int (N, 4) ndarray."""

    def __init__(self, idx, local_motions):
        packed = np.array([[lm.dx, lm.dy, lm.x, lm.y] for lm in local_motions], dtype=int)
        self.local_motions_compressed = packed
        self.idx = idx

    @property
    def local_motions(self):
        """Re-inflate the packed rows into TrfLocalMotion objects (values are ints)."""
        return [TrfLocalMotion(dx, dy, x, y)
                for dx, dy, x, y in self.local_motions_compressed]
- # class TrfCompressedLMSList:
- # def __init__(self, local_motions):
- # self.size = len(local_motions)
- # self.nparr = np.array([[lm.x, lm.y, lm.dx, lm.dy] for lm in local_motions], type=int)
- # def __len__(self):
- # return self.size
- # def __iter__(self):
- # return 0
- # def __next__(self):
- # return 0
def trf_parser(f):
    """Parse an ffmpeg vid.stab .trf file from file object *f*.

    Returns a list of TrfFrameCompressed, or [] on any parse error (which is
    reported via print). Each line looks like:
        Frame <idx> (List <n> [(LM dx dy x y f c1 c2),...])
    Parsing uses a shared `pos` cursor into the current line via `nonlocal`.
    """
    frame_start_rx = re.compile(r'^Frame ([0-9]+) \(List ([0-9]+) \[')
    frame_end_rx = re.compile(r'\]\)$')
    lm_rx = re.compile(r'\(LM (\-?[0-9]+) (\-?[0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9\.]+) ([0-9\.]+)\)')
    pos = 0  # cursor into the current line, shared with the nested parsers

    def parse_lm(line):
        """Parse one '(LM ...)' entry at `pos`; advance `pos` or return None."""
        nonlocal pos
        m = lm_rx.match(line, pos=pos)
        if m:
            pos += len(m.group(0))
            return TrfLocalMotion(
                dx=int(m.group(1)),
                dy=int(m.group(2)),
                x=int(m.group(3)),
                y=int(m.group(4)),
            )
        else:
            return None

    def parse_frame(line):
        """Parse one full 'Frame ...' line; return a TrfFrame or None on error."""
        nonlocal pos
        m = frame_start_rx.match(line, pos=pos)
        if not m:
            return None
        pos += len(m.group(0))
        frame = TrfFrame(int(m.group(1)))
        lm_count = int(m.group(2))
        for lm_idx in range(lm_count):
            if lm_idx > 0:
                # Entries are comma-separated.
                if line[pos] == ',':
                    pos += 1
                else:
                    return None
            lm = parse_lm(line)
            if not lm:
                return None
            frame.local_motions.append(lm)
        m = frame_end_rx.match(line, pos=pos)
        if not m:
            return None
        return frame

    first_line = f.readline()
    if first_line != 'VID.STAB 1\n':
        print('Broken TRF-file, should start VID.STAB 1')
        return []
    line_idx = 0
    frames = []
    while True:
        line = f.readline()
        if not line:
            break
        pos = 0
        line_idx += 1
        if line[0] == '#':
            # Comment line.
            continue
        frame = parse_frame(line)
        if frame is None:
            print('Failed to parse TRF-file, line {0}'.format(line_idx))
            return []
        # Immediately compress to keep memory usage down on long videos.
        frame_compressed = TrfFrameCompressed(frame.idx, frame.local_motions)
        frames.append(frame_compressed)
    return frames
def transform_from_trf_lms(local_motions, return_identity_on_fail=True):
    """Fit a 2x3 affine transform to a frame's local motions.

    Needs at least 3 motions; on failure returns the identity transform
    (or None when return_identity_on_fail is False).
    """
    identity = np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32)
    if len(local_motions) < 3:
        return identity if return_identity_on_fail else None
    src = np.array([[lm.x, lm.y] for lm in local_motions], dtype=np.float32)
    dst = np.array([[lm.x + lm.dx, lm.y + lm.dy] for lm in local_motions], dtype=np.float32)
    trans, _ = cv2.estimateAffine2D(src, dst, refineIters=20)
    if trans is None:
        return identity if return_identity_on_fail else None
    return trans
- from scipy import interpolate
- import itertools
def generate_fake_local_motion_from_transform(transform, size):
    """Synthesize four TrfLocalMotion entries from a 2x3 affine transform.

    Sample points are the corners of the central 80% of a (width, height)
    frame; each entry records where the transform moves that corner.
    """
    x0, x1 = int(0.1 * size[0]), int(0.9 * size[0])
    y0, y1 = int(0.1 * size[1]), int(0.9 * size[1])
    corners = [(x0, y0), (x0, y1), (x1, y0), (x1, y1)]
    lms = []
    for x_old, y_old in corners:
        x_new, y_new = transform.dot(np.array([x_old, y_old, 1.0]))[:2]
        lms.append(TrfLocalMotion(x_new - x_old, y_new - y_old, x_new, y_new))
    return lms
def gen_half_local_motion(lms : List[TrfLocalMotion]):
    """Halve each motion vector (dx, dy) while keeping the anchor points (x, y)."""
    halved = []
    for lm in lms:
        halved.append(TrfLocalMotion(lm.dx / 2, lm.dy / 2, lm.x, lm.y))
    return halved
class PtoParser:
    """Helpers for reading Hugin .pto project files.

    Convention in 'c' (control point) lines: lowercase field letters refer to
    the first image of the pair, uppercase to the second.
    """

    @staticmethod
    def _parse_pto_image_string(line: str):
        """Extract the image filename from an 'i' (image) line."""
        assert line[0] == 'i'
        m = re.search(r'Vm5 n"([^"]+)"', line)
        assert m, "Can't parse PTO image line {0}".format(line)
        return m.group(1)

    @staticmethod
    def _parse_pto_kp_string(line: str):
        """Parse a 'c' (control point) line into a dict with keys
        img1/img2, x1/y1, x2/y2 and type."""
        assert line[0] == 'c'
        res = {}
        for el in line.split(' '):
            # Lowercase field letter -> image 1, uppercase -> image 2.
            refIdx = '1' if el[0] == str.lower(el[0]) else '2'
            if el == 'c':
                pass
            elif el[0] == 't':
                res['type'] = el[1]
            elif str.lower(el[0]) == 'n':
                res['img' + refIdx] = int(el[1:])
            elif str.lower(el[0]) == 'x':
                res['x' + refIdx] = float(el[1:])
            elif str.lower(el[0]) == 'y':
                res['y' + refIdx] = float(el[1:])
        return res

    @staticmethod
    def read_all_images(pto_path):
        """Return the image filenames listed in the .pto file, in file order."""
        imgs = []
        with open(pto_path, 'r') as f:
            for line in f.readlines():
                if line[0] == 'i':
                    parsed_img_name = PtoParser._parse_pto_image_string(line)
                    imgs.append(parsed_img_name)
        return imgs

    @staticmethod
    def parse_transform_from_pto(pto_path, img1_name, img2_name):
        """Estimate the 2x3 transform mapping img2 onto img1 from the manually
        placed control points of the .pto file.

        NOTE(review): the affine estimate from estimateAffine2D is
        unconditionally OVERWRITTEN below by a translation-only transform built
        from the mean point offset; the commented-out 'if transform is None:'
        suggests the overwrite was meant as a fallback — confirm intent.
        """
        img_map = []
        imgs = {}
        prev_matched_keypoints = []
        cur_matched_keypoints = []
        with open(pto_path, 'r') as f:
            for line in f.readlines():
                if line[0] == 'i':
                    parsed_img_name = PtoParser._parse_pto_image_string(line)
                    imgs[parsed_img_name] = len(img_map)
                    img_map.append(parsed_img_name)
                elif line[0] == 'c':
                    ptsLine = PtoParser._parse_pto_kp_string(line)
                    line_name1 = img_map[ptsLine['img1']]
                    line_name2 = img_map[ptsLine['img2']]
                    if line_name1 == img1_name and line_name2 == img2_name:
                        prev_matched_keypoints.append([ptsLine['x1'], ptsLine['y1']])
                        cur_matched_keypoints.append([ptsLine['x2'], ptsLine['y2']])
                    elif line_name1 == img2_name and line_name2 == img1_name:
                        # Pair listed in the reverse order: swap the roles.
                        cur_matched_keypoints.append([ptsLine['x1'], ptsLine['y1']])
                        prev_matched_keypoints.append([ptsLine['x2'], ptsLine['y2']])
        assert img1_name in imgs, '{0} not found in {1}'.format(img1_name, pto_path)
        assert img2_name in imgs, '{0} not found in {1}'.format(img2_name, pto_path)
        transform, inliers = cv2.estimateAffine2D(np.array(cur_matched_keypoints), np.array(prev_matched_keypoints))
        # if transform is None:
        cur_kps = np.array(cur_matched_keypoints)
        prev_kps = np.array(prev_matched_keypoints)
        dx = np.mean(prev_kps[:,0] - cur_kps[:,0])
        dy = np.mean(prev_kps[:,1] - cur_kps[:,1])
        transform = np.array([[1, 0, dx], [0, 1, dy]])
        # print('TWO TRANSFORMS', transform, transform1)
        # print(len(inliers))
        # print(transform)
        return transform
class PtoFilesLib:
    """Index of .pto files keyed by the image (frame) names they contain."""

    def __init__(self, pto_files: List[str]):
        """pto_files: list of .pto file paths and/or directories to scan for
        '*.pto' files; None disables the library (empty table)."""
        self.img_table = {}
        if pto_files is None:
            return
        candidates = []
        for entry in pto_files:
            if os.path.isdir(entry):
                # Pick up every *.pto file directly inside the directory.
                for fname in os.listdir(entry):
                    _, ext = os.path.splitext(fname)
                    full_path = os.path.join(entry, fname)
                    if ext == '.pto' and os.path.isfile(full_path):
                        candidates.append(full_path)
            else:
                candidates.append(entry)
        print(candidates)
        for pto_path in candidates:
            if os.path.exists(pto_path):
                for img in PtoParser.read_all_images(pto_path):
                    self.img_table[img] = pto_path

    def frame_idx_name(self, idx: int):
        """Map a frame index to its exported image filename."""
        return f'frame{idx}.png'

    def in_table(self, frame_idx):
        return self.frame_idx_name(frame_idx) in self.img_table

    def transform_in_table(self, frame_idx):
        """True when both this frame and the previous one appear in some .pto file."""
        return self.in_table(frame_idx) and self.in_table(frame_idx - 1)

    def get_transform_for_image(self, frame_idx1, frame_idx2):
        img1_name = self.frame_idx_name(frame_idx1)
        img2_name = self.frame_idx_name(frame_idx2)
        return PtoParser.parse_transform_from_pto(self.img_table[img1_name], img2_name, img1_name)
- from collections import deque
def detect_phantom_frames(frames):
    """Detect 'phantom' frames: frames whose camera motion is roughly twice the
    neighborhood average — the signature of a dropped frame whose motion got
    folded into its successor.

    frames: list of Trf frames (with .local_motions). Returns the list of
    suspicious indices, where index = sliding-window center position.
    """
    sliding_window_size = 7

    def vec_xy(trans):
        # Translation component of a 2x3 affine transform.
        return np.array((trans[0, 2], trans[1, 2]))

    def window(seq, n=2):
        # Sliding window over an iterable, yielded as an ndarray of length n.
        # The first yield may contain None padding if seq is shorter than n.
        it = iter(seq)
        win = deque((next(it, None) for _ in range(n)), maxlen=n)
        yield np.array(win)
        append = win.append
        for e in it:
            append(e)
            yield np.array(win)

    # Per-frame translation vectors, computed lazily.
    frames_mov = map(lambda frm: vec_xy(transform_from_trf_lms(frm.local_motions)), frames)
    i = sliding_window_size // 2
    phantom_frames = []
    for w in window(frames_mov, n=sliding_window_size):
        mid = sliding_window_size // 2
        v = w[mid]
        vabs = np.linalg.norm(v)
        # All window members except the center.
        varound = np.concatenate((w[0:mid], w[mid+1:]))
        vavg = np.average(varound, axis=0)
        vstd = np.std(np.linalg.norm(varound, axis=1))
        acceptable_speed = np.linalg.norm(vavg) + 4 * vstd
        # Build a smoothing curve (abandoned spline experiment kept for reference):
        # x_std = np.std(ay) # TODO: should measure std deviation from the smoothing curve
        # y_std = np.std(ax)
        # num_points = len(good_trans)
        # k = 1 # TODO: what if num_point = 1? then we must write 1, since we cannot interpolate
        # xs = np.concatenate((np.arange(-sliding_window_size // 2 + 1, 0), np.arange(1, sliding_window_size // 2 + 1)))
        # ys = varound
        # spl_x = interpolate.splrep(xs, ys[:, 0], k=3, s=sliding_window_size * np.std(ys[:, 0])**2)
        # spl_y = interpolate.splrep(xs, ys[:, 1], k=3, s=sliding_window_size * np.std(ys[:, 1])**2)
        # vinterp = np.transpose(np.array((interpolate.splev(xs, spl_x), interpolate.splev(xs, spl_y))))
        # Phantom criteria:
        # 1. absolute speed above 10,
        # 2. close to twice the neighbors' average AND above mean + 4 sigma.
        if vabs > 10 and np.linalg.norm(2 * vavg - v) < 0.2 * vabs and vabs > acceptable_speed:
            print(f'speed: {vabs}')
            print(f'neighbors speed: {np.linalg.norm(w, axis=1)}')
            print(f'similar to 2 * avg: {np.linalg.norm(2 * vavg - v)} < {0.2 * vabs}')
            print(f'higher than acceptable: {vabs} > {acceptable_speed}')
            print(f'v[mid] > 1.6 * min(v[mid - 1], min[mid + 1]): {vabs} > {acceptable_speed}', )
            phantom_frames.append(i)
        i += 1
    # phantom_frames = sorted(phantom_frames + [63 + 6794-74+1, 72 + 6794-74+1, 8782 -1 ]) # 15722
    # phantom_frames = phantom_frames[:-1]
    print('phanotm frames', phantom_frames)
    return phantom_frames
- # for frm in frames:
- # s = [3884, 6016, 13268, 13395, 14088]
- # у bad_run-ов приоритет над whitelist
- def filter_trf(frames, resolution, initial_bad_runs=None, run_whitelist=None, phantom_frames=[], pto_files=None):
- last_bad_run = None
- bad_runs = []
- run_whitelist = run_whitelist or []
- pto_lib = PtoFilesLib(pto_files)
- def in_good_runs(frmidx):
- nonlocal run_whitelist
- for run in run_whitelist:
- if frmidx >= run[0] and frmidx <= run[1]:
- return True
- return False
- def in_initial_bad_runs(frmidx):
- nonlocal initial_bad_runs
- for run in initial_bad_runs:
- if frmidx >= run[0] and frmidx <= run[1]:
- return True
- return False
- def in_bad_runs(frmidx):
- nonlocal bad_runs
- for run in bad_runs:
- if frmidx >= run[0] and frmidx <= run[1]:
- return True
- return False
- for frm in frames:
- LOCAL_MOTIONS_THRESHOLD = 100
- # TODO: поставил threhosld на 5, чтобы отключить.
- # Проблема в следующем - он отключает стабилизацию как раз в тот момент, когда она нужнее всего.
- if (len(frm.local_motions) < LOCAL_MOTIONS_THRESHOLD and frm.idx > 1 and not in_good_runs(frm.idx)
- and not pto_lib.transform_in_table(frm.idx)) or in_initial_bad_runs(frm.idx):
- # TODO: Очень плохой код!!
- if last_bad_run is None:
- last_bad_run = [frm.idx, frm.idx]
- elif frm.idx - last_bad_run[1] <= 2:
- last_bad_run[1] = frm.idx
- else:
- bad_runs.append(last_bad_run)
- last_bad_run = [frm.idx, frm.idx]
- if last_bad_run:
- bad_runs.append(last_bad_run)
- filtered_bad_runs = []
- # Замяем плохую компенсацию предсказанным движением
- # То есть фактически выключаем стабилизатор
- # TODO: Заменить сплайны на гаусса.
- for bad_run in bad_runs:
- trans_good = transform_from_trf_lms(frames[bad_run[0] - 2].local_motions)
- trans = transform_from_trf_lms(frames[bad_run[0] - 1].local_motions)
- if bad_run[1] >= len(frames) - 3:
- trans_good_end = trans_good
- else:
- trans_good_end = transform_from_trf_lms(frames[bad_run[1]].local_motions)
- extrap_size = 15 # учитываем 15 кадров до и после
- good_frame_indices = []
- good_trans = []
- for good_frame_idx in itertools.chain(
- range(max((0, bad_run[0] - extrap_size - 1)), bad_run[0] - 1),
- range(bad_run[1] + 2, min((bad_run[1] + extrap_size + 1, len(frames) - 1)))):
- # if not in_initial_bad_runs(good_frame_idx) and not in_bad_runs(good_frame_idx - 1):
- trans = transform_from_trf_lms(frames[good_frame_idx - 1].local_motions)
- if np.isfinite(trans).all():
- good_frame_indices.append(good_frame_idx)
- good_trans.append(trans)
- # Получаем сплайны
- ax = np.array(good_trans)[:, 0, 2]
- ay = np.array(good_trans)[:, 1, 2]
- x_std = np.std(ay) # TODO: нужно считать стандартное отклонение от сглаживающей кривой
- y_std = np.std(ax)
- num_points = len(good_trans)
- k = 1 # TODO: если num_point = 1? в этом случае мы должны записать 1, так как не можем интерполировать
- spl_x = interpolate.splrep(good_frame_indices, ax, k=k, s=num_points * x_std**2)
- spl_y = interpolate.splrep(good_frame_indices, ay, k=k, s=num_points * y_std**2)
- bad_indices = list(range(bad_run[0], bad_run[1] + 1))
- x_interpolated = interpolate.splev(bad_indices, spl_x)
- y_interpolated = interpolate.splev(bad_indices, spl_y)
- bad_trans = []
- for bad_frame_idx in range(bad_run[0], bad_run[1] + 1):
- bad_trans.append(transform_from_trf_lms(frames[bad_frame_idx - 1].local_motions))
- bad_ax = np.array(bad_trans)[:, 0, 2]
- bad_ay = np.array(bad_trans)[:, 1, 2]
- outliers = np.logical_or(np.absolute(x_interpolated - bad_ax) > 2 * x_std, np.absolute(y_interpolated - bad_ay) > 2 * y_std)
- print(outliers)
- print(bad_run[0], trans_good[0, 2], trans_good[1, 2], trans[0, 2], trans[1, 2], trans_good_end[0, 2], trans_good_end[1, 2])
- if np.any(outliers):
- filtered_bad_runs.append([bad_run, spl_x, spl_y])
- # Удаляем плохие кадры, заменяем их на предсказанное
- if len(filtered_bad_runs) > 0:
- print(np.array(filtered_bad_runs, dtype=object)[:, 0])
- filtered_frames = []
- cur_bad_run_idx = 0
- for frm in frames:
- if frm.idx > filtered_bad_runs[cur_bad_run_idx][0][1] and cur_bad_run_idx < len(filtered_bad_runs) - 1:
- cur_bad_run_idx += 1
- cur_bad_run = filtered_bad_runs[cur_bad_run_idx]
- if pto_lib.transform_in_table(frm.idx) and not in_initial_bad_runs(frm.idx):
- pto_trans = pto_lib.get_transform_for_image(frm.idx, frm.idx - 1)
- print(f'IN TABLE {frm.idx}')
- fake_lms = generate_fake_local_motion_from_transform(pto_trans, resolution)
- filtered_frames.append(TrfFrame(frm.idx, fake_lms))
- elif frm.idx >= cur_bad_run[0][0] and frm.idx <= cur_bad_run[0][1]:
- spl_x = cur_bad_run[1]
- spl_y = cur_bad_run[2]
- x_interpolated = interpolate.splev([frm.idx], spl_x)[0]
- y_interpolated = interpolate.splev([frm.idx], spl_y)[0]
- fake_transform = np.float32([[1, 0, x_interpolated], [0, 1, y_interpolated]])
- fake_lms = generate_fake_local_motion_from_transform(fake_transform, resolution)
- filtered_frames.append(TrfFrame(frm.idx, fake_lms))
- else:
- filtered_frames.append(frm)
- else:
- filtered_frames = frames.copy()
- # добавляем фантомные кадры
- for frmidx in reversed(sorted(phantom_frames)):
- frmidx0 = frmidx + 1
- frmidx1 = frmidx0 - 1
- frm = filtered_frames[frmidx1]
- half_motion = gen_half_local_motion(frm.local_motions)
- # half_motion = frm.local_motions
- filtered_frames[frmidx1] = TrfFrame(frmidx0, half_motion)
- filtered_frames.insert(frmidx1, TrfFrame(frmidx0, half_motion))
- for frm in filtered_frames[frmidx1 + 1:]:
- frm.idx += 1
- return filtered_frames
def write_trf(f, frames):
    """Serialize frames to vid.stab's textual .trf format.

    Writes the "VID.STAB 1" header, then one
    ``Frame <idx> (List <n> [...])`` line per frame whose local motions are
    emitted as ``(LM dx dy x y 112 0.5 0.5)`` entries.  Frames carrying
    pre-rounded ``local_motions_compressed`` tuples are written verbatim;
    otherwise each local motion component is probabilistically rounded to an
    integer first.
    """
    f.write('VID.STAB 1\n# write_trf()\n')
    for frame in frames:
        if hasattr(frame, 'local_motions_compressed'):
            entries = [
                '(LM {0} {1} {2} {3} 112 0.5 0.5)'.format(dx, dy, x, y)
                for dx, dy, x, y in frame.local_motions_compressed
            ]
        else:
            entries = [
                '(LM {0} {1} {2} {3} 112 0.5 0.5)'.format(
                    probabilistic_round(lm.dx),
                    probabilistic_round(lm.dy),
                    probabilistic_round(lm.x),
                    probabilistic_round(lm.y))
                for lm in frame.local_motions
            ]
        f.write('Frame {0} (List {1} [{2}])\n'.format(
            frame.idx, len(entries), ','.join(entries)))
def main():
    """Command-line entry point: estimate motion, smooth it, render output.

    The .trf and output paths default to ``<input>.trf`` and
    ``<input>.stabilized.nosound.mp4`` next to the input file when not
    given explicitly.
    """
    parser = argparse.ArgumentParser(description='Stabilizer for figure skating.')
    parser.add_argument('-f', dest='input_path', type=str, required=True, help='Input path')
    parser.add_argument('-trf', dest='trf_path', type=str, help='Trf output path')
    parser.add_argument('-o', dest='output_path', type=str, help='Output video')
    parser.add_argument('-q', dest='quality', type=int, default=9,
                        help='Output quality (CRF, 0-lossless, 51-worst possible)')
    args = parser.parse_args()

    trf_path = args.trf_path if args.trf_path else args.input_path + '.trf'
    output_path = args.output_path if args.output_path else args.input_path + '.stabilized.nosound.mp4'

    camera_trajectory, stats = gen_motion_vectors(args.input_path, True, trf_path)
    smooth_trajectory = trajectory_smoother(camera_trajectory)
    render(args.input_path, output_path, trf_path, smooth_trajectory, args.quality)
- import os
def decompose_transition_into_scalexy_shearx_rotate(a):
    """Decompose the linear part of an affine transform into R * Shear * Scale.

    Factors ``a[0:2][0:2]`` as ``R(theta) @ [[1, m], [0, 1]] @ diag(sx, sy)``.

    Args:
        a: 2x2 (or 2x3 affine) matrix, indexable as a[row][col].

    Returns:
        Tuple ``(theta, sx, sy, m)``: rotation angle in radians, x and y
        scale factors, and the x-shear coefficient.
    """
    # First column is sx * (cos(theta), sin(theta)).
    sx = math.sqrt(a[0][0]**2 + a[1][0]**2)
    # BUG FIX: the angle is the *arc* tangent of a10/a00; the old code took
    # math.tan() of the ratio instead of recovering the angle.  atan2 also
    # handles a00 == 0 and selects the correct quadrant.
    theta = math.atan2(a[1][0], a[0][0])
    cos_t = math.cos(theta)
    sin_t = math.sin(theta)
    # Rotate the second column back: msy = m * sy, and
    # sy = a11*cos - a01*sin, which stays finite at theta == 0 (the former
    # (msy*cos - a01)/sin form divided by zero there).
    msy = a[0][1] * cos_t + a[1][1] * sin_t
    sy = a[1][1] * cos_t - a[0][1] * sin_t
    m = msy / sy
    return theta, sx, sy, m
- from scipy import stats
- # Поиск дропнутых кадров
# Search for dropped frames.
def search_drops_in_trf(frames):
    """Collect per-frame translation components from parsed .trf frames.

    NOTE(review): looks unfinished — the dx/dy arrays are computed but
    neither returned nor inspected, so the function currently has no
    observable effect.
    """
    ntransforms = np.array(
        [transform_from_trf_lms(frm.local_motions) for frm in frames[1:]])
    dx = ntransforms[:, 0, 2]
    dy = ntransforms[:, 1, 2]
def analyze_trf(frames):
    """Regress inter-frame zoom against dy and x-shear against dx.

    Each consecutive inter-frame transform is decomposed into
    rotation / shear / scale; the y/x zoom ratio and the x-shear are then
    linearly regressed against the frame-to-frame translation deltas
    (a rolling-shutter-style coupling estimate) and the coefficients are
    printed scaled to the frame dimensions.
    """
    zoomy = []
    dx = []
    dy = []
    shearx = []
    last_trans = None
    for frm in frames[1:]:
        trans = transform_from_trf_lms(frm.local_motions)
        theta, sx, sy, m = decompose_transition_into_scalexy_shearx_rotate(trans)
        # Debug reconstruction of the linear part (used by the print below).
        rot = np.float32([[math.cos(theta), -math.sin(theta)], [math.sin(theta), math.cos(theta)]])
        shear = np.float32([[1, m], [0, 1]])
        scale = np.float32([[sx, 0], [0, sy]])
        res = np.matmul(np.matmul(rot, shear), scale)
        if last_trans is not None:
            zoomy.append(sy / sx)
            dx.append(trans[0, 2] - last_trans[0, 2])
            dy.append(trans[1, 2] - last_trans[1, 2])
            shearx.append(m)
        last_trans = trans
        # print(theta, sy/sx, m, trans[0:2, 0:2], res)
    # Convert once up front instead of re-wrapping the lists for every mask.
    dx = np.array(dx)
    dy = np.array(dy)
    zoomy = np.array(zoomy)
    shearx = np.array(shearx)
    with np.errstate(invalid='ignore'):  # NaNs from bad frames simply fail the mask
        good1 = np.logical_and(np.logical_and(dy > -100, dy < 100),
                               np.logical_and(zoomy > 0.8, zoomy < 1.2))
        good2 = np.logical_and(np.logical_and(dx > -100, dx < 100),
                               np.logical_and(shearx > -0.5, shearx < 0.5))
    slope, intercept, _, _, _ = stats.linregress(dy[good1], zoomy[good1])
    slope1, intercept1, _, _, _ = stats.linregress(dx[good2], shearx[good2])
    # Scale by the module-level frame size instead of the former hard-coded
    # 1080/1920 literals (width == 1920, height == 1080, so output is identical).
    print(slope * height, intercept * height - height)
    print(slope1 * width, intercept1 * width)
    xi = np.linspace(-100, 100)
    # plt.plot(xi, (slope * xi + intercept) * height - height)
    # plt.plot(dy, zoomy * height - height, linestyle="", marker=".")
    # plt.plot(xi, (slope1 * xi + intercept1) * width)
    # plt.plot(dx, shearx * width, linestyle="", marker=".")
    # plt.show()
# TODO: split the skater-tracking work out into a separate task.
# Files that need to be stabilized.  Each entry is either a plain path string
# or a [path, params] pair, where params may preset 'bad_runs', 'good_runs',
# 'phantom_frames', 'pto_files' and 'smoothness' (consumed by main1()).
FILES_TO_STAB = [
    # ['/media/vlad/new2021/editins/nuga-evalotta/DSCF0017.MOV', {'bad_runs': [[2652, 2704], [3593, 3602], [6370, 6396], [8749, 8785], [9034, 9062], [10438, 10461],
    # [12990, 13028], [15542, 15619], [15413, 15452], [15995, 16053], [18480, 18515], [21019, 21090], [21247, 21281], [21265, 21490],
    # [21505, 21606], [25215, 25245]]}],
    # ['/media/vlad/new2021/editins/nuga-evalotta/DSCF0015.MOV', {'bad_runs': [[15741, 15793], [18706, 18737], [20678, 20909], [21013, 21064]] }],
    # ['/media/vlad/new2021/editins/nuga-evalotta/DSCF0016.MOV', {'bad_runs': [[3750, 3862]]}],
    '/media/vlad/new2021/editins/nuga-evalotta/DSCF6876.MOV',
    '/media/vlad/new2021/sdcards/jumpfest2020/day3/sdcard3/DCIM/100_FUJI/DSCF0006.MOV',
]
def main1():
    """Batch driver: stabilize every file listed in FILES_TO_STAB.

    For each input it reuses the cached .trf motion file when present
    (otherwise runs the slow motion-estimation pass first), filters the
    parsed transforms, writes a .filtered.trf and renders the stabilized
    video.  Inputs whose output file already exists are skipped.
    """
    quality = 18  # output CRF passed to render()
    # with open('/media/vlad/new2021/editins/sasha/sinitsa/dedups_3.json', 'r') as f:
    #     duplicate_table = json.load(f)
    duplicate_table = {}
    for input_path in FILES_TO_STAB:
        # Entries are either plain path strings or [path, params] pairs.
        if not isinstance(input_path, str):
            input_params = input_path[1]
            input_path = input_path[0]
        else:
            input_params = {}
        preset_good_runs = input_params.get('good_runs', [])
        preset_bad_runs = input_params.get('bad_runs', [])
        phantom_frames = input_params.get('phantom_frames', [])
        pto_files = input_params.get('pto_files', [])
        smoothness = input_params.get('smoothness', 20)
        trf_path = input_path + '.trf'
        filtered_trf_path = input_path + '.filtered.trf'
        output_path = input_path + '.stabilized.nosound.mp4'
        resolution = (width, height)  # module-level frame size (1920x1080)
        if os.path.exists(output_path):
            print('{0} already exists skipping'.format(output_path))
            continue
        if not os.path.exists(trf_path):
            # No cached motion data yet - run the (slow) estimation pass.
            dup_set = duplicate_table.get(input_path, None)
            camera_trajectory, stats = gen_motion_vectors(input_path, True, trf_path, dup_set)
            print(stats)
        if os.path.exists(trf_path):
            print('{0} already exists, let\'s filter it'.format(trf_path))
            with open(trf_path, 'r') as f:
                parsed_trf = trf_parser(f)
            # analyze_trf(parsed_trf)
            # phantom_frames = []
            # 'detect' is a sentinel asking for automatic phantom detection.
            if phantom_frames == 'detect':
                phantom_frames = detect_phantom_frames(parsed_trf)
            filtered_trf = filter_trf(parsed_trf, resolution, initial_bad_runs=preset_bad_runs, run_whitelist=preset_good_runs,
                phantom_frames=phantom_frames,
                pto_files=pto_files,
                )
            # pto_files=['/media/vlad/new2020/fskating/gprusproj/sasha/DSCF0026.MOV.pto/frame9875 - frame18572.pto',
            # '/media/vlad/new2020/fskating/gprusproj/sasha/DSCF0026.MOV.pto/frame12073 - frame12076.pto'])
            # filtered_trf = parsed_trf
            with open(filtered_trf_path, 'w') as f:
                write_trf(f, filtered_trf)
            trajectory = [FrameInfo(None, None, None) for x in filtered_trf[1:]]
            # Free the parsed data early - these lists can be large.
            del parsed_trf
            del filtered_trf
            # Each inserted phantom frame shifts the later indices by one,
            # hence the phantom_idx offset when flagging bad frames.
            for phantom_idx, frmidx in enumerate(sorted(phantom_frames)):
                trajectory[frmidx + phantom_idx - 1].is_bad_frame = True
            render(input_path, output_path, filtered_trf_path, trajectory, quality, smoothness=smoothness)
        else:
            smooth_trajectory = None
            smooth_trajectory = trajectory_smoother(camera_trajectory)
            render(input_path, output_path, trf_path, smooth_trajectory, quality)
            # render(input_path, output_path, trf_path, [], quality)
# def extract_all_bad_frags():
# Script entry: the batch driver runs at import/execution time.
# NOTE(review): consider guarding with `if __name__ == '__main__':` so the
# module can be imported without kicking off the full pipeline.
main1()
# extract_all_bad_frags()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement