Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import cv2
- import rospy
- from sensor_msgs.msg import Image, PointCloud2, CameraInfo
- import os
- import yaml
- import rosbag
- from cv_bridge import CvBridge
- rospy.init_node('Translator')
- class Translator:
- mode = 'republish'
- cam_ids = [i for i in range(4)]
- video_name = 'econVideo_2021-09-29-10-29-58'
- camera_info_msg = CameraInfo()
- bridge = CvBridge()
- def __init__(self, input_bag, video_path, calibration_files_path):
- # Takes as input a ROSbag containing
- self.bag = input_bag
- self.video_path = video_path
- self.calibration_files_path = calibration_files_path
- #@staticmethod
- #def
- #orig = os.path.splitext(bag)[0] + ".orig.bag"
- #move(bag, orig)
- def translate(self, video_path):
- setup_cameras(video_path)
- do_translate()
- def setup_cameras(self):
- self.cameras = dict()
- for cid in self.cam_ids:
- self.cam_info = dict()
- print("{}".format(self.video_path) + "/{}_{}.mkv".format(self.video_name, cid))
- self.cam_info['cap'] = cv2.VideoCapture("{}".format(self.video_path) + "/{}_{}.mkv".format(self.video_name, cid))
- self.cameras[cid] = self.cam_info
- if self.mode == 'republish':
- # Setting up camera, camera_info & velodyne publishers. Also setting up cv2.VideoCapture for each video file.
- for cid in self.cam_ids:
- self.cam_info['publisher'] = rospy.Publisher('/econ_cam_{}/image_raw'.format(cid), Image, queue_size=25)
- self.cam_info['publisher_info'] = rospy.Publisher('/econ_cam_{}/camera_info'.format(cid), CameraInfo, queue_size=25)
- self.cameras[cid] = self.cam_info
- self.velo_publisher = rospy.Publisher('/velodyne_points', PointCloud2, queue_size=10)
- print(self.cameras[1]['publisher'])
- return self.cameras, self.velo_publisher
- @staticmethod
- def parse_camera_info_from_yaml(yaml_fname):
- info = {}
- if os.path.isfile(yaml_fname): # if a file containing camera info exists extract its content.
- with open(yaml_fname, "r") as file_handle:
- calib_data = yaml.load(file_handle)
- # Parse
- info['width'] = calib_data["image_width"]
- info['height'] = calib_data["image_height"]
- info['K'] = calib_data["camera_matrix"]["data"]
- info['D'] = calib_data["distortion_coefficients"]["data"]
- info['R'] = calib_data["rectification_matrix"]["data"]
- info['P'] = calib_data["projection_matrix"]["data"]
- info['distortion_model'] = calib_data["distortion_model"]
- else: # if a file containing camera info does not exist create dummy camera info.
- info['width'] = 0
- info['height'] = 0
- info['K'] = [0, 0, 0, 0, 0, 0, 0, 0, 0]
- info['D'] = [0, 0, 0, 0, 0]
- info['R'] = [0, 0, 0, 0, 0, 0, 0, 0, 0]
- info['P'] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
- info['distortion_model'] = "plumb_bob"
- return info
- def cam_info_data(self):
- self.info_data = {}
- for idx in self.cameras.keys():
- yaml_fname = '{}'.format(self.calibration_files_path) + 'cam-{}-camerainfo.yaml'.format(idx)
- self.info_data[idx] = self.parse_camera_info_from_yaml(yaml_fname)
- #print(self.info_data[idx])
- return self.info_data
- def pub_camera_info(self, frames, timestamp, info):
- for idx, frame in enumerate(frames):
- self.camera_info_msg.header.stamp = timestamp # NOT TAKEN BY SOMETHING YET
- self.camera_info_msg.header.frame_id = 'cam_{}'.format(idx)
- self.camera_info_msg.width = info[idx]["width"]
- self.camera_info_msg.height = info[idx]["height"]
- self.camera_info_msg.K = info[idx]["K"]
- self.camera_info_msg.D = info[idx]["D"]
- self.camera_info_msg.R = info[idx]["R"]
- self.camera_info_msg.P = info[idx]["P"]
- self.camera_info_msg.distortion_model = info[idx]["distortion_model"]
- if self.mode == 'republish':
- self.cameras[idx]['publisher_info'].publish(self.camera_info_msg)
- #elif mode == 'rewrite':
- # outbag.write('/econ_cam_{}/camera_info'.format(idx), camera_info_msg, timestamp
- def read_frames(self, msg):
- frames = [None for i in range(4)]
- for idx, cam in enumerate(self.cameras):
- ret, frames[idx] = self.cameras[idx]['cap'].read()
- if not ret:
- print("Non-valid frame.")
- break
- elif not any(frame is None for frame in frames): # Wait to gather all the frames from the videos.
- timestamp = msg.header.stamp
- return frames, timestamp
- def pub_imgs(self,frames, cameras, timestamp):
- for idx, frame in enumerate(frames):
- img_msg = self.bridge.cv2_to_imgmsg(frame, encoding="bgr8")
- img_msg.header.frame_id = 'cam_{}'.format(idx)
- img_msg.header.stamp = timestamp
- if self.mode == 'republish':
- self.cameras[idx]['publisher'].publish(img_msg)
- elif self.mode == 'rewrite':
- pass #outbag.write('/econ_cam_{}/image_raw'.format(idx), img_msg, timestamp)
- def translate(self):
- info_data = self.cam_info_data()
- bag = rosbag.Bag(self.bag)
- cameras = self.cameras
- for topic, msg, t in bag.read_messages():
- if topic=='/velodyne_points':
- self.velo_publisher.publish(msg)
- elif topic=='/econ_vid':
- frames, timestamp = self.read_frames(msg)
- self.pub_imgs(frames, cameras, timestamp)
- self.pub_camera_info(frames, timestamp, info_data)
- test_1 = Translator('/home/u2m/time_check/check.bag', '/home/u2m/time_check', '/home/u2m/time_check/')
- test_1.setup_cameras()
- #print(test_1.cameras)
- #test_1.cam_info_data()
- #print(test_1.cam_info_data())
- print(test_1.cameras[1])
- test_1.translate()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement