常用的绝大多数消息队列,如 RocketMQRabbitMQ 等在消息传输上都只能保证至少传输成功一次 ( At least once ), 但是不能保证只传输成功一次 ( Exactly once ), 重复发送.

# 生产者保证消息可靠性

# 消息发送方式

在之前的文章架构设计之消息 中,我们知道了 生产者生产消息有三种方式:

  • 同步发送。 发送线程同步等待,通过同步检查 Brocker 返回的状态来判断消息是否持久化成功。从而保证消息的可靠性.
  • 异步发送。 发送线程异步等待,可以通过传入的回调函数来判断消息持久化状态。根据状态来判断是否需要重试消息,从而保证消息的可靠性.
  • one way 方式。 这种方式不能保证消息的可靠性。发送端发送完成之后,调用该发送接口后立刻返回,并不返回发送的结果。

除了同步发送和异步发送这两种方式来保证消息可靠性之外.

# 重试机制

在发送消息的过程中, Producer 还有 消息发送的重试机制来提高消息的可靠性.

如果 broker 只有一个节点,则 broker 宕机了,即使 producer 有重试机制,也没用,因此利用多主模式,当某台 broker 宕机了,换一台 broker 进行投递。

当发送端需要发送消息时,如果发送端中缓存了 topic 的路由信息,并包含了消息队列,则直接返回该路由信息,如果没有缓存或没有消息队列,则向 NameServer 查询该 topic 的路由信息,查询到路由消息之后,采用指定的队列选择策略选择相应的 queue 发送消息,默认是采用轮询策略,发送成功则返回,收到异常则根据相应的策略进行重试,可以根据发送端感知到的 Broker 的时延、上次发送失败的 Broker 信息和发送端配置的是否重试不同 Broker 的参数以及发送端设置的最大超时时间等等策略来灵活地实现不同等级的消息发送可靠性保证。重试策略可以有效的保证消息发送成功的概率,最终提高消息发送的可靠性。

总结两点:

  • Producer 选择一个 MessageQueue 发送消息时。默认轮询方式选择 MessageQueue , 如果启用 Broker 端的故障延迟机制,则会判断 MessageQueueBroker 是否可用,才发送消息到该 Message 中。
  • Producer 在使用 Sync 方式发送消息时会重新 N 次, N 可由 Producer 端的配置决定的。

# 发送消息的返回状态

这里再补充一点, 同步方式发送成功,发送的状态是由 SendStatus 这个枚举类决定的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public enum SendStatus {
/**
* 发送成功
*/
SEND_OK,
/**
* 刷盘超时
*/
FLUSH_DISK_TIMEOUT,
/**
* 同步从节点超时
*/
FLUSH_SLAVE_TIMEOUT,
/**
* 从节点 不可用
*/
SLAVE_NOT_AVAILABLE,
}
  • SEND_OK : 表示消息发送成功。但是这个并不代表它是可靠的。 要确保消息不丢失,还需要启动同步 Master 服务器 或者同步刷盘, 即 SYNC_MASTERSYNC_FLUSH
  • FLUSH_DISK_TIMEOUT : 消息发送成功,但是刷盘超时。 此时消息已经进去了内存 ( MessageQueue ) 中,这种情况下,只有服务器宕机,消息才会丢失。
  • FLUSH_SLAVE_TIMEOUT : 消息发送成功,但是同步到 SLAVE 时超时。同样的。此时消息已经进去了内存 ( MessageQueue ) 中,这种情况下,只有服务器宕机,消息才会丢失。
  • SLAVE_NOT_AVAILABLE : 消息发送成功,但是没有 SLAVE 机器可用。

FLUSH_DISK_TIMEOUT , FLUSH_SLAVE_TIMEOUT , SLAVE_NOT_AVAILABLE 都是 Broker 端异常导致的不正常的情况。

# Broker 的消息可靠性

我们知道了 当消息发送成功,写入了 MessageQueue 但是没有持久到磁盘上的时候,就会造成消息的丢失。
在 官方的文档中, 提及到了 五种因为 Broker 端异常导致的消息可能会丢失的情况:

  • Broker 非正常关闭
  • Broker 异常 Crash
  • OS Crash
  • 机器掉电,但是能立即恢复供电情况
  • 机器无法开机(可能是 cpu 、主板、内存等关键设备损坏)
  • 磁盘设备损坏

其中 1-4 属于硬件资源可立即恢复情况, RocketMQ 在这四种情况下能保证消息不丢 (同步刷盘),或者丢失少量数据(异步刷盘)。
5-6 属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。 RocketMQ 在这两种情况下,通过异步复制,可保证 99% 的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。注: RocketMQ3.0 版本开始支持同步双写。

可以看出 Broker 端的消息可靠性主要通过 单机情况下的刷盘策略主从之间数据复制 来保证的。

# 刷盘策略

在一个节点中,一条消息会从 Producer 发送给 BrokerBroker 端把消息存储到 MessageQueue 中,也就是 内存中, 在 RocketMQ 的源码里才能出消息内容的结构是 MappedFile , 然后通过 刷盘机制 (同步刷盘,异步刷盘) 写入到物理磁盘上。完成消息的持久化。

在源码中 刷盘的线程 由 FlushCommitLogService 类表示。 这个类有三个子类:

  • FlushRealTimeService : 当配置为异步刷盘策略 并且没有开启 TransientStorePool 的时候, Broker 会运行一个服务 FlushRealTimeService 用来刷新缓冲区的消息内容到磁盘,这个服务使用一个独立的线程来做刷盘这件事情,默认情况下每隔 500ms 来检查一次是否需要刷盘。
  • CommitRealTimeService : 异步刷盘且开启 TransientStorePool ,使用 CommitRealService
  • GroupCommitService : 同步刷盘使用 GroupCommitService .

TransientStorePool 是短暂的消息存储池。这里直接开辟默认 51G 的直接内存 ByteBuffer ,用来临时存储消息。它还引入了内存锁的机制,避免直接内存的数据被替换到系统中的 Swap 分区中,提高系统存储性能,使 RocketMQ 消息低延迟、高吞吐量。

# 同步刷盘

RocketMQ 使用 GroupCommitService 这个对象来实现 同步刷盘。

消息写入内存的 PageCache 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void run() {
// Broker 不关闭时。
while (!this.isStopped()) {
try {
// 等待 10 毫秒
this.waitForRunning(10);
// 执行一次刷盘
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}

// 正常情况下shutdown,等待10ms 请求到来,然后flush到磁盘。
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}

synchronized (this) {
this.swapRequests();
}

this.doCommit();

CommitLog.log.info(this.getServiceName() + " service end");
}

从代码中看出,当 Broker 正常关闭的时候,还是等待 10ms 的,等待这 10ms 内的请求。 处理完请求,将数据保存到磁盘上才会关闭线程。 细节满满。

# 异步刷盘

消息写入到内存的 PageCache 中,就立刻给客户端返回写操作成功,当 PageCache 中的消息积累到一定的量时,触发一次写操作,或者定时等策略将 PageCache 中的消息写入到磁盘中。这种方式吞吐量大,性能高,但是 PageCache 中的数据可能丢失,不能保证数据绝对的安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void run() {
while (!this.isStopped()) {
// 省略部分代码...
try {
// flushCommitLogTimed 是否定时刷新
if (flushCommitLogTimed) {
// interval为配置的是啊金
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
long begin = System.currentTimeMillis();
// 刷盘、 flushPhysicQueueLeastPages 每次刷新的页数。
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
// 设置检查点
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}

// 正常关闭时,保证在退出之前全部刷新到磁盘
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
}
}

# 主从同步

在集群环境中, 如果一个 Brokermasterslave 时,就需要将 master 上的消息复制到 slave 上,复制的方式有两种:

  • 同步复制: masterslave 均写成功,才返回客户端成功。 master 挂了以后可以保证数据不丢失。但是数据同步复制会增加数据延迟风险,降低吞吐量。简单说一下,同步复制其实是 在 CommitLog 将消息 刷盘之后,同步的方式将消息同步给 Slave 节点.
  • 异步复制: master 写成功,返回客户端成功。 拥有较低的延迟和较高的吞吐量,但是当 master 出现故障后,有可能造成数据丢失。 异步复制的方式,其实是 Slave 节点中会启动一个 HAService 线程,定时的去同步 Master 节点的数据,延时大概在毫秒级。

后面我们会有一篇文章专门介绍 RocketMQ 主从复制的实现细节。这里就不多做介绍了。

# 消费者的消息可靠性

RocketMQ 在消费端 实现了 At least Once 机制,来保证消息的可靠性消费.

什么是 at lease once 呢?

consumer 会把消息先 pull 到本地,消费完成之后,才向 Broker 端发送 ack .

RocketMQ 有三种措施来实现可靠性.

# 消费重试

消费者从 RocketMQ 拉取到消息之后,需要返回消费成功来表示业务方正常消费完成。因此只有返回 CONSUME_SUCCESS 才算消费完成,如果返回 CONSUME_LATER 则会按照不同的 messageDelayLevel 时间进行再次消费,时间分级从秒到小时,最长时间为 2 个小时后再次进行消费重试,如果消费满 16 次之后还是未能消费成功,则不再重试,会将消息发送到死信队列,从而保证消息存储的可靠性。
这一措施主要是通过 延时消息队列来实现。

# 死信队列

未能成功消费的消息,消息队列并不会立刻将消息丢弃,而是将消息发送到死信队列,其名称是在原队列名称前加 %DLQ% ,如果消息最终进入了死信队列,则可以通过 RocketMQ 提供的相关接口从死信队列获取到相应的消息,保证了消息消费的可靠性。

# 消息回溯

回溯消费是指 Consumer 已经消费成功的消息,或者之前消费业务逻辑有问题,现在需要重新消费。要支持此功能,则 Broker 存储端在向 Consumer 消费端投递成功消息后,消息仍然需要保留。重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据。 RocketMQ Broker 提供了一种机制,可以按照时间维度来回退消费进度,这样就可以保证只要发送成功的消息,只要消息没有过期,消息始终是可以消费到的。

# 最后

期望和你一起遇见更好的自己