将ROS2的图像话题转成视频流在浏览器中显示
-
大家好,我是最近又又偷懒了好一段时间的小鱼,今天分享下如何实现ROS2的图像话题在浏览器中进行可视化,做交互开发的小伙伴肯定用的到。
将ROS2的图像话题转成视频流在浏览器中显示
在机器人操作系统(ROS)中,图像数据的传输和处理是常见任务之一。ROS2作为最新的ROS版本,提供了改进的通信机制。本文介绍如何将ROS2中的图像话题(topic)转换为视频流,并在网页浏览器中实时显示。这一功能可以用于远程监控、图像分析和教学演示等场景。
技术栈介绍
- ROS2: 机器人操作系统的新版本,用于管理和处理节点间的通信。
- Python: 使用Python编程语言进行脚本编写和后端逻辑实现。
- Flask: 一个轻量级的Python web框架,用于搭建web服务器。
- OpenCV: 开源计算机视觉库,用于图像处理。
- cv_bridge: 在ROS和OpenCV间转换图像数据的接口。
实现步骤
1. 创建ROS2节点接收图像数据
首先,我们需要创建一个ROS2节点,用于订阅图像话题。该节点使用
cv_bridge
库将ROS中的图像消息(sensor_msgs/msg/Image
)转换为OpenCV的图像格式。转换后的图像将存储在全局变量current_image
中,以便后续发送到浏览器。from cv_bridge import CvBridge from sensor_msgs.msg import Image class ImageSubscriber(Node): def __init__(self, topic_name): super().__init__('image_subscriber') self.bridge = CvBridge() self.subscription = self.create_subscription( Image, topic_name, self.image_callback, 10) def image_callback(self, msg): global current_image cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8") _, buffer = cv2.imencode('.jpg', cv_image) current_image = buffer.tobytes()
2. 使用Flask设置视频流路由
在Flask应用中,定义一个路由
/video_stream
,该路由关联到一个生成器函数generate()
。此函数持续检查current_image
,并将其作为JPEG格式的图像流返回给客户端。@app.route('/video_stream') def video_stream(): return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame') def generate(): while True: if current_image is not None: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + current_image + b'\r\n') else: yield (b'--frame\r\n' b'Content-Type: text/plain\r\n\r\nNo image available\r\n')
3. 运行ROS2节点和Flask服务器
首先定义一个函数
run_ros_node
,该函数初始化ROS2,并运行我们创建的订阅节点。def run_ros_node(topic_name): rclpy.init() image_subscriber = ImageSubscriber(topic_name) rclpy.spin(image_subscriber) image_subscriber.destroy_node() rclpy.shutdown()
在主函数中,解析命令行参数以获取ROS2话题名称,创建并启动一个线程来运行ROS2节点,同时启动Flask服务器以便在浏览器中访问视频流。
def main(args=None): parser = argparse.ArgumentParser() parser.add_argument('topic_name', type=str, help="The name of the ROS2 topic to subscribe to.") args = parser.parse_args() ros_thread = threading.Thread(target=run_ros_node, args=(args.topic_name,)) ros_thread.start() app.run(host='0.0.0.0', port=5000, threaded=True)
使用方法
- 启动ROS2系统:确保ROS2环境已经设置并且相关话题正在发布图像数据。
- 运行脚本:使用命令
python3 script_name.py topic_name
来运行上述脚本,其中topic_name
是你的图像话题名称。 - **打开浏览器
**:在浏览器地址栏输入
http://<服务器IP地址>:5000/video_stream
,即可查看实时视频流。通过以上步骤,你可以成功地将ROS2的图像话题转换成视频流,并在任何支持HTML5的浏览器中实时查看。
-
-
@小鱼 完整代码:
import rclpy from rclpy.node import Node from sensor_msgs.msg import Image from cv_bridge import CvBridge import cv2 from flask import Flask, Response import io import threading import argparse # 初始化Flask应用 app = Flask(__name__) # 创建一个全局变量来存储图像数据 current_image = None # 创建ROS2节点类 class ImageSubscriber(Node): def __init__(self, topic_name): super().__init__('image_subscriber') self.subscription = self.create_subscription( Image, topic_name, self.image_callback, 10) self.subscription # 防止订阅被自动销毁 self.bridge = CvBridge() def image_callback(self, msg): global current_image cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8") # 将图像转换为JPEG格式 _, buffer = cv2.imencode('.jpg', cv_image) current_image = buffer.tobytes() # 生成器函数,不断发送图像 def generate(): while True: if current_image is not None: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + current_image + b'\r\n') else: yield (b'--frame\r\n' b'Content-Type: text/plain\r\n\r\nNo image available\r\n') @app.route('/video_stream') def video_stream(): return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame') def run_ros_node(topic_name): rclpy.init() image_subscriber = ImageSubscriber(topic_name) rclpy.spin(image_subscriber) # 清理和关闭ROS2节点 image_subscriber.destroy_node() rclpy.shutdown() def main(args=None): parser = argparse.ArgumentParser(description="Subscribe to a ROS2 image topic and serve it as a video stream.") parser.add_argument('topic_name', type=str, help="The name of the ROS2 topic to subscribe to.") args = parser.parse_args() # 创建线程运行ROS2节点 ros_thread = threading.Thread(target=run_ros_node, args=(args.topic_name,)) ros_thread.start() # 启动Flask HTTP服务器 try: app.run(host='0.0.0.0', port=5000, threaded=True) except KeyboardInterrupt: pass finally: # 确保ROS线程可以优雅退出 ros_thread.join() if __name__ == '__main__': main()