Advertisement
Guest User

Untitled

a guest
Mar 15th, 2019
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.70 KB | None | 0 0
import json
import time
from threading import Event, Thread

import cv2
import numpy as np
from redis import ConnectionPool, Redis
  6.  
  7. redis_config = {"server": "localhost",
  8. "passwd": '',
  9. "port": '6379',
  10. "db": 0}
  11.  
  12. class CamStream(object):
  13. def __init__(self, cam_addr_list, image_size=None, fps=None, use_cache=False):
  14. """
  15. cam_addr_list: list of camera device addresses.
  16. """
  17. self._cams = []
  18. for cam_addr in cam_addr_list:
  19. cap = cv2.VideoCapture(cam_addr)
  20. if fps:
  21. cap.set(cv2.CAP_PROP_FPS, fps)
  22. self._cams.append(cap)
  23. self.cam_addr_list = cam_addr_list
  24. self.fps = fps
  25. self._image_size = image_size
  26. self._use_cache = use_cache
  27. if self._use_cache:
  28. self._redis_db = self._get_redis_conn(host=redis_config["server"],
  29. passwd=redis_config["passwd"],
  30. port=redis_config["port"],
  31. db=redis_config["db"])
  32. # self._start_caching = False # primitive variable are not passed to subprocess
  33. self._queue_name = "video"
  34. self._queue_length = 600
  35.  
  36. def _get_redis_conn(self, host, passwd, port, db):
  37. # initialize the redis queue for storing image cache
  38. pool = ConnectionPool(host=host, password=passwd, port=port, db=db)
  39. redis_db = Redis(connection_pool=pool)
  40. return redis_db
  41.  
  42. def start_cache(self):
  43. # self._start_caching = True
  44. self._start_caching = Event()
  45. self.p = Thread(target=self._cache_image, kwargs={"cams": self._cams})
  46. self.p.start()
  47. return self
  48.  
  49. def _cache_image(self, cams):
  50. """
  51. Cache captured image into redis queue
  52. """
  53. while 1:
  54. # if self._start_caching:
  55. if not self._start_caching.is_set():
  56. frames = []
  57. for cam in cams:
  58. ret, frame = cam.read()
  59. if ret:
  60. if self._image_size:
  61. frame = cv2.resize(frame, self._image_size)
  62. frames.append(frame)
  63. if frames:
  64. frames = np.stack(frames)
  65. # info = {"frames": frames.tolist()}
  66. info = frames.tostring()
  67. # self._redis_db.rpush(self._queue_name, json.dumps(info))
  68. self._redis_db.rpush(self._queue_name, info)
  69. self._redis_db.ltrim(self._queue_name, 0, self._queue_length)
  70. else:
  71. time.sleep(0.5)
  72.  
  73. def stop_cache(self):
  74. # self._start_caching = False
  75. self._start_caching.set()
  76. self.p.join(timeout=0.5)
  77. return self
  78.  
  79. def check_cache(self):
  80. ret = self._redis_db.lpop(self._queue_name)
  81. if ret:
  82. print("Queue exists!")
  83. return True
  84. else:
  85. print("Queue dies")
  86. return False
  87.  
  88. def delete_cache(self):
  89. self._redis_db.delete(self._queue_name)
  90.  
  91. def capture(self):
  92. """Capture images from all camera and return them in batch"""
  93. if self._use_cache:
  94. print(self._redis_db.llen(self._queue_name))
  95. frame_buf = self._redis_db.lpop(self._queue_name)
  96. if frame_buf:
  97. # frames = [np.array(x).astype(np.uint8) for x in json.loads(frame_buf)["frames"]]
  98. frames = np.reshape(np.fromstring(frame_buf, dtype=np.uint8), (len(self._cams), self._image_size[1], self._image_size[0], 3))
  99. frames = [x for x in frames]
  100. else:
  101. frames = []
  102. else:
  103. frames = []
  104. for cam in self._cams:
  105. ret, frame = cam.read()
  106. if self._image_size:
  107. frame = cv2.resize(frame, self._image_size)
  108. if ret:
  109. frames.append(frame)
  110. if frames:
  111. return frames
  112. else:
  113. print("Fail to capture frame!")
  114. return []
  115.  
  116.  
  117. def test_cam_stream():
  118. addr = [0, 1]
  119. cs = CamStream(addr, (640, 480), use_cache=True, fps=20)
  120. cs.start_cache()
  121. time.sleep(2)
  122. while 1:
  123. frames = cs.capture()
  124. if frames:
  125. cv2.imshow("test", frames[0])
  126. key = cv2.waitKey(20)
  127. if key == ord('q'):
  128. cs.stop_cache()
  129. while 1:
  130. if cs.check_cache():
  131. print("not empty")
  132. else:
  133. break
  134. cv2.destroyAllWindows()
  135. break
  136. print("Done")
  137.  
  138. if __name__ == "__main__":
  139. test_cam_stream()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement