导语

Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制、快速扩容、灵活容错等特性。腾讯云内部 Pulsar工作组对 Pulsar 做了深入调研以及大量的性能和稳定性方面优化,目前已经在腾讯内部业务TDBank落地上线。本文是Pulsar技术系列中的一篇,主要介绍Pulsar 的 Message Deduplication 特性,供大家参考,避免在使用过程中踩坑。

Message Deduplication背景介绍

消息中间件产品设计中,对消息的投递设计,一般参照Kafka中提出的三种投递语意,分别为:

至多一次 (at-most-once)

至少一次 (at-least-once)

精确一次(或恰好一次) (exactly-once )

理解上需要注意的是,这里都是对投递行为的限定描述。

至多一次:客户端在生产消息的时候,仅会对生产的消息投递一次,这里并不保证消息一定生产成功。

至少一次:客户端在生产消息的时候,在收到一次成功的响应之前,可能会投递多次。这种场景下,服务器端可能存在多条重复的消息。

精确一次(或恰好一次):客户端在生产消息的时候,针对这次生产,服务器端保证有且仅保存一份消息。这里的 “这次生产”,一般都是指的是客户端对一次“SendMessage”的调用。这种语意下,服务器一般不会处理多次对相同消息体调用生产,产生重复消息的场景。简单而言,就是“精确一次”并不等于消息去重复。

许多系统声称提供“exactly-once”的交付语义,但仔细阅读其声明会发现,一些系统的声明可能存在一定的误导性,我们需要考虑它们在生产超时,部分副本写入成功,部分失败等场景下对语意的保证。

目前业界,绝大多数的消息中间件产品,如Kafka、RocketMQ、Pulsar、InLong-Tube、RabbitMQ、ActiveMQ等,都支持at-least-once(至少一次)的投递语意,即生产成功的消息,服务器端至少能保证存储一份,消费者至少能消费到一份消息。但是,对exactly-once(精确一次)语意支持的产品还是比较少。

下面,我们着重介绍一下Pulsar的Message Deduplication(相当于对exactly-once的一种实现)功能,可能与你想的并不一样。

Pulsar 的消息去重(Message deduplication)

功能配置

Pulsar提供的Message Deduplication 功能,默认是关闭的。开启时,需要修改Broker 端的配置,另外客户端也需要添加少许的配置。(详情可参考pulsar的官网 )

开启Message Deduplictiaon能力,首先,Broker 端需要变更如下配置:

#是否开启message deduplication功能
brokerDeduplicationEnabled#deduplication功能下,生产者的数量限制
brokerDeduplicationMaxNumberOfProducers
#broker端生成deduplication 快照信息的间隔
brokerDeduplicationEntriesInterval
#生产者断链后,broker端deduplication信息保存的时长
brokerDeduplicationProducerInactivityTimeoutMinutes

其次,生产者客户端需要做如下变更:

1、为生产者指定一个名称。

2、配置消息生产超时为0(默认为30s)。

代码示例如下:

PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer producer = pulsarClient.newProducer()
.producerName("producer-1")
.topic("persistent://public/default/topic-1")
.sendTimeout(0, TimeUnit.SECONDS)
.create();

功能原理

客户端对每一个发送的消息请求,都会采用递增方式生成一个唯一的Sequence ID编号,这个信息会被放置在Message 的元数据中,传输到Broker端。同时,客户端Producer 也会维护一个发送的PendingMessages队列,当收到Broker端返回的发送Ack 信息后,将PendingMessages中相同Sequence ID的信息移除,客户端认为发送的这个消息生产成功。

当Broker开启Message Deduplication 功能后,Broker对对每个收到的消息请求进行是否重复的判断。

判断的逻辑如下:

1、Broker端针对每个生产者,以生产者名字为key,分当前接收到的和已经处理完成的两个维度保存生产消息的最大Sequence ID信息:

/*当前已经接受不了到的*/
ConcurrentOpenHashMap highestSequencedPushed
/*当前已经存储处理过的*/
ConcurrentOpenHashMap highestSequencedPersisted

2、Broker端每收到一个生产Message的请求,会进行是否重复的判断,即收到的最新的Sequence ID是否大于Broker 端保存的两维度下相同ProducerName下的Sequence ID,如果大于则不重复,如果小于或等于则消息重复。消息重复时,Broker端会直接返回,不会继续走后续的存储处理流程。

由上面Pulsar 的Message Depulication feature 相关的配置和实现原理的介绍。可知,Pulsar Broker端的Message Depulication 功能,并不是对消息体的去重,而是客户端在不配置超时时间的前提下,Broker 端在一定的时间范围内,对同一个生产者名称下的客户端投递的具有相同Sequence id的消息的唯一行保证。

总结

Kafka 在0.11.0.0版本之后,针对Topic之内和多个Topic之间两种场景下的exactly-once语意,分别提供了支持传递幂等性处理的选项和类事物消息的处理方式进行保证。有兴趣的同学可以参展kafka的源码和官网介绍 。

Pulsar的Message Deduplication feature与Kafka的单Topic下对exaxtly-once语意的保证在实现方式上类似,也可以认为是对exaxtly-once语意的一种实现。

这里需要着重注意的是,exaxtly-once不等于消息去重。在实际的开发中,生产和消费部分都有可能产生重复的消息。

消息的生产者,在收到明确的消息生产成功的确认之前,消息在服务器端的存储状态是不确定的。

例如,在一定时间内,生产者没有收到生产的响应,选择了重发,这时,服务器端就可能有两份甚至多份消息的副本。

此外,消费部分在如下几个场景也有可能获取到重复推送的消息:

1、消费者重启时,已经消费,但是Broker端未收到Ack或消费者没有触发Ack;

2、Broker重启,因为消费者的Ack信息并不是实时保存的,Broker重启后可能会有少量的已经消费的消息会被重复推送;

3、消费出现异常,客户端使用reconsumerLater或negativeAck方式进行确认,这时Broker会重新推送消息。

因此,大家在选用消息中间件的特性时,需要注意相关的场景和限制。避免因为重复消息对业务产生不必要的影响。

one more thing

腾讯云基于 Apache Pulsar 自研的消息中间件--TDMQ Pulsar 版,具备极好的云原生和 Serverless 特性,兼容 Pulsar 的各个组件与概念,具备计算存储分离,灵活扩缩容的底层优势。目前TDMQ Pulsar 版已开始商业化,对Pulsar感兴趣的用户可以进入官网了解详情。

Message deduplication 这里的去重与你想的可能不一样|Apache Pulsar 技术系列的


扫描二维码,在手机上阅读!