当前位置: 代码迷 >> 综合 >> 【RocketMQ】(五)利用Redssion分布式锁和RocketMQ消息的最终一致性 实现并发场景下单扣减库存
  详细解决方案

【RocketMQ】(五)利用Redssion分布式锁和RocketMQ消息的最终一致性 实现并发场景下单扣减库存

热度:72   发布时间:2023-11-17 12:45:09.0

一、项目结构: 

1、 父工程

pom.xml:(父工程只有一个pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.lucifer</groupId><artifactId>rocketmq-transaction</artifactId><packaging>pom</packaging><version>1.0-SNAPSHOT</version><modules><module>order-service</module><module>storage-service</module><module>base-framework-mysql-support</module></modules><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><spring-cloud.version>Greenwich.SR1</spring-cloud.version></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.2.5.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>

2、base-framework-mysql-support:数据库相关的配置

   2.1 MybatisPlusConfig :mybatis-plus的相关配置

package com.lucifer.config;import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author lucifer* @date 2020/4/14 21:54* @description mybatis-plus 配置*/
@Configuration
@MapperScan(value = {"com.lucifer.mapper"})
public class MybatisPlusConfig {/*** SQL执行效率插件*/@Bean// @Profile({"dev", "test"})// 设置 dev test 环境开启public PerformanceInterceptor performanceInterceptor() {return new PerformanceInterceptor();}
}

2.2 RedissonConfig: redisson相关配置

package com.lucifer.config;import org.apache.commons.lang3.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SentinelServersConfig;
import org.redisson.config.SingleServerConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;@Configuration
public class RedissonConfig {@Resourceprivate RedissonProperties redissonProperties;/*** 单机模式自动装配* @return*/@Bean@ConditionalOnProperty(prefix ="redisson",name="single-is",havingValue="true")public RedissonClient getSingleRedisson(){Config config = new Config();String singlePassword = redissonProperties.getSinglePassword();SingleServerConfig serverConfig = config.useSingleServer().setAddress("redis://" + redissonProperties.getSingleAddress());System.out.println("redis:=================="+serverConfig.getAddress());if(StringUtils.isNotBlank(singlePassword)){serverConfig.setPassword(singlePassword);}return Redisson.create(config);}/*** 哨兵模式自动装配* @return*/@Bean@ConditionalOnProperty(prefix ="redisson",name="sentinel-is",havingValue="true")public RedissonClient getSentinelRedisson(){Config config = new Config();SentinelServersConfig serverConfig = config.useSentinelServers().addSentinelAddress(redissonProperties.getSentinelAddresses()).setMasterName(redissonProperties.getSentinelMasterName());String sentinelPassword = redissonProperties.getSentinelPassword();if(StringUtils.isNotBlank(sentinelPassword)) {serverConfig.setPassword(sentinelPassword);}return Redisson.create(config);}
}

2.3 RedissonProperties:读取application.yml的自定义配置

package com.lucifer.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** @author lucifer* @date 2020/4/21 14:09* @description TODO*/
@Data
@Component
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties  {private String singleAddress;private String singlePassword;private String sentinelMasterName;private String sentinelAddresses;private String sentinelPassword;}

2.4 pom.xml: jar包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq-transaction</artifactId><groupId>com.lucifer</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>base-framework-mysql-support</artifactId><dependencies><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.10</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.39</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.1.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.12.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency></dependencies>
</project>

3、order-service 订单微服务

3.1 OrderController:控制层 用于测试

package com.lucifer.controller;import com.lucifer.pojo.Order;
import com.lucifer.service.OrderService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.UUID;/*** @author lucifer* @date 2020/4/14 19:32* @description TODO*/
@RestController
@RequestMapping(value = "order")
public class OrderController {@ResourceOrderService orderService;/*** 下单:插入订单表、扣减库存,模拟回滚** @return*/@GetMapping("/placeOrder/commit")public Boolean placeOrderCommit() {//将uuid作为事务id,发送到mqString uuid = UUID.randomUUID().toString();Order order = new Order();order.setCommodityCode("product-1");order.setUserId("1");order.setCount(1);order.setTxNum(uuid);order.setMoney(new BigDecimal(12.5));System.out.println("准备下单了=======》" + order);orderService.sendOrder(order);return true;}
}

 3.2 service 接口: 

package com.lucifer.service;import com.lucifer.pojo.Order;public interface OrderService {/*** 发送订单消息** @param order*/void sendOrder(Order order);/*** 新增订单** @param order*/void insertOrder(Order order) throws Exception;}

3.2.1 service 实现类: 

package com.lucifer.service.impl;import com.alibaba.fastjson.JSON;
import com.lucifer.mapper.OrderMapper;
import com.lucifer.mapper.TxLogMapper;
import com.lucifer.pojo.Order;
import com.lucifer.pojo.Storage;
import com.lucifer.pojo.TxLog;
import com.lucifer.service.OrderService;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.util.Date;
import java.util.concurrent.TimeUnit;/*** @author lucifer* @date 2020/4/14 19:31* @description*/
@Service
public class OrderServiceImpl implements OrderService {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Resourceprivate OrderMapper orderMapper;@Resourceprivate TxLogMapper txLogMapper;@Resourceprivate RedissonClient redissonClient;@Overridepublic void sendOrder(Order order) {String str = JSON.toJSONString(order);Message<String> message = MessageBuilder.withPayload(str).build();/*** 发送一条事务消息* String txProducerGroup: 生产组* String destination:topic* Message<?> message: 消息内容* Object arg: 参数*/rocketMQTemplate.sendMessageInTransaction("producer_group_tx1", "topic_tx", message, null);}@Transactional(rollbackFor = Exception.class)@Overridepublic void insertOrder(Order order) throws Exception {//创建锁对象RLock lock = redissonClient.getLock("placeOrder:" + order.getUserId() + order.getCommodityCode());try {//尝试去获取锁RFuture<Boolean> booleanRFuture = lock.tryLockAsync(3, 10, TimeUnit.SECONDS);Boolean aBoolean = booleanRFuture.get();//如果获取到了锁if (aBoolean) {//获取redis中的库存RBucket<Storage> storageBucket = redissonClient.getBucket("placeOrder");Storage storage = storageBucket.get();System.out.println("剩余库存:=================" + storage.getCount());if (storage.getCount() <= 0) {throw new RuntimeException("商品:" + order.getCommodityCode() + ",库存为空");}//用事务id幂等处理if (txLogMapper.selectById(order.getTxNum()) != null) {return;}orderMapper.insert(order);//插入事务日志TxLog txLog = new TxLog();txLog.setTxNum(order.getTxNum());System.out.println("order.getTxNum():" + order.getTxNum());txLog.setCreateTime(new Date());txLogMapper.insert(txLog);}} finally {//释放锁lock.unlock();}}
}

3.3. mapper接口:

package com.lucifer.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.TxLog;public interface TxLogMapper extends BaseMapper<TxLog> {
}
package com.lucifer.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.Order;public interface OrderMapper extends BaseMapper<Order> {
}

 3.4. 实体类:

package com.lucifer.pojo;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.NoArgsConstructor;import java.math.BigDecimal;/*** 订单表*/
@Data
@NoArgsConstructor
@TableName("order_tbl")
public class Order {@TableId(type = IdType.AUTO)private Integer id;private String userId;private String commodityCode;private Integer count;private BigDecimal money;@TableField(exist = false)private String txNum;
}
package com.lucifer.pojo;import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.experimental.Accessors;import java.io.Serializable;/*** 库存表*/
@Data
@Accessors(chain = true)
@TableName("storage_tbl")
public class Storage implements Serializable {private Long id;private String commodityCode;private Long count;
}
import java.util.Date;/*** @author lucifer* @date 2020/4/15 13:04* @description 事务日志表*/
@Data
//@Builder
@NoArgsConstructor
//@Accessors(chain = true)
@TableName("tx_log")
public class TxLog {@TableIdprivate String txNum;private Date createTime;
}

3.5、ProducerTransactionListener 是去实现 RocketMQLocalTransactionListener接口(重要

package com.lucifer.transaction;import com.alibaba.fastjson.JSON;
import com.lucifer.mapper.TxLogMapper;
import com.lucifer.pojo.Order;
import com.lucifer.pojo.Storage;
import com.lucifer.pojo.TxLog;
import com.lucifer.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;/*** @author lucifer* @date 2020/4/15 0:59* @description TODO*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "producer_group_tx1")
public class ProducerTransactionListener implements RocketMQLocalTransactionListener {@Resourceprivate OrderService orderService;@Resourceprivate TxLogMapper txLogMapper;@Resourceprivate RedissonClient redissonClient;/*** 事务消息发送mq成功后的回调方法** @param msg* @param arg* @return 返回事务状态* RocketMQLocalTransactionState.COMMIT:提交事务,提交后broker才允许消费者使用* RocketMQLocalTransactionState.ROLLBACK:回滚事务,回滚后消息将被删除,并且不允许别消费* RocketMQLocalTransactionState.UNKNOWN:中间状态,表示MQ需要核对,以确定状态*/@Transactional(rollbackFor = Exception.class)@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {String str = new String((byte[]) msg.getPayload());Order order = JSON.parseObject(str, Order.class);//下单orderService.insertOrder(order);//扣减redis中库存RBucket<Storage> storageBucket = redissonClient.getBucket("placeOrder");Storage storage = storageBucket.get();long count = storage.getCount() - order.getCount();storage.setCount(count);storageBucket.set(storage);//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit,mq将消息的状态改为可消费return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}/*** 事务状态回查,查询是否下单成功** @param msg* @return 返回事务状态*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {String str = new String((byte[]) msg.getPayload());Order order = JSON.parseObject(str, Order.class);//事务idString txNo = order.getTxNum();TxLog txLog = txLogMapper.selectById(txNo);if (txLog != null) {return RocketMQLocalTransactionState.COMMIT;} else {return RocketMQLocalTransactionState.UNKNOWN;}}
}

3.6. 启动类 

package com.lucifer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;/*** @author lucifer* @date 2020/4/14 19:28* @description TODO*/
@EnableDiscoveryClient
@SpringBootApplication
public class OrderApplication {public static void main(String[] args) {SpringApplication.run(OrderApplication.class, args);}}

3.7.application.yml:

server:port: 8081spring:application:name: order-servicedatasource:druid:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://192.168.160.131:3306/order?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=UTCusername: rootpassword: 123456cloud:nacos:discovery:server-addr: 192.168.160.131:8848
#  main:
#    allow-bean-definition-overriding: true
logging:level:com.lucifer.mapper: debugrocketmq:producer:group: producter_txname-server: 192.168.160.131:9876redisson:single-is: truesingle-address: 192.168.160.131:6379single-password:sentinel-is: falsesentinel-master-name: business-mastersentinel-addresses: 127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381sentinel-password:

3.8. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>rocketmq-transaction</artifactId><groupId>com.lucifer</groupId><version>1.0-SNAPSHOT</version></parent><artifactId>order-service</artifactId><properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>com.lucifer</groupId><artifactId>base-framework-mysql-support</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!-- nacos --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>0.2.2.RELEASE</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

4.storage-service 库存微服务

4.1 项目启动就会将商品 "product-1"的库存信息查询出来,放到redis当中,这里用于测试

package com.lucifer.config;import com.lucifer.pojo.Storage;
import com.lucifer.service.StorageService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Slf4j
@Component
public class StorageInitApplicationRunner implements ApplicationRunner {@Resourceprivate RedissonClient redissonClient;@Resourceprivate StorageService storageService;@Overridepublic void run(ApplicationArguments applicationArguments) throws Exception {//从数据库查询抢购秒杀商品信息Storage storage = storageService.getStorage("product-1");//获取redis中key为storage对象信息RBucket<Storage> storageBucket = redissonClient.getBucket("placeOrder");//如果key存在,就设置key的值为新值value//如果key不存在,就设置key的值为valuestorageBucket.set(storage);log.info("商品数据初始化完成!");}
}

4.2 service接口

package com.lucifer.service;import com.lucifer.pojo.Storage;import java.util.concurrent.ExecutionException;public interface StorageService {/*** 扣减库存** @param commodityCode* @param count* @param txNum         事务id*/void deduct(String commodityCode, int count, String txNum) throws ExecutionException, InterruptedException, Exception;/*** 获取商品信息** @param commodityCode* @return*/Storage getStorage(String commodityCode);
}

4.2.1 实现类

package com.lucifer.service.impl;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.lucifer.mapper.StorageMapper;
import com.lucifer.mapper.TxLogMapper;
import com.lucifer.pojo.Storage;
import com.lucifer.pojo.TxLog;
import com.lucifer.service.StorageService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.util.Date;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;/*** @author lucifer* @date 2020/4/14 20:07* @description TODO*/
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {@Resourceprivate StorageMapper storageMapper;@Resourceprivate TxLogMapper txLogMapper;@Resourceprivate RedissonClient redissonClient;@Transactional(rollbackFor = Exception.class)@Overridepublic void deduct(String commodityCode, int count, String txNum) throws Exception {RLock lock = redissonClient.getLock("placeOrder:" + commodityCode);Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);try {if (res.get()) {log.info("扣减库存,商品编码:{},数量:{}", commodityCode, count);TxLog txLog = txLogMapper.selectById(txNum);if (txLog != null) {return;}//扣减库存QueryWrapper<Storage> wrapper = new QueryWrapper<>();wrapper.setEntity(new Storage().setCommodityCode(commodityCode));Storage storage = storageMapper.selectOne(wrapper);if (storage == null) {throw new RuntimeException("商品" + commodityCode + ",不存在");}//扣减MySQL中的库存storage.setCount(storage.getCount() - count);System.out.println("剩余库存数量:" + storage.getCount());storageMapper.updateById(storage);//添加事务记录,用于幂等TxLog tLog = new TxLog();tLog.setTxNum(txNum);tLog.setCreateTime(new Date());txLogMapper.insert(tLog);}} finally {lock.unlock();}}@Overridepublic Storage getStorage(String commodityCode) {QueryWrapper<Storage> queryWrapper = new QueryWrapper<>();queryWrapper.eq("commodity_code", commodityCode);Storage storage = storageMapper.selectOne(queryWrapper);System.out.println("剩余库存数:" + storage.getCount());return storage;}
}

4.3 mapper接口

package com.lucifer.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.Storage;public interface StorageMapper extends BaseMapper<Storage> {
}
package com.lucifer.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.TxLog;public interface TxLogMapper extends BaseMapper<TxLog> {
}

4.4 实体类

   Order、Storage、TxLog 三个实体类同order-service中;

4.5 ConsumerTransactionListener 实现RocketMQListener接口,消费消息

package com.lucifer.transaction;import com.alibaba.fastjson.JSON;
import com.lucifer.pojo.Order;
import com.lucifer.service.StorageService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author lucifer* @date 2020/4/15 0:59* @description TODO*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_tx2", topic = "topic_tx")
class ConsumerTransactionListener implements RocketMQListener<String> {@Resourceprivate StorageService storageService;@Overridepublic void onMessage(String message) {log.info("开始消费消息:{}", message);//解析消息Order order = JSON.parseObject(message, Order.class);if(order!=null){//扣减库存try {storageService.deduct(order.getCommodityCode(), order.getCount(), order.getTxNum());} catch (Exception e) {e.printStackTrace();throw new RuntimeException("扣减库存失败");}}}
}

4.6启动类

package com.lucifer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;/*** @author lucifer* @date 2020/4/14 20:23* @description 库存服务*/
@EnableConfigurationProperties
@EnableDiscoveryClient
@SpringBootApplication
public class StorageApplication {public static void main(String[] args) {SpringApplication.run(StorageApplication.class, args);}
}

4.7application.yml:

server:port: 8082spring:application:name: storage-servicedatasource:druid:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://192.168.160.131:3306/storage?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=UTCusername: rootpassword: 123456cloud:nacos:discovery:server-addr: 192.168.160.131:8848
#  main:
#    allow-bean-definition-overriding: true
logging:level:com.lucifer.mapper: debugrocketmq:producer:group: consumer_txname-server: 192.168.160.131:9876redisson:single-is: truesingle-address: 192.168.160.131:6379single-password:sentinel-is: falsesentinel-master-name: business-mastersentinel-addresses: 127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381sentinel-password:

4.8 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>rocketmq-transaction</artifactId><groupId>com.lucifer</groupId><version>1.0-SNAPSHOT</version></parent><artifactId>storage-service</artifactId><properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>com.lucifer</groupId><artifactId>base-framework-mysql-support</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version></dependency><!-- nacos --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>0.2.2.RELEASE</version><exclusions><exclusion><artifactId>fastjson</artifactId><groupId>com.alibaba</groupId></exclusion></exclusions></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.12.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

二、测试

(1)场景一:正常情况:使用jmter测试:用20个线程去模拟20个用户同时去购买某个商品:

目前库存只有10个: 

查看控制台: (当10个订单下单成功后,第11个订单。。。。)

准备下单了=======》Order(id=null, userId=1, commodityCode=product-1, count=1, money=12.5, txNum=7a0702f8-8814-4d6a-a492-04ccd1bda31b)
剩余库存:=================0
java.lang.RuntimeException: 商品:product-1,库存为空

查看数据库:只有10个订单下单成功,并且库存为0,并没有成为负数

 

(2)场景2:模拟异常的发生,在order-service中:

删除redis数据、清空数据库刚产生的数据,将库存数调回10,重启库存微服务;

模拟20个用户去访问,order-service 微服务控制台:

剩余库存:=================6
2020-04-21 19:20:59.502 DEBUG 22056 --- [nio-8081-exec-1] c.lucifer.mapper.TxLogMapper.selectById  : ==>  Preparing: SELECT tx_num,create_time FROM tx_log WHERE tx_num=? 
2020-04-21 19:20:59.509 DEBUG 22056 --- [nio-8081-exec-1] c.lucifer.mapper.TxLogMapper.selectById  : ==> Parameters: 4dadbd02-735f-43f3-b10e-5863c833406c(String)
2020-04-21 19:20:59.520 DEBUG 22056 --- [nio-8081-exec-1] c.lucifer.mapper.TxLogMapper.selectById  : <==      Total: 0Time:17 ms - ID:com.lucifer.mapper.TxLogMapper.selectById
Execute SQL:SELECT tx_num,create_time FROM tx_log WHERE tx_num='4dadbd02-735f-43f3-b10e-5863c833406c'java.lang.RuntimeException: 人为异常

当订单下了4单,剩余库存为6时,人为异常抛出,此时数据库:

(2)场景3:模拟异常的发生,在storage-service中:

库存微服务 控制台:一直打印,去扣减MySQL的库存,不过因为此处人为制造异常,库存只要为5,就会去抛异常

java.lang.RuntimeException: 扣减库存失败at com.lucifer.transaction.ConsumerTransactionListener.onMessage(ConsumerTransactionListener.java:37) ~[classes/:na]at com.lucifer.transaction.ConsumerTransactionListener.onMessage(ConsumerTransactionListener.java:18) ~[classes/:na]at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:308) ~[rocketmq-spring-boot-2.0.2.jar:2.0.2]at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417) [rocketmq-client-4.4.0.jar:4.4.0]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_192]at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_192]at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_192]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_192]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_192]at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192]2020-04-21 19:31:31.243 DEBUG 15160 --- [essageThread_12] c.lucifer.mapper.TxLogMapper.selectById  : ==> Parameters: f60a6c44-292b-47b7-8bfd-6aedaeb26a8e(String)
java.lang.RuntimeException: 人为异常

此时,order数据库中有10个订单,而库存数据库中库存为6,有6单是扣减失败的,所以此时库存微服务会不停的去尝试扣减库存(尝试次数好像是16次),一般情况下,不会让rocketmq重试那么多次,重试几次差不多了,还是无法扣减只能人工扣减了。

此时将IDEA中的人为异常注释掉,重新编译,会发现,库存微服务扣减成功了,然后数据库中订单数为10,库存数扣减为0 了。