11_RocketMQ分布式事务代码演示与源码分析
2025/10/24大约 4 分钟
生产者
package org.apache.rocketmq.example.transaction;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
/**
* A系统
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
//创建线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//设置生产者回查线程池
producer.setExecutorService(executorService);
//生产者设置监听器
producer.setTransactionListener(transactionListener);
//启动消息生产者
producer.start();
//1、半事务的发送
try {
Message msg =
new Message("TransactionTopic", null, ("A向B系统转100块钱 ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
System.out.println(sendResult.getSendStatus()+"-"+df.format(new Date()));//半事务消息是否成功
} catch (MQClientException | UnsupportedEncodingException e) {
//todo 回滚rollback
e.printStackTrace();
}
//2、半事务的发送成功
//一些长时间等待的业务(比如输入密码,确认等操作):需要通过事务回查来处理
for (int i = 0; i < 1000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}消费者
package org.apache.rocketmq.example.transaction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* 事务消息-消费者 B
*/
public class TranscationComuser {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TranscationComsuer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TransactionTopic", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
//todo 开启事务
for(MessageExt msg : msgs) {
//todo 执行本地事务 update B...(幂等性)
System.out.println("update B ... where transactionId:"+msg.getTransactionId());
//todo 本地事务成功
System.out.println("commit:"+msg.getTransactionId());
System.out.println("执行本地事务成功,确认消息");
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("执行本地事务失败,重试消费,尽量确保B处理成功");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}监听器
package org.apache.rocketmq.example.transaction;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.text.SimpleDateFormat;
import java.util.Date;
public class TransactionListenerImpl implements TransactionListener {
//执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
//todo 执行本地事务 update A...
System.out.println("update A ... where transactionId:"+msg.getTransactionId() +":"+df.format(new Date()));
//System.out.println("commit");
//todo 情况1:本地事务成功
//return LocalTransactionState.COMMIT_MESSAGE;
//todo 情况2:本地事务失败
//System.out.println("rollback");
//return LocalTransactionState.ROLLBACK_MESSAGE;
//todo 情况3:业务复杂,还处于中间过程或者依赖其他操作的返回结果,就是unknow
System.out.println("业务比较长,还没有处理完,不知道是成功还是失败!");
return LocalTransactionState.UNKNOW;
}
//事务回查 默认是60s,一分钟检查一次
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//打印每次回查的时间
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
System.out.println("checkLocalTransaction:"+df.format(new Date()));// new Date()为获取当前系统时间
//todo 情况3.1:业务回查成功!
System.out.println("业务回查:执行本地事务成功,确认消息");
return LocalTransactionState.COMMIT_MESSAGE;
//todo 情况3.2:业务回查回滚!
// System.out.println("业务回查:执行本地事务失败,删除消息");
// return LocalTransactionState.ROLLBACK_MESSAGE;
//todo 情况3.3:业务回查还是UNKNOW!
//System.out.println("业务比较长,还没有处理完,不知道是成功还是失败!");
//return LocalTransactionState.UNKNOW;
}
}测试
先启动消费者、再启动生产者进行观察
分布式事务源码分析
从分布式事务的流程上,我们分析源码,可以从消息发送,确认/回滚 ,回查三个方面。

消息发送源码分析
Producer



Broker
RocketMQ使用Netty处理网络,broker收到消息写入的请求就会进入SendMessageProcessor类中processRequest方法。

最终进入DefaultMessageStore类中asyncPutMessage方法进行消息的存储



结合图同时结合代码,我们可以看到,在事务消息发送时,消息实际存储的主题是一个系统主题:RMQ_SYS_TRANS_HALF_TOPIC
同时消息中保存着消息的原有主题相关的信息与队列

确认/回滚源码分析
Producer
DefaultMQProducerImpl类sendMessageInTransaction方法



Broker

EndTransactionProcessor类



回查源码分析
Producer
事务回查中,Producer是服务端,所以需要注册服务处理


DefaultMQProducerImpl类checkTransactionState方法


DefaultMQProducerImpl类processTransactionState方法




Broker
在Broker启动的时候,是要作为客户端,定期的访问客户端做事务回查。










事务回查是Broker发起的一次定时的网络调用(每隔60s),所以事务回查在客户端启动的时候第一次不一定是60s的间隔,一般会小于60s(因为事务回查是broker发起的,并不是client端定时发起)