最近有点浮躁,玩了一个周,没有写文章了。今天继续搞 RocketMQ 的事务实现。

# 往期文章

# 分布式事务

分布式事务,是指事务的发起者、资源及资源管理器和事务协调者分别位于分布式系统的不同节点之上。当然,分布式事务也是一个老生常谈的话题,尤其是面试中。常用的分布式事务解决方案有很多,比如:两阶段提交, TCCSAGA 等等。

本篇文章,我主要看看 RocketMQ 的事务实现。

# 从一个例子开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 事务消息的监听器。
TransactionListener transactionListener = new TransactionListenerImpl();
// 创建一个 事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");

producer.setTransactionListener(transactionListener);
producer.start();


try {
Message msg =
new Message("TopicTest1234", "tag", "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}

Thread.sleep(1000L * 60 * 5);

producer.shutdown();

这里我们介绍下事务消息的常用的 API 吧。

  • producer.setTransactionListener() . 设置检测事务消息的回调。当定时任务执行检测到有事务消息时就会调用 checkLocalTransaction 方法。
  • producer.setExecutorService(ExecutorService executorService) : 设置检查事务的线程池。默认使用: ThreadPoolExecutor , 每分钟执行一次。
  • producer.sendMessageInTransaction(final Message msg, final Object arg) : 发送事务消息。

从上面的这个案例来看, 发送事务消息 和 普通消息的发送并没有什么太大的不同,只是需要创建 TransactionMQProducer 和 使用 sendMessageInTransaction 发送消息就可以了。

其中 TransactionMQProducer 继承了 DefaultMQProducer , 这个并不陌生了,我们在 RocketMQ 架构设计之启动这篇文章中,已经看过很多次了。而在启动时,案例中的 producer.start(); 就是直接调用父类 DefaultMQProducerstart() 方法, DefaultMQProducer 会通过 defaultMQProducerImpl.start() 完成整个生产者部分的启动。这里就不多赘述了。

我们来看最精彩的发送消息的过程.

producer.sendMessageInTransaction(msg, null)

我们一路追踪 sendMessageInTransaction 方法,很容易就看到了: DefaultMQProducerImpl#sendMessageInTransaction

这个方法主要有两个部分,如下图:

第一部分:发送带有 事务标识 ( TRAN_MSG ) 的消息.
第二部分:结束事务。 主要有提交 offset , 和 同步给 Brokercommit 还是 rollback .

所以, RocketMQ 使用的分布式事务方案是:二阶段提交 (XA)

# 事务消息生产过程

发送消息的过程,在上篇文章中我们也已经说过了: RocketMQ 系列 - 架构设计之消息。 可以总结为 3 步,根据 topic 获取路由元数据, 选择适合的 MessageQueue 进行存储, 将 Message 通过 Netty 发送给 Broker .

特别需要注意的时候,在发送事务消息的时候,会给消息添加事务标志。 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); 这里是干什么用的呢?接着往下看

TransactionalMessageServiceMessageStore 这两种存储方式有什么不同呢?

其实存储并没有什么不同,而是存储的消息是不一样的,我们继续往下看:

TransactionalMessageService 使用的存储和 普通消息都是一样的,都是在启动时候创建的 MessageStore . 不同的是,事务消息存储的 经过 parseHalfMessageInner 处理之后的 消息.

可以看到,处理之后消息, topicqueueId 都被设置成了默认的 RMQ_SYS_TRANS_OP_HALF_TOPIC 和 0。

所以:事务消息会被存储到 默认的 RMQ_SYS_TRANS_OP_HALF_TOPIC 中,并根据是否为延迟消息和延迟的等级存储到不同的 MessageQueue 中。

开源 RocketMQ 支持延迟消息,但是不支持秒级精度。默认支持 18level 的延迟消息,这是通过 broker 端的 messageDelayLevel 配置项确定的.

具体延迟队列的实现,我们会在下一篇文章中详细的分析一下 RocketMQ 延时消息队列的实现。

1
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

看完了,事务的第一个阶段,发送事务消息。 接下来,我们继续看第二个阶段。

# 结束事务: commit or callback.

结束事务主要是做两件事情:

  • 提交消息的偏移量
  • 发送提交事务还是回滚事务。

实现逻辑如下图:

主要的流程就是:封装 RequestHeader , 会设置 事务 id , 事务的状态。 然后设置本次请求的 RequestCodeEND_TRANSACTION(37) .

Broker 端 接收到对应的请求之后,会根据 事务的状态执行提交事务或者回滚操作。如右上图。

我们从两方面来分析一下:

# 提交事务

可以看到 根据 offset 查到了消息,然后根据消息的状态封装成 MessageExtBrokerInner 对象。通过通过 sendFianlMessage 方法完成将消息写入到 消息原来的队列中 (不是默认的事务队列了). 最后删除 prepare 消息。如上图。

# 回滚事务

回滚事务就比较简单了,检查消息的状态,查看直接删除掉 prepare 消息就 OK 了。

以上就是 整个事务消息的生产过程了。

# 事务回查

我们在介绍 TranscationMQProducerapi 时候,有一个 setExecutorService(ExecutorService executorService) 方法。 方法的作用是:设置检查事务的线程池。 这个线程池的作用就是 事务回查。

我们追踪这个方法的调用方,可以发现,调用是 NettyServer 调用的,那是谁请求的呢?我们跟踪这个请求码发现在 Broker2Client 的类中发现了这个 请求码。 跟踪这个方法,查看其调用方,最终终于在 TransactionalMessageCheckService 发现了新大陆。
当我看到 public class TransactionalMessageCheckService extends ServiceThread 的时候,我就不禁想起了, Broker 在启动的时候会初始化事务,这是会创建一个 TransactionalMessageCheckService 对象.

我们从 Broker 端开始还原一下这个事务回查的场景。

在 Broker 端进行启动时会初识化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 初始化事务逻辑
*/
private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
/**
* Broker 检查事务状态的listner.。 Producer 写入 half 消息,但是没有收到 finalMessage .回调。
*/
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);

/**
* Broker 检查事务状态的线程。 Producer 写入 half 消息,但是没有收到 finalMessage.
*/
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}

这里初始化了 transactionalMessageCheckServicetransactionalMessageCheckListener . 然后在 start 方法中通过 startProcessorByHa 启动了 transactionalMessageCheckService 线程

1
2
3
4
5
6
// Broker的容灾处理
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}

这样 transactionalMessageCheckService 就启动起来了。运行代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
// 限制只有一个本类可以运行,最终会调用 onWaitEnd()
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}


@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
// 回查的最大次数
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);

// 开始回查
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

在回查的过程中, RocketMQ 会获取 事务 Topic 下的所有 MessageQueue.

1
2
3
4
5
6
7
// 获取 事务topic中下的所有MessageQueue.
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}

然后遍历所有的 MessageQueue ,按个处理所有队列里的待回查的消息。怎么判断消息需要回查呢?前面说过了,通过 Op队列 判断,因此还需要定位到 HalfQueue 对应的 OpQueue ,以及它们的 ConsumeQueue 偏移量, 获取到 halfMessage , 然后判断是否需要检测事务状态,如果需要检测则会调用 AbstractTransactionalMessageCheckListenerresolveHalfMsg 方法,即会发送 检测的请求 给 Producer .

1
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);

checkProducerTransactionState 方法的实现则是: 向 Producer 端发送 请求码为 39 ( RequestCode.CHECK_TRANSACTION_STATE ) 的请求.

如下。

1
2
3
4
5
6
7
8
9
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.setBody(MessageDecoder.encode(messageExt, false));
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
group, messageExt.getMsgId(), e.toString());
}

这样就触发了 Producer 的 事务回查。我们接着看、

ClientRemotingProcessor 中可以看到 CHECK_TRANSACTION_STATE 的处理方法。

1
2
3
// 检查事务状态
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);

然后获取到 Producer 实例,通过 producer 开始回查事务.

1
2
3
4
5
6
7
8
9
// 查询到对应的 Producer实例
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
// 开始回查。
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}

继续跟踪这个 checkTransactionState 便看到了我们一开始说的这个事务回查了。

其中 checkExecutor 就是一个线程池了,我们重点来看 回查的任务,即 图中的 Runnable 的实现。

首先会 获取消息的事务状态,然后将消息的事务状态发送给 Broker。 这样就完成了 事务回查。

如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

@Override
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
// 1、查看 消息事务的状态
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
}

// 2、处理消息事务, 把事务的状态发送给 Broker。
this.processTransactionState(localTransactionState, group, exception);
}

/**
* 处理消息事务。
*
* 将消息事务的状态发送给 Broker
*
* @param localTransactionState
* @param producerGroup
* @param exception
*/
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {

// 封装RequestHeader
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.swetFromTransactionCheck(true);

String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
// 设置 事务的状态.
switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}

doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);

// 向Broker端发送请求。
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000);

}
};

以上就是整个事务回查的过程了。这里我再来梳理一下执行的时序图。

最后我们想一下,为什么 事务回查呢?
Half 消息写入成功,可能因为网络,服务重启等等原因没有收到 Producer 的事务状态请求,这是, Broker 就会主动放弃事务回查给 Producer . 来决定该事务消息是提交还是回滚。为了避免消息被无限次的回查,RocketMQ 通过 transactionCheckMax 属性设置消息回查的最大次数,默认是 15 次。

# 总结

  • RocketMQ 的事务消息是使用 二阶段提交 ( XA ) 的这种分布式事务解决方案。
  • RocketMQ 第一阶段会发送带有 事务标志的消息给 Broker . Broker 会把消息存储到固定的 Topic 中,并根据延迟级别存储到不同的 queue 中。
  • RocketMQ 第二阶段是结束事务,会提交 offset , 根据消息的事务状态提交或者回滚事务。

RocketMQ 通过改写 TopicqueueId ,将消息暂时存储到的一个对 Consumer 不可见的队列中,然后等待 Producer 执行本地事务,提交事务装填后再决定将 Half 消息 commit 或者 rollback .

# 最后

期望和你一起遇见更好的自己