当前位置: 代码迷 >> 综合 >> springboot rocketmq 发送顺序消息
  详细解决方案

springboot rocketmq 发送顺序消息

热度:75   发布时间:2024-03-10 01:14:13.0

顺序消息

在某些业务中,consumer在消费消息时,是需要按照生产者发送消息的顺序进行消费的,比如订单消息,会有创建订单、订单支付、已发货,如果消息的顺序发生改变,那么这样的消息就没有意义了。

RocketMQ顺序消息的实现机制

在MQ的模型中,顺序需要由3个阶段去保障:

  • 消息被发送时保持顺序

  • 消息被存储时保持和发送的顺序一致

  • 消息被消费时保持和存储的顺序一致

Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区。
Consumer端中需要保证调用MessageListener时消息的顺序性。

顺序消息测试

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.3.RELEASE</version><relativePath/></parent><groupId>com.sunyuqi</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>demo</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

配置文件 application.yml

rocketmq:name-server: 192.168.130.128:9876;192.168.130.128:9877;producer:group: my-groupmyrocketmq-config:order-topic: order_topicorder-consumer-group: oreder_consumer_group

订单实体类

package com.sunyuqi.entity;import lombok.Data;import java.io.Serializable;@Data
public class Order implements Serializable {
    private static final long serialVersionUID = -6955843080469225187L;/*** 订单ID,UUID* */private String orderId;private String orderType;}

顺序消息生产者

package com.sunyuqi.produce;import com.sunyuqi.entity.Order;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
public class OrderProduce {
    @Value("${myrocketmq-config.order-topic}")private String my_topic;@Value("${myrocketmq-config.order-consumer-group}")private String my_consumer_group;@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMsgOrder(Order order){
    // 第二个参数为 hashkey,根据其哈希值取模后确定发送到哪一个队列rocketMQTemplate.sendOneWayOrderly(my_topic,order,order.getOrderId());}
}

顺序消息消费者
我们将不同订单的消息写入不同文件,来判断消息是否顺序消费

package com.sunyuqi.consumer;import com.sunyuqi.entity.Order;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.io.*;@Component
@RocketMQMessageListener(consumerGroup = "${myrocketmq-config.order-consumer-group}",topic = "${myrocketmq-config.order-topic}",consumeMode = ConsumeMode.ORDERLY)
public class OrderConsumer implements RocketMQListener<Order> {
    @Overridepublic void onMessage(Order message) {
    System.out.println(message);if (message.getOrderId().equals("orderid_1")){
    BufferedWriter bufferedWriter=null;try {
    bufferedWriter = new BufferedWriter(new FileWriter("order_1.txt",true));bufferedWriter.write(message.getOrderType()+"\n");bufferedWriter.flush();} catch (IOException e) {
    e.printStackTrace();}finally {
    try {
    bufferedWriter.close();} catch (IOException e) {
    e.printStackTrace();}}}if (message.getOrderId().equals("orderid_2")){
    BufferedWriter bufferedWriter=null;try {
    bufferedWriter = new BufferedWriter(new FileWriter("order_2.txt",true));bufferedWriter.write(message.getOrderType()+"\n");bufferedWriter.flush();} catch (IOException e) {
    e.printStackTrace();}finally {
    try {
    bufferedWriter.close();} catch (IOException e) {
    e.printStackTrace();}}}if (message.getOrderId().equals("orderid_3")){
    BufferedWriter bufferedWriter=null;try {
    bufferedWriter = new BufferedWriter(new FileWriter("order_3.txt",true));bufferedWriter.write(message.getOrderType()+"\n");bufferedWriter.flush();} catch (IOException e) {
    e.printStackTrace();}finally {
    try {
    bufferedWriter.close();} catch (IOException e) {
    e.printStackTrace();}}}if (message.getOrderId().equals("orderid_4")){
    BufferedWriter bufferedWriter=null;try {
    bufferedWriter = new BufferedWriter(new FileWriter("order_4.txt",true));bufferedWriter.write(message.getOrderType()+"\n");bufferedWriter.flush();} catch (IOException e) {
    e.printStackTrace();}finally {
    try {
    bufferedWriter.close();} catch (IOException e) {
    e.printStackTrace();}}}}
}

引导类

package com.sunyuqi;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);}}

测试类

package com.sunyuqi;import com.sunyuqi.entity.Order;
import com.sunyuqi.produce.OrderProduce;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class OrderMsgTest {
    @Autowiredprivate OrderProduce orderProduce;@Testpublic void testOrder(){
    Order order1 = new Order();Order order2 = new Order();Order order3 = new Order();Order order4 = new Order();order1.setOrderId("orderid_1");order2.setOrderId("orderid_2");order3.setOrderId("orderid_3");order4.setOrderId("orderid_4");order1.setOrderType("创建订单");orderProduce.sendMsgOrder(order1);order3.setOrderType("创建订单");orderProduce.sendMsgOrder(order3);order3.setOrderType("支付订单");orderProduce.sendMsgOrder(order3);order2.setOrderType("创建订单");orderProduce.sendMsgOrder(order2);order2.setOrderType("支付订单");orderProduce.sendMsgOrder(order2);order3.setOrderType("已发货");orderProduce.sendMsgOrder(order3);order2.setOrderType("已发货");orderProduce.sendMsgOrder(order2);order1.setOrderType("支付订单");orderProduce.sendMsgOrder(order1);order4.setOrderType("创建订单");orderProduce.sendMsgOrder(order4);order4.setOrderType("支付订单");orderProduce.sendMsgOrder(order4);order4.setOrderType("已发货");orderProduce.sendMsgOrder(order4);order1.setOrderType("已发货");orderProduce.sendMsgOrder(order1);}
}

运行引导类和测试类,进行测试

四个订单均为顺序消费
在这里插入图片描述