Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Author Aled
- import numpy as np
- import rospy
- from helper.array_help import convolve_func
- from Probability_Models.sensor_model import SensorModel
- from scipy.stats import multivariate_normal
- from nav_msgs.msg import OccupancyGrid
- from geometry_msgs.msg import PoseStamped
- from time import sleep
- class ProbabilityMap:
- """
- For generating a probability map of the area as to where a person could be, updating the probabilities of the map will
- cause the map to be published to /prob_map
- NOT DESIGNED FOR CONCURRENCY
- """
- def __init__(self, occupancy_map, occupancy_map_info, publish=False):
- """Author Aled
- :param occupancy_map: a 2D array of a map with -1 being unseen 0 being no wall and 1 being a wall
- :type occupancy_map: numpy.ndarray
- :param publish: if set to true the object will publish the map to "prob_map" on every single update
- :type publish: bool
- """
- if publish:
- self._prob_map_msg = OccupancyGrid()
- self._prob_map_msg.info = occupancy_map_info
- self.publisher = rospy.Publisher('/prob_map', OccupancyGrid, queue_size=1)
- sleep(10)
- else:
- self.publisher = None
- print "initialising sensor model"
- self.sensor_model = SensorModel()
- # this should be refactored so that the actual map is passed in as a whole as this would
- # make things easier
- actual_map = OccupancyGrid()
- actual_map.info = occupancy_map_info
- actual_map.data = list(occupancy_map.reshape(-1,))
- self.sensor_model.set_map(actual_map)
- print "generating map"
- self.probability_map = self._generate_prob_map(occupancy_map)
- print "map generated"
- self._publish_map()
- # this might be a performance bottleneck
- def _normalise(self):
- """Author Aled
- normalises the probability map so that all the probabilities sum to 1"""
- total = (self.probability_map != - 1).sum()
- f = np.vectorize(lambda x: (x * 100) / total if x != -1 else x)
- # I don't know how efficient this is for memory and the garbage collector may be doing a lot of work
- self.probability_map = f(self.probability_map)
- def _generate_prob_map(self, occupancy_map):
- """Author Aled
- Generate's a probability map and populates it
- :param occupancy_map: a 2D array of a map with -1 being unseen 0 being no wall and 1 being a wall
- :type occupancy_map: numpy.ndarray
- :return: a probability map
- :type: numpy.ndarray
- """
- probability_map = occupancy_map.copy()
- # replaces any blocks which are a wall or unknown with 0 probability any other are set to 1.0
- probability_map = convolve_func(probability_map, wall_mask, (19, 19))
- return probability_map
- def _publish_map(self):
- """Author Aled
- This simply publishes the probability map to /prob_map
- :return:
- """
- if self.publisher is not None:
- self._prob_map_msg.data = self.probability_map.flatten().astype(int).tolist()
- self._prob_map_msg.header.frame_id = "map"
- #print self._prob_map_msg.data
- self.publisher.publish(self._prob_map_msg)
- print "I've published the map!!!"
- def update(self,scan,robot_pose):
- """Author Aled
- :param scan:
- :param robot_pose:
- :return:
- """
- self.sensor_model_update(scan,robot_pose)
- self._publish_map()
- def _add_gaussian(self,x,y,negative=False,filter_size=9,scale=50, cov=2.0):
- """Author Aled
- It will add a gaussian filter to the given x and y co-ordinates. It's intended use is for when the laser
- find an unexpected object at predicted co-ordinates x,y. This should be used in conjunction with the camera.
- if negative is set to true then the gaussian will be subtracted rather than added to the map
- :param x:
- :param y:
- :return:
- """
- # I should probably only calculate the filter once to save extra computation
- # the filter size should always be an odd number
- filter_ = np.linspace(0, filter_size, filter_size, endpoint=False)
- filter_ = multivariate_normal.pdf(filter_, mean=(filter_size/2), cov=cov).reshape(1,filter_size)
- filter_ = filter_.T.dot( filter_)
- filter_ *= scale
- half_filter = int(filter_size / 2)
- for i, array in enumerate(filter_):
- for j, p in enumerate(array):
- # if it is out the map then do not give it a probability
- if self.probability_map[y + (i - half_filter)][x + (j - half_filter)] == -1:
- continue
- #print "BEFORE"
- #print self.probability_map[y + (i - half_filter)][x +(j - half_filter)]
- #print "AFTER"
- if negative:
- self.probability_map[y + (i - half_filter)][x +(j - half_filter)] -= p
- if self.probability_map[y + (i - half_filter)][x +(j - half_filter)] < 0:
- self.probability_map[y + (i - half_filter)][x +(j - half_filter)] = 0
- else:
- self.probability_map[y + (i - half_filter)][x +(j - half_filter)] += p
- if self.probability_map[y + (i - half_filter)][x +(j - half_filter)] > 100:
- self.probability_map[y + (i - half_filter)][x +(j - half_filter)] = 100
- #print self.probability_map[y + (i - half_filter)][x +(j - half_filter)]
- def get_highest_probability(self):
- return self.probability_map.max()
- def get_most_likely_pose(self):
- # should probably apply gaussian smoothing here
- (y,x) = get_most_likely(self.probability_map)
- map_width = self._prob_map_msg.info.width
- map_height = self._prob_map_msg.info.height
- map_origin_x = self._prob_map_msg.info.origin.position.x
- map_origin_y = self._prob_map_msg.info.origin.position.y
- map_resolution = self._prob_map_msg.info.resolution
- ox = (x * map_resolution) + map_origin_x
- oy = (y * map_resolution) + map_origin_y
- #ox = ((x - (map_width/2) -0.5) * map_resolution) + map_origin_x
- #oy = ((y - (map_height/2) -0.5) * map_resolution ) + map_origin_y
- p = PoseStamped()
- p.header.frame_id = "map"
- p.pose.position.x = ox
- p.pose.position.y = oy
- return p
- # this is a performance bottleneck
- def apply_gausian_smoothing(self):
- pass
- def sensor_model_update(self, scan, robot_pose):
- """
- Author: Aled
- The probability map will be updated with gaussians added to co-ordinates where there is an unexpected obstacle
- :param scan:
- :param robot_pose:
- :return:
- """
- DECAY_RATE = 5
- # reduce the probability of places seen by the laser first by way of a decay rate
- laser_view = self.sensor_model.get_laser_area(scan,robot_pose)
- for point_x, point_y in laser_view:
- self._add_gaussian(point_x,point_y,negative=True,scale = 25)
- collision_points = self.sensor_model.get_collision_points(scan, robot_pose)
- print collision_points
- for (x,y) in collision_points:
- self._add_gaussian(x,y,scale = 200)
- def get_most_likely(a):
- """AUthor Xiwen"""
- target=[]
- idx = np.argmax(a, axis=1)
- pro = np.amax(a, axis=1)
- for x in range(len(pro)):
- if pro[x]==np.amax(pro):
- y=idx[x]
- target.append([x,y])
- return target[0]
- def numpy_conv(map,filter,padding="VALID"):
- new_map=init_map(map)
- H, W = new_map.shape
- filter_size = filter.shape[0]
- # default np.floor
- filter_center = int(filter_size / 2.0)
- filter_center_ceil = int(np.ceil(filter_size / 2.0))
- # SAME:input=output
- if padding == "SAME":
- padding_new_map = np.zeros([H + filter_center_ceil, W + filter_center_ceil], np.float32)
- padding_new_map[filter_center:-filter_center, filter_center:-filter_center] = inputs
- new_map = padding_new_map
- #defind a equal size sapce as the new_map,the 0 outside will be cut off later
- result = np.zeros((new_map.shape))
- #update input,change HW
- H, W = new_map.shape
- #print("new size",H,W)
- for r in range(filter_center,H -filter_center):
- for c in range(filter_center,W -filter_center ):
- cur_input = new_map[r -filter_center :r +filter_center_ceil,
- c - filter_center:c + filter_center_ceil]
- cur_output = cur_input * filter
- conv_sum = np.sum(cur_output)
- result[r, c] = conv_sum
- # cut off the 0 outside
- final_result = result[filter_center:result.shape[0] - filter_center,
- filter_center:result.shape[1] -filter_center]
- returnfinal_result
- def wall_mask(a):
- """Author Aled
- Applies a mask to try and exclude areas that are too close to a wall
- Only works straight after you've converted the occupancy map to a probability map
- :param a: 2D sub array to apply a mask to
- :type a: np.ndarray
- :return: a single float value of probability
- :type: float
- """
- if -1 in a:
- return -1
- elif a.max() >= 50:
- return -1
- else:
- return 1
- def wall_to_person_prob(x):
- """Author Aled
- maps a value in the occupancy map to the probability of a person being there
- :param x: probability of wall
- :type x: float
- :return: probability of a person being there
- :type float
- """
- if x == -1:
- return -1
- elif x < 50:
- return 1.0
- else:
- return 0.0
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement