当前位置: 代码迷 >> 综合 >> activeMQ-消息订阅
  详细解决方案

activeMQ-消息订阅

热度:82   发布时间:2023-12-15 11:39:20.0

activeMQ消息订阅:

订阅模式下,如果消费者开启之前由消息存在,那么消费者是消费不到之前发布的消息的,即使消息仍然在消息队列中。

    pruducer.java:

package com.anlysqx.topicproducer;import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class TopicProducer {private static final String USER = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;private static final String TOPICNAME = "myTopic";public static void start() throws JMSException{ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();Connection createConnection = activeMQConnectionFactory.createConnection();createConnection.start();Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination destination = createSession.createTopic(TOPICNAME);MessageProducer createProducer = createSession.createProducer(destination);for (int i = 0; i < 5; i++) {TextMessage createTextMessage = createSession.createTextMessage("生产者发布消息 i = "+i);createProducer.send(createTextMessage);}System.out.println("全部消息发布成功");createSession.close();createConnection.close();}public static void main(String[] args) throws JMSException {start();}}

consumer.java:

package com.anlysqx.topicconsumer;import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class TopicConsumer {private static final String USER = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;private static final String TOPICNAME = "myTopic";public static void start() throws JMSException{ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();Connection createConnection = activeMQConnectionFactory.createConnection();createConnection.start();Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination destination = createSession.createTopic(TOPICNAME);MessageConsumer createConsumer = createSession.createConsumer(destination);Integer num = 0;while(true){Message message = createConsumer.receive();if(message!=null){System.out.println("接收到订阅的消息: "+((TextMessage)message).getText());}else{break;}}createSession.close();createConnection.close();}public static void main(String[] args) throws JMSException {start();}
}