Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- import rclpy
- from rclpy.node import Node
- from std_msgs.msg import String
- from sensor_msgs.msg import NavSatFix, Imu
- from nav_msgs.msg import Odometry
- from geometry_msgs.msg import Twist
- import pynmea2
- import math
- import tf_transformations
- class NMEAToNavSatFixNode(Node):
- def __init__(self):
- super().__init__('nmea_to_navsat_creating_odom_updated')
- # Variáveis para odometria
- self.x_ = 0.0
- self.y_ = 0.0
- self.yaw_imu = 0.0 # Yaw integrado do IMU
- self.last_cog = None # COG do GPS em graus
- self.wheelbase = 5.950 # Distância entre os eixos em metros
- # Variáveis de velocidade e aceleração
- self.current_speed_kmh = 0.0
- self.current_speed = 0.0
- self.last_speed = 0.0
- self.last_time = self.get_clock().now()
- self.yaw_rate = 0.0
- # Inicializa o buffer para o filtro de média móvel
- self.filter_buffers = {
- 'speed': [],
- 'steering_angle': []
- }
- # Assinantes
- self.subscription_nmea = self.create_subscription(
- String,
- '/nmea_sentence_topic',
- self.nmea_listener_callback,
- 10)
- self.subscription_imu_raw = self.create_subscription(
- Imu,
- '/imu_raw',
- self.imu_raw_callback,
- 10)
- self.subscription_speed_steer = self.create_subscription(
- Twist,
- '/speed_steer_feedback',
- self.speed_steer_callback,
- 10)
- # Publicadores
- self.publisher_navsat = self.create_publisher(NavSatFix, '/real_gps/fix', 10)
- self.publisher_imu = self.create_publisher(Imu, '/imu', 10) # Publica o IMU corrigido
- self.odom_pub = self.create_publisher(Odometry, '/odom', 10)
- # Variáveis para armazenar as últimas mensagens
- self.latest_imu_raw_msg = None
- self.latest_speed_steer_msg = None
- # Variável para controlar a impressão dos logs
- self.create_timer(1.0, self.log_status)
- def nmea_listener_callback(self, msg):
- try:
- if msg.data.startswith('$GPRMC'):
- nmea_obj = pynmea2.parse(msg.data)
- # Verifica se os campos necessários estão presentes e válidos
- if nmea_obj.status != 'A':
- self.get_logger().warning('NMEA data invalid or not fixed.')
- return
- if not nmea_obj.latitude or not nmea_obj.longitude:
- self.get_logger().warning('NMEA data missing latitude or longitude.')
- return
- # Cria mensagem NavSatFix
- navsatfix_msg = NavSatFix()
- navsatfix_msg.header.stamp = self.get_clock().now().to_msg()
- navsatfix_msg.header.frame_id = 'base_footprint'
- navsatfix_msg.latitude = nmea_obj.latitude
- navsatfix_msg.longitude = nmea_obj.longitude
- navsatfix_msg.altitude = float('nan') # Altitude como NaN
- # Ajustar a covariância do GPS para refletir a precisão
- navsatfix_msg.position_covariance = [
- 0.01, 0.0, 0.0,
- 0.0, 0.01, 0.0,
- 0.0, 0.0, 9999.0
- ]
- navsatfix_msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_DIAGONAL_KNOWN
- # Publica a mensagem NavSatFix
- self.publisher_navsat.publish(navsatfix_msg)
- # Obtém COG e velocidade
- try:
- speed_knots = float(nmea_obj.spd_over_grnd)
- speed_kmh = speed_knots * 1.852 # Converte de nós para km/h
- cog = float(nmea_obj.true_course) # COG em graus
- # Atualiza o COG se a velocidade for acima do limiar
- if speed_kmh > 2.5:
- # Normaliza o COG para [0, 360)
- cog_normalized = cog % 360
- # Atualiza last_cog
- self.last_cog = cog_normalized
- # Converte COG para yaw em radianos
- yaw_rad = self.cog_to_yaw(cog_normalized)
- self.yaw_imu = yaw_rad # Atualiza yaw_imu com o COG do GPS
- except (ValueError, AttributeError):
- self.get_logger().warning('Invalid spd_over_grnd or true_course in NMEA data.')
- except pynmea2.ParseError as e:
- self.get_logger().error(f'Erro ao analisar a sentença NMEA: {msg.data}')
- except Exception as e:
- self.get_logger().error(f'Erro ao processar NMEA: {e}')
- def imu_raw_callback(self, imu_raw_msg):
- self.latest_imu_raw_msg = imu_raw_msg
- self.process_data()
- def speed_steer_callback(self, speed_steer_msg):
- self.latest_speed_steer_msg = speed_steer_msg
- self.process_data()
- def process_data(self):
- # Certifica-se de que recebeu ambas as mensagens
- if self.latest_imu_raw_msg is None or self.latest_speed_steer_msg is None:
- return
- try:
- current_time = self.get_clock().now()
- # Atualiza a velocidade atual
- self.current_speed_kmh = self.latest_speed_steer_msg.linear.x # Velocidade em km/h
- self.current_speed = self.current_speed_kmh / 3.6 # Converte km/h para m/s
- # Ângulo de direção (steering angle) em radianos
- steering_angle_rad = self.latest_speed_steer_msg.angular.z # Assumindo que está em radianos
- # Filtragem de média móvel na velocidade
- self.current_speed = self.apply_moving_average_filter(self.current_speed, 'speed')
- # Filtragem de média móvel no steering angle
- steering_angle_rad = self.apply_moving_average_filter(steering_angle_rad, 'steering_angle')
- # Cálculo do yaw rate baseado no modelo Ackermann
- if abs(steering_angle_rad) > 1e-6:
- tan_steering = math.tan(steering_angle_rad)
- if tan_steering != 0.0:
- turning_radius = self.wheelbase / tan_steering
- self.yaw_rate = self.current_speed / turning_radius
- else:
- self.yaw_rate = 0.0
- else:
- self.yaw_rate = 0.0
- # Calcula dt
- dt = (current_time - self.last_time).nanoseconds * 1e-9
- if dt <= 0 or dt > 1:
- dt = 0.02 # Valor padrão
- self.last_time = current_time
- # Integra o yaw rate para atualizar yaw_imu
- self.yaw_imu += self.yaw_rate * dt
- self.yaw_imu = self.normalize_angle(self.yaw_imu)
- # Calcula a aceleração linear
- if dt > 0:
- linear_acceleration = (self.current_speed - self.last_speed) / dt
- else:
- linear_acceleration = 0.0
- self.last_speed = self.current_speed
- # Atualiza o IMU (orientação)
- quaternion = tf_transformations.quaternion_from_euler(0, 0, self.yaw_imu)
- imu_msg = Imu()
- imu_msg.header.stamp = current_time.to_msg()
- imu_msg.header.frame_id = self.latest_imu_raw_msg.header.frame_id
- imu_msg.orientation.x = quaternion[0]
- imu_msg.orientation.y = quaternion[1]
- imu_msg.orientation.z = quaternion[2]
- imu_msg.orientation.w = quaternion[3]
- # Copia os dados de angular_velocity
- imu_msg.angular_velocity = self.latest_imu_raw_msg.angular_velocity
- imu_msg.angular_velocity_covariance = self.latest_imu_raw_msg.angular_velocity_covariance
- # Define a aceleração linear calculada
- imu_msg.linear_acceleration.x = linear_acceleration
- imu_msg.linear_acceleration.y = 0.0
- imu_msg.linear_acceleration.z = 0.0
- # Ajustar a covariância da orientação do IMU
- imu_msg.orientation_covariance = [
- 0.1, 0.0, 0.0,
- 0.0, 0.1, 0.0,
- 0.0, 0.0, 0.1
- ]
- # Defina angular_velocity e linear_acceleration como desconhecidas (-1)
- imu_msg.angular_velocity_covariance = [
- -1.0, 0.0, 0.0,
- 0.0, -1.0, 0.0,
- 0.0, 0.0, -1.0
- ]
- imu_msg.linear_acceleration_covariance = [
- -1.0, 0.0, 0.0,
- 0.0, -1.0, 0.0,
- 0.0, 0.0, -1.0
- ]
- # Publica a mensagem IMU corrigida
- self.publisher_imu.publish(imu_msg)
- # Calcula a mudança na posição usando yaw_imu
- x_dot = self.current_speed * math.cos(self.yaw_imu)
- y_dot = self.current_speed * math.sin(self.yaw_imu)
- self.x_ += x_dot * dt
- self.y_ += y_dot * dt
- # Cria mensagem de odometria
- odom_msg = Odometry()
- odom_msg.header.stamp = current_time.to_msg()
- odom_msg.header.frame_id = "odom"
- odom_msg.child_frame_id = "base_footprint"
- # Posição e orientação
- odom_msg.pose.pose.position.x = self.x_
- odom_msg.pose.pose.position.y = self.y_
- odom_msg.pose.pose.position.z = 0.0 # Movimentação 2D
- odom_msg.pose.pose.orientation = imu_msg.orientation
- # Velocidades linear e angular
- odom_msg.twist.twist.linear.x = self.current_speed
- odom_msg.twist.twist.angular.z = self.yaw_rate
- # Define as covariâncias para reduzir a influência da odometria
- odom_msg.pose.covariance = [
- 2.0, 0.0, 0.0, 0.0, 0.0, 0.0,
- 0.0, 2.0, 0.0, 0.0, 0.0, 0.0,
- 0.0, 0.0, 9999.0, 0.0, 0.0, 0.0,
- 0.0, 0.0, 0.0, 9999.0, 0.0, 0.0,
- 0.0, 0.0, 0.0, 0.0, 9999.0, 0.0,
- 0.0, 0.0, 0.0, 0.0, 0.0, 0.1
- ]
- odom_msg.twist.covariance = [
- 2.0, 0.0, 0.0, 0.0, 0.0, 0.0,
- 0.0, 2.0, 0.0, 0.0, 0.0, 0.0,
- 0.0, 0.0, 9999.0, 0.0, 0.0, 0.0,
- 0.0, 0.0, 0.0, 9999.0, 0.0, 0.0,
- 0.0, 0.0, 0.0, 0.0, 9999.0, 0.0,
- 0.0, 0.0, 0.0, 0.0, 0.0, 0.1
- ]
- # Publica a odometria
- self.odom_pub.publish(odom_msg)
- except Exception as e:
- self.get_logger().error(f'Erro em process_data: {e}')
- def apply_moving_average_filter(self, new_value, key, window_size=5):
- buffer = self.filter_buffers[key]
- buffer.append(new_value)
- if len(buffer) > window_size:
- buffer.pop(0)
- return sum(buffer) / len(buffer)
- def cog_to_yaw(self, cog_deg):
- yaw_rad = math.radians(90 - cog_deg)
- yaw_rad = self.normalize_angle(yaw_rad)
- return yaw_rad
- def normalize_angle(self, angle):
- return (angle + math.pi) % (2 * math.pi) - math.pi
- def log_status(self):
- self.get_logger().info(f'Orientação (Yaw): {math.degrees(self.yaw_imu):.2f} graus')
- self.get_logger().info(f'Velocidade: {self.current_speed_kmh:.2f} km/h')
- def main(args=None):
- rclpy.init(args=args)
- node = NMEAToNavSatFixNode()
- try:
- rclpy.spin(node)
- except KeyboardInterrupt:
- pass
- finally:
- node.destroy_node()
- rclpy.shutdown()
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement