鱼香ROS社区
    • 版块
    • 最新
    • 未解决
    • 已解决
    • 群组
    • 注册
    • 登录
    紧急通知:禁止一切关于政治&VPN翻墙等话题,发现相关帖子会立马删除封号
    提问前必看的发帖注意事项: 社区问答规则(小鱼个人)更新 | 高质量帖子发布指南

    7.5.4订阅图像记录错误

    已定时 已固定 已锁定 已移动
    动手学ROS2
    订阅图像记录错误 导航卡住
    2
    3
    335
    正在加载更多帖子
    • 从旧到新
    • 从新到旧
    • 最多赞同
    回复
    • 在新帖中回复
    登录后回复
    此主题已被删除。只有拥有主题管理权限的用户可以查看。
    • 4
      开心
      最后由 编辑

      错误现象:运行仿真和导航后启动launch文件后,在准备记录图像处卡住,未武安成图像记录
      patrol_node.py代码如下:

      import rclpy
      from geometry_msgs.msg import PoseStamped, Pose
      from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult
      import rclpy.time
      from tf2_ros import TransformListener, Buffer
      from tf_transformations import euler_from_quaternion, quaternion_from_euler
      from rclpy.duration import Duration
      from autopatrol_interfaces.srv import SpeachText
      from sensor_msgs.msg import Image
      from cv_bridge import CvBridge
      import cv2
      
      class PatrolNode(BasicNavigator):
          def __init__(self, node_name='patrol_node'):
              super().__init__(node_name)
              #导航相关定义
              self.declare_parameter('initial_point', [0.0, 0.0, 0.0])
              self.declare_parameter('target_points', [0.0, 0.0, 0.0, 1.0, 1.0, 1.57])
              self.initial_point_ = self.get_parameter('initial_point').value
              self.target_points_ = self.get_parameter('target_points').value
              self.buffer_ = Buffer()
              self.listener_ = TransformListener(self.buffer_, self)
              #语音合成客户端
              self.speach_client_ = self.create_client(SpeachText, 'speech_text')
              #订阅和保存图像相关定义
              self.declare_parameter('image_save_path', '')
              self.image_save_path = self.get_parameter('image_save_path').value
              self.bridge = CvBridge()
              self.latest_image = None
              self.subscription_image = self.create_subscription(
                  Image, '/camera_sensor/image_raw', self.image_callback, 10)
      
          def get_pose_by_xyyaw(self, x, y, yaw):
              '''
              通过x,y,yaw 合成PoseStamped
              '''
              pose = PoseStamped()
              pose.header.frame_id = 'map'
              pose.pose.position.x = x
              pose.pose.position.y = y
              rotation_quat = quaternion_from_euler(0, 0, yaw)
              pose.pose.orientation.x = rotation_quat[0]
              pose.pose.orientation.y = rotation_quat[1]
              pose.pose.orientation.z = rotation_quat[2]
              pose.pose.orientation.w = rotation_quat[3]
              return pose
      
      
          def init_robot_pose(self):
              '''
              初始化机器人位姿
              '''
              #从参数获取初始化点
              self.initial_point_ = self.get_parameter('initial_point').value
              #合成位姿并进行初始化
              self.setInitialPose(self.get_pose_by_xyyaw(
                  self.initial_point_[0], self.initial_point_[1], self.initial_point_[2]
              ))
              #等待直到导航激活
              self.waitUntilNav2Active()
          def get_targat_points(self):
              '''
              通过参数值获取目标点集合
              '''
              points = []
              self.target_points_= self.get_parameter('target_points').value
              for index in range(int(len(self.target_points_)/3)):
                  x = self.target_points_[index*3]
                  y = self.target_points_[index*3+1]
                  yaw = self.target_points_[index*3+2]
                  points.append([x, y, yaw])
                  self.get_logger().info(
                      f'获取到目标点:{index}->({x},{y},{yaw})')
              return points
          def nav_to_pose(self, target_pose):
              '''
              导航到指定位资
              '''
              self.waitUntilNav2Active()
              result = self.goToPose(target_pose)
              while not self.isTaskComplete():
                  feedback = self.getFeedback()
                  if feedback:
                      self.get_logger().info(f'预计:{Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9} s 后到达')
              #最终结果判断
              result = self.getResult()
              if result == TaskResult.SUCCEEDED:
                  self.get_logger().info('导航结果:成功')
              elif result == TaskResult.CANCELED:
                  self.get_logger().warn('导航结果:被取消')
              elif result == TaskResult.FAILED:
                  self.get_logger().error('导航结果:失败')
              else:
                  self.get_logger().error('导航结果:返回状态无效')
          def get_current_pose(self):
              '''
              通过TF获取当前位姿
              '''
              while rclpy.ok():
                  try:
                      tf = self.buffer_.lookup_transform(
                          'map', 'base_footprint', 
                          rclpy.time.Time(seconds=0), 
                          rclpy.time.Duration(seconds=1))
                      transform = tf.transform
                      rotation_euler = euler_from_quaternion([
                          transform.rotation.x,
                          transform.rotation.y,
                          transform.rotation.z,
                          transform.rotation.w,])
                      self.get_logger().info(
      f'平移:{transform.translation},旋转四元数:{transform.rotation}:旋转欧拉角:{rotation_euler}')
      
                  except Exception as e:
                      self.get_logger().warn(f'不能获取坐标变换。原因:{e}')
          def speach_text(self, text):
              """
              调用服务播放语音
              """
              while not self.speach_client_.wait_for_service(timeout_sec=1.0):
                  self.get_logger().info('语音合成服务未上线,等待中......')
              request = SpeachText.Request()
              request.text = text
              future = self.speach_client_.call_async(request)
              rclpy.spin_until_future_complete(self, future)
              if future.result() is not None:
                  result = future.result().result
                  if result:
                      self.get_logger().info(f'语音合成成功:{text}')
                  else:
                      self.get_logger().warn(f'语音合成失败:{text}')
              else:
                  self.get_logger().warn('语言合成请求失败')
          
      
      
          def image_callback(self, msg):
              """
              将最新的消息放到latest_image中
              """
              self.latest_image = msg
      
          def record_image(self):
              """
              记录图像
              """
              if self.latest_image is not None:
                  pose = self.get_current_pose()
                  cv_image = self.bridge.imgmsg_to_cv2(self.latest_image)
              cv2.imwrite(f'{self.image_save_path}image_{pose.translation.x:3.2f}_{pose.translation.y:3.2f}.png', cv_image)
      
      
      def main():
          rclpy.init()
          patrol = PatrolNode()
          patrol.speach_text(text='正在初始化位置')
          patrol.init_robot_pose()
          patrol.speach_text(text='位置初始化完成')
      
          while rclpy.ok():
              points = patrol.get_targat_points()
              for point in points:
                  x, y, yaw = point[0], point[1], point[2]
                  #导航到目标点
                  target_pose = patrol.get_pose_by_xyyaw(x, y, yaw)
                  patrol.speach_text(text=f'准备前往目标点{x},{y}')
                  patrol.nav_to_pose(target_pose)
                  #记录图像
                  patrol.speach_text(text=f"已到达目标点{x},{y},准备记录图像")
                  patrol.record_image()
                  patrol.speach_text(text=f"图像记录完成")
          rclpy.shutdown()
      

      仿真gazebo_sim.launch.py代码:

      import launch
      import launch_ros
      from ament_index_python.packages import get_package_share_directory
      from launch.launch_description_sources import PythonLaunchDescriptionSource
      import launch_ros.parameter_descriptions
      
      def generate_launch_description():
          robot_name_in_model = "fishbot"
          urdf_tutorial_path = get_package_share_directory('fishbot_description')
          default_model_path = urdf_tutorial_path + '/urdf/fishbot/fishbot.urdf.xacro'
          default_world_path = urdf_tutorial_path + '/world/custom_room.world'
          print(str(default_model_path))
          action_declare_arg_mode_path = launch.actions.DeclareLaunchArgument(
              name='model', default_value=str(default_model_path), description='URDF 的绝对路径')
          robot_description = launch_ros.parameter_descriptions.ParameterValue(
              launch.substitutions.Command(
                  ['xacro ', launch.substitutions.LaunchConfiguration('model')]), value_type=str)
          robot_state_publisher_node = launch_ros.actions.Node(
              package='robot_state_publisher', 
              executable='robot_state_publisher', 
              parameters=[{'robot_description': robot_description}])
          launch_gazebo = launch.actions.IncludeLaunchDescription(
              PythonLaunchDescriptionSource([get_package_share_directory('gazebo_ros'), '/launch', 
                                             '/gazebo.launch.py']),
              launch_arguments=[('world', default_world_path), ('verbose', 'true')])
          spawn_entity_node = launch_ros.actions.Node(
              package='gazebo_ros', 
              executable='spawn_entity.py', 
              arguments=['-topic', '/robot_description', 
                         '-entity', robot_name_in_model, ])
          # 加载并激活fishbot_joint_state_broadcaster控制器
          load_joint_state_controller = launch.actions.ExecuteProcess(
              cmd=['ros2', 'control', 'load_controller', '--set-state', 'active', 
                   'fishbot_joint_state_broadcaster'], 
              output='screen'
          )
          load_fishbot_effort_controller = launch.actions.ExecuteProcess(
              cmd=['ros2', 'control', 'load_controller', '--set-state', 'active', 
                   'fishbot_effort_controller'], output='screen' )
          load_fishbot_diff_drive_controller = launch.actions.ExecuteProcess(
              cmd=['ros2', 'control', 'load_controller', '--set-state', 'active', 
                   'fishbot_diff_drive_controller'], output='screen' )
          
          return launch.LaunchDescription([
              launch.actions.RegisterEventHandler(
                  event_handler=launch.event_handlers.OnProcessExit(
                      target_action=spawn_entity_node,
                      on_exit=[load_joint_state_controller],
                  )
              ),
              
              launch.actions.RegisterEventHandler(
                  event_handler=launch.event_handlers.OnProcessExit(
                      target_action=load_joint_state_controller,
                      on_exit=[load_fishbot_effort_controller],
                  )
              ),
              
              launch.actions.RegisterEventHandler(
                  event_handler=launch.event_handlers.OnProcessExit(
                      target_action=load_fishbot_effort_controller,
                      on_exit=[load_fishbot_diff_drive_controller],
                  )
              ),
              action_declare_arg_mode_path, 
              robot_state_publisher_node, 
              launch_gazebo,
              spawn_entity_node
              
          ])
      

      导航navigation2.launch.py代码:

      import os
      import launch
      import launch_ros
      from ament_index_python.packages import get_package_share_directory
      from launch.launch_description_sources import PythonLaunchDescriptionSource
      
      
      def generate_launch_description():
          #获取并链接默认路径
          fishbot_navigation2_dir = get_package_share_directory(
              'fishbot_navigation2')
          nav2_bringup_dir = get_package_share_directory('nav2_bringup')
          rviz_config_dir = os.path.join(
              nav2_bringup_dir, 'rviz', 'nav2_default_view.rviz')
          #创建launch配置
          use_sim_time = launch.substitutions.LaunchConfiguration(
              'use_sim_time', default='true')
          map_yaml_path = launch.substitutions.LaunchConfiguration(
              'map',default=os.path.join(fishbot_navigation2_dir, 'maps', 
                                         'room.yaml'))
          nav2_param_path = launch.substitutions.LaunchConfiguration(
              'params_file', default=os.path.join(fishbot_navigation2_dir, 
                                                  'config', 'nav2_params.yaml'))
          
      
          return launch.LaunchDescription([
              #声明新的launch参数
              launch.actions.DeclareLaunchArgument('use_sim_time', 
                   default_value=use_sim_time, 
                   description='Use simulation(Gazebo) clock if true'),
              launch.actions.DeclareLaunchArgument('map', 
                  default_value=map_yaml_path, 
                  description='Full path to map file to load'),
              launch.actions.DeclareLaunchArgument('params_file', 
                  default_value=nav2_param_path, 
                  description='Full path to param file to load'),
              launch.actions.IncludeLaunchDescription(
                  PythonLaunchDescriptionSource(
                      [nav2_bringup_dir, '/launch', '/bringup_launch.py']),
                  #使用launch参数替换原有参数
                  launch_arguments={
                      'map': map_yaml_path, 
                      'use_sim_time': use_sim_time, 
                      'params_file': nav2_param_path}.items(), 
              ),
              launch_ros.actions.Node(
                  package='rviz2',
                  executable='rviz2',
                  name='rviz2',
                  arguments=['-d', rviz_config_dir],
                  parameters=[{'use_sim_time': use_sim_time}],
                  output='screen'),
          ])
      
      小鱼小 1 条回复 最后回复 回复 引用 0
      • 小鱼小
        小鱼 技术大佬 @43996173
        最后由 编辑

        @43996173 在 7.5.4订阅图像记录错误 中说:

        def get_current_pose(self):
        '''
        通过TF获取当前位姿
        '''
        while rclpy.ok():
        try:
        tf = self.buffer_.lookup_transform(
        'map', 'base_footprint',
        rclpy.time.Time(seconds=0),
        rclpy.time.Duration(seconds=1))
        transform = tf.transform
        rotation_euler = euler_from_quaternion([
        transform.rotation.x,
        transform.rotation.y,
        transform.rotation.z,
        transform.rotation.w,])
        self.get_logger().info(
        f'平移:{transform.translation},旋转四元数:{transform.rotation}:旋转欧拉角:{rotation_euler}')

                except Exception as e:
                    self.get_logger().warn(f'不能获取坐标变换。原因:{e}')
        

        这个 get_current_pose 有四循环,检查是否是因为这里获取不成功的原因,导航有没有异常,是否可以正常获取姿态

        新书配套视频:https://www.bilibili.com/video/BV1GW42197Ck/

        小鱼小 1 条回复 最后回复 回复 引用 0
        • 小鱼小
          小鱼 技术大佬 @小鱼
          最后由 编辑

          @小鱼 试试我这个代码

          import rclpy
          from geometry_msgs.msg import PoseStamped, Pose
          from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult
          from tf2_ros import TransformListener, Buffer
          from tf_transformations import euler_from_quaternion, quaternion_from_euler
          from rclpy.duration import Duration
          # 添加服务接口
          from autopatrol_interfaces.srv import SpeachText
          from sensor_msgs.msg import Image
          from cv_bridge import CvBridge
          import cv2
          
          class PatrolNode(BasicNavigator):
              def __init__(self, node_name='patrol_node'):
                  super().__init__(node_name)
                  # 导航相关定义
                  self.declare_parameter('initial_point', [0.0, 0.0, 0.0])
                  self.declare_parameter('target_points', [0.0, 0.0, 0.0, 1.0, 1.0, 1.57])
                  self.initial_point_ = self.get_parameter('initial_point').value
                  self.target_points_ = self.get_parameter('target_points').value
                  # 实时位置获取 TF 相关定义
                  self.buffer_ = Buffer()
                  self.listener_ = TransformListener(self.buffer_, self)
                  self.speach_client_ = self.create_client(SpeachText, 'speech_text')
          
                  # 订阅与保存图像相关定义
                  self.declare_parameter('image_save_path', '')
                  self.image_save_path = self.get_parameter('image_save_path').value
                  self.bridge = CvBridge()
                  self.latest_image = None
                  self.subscription_image = self.create_subscription(
                      Image, '/camera_sensor/image_raw', self.image_callback, 10)
          
              def image_callback(self, msg):
                  """
                  将最新的消息放到 latest_image 中
                  """
                  self.latest_image = msg
          
              def record_image(self):
                  """
                  记录图像
                  """
                  if self.latest_image is not None:
                    pose = self.get_current_pose()
                    cv_image = self.bridge.imgmsg_to_cv2(self.latest_image)
                    cv2.imwrite(f'{self.image_save_path}image_{pose.translation.x:3.2f}_{pose.translation.y:3.2f}.png', cv_image)
          
          
              def speach_text(self, text):
                  """
                  调用服务播放语音
                  """
                  while not self.speach_client_.wait_for_service(timeout_sec=1.0):
                      self.get_logger().info('语合成服务未上线,等待中。。。')
          
                  request = SpeachText.Request()
                  request.text = text
                  future = self.speach_client_.call_async(request)
                  rclpy.spin_until_future_complete(self, future)
                  if future.result() is not None:
                      result = future.result().result
                      if result:
                          self.get_logger().info(f'语音合成成功:{text}')
                      else:
                          self.get_logger().warn(f'语音合成失败:{text}')
                  else:
                      self.get_logger().warn('语音合成服务请求失败')
          
              def get_pose_by_xyyaw(self, x, y, yaw):
                  """
                  通过 x,y,yaw 合成 PoseStamped
                  """
                  pose = PoseStamped()
                  pose.header.frame_id = 'map'
                  pose.pose.position.x = x
                  pose.pose.position.y = y
                  rotation_quat = quaternion_from_euler(0, 0, yaw)
                  pose.pose.orientation.x = rotation_quat[0]
                  pose.pose.orientation.y = rotation_quat[1]
                  pose.pose.orientation.z = rotation_quat[2]
                  pose.pose.orientation.w = rotation_quat[3]
                  return pose
          
              def init_robot_pose(self):
                  """
                  初始化机器人位姿
                  """
                  # 从参数获取初始化点
                  self.initial_point_ = self.get_parameter('initial_point').value
                  # 合成位姿并进行初始化
                  self.setInitialPose(self.get_pose_by_xyyaw(
                      self.initial_point_[0], self.initial_point_[1], self.initial_point_[2]))
                  # 等待直到导航激活
                  self.waitUntilNav2Active()
          
              def get_target_points(self):
                  """
                  通过参数值获取目标点集合        
                  """
                  points = []
                  self.target_points_ = self.get_parameter('target_points').value
                  for index in range(int(len(self.target_points_)/3)):
                      x = self.target_points_[index*3]
                      y = self.target_points_[index*3+1]
                      yaw = self.target_points_[index*3+2]
                      points.append([x, y, yaw])
                      self.get_logger().info(f'获取到目标点: {index}->({x},{y},{yaw})')
                  return points
          
              def nav_to_pose(self, target_pose):
                  """
                  导航到指定位姿
                  """
                  self.waitUntilNav2Active()
                  result = self.goToPose(target_pose)
                  while not self.isTaskComplete():
                      feedback = self.getFeedback()
                      if feedback:
                          self.get_logger().info(f'预计: {Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9} s 后到达')
                  # 最终结果判断
                  result = self.getResult()
                  if result == TaskResult.SUCCEEDED:
                      self.get_logger().info('导航结果:成功')
                  elif result == TaskResult.CANCELED:
                      self.get_logger().warn('导航结果:被取消')
                  elif result == TaskResult.FAILED:
                      self.get_logger().error('导航结果:失败')
                  else:
                      self.get_logger().error('导航结果:返回状态无效')
          
              def get_current_pose(self):
                  """
                  通过TF获取当前位姿
                  """
                  while rclpy.ok():
                      try:
                          tf = self.buffer_.lookup_transform(
                              'map', 'base_footprint', rclpy.time.Time(seconds=0), rclpy.time.Duration(seconds=1))
                          transform = tf.transform
                          rotation_euler = euler_from_quaternion([
                              transform.rotation.x,
                              transform.rotation.y,
                              transform.rotation.z,
                              transform.rotation.w
                          ])
                          self.get_logger().info(
                              f'平移:{transform.translation},旋转四元数:{transform.rotation}:旋转欧拉角:{rotation_euler}')
                          return transform
                      except Exception as e:
                          self.get_logger().warn(f'不能够获取坐标变换,原因: {str(e)}')
              
          def main():
              rclpy.init()
              patrol = PatrolNode()
              patrol.speach_text(text='正在初始化位置')
              patrol.init_robot_pose()
              patrol.speach_text(text='位置初始化完成')
          
              while rclpy.ok():
                  for point in patrol.get_target_points():
                      x, y, yaw = point[0], point[1], point[2]
                      # 导航到目标点
                      target_pose = patrol.get_pose_by_xyyaw(x, y, yaw)
                      patrol.speach_text(text=f'准备前往目标点{x},{y}')
                      patrol.nav_to_pose(target_pose)
                      patrol.speach_text(text=f"已到达目标点{x},{y},准备记录图像")
                      patrol.record_image()
                      patrol.speach_text(text=f"图像记录完成")
              rclpy.shutdown()
          
          if __name__ == '__main__':
              main()
          

          新书配套视频:https://www.bilibili.com/video/BV1GW42197Ck/

          1 条回复 最后回复 回复 引用 0
          • 第一个帖子
            最后一个帖子
          皖ICP备16016415号-7
          Powered by NodeBB | 鱼香ROS