当前位置: 代码迷 >> Java相关 >> ActiveMQ应用(一)-安装及基本模式实例
  详细解决方案

ActiveMQ应用(一)-安装及基本模式实例

热度:601   发布时间:2016-04-22 19:11:43.0
ActiveMQ应用(1)-安装及基本模式实例

0.下载地址

https://activemq.apache.org/download.html

 

1.解压并启动activemq服务(需根据系统的不同选择不同的启动文件)

/apache-activemq-5.13.1/bin/macosx/activemq start

 

2.登录activemq服务器进行查看

地址:http://localhost:8161/

点击[Manage ActiveMQ broker]登录查看详细数据,默认用户名密码admin/admin

 

 

3.创建eclipse项目

/apache-activemq-5.13.1/lib下倒入所需jar包

 

3.1 通用jms示例

public class Sender {    private static final int SEND_NUMBER=5;        public static void main(String[] args){        ConnectionFactory connectionFactory;        Connection connection =null;        Session session;        Destination destination;        MessageProducer producer;                connectionFactory=new ActiveMQConnectionFactory(                ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD,                "tcp://localhost:61616");        try{            connection = connectionFactory.createConnection();            connection.start();                        session=connection.createSession(Boolean.TRUE,                    Session.AUTO_ACKNOWLEDGE);                        destination=session.createQueue("JMeterQueue");                        producer=session.createProducer(destination);            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);            sendMessage(session,producer);            session.commit();        }catch(Exception e){            e.printStackTrace();        }finally{            try{                if(null!=connection){                    connection.close();                }            }catch(Throwable ignore){                            }        }    }        public static void sendMessage(Session session,MessageProducer producer) throws JMSException{        for(int i=1;i<SEND_NUMBER;i++){            TextMessage message=session.createTextMessage("ActiveMq send "+i);            System.out.println("ActiveMq send "+i);            producer.send(message);        }    }}

 

public class Receiver {    public static void main(String[] args){        ConnectionFactory connectionFactory ;        Connection connection=null;        Session session;        Destination destination;        MessageConsumer consumer;                connectionFactory = new ActiveMQConnectionFactory(                ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD,                "tcp://localhost:61616");        try{            connection = connectionFactory.createConnection();            connection.start();            session=connection.createSession(Boolean.TRUE,                     Session.AUTO_ACKNOWLEDGE);            destination=session.createQueue("JMeterQueue");            consumer=session.createConsumer(destination);            while(true){                TextMessage message=(TextMessage)consumer.receive(10000);                            if(null !=message){                    System.out.println("Message receive "+ message.getText());                }else{                    break;                }            }            session.commit();            //session.commit 之后,Messages Enqueued 中的消息才会被被消费掉,Messages Dequeued 才会增加;            //如果不commit,Messages Dequeued会一直为0,每次启动receiver后都会受到所有未消费的消息        }catch (Exception e) {            e.printStackTrace();        } finally {            try {                if (null != connection)                    connection.close();            } catch (Throwable ignore) {            }        }    }}

 

3.2 p2p示例

public class QueueSender {        // 发送次数    public static final int SEND_NUM = 5;    // tcp 地址    public static final String BROKER_URL = "tcp://localhost:61616";    // 目标,在ActiveMQ管理员控制台创建     public static final String DESTINATION = "mq.p2p.queue";        public static void run() throws Exception {        QueueConnection connection = null;        QueueSession session = null;        try {            // 创建链接工厂            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);            // 通过工厂创建一个连接            connection = factory.createQueueConnection();            // 启动连接            connection.start();            // 创建一个session会话            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);            // 创建一个消息队列            Queue queue = session.createQueue(DESTINATION);            // 创建消息发送者            javax.jms.QueueSender sender = session.createSender(queue);            // 设置持久化模式            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);            sendMessage(session, sender);            // 提交会话            session.commit();                    } catch (Exception e) {            throw e;        } finally {            // 关闭释放资源            if (session != null) {                session.close();            }            if (connection != null) {                connection.close();            }        }    }        public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {        for (int i = 0; i < SEND_NUM; i++) {            String message = "发送消息第" + (i + 1) + "条";            Message msg=session.createTextMessage(message);            sender.send(msg);        }    }        public static void main(String[] args) throws Exception {        QueueSender.run();    }}

 

public class QueueReceiver {         // tcp 地址    public static final String BROKER_URL = "tcp://localhost:61616";    // 目标,在ActiveMQ管理员控制台创建     public static final String TARGET = "mq.p2p.queue";        public static void run() throws Exception {        QueueConnection connection = null;        QueueSession session = null;        try {            // 创建链接工厂            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);            // 通过工厂创建一个连接            connection = factory.createQueueConnection();            // 启动连接            connection.start();            // 创建一个session会话            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);            // 创建一个消息队列            Queue queue = session.createQueue(TARGET);            // 创建消息制作者            javax.jms.QueueReceiver receiver = session.createReceiver(queue);                        receiver.setMessageListener(new MessageListener() {                 public void onMessage(Message msg) {                     if (msg != null) {                        TextMessage map = (TextMessage) msg;                        try {                            System.out.println(map.getText());                        } catch (JMSException e) {                            e.printStackTrace();                        }                    }                }             });             // 休眠100ms再关闭            Thread.sleep(1000 * 20);                         // 提交会话            session.commit();                    } catch (Exception e) {            throw e;        } finally {            // 关闭释放资源            if (session != null) {                session.close();            }            if (connection != null) {                connection.close();            }        }    }        public static void main(String[] args) throws Exception {        QueueReceiver.run();    }}

 

3.3 订阅示例

public class TopicSender {        // 发送次数    public static final int SEND_NUM = 5;    // tcp 地址    public static final String BROKER_URL = "tcp://localhost:61616";    // 目标,在ActiveMQ管理员控制台创建    public static final String DESTINATION = "mq.topic";        public static void run() throws Exception {        TopicConnection connection = null;        TopicSession session = null;        try {            // 创建链接工厂            TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);            // 通过工厂创建一个连接            connection = factory.createTopicConnection();            // 启动连接            connection.start();            // 创建一个session会话            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);            // 创建一个消息队列            Topic topic = session.createTopic(DESTINATION);            // 创建消息发送者            TopicPublisher publisher = session.createPublisher(topic);            // 设置持久化模式            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);            sendMessage(session, publisher);            // 提交会话            session.commit();                    } catch (Exception e) {            throw e;        } finally {            // 关闭释放资源            if (session != null) {                session.close();            }            if (connection != null) {                connection.close();            }        }    }        public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {        for (int i = 0; i < SEND_NUM; i++) {            String message = "发送消息第" + (i + 1) + "条";            TextMessage msg =session.createTextMessage(message);            publisher.send(msg);        }    }        public static void main(String[] args) throws Exception {        TopicSender.run();    }}

 

public class TopicReceiver {         // tcp 地址    public static final String BROKER_URL = "tcp://localhost:61616";    // 目标,在ActiveMQ管理员控制台创建    public static final String TARGET = "mq.topic";        public static void run() throws Exception {             TopicConnection connection = null;        TopicSession session = null;        try {            // 创建链接工厂            TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);            // 通过工厂创建一个连接            connection = factory.createTopicConnection();            // 启动连接            connection.start();            // 创建一个session会话            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);            // 创建一个消息队列            Topic topic = session.createTopic(TARGET);            // 创建消息制作者            TopicSubscriber subscriber = session.createSubscriber(topic);            subscriber.setMessageListener(new MessageListener() {                 public void onMessage(Message msg) {                     System.out.println(msg);                }             });             // 休眠100ms再关闭            Thread.sleep(1000 * 20);                         // 提交会话            session.commit();                    } catch (Exception e) {            throw e;        } finally {            // 关闭释放资源            if (session != null) {                session.close();            }            if (connection != null) {                connection.close();            }        }    }        public static void main(String[] args) throws Exception {        TopicReceiver.run();    }}

 

  相关解决方案