7.5.4订阅图像记录错误
-
错误现象:运行仿真和导航后启动launch文件后,在准备记录图像处卡住,未武安成图像记录
patrol_node.py代码如下:import rclpy from geometry_msgs.msg import PoseStamped, Pose from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult import rclpy.time from tf2_ros import TransformListener, Buffer from tf_transformations import euler_from_quaternion, quaternion_from_euler from rclpy.duration import Duration from autopatrol_interfaces.srv import SpeachText from sensor_msgs.msg import Image from cv_bridge import CvBridge import cv2 class PatrolNode(BasicNavigator): def __init__(self, node_name='patrol_node'): super().__init__(node_name) #导航相关定义 self.declare_parameter('initial_point', [0.0, 0.0, 0.0]) self.declare_parameter('target_points', [0.0, 0.0, 0.0, 1.0, 1.0, 1.57]) self.initial_point_ = self.get_parameter('initial_point').value self.target_points_ = self.get_parameter('target_points').value self.buffer_ = Buffer() self.listener_ = TransformListener(self.buffer_, self) #语音合成客户端 self.speach_client_ = self.create_client(SpeachText, 'speech_text') #订阅和保存图像相关定义 self.declare_parameter('image_save_path', '') self.image_save_path = self.get_parameter('image_save_path').value self.bridge = CvBridge() self.latest_image = None self.subscription_image = self.create_subscription( Image, '/camera_sensor/image_raw', self.image_callback, 10) def get_pose_by_xyyaw(self, x, y, yaw): ''' 通过x,y,yaw 合成PoseStamped ''' pose = PoseStamped() pose.header.frame_id = 'map' pose.pose.position.x = x pose.pose.position.y = y rotation_quat = quaternion_from_euler(0, 0, yaw) pose.pose.orientation.x = rotation_quat[0] pose.pose.orientation.y = rotation_quat[1] pose.pose.orientation.z = rotation_quat[2] pose.pose.orientation.w = rotation_quat[3] return pose def init_robot_pose(self): ''' 初始化机器人位姿 ''' #从参数获取初始化点 self.initial_point_ = self.get_parameter('initial_point').value #合成位姿并进行初始化 self.setInitialPose(self.get_pose_by_xyyaw( self.initial_point_[0], self.initial_point_[1], self.initial_point_[2] )) #等待直到导航激活 self.waitUntilNav2Active() def get_targat_points(self): ''' 通过参数值获取目标点集合 ''' points = [] self.target_points_= self.get_parameter('target_points').value for index in range(int(len(self.target_points_)/3)): x = self.target_points_[index*3] y = self.target_points_[index*3+1] yaw = self.target_points_[index*3+2] points.append([x, y, yaw]) self.get_logger().info( f'获取到目标点:{index}->({x},{y},{yaw})') return points def nav_to_pose(self, target_pose): ''' 导航到指定位资 ''' self.waitUntilNav2Active() result = self.goToPose(target_pose) while not self.isTaskComplete(): feedback = self.getFeedback() if feedback: self.get_logger().info(f'预计:{Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9} s 后到达') #最终结果判断 result = self.getResult() if result == TaskResult.SUCCEEDED: self.get_logger().info('导航结果:成功') elif result == TaskResult.CANCELED: self.get_logger().warn('导航结果:被取消') elif result == TaskResult.FAILED: self.get_logger().error('导航结果:失败') else: self.get_logger().error('导航结果:返回状态无效') def get_current_pose(self): ''' 通过TF获取当前位姿 ''' while rclpy.ok(): try: tf = self.buffer_.lookup_transform( 'map', 'base_footprint', rclpy.time.Time(seconds=0), rclpy.time.Duration(seconds=1)) transform = tf.transform rotation_euler = euler_from_quaternion([ transform.rotation.x, transform.rotation.y, transform.rotation.z, transform.rotation.w,]) self.get_logger().info( f'平移:{transform.translation},旋转四元数:{transform.rotation}:旋转欧拉角:{rotation_euler}') except Exception as e: self.get_logger().warn(f'不能获取坐标变换。原因:{e}') def speach_text(self, text): """ 调用服务播放语音 """ while not self.speach_client_.wait_for_service(timeout_sec=1.0): self.get_logger().info('语音合成服务未上线,等待中......') request = SpeachText.Request() request.text = text future = self.speach_client_.call_async(request) rclpy.spin_until_future_complete(self, future) if future.result() is not None: result = future.result().result if result: self.get_logger().info(f'语音合成成功:{text}') else: self.get_logger().warn(f'语音合成失败:{text}') else: self.get_logger().warn('语言合成请求失败') def image_callback(self, msg): """ 将最新的消息放到latest_image中 """ self.latest_image = msg def record_image(self): """ 记录图像 """ if self.latest_image is not None: pose = self.get_current_pose() cv_image = self.bridge.imgmsg_to_cv2(self.latest_image) cv2.imwrite(f'{self.image_save_path}image_{pose.translation.x:3.2f}_{pose.translation.y:3.2f}.png', cv_image) def main(): rclpy.init() patrol = PatrolNode() patrol.speach_text(text='正在初始化位置') patrol.init_robot_pose() patrol.speach_text(text='位置初始化完成') while rclpy.ok(): points = patrol.get_targat_points() for point in points: x, y, yaw = point[0], point[1], point[2] #导航到目标点 target_pose = patrol.get_pose_by_xyyaw(x, y, yaw) patrol.speach_text(text=f'准备前往目标点{x},{y}') patrol.nav_to_pose(target_pose) #记录图像 patrol.speach_text(text=f"已到达目标点{x},{y},准备记录图像") patrol.record_image() patrol.speach_text(text=f"图像记录完成") rclpy.shutdown()
仿真gazebo_sim.launch.py代码:
import launch import launch_ros from ament_index_python.packages import get_package_share_directory from launch.launch_description_sources import PythonLaunchDescriptionSource import launch_ros.parameter_descriptions def generate_launch_description(): robot_name_in_model = "fishbot" urdf_tutorial_path = get_package_share_directory('fishbot_description') default_model_path = urdf_tutorial_path + '/urdf/fishbot/fishbot.urdf.xacro' default_world_path = urdf_tutorial_path + '/world/custom_room.world' print(str(default_model_path)) action_declare_arg_mode_path = launch.actions.DeclareLaunchArgument( name='model', default_value=str(default_model_path), description='URDF 的绝对路径') robot_description = launch_ros.parameter_descriptions.ParameterValue( launch.substitutions.Command( ['xacro ', launch.substitutions.LaunchConfiguration('model')]), value_type=str) robot_state_publisher_node = launch_ros.actions.Node( package='robot_state_publisher', executable='robot_state_publisher', parameters=[{'robot_description': robot_description}]) launch_gazebo = launch.actions.IncludeLaunchDescription( PythonLaunchDescriptionSource([get_package_share_directory('gazebo_ros'), '/launch', '/gazebo.launch.py']), launch_arguments=[('world', default_world_path), ('verbose', 'true')]) spawn_entity_node = launch_ros.actions.Node( package='gazebo_ros', executable='spawn_entity.py', arguments=['-topic', '/robot_description', '-entity', robot_name_in_model, ]) # 加载并激活fishbot_joint_state_broadcaster控制器 load_joint_state_controller = launch.actions.ExecuteProcess( cmd=['ros2', 'control', 'load_controller', '--set-state', 'active', 'fishbot_joint_state_broadcaster'], output='screen' ) load_fishbot_effort_controller = launch.actions.ExecuteProcess( cmd=['ros2', 'control', 'load_controller', '--set-state', 'active', 'fishbot_effort_controller'], output='screen' ) load_fishbot_diff_drive_controller = launch.actions.ExecuteProcess( cmd=['ros2', 'control', 'load_controller', '--set-state', 'active', 'fishbot_diff_drive_controller'], output='screen' ) return launch.LaunchDescription([ launch.actions.RegisterEventHandler( event_handler=launch.event_handlers.OnProcessExit( target_action=spawn_entity_node, on_exit=[load_joint_state_controller], ) ), launch.actions.RegisterEventHandler( event_handler=launch.event_handlers.OnProcessExit( target_action=load_joint_state_controller, on_exit=[load_fishbot_effort_controller], ) ), launch.actions.RegisterEventHandler( event_handler=launch.event_handlers.OnProcessExit( target_action=load_fishbot_effort_controller, on_exit=[load_fishbot_diff_drive_controller], ) ), action_declare_arg_mode_path, robot_state_publisher_node, launch_gazebo, spawn_entity_node ])
导航navigation2.launch.py代码:
import os import launch import launch_ros from ament_index_python.packages import get_package_share_directory from launch.launch_description_sources import PythonLaunchDescriptionSource def generate_launch_description(): #获取并链接默认路径 fishbot_navigation2_dir = get_package_share_directory( 'fishbot_navigation2') nav2_bringup_dir = get_package_share_directory('nav2_bringup') rviz_config_dir = os.path.join( nav2_bringup_dir, 'rviz', 'nav2_default_view.rviz') #创建launch配置 use_sim_time = launch.substitutions.LaunchConfiguration( 'use_sim_time', default='true') map_yaml_path = launch.substitutions.LaunchConfiguration( 'map',default=os.path.join(fishbot_navigation2_dir, 'maps', 'room.yaml')) nav2_param_path = launch.substitutions.LaunchConfiguration( 'params_file', default=os.path.join(fishbot_navigation2_dir, 'config', 'nav2_params.yaml')) return launch.LaunchDescription([ #声明新的launch参数 launch.actions.DeclareLaunchArgument('use_sim_time', default_value=use_sim_time, description='Use simulation(Gazebo) clock if true'), launch.actions.DeclareLaunchArgument('map', default_value=map_yaml_path, description='Full path to map file to load'), launch.actions.DeclareLaunchArgument('params_file', default_value=nav2_param_path, description='Full path to param file to load'), launch.actions.IncludeLaunchDescription( PythonLaunchDescriptionSource( [nav2_bringup_dir, '/launch', '/bringup_launch.py']), #使用launch参数替换原有参数 launch_arguments={ 'map': map_yaml_path, 'use_sim_time': use_sim_time, 'params_file': nav2_param_path}.items(), ), launch_ros.actions.Node( package='rviz2', executable='rviz2', name='rviz2', arguments=['-d', rviz_config_dir], parameters=[{'use_sim_time': use_sim_time}], output='screen'), ])
-
@43996173 在 7.5.4订阅图像记录错误 中说:
def get_current_pose(self):
'''
通过TF获取当前位姿
'''
while rclpy.ok():
try:
tf = self.buffer_.lookup_transform(
'map', 'base_footprint',
rclpy.time.Time(seconds=0),
rclpy.time.Duration(seconds=1))
transform = tf.transform
rotation_euler = euler_from_quaternion([
transform.rotation.x,
transform.rotation.y,
transform.rotation.z,
transform.rotation.w,])
self.get_logger().info(
f'平移:{transform.translation},旋转四元数:{transform.rotation}:旋转欧拉角:{rotation_euler}')except Exception as e: self.get_logger().warn(f'不能获取坐标变换。原因:{e}')
这个 get_current_pose 有四循环,检查是否是因为这里获取不成功的原因,导航有没有异常,是否可以正常获取姿态
-
@小鱼 试试我这个代码
import rclpy from geometry_msgs.msg import PoseStamped, Pose from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult from tf2_ros import TransformListener, Buffer from tf_transformations import euler_from_quaternion, quaternion_from_euler from rclpy.duration import Duration # 添加服务接口 from autopatrol_interfaces.srv import SpeachText from sensor_msgs.msg import Image from cv_bridge import CvBridge import cv2 class PatrolNode(BasicNavigator): def __init__(self, node_name='patrol_node'): super().__init__(node_name) # 导航相关定义 self.declare_parameter('initial_point', [0.0, 0.0, 0.0]) self.declare_parameter('target_points', [0.0, 0.0, 0.0, 1.0, 1.0, 1.57]) self.initial_point_ = self.get_parameter('initial_point').value self.target_points_ = self.get_parameter('target_points').value # 实时位置获取 TF 相关定义 self.buffer_ = Buffer() self.listener_ = TransformListener(self.buffer_, self) self.speach_client_ = self.create_client(SpeachText, 'speech_text') # 订阅与保存图像相关定义 self.declare_parameter('image_save_path', '') self.image_save_path = self.get_parameter('image_save_path').value self.bridge = CvBridge() self.latest_image = None self.subscription_image = self.create_subscription( Image, '/camera_sensor/image_raw', self.image_callback, 10) def image_callback(self, msg): """ 将最新的消息放到 latest_image 中 """ self.latest_image = msg def record_image(self): """ 记录图像 """ if self.latest_image is not None: pose = self.get_current_pose() cv_image = self.bridge.imgmsg_to_cv2(self.latest_image) cv2.imwrite(f'{self.image_save_path}image_{pose.translation.x:3.2f}_{pose.translation.y:3.2f}.png', cv_image) def speach_text(self, text): """ 调用服务播放语音 """ while not self.speach_client_.wait_for_service(timeout_sec=1.0): self.get_logger().info('语合成服务未上线,等待中。。。') request = SpeachText.Request() request.text = text future = self.speach_client_.call_async(request) rclpy.spin_until_future_complete(self, future) if future.result() is not None: result = future.result().result if result: self.get_logger().info(f'语音合成成功:{text}') else: self.get_logger().warn(f'语音合成失败:{text}') else: self.get_logger().warn('语音合成服务请求失败') def get_pose_by_xyyaw(self, x, y, yaw): """ 通过 x,y,yaw 合成 PoseStamped """ pose = PoseStamped() pose.header.frame_id = 'map' pose.pose.position.x = x pose.pose.position.y = y rotation_quat = quaternion_from_euler(0, 0, yaw) pose.pose.orientation.x = rotation_quat[0] pose.pose.orientation.y = rotation_quat[1] pose.pose.orientation.z = rotation_quat[2] pose.pose.orientation.w = rotation_quat[3] return pose def init_robot_pose(self): """ 初始化机器人位姿 """ # 从参数获取初始化点 self.initial_point_ = self.get_parameter('initial_point').value # 合成位姿并进行初始化 self.setInitialPose(self.get_pose_by_xyyaw( self.initial_point_[0], self.initial_point_[1], self.initial_point_[2])) # 等待直到导航激活 self.waitUntilNav2Active() def get_target_points(self): """ 通过参数值获取目标点集合 """ points = [] self.target_points_ = self.get_parameter('target_points').value for index in range(int(len(self.target_points_)/3)): x = self.target_points_[index*3] y = self.target_points_[index*3+1] yaw = self.target_points_[index*3+2] points.append([x, y, yaw]) self.get_logger().info(f'获取到目标点: {index}->({x},{y},{yaw})') return points def nav_to_pose(self, target_pose): """ 导航到指定位姿 """ self.waitUntilNav2Active() result = self.goToPose(target_pose) while not self.isTaskComplete(): feedback = self.getFeedback() if feedback: self.get_logger().info(f'预计: {Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9} s 后到达') # 最终结果判断 result = self.getResult() if result == TaskResult.SUCCEEDED: self.get_logger().info('导航结果:成功') elif result == TaskResult.CANCELED: self.get_logger().warn('导航结果:被取消') elif result == TaskResult.FAILED: self.get_logger().error('导航结果:失败') else: self.get_logger().error('导航结果:返回状态无效') def get_current_pose(self): """ 通过TF获取当前位姿 """ while rclpy.ok(): try: tf = self.buffer_.lookup_transform( 'map', 'base_footprint', rclpy.time.Time(seconds=0), rclpy.time.Duration(seconds=1)) transform = tf.transform rotation_euler = euler_from_quaternion([ transform.rotation.x, transform.rotation.y, transform.rotation.z, transform.rotation.w ]) self.get_logger().info( f'平移:{transform.translation},旋转四元数:{transform.rotation}:旋转欧拉角:{rotation_euler}') return transform except Exception as e: self.get_logger().warn(f'不能够获取坐标变换,原因: {str(e)}') def main(): rclpy.init() patrol = PatrolNode() patrol.speach_text(text='正在初始化位置') patrol.init_robot_pose() patrol.speach_text(text='位置初始化完成') while rclpy.ok(): for point in patrol.get_target_points(): x, y, yaw = point[0], point[1], point[2] # 导航到目标点 target_pose = patrol.get_pose_by_xyyaw(x, y, yaw) patrol.speach_text(text=f'准备前往目标点{x},{y}') patrol.nav_to_pose(target_pose) patrol.speach_text(text=f"已到达目标点{x},{y},准备记录图像") patrol.record_image() patrol.speach_text(text=f"图像记录完成") rclpy.shutdown() if __name__ == '__main__': main()