
[RocketMQ] (4) Solving Distributed Transactions: Reliable-Message Eventual Consistency with RocketMQ

Published: 2023-11-17 12:45:22

[Figure: schematic of how RocketMQ achieves eventual consistency through reliable messages]
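The flow the schematic describes can be approximated with a small in-memory model. This is only a sketch of the half-message idea under simplifying assumptions: `Broker`, `storeHalf`, `endTransaction` and `checkBack` are hypothetical names invented for illustration, not the RocketMQ client API, and the real broker also handles persistence, timing and a limit on check-backs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** In-memory model of the half-message flow. Hypothetical names; not the RocketMQ client API. */
public class HalfMessageFlowSketch {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    /** Stand-in for the broker: half messages are stored but hidden from consumers. */
    static class Broker {
        final Map<String, String> halfMessages = new LinkedHashMap<>();
        final List<String> deliverable = new ArrayList<>();

        /** Step 1: the producer sends a half message before running its local transaction. */
        void storeHalf(String txId, String body) {
            halfMessages.put(txId, body);
        }

        /** Step 3: the producer reports the outcome of its local transaction. */
        void endTransaction(String txId, TxState state) {
            if (state == TxState.UNKNOWN) {
                return; // undecided: keep the half message and wait for a check-back
            }
            String body = halfMessages.remove(txId);
            if (body != null && state == TxState.COMMIT) {
                deliverable.add(body); // only now can consumers see the message
            }
        }

        /** Step 4: for each undecided message, ask the producer whether its local transaction committed. */
        void checkBack(Predicate<String> localTxCommitted) {
            for (String txId : new ArrayList<>(halfMessages.keySet())) {
                TxState state = localTxCommitted.test(txId) ? TxState.COMMIT : TxState.UNKNOWN;
                endTransaction(txId, state);
            }
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();

        broker.storeHalf("tx-1", "order-1");
        broker.endTransaction("tx-1", TxState.COMMIT);   // local transaction succeeded

        broker.storeHalf("tx-2", "order-2");
        broker.endTransaction("tx-2", TxState.ROLLBACK); // local transaction failed

        broker.storeHalf("tx-3", "order-3");
        broker.endTransaction("tx-3", TxState.UNKNOWN);  // outcome never reported
        broker.checkBack(txId -> true);                  // the transaction log says the order exists

        System.out.println(broker.deliverable);
    }
}
```

In this model only `order-1` and `order-3` become visible to consumers; `order-2` is dropped because its local transaction rolled back.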

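    Without further ado, here is the code. This example uses RocketMQ, Nacos (from the Spring Cloud Alibaba components) for service registration and discovery, mybatis-plus, and so on. It works with two databases, and the flow is: a user places an order in the order microservice, and the stock is then deducted in the storage microservice. (For high-concurrency scenarios, see the companion article: "Placing orders and deducting stock under concurrency with a Redisson distributed lock and RocketMQ eventual consistency".)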

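I. Project structure:

           The rocketmq-transaction project has three submodules: base-framework-mysql-support (a base module referenced by the other service modules) holds the database-related dependencies and the configuration class; order-service is the order microservice; storage-service is the storage (inventory) microservice.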

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lucifer</groupId>
    <artifactId>rocketmq-transaction</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>order-service</module>
        <module>storage-service</module>
        <module>base-framework-mysql-support</module>
    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.1.3.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

II. Modules

(1) base-framework-mysql-support:

This module contains only a mybatis-plus configuration class:

package com.lucifer.config;

import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author lucifer
 * @date 2020/4/14 21:54
 * @description mybatis-plus configuration
 */
@Configuration
@MapperScan(value = {"com.lucifer.mapper"})
public class MybatisPlusConfig {

    /**
     * SQL execution performance plugin.
     */
    @Bean
    // @Profile({"dev", "test"}) // enable only in the dev and test environments
    public PerformanceInterceptor performanceInterceptor() {
        return new PerformanceInterceptor();
    }
}

pom.xml: 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rocketmq-transaction</artifactId>
        <groupId>com.lucifer</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>base-framework-mysql-support</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.39</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
    </dependencies>
</project>

(2) order-service: the order module

pojo: the package that holds the entity classes

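package com.lucifer.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

/**
 * Order table.
 */
@Data
@NoArgsConstructor
@TableName("order_tbl")
public class Order {

    @TableId(type = IdType.AUTO)
    private Integer id;
    private String userId;
    private String commodityCode;
    private Integer count;
    private BigDecimal money;

    // Transaction id; travels in the message only, not a column of order_tbl
    @TableField(exist = false)
    private String txNum;
}

package com.lucifer.pojo;

import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

/**
 * @author lucifer
 * @date 2020/4/15 13:04
 * @description transaction log table
 */
@Data
//@Builder
@NoArgsConstructor
//@Accessors(chain = true)
@TableName("tx_log")
public class TxLog {

    @TableId
    private String txNum;
    private Date createTime;
}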

mapper:

package com.lucifer.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.Order;

public interface OrderMapper extends BaseMapper<Order> {
}

package com.lucifer.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.TxLog;

public interface TxLogMapper extends BaseMapper<TxLog> {
}

service: interface

package com.lucifer.service;

import com.lucifer.pojo.Order;

public interface OrderService {

    /**
     * Sends the order message.
     *
     * @param order the order to send
     */
    void sendOrder(Order order);

    /**
     * Inserts a new order.
     *
     * @param order the order to insert
     */
    void insertOrder(Order order);
}

 Implementation class:

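import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.lucifer.mapper.OrderMapper;
import com.lucifer.mapper.TxLogMapper;
import com.lucifer.pojo.Order;
import com.lucifer.pojo.TxLog;
import com.lucifer.service.OrderService;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.Date;

/**
 * @author lucifer
 * @date 2020/4/14 19:31
 */
@Service
public class OrderServiceImpl implements OrderService {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Resource
    private OrderMapper orderMapper;

    @Resource
    private TxLogMapper txLogMapper;

    @Override
    public void sendOrder(Order order) {
        String str = JSON.toJSONString(order);
        Message<String> message = MessageBuilder.withPayload(str).build();
        /*
         * Send a transactional message.
         * String txProducerGroup: producer group
         * String destination: topic
         * Message<?> message: message body
         * Object arg: extra argument handed to the transaction listener
         */
        rocketMQTemplate.sendMessageInTransaction("producer_group_tx1", "topic_tx", message, null);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void insertOrder(Order order) {
        // Idempotency check keyed by the transaction id
        if (txLogMapper.selectById(order.getTxNum()) != null) {
            return;
        }
        orderMapper.insert(order);

        // Insert the transaction log record
        TxLog txLog = new TxLog();
        txLog.setTxNum(order.getTxNum());
        System.out.println("order.getTxNum():" + order.getTxNum());
        txLog.setCreateTime(new Date());
        txLogMapper.insert(txLog);

        // Simulate an exception to check that the transaction rolls back
        QueryWrapper<Order> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("commodity_code", "product-1");
        if (orderMapper.selectList(queryWrapper).size() == 6) {
            throw new RuntimeException("Simulated exception");
        }
    }
}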

 RocketMQ transaction listener (important):

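import com.alibaba.fastjson.JSON;
import com.lucifer.mapper.TxLogMapper;
import com.lucifer.pojo.Order;
import com.lucifer.pojo.TxLog;
import com.lucifer.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * @author lucifer
 * @date 2020/4/15 0:59
 * @description TODO
 */
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "producer_group_tx1")
public class ProducerTransactionListener implements RocketMQLocalTransactionListener {

    @Resource
    private OrderService orderService;

    @Resource
    private TxLogMapper txLogMapper;

    /**
     * Callback invoked after the transactional (half) message has been sent to MQ successfully.
     *
     * @param msg the half message
     * @param arg the argument passed to sendMessageInTransaction
     * @return the transaction state:
     * RocketMQLocalTransactionState.COMMIT: commit; only then does the broker let consumers see the message
     * RocketMQLocalTransactionState.ROLLBACK: roll back; the message is deleted and can never be consumed
     * RocketMQLocalTransactionState.UNKNOWN: intermediate state; MQ has to check back to determine the outcome
     */
    // No @Transactional here: insertOrder() already runs in its own transaction, and catching its
    // exception inside an outer transaction would leave that transaction marked rollback-only.
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String str = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            Order order = JSON.parseObject(str, Order.class);
            orderService.insertOrder(order);
            // Returning COMMIT sends a commit to MQ, which then marks the message as consumable
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("Local transaction failed, rolling back the message", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * Transaction state check-back: looks up whether the order was placed successfully.
     *
     * @param msg the half message being checked
     * @return the transaction state
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String str = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
        Order order = JSON.parseObject(str, Order.class);
        // Transaction id
        String txNo = order.getTxNum();
        TxLog txLog = txLogMapper.selectById(txNo);
        if (txLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}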

 application.yml: configuration file

server:
  port: 8081
spring:
  application:
    name: order-service
  datasource:
    druid:
      # com.mysql.cj.jdbc.Driver ships with MySQL Connector/J 6.x and later; with the 5.1.39
      # connector declared in base-framework-mysql-support, use com.mysql.jdbc.Driver instead.
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://192.168.160.131:3306/order?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=UTC
      username: root
      password: 123456
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.160.131:8848
#  main:
#    allow-bean-definition-overriding: true
logging:
  level:
    com.lucifer.mapper: debug
rocketmq:
  producer:
    group: producter_tx
  name-server: 192.168.160.131:9876

 Spring Boot startup class:

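import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

/**
 * @author lucifer
 * @date 2020/4/14 19:28
 * @description TODO
 */
@EnableDiscoveryClient
@SpringBootApplication
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }
}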

 Controller layer:

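import com.lucifer.pojo.Order;
import com.lucifer.service.OrderService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.UUID;

/**
 * @author lucifer
 * @date 2020/4/14 19:32
 * @description TODO
 */
@RestController
@RequestMapping(value = "order")
public class OrderController {

    @Resource
    OrderService orderService;

    /**
     * Places an order: inserts into the order table, deducts stock, and simulates a rollback.
     *
     * @return true once the order message has been sent
     */
    @GetMapping("/placeOrder/commit")
    public Boolean placeOrderCommit() {
        // Use a UUID as the transaction id and send it to MQ with the order
        String uuid = UUID.randomUUID().toString();
        Order order = new Order();
        order.setCommodityCode("product-1");
        order.setUserId("1");
        order.setCount(1);
        order.setTxNum(uuid);
        // The String constructor avoids binary floating-point rounding
        order.setMoney(new BigDecimal("12.5"));
        System.out.println("About to place order =======> " + order);
        orderService.sendOrder(order);
        return true;
    }
}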

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>rocketmq-transaction</artifactId>
        <groupId>com.lucifer</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>order-service</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.lucifer</groupId>
            <artifactId>base-framework-mysql-support</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <!-- nacos -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>0.2.2.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

(3) storage-service: the storage (inventory) microservice module

mapper:

package com.lucifer.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.Storage;

public interface StorageMapper extends BaseMapper<Storage> {
}

package com.lucifer.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.TxLog;

public interface TxLogMapper extends BaseMapper<TxLog> {
}

pojo:

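package com.lucifer.pojo;

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.experimental.Accessors;

/**
 * Storage (stock) table.
 */
@Data
@Accessors(chain = true)
@TableName("storage_tbl")
public class Storage {

    private Long id;
    private String commodityCode;
    private Long count;
}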

P.S. The Order and TxLog entity classes can simply be copied over from order-service.

service:

public interface StorageService {

    /**
     * Deducts stock.
     *
     * @param commodityCode commodity code
     * @param count         quantity to deduct
     * @param txNum         transaction id
     */
    void deduct(String commodityCode, int count, String txNum);
}

Implementation class:

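import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.lucifer.mapper.StorageMapper;
import com.lucifer.mapper.TxLogMapper;
import com.lucifer.pojo.Storage;
import com.lucifer.pojo.TxLog;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.Date;

/**
 * @author lucifer
 * @date 2020/4/14 20:07
 * @description TODO
 */
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {

    @Resource
    private StorageMapper storageMapper;

    @Resource
    private TxLogMapper txLogMapper;

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void deduct(String commodityCode, int count, String txNum) {
        log.info("Deducting stock, commodity code: {}, count: {}", commodityCode, count);
        // Idempotency check: skip messages whose transaction id was already processed
        TxLog txLog = txLogMapper.selectById(txNum);
        if (txLog != null) {
            return;
        }

        // Deduct the stock
        QueryWrapper<Storage> wrapper = new QueryWrapper<>();
        wrapper.setEntity(new Storage().setCommodityCode(commodityCode));
        Storage storage = storageMapper.selectOne(wrapper);
        if (storage == null) {
            throw new RuntimeException("Commodity " + commodityCode + " does not exist");
        }
        storage.setCount(storage.getCount() - count);
        storageMapper.updateById(storage);

        // Record the transaction, used for idempotency
        TxLog tLog = new TxLog();
        tLog.setTxNum(txNum);
        tLog.setCreateTime(new Date());
        txLogMapper.insert(tLog);

        // Simulate an exception to check that the transaction rolls back
        if (storageMapper.selectById(1).getCount() == 4996) {
            throw new RuntimeException("Simulated exception");
        }
    }
}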
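The idempotency check in `deduct` matters because the broker may deliver the same message more than once. A minimal in-memory sketch of the same idea follows; the class and its fields are hypothetical stand-ins (a `Map` for storage_tbl, a `Set` for tx_log), not part of the project, and the real service relies on the database transaction to keep the stock update and the log insert atomic.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** In-memory sketch of consumer-side idempotency keyed by transaction id. Hypothetical names. */
public class IdempotentDeductSketch {

    private final Map<String, Long> stock = new HashMap<>(); // stands in for storage_tbl
    private final Set<String> txLog = new HashSet<>();       // stands in for tx_log

    public IdempotentDeductSketch(String commodityCode, long initialCount) {
        stock.put(commodityCode, initialCount);
    }

    /** Returns true if stock was deducted, false if this txNum had already been processed. */
    public synchronized boolean deduct(String commodityCode, int count, String txNum) {
        if (txLog.contains(txNum)) {
            return false; // duplicate delivery: nothing to do
        }
        Long current = stock.get(commodityCode);
        if (current == null) {
            throw new IllegalArgumentException("Commodity " + commodityCode + " does not exist");
        }
        stock.put(commodityCode, current - count);
        txLog.add(txNum); // in the real service this insert shares one DB transaction with the update
        return true;
    }

    public long remaining(String commodityCode) {
        return stock.get(commodityCode);
    }

    public static void main(String[] args) {
        IdempotentDeductSketch storage = new IdempotentDeductSketch("product-1", 5000L);
        storage.deduct("product-1", 1, "tx-1");
        storage.deduct("product-1", 1, "tx-1"); // redelivered message, ignored
        System.out.println(storage.remaining("product-1"));
    }
}
```

Delivering the message with transaction id `tx-1` twice deducts the stock only once, which is the property the tx_log lookup gives the real service.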

RocketMQ listener class:

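import com.alibaba.fastjson.JSON;
import com.lucifer.pojo.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author lucifer
 * @date 2020/4/15 0:59
 * @description TODO
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_tx2", topic = "topic_tx")
class ConsumerTransactionListener implements RocketMQListener<String> {

    @Resource
    private StorageService storageService;

    @Override
    public void onMessage(String message) {
        log.info("Start consuming message: {}", message);
        // Parse the message
        Order order = JSON.parseObject(message, Order.class);
        // Deduct the stock
        storageService.deduct(order.getCommodityCode(), order.getCount(), order.getTxNum());
    }
}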

Spring Boot startup class:

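import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

/**
 * @author lucifer
 * @date 2020/4/14 20:23
 * @description storage service
 */
@EnableDiscoveryClient
@SpringBootApplication
public class StorageApplication {

    public static void main(String[] args) {
        SpringApplication.run(StorageApplication.class, args);
    }
}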

application.yml: 

server:
  port: 8082
spring:
  application:
    name: storage-service
  datasource:
    druid:
      # com.mysql.cj.jdbc.Driver ships with MySQL Connector/J 6.x and later; with the 5.1.39
      # connector declared in base-framework-mysql-support, use com.mysql.jdbc.Driver instead.
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://192.168.160.131:3306/storage?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=UTC
      username: root
      password: 123456
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.160.131:8848
#  main:
#    allow-bean-definition-overriding: true
logging:
  level:
    com.lucifer.mapper: debug
rocketmq:
  producer:
    group: consumer_tx
  name-server: 192.168.160.131:9876

pom.xml: 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>rocketmq-transaction</artifactId>
        <groupId>com.lucifer</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>storage-service</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.lucifer</groupId>
            <artifactId>base-framework-mysql-support</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <!-- nacos -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>0.2.2.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

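Start the order-service service, then start the storage-service service.

Test scenarios:

(1) The local transaction in order-service fails: the half message is rolled back, so the order message is never delivered to consumers.

(2) storage-service receives the order message but the stock deduction fails: the deduction is retried repeatedly (the number of attempts is limited), and the console keeps printing retry messages. If consumption keeps failing until the retry limit is reached (16 by default), the message is delivered to the dead-letter queue (DLQ), and manual intervention is then required.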
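The retry-then-dead-letter behaviour of scenario (2) can be sketched as a plain loop. This is a simplified model, not the RocketMQ API: `RetryThenDlqSketch`, `deliver` and `MAX_RECONSUME_TIMES` are hypothetical names, the limit is assumed to count retries after the first delivery (so 17 attempts in total for a message that always fails), and the growing delay the broker applies between retries is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch of "retry, then dead-letter" delivery. Hypothetical names; not the RocketMQ API. */
public class RetryThenDlqSketch {

    /** Assumption: mirrors the default retry limit of 16 mentioned in the text. */
    static final int MAX_RECONSUME_TIMES = 16;

    final List<String> deadLetterQueue = new ArrayList<>();

    /**
     * Delivers the message once and retries after each failure.
     * Returns the total number of delivery attempts.
     */
    int deliver(String message, Consumer<String> listener) {
        int attempts = 0;
        for (int reconsumeTimes = 0; reconsumeTimes <= MAX_RECONSUME_TIMES; reconsumeTimes++) {
            attempts++;
            try {
                listener.accept(message);
                return attempts; // consumed successfully, no further retries
            } catch (RuntimeException e) {
                // failed attempt; a real broker would wait before redelivering
            }
        }
        deadLetterQueue.add(message); // retries exhausted: manual intervention needed
        return attempts;
    }

    public static void main(String[] args) {
        RetryThenDlqSketch broker = new RetryThenDlqSketch();
        int attempts = broker.deliver("order-1", msg -> {
            throw new RuntimeException("simulated deduction failure");
        });
        System.out.println(attempts + " attempts, DLQ = " + broker.deadLetterQueue);
    }
}
```

Because a message can be redelivered this many times, the consumer has to be idempotent, which is what the tx_log lookup in `deduct` provides.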

[Figure: database screenshot for scenario (2)]