最近有点浮躁,玩了一个周,没有写文章了。今天继续搞 RocketMQ
的事务实现。
# 往期文章
# 分布式事务
分布式事务,是指事务的发起者、资源及资源管理器和事务协调者分别位于分布式系统的不同节点之上。当然,分布式事务也是一个老生常谈的话题,尤其是面试中。常用的分布式事务解决方案有很多,比如:两阶段提交, TCC
, SAGA
等等。
本篇文章,我主要看看 RocketMQ
的事务实现。
# 从一个例子开始
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setTransactionListener(transactionListener); producer.start();
try { Message msg = new Message("TopicTest1234", "tag", "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); }
Thread.sleep(1000L * 60 * 5);
producer.shutdown();
|
这里我们介绍下事务消息的常用的 API
吧。
producer.setTransactionListener()
. 设置检测事务消息的回调。当定时任务执行检测到有事务消息时就会调用 checkLocalTransaction
方法。
producer.setExecutorService(ExecutorService executorService)
: 设置检查事务的线程池。默认使用: ThreadPoolExecutor
, 每分钟执行一次。
producer.sendMessageInTransaction(final Message msg, final Object arg)
: 发送事务消息。
从上面的这个案例来看, 发送事务消息 和 普通消息的发送并没有什么太大的不同,只是需要创建 TransactionMQProducer
和 使用 sendMessageInTransaction
发送消息就可以了。
其中 TransactionMQProducer
继承了 DefaultMQProducer
, 这个并不陌生了,我们在 RocketMQ 架构设计之启动这篇文章中,已经看过很多次了。而在启动时,案例中的 producer.start();
就是直接调用父类 DefaultMQProducer
的 start()
方法, DefaultMQProducer
会通过 defaultMQProducerImpl.start()
完成整个生产者部分的启动。这里就不多赘述了。
我们来看最精彩的发送消息的过程.
producer.sendMessageInTransaction(msg, null)
我们一路追踪 sendMessageInTransaction
方法,很容易就看到了: DefaultMQProducerImpl#sendMessageInTransaction
这个方法主要有两个部分,如下图:
第一部分:发送带有 事务标识 ( TRAN_MSG
) 的消息.
第二部分:结束事务。 主要有提交 offset
, 和 同步给 Broker
是 commit
还是 rollback
.
所以, RocketMQ
使用的分布式事务方案是:二阶段提交 (XA)
# 事务消息生产过程
发送消息的过程,在上篇文章中我们也已经说过了: RocketMQ 系列 - 架构设计之消息。 可以总结为 3
步,根据 topic
获取路由元数据, 选择适合的 MessageQueue
进行存储, 将 Message
通过 Netty
发送给 Broker
.
特别需要注意的时候,在发送事务消息的时候,会给消息添加事务标志。 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
这里是干什么用的呢?接着往下看
TransactionalMessageService
和 MessageStore
这两种存储方式有什么不同呢?
其实存储并没有什么不同,而是存储的消息是不一样的,我们继续往下看:
TransactionalMessageService
使用的存储和 普通消息都是一样的,都是在启动时候创建的 MessageStore
. 不同的是,事务消息存储的 经过 parseHalfMessageInner
处理之后的 消息.
可以看到,处理之后消息, topic
和 queueId
都被设置成了默认的 RMQ_SYS_TRANS_OP_HALF_TOPIC
和 0。
所以:事务消息会被存储到 默认的 RMQ_SYS_TRANS_OP_HALF_TOPIC
中,并根据是否为延迟消息和延迟的等级存储到不同的 MessageQueue
中。
开源 RocketMQ
支持延迟消息,但是不支持秒级精度。默认支持 18
个 level
的延迟消息,这是通过 broker
端的 messageDelayLevel
配置项确定的.
具体延迟队列的实现,我们会在下一篇文章中详细的分析一下 RocketMQ
延时消息队列的实现。
1
| messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
|
看完了,事务的第一个阶段,发送事务消息。 接下来,我们继续看第二个阶段。
# 结束事务: commit or callback.
结束事务主要是做两件事情:
实现逻辑如下图:
主要的流程就是:封装 RequestHeader
, 会设置 事务 id
, 事务的状态。 然后设置本次请求的 RequestCode
为 END_TRANSACTION(37)
.
在 Broker
端 接收到对应的请求之后,会根据 事务的状态执行提交事务或者回滚操作。如右上图。
我们从两方面来分析一下:
# 提交事务
可以看到 根据 offset
查到了消息,然后根据消息的状态封装成 MessageExtBrokerInner
对象。通过通过 sendFianlMessage
方法完成将消息写入到 消息原来的队列中 (不是默认的事务队列了). 最后删除 prepare
消息。如上图。
# 回滚事务
回滚事务就比较简单了,检查消息的状态,查看直接删除掉 prepare
消息就 OK 了。
以上就是 整个事务消息的生产过程了。
# 事务回查
我们在介绍 TranscationMQProducer
的 api
时候,有一个 setExecutorService(ExecutorService executorService)
方法。 方法的作用是:设置检查事务的线程池。 这个线程池的作用就是 事务回查。
我们追踪这个方法的调用方,可以发现,调用是 NettyServer
调用的,那是谁请求的呢?我们跟踪这个请求码发现在 Broker2Client 的类中发现了这个 请求码。 跟踪这个方法,查看其调用方,最终终于在 TransactionalMessageCheckService
发现了新大陆。
当我看到 public class TransactionalMessageCheckService extends ServiceThread
的时候,我就不禁想起了, Broker 在启动的时候会初始化事务,这是会创建一个 TransactionalMessageCheckService
对象.
我们从 Broker
端开始还原一下这个事务回查的场景。
在 Broker 端进行启动时会初识化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
private void initialTransaction() { this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class); if (null == this.transactionalMessageService) { this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore())); log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName()); }
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class); if (null == this.transactionalMessageCheckListener) { this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener(); log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName()); } this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this); }
|
这里初始化了 transactionalMessageCheckService
和 transactionalMessageCheckListener
. 然后在 start
方法中通过 startProcessorByHa
启动了 transactionalMessageCheckService
线程
1 2 3 4 5 6
| if (!messageStoreConfig.isEnableDLegerCommitLog()) { startProcessorByHa(messageStoreConfig.getBrokerRole()); handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); this.registerBrokerAll(true, false, true); }
|
这样 transactionalMessageCheckService
就启动起来了。运行代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Override public void run() { log.info("Start transaction check service thread!"); long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval(); while (!this.isStopped()) { this.waitForRunning(checkInterval); } log.info("End transaction check service thread!"); }
@Override protected void onWaitEnd() { long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); long begin = System.currentTimeMillis(); log.info("Begin to check prepare message, begin time:{}", begin); this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin); }
|
在回查的过程中, RocketMQ
会获取 事务 Topic 下的所有 MessageQueue.
1 2 3 4 5 6 7
| String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); if (msgQueues == null || msgQueues.size() == 0) { log.warn("The queue of topic is empty :" + topic); return; }
|
然后遍历所有的 MessageQueue
,按个处理所有队列里的待回查的消息。怎么判断消息需要回查呢?前面说过了,通过 Op队列
判断,因此还需要定位到 HalfQueue
对应的 OpQueue
,以及它们的 ConsumeQueue
偏移量, 获取到 halfMessage
, 然后判断是否需要检测事务状态,如果需要检测则会调用 AbstractTransactionalMessageCheckListener
的 resolveHalfMsg
方法,即会发送 检测的请求 给 Producer
.
1
| brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
|
checkProducerTransactionState
方法的实现则是: 向 Producer 端发送 请求码为 39 ( RequestCode.CHECK_TRANSACTION_STATE
) 的请求.
如下。
1 2 3 4 5 6 7 8 9
| RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader); request.setBody(MessageDecoder.encode(messageExt, false)); try { this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); } catch (Exception e) { log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}", group, messageExt.getMsgId(), e.toString()); }
|
这样就触发了 Producer
的 事务回查。我们接着看、
在 ClientRemotingProcessor
中可以看到 CHECK_TRANSACTION_STATE
的处理方法。
1 2 3
| case RequestCode.CHECK_TRANSACTION_STATE: return this.checkTransactionState(ctx, request);
|
然后获取到 Producer
实例,通过 producer
开始回查事务.
1 2 3 4 5 6 7 8 9
| MQProducerInner producer = this.mqClientFactory.selectProducer(group); if (producer != null) { final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); producer.checkTransactionState(addr, messageExt, requestHeader); } else { log.debug("checkTransactionState, pick producer by group[{}] failed", group); }
|
继续跟踪这个 checkTransactionState
便看到了我们一开始说的这个事务回查了。
其中 checkExecutor
就是一个线程池了,我们重点来看 回查的任务,即 图中的 Runnable
的实现。
首先会 获取消息的事务状态,然后将消息的事务状态发送给 Broker。 这样就完成了 事务回查。
如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| Runnable request = new Runnable() { private final String brokerAddr = addr; private final MessageExt message = msg; private final CheckTransactionStateRequestHeader checkRequestHeader = header; private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
@Override public void run() { TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); TransactionListener transactionListener = getCheckListener(); LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable exception = null; if (transactionCheckListener != null) { localTransactionState = transactionCheckListener.checkLocalTransactionState(message); } else if (transactionListener != null) { log.debug("Used new check API in transaction message"); localTransactionState = transactionListener.checkLocalTransaction(message); }
this.processTransactionState(localTransactionState, group, exception); }
private void processTransactionState( final LocalTransactionState localTransactionState, final String producerGroup, final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); thisHeader.setProducerGroup(producerGroup); thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); thisHeader.swetFromTransactionCheck(true);
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (uniqueKey == null) { uniqueKey = message.getMsgId(); } thisHeader.setMsgId(uniqueKey); thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); switch (localTransactionState) { case COMMIT_MESSAGE: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); log.warn("when broker check, client rollback this transaction, {}", thisHeader); break; case UNKNOW: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); log.warn("when broker check, client does not know this transaction state, {}", thisHeader); break; default: break; }
doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000);
} };
|
以上就是整个事务回查的过程了。这里我再来梳理一下执行的时序图。
最后我们想一下,为什么 事务回查呢?
Half
消息写入成功,可能因为网络,服务重启等等原因没有收到 Producer
的事务状态请求,这是, Broker
就会主动放弃事务回查给 Producer
. 来决定该事务消息是提交还是回滚。为了避免消息被无限次的回查,RocketMQ 通过 transactionCheckMax 属性设置消息回查的最大次数,默认是 15 次。
# 总结
RocketMQ
的事务消息是使用 二阶段提交 ( XA
) 的这种分布式事务解决方案。
RocketMQ
第一阶段会发送带有 事务标志的消息给 Broker
. Broker
会把消息存储到固定的 Topic 中,并根据延迟级别存储到不同的 queue
中。
RocketMQ
第二阶段是结束事务,会提交 offset
, 根据消息的事务状态提交或者回滚事务。
RocketMQ
通过改写 Topic
和 queueId
,将消息暂时存储到的一个对 Consumer
不可见的队列中,然后等待 Producer
执行本地事务,提交事务装填后再决定将 Half
消息 commit
或者 rollback
.
# 最后
期望和你一起遇见更好的自己
扫码或搜索:方家小白
发送 290992
即可立即永久解锁本站全部文章