Message
在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message
,它主要由两部分组成:
MessageProperties // 消息属性
byte[] body // 消息内容
如下使用 Message 类型接收数据,当监听到队列 hello
中有消息时则会进行接收并处理,MessageConvert
会直接转换成消息类型,并绑定在对应被注解的方法中。
消息处理方法参数是由 MessageConverter
转化,若使用自定义 MessageConverter
则需要在 RabbitListenerContainerFactory
实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory
)
@RabbitListener(queues = "hello")
public void receive1(Message message) {// 消息携带属性参数MessageProperties properties = message.getMessageProperties();// 消息内容,二进制数据byte[] body = message.getBody();
}
MessageConvert
- 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
- RabbitMQ 的序列化是指
Message
的body
属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有SimpleMessageConverter
(默认)、Jackson2JsonMessageConverter
等
在 @RabbitListener 注解的方法中,使用 @Payload 和 @Headers 注解可以获取消息中的 body 和 headers 消息。它们都会被 MessageConvert 转换器解析转换后,将结果绑定在对应注解的方法中。
/**
* 这里是监听 hello 队列,并将接收到的消息体 body 根据 MessageConvert 转换器转换成 String 类型输出
* @Headers 获取所有头部属性消息,也可以用 @Header 获取单个 header 消息
*/
@RabbitListener(queues = "hello")
public void processMessage1(@Payload String message, @Headers Map<String,Object> headers, @Header(value = AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {System.out.println("message:" + message);System.out.println("Headers:" + headers);System.out.println("routingKey: " + routingKey);
}
默认消费者消费时,消息的 content_type 属性表示消息 body 数据以什么数据格式传输存储,直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:
- application/octet-stream:二进制字节数组存储,使用 byte[]
- application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
- text/plain:文本数据类型存储,使用 String
- application/json:JSON 格式,使用 Object、相应类型
通过 @RabbitListener 注解声明 Binding
- 通过
@RabbitListener
的 bindings 属性声明 Binding(若 RabbitMQ 中不存在该绑定所需要的 Queue、Exchange、RouteKey 则自动创建,若存在则抛出异常) @RabbitListener
注解可以指定目标方法来作为消费信息的方法,可以通过注解参数指定所监听的队列或 binding。使用@RabbitListener
可以设置一个自己明确默认值的RabbitListenerContainerFactory
对象
下面是一个 demo 的示例,供参考:
@RabbitListener(bindings = {@QueueBinding(// 队列配置value = @Queue(value = "rabbit.mq.test", durable = "true",// 配置死信队列的参数arguments = {@Argument(name = "x-dead-letter-exchange", value = "target_exchange"),@Argument(name = "x-dead-letter-routing-key", value = "target_routing_key")}),// 交换机配置exchange = @Exchange(value = "rabbit_test_exchange",durable = "true",type = ExchangeTypes.TOPIC),key = "rabbit.test.*")},// 可以指定容器工厂,默认使用 rabbitListenerContainerFactorycontainerFactory = "rabbitListenerContainerFactory",//指定消费者的线程数量,一个线程会打开一个Channel,一个队列上的消息只会被消费一次(不考虑消息重新入队列的情况),下面的表示至少开启5个线程,最多10个。线程的数目需要根据你的任务来决定,如果是计算密集型,线程的数目就应该少一些concurrency = "5-10")
public void handleMessage(@Payload String message,@Headers Map<String,Object> headers,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,// 消费者标签,用来区分多个消费者@Header(AmqpHeaders.CONSUMER_TAG) String consumerTag,// message_id@Header(value = AmqpHeaders.MESSAGE_ID, required = false) String messageId,// 该消息是否多次(>1)交付@Header("amqp_redelivered") boolean redelivered) {System.out.println("====消费消息===handleMessage");System.out.println("消息:" + message);System.out.println("头:" + headers);System.out.println("deliveryTag:" + deliveryTag);System.out.println("messageId:" + messageId);System.out.println("redelivered:" + redelivered);
}
@RabbitListener 和 @RabbitHandler 搭配使用
- @RabbitListener 可以标注在类上面,需配合
@RabbitHandler
注解一起使用。 - @RabbitListener 标注在类上面表示当有收到消息的时候,就交给
@RabbitHandler
注解的方法进行分发处理,具体使用哪个方法处理,根据MessageConverter
转换后的参数类型
消费者:
@Component
@RabbitListener(queues = "hello")
public class Demo_06_Consumer {@RabbitHandlerpublic void receive(byte[] msg) {System.out.println("byte[] 消费者消费信息:" + new String(msg));}@RabbitHandlerpublic void receive(String msg) {System.out.println("String 消费者消费信息:" + msg);}@RabbitHandlerpublic void receive(Integer msg) {System.out.println("Integer 消费者消费信息:" + msg);}@RabbitHandlerpublic void receive(TestA a) {System.out.println("对象 TestA 消费者消费信息:" + a.toString());}
}class TestA implements Serializable {String fieldA;String fieldB;// 省略 Get Set ToString
}
生产者:
@Test
public void demo_06_Producer() {String routingKey = "hello";String str = "FBI OPEN THE DOOR";Integer i = 102424;TestA a = new TestA();a.setFieldA("FBI WARNING");// rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());rabbitTemplate.convertAndSend(routingKey, a);rabbitTemplate.convertAndSend(routingKey, i);rabbitTemplate.convertAndSend(routingKey, str);System.out.println("发送成功");
}
这里生产者在发送消息时,调用了 RabbitTemplate 中的 convertAndSend
方法会使用 MessageConvert
对 TestA 对象进行消息的序列化,其默认的实现也是 SimpleMessageConverter