RocketMQ中各类重复消费的原理浅析

电子说

1.3w人已加入

描述

Labs 导读

随着大数据和云计算时代的到来,我国的各个产业每天都在产生不可估计的数据,以及对数据的各式各样的需求,消息中间件在处理数据、消费数据的过程中越来越受到重视。在高并发、微服务、分布式的场景下,如何合理地利用消息中间件,如何保证MQ消费消息的幂等性?所谓知其然,才能知其所以然,本文将通过RocketMQ作为例子,来扒一扒什么情况下会导致重复消费。

作者:李佳斌

单位:中国移动智慧家庭运营中心

Part 01 ●  RocketMQ如何生产和消费消息  

先简单介绍下RocketMQ正常生产消息和消费消息的流程,如下图。

1.生产者在发送消息之前根据负载均衡策略(默认是轮询)选择一个Queue,然后跟这个Queue所在的机器建立连接,把消息发送到这个Queue上。

2.消费者消费这个Queue,就能获取到对应的消息。

Queue

- 问题出现

当异常情况出现时,如消息发送超时或者消息消费超时,RocketMQ为保证消息发送成功,会启动重试机制,选择另一台机器的Queue重发。现在假设有这样一种情况,消费者实际正确接收到了消息,只是由于网络波动导致响应超时了,那就会出现消息重复发送,导致消费者重复消费的情况出现。

那除此之外,还有没有其他情况会导致消息重复消费的情况呢?总结起来一共有如下几种情况。

1)消息发送异常时的重复消费

2)消费消息时抛出了异常

3)消费者提交offset失败

4)Broker持久化offset失败

5)主从同步失败

6)重平衡

7)清理长时间消费的消息

Part 02 ●  浅析各类情况 ● 

- 消费消息时抛出异常

问题分析一

RocketMQ在并发消费的模式下会调用MessageListenerConcurrently的consumeMessage方法,入参是msgs集合。当调用该方法消费消息出现异常时,返回的结果status就会是null。这种情况下会导致status被设置为RECONSUME_LATER,也就是说消息之后会被重复消费。

问题分析二

传入的是msgs集合。上述原因一中消息处理之后,不管成功失败,都会对结果进行处理。而集合中的任意一个失败,都会导致status被设置为RECONSUME_LATER。在对结果处理是,判断到RECONSUME_LATER时,就会对msgs重新遍历并发送消息,重新消费,从而导致之前成功处理的消息都会被重复消费。不过好在msgs消息的数量默认情况下是1。

Queue

- 消费者提交offset失败

何为offset

producer发送消息到broker,Rocketmq会将消息的内容持久化到commitLog文件中,再分发到topic下的消费队列consume Queue,消费者提交消费请求时,broker从该consumer负责的消费队列中根据请求参数传入的起始offset来获取需要消费的消息索引信息,再从commitLog中获取具体的消息内容返回给consumer。消费成功之后,消费者提交offset,来记录这个queue消费到哪个位置了。

问题分析

RocketMq设计的时候,消费完消息,并不是同步提交offset,而是将offset保存到内存中,通过一个定时任务(默认是5S一次),以网络请求的方式将offset提交给broker。如果最新的offset还没提交,此时服务器宕机了,那么重启之后,就会从broker中读取到之前的提交的offset,并从此处开始消费,此时就会出现重复消费的情况了。

Queue

- broker持久化offset失败

问题分析

与消费者提交offset同理,Broker为了防止数据丢失,会将offset持久化到磁盘中。同样的也是通过一个默认5S的定时任务来处理持久化操作。所以offset的完整过程就如下图。当broker宕机时,就会导致offset丢失,此时如果消费者重新拉取消费进度,就会比实际消费的进度要低,导致重复消费。

Queue

- 主从同步失败

问题分析

为保证RocketMQ服务的高可用,一般项目中都会启用主从备份的模式,当主节点挂掉之后,从节点就会升级为主节点对外提供服务。因此就需要进行主从同步,保证数据的一致性。默认情况下每隔10S,从节点会向主节点请求,同步元数据,包括消费进度。此时如果主节点宕机了,从节点就无法获取到10S之内的消费进度,自然也就会导致重复消费。

Queue

- 重平衡

何为重平衡

RocketMQ的消费者有两种模式,集群消费模式和广播消费模式,绝大多数场景采用的都是集群消费模式。前面提到的消费进度就是在集群消费模式下才会存在。集群消费模式中有一个消费组的概念。一个消费组可以有多个消费者,不同消费组之间消费消息互不干扰,而同一消费组的消费者按照一定的算法分配消息队列进行消息消费,保证一个消息只能被一个消费组消费一次。当消费组中的消费组增加或者减少时就会触发重平衡。如图,原先消费组中有两个消费者,平均消费4个队列,每个消费组2个队列;当加入了一个新的消费者时,为了保证新的消费者能够消费消息,就会进行重平衡,重新分配消息队列。

Queue

问题分析

假设在重平衡发生时,此时消费者2还在正常消费Queue4,当消费者3加入,重平衡完成时,此时消费者2判断到Queue4已经不属于自己消费了,就会将Queue4设置为dropped,消费完成时,发现队列是dropped状态,那么消费者2的消费进度offset就不会被提交。成功消费了消息,但是消费进度却没有被提交,于是当消费者3开始消费消息时,就会从服务端拉取到之前的消费进度,造成队列4的消息被重复消费。

- 清理长时间消费的消息

清理机制讲解

RocketMQ中有一个机制会定时清理长时间正在消费的消息,默认是15分钟执行一次清理任务。之所以这么做,是有原因的。我们说过,消息被消费之后,就会提交offset。当一个线程消费了所有消息时,就会把消息从集合中移除,提交的消息进度offset就是msg5的offset+1。

假设,现在是两个线程消费,线程2消费完成,之后提交offset,但是此时线程1还在处理前两条消息,因此为了保证消费消息的不丢失,移除之后发现集合中还有剩余消息,就会把msg1的offset返回提交上去。而一旦集合最前面的消息长时间处理,就会导致这个消费进度一直在最前面。此时如果服务器重启,就会导致很多消费过的消息都会被重复消费。因此引入了清理长时间消费的机制。

Queue

问题分析

引入清理长时间消费的消息机制后,一旦发现某个消息已经处理超过15分钟了,就会将消息移除,保障后续消息消费进度的正常提交,之后会隔一定的时间再次消费这个被移除的消息。但是,这个消息虽然被移除了,却并不是没有消费过,因此再次消费就会导致重复消费的问题出现。

Part 03 ●  总结 ● 

RocketMq的官方文档中对消息传递有这样的解释:RocketMq确保所有消息至少被传递一次,在大多数情况下,消息不会重复。可见RocketMq为了保证消息的不丢失,牺牲了消息投递的重复率。因此我们在使用RokcetMq时需要合理使用它的特点,设计合理的幂等技术方案来解决重复消费的问题。

审核编辑:汤梓红

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分