如何在代码中配置DDS编程示例

描述

DDS编程示例
我们尝试在代码中配置DDS,以之前Hello World话题通信为例。

通信

运行效果

启动两个终端,分别运行发布者和订阅者节点:

$ ros2 run learning_qos qos_helloworld_pub
$ ros2 run learning_qos qos_helloworld_sub

可以看到两个终端中的通信效果如下,和之前貌似并没有太大区别。

通信

通信

看效果确实差不多,不过底层通信机理上可是有所不同的。

发布者代码解析

我们看下在代码中,如果加入QoS的配置。

learning_qos/qos_helloworld_pub.py

#!/usr/bin/env python3# -*- coding: utf-8 -*-"""@作者: 古月居(www.guyuehome.com)@说明: ROS2 QoS示例-发布“Hello World”话题"""import rclpy                     # ROS2 Python接口库from rclpy.node import Node      # ROS2 节点类from std_msgs.msg import String  # 字符串消息类型from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy # ROS2 QoS类"""创建一个发布者节点"""class PublisherNode(Node):    def __init__(self, name):        super().__init__(name)        # ROS2节点父类初始化        qos_profile = QoSProfile(     # 创建一个QoS原则            # reliability=QoSReliabilityPolicy.BEST_EFFORT,            reliability=QoSReliabilityPolicy.RELIABLE,            history=QoSHistoryPolicy.KEEP_LAST,            depth=1        )        self.pub = self.create_publisher(String, "chatter", qos_profile) # 创建发布者对象(消息类型、话题名、QoS原则)        self.timer = self.create_timer(0.5, self.timer_callback)         # 创建一个定时器(单位为秒的周期,定时执行的回调函数)    def timer_callback(self):                                # 创建定时器周期执行的回调函数        msg = String()                                       # 创建一个String类型的消息对象        msg.data = 'Hello World'                             # 填充消息对象中的消息数据        self.pub.publish(msg)                                # 发布话题消息        self.get_logger().info('Publishing: "%s"' % msg.data)# 输出日志信息,提示已经完成话题发布def main(args=None):                           # ROS2节点主入口main函数    rclpy.init(args=args)                      # ROS2 Python接口初始化    node = PublisherNode("qos_helloworld_pub") # 创建ROS2节点对象并进行初始化    rclpy.spin(node)                           # 循环等待ROS2退出    node.destroy_node()                        # 销毁节点对象    rclpy.shutdown()                           # 关闭ROS2 Python接口

完成代码的编写后需要设置功能包的编译选项,让系统知道Python程序的入口,打开功能包的setup.py文件,加入如下入口点的配置:

entry_points={    'console_scripts': [     'qos_helloworld_pub  = learning_qos.qos_helloworld_pub:main',},

订阅者代码解析

订阅者中的QoS配置和发布者类似。

learning_qos/qos_helloworld_sub.py

#!/usr/bin/env python3# -*- coding: utf-8 -*-"""@作者: 古月居@说明: ROS2 QoS示例-订阅“Hello World”话题消息"""import rclpy                                     # ROS2 Python接口库
from rclpy.node   import Node                    # ROS2 节点类
from std_msgs.msg import String                  # ROS2标准定义的String消息from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy  # ROS2 QoS类"""创建一个订阅者节点"""class SubscriberNode(Node):    def __init__(self, name):        super().__init__(name)         # ROS2节点父类初始化        qos_profile = QoSProfile(      # 创建一个QoS原则            # reliability=QoSReliabilityPolicy.BEST_EFFORT,            reliability=QoSReliabilityPolicy.RELIABLE,            
history=QoSHistoryPolicy.KEEP_LAST,            depth=1        )        self.sub = self.create_subscription(            String, "chatter", self.listener_callback, qos_profile) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、QoS原则)    
def listener_callback(self, msg):                      # 创建回调函数,执行收到话题消息后对数据的处理        self.get_logger().info('I heard: "%s"' % msg.data) # 输出日志信息,提示订阅收到的话题消息def main(args=None):                               # ROS2节点主入口main函数    rclpy.init(args=args)                          # ROS2 Python接口初始化    node = SubscriberNode("qos_helloworld_sub")    # 创建ROS2节点对象并进行初始化    rclpy.spin(node)                               # 循环等待ROS2退出    
node.destroy_node()                            # 销毁节点对象    rclpy.shutdown()                               # 关闭ROS2 Python接口

完成代码的编写后需要设置功能包的编译选项,让系统知道Python程序的入口,打开功能包的setup.py文件,加入如下入口点的配置:

entry_points={    'console_scripts': [     'qos_helloworld_pub  = learning_qos
打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分