RocketMQ分布式事务消息 代码

1. 分布式事务消息介绍

简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。

本质上来说,分布式事务就是为了保证不同数据库的数据一致性。

2. RocketMQ4.X分布式事务消息架构讲解

  • RocketMQ事务消息:
    RocketMQ提供分布式事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致性

  • 半消息HalfMessage:
    暂不能投递的消息(暂不能消费),Producer已经将消息成功发送Broker端,但是服务端未收到生产者对消息的二次确认,此时该消息被标记成"暂不能投递状态",处于该种状态下的消息即半消息

  • 消息回查:
    由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该过程即消息回查。

  • 整体交互流程:


    在这里插入图片描述

    1. Producer向broker端发送消息
    2. 服务端将消息持久化成功之后,向发送方ACK确认消息已经发送成功,此时消息为半消息
    3. 发送方开始执行本地事务逻辑
    4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit或者Rollback),服务端收到Commit状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半消息,订阅方将不会接受该消息
    5. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
    6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
    7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照4对半消息进行操作

  • RocketMQ事务消息的状态:
    1. COMMIT_MESSAGE: 提交事务消息,消费者可以消费此消息
    2. ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能消费
    3. UNKNOW:Broker需要回查确认消息的状态

  • 关于事务消息的消费:
    事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息能被consumer收到(消息重试机制,最后也存在consumer消费失败的情况,这种情况出现的概率极低,而且消费端消费失败使用之前博客中讲解的失败重试机制)

3. 代码实现

3.1 Producer代码

package com.pj.boot.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;

@Component
public class TransacationProducer {

    private String producerGroup = "trac_producer_group";
    // 事务监听器,执行本地事务
    TransactionListener transactionListener = new TransactionListenerImpl();

    TransactionMQProducer producer = null;

    // 创建自定义线程池
    /**
     * @param corePoolSize   池中所保存的核心线程数
     * @param maximumPoolSize   池中允许的最大线程池
     * @param keepActiveTime    非核心线程空闲等待新任务的最长时间
     * @param timeunit          keepActiveTime参数的时间单位
     * @param blockingqueue     队列任务
     */
    private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
            TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory()
    {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });

    public TransacationProducer(){

        producer = new TransactionMQProducer(producerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");

        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        start();
    }

    public TransactionMQProducer getProducer(){
        return this.producer;
    }

    /**
     * 对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown(){
        this.producer.shutdown();
    }
}

class TransactionListenerImpl implements TransactionListener {
    /**
     * 半消息发送成功触发此方法来执行本地事务
     * @param message  消息
     * @param o 发送消息时传递的参数
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("====executeLocalTransaction=======");
        String body = new String(message.getBody());
        String key = message.getKeys();
        String transactionId = message.getTransactionId();
        System.out.println("transactionId="+transactionId+", key="+key+", body="+body);
        // 执行本地事务begin TODO

        // 执行本地事务end TODO

        int status = Integer.parseInt(o.toString());

        //二次确认消息,然后消费者可以消费
        if(status == 1){
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        //回滚消息,broker端会删除半消息
        if(status == 2){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        //broker端会进行回查消息,再或者什么都不响应
        if(status == 3){
            return LocalTransactionState.UNKNOW;
        }
        return null;
    }

    /**
     * 当没有响应时准备(半)消息。broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态。broker回查本地事务
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("====checkLocalTransaction=======");
        String body = new String(messageExt.getBody());
        String key = messageExt.getKeys();
        String transactionId = messageExt.getTransactionId();
        System.out.println("transactionId="+transactionId+", key="+key+", body="+body);

        //要么commit 要么rollback

        //可以根据key去检查本地事务消息是否完成

        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

3.2 Consumer代码

package com.pj.boot.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

@Component
public class PayConsumer {

    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_consumer_group";

    public  PayConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.subscribe(JmsConfig.TOPIC, "*");

        consumer.registerMessageListener( new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    Message msg = msgs.get(0);
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                    String topic = msg.getTopic();
                    String body = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
                    System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);

                    // 告诉broker消息消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (UnsupportedEncodingException e) {

                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        consumer.start();
        System.out.println("consumer start ...");
    }

}

3.3 发送消息

 // 生产时建议再加一个key值
        Message message = new Message(JmsConfig.TOPIC,tag, ("hello xdclass rocketmq = "+tag).getBytes() );
        /**
         * 发送半消息
         * 第一个参数:消息
         * 第二个参数:param,消息回查时会使用到
         */
        SendResult sendResult = transacationProducer.getProducer().sendMessageInTransaction(message, otherParam);

3.4 注意

TransactionMQProducer的groupName要唯一,不能和普通的producer一样

https://blog.csdn.net/pjsdsg/article/details/104326323

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,874评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,102评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,676评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,911评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,937评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,935评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,860评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,660评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,113评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,363评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,506评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,238评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,861评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,486评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,674评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,513评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,426评论 2 352

推荐阅读更多精彩内容