Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Entry point
- import numpy as np
- import time
- import ctypes
- import argparse
- from multiprocessing import Array, Value, Process
- from billiard import Process, forking_enable
- import cv2
- class VideoCapture:
- """
- Class that handles video capture from device or video file
- """
- def __init__(self, device=0, delay=0.):
- """
- :param device: device index or video filename
- :param delay: delay between frame captures in seconds(floating point is allowed)
- """
- self._cap = cv2.VideoCapture(device)
- self._delay = delay
- def _proper_frame(self, delay=None):
- """
- :param delay: delay between frames capture(in seconds)
- :param finished: synchronized wrapper for int(see multiprocessing.Value)
- :return: frame
- """
- snapshot = None
- correct_img = False
- fail_counter = -1
- while not correct_img:
- # Capture the frame
- correct_img, snapshot = self._cap.read()
- fail_counter += 1
- # Raise exception if there's no output from the device
- if fail_counter > 10:
- raise Exception("Capture: exceeded number of tries to capture the frame.")
- # Delay before we get a new frame
- time.sleep(delay)
- return snapshot
- def get_size(self):
- """
- :return: size of the captured image
- """
- return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
- int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)
- def get_stream_function(self):
- """
- Returns stream_function object function
- """
- def stream_function(image, finished):
- """
- Function keeps capturing frames until finished = 1
- :param image: shared numpy array for multiprocessing(see multiprocessing.Array)
- :param finished: synchronized wrapper for int(see multiprocessing.Value)
- :return: nothing
- """
- # Incorrect input array
- if image.shape != self.get_size():
- raise Exception("Capture: improper size of the input image")
- print("Capture: start streaming")
- # Capture frame until we get finished flag set to True
- while not finished.value:
- image[:, :, :] = self._proper_frame(self._delay)
- # Release the device
- self.release()
- return stream_function
- def release(self):
- self._cap.release()
- def main():
- # Add program arguments
- parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
- parser.add_argument('-output', dest="output", default="output.avi", help='name of the output video file')
- parser.add_argument('-log', dest="log", default="frames.log", help='name of the log file')
- parser.add_argument('-fps', dest="fps", default=25., help='frames per second value')
- # Read the arguments if any
- result = parser.parse_args()
- fps = float(result.fps)
- output = result.output
- log = result.log
- # Initialize VideoCapture object and auxilary objects
- cap = VideoCapture()
- shape = cap.get_size()
- stream = cap.get_stream_function()
- # Define shared variables(which are synchronised so race condition is excluded)
- shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
- frame = np.ctypeslib.as_array(shared_array_base.get_obj())
- frame = frame.reshape(shape[0], shape[1], shape[2])
- finished = Value('i', 0)
- # Start processes which run in parallel
- forking_enable(0) # Supposedly this is all I need
- video_process = Process(target=stream, args=(frame, finished))
- video_process.start() # Launch capture process
- # Sleep for some time to allow videocapture start working first
- time.sleep(2)
- # Termination function
- def terminate():
- print("Main: termination")
- finished.value = True
- # Wait for all processes to finish
- time.sleep(1)
- # Terminate working processes
- video_process.terminate()
- # The capturing works until keyboard interrupt is pressed.
- while True:
- try:
- # Display the resulting frame
- cv2.imshow('frame', frame)
- cv2.waitKey(1) # Display it at least one ms before going to the next frame
- time.sleep(0.1)
- except KeyboardInterrupt:
- cv2.destroyAllWindows()
- terminate()
- break
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment