Yujun's Blog

RocketMQ的事务消息

July 20, 2025 (3mo ago)ReadCode

RocketMQ的事务消息

RocketMQ的事务消息,基本的实现思路都应该知道:半消息+回查机制。今天深入到代码以及日常API的使用来理解下。

分布式事务问题。为了解决一半成功,一半失败的局面,大家想出了很多办法,比如TCC、Saga,还有我们今天要聊的RocketMQ的事务消息。

核心思想

RocketMQ的事务消息,本质上是对二阶段提交(2PC)思想的一种巧妙实现。

第一阶段:发送“半消息”(Prepare Message)

当你要执行一个本地事务(比如创建订单),同时要发送一个消息时,你不会直接把消息发出去。你会先发一个“半消息”给RocketMQ的Broker。这个“半消息”很特殊,它被Broker接收了,也存储了,但对下游的消费者来说,是完全“隐身”的,消费者根本不知道它的存在。

第二阶段:执行本地事务 & 提交/回滚

半消息发送成功后,你就可以安心地去执行本地事务,比如往订单表里INSERT一条数据。

  • 如果本地事务成功了:就给Broker再发一个COMMIT指令。Broker收到后,会把那个半消息标记为可投递,这时候消费者才能真正地消费到它。
  • 如果本地事务失败了:就给Broker发一个ROLLBACK指令。Broker会直接把那个半消息给删了,就当无事发生。

事务状态回查

最关键的问题来了:如果在执行完本地事务,准备发送COMMIT指令的时候,应用突然崩了,或者断网了,怎么办?这时候Broker那边就只有一个半消息,它不知道该提交还是该回滚,就这么一直挂着,就会出现问题了。

所以,RocketMQ设计了:事务状态回回查机制。

对于那些长时间没有收到COMMITROLLBACK指令的半消息,Broker会不耐烦地反过来问这个生产者:之前你发的那个消息(ID是xxx)对应的本地事务,到底成功了没。

所以,在我们的生产者应用里,必须实现一个回查监听器。这个监听器唯一的任务就是,当Broker来问的时候,你去查一下本地事务的最终状态,然后明确地告诉Broker,是该COMMIT还是ROLLBACK。有了这层保障,数据一致性才算是真正闭环。

上手实战写代码

理论说完了,来试试怎么在自己的应用中使用。要用事务消息,代码关键就是实现TransactionListener接口。

首先,需要一个特殊的TransactionMQProducer

// 使用事务消息,必须用 TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("my-tx-producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");

// 给生产者设置一个线程池,用来处理Broker的回查请求
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, 
    new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("client-transaction-msg-check-thread");
        return thread;
    }
});
producer.setExecutorService(executorService);

// 设置核心的事务监听器
producer.setTransactionListener(new OrderTransactionListener());
producer.start();

重点就是最后一行setTransactionListener。这个OrderTransactionListener就是我们需要自己实现的,它有两个核心方法。

// com.alibaba.rocketmq.client.producer.TransactionListener
public class OrderTransactionListener implements TransactionListener {

    // 存放本地事务的执行状态,key是事务ID,value是状态
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * 当“半消息”发送成功后,这个方法会被回调,用来执行本地事务
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String transactionId = msg.getTransactionId();
        localTrans.put(transactionId, 0); // 初始状态:未知

        try {
            // ===============================================
            //  在这里执行我们的本地事务,比如创建订单、操作数据库等
            //  Business logic (e.g., create order in database)
            System.out.println("正在执行本地事务...");
            Thread.sleep(120000); // 模拟业务耗时
            System.out.println("本地事务执行成功!");
            // ===============================================
            
            localTrans.put(transactionId, 1); // 状态:成功
            // 本地事务成功,告诉Broker可以提交消息了
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("本地事务执行失败");
            localTrans.put(transactionId, 2); // 状态:失败
            // 本地事务失败,告诉Broker回滚消息
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    /**
     * 当Broker发现一个半消息长期没被处理,会回调这个方法来查询本地事务状态
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String transactionId = msg.getTransactionId();
        Integer status = localTrans.get(transactionId);
        
        System.out.println("Broker回查事务状态, transactionId: " + transactionId + ", status: " + status);

        if (null != status) {
            switch (status) {
                case 0:
                    // 还在处理中,告诉Broker等会儿再来问
                    return LocalTransactionState.UNKNOW;
                case 1:
                    // 确认成功,提交
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    // 确认失败,回滚
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        // 如果连事务ID都找不到了,说明本地事务可能因为某些原因就没执行,直接回滚
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

核心就是实现TransactionListener接口的两个方法:executeLocalTransaction负责执行业务,checkLocalTransaction负责兜底。

但是注意,在实际项目中,checkLocalTransaction方法里,不应该像这样用一个内存里的Map来存状态,这里只是帮助理解。而应该去查数据库里的事务状态表,这样才能保证应用重启后状态不丢失。

最后,发送消息的方式也变了,要用sendMessageInTransaction。需要使用专门的 TransactionMQProducer,并调用 sendMessageInTransaction() 方法来发送消息。

总结一下,自己实现就是这四点要注意:Listener实现接口的两个方法。需要使用专门的生产者去调用专门的方法去发送消息。

// 发送事务消息
Message msg = new Message("TransactionTopic", "tags", "keys", 
        "Hello RocketMQ".getBytes(StandardCharsets.UTF_8));
producer.sendMessageInTransaction(msg, null);

这一切是如何发生的

来看看源码。关键逻辑在DefaultMQProducerImplsendMessageInTransaction方法里。

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
    TransactionListener transactionListener = this.getTransactionListener();
    // ... 省略各种校验 ...

    SendResult sendResult = null;
    // 1. 设置消息类型为“半消息”
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRAN_MSG, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, 
            this.defaultMQProducer.getProducerGroup());
    
    try {
        // 2. 发送“半消息”
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                // ...
                // 3. “半消息”发送成功后,回调我们自己实现的 executeLocalTransaction 方法
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }
            } catch (Throwable e) {
                localException = e;
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        break;
        // ...
    }

    try {
        // 4. 根据 executeLocalTransaction 的返回结果,发送 COMMIT 或 ROLLBACK 指令
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        // ...
    }

    // ... 构造并返回结果 ...
}

源码清清楚楚地展示了整个流程:

  • 先给消息打上TRAN_MSG的标记。
  • 调用send方法,把这个“半消息”发出去。
  • send成功后,立刻回调我们写的executeLocalTransaction方法。
  • 最后,根据executeLocalTransaction的返回值(COMMIT, ROLLBACKUNKNOW),调用endTransaction方法去通知Broker做最终决定。

整个过程就在生产者的一个方法调用里,逻辑很清晰。

结语

RocketMQ的事务消息,当然,它也不是银弹。

需要仔细设计checkLocalTransaction逻辑,保证它的幂等和可靠性。消费端也必须做好幂等消费,因为在某些极端情况下(比如COMMIT成功了,但ACK没返回,Broker触发了回查),消息还是有可能会被重复投递的。

技术就是这样,没有完美的方案,只有最适合的场景。

Comments