ROS2节点通信实现零拷贝
-
下面的一个例子演示了两个节点之间如何实现零拷贝通讯。
注意,下面测试例子的
ROS2
版本为Galactic
。#include <chrono> #include <cinttypes> #include <cstdio> #include <memory> #include <string> #include <utility> #include "rclcpp/rclcpp.hpp" #include "std_msgs/msg/int32.hpp" using namespace std::chrono_literals; // Node that produces messages. struct Producer : public rclcpp::Node { Producer(const std::string & name, const std::string & output) : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true)) { // Create a publisher on the output topic. pub_ = this->create_publisher<std_msgs::msg::Int32>(output, 10); std::weak_ptr<std::remove_pointer<decltype(pub_.get())>::type> captured_pub = pub_; // Create a timer which publishes on the output topic at ~1Hz. auto callback = [captured_pub]() -> void { auto pub_ptr = captured_pub.lock(); if (!pub_ptr) { return; } static int32_t count = 0; std_msgs::msg::Int32::UniquePtr msg(new std_msgs::msg::Int32()); msg->data = count++; printf( "Published message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data, reinterpret_cast<std::uintptr_t>(msg.get())); pub_ptr->publish(std::move(msg)); }; timer_ = this->create_wall_timer(1s, callback); } rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_; rclcpp::TimerBase::SharedPtr timer_; }; // Node that consumes messages. struct Consumer : public rclcpp::Node { Consumer(const std::string & name, const std::string & input) : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true)) { // Create a subscription on the input topic which prints on receipt of new messages. sub_ = this->create_subscription<std_msgs::msg::Int32>( input, 10, [](std_msgs::msg::Int32::UniquePtr msg) { printf( " Received message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data, reinterpret_cast<std::uintptr_t>(msg.get())); }); } rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_; }; int main(int argc, char * argv[]) { setvbuf(stdout, NULL, _IONBF, BUFSIZ); rclcpp::init(argc, argv); rclcpp::executors::SingleThreadedExecutor executor; auto producer = std::make_shared<Producer>("producer", "number"); auto consumer = std::make_shared<Consumer>("consumer", "number"); executor.add_node(producer); executor.add_node(consumer); executor.spin(); rclcpp::shutdown(); return 0; }
例子中节点的建立有几点需要注意:
- 使能
intra_process
Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
配置节点时要开启
intra_process
。- 发布消息时使用
UniquePtr
和std::move(msg)
除了上面例子中的写法,还可以采用下面的写法:
auto dis = std::make_unique<std_msgs::msg::Float32>(); dis->data = 10.0; pub_->publish(std::move(dis));
- 将需通信的节点加入到同一个进程中
rclcpp::executors::SingleThreadedExecutor executor; auto producer = std::make_shared<Producer>("producer", "number"); auto consumer = std::make_shared<Consumer>("consumer", "number"); executor.add_node(producer); executor.add_node(consumer);
完整的功能代码可通过下面的链接获取:
https://github.com/shoufei403/ros2_galactic_tutorials
相应的代码在
demos/intra_process_demo
目录。编译好代码后使用下面的命令启动示例程序
source install/setup.bash ros2 run intra_process_demo two_node_pipeline
输出结果:
Published message with value: 0, and address: 0x5625E3159130 Received message with value: 0, and address: 0x5625E3159130 Published message with value: 1, and address: 0x5625E3159130 Received message with value: 1, and address: 0x5625E3159130 Published message with value: 2, and address: 0x5625E3159130 Received message with value: 2, and address: 0x5625E3159130 Published message with value: 3, and address: 0x5625E3159130 Received message with value: 3, and address: 0x5625E3159130 Published message with value: 4, and address: 0x5625E3159130 Received message with value: 4, and address: 0x5625E3159130 Published message with value: 5, and address: 0x5625E3159130 Received message with value: 5, and address: 0x5625E3159130 Published message with value: 6, and address: 0x5625E3159130 Received message with value: 6, and address: 0x5625E3159130 Published message with value: 7, and address: 0x5625E3159130 Received message with value: 7, and address: 0x5625E3159130 Published message with value: 8, and address: 0x5625E3159130 Received message with value: 8, and address: 0x5625E3159130 Published message with value: 9, and address: 0x5625E3159130 Received message with value: 9, and address: 0x5625E3159130
可以发现,发送端数据的地址和接收端数据地址是一致的。所以发送端只是把数据存放的地址发送给了接收端并没有发生数据拷贝。
零拷贝的特性对于传输图像数据尤为有用。关于图像传输的例子请参看
demos/intra_process_demo
目录中的其他例子。参考:
https://docs.ros.org/en/galactic/Tutorials/Demos/Intra-Process-Communication.html
- 使能
-
请教一个问题 ros2中零拷贝只能在同一进程中的不同线程之间吗?不同进程间可以吗?
-
@dongwu_li ROS2官方的这个示例是在一个进程中跑两个节点。然后节点间通讯实现零拷贝。这个效果和ROS1中的nodelet是一样的。官方没有不同进程间零拷贝通讯的例子。估计是没有这个功能的。但TogetherROS(地平线基于foxy优化的ROS2版本)中的rclcpp实现了进程间的零拷贝通讯。
-
@首飞Kevin ROS2官方确实没有提这个,这部分工作其实在dds中有做,比如你用fastdds,需要修改dds配置文件可以实共享内存通信。
若要在进程间启用数据共享,需要在 XML 和设置上具有以下各项RMW_FASTRTPS_USE_QOS_FROM_XML=1
<data_writer profile_name="topic_name">
<qos>
<data_sharing>
<kind>AUTOMATIC</kind>
</data_sharing>
</qos>
</data_writer><data_reader profile_name="topic_name">
<qos>
<data_sharing>
<kind>AUTOMATIC</kind>
</data_sharing>
</qos>
</data_reader>关于XML配置的具体使用:https://fast-dds.docs.eprosima.com/en/latest/fastdds/xml_configuration/xml_configuration.html
-
@首飞Kevin 这是共享内存吗?
-
@young 感觉不是共享内存。因为没有涉及到不同进程。
-
@首飞Kevin 我也这么认为,互相共享对象地址,某种意义上说也是共享内存了哈哈
-
@首飞Kevin 我现在用国产芯片 他们移植的cyberrt
也加入了零拷贝 不过是进程间的 cyberrt本身进程内就是共享内存的
-
@李东武 cyberrt是Apollo的那套通讯框架吗?
-
@首飞Kevin 是的
-
所以他没有走中间层,对吗?这个性能 怎么测试呢?
-
你好,use_intra_process_comms没有Python版本的吗?