首先创建连接池工厂. 需要引入连接池的坐标.
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.9</version></dependency>
public class MQPooledConnectionFactory {public static String brokerUrl = "tcp://192.168.25.43:9601";public static String maxActiveConfig = "5";private static ActiveMQConnectionFactory connectionFactory;/*** 获得自己创建的链接工厂,这个工厂只初始化一次*/public static ActiveMQConnectionFactory getMyActiveMQConnectionFactory() {if (null == connectionFactory) {connectionFactory = new ActiveMQConnectionFactory(brokerUrl);}return connectionFactory;}private static PooledConnectionFactory pooledConnectionFactory;static {try {// 需要创建一个链接工厂然后设置到连接池中 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();activeMQConnectionFactory.setBrokerURL(brokerUrl);pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);// 链接最大活跃数,为了在测试中区别我们使用的到底是不是一个对象和看是否能控制连接数(实际上是会话数),我们在这里设置为1 int maximumActive = Integer.parseInt(maxActiveConfig);pooledConnectionFactory.setMaxConnections(5);pooledConnectionFactory.setMaximumActiveSessionPerConnection(5);} catch (Exception e) {e.printStackTrace();}}/*** 获得链接池工厂*/public static PooledConnectionFactory getPooledConnectionFactory() {return pooledConnectionFactory;}/*** 对象回收销毁时停止链接*/@Overrideprotected void finalize() throws Throwable {pooledConnectionFactory.stop();super.finalize();}
}
创建mqService 用来封装发送mq的方法.
public class MqService {// 发送消息 private void send(String message,String queueName) throws Exception {ActiveMQConnectionFactory connectionFactory = null;Connection connection = null;Session session = null;Queue queue = null;MessageProducer producer = null;// ====================================try { // 获得我们自己初始化的链接工厂然后创建链接 这个是短连接connectionFactory = MQPooledConnectionFactory.getMyActiveMQConnectionFactory();connection = connectionFactory.createConnection();// 或者链接从链接池工厂进行获得
// connection = MQPooledConnectionFactory.getPooledConnectionFactory().createConnection();session = connection.createSession(false, session.AUTO_ACKNOWLEDGE); queue = session.createQueue(queueName);producer = session.createProducer(queue); // 链接开始,如果我们使用的是连接池,那么即使你不开始,也是没有问题的 connection.start(); } catch (Exception e) {e.printStackTrace(); } // ==================================== TextMessage textMessage = session.createTextMessage(message);System.out.println("textMessage = " + textMessage);producer.send(textMessage);producer.close();// =================================== // 通过打印会话的内存地址和链接的客户端编号就可以知道我们使用的是不是同一个会话和链接 System.out.println(session.toString());
// System.out.println(connection.getClientID());// 无论使用的自己的工厂还是连接池的,都要将会话关闭 // 如果不关闭,在使用连接池的时可以看到效果,发送两次时只能发送一次,造成堵塞 session.close(); // 使用自己的工厂和连接池的区别是,运行后自己工厂链接调用关闭程序结束 // 而调用连接池链接进行关闭实际上没有关闭,因为连接池要维护这个链接 connection.close();}private MqService() {} // 发送对象每次创建一个,用以区别我们使用的对象 public static MqService getInstance() {return new MqService();} // 对外开发发送消息方法 @SuppressWarnings("unchecked")public void sendMessage(String message,String queueName)throws Exception {this.send(message,queueName);}
}
在main方法中尝试调用.
public static void main(String[] args) throws Exception {// 循环调用,这里定义调用两次MqService sender = MqService.getInstance();sender.sendMessage("testMessage", "QUEUE_NAME");System.out.println("数据已经发送完毕!");}