当前位置: 代码迷 >> 综合 >> 消息队列-ActiveMQ P2P模式
  详细解决方案

消息队列-ActiveMQ P2P模式

热度:88   发布时间:2023-10-17 06:05:11.0

1.导依赖包

<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.10</version>
</dependency>

2.消息生产者

public class Sender {
    public static void main(String[] args) throws JMSException, InterruptedException {
    // 1. 建立工厂对象ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");//2. 从工厂里获取一个连接Connection connection = factory.createConnection();connection.start();//3. 从连接中获取Session(会话)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//4. 从会话中获取目的地(Destination),消费者会从这个目的地取消息Queue queue = session.createQueue("lance");//5. 从会话中创建消息提供者MessageProducer producer = session.createProducer(queue);int i = 1;try {
    while (true) {
    //从会话中创建文本消息(也可以创建其它类型的消息体)TextMessage textMessage = session.createTextMessage("hello" + i);Thread.sleep(1000);// 通过消息提供者发送消息到ActiveMQproducer.send(textMessage);i++;}} catch (JMSException e) {
    e.printStackTrace();} catch (InterruptedException e) {
    e.printStackTrace();} finally {
    // 关闭连接connection.close();}System.out.println("exit");}
}

3.消息消费者

public class Recviver {
    public static void main(String[] args) throws JMSException {
    // 1. 建立工厂对象ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");//2. 从工厂里获取一个连接Connection connection = factory.createConnection();connection.start();//3. 从连接中获取Session(会话)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//4. 从会话中获取目的地(Destination),消费者会从这个目的地取消息Destination destination = session.createQueue("lance");//5. 从会话中创建消息消费者MessageConsumer consumer = session.createConsumer(destination);//6. 持续的消费try {
    while (true) {
    TextMessage textMessage = (TextMessage) consumer.receive();System.out.println("消费:" + textMessage.getText());}} catch (Exception e) {
    e.printStackTrace();} finally {
    //关闭连接connection.close();}}
}
  相关解决方案