DDS编程示例
我们尝试在代码中配置DDS,以之前Hello World话题通信为例。
运行效果
启动两个终端,分别运行发布者和订阅者节点:
$ ros2 run learning_qos qos_helloworld_pub
$ ros2 run learning_qos qos_helloworld_sub
可以看到两个终端中的通信效果如下,和之前貌似并没有太大区别。
看效果确实差不多,不过底层通信机理上可是有所不同的。
发布者代码解析
我们看下在代码中,如果加入QoS的配置。
learning_qos/qos_helloworld_pub.py
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""@作者: 古月居(www.guyuehome.com)@说明: ROS2 QoS示例-发布“Hello World”话题"""import rclpy # ROS2 Python接口库from rclpy.node import Node # ROS2 节点类from std_msgs.msg import String # 字符串消息类型from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy # ROS2 QoS类"""创建一个发布者节点"""class PublisherNode(Node): def __init__(self, name): super().__init__(name) # ROS2节点父类初始化 qos_profile = QoSProfile( # 创建一个QoS原则 # reliability=QoSReliabilityPolicy.BEST_EFFORT, reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=1 ) self.pub = self.create_publisher(String, "chatter", qos_profile) # 创建发布者对象(消息类型、话题名、QoS原则) self.timer = self.create_timer(0.5, self.timer_callback) # 创建一个定时器(单位为秒的周期,定时执行的回调函数) def timer_callback(self): # 创建定时器周期执行的回调函数 msg = String() # 创建一个String类型的消息对象 msg.data = 'Hello World' # 填充消息对象中的消息数据 self.pub.publish(msg) # 发布话题消息 self.get_logger().info('Publishing: "%s"' % msg.data)# 输出日志信息,提示已经完成话题发布def main(args=None): # ROS2节点主入口main函数 rclpy.init(args=args) # ROS2 Python接口初始化 node = PublisherNode("qos_helloworld_pub") # 创建ROS2节点对象并进行初始化 rclpy.spin(node) # 循环等待ROS2退出 node.destroy_node() # 销毁节点对象 rclpy.shutdown() # 关闭ROS2 Python接口
完成代码的编写后需要设置功能包的编译选项,让系统知道Python程序的入口,打开功能包的setup.py文件,加入如下入口点的配置:
entry_points={ 'console_scripts': [ 'qos_helloworld_pub = learning_qos.qos_helloworld_pub:main',},
订阅者代码解析
订阅者中的QoS配置和发布者类似。
learning_qos/qos_helloworld_sub.py
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""@作者: 古月居@说明: ROS2 QoS示例-订阅“Hello World”话题消息"""import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from std_msgs.msg import String # ROS2标准定义的String消息from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy # ROS2 QoS类"""创建一个订阅者节点"""class SubscriberNode(Node): def __init__(self, name): super().__init__(name) # ROS2节点父类初始化 qos_profile = QoSProfile( # 创建一个QoS原则 # reliability=QoSReliabilityPolicy.BEST_EFFORT, reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST, depth=1 ) self.sub = self.create_subscription( String, "chatter", self.listener_callback, qos_profile) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、QoS原则)
def listener_callback(self, msg): # 创建回调函数,执行收到话题消息后对数据的处理 self.get_logger().info('I heard: "%s"' % msg.data) # 输出日志信息,提示订阅收到的话题消息def main(args=None): # ROS2节点主入口main函数 rclpy.init(args=args) # ROS2 Python接口初始化 node = SubscriberNode("qos_helloworld_sub") # 创建ROS2节点对象并进行初始化 rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象 rclpy.shutdown() # 关闭ROS2 Python接口
完成代码的编写后需要设置功能包的编译选项,让系统知道Python程序的入口,打开功能包的setup.py文件,加入如下入口点的配置:
entry_points={ 'console_scripts': [ 'qos_helloworld_pub = learning_qos
全部0条评论
快来发表一下你的评论吧 !