Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
#@title experimental batch inference WITH PROGRESS BAR
import os
import cv2
from IPython.display import Image, display
from concurrent.futures import ThreadPoolExecutor

# Input videos (Colab form fields).
source_video_path = '/content/LivePortrait/37secsclip.mp4' #@param {type:"string"}
driving_video_path = '/content/LivePortrait/drivingcut35secs.mp4' #@param {type:"string"}

# Working directories for intermediate and final artifacts.
output_dir = 'output_frames'
animations_dir = '/content/LivePortrait/animations'
last_frames_dir = 'last_frames'
source_frames_dir = 'source_frames'
driving_frames_dir = 'driving_frames'

# Make sure every working directory exists before any frame is written.
for _workdir in (output_dir, animations_dir, last_frames_dir,
                 source_frames_dir, driving_frames_dir):
    os.makedirs(_workdir, exist_ok=True)
# Probe each video's native frame rate. Release the captures explicitly:
# cv2.VideoCapture holds the underlying file handle open until released,
# and the original code leaked both handles for the rest of the run.
_cap = cv2.VideoCapture(source_video_path)
source_fps = _cap.get(cv2.CAP_PROP_FPS)
_cap.release()
_cap = cv2.VideoCapture(driving_video_path)
driving_fps = _cap.get(cv2.CAP_PROP_FPS)
_cap.release()

# Settings
fps_option = 'custom' #@param ["source", "driving", "higher", "lower", "custom"] {allow-input: true}
custom_fps = 15 #@param {type:"number"} # Only used if fps_option is 'custom'
num_workers = 4 #@param {type:"number"}

# Resolve which frame rate drives both extraction and final assembly.
if fps_option == 'source':
    fps = source_fps
elif fps_option == 'driving':
    fps = driving_fps
elif fps_option == 'higher':
    fps = max(source_fps, driving_fps)
elif fps_option == 'lower':
    fps = min(source_fps, driving_fps)
elif fps_option == 'custom':
    fps = custom_fps
else:
    fps = source_fps  # Default to source video fps if option is unknown

# NOTE(review): cv2 reports 0.0 FPS when a video fails to open; a zero here
# would produce an invalid ffmpeg "fps=0" filter — verify the input paths.
print(f"Using frame rate: {fps} FPS")
# Clear the working directories so stale frames from a previous run cannot
# leak into this one.
import shutil

for folder in [source_frames_dir, driving_frames_dir, last_frames_dir, output_dir]:
    for entry in os.listdir(folder):
        entry_path = os.path.join(folder, entry)
        if os.path.isfile(entry_path) or os.path.islink(entry_path):
            os.unlink(entry_path)
        elif os.path.isdir(entry_path):
            # os.rmdir only removes EMPTY directories and would raise
            # OSError on any nested content; rmtree removes recursively.
            shutil.rmtree(entry_path)
# Extract frames from both videos at the chosen rate. Paths are shell-quoted
# so a filename containing spaces or shell metacharacters cannot break (or
# inject into) the command line — the originals were interpolated raw.
import shlex

os.system(f'ffmpeg -i {shlex.quote(source_video_path)} -vf "fps={fps}" '
          f'{shlex.quote(source_frames_dir)}/frame_%04d.png')
os.system(f'ffmpeg -i {shlex.quote(driving_video_path)} -vf "fps={fps}" '
          f'{shlex.quote(driving_frames_dir)}/frame_%04d.png')

# List extracted frames; sorting keeps the %04d names in temporal order.
source_frames = sorted(os.listdir(source_frames_dir))
driving_frames = sorted(os.listdir(driving_frames_dir))

# Frames are paired positionally, so truncate to the shorter stream.
min_length = min(len(source_frames), len(driving_frames))
print(f"Using {min_length} frames for processing.")
# Build a tiny 2-frame driving clip per frame index: the anchor (first
# driving frame) followed by driving frame i. Each source frame is later
# animated against its own 2-frame clip.
anchor_frame_path = os.path.join(driving_frames_dir, driving_frames[0])
for idx in range(min_length):
    current_frame_path = os.path.join(driving_frames_dir, driving_frames[idx])
    clip_path = os.path.join(output_dir, f'two_frame_video_{idx}.mp4')
    # Each still is looped for 0.5 s, then the two are concatenated.
    cmd = (
        f'ffmpeg -y -loop 1 -t 0.5 -i {anchor_frame_path} '
        f'-loop 1 -t 0.5 -i {current_frame_path} '
        f'-filter_complex "[0:v][1:v]concat=n=2:v=1:a=0[outv]" '
        f'-map "[outv]" {clip_path}'
    )
    os.system(cmd)

# Sanity check: dump the first frame of the first generated clip to disk.
two_frame_video_path = os.path.join(output_dir, 'two_frame_video_0.mp4')
first_frame_path = os.path.join(output_dir, 'verify_frame.png')
os.system(f'ffmpeg -i {two_frame_video_path} -vf "select=eq(n\,0)" -q:v 3 {first_frame_path}')
- # Function to run inference
- import os
- from concurrent.futures import ThreadPoolExecutor
- from tqdm import tqdm
- # Function to run inference on a batch of frames
def run_batch_inference(batch):
    """Run LivePortrait inference for a batch of frame indices.

    For each index i, pairs source frame i with its 2-frame driving clip
    (``two_frame_video_{i}.mp4``) and invokes ``inference.py`` once per
    pair. All commands of the batch are chained with ``&&`` into a single
    shell invocation, so the chain stops at the first failing command.

    Args:
        batch: iterable of integer frame indices to process.
    """
    import shlex
    batch_commands = []
    for i in batch:
        # shlex.quote guards against spaces/metacharacters in the paths;
        # the original interpolated them unquoted into the shell string.
        image_arg = shlex.quote(os.path.join(source_frames_dir, source_frames[i]))
        clip_arg = shlex.quote(os.path.join(output_dir, f'two_frame_video_{i}.mp4'))
        batch_commands.append(f'python inference.py -s {image_arg} -d {clip_arg}')
    if batch_commands:  # an empty batch has nothing to run
        os.system(' && '.join(batch_commands))
# Partition the frame indices into fixed-size batches.
batch_size = 32  # Adjust this based on your system's capabilities
batches = [range(start, min(start + batch_size, min_length))
           for start in range(0, min_length, batch_size)]

# Fan the batches out across worker threads; tqdm tracks completed batches.
with ThreadPoolExecutor(max_workers=num_workers) as executor:
    for _ in tqdm(executor.map(run_batch_inference, batches),
                  total=len(batches),
                  desc="Processing batches",
                  unit="batch"):
        pass
print("Re-combining Video")

# For every processed pair, locate the inference output video and save its
# last frame; those frames are re-assembled into the final video below.
for i in range(min_length):
    # Use source_frames_dir (the original hard-coded the string
    # 'source_frames' here, bypassing the configurable variable).
    input_image_path = os.path.join(source_frames_dir, source_frames[i])
    clip_path = os.path.join(output_dir, f'two_frame_video_{i}.mp4')

    # inference.py names its output "<image-stem>--<video-stem>.mp4".
    image_stem = os.path.splitext(os.path.basename(input_image_path))[0]
    video_stem = os.path.splitext(os.path.basename(clip_path))[0]
    output_path = f"{animations_dir}/{image_stem}--{video_stem}.mp4"

    if not os.path.exists(output_path):
        print(f"Output video not found: {output_path}")
        continue

    # -sseof -3 seeks ~3 s before end-of-file; -update 1 keeps overwriting
    # the PNG, so the final write leaves the video's last frame on disk.
    last_frame_path = os.path.join(last_frames_dir, f'last_frame_{i}.png')
    os.system(f'ffmpeg -sseof -3 -i {output_path} -update 1 -q:v 1 {last_frame_path}')
# Assemble the harvested last frames into the final video.
final_video_path = os.path.join(output_dir, 'final_output2.mp4')
if os.path.exists(final_video_path):
    # Remove any previous result so ffmpeg does not refuse to overwrite.
    os.remove(final_video_path)

os.system(
    f'ffmpeg -framerate {fps} -i {last_frames_dir}/last_frame_%d.png '
    f'-c:v libx264 -pix_fmt yuv420p {final_video_path}'
)
print(f"Final video saved at: {final_video_path}")
from IPython.display import HTML
from base64 import b64encode

# Inline the finished video as a base64 data URI so the notebook can play
# it. Use a context manager so the file handle is closed (the original
# left it open), and reuse final_video_path instead of re-hard-coding
# "output_frames/final_output2.mp4".
with open(final_video_path, 'rb') as video_file:
    data_url = "data:video/mp4;base64," + b64encode(video_file.read()).decode()

# Display the video in HTML
HTML(f"""
<video width=400 controls>
<source src="{data_url}" type="video/mp4">
</video>
""")
Add Comment
Please sign in to add a comment.