SHARE
TWEET

Untitled

a guest Mar 15th, 2019 63 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import cv2
  2. from redis import ConnectionPool, Redis
  3. import numpy as np
  4. import json, time
  5. from multithreading import Thread, Event
  6.  
  7. redis_config = {"server": "localhost",
  8.                 "passwd": '',
  9.                 "port": '6379',
  10.                 "db": 0}
  11.  
  12. class CamStream(object):
  13.     def __init__(self, cam_addr_list, image_size=None, fps=None, use_cache=False):
  14.         """
  15.         cam_addr_list: list of camera device addresses.
  16.         """
  17.         self._cams = []
  18.         for cam_addr in cam_addr_list:
  19.             cap = cv2.VideoCapture(cam_addr)
  20.             if fps:
  21.                 cap.set(cv2.CAP_PROP_FPS, fps)
  22.             self._cams.append(cap)
  23.         self.cam_addr_list = cam_addr_list
  24.         self.fps = fps
  25.         self._image_size = image_size
  26.         self._use_cache = use_cache
  27.         if self._use_cache:
  28.             self._redis_db = self._get_redis_conn(host=redis_config["server"],
  29.                                                   passwd=redis_config["passwd"],
  30.                                                   port=redis_config["port"],
  31.                                                   db=redis_config["db"])
  32.             # self._start_caching = False    # primitive variable are not passed to subprocess
  33.             self._queue_name = "video"
  34.             self._queue_length = 600
  35.  
  36.     def _get_redis_conn(self, host, passwd, port, db):
  37.         # initialize the redis queue for storing image cache
  38.         pool = ConnectionPool(host=host, password=passwd, port=port, db=db)
  39.         redis_db = Redis(connection_pool=pool)
  40.         return redis_db
  41.  
  42.     def start_cache(self):
  43.         # self._start_caching = True
  44.         self._start_caching = Event()
  45.         self.p = Thread(target=self._cache_image, kwargs={"cams": self._cams})
  46.         self.p.start()
  47.         return self
  48.  
  49.     def _cache_image(self, cams):
  50.         """
  51.         Cache captured image into redis queue
  52.         """
  53.         while 1:
  54.             # if self._start_caching:
  55.             if not self._start_caching.is_set():
  56.                 frames = []
  57.                 for cam in cams:
  58.                     ret, frame = cam.read()
  59.                     if ret:
  60.                         if self._image_size:
  61.                             frame = cv2.resize(frame, self._image_size)
  62.                             frames.append(frame)
  63.                 if frames:
  64.                     frames = np.stack(frames)
  65.                     # info = {"frames": frames.tolist()}
  66.                     info = frames.tostring()
  67.                     # self._redis_db.rpush(self._queue_name, json.dumps(info))
  68.                     self._redis_db.rpush(self._queue_name, info)
  69.                     self._redis_db.ltrim(self._queue_name, 0, self._queue_length)
  70.             else:
  71.                 time.sleep(0.5)
  72.  
  73.     def stop_cache(self):
  74.         # self._start_caching = False
  75.         self._start_caching.set()
  76.         self.p.join(timeout=0.5)
  77.         return self
  78.  
  79.     def check_cache(self):
  80.         ret = self._redis_db.lpop(self._queue_name)
  81.         if ret:
  82.             print("Queue exists!")
  83.             return True
  84.         else:
  85.             print("Queue dies")
  86.             return False
  87.  
  88.     def delete_cache(self):
  89.         self._redis_db.delete(self._queue_name)
  90.  
  91.     def capture(self):
  92.         """Capture images from all camera and return them in batch"""
  93.         if self._use_cache:
  94.             print(self._redis_db.llen(self._queue_name))
  95.             frame_buf = self._redis_db.lpop(self._queue_name)
  96.             if frame_buf:
  97.                 # frames = [np.array(x).astype(np.uint8) for x in json.loads(frame_buf)["frames"]]
  98.                 frames = np.reshape(np.fromstring(frame_buf, dtype=np.uint8), (len(self._cams), self._image_size[1], self._image_size[0], 3))
  99.                 frames = [x for x in frames]
  100.             else:
  101.                 frames = []
  102.         else:
  103.             frames = []
  104.             for cam in self._cams:
  105.                 ret, frame = cam.read()
  106.                 if self._image_size:
  107.                     frame = cv2.resize(frame, self._image_size)
  108.                 if ret:
  109.                     frames.append(frame)
  110.         if frames:
  111.             return frames
  112.         else:
  113.             print("Fail to capture frame!")
  114.             return []
  115.  
  116.  
  117. def test_cam_stream():
  118.     addr = [0, 1]
  119.     cs = CamStream(addr, (640, 480), use_cache=True, fps=20)
  120.     cs.start_cache()
  121.     time.sleep(2)
  122.     while 1:
  123.         frames = cs.capture()
  124.         if frames:
  125.             cv2.imshow("test", frames[0])
  126.         key = cv2.waitKey(20)
  127.         if key == ord('q'):
  128.             cs.stop_cache()
  129.             while 1:
  130.                 if cs.check_cache():
  131.                     print("not empty")
  132.                 else:
  133.                     break
  134.             cv2.destroyAllWindows()
  135.             break
  136.     print("Done")
  137.  
  138. if __name__ == "__main__":
  139.     test_cam_stream()
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Not a member of Pastebin yet?
Sign Up, it unlocks many cool features!
 
Top