如何处理重复消息

Tuesday, August 6, 2019

消息重复的情况必然存在

在MQTT协议中,有三种消息传递标准

  • At most onece: 至多一次。消息传递时,最多会被送达一次。就是没什么可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不高的监控场景使用。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。就是不能丢消息,但是能允许出现少量重复信息
  • Exactly once: 恰好一次。消息在传递时,不允许重复也不允许丢失,这个是最高等级。

大部分的消息队列,比如RocketMQ,Kafka提供的服务质量都是At least once。 Kafka在文档里说支持Exactly once,但是他的这个概念跟上面说的Exactly once不完全相同,所以他提供的还是At least once。

用幂等性解决重复消息问题

一般解决重复消息的方法是,在消费端,让消息的操作具有幂等性。 什么是幂等性,简单来说,就是一个操作,方法或者服务,其任意多次执行所产生的影响与一次执行的影响相同。

比如说,给一个账户增加100元,这个操作是不幂等的,但是给一个账户设置为100元,这个操作是幂等的。

所以从系统的层面来看,At least once + 幂等消费 = Exactly once

最好的方式,就是从业务逻辑下手,将消费的业务逻辑设计成具备幂等性的操作。

常用方法

利用数据库的唯一约束实现幂等

比如还是上面说的,给一个账户增加100元。我们可以设定,每个转账单对每个账户只可以执行一次变更操作。最简单的方法是,我们在数据库中建立一张流水表,有三个字段,转账单ID,用户ID,变更金额。然后我们把转账单和用户ID共同设为主键。这样的话,对于一条记录,只能插入一次了,也就解决上面的问题了。

如果是NoSql,我们也可以用INSERT IF NOT EXIST的类似操作来实现幂等性。

为更新的数据设置前置条件

给数据设置一个前置条件,如果满足条件就更新数据,否则拒绝,在更新数据的时候,同时变更前置条件中需要判断的数据,这样的话,只有第一次更新数据才满足前置条件。

比如上面说的增加100元这个操作,我们可以对前置条件设置为,如果账户X当前的余额为500元,将余额增加100元,这个操作,就具备了幂等性。在消费的时候,判断数据库中,当前余额与消息中的余额是否相等,只有相等才执行。

记录并检查操作

我们还可以用一种全局ID机制,比如Token或者GUID。在执行更新数据操作之前,先检查一下是否执行过这个更新操作。

在发送消息时,给每个消息指定一个全局唯一的ID,消费时,先根据这个ID检查这条消息是否有被消费过,如果没有消费过,才更新数据数据,然后将消费状态为已消费。

但是这种操作,是最不好实现的,比如在检查消费状态,然后更新数据并且设置消费状态中,三个操作必须为一组操作保证原子性,才能真正实现幂等,不然会出Bug。

比如: t0时刻: Consumer A收到消息,检查消息执行状态,发现消息未处理过,开始执行账户增加100元。 t1时刻:Consumer B收到消息,检查消息执行状态,发现消息未处理过,因为这个时刻,ConsumerA还未来得及更新消息执行状态。

这样就会出问题了,当然我们可以用事务来实现,也可以用锁来实现,但是在分布式系统中,无论是分布式事务还是分布式锁都是比较难解决问题。

消息队列

计算机指令

Vim 命令