Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- import matplotlib
- import matplotlib.pyplot as plt
- import rospy
- from sensor_msgs.msg import PointCloud2 as pc2
- import sensor_msgs.point_cloud2 as pc2
- from sensor_msgs.msg import LaserScan
- from nav_msgs.msg import Odometry
- from geometry_msgs.msg import Pose
- from laser_geometry import LaserProjection
- import open3d as o3d
- import numpy as np
- import collections
- import copy
- import time
- # class OdomShiz:
- # def __init__(self):
- # self.OdomSubscriber = rospy.Subscriber("/odom", Odometry, self.odom_callback)
- # self.odom_trajectory = {'x': [], 'y': []}
- #
- # def odom_callback(self,data):
- # # x_pos = data.pose.pose.position.x
- # # y_pos = data.pose.pose.position.y
- # # z_pos = data.pose.pose.position.z
- #
- # self.odom_trajectory['x'].append(data.pose.pose.position.x)
- # self.odom_trajectory['y'].append(data.pose.pose.position.y)
- # self.update_odom_plot()
- #
- # def update_odom_plot(self):
- # odom_plot.plot(self.odom_trajectory['x'], self.odom_trajectory['y'])
- # plt.draw()
- # plt.pause(0.001)
- # plt.show()
- class SickDawgPCz:
- def __init__(self):
- self.laserProj = LaserProjection()
- self.laserSubscriber = rospy.Subscriber("/scan", LaserScan, self.laser_callback)
- self.queue = collections.deque()
- self.current_pos = np.array([0, 0, 0])
- self.vis = o3d.visualization.Visualizer()
- self.source_geometry = None
- self.target_geometry = None
- self.trajectory = {'x': [], 'y': []}
- self.scan_count = 0
- self.trans_init = None
- # Odometry stuff
- self.OdomSubscriber = rospy.Subscriber("/odom", Odometry, self.odom_callback)
- self.odom_trajectory = {'x': [], 'y': []}
- def laser_callback(self, data):
- pc = self.laser_scan_to_point_cloud(data)
- # o3d.visualization.draw_geometries([pc])
- # Append cloud to the queue
- self.queue.append(pc)
- self.scan_count += 1
- def odom_callback(self, data):
- self.odom_trajectory['x'].append(data.pose.pose.position.x)
- self.odom_trajectory['y'].append(data.pose.pose.position.y)
- # self.update_odom_plot()
- def laser_scan_to_point_cloud(self, scan):
- cloud_out = self.laserProj.projectLaser(scan)
- # Get cloud data from ros_cloud
- # Source: https://github.com/felixchenfy/open3d_ros_pointcloud_conversion/blob/master/lib_cloud_conversion_between_Open3D_and_ROS.py
- field_names = [field.name for field in cloud_out.fields]
- cloud_data = list(pc2.read_points(cloud_out, skip_nans=True, field_names=field_names))
- open3d_cloud = o3d.PointCloud()
- xyz = [(x, y, z) for x, y, z, intensity, index in cloud_data] # get xyz
- open3d_cloud.points = o3d.Vector3dVector(np.array(xyz))
- return open3d_cloud
- def icp_registration(self):
- # Check for 2 clouds in queue
- if len(self.queue) < 2:
- return
- # print("QUEUE LENGTH:" + str(len(self.queue)))
- source = self.queue.popleft()
- target = self.queue.popleft()
- threshold = 0.02
- if self.trans_init is None:
- self.trans_init = np.identity(4)
- reg_p2l = o3d.registration.registration_icp(
- source, target, threshold, self.trans_init,
- o3d.registration.TransformationEstimationPointToPoint(),
- o3d.registration.ICPConvergenceCriteria(max_iteration=200))
- # Save this transformation to use in the next ICP
- self.trans_init = reg_p2l.transformation[:]
- # print(reg_p2l)
- # print(reg_p2l.transformation)
- self.draw_registration_result(source, target, reg_p2l.transformation)
- self.update_position(reg_p2l.transformation)
- # Add target back to queue
- self.queue.appendleft(target)
- # Update trajectory
- # Plotting is a bottleneck - only update the plot every 50 scans
- # so the program can keep up with incoming scans
- if self.scan_count % 10 == 0:
- # pc_stuff.update_trajectory()
- self.update_plots()
- def update_position(self, transform):
- # Extract rotation and translation from transformation matrix
- rot = transform[0:3, 0:3]
- transl = transform[0:3, 3]
- print(rot)
- print(transl)
- # print(self.current_pos)
- # Apply transformation to bot's current pose
- new_pos = rot.dot(self.current_pos.T) + transl.T
- self.current_pos = new_pos.T[:]
- # Add the new position to the trajectory
- self.trajectory['x'].append(new_pos[0])
- self.trajectory['y'].append(new_pos[1])
- def update_plots(self):
- pc_plot.plot(self.trajectory['x'], self.trajectory['y'])
- odom_plot.plot(self.odom_trajectory['x'], self.odom_trajectory['y'])
- plt.draw()
- plt.pause(0.001)
- plt.show()
- # Helper visualisation function
- # Source: http://www.open3d.org/docs/release/tutorial/pipelines/icp_registration.html
- def draw_registration_result(self, source, target, transformation):
- source_temp = copy.deepcopy(source)
- target_temp = copy.deepcopy(target)
- source_temp.paint_uniform_color([1, 0.706, 0])
- target_temp.paint_uniform_color([0, 0.651, 0.929])
- source_temp.transform(transformation)
- # o3d.visualization.draw_geometries([source_temp, target_temp]) # Blocking
- # Non-blocking visualisation
- self.vis.remove_geometry(self.source_geometry)
- self.vis.remove_geometry(self.target_geometry)
- self.source_geometry = source_temp
- self.target_geometry = target_temp
- self.vis.add_geometry(self.source_geometry)
- self.vis.add_geometry(self.target_geometry)
- self.vis.poll_events()
- self.vis.update_renderer()
- # Press the green button in the gutter to run the script.
- if __name__ == '__main__':
- rospy.init_node("laser2PointCloud", disable_signals=True)
- pc_stuff = SickDawgPCz()
- # odomOb = OdomShiz()
- pc_stuff.vis.create_window()
- # Initialise a plot for the trajectory
- matplotlib.use('TKAgg')
- fig,(pc_plot, odom_plot) = plt.subplots(1, 2)
- plt.ion()
- plt.show()
- pc_plot.axis([-3, 1.5, -3, 1.5])
- # odom_plot.axis([2, 6.5, -1.5, 3])
- odom_plot.axis([-5, 5, -5, 5])
- # plt.set(xlim=(-3, 1.5), ylim=(-3, 1.5))
- while True:
- try:
- pc_stuff.icp_registration()
- except KeyboardInterrupt as e:
- print(e)
- break
- pc_stuff.vis.destroy_window()
- rospy.spin()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement