Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import json
import time
# Thread and Event are provided by the standard "threading" module.
from threading import Event, Thread

import cv2
import numpy as np
from redis import ConnectionPool, Redis
# Connection settings for the Redis instance that backs the frame cache.
# CamStream reads these four keys when it is created with use_cache=True.
redis_config = {
    "server": "localhost",  # passed to ConnectionPool as ``host``
    "passwd": '',           # passed as ``password``; empty string here
    "port": '6379',         # kept as a string, exactly as in the original
    "db": 0,                # Redis logical database index
}
class CamStream(object):
    """Read frames from several OpenCV cameras, optionally through a Redis queue.

    With ``use_cache=False`` every ``capture()`` call reads the cameras
    directly.  With ``use_cache=True`` a background thread (started by
    ``start_cache()``) pushes batches of frames onto a Redis list as raw
    bytes, and ``capture()`` pops one batch per call.
    """

    def __init__(self, cam_addr_list, image_size=None, fps=None, use_cache=False):
        """
        cam_addr_list: list of camera device addresses, each handed to
            ``cv2.VideoCapture``.
        image_size: optional ``(width, height)`` every frame is resized to;
            ``None`` keeps the size delivered by the camera.
        fps: optional frame rate requested through ``cv2.CAP_PROP_FPS``;
            a falsy value leaves the capture device untouched.
        use_cache: when true, connect to Redis using ``redis_config``.
        """
        self._cams = []
        for cam_addr in cam_addr_list:
            cap = cv2.VideoCapture(cam_addr)
            if fps:
                cap.set(cv2.CAP_PROP_FPS, fps)
            self._cams.append(cap)
        self.cam_addr_list = cam_addr_list
        self.fps = fps
        self._image_size = image_size
        self._use_cache = use_cache

        # Queue parameters and worker state are always defined so that
        # stop_cache() is safe to call even if caching was never started.
        self._queue_name = "video"
        self._queue_length = 600
        self._frame_shape = None    # (height, width) of the last cached batch
        self._start_caching = None  # Event; *set* means "stop requested"
        self.p = None               # worker thread created by start_cache()
        self._redis_db = None
        if self._use_cache:
            self._redis_db = self._get_redis_conn(host=redis_config["server"],
                                                  passwd=redis_config["passwd"],
                                                  port=redis_config["port"],
                                                  db=redis_config["db"])

    def _get_redis_conn(self, host, passwd, port, db):
        """Return a Redis client backed by a new connection pool."""
        pool = ConnectionPool(host=host, password=passwd, port=port, db=db)
        return Redis(connection_pool=pool)

    def start_cache(self):
        """Start the background caching thread and return ``self``.

        Calling this while a worker is already running is a no-op.  If a
        previous worker was asked to stop but has not finished yet, it is
        joined first so two workers never push to the queue at once.
        """
        if self.p is not None and self.p.is_alive():
            if not self._start_caching.is_set():
                return self
            self.p.join()
        self._start_caching = Event()
        self.p = Thread(target=self._cache_image, kwargs={"cams": self._cams})
        self.p.start()
        return self

    def _read_frames(self, cams):
        """Read one frame from each camera, skipping cameras whose read failed.

        The success flag is checked *before* resizing because a failed read
        yields no image to resize.
        """
        frames = []
        for cam in cams:
            ret, frame = cam.read()
            if not ret:
                continue
            if self._image_size:
                frame = cv2.resize(frame, self._image_size)
            frames.append(frame)
        return frames

    def _cache_image(self, cams):
        """
        Cache captured images into the redis queue until a stop is requested.

        Runs in the worker thread.  Each iteration stacks the frames that
        were read into one array and pushes its raw bytes to the tail of
        the queue, then trims the list to ``_queue_length`` entries.
        """
        # Bind the event once so this worker keeps honouring *its own* stop
        # signal even if the attribute is later replaced.
        stop_requested = self._start_caching
        while not stop_requested.is_set():
            frames = self._read_frames(cams)
            if not frames:
                # Nothing was read; back off briefly instead of spinning.
                stop_requested.wait(0.01)
                continue
            batch = np.stack(frames)
            # Remember the frame geometry so capture() can decode batches
            # when no explicit image_size was configured.
            self._frame_shape = batch.shape[1:3]
            self._redis_db.rpush(self._queue_name, batch.tobytes())
            # LTRIM bounds are inclusive: 0..N-1 keeps exactly N entries.
            # NOTE: this keeps the head of the list, so once the queue is
            # full the newest batch is the one discarded.
            self._redis_db.ltrim(self._queue_name, 0, self._queue_length - 1)

    def stop_cache(self):
        """Ask the caching thread to stop, wait up to 0.5 s, return ``self``.

        Does nothing if caching was never started.
        """
        if self._start_caching is None:
            return self
        self._start_caching.set()
        self.p.join(timeout=0.5)
        return self

    def check_cache(self):
        """Pop one entry from the queue and report whether there was one.

        This consumes the entry it finds, so calling it in a loop drains
        the queue.
        """
        ret = self._redis_db.lpop(self._queue_name)
        if ret:
            print("Queue exists!")
            return True
        else:
            print("Queue dies")
            return False

    def delete_cache(self):
        """Delete the whole frame queue from Redis."""
        self._redis_db.delete(self._queue_name)

    def _decode_frames(self, frame_buf):
        """Turn one cached byte buffer back into a list of frames.

        Frames are assumed to be 3-channel ``uint8`` images.  The number of
        frames is inferred from the buffer length, because a batch holds
        fewer frames than there are cameras whenever a read failed.

        Raises ValueError when neither ``image_size`` nor a previously
        cached batch tells us the frame geometry, or when the buffer
        length does not match that geometry.
        """
        if self._image_size:
            width, height = self._image_size[0], self._image_size[1]
        elif self._frame_shape is not None:
            height, width = self._frame_shape
        else:
            raise ValueError(
                "cannot decode cached frames: image_size is not set and no "
                "batch has been cached by this instance yet")
        flat = np.frombuffer(frame_buf, dtype=np.uint8)
        # frombuffer returns a read-only view; copy so callers get writable
        # frames.
        batch = flat.reshape((-1, height, width, 3)).copy()
        return list(batch)

    def capture(self):
        """Capture images from all cameras and return them as a list.

        In cache mode one batch is popped from the Redis queue; otherwise
        the cameras are read directly.  Returns an empty list (after
        printing a message) when no frame is available.
        """
        if self._use_cache:
            print(self._redis_db.llen(self._queue_name))
            frame_buf = self._redis_db.lpop(self._queue_name)
            frames = self._decode_frames(frame_buf) if frame_buf else []
        else:
            frames = self._read_frames(self._cams)
        if frames:
            return frames
        print("Fail to capture frame!")
        return []
def test_cam_stream():
    """Manual smoke test: show cached frames from cameras 0 and 1 until 'q'.

    Needs two attached cameras, a reachable Redis server and a display.
    On exit the caching thread is stopped, the leftover queue entries are
    drained and the preview window is closed.
    """
    addr = [0, 1]
    cs = CamStream(addr, (640, 480), use_cache=True, fps=20)
    cs.start_cache()
    time.sleep(2)  # give the caching thread time to fill the queue
    try:
        while True:
            frames = cs.capture()
            if frames:
                cv2.imshow("test", frames[0])
            # Poll the keyboard on every pass, not only when a frame
            # arrived, so 'q' still works while the queue is empty.
            key = cv2.waitKey(20)
            if key == ord('q'):
                break
    finally:
        # Runs on Ctrl-C as well, so the worker thread is always told to stop.
        cs.stop_cache()
        # check_cache() pops an entry each call, so this empties the queue.
        while cs.check_cache():
            print("not empty")
        cv2.destroyAllWindows()
    print("Done")


if __name__ == "__main__":
    test_cam_stream()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement