Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from PIL import Image
- from PIL import ImageStat
- import numpy as np
- import matplotlib.pyplot as plt
- from scipy.interpolate import PchipInterpolator
- from cv2 import cv2
- import subprocess
# Name (or full path) of the ffmpeg binary that FfmpegVideoWriter launches.
ffmpeg_executable = 'ffmpeg'
def fps_to_ffmpeg_fps(fps: float):
    """Format a frame rate as a string for ffmpeg's ``-r`` option.

    The rate is compared after rounding to two decimals:

    * NTSC-style rates (23.98, 29.97, 59.94, 119.88) become the exact
      rational ``N/1001`` form.
    * Rates that round to a whole number are returned as that integer.
    * Anything else is returned as ``str(fps)``.

    Args:
        fps: Frame rate in frames per second.

    Returns:
        The frame rate as a string.
    """
    centi_fps = round(fps * 100)
    # Rounded rate (in 1/100 fps) -> numerator of the exact N/1001 fraction.
    ntsc_numerators = {2398: 24000, 2997: 30000, 5994: 60000, 11988: 120000}
    if centi_fps in ntsc_numerators:
        return f'{ntsc_numerators[centi_fps]}/1001'
    if centi_fps % 100 == 0:
        # Use the rounded value, not int(fps): 29.999 must give '30', not '29'.
        return str(centi_fps // 100)
    return str(fps)
def dnxhd_bitrate(w: int, h: int, fps: float):
    """Look up the encoder settings for a frame size and frame rate.

    Args:
        w: Frame width in pixels.
        h: Frame height in pixels.
        fps: Frame rate; it is rounded to the nearest integer for the lookup.

    Returns:
        A ``(pix_fmt, bitrate, profile)`` tuple. ``profile`` is ``None`` for
        1920x1080 and ``'dnxhr_hqx'`` for 3840x2160.

    Raises:
        ValueError: If the resolution, or the frame rate for that resolution,
            is not in the table.
    """
    # Frame size -> (pixel format, profile, {rounded fps: video bitrate}).
    supported = {
        (1920, 1080): ('yuv422p', None,
                       {60: '440M', 50: '365M', 30: '220M'}),
        (3840, 2160): ('yuv422p10le', 'dnxhr_hqx',
                       {60: '1760M', 50: '1460M', 30: '880M'}),
    }
    if (w, h) not in supported:
        # Report the real cause instead of blaming the frame rate.
        raise ValueError(f'resolution {w}x{h} incompatible with dnxhd')
    pix_fmt, profile, bitrates = supported[(w, h)]
    rounded_fps = int(round(fps))
    if rounded_fps not in bitrates:
        raise ValueError(f'fps {fps} incompatible with dnxhd')
    return pix_fmt, bitrates[rounded_fps], profile
class FfmpegVideoWriter:
    """Write video frames by piping BMP images into an ffmpeg subprocess.

    ffmpeg reads the frames from stdin (``image2pipe``/``bmp``) and encodes
    them with the ``dnxhd`` video codec and ``pcm_s16le`` audio into a ``mov``
    container at ``path``.
    """

    def __init__(self, path: str, quality: int, w: int, h: int, duration_s: float, fps: float, filters: str = None, preset: str = 'slow', audio_file: str = None):
        """Build the ffmpeg command line and start the process.

        Args:
            path: Output file path.
            quality: Must be in ``[0, 51)``. It is validated but not used by
                the DNxHD command line built here.
            w: Frame width in pixels.
            h: Frame height in pixels.
            duration_s: Duration of the piped video in seconds (``-t``).
            fps: Frame rate, used for both the input and the output.
            filters: Optional ``-vf`` filter string; skipped when empty.
            preset: Accepted for interface compatibility; not used by the
                DNxHD command line built here.
            audio_file: Optional second input whose first audio stream is
                mapped into the output.

        Raises:
            ValueError: If ``quality`` is out of range, or if
                ``dnxhd_bitrate`` rejects the size / frame rate.
        """
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if not 0 <= quality < 51:
            raise ValueError(f'quality must be in [0, 51), got {quality}')

        # One rate string for input and output, so ffmpeg never has to drop
        # or duplicate frames to reconcile two slightly different rates.
        rate = fps_to_ffmpeg_fps(fps)
        res_str = '{0}x{1}'.format(int(w), int(h))
        args = [
            ffmpeg_executable, '-y',
            '-f', 'image2pipe', '-vcodec', 'bmp',
            # Plain seconds: an "00:MM:SS" string would carry a minutes field
            # of 60 or more for clips of an hour or longer.
            '-t', format(duration_s, '.3f'),
            '-video_size', res_str,
            '-r', rate,
            '-i', '-',
        ]
        if audio_file:
            # The trailing '?' makes the audio mapping optional, so a source
            # without an audio stream does not abort the encode.
            args += ['-i', audio_file, '-map', '0:v:0', '-map', '1:a:0?']
        if filters:
            args += ['-vf', filters]
        pix_fmt, bitrate, profile = dnxhd_bitrate(w, h, fps)
        args += ['-vcodec', 'dnxhd', '-acodec', 'pcm_s16le', '-s', res_str, '-b:v', bitrate, '-r', rate, '-pix_fmt', pix_fmt]
        if profile:
            args += ['-profile:v', profile]
        args += ['-f', 'mov', path]
        print(args)
        self.p = subprocess.Popen(args, stdin=subprocess.PIPE)

    def write_frame(self, cvframe):
        """Convert one frame from BGR to RGB and pipe it to ffmpeg as a BMP."""
        rgbimg = cv2.cvtColor(np.array(cvframe, dtype=np.uint8), cv2.COLOR_BGR2RGB)
        Image.fromarray(rgbimg).save(self.p.stdin, 'BMP')

    def release(self):
        """Close ffmpeg's stdin and wait for the process to exit."""
        try:
            self.p.stdin.close()
        finally:
            # Always reap the child, even if closing the pipe raised.
            self.p.wait()
def create_curve(hist, cutpoint: float, cutpoint2: float):
    """Build a brightness lookup curve from a histogram.

    ``spointx`` is the number of bins whose cumulative count is still below
    ``cutpoint`` of all pixels. The curve maps bin 0 to 0, bin ``spointx`` to
    ``cutpoint2 * 255`` and bin 255 to 255, using a PCHIP interpolant.

    Args:
        hist: Histogram counts, one per bin. Any shape is accepted and is
            flattened (e.g. a ``(256, 1)`` column works like a flat array).
        cutpoint: Fraction of pixels (0..1) that defines ``spointx``.
        cutpoint2: Output level (0..1) assigned to bin ``spointx``.

    Returns:
        The identity ``np.arange(len(hist))`` when no stretch is applied,
        otherwise a float array of rounded output levels, one per bin.
    """
    hist = np.asarray(hist, dtype=float).ravel()
    x = np.arange(len(hist))
    pixel_count = hist.sum()
    cumhist = np.cumsum(hist)
    spointx = int(np.sum(cumhist < pixel_count * cutpoint))

    # spointx == 0 happens for an empty histogram or when the first bin alone
    # holds the cutpoint share (e.g. a black frame). The peak computation
    # would then divide by zero and the interpolation knots would not be
    # strictly increasing, so leave the image unchanged. The same applies if
    # spointx reaches the last knot at 255.
    if spointx == 0 or spointx >= 255 or spointx / 255 >= cutpoint:
        return x

    # If there is a lot of ice we want to compress it; the less uniform the
    # histogram, the stronger the compression.
    peak = hist.max() / pixel_count / (spointx / 255)
    ice_compression = max(min(peak, 0.1), 0.01) - 0.01

    xs = [0, spointx * (0.9 - ice_compression), spointx, 255]
    ys = [0, cutpoint2 * 0.9, cutpoint2, 1]
    cs = PchipInterpolator(xs, ys)
    return np.round(cs(x) * 255)
def make_monotonic(a):
    """Return the running maximum of ``a`` as a ``uint8`` array.

    Each output element is the largest value seen so far, so the result
    never decreases.

    Args:
        a: One-dimensional sequence of numbers.

    Returns:
        A ``uint8`` NumPy array of the same length as ``a``; empty input
        gives an empty array.
    """
    values = np.asarray(a)
    if values.size == 0:
        return np.array([], dtype='uint8')
    return np.maximum.accumulate(values).astype('uint8')
def pic1(im):
    """Apply the brightness curve derived from an image's own histogram.

    The per-band histograms (256 bins each) of up to the first three bands
    are summed, turned into a curve with ``create_curve(..., 0.99, 0.96)``,
    made non-decreasing, and applied to every band via ``im.point``.

    Args:
        im: Image object providing ``histogram()`` and ``point()``
            (presumably a ``PIL.Image.Image`` - confirm against callers).

    Returns:
        The remapped image returned by ``im.point``.
    """
    # Compute the histogram once and split it into rows of 256 bins per band.
    # Taking at most three rows also lets single-band images through.
    band_hists = np.array(im.histogram()).reshape(-1, 256)
    rgb_hist = band_hists[:3].sum(axis=0)
    crv = create_curve(rgb_hist, 0.99, 0.96)
    crvmon = make_monotonic(list(crv))
    print(len(crvmon))
    return im.point(lambda x: crvmon[x])
class BufferedVideoInput:
    """Frame reader that pre-reads frames and stores ``fun(frame)`` for each.

    ``vid`` must provide ``read() -> (ok, frame)``, ``get(cap)`` and
    ``release()``. Up to ``lookahead`` frames are buffered ahead of the
    caller, and the result of ``fun`` for every frame read so far is kept in
    ``cached_fun_results``.
    """

    def __init__(self, vid, lookahead, fun):
        """Store the source, the buffer depth and the per-frame function."""
        self.vid = vid
        self.lookahead = lookahead
        self.fun = fun
        self.cached_frames = []
        self.cached_fun_results = []
        # Index of the frame most recently handed out by read(); -1 = none.
        self.pos = -1
        self.eof = False

    def get_fun_results(self, offset):
        """Return the stored result at ``pos + offset - 1``.

        The index is clamped to the range of results computed so far, so
        ``offset == 1`` refers to the frame last returned by ``read()``.
        """
        newest = len(self.cached_fun_results) - 1
        bounded = min(self.pos + offset - 1, newest)
        return self.cached_fun_results[bounded if bounded > 0 else 0]

    def read(self):
        """Top up the buffer, then return ``(True, frame)`` or ``(False, None)``."""
        while len(self.cached_frames) < self.lookahead and not self.eof:
            ok, frame = self.vid.read()
            if ok:
                self.cached_fun_results.append(self.fun(frame))
                self.cached_frames.append(frame)
            else:
                self.eof = True
        # NOTE(review): the pasted source lost its indentation; pos is
        # advanced once per read() call here, which is what makes
        # get_fun_results(1) line up with the returned frame - confirm.
        self.pos += 1
        if not self.cached_frames:
            return False, None
        return True, self.cached_frames.pop(0)

    def get(self, cap):
        """Forward a property query to the wrapped source."""
        return self.vid.get(cap)

    def release(self):
        """Release the wrapped source."""
        self.vid.release()
def create_brightness_from_video_capture(vid_capture, lookahead: int = 15):
    """Wrap a video capture so that every frame gets a brightness curve.

    Args:
        vid_capture: Frame source handed to ``BufferedVideoInput``.
        lookahead: Number of frames to buffer ahead (default 15).

    Returns:
        A ``BufferedVideoInput`` whose per-frame function result is the
        non-decreasing ``uint8`` curve for that frame.
    """
    def make_curve(frame):
        """Compute the curve from the summed histograms of channels 0-2."""
        hist_r = cv2.calcHist([frame], [0], None, [256], [0, 256])
        hist_g = cv2.calcHist([frame], [1], None, [256], [0, 256])
        hist_b = cv2.calcHist([frame], [2], None, [256], [0, 256])
        # Flatten so create_curve receives one count per bin and computes
        # with plain scalars. NOTE(review): calcHist is expected to return a
        # (256, 1) column here - confirm against the OpenCV docs.
        rgb_hist = np.array(hist_r + hist_g + hist_b).ravel()
        crv = create_curve(rgb_hist, 0.99, 0.96)
        return make_monotonic(list(crv))

    return BufferedVideoInput(vid_capture, lookahead, make_curve)
def _average_curves(vid, radius):
    """Average the per-frame curves from ``radius`` frames before to ``radius`` after the current frame."""
    # Offset 1 is the current frame (see BufferedVideoInput.get_fun_results).
    total_curve = np.zeros(vid.get_fun_results(1).shape)
    for offset in range(1 - radius, 1 + radius + 1):
        total_curve += vid.get_fun_results(offset)
    return np.array(total_curve / (radius * 2 + 1), dtype='uint8')


def vid_bright(input_path, output_path):
    """Brighten a video with a temporally smoothed per-frame curve.

    Each frame is remapped through the average of the brightness curves of
    the 11 frames centred on it, and the result is written with
    ``FfmpegVideoWriter``. The audio is taken from ``input_path``.

    Args:
        input_path: Path of the source video.
        output_path: Path of the file to write.
    """
    vid = create_brightness_from_video_capture(cv2.VideoCapture(input_path))
    try:
        input_width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
        input_height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
        input_fps = vid.get(cv2.CAP_PROP_FPS)
        framecount = vid.get(cv2.CAP_PROP_FRAME_COUNT)
        filters_str = ''
        quality = 12
        print(input_fps)
        print(filters_str)
        outputvid = FfmpegVideoWriter(output_path, fps=input_fps, quality=quality, filters=filters_str, w=input_width, h=input_height, duration_s=framecount / input_fps, audio_file=input_path)
        try:
            while True:
                retval, frame = vid.read()
                if not retval:
                    break
                # Sliding window with kernel [1, 1, ..., 1]: every frame in
                # the window is counted exactly once.
                total_curve = _average_curves(vid, 5)
                # Use the averaged curve as a lookup table for pixel values.
                frame_cc = total_curve[frame]
                outputvid.write_frame(frame_cc)
        finally:
            # Release the encoder and the capture even if a frame fails.
            outputvid.release()
    finally:
        vid.release()
# Input videos to process; fill in before running the script.
path_list = []

# Only process files when run as a script, so importing this module has no
# side effects.
if __name__ == '__main__':
    for path in path_list:
        # NOTE(review): FfmpegVideoWriter forces '-f mov', so the file
        # written is a MOV container despite the '.mp4' suffix.
        vid_bright(path, path + '.bright.mp4')
- # im.show()
- # im2.show()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement