当前位置: 代码迷 >> 综合 >> RabbitMQ 学习笔记 -- 06 初探@RabbitListener
  详细解决方案

RabbitMQ 学习笔记 -- 06 初探@RabbitListener

热度:49   发布时间:2024-02-06 03:00:25.0

Message

在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

  1. MessageProperties // 消息属性

  2. byte[] body // 消息内容

如下使用 Message 类型接收数据,当监听到队列 hello中有消息时则会进行接收并处理,MessageConvert 会直接转换成消息类型,并绑定在对应被注解的方法中。

消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory

@RabbitListener(queues = "hello")
public void receive1(Message message) {// 消息携带属性参数MessageProperties properties = message.getMessageProperties();// 消息内容,二进制数据byte[] body = message.getBody();
}

MessageConvert

  • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
  • RabbitMQ 的序列化是指 Messagebody 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter默认)、Jackson2JsonMessageConverter

在 @RabbitListener 注解的方法中,使用 @Payload 和 @Headers 注解可以获取消息中的 body 和 headers 消息。它们都会被 MessageConvert 转换器解析转换后,将结果绑定在对应注解的方法中。

/**
*    这里是监听 hello 队列,并将接收到的消息体 body 根据 MessageConvert 转换器转换成 String 类型输出
*    @Headers 获取所有头部属性消息,也可以用 @Header 获取单个 header 消息
*/
@RabbitListener(queues = "hello")
public void processMessage1(@Payload String message, @Headers Map<String,Object> headers, @Header(value = AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {System.out.println("message:" + message);System.out.println("Headers:" + headers);System.out.println("routingKey: " + routingKey);
}

默认消费者消费时,消息的 content_type 属性表示消息 body 数据以什么数据格式传输存储,直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

  • application/octet-stream:二进制字节数组存储,使用 byte[]
  • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
  • text/plain:文本数据类型存储,使用 String
  • application/json:JSON 格式,使用 Object、相应类型

通过 @RabbitListener 注解声明 Binding

  • 通过 @RabbitListener 的 bindings 属性声明 Binding(若 RabbitMQ 中不存在该绑定所需要的 Queue、Exchange、RouteKey 则自动创建,若存在则抛出异常)
  • @RabbitListener 注解可以指定目标方法来作为消费信息的方法,可以通过注解参数指定所监听的队列或 binding。使用@RabbitListener 可以设置一个自己明确默认值的 RabbitListenerContainerFactory 对象

下面是一个 demo 的示例,供参考:

@RabbitListener(bindings = {@QueueBinding(// 队列配置value = @Queue(value = "rabbit.mq.test", durable = "true",// 配置死信队列的参数arguments = {@Argument(name = "x-dead-letter-exchange", value = "target_exchange"),@Argument(name = "x-dead-letter-routing-key", value = "target_routing_key")}),// 交换机配置exchange = @Exchange(value = "rabbit_test_exchange",durable = "true",type = ExchangeTypes.TOPIC),key = "rabbit.test.*")},// 可以指定容器工厂,默认使用 rabbitListenerContainerFactorycontainerFactory = "rabbitListenerContainerFactory",//指定消费者的线程数量,一个线程会打开一个Channel,一个队列上的消息只会被消费一次(不考虑消息重新入队列的情况),下面的表示至少开启5个线程,最多10个。线程的数目需要根据你的任务来决定,如果是计算密集型,线程的数目就应该少一些concurrency = "5-10")
public void handleMessage(@Payload String message,@Headers Map<String,Object> headers,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,// 消费者标签,用来区分多个消费者@Header(AmqpHeaders.CONSUMER_TAG) String consumerTag,// message_id@Header(value = AmqpHeaders.MESSAGE_ID, required = false) String messageId,// 该消息是否多次(>1)交付@Header("amqp_redelivered") boolean redelivered) {System.out.println("====消费消息===handleMessage");System.out.println("消息:" + message);System.out.println("头:" + headers);System.out.println("deliveryTag:" + deliveryTag);System.out.println("messageId:" + messageId);System.out.println("redelivered:" + redelivered);
}

@RabbitListener 和 @RabbitHandler 搭配使用

  • @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用。
  • @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 注解的方法进行分发处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

消费者:

@Component
@RabbitListener(queues = "hello")
public class Demo_06_Consumer {@RabbitHandlerpublic void receive(byte[] msg) {System.out.println("byte[] 消费者消费信息:" + new String(msg));}@RabbitHandlerpublic void receive(String msg) {System.out.println("String 消费者消费信息:" + msg);}@RabbitHandlerpublic void receive(Integer msg) {System.out.println("Integer 消费者消费信息:" + msg);}@RabbitHandlerpublic void receive(TestA a) {System.out.println("对象 TestA 消费者消费信息:" + a.toString());}
}class TestA implements Serializable {String fieldA;String fieldB;// 省略 Get Set ToString
}

生产者:

@Test
public void demo_06_Producer() {String routingKey = "hello";String str = "FBI OPEN THE DOOR";Integer i = 102424;TestA a = new TestA();a.setFieldA("FBI WARNING");// rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());rabbitTemplate.convertAndSend(routingKey, a);rabbitTemplate.convertAndSend(routingKey, i);rabbitTemplate.convertAndSend(routingKey, str);System.out.println("发送成功");
}

这里生产者在发送消息时,调用了 RabbitTemplate 中的 convertAndSend 方法会使用 MessageConvert 对 TestA 对象进行消息的序列化,其默认的实现也是 SimpleMessageConverter