RocketMQ生产者为什么需要负载均衡?
在RocketMQ中,队列是消息发送的基本单位。每个Topic下可能存在多个队列,因此一个生产者实例可以向不同的队列发送消息。当生产者发送消息时,如果不能均衡的将消息发送到不同的队列,那么会导致队列里的消息分布不均衡,这样最终会导致消息性能下降,因此生产者负载均衡机制也是非常重要的。
RocketMQ生产者原理分析
既然生产者负载均衡如此重要,我们看下是如何实现的。
我们通常使用如下方法发送消息:
构建消息 Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); //发送消息 SendResult sendResult = producer.send(msg);
RocketMQ发送消息的核心逻辑在DefaultMQProducerImpl类sendDefaultImpl。
在发送消息流程利里面有一行非常关键的逻辑,selectOneMessageQueue,看方法名称就可以知道其含义,选择一个消息队列。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); }
里面是通过策略类来实现的。
策略类最终通过org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String) 实现。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //生产者第一次发消息 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //非第一次,重试发消息的情况, for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //重试的情况,不取上一个broker的队列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } } 第一次发消息选择队列核心逻辑在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue() //线程安全的index private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); public MessageQueue selectOneMessageQueue() { //获取一个基础索引,每次自增1 这个全局存在TopicPublishInfo 每一个topic int index = this.sendWhichQueue.getAndIncrement(); // 基础索引和 消息写队列大小 进行取模 用来实现轮训的算法 int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
哈哈,这里就是生产者负载均衡轮询机制的核心逻辑了,使用到了ThreadLocal技术,sendWhichQueue为每个生产者线程维护一个自己的下标索引。
基础索引计算器,使用ThreadLocal技术针对不同的生产者线程第一次随机,后面递增,可以更加负载均衡。
public class ThreadLocalIndex { //关键技术 private final ThreadLocalthreadLocalIndex = new ThreadLocal (); private final Random random = new Random(); public int getAndIncrement() { Integer index = this.threadLocalIndex.get(); if (null == index) { //第一次随机 index = Math.abs(random.nextInt()); if (index < 0) index = 0; this.threadLocalIndex.set(index); } //第二次索引位置开始自增1 index = Math.abs(index + 1); if (index < 0) index = 0; this.threadLocalIndex.set(index); return index; } }
哈哈,有没有觉得这个实现非常巧妙了。不同的生产者线程都拥有自己的索引因子,分配队列更加均衡。
总结
本文分析了RocketMQ生产者底层的实现,设计地方有巧妙之处,值得我们学习,上面是发送非顺序消息的场景, 如果是顺序消息,我们作为使用者可以指定负载均衡策略。
编辑:黄飞
全部0条评论
快来发表一下你的评论吧 !