备份交换器 Alternate-Exchange, 简称 AE
生产者在发送消息时如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失。如果设置了mandatory参数,则需要添加ReturnListener的编程逻辑,生产者的代码变的复杂。
如果不想复杂生产者代码,又不想消息丢失,可以使用备份交换器将未被路由的消息存储在RabbitMQ中
package com.song.songvue.config.message;import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** 生产者客户端代码*/
public class RabbitProducer {private static final Logger logger = LoggerFactory.getLogger(RabbitProducer.class);private static final String EXCHANGE_NAME = "exchange_demo";private static final String ROUTING_KEY = "routing_key";private static final String QUEUE_NAME = "queue_test";private static final String IP_ADDRESS = "192.168.10.110";private static final int PORT = 5672;public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(IP_ADDRESS);factory.setPort(PORT);factory.setUsername("root");factory.setPassword("1234567");// 创建连接Connection connection = factory.newConnection();// 创建信道Channel channel = connection.createChannel();/*void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)throws IOException;mandatory参数为 true时: 交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者.为false时: 出现上述情形时,消息直接被丢弃。生产者如何获知有没有被正确路由到合适的队列消息呢?可以通过 channel.addReturnListener来增加ReturnListener监听实现。immediate参数,告诉服务器,如果该消息关联的队列上有消费者,则立即投递,如果所有匹配的队列上都没有消费者,则直接返回。void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)throws IOException;RabbitMQ3.0去掉了immediate的支持channel.basicPublish("source", "song_root", true, MessageProperties.PERSISTENT_TEXT_PLAIN,"mandatory test".getBytes());channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("Basic.Return 返回的结果是: " + message);}});*/Map<String, Object> params = new HashMap<>();params.put("alternate-exchange", "myAe");// normalExchange, 设置了备份交换器为 myAechannel.exchangeDeclare("normalExchange", "direct", true, false, params);channel.exchangeDeclare("myAe", "fanout", true, false, null);channel.queueDeclare("normalQueue", true, false, false, null);channel.queueDeclare("unRoutedQueue", true, false, false, null);channel.queueBind("normalQueue", "normalExchange", "normalKey");channel.queueBind("unRoutedQueue", "myAe", "");channel.basicPublish("normalExchange", "errorKey",MessageProperties.PERSISTENT_TEXT_PLAIN, "normalKey is running ok".getBytes());// 关闭资源channel.close();connection.close();}
}
如果备份交换器的类型是direct, 并且有一个与其绑定的队列。
??????? 有以下几种特殊的情况:
- 如果设置的备份交换器不存在,客户端和RabbitMQ都不会有异常出现,此消息会丢失。
- 如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会有异常出现,此消息丢失。
- 如果备份交换器没有匹配到任何的队列,客户端和RabbitMQ服务端都不会产生异常,此消息丢失。