@小鱼 试试我这个代码
import rclpy from geometry_msgs.msg import PoseStamped, Pose from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult from tf2_ros import TransformListener, Buffer from tf_transformations import euler_from_quaternion, quaternion_from_euler from rclpy.duration import Duration # 添加服务接口 from autopatrol_interfaces.srv import SpeachText from sensor_msgs.msg import Image from cv_bridge import CvBridge import cv2 class PatrolNode(BasicNavigator): def __init__(self, node_name='patrol_node'): super().__init__(node_name) # 导航相关定义 self.declare_parameter('initial_point', [0.0, 0.0, 0.0]) self.declare_parameter('target_points', [0.0, 0.0, 0.0, 1.0, 1.0, 1.57]) self.initial_point_ = self.get_parameter('initial_point').value self.target_points_ = self.get_parameter('target_points').value # 实时位置获取 TF 相关定义 self.buffer_ = Buffer() self.listener_ = TransformListener(self.buffer_, self) self.speach_client_ = self.create_client(SpeachText, 'speech_text') # 订阅与保存图像相关定义 self.declare_parameter('image_save_path', '') self.image_save_path = self.get_parameter('image_save_path').value self.bridge = CvBridge() self.latest_image = None self.subscription_image = self.create_subscription( Image, '/camera_sensor/image_raw', self.image_callback, 10) def image_callback(self, msg): """ 将最新的消息放到 latest_image 中 """ self.latest_image = msg def record_image(self): """ 记录图像 """ if self.latest_image is not None: pose = self.get_current_pose() cv_image = self.bridge.imgmsg_to_cv2(self.latest_image) cv2.imwrite(f'{self.image_save_path}image_{pose.translation.x:3.2f}_{pose.translation.y:3.2f}.png', cv_image) def speach_text(self, text): """ 调用服务播放语音 """ while not self.speach_client_.wait_for_service(timeout_sec=1.0): self.get_logger().info('语合成服务未上线,等待中。。。') request = SpeachText.Request() request.text = text future = self.speach_client_.call_async(request) rclpy.spin_until_future_complete(self, future) if future.result() is not None: result = future.result().result if result: self.get_logger().info(f'语音合成成功:{text}') else: self.get_logger().warn(f'语音合成失败:{text}') else: self.get_logger().warn('语音合成服务请求失败') def get_pose_by_xyyaw(self, x, y, yaw): """ 通过 x,y,yaw 合成 PoseStamped """ pose = PoseStamped() pose.header.frame_id = 'map' pose.pose.position.x = x pose.pose.position.y = y rotation_quat = quaternion_from_euler(0, 0, yaw) pose.pose.orientation.x = rotation_quat[0] pose.pose.orientation.y = rotation_quat[1] pose.pose.orientation.z = rotation_quat[2] pose.pose.orientation.w = rotation_quat[3] return pose def init_robot_pose(self): """ 初始化机器人位姿 """ # 从参数获取初始化点 self.initial_point_ = self.get_parameter('initial_point').value # 合成位姿并进行初始化 self.setInitialPose(self.get_pose_by_xyyaw( self.initial_point_[0], self.initial_point_[1], self.initial_point_[2])) # 等待直到导航激活 self.waitUntilNav2Active() def get_target_points(self): """ 通过参数值获取目标点集合 """ points = [] self.target_points_ = self.get_parameter('target_points').value for index in range(int(len(self.target_points_)/3)): x = self.target_points_[index*3] y = self.target_points_[index*3+1] yaw = self.target_points_[index*3+2] points.append([x, y, yaw]) self.get_logger().info(f'获取到目标点: {index}->({x},{y},{yaw})') return points def nav_to_pose(self, target_pose): """ 导航到指定位姿 """ self.waitUntilNav2Active() result = self.goToPose(target_pose) while not self.isTaskComplete(): feedback = self.getFeedback() if feedback: self.get_logger().info(f'预计: {Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9} s 后到达') # 最终结果判断 result = self.getResult() if result == TaskResult.SUCCEEDED: self.get_logger().info('导航结果:成功') elif result == TaskResult.CANCELED: self.get_logger().warn('导航结果:被取消') elif result == TaskResult.FAILED: self.get_logger().error('导航结果:失败') else: self.get_logger().error('导航结果:返回状态无效') def get_current_pose(self): """ 通过TF获取当前位姿 """ while rclpy.ok(): try: tf = self.buffer_.lookup_transform( 'map', 'base_footprint', rclpy.time.Time(seconds=0), rclpy.time.Duration(seconds=1)) transform = tf.transform rotation_euler = euler_from_quaternion([ transform.rotation.x, transform.rotation.y, transform.rotation.z, transform.rotation.w ]) self.get_logger().info( f'平移:{transform.translation},旋转四元数:{transform.rotation}:旋转欧拉角:{rotation_euler}') return transform except Exception as e: self.get_logger().warn(f'不能够获取坐标变换,原因: {str(e)}') def main(): rclpy.init() patrol = PatrolNode() patrol.speach_text(text='正在初始化位置') patrol.init_robot_pose() patrol.speach_text(text='位置初始化完成') while rclpy.ok(): for point in patrol.get_target_points(): x, y, yaw = point[0], point[1], point[2] # 导航到目标点 target_pose = patrol.get_pose_by_xyyaw(x, y, yaw) patrol.speach_text(text=f'准备前往目标点{x},{y}') patrol.nav_to_pose(target_pose) patrol.speach_text(text=f"已到达目标点{x},{y},准备记录图像") patrol.record_image() patrol.speach_text(text=f"图像记录完成") rclpy.shutdown() if __name__ == '__main__': main()
紧急通知:禁止一切关于政治&VPN翻墙等话题,发现相关帖子会立马删除封号
小鱼 ROS 2 新书上线!点击链接查看, 新书配套视频点击链接查看。
提问前必看的发帖注意事项—— 提问前必看!不符合要求的问题拒绝回答!!
社区使用指南—如何添加标签修改密码
小鱼 ROS 2 新书上线!点击链接查看, 新书配套视频点击链接查看。
提问前必看的发帖注意事项—— 提问前必看!不符合要求的问题拒绝回答!!
社区使用指南—如何添加标签修改密码