当前位置: 代码迷 >> java >> 如何从Web应用程序中侦听消息队列? (Tomcat,ActiveMQ)
  详细解决方案

如何从Web应用程序中侦听消息队列? (Tomcat,ActiveMQ)

热度:111   发布时间:2023-08-04 09:27:05.0

我很高兴改进在Apache Tomcat上运行的Web应用程序 添加ActiveMQ JMS服务器以发送和接收消息。

我已经可以发送和接收消息,但在接收方需要帮助。

我的网络应用程序应该如何持续监听一个队列来接收消息?

新邮件到达,服务器应该对它们进行操作。 例如:将数据添加到数据库或发回消息。

我已经可以发送消息了。 这是代码。

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("clientQueue");
MessageProducer publisher = session.createProducer(queue);
connection.start();

Message message = null;
message = session.createTextMessage("Text Message");
publisher.send(message);

我可以在请求后收到消息(点击;-))

connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("serverQueue");
consumer = session.createConsumer(destination);

while (true) {
    Message message = consumer.receive(300000);
    //Do message stuff
}

我应该如何让Web应用程序连续监听队列? 建议的方式是什么?

热烈欢迎所有帮助。 谢谢。

编辑 - 解决方案

目前的工作解决方案与的提议

我添加了一个ServletContextListener来连续收听我的消息。

web.xml中

<listener>
    <listener-class>com.test.JMSContextListener</listener-class>
</listener>

听众:

public class JMSContextListener implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        Thread thread = new Thread(new JMSConnector());
        thread.start();
    }

    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
        //Nothing
    }
}

连接:

public class JMSConnector implements Runnable {
    public void run() {
        try {
            Context context = new InitialContext();
            QueueConnectionFactory factory = (QueueConnectionFactory) context.lookup("java:comp/env/jms/ConnectionFactory");            
            Connection connection = factory.createConnection();
            Queue queue = (javax.jms.Queue) context.lookup("java:comp/env/jms/serverQueue");
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            MessageConsumer consumer = session.createConsumer(queue);

            //This MessageListener will do stuff with the message
            MessageListenerImpl messageListener = new MessageListenerImpl();
            consumer.setMessageListener(messageListener);
            connection.start();

            // Start connection or nothing will happen!!!
            connection.start();
        } catch (JMSException ex) {
            //TODO
        } catch (NamingException ex) {
            //TODO
        }
    }
}

这是一种建议的方式还是应该改进?

热烈欢迎所有帮助。 谢谢。

如果您的代码已经可以使用队列中的消息(您看起来就是这样),那么我认为您的问题归结为如何运行这段代码。

看来你没有使用任何框架,所以我认为我采取的方法是采用可以从队列中检索消息并在应用程序服务器的单独线程中运行它的代码。 获取该线程以在应用程序服务器启动时启动,并在应用程序服务器关闭时自行整理。

在应用服务器启动时启动线程的最简单方法是引入ServletContextListener( 的示例。)在Context Listener中,在单独的线程中启动队列侦听代码。

编辑:我使用了这个提议的解决方案,并将上面的代码添加到问题中。

我正在使用spring来收听队列。 定义监听器是这样的:

<jms:listener-container connection-factory="jmsConnectionFactoryLocal">
            <jms:listener destination="QUEUE_NAME" ref="channelManagerSimulatorDefault"/>
</jms:listener-container>

必须根据MQ创建jmsConnectionFactoryLocal。 在我的例子中它是IBM WebsphereMQ,所以jmsConnectionFactoryLocal定义是这样的:

<bean id="mqConnectionFactoryLocal" class="com.ibm.mq.jms.MQQueueConnectionFactory">
       <property name="hostName">
         <value>THE_MQ_SERVER_IP</value>
       </property>
       <property name="port">
          <value>MQ_PORT</value>
       </property>
       <property name="queueManager">
          <value>QUEUE_MANAGER_NAME</value>
       </property>
       <property name="transportType">
          <value>1</value>
       </property>
    </bean>
    <bean id="jmsConnectionFactoryLocal" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
       <property name="targetConnectionFactory" ref="mqConnectionFactoryLocal"/>
       <property name="username" value="USER_NAME"/>
       <property name="password" value="PASSWORD"/>
    </bean>

您必须为ActiveMQ找到正确的ConnectionFactory实现并使用它。 listener和jmsConnectionFactory与MQ提供程序相同且独立。

我在webApplication spring-mvc和JMS模板中使用activeMQ,方法如下。

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
        <value>tcp://localhost:61616</value>
    </property>
</bean>

<bean id="messageSender" class="xyz.MessageSender"/>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="destination" />
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="REQUEST_QUEUE" />
</bean> 
<bean id="messageListener" class="xtz.MessageListener">
    <property name="listenerId" value="1" />
</bean> 

<jms:listener-container connection-factory="connectionFactory">     
    <jms:listener destination="RESPONSE_QUEUE" ref="messageListener" method="messageReceived" />
</jms:listener-container>

下面给出了Sender和Listener类的实现。

public class MessageSender 
{

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMessage()
    {

        jmsTemplate.send(new MessageCreator()
        {
            public Message createMessage(Session session) throws JMSException
            {
                MapMessage message = session.createMapMessage();
                message.setString("messageType", XXX);
                message.setString("jsonMessage", XXXX);
                return message;
            }
        });
    }
}


public class MessageListener
{
    private int listenerId;

    @Override
    public void messageReceived(Map<String, Object> message) throws Exception
    {        
        //put your logic here 
    }

    public int getListenerId()
    {
        return listenerId;
    }

    public void setListenerId(int listenerId)
    {
        this.listenerId = listenerId;
    }
}
  1. 在Tomcat中配置JMS队列: :

  2. 将activemq-all-5.xx.jar放到$ TOMCAT_HOME / lib

  3. 在Thread中循环监听,在@WebListener启动:

  4. 在contextDestroyed时停止

     @WebListener @Slf4j public class JmsMailListener implements ServletContextListener { private Thread listenerThread = null; private QueueConnection connection; @Override public void contextInitialized(ServletContextEvent sce) { try { InitialContext initCtx = new InitialContext(); ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) initCtx.lookup("java:comp/env/jms/ConnectionFactory"); connectionFactory.setTrustAllPackages(true); connection = connectionFactory.createQueueConnection(); QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = (Queue) initCtx.lookup("java:comp/env/jms/queue/MailQueue"); QueueReceiver receiver = queueSession.createReceiver(queue); connection.start(); log.info("Listen JMS messages ..."); listenerThread = new Thread(() -> { try { while (!Thread.interrupted()) { Message m = receiver.receive(); if (m instanceof ObjectMessage) { ObjectMessage om = (ObjectMessage) m; MyObject myObject = (MyObject) om.getObject(); log.info("Received MyObject {}", myObject); ... } } } catch (Exception e) { log.error("Receiving messages failed: " + e.getMessage(), e); } }); listenerThread.start(); } catch (Exception e) { log.error("JMS failed: " + e.getMessage(), e); } } @Override public void contextDestroyed(ServletContextEvent sce) { if (connection != null) { try { connection.close(); } catch (JMSException ex) { log.warn("Couldn't close JMSConnection: ", ex); } } if (listenerThread != null) { listenerThread.interrupt(); } } } 
  相关解决方案