1.JSM消息的发送
import java.io.Serializable; import java.util.HashMap; import java.util.Iterator; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.QueueConnection; import javax.jms.Session; import com.sun.messaging.ConnectionConfiguration; import com.sun.messaging.ConnectionFactory; import com.sun.messaging.Queue; /** * JMS发送工具类 * @author owner * */ public class JMSUtil { /** * 发送Queue消息 * * @param content * 消息传递内容 * @param queueName * 消息对象传送序列名称.如:MQtest * */ public static void send(Object content,String queueName) { try { ConnectionFactory connectionFactory = null; QueueConnection connection = null; Session session = null; MessageProducer producer = null; Message message = null; try { /** 建立消息服务器连接 */ connectionFactory = new ConnectionFactory(); /**服务器地址*/ String jmsServer = "localhost:8080"; /**用户名称*/ String jmsUser = "admin"; /**密码*/ String jmsPassword ="admin"; connectionFactory.setProperty("imqAddressList", jmsServer); connectionFactory.setProperty(ConnectionConfiguration.imqDefaultUsername, jmsUser); connectionFactory.setProperty(ConnectionConfiguration.imqDefaultPassword,jmsPassword); /**创建queue点对点模式连接。topic是发布模式*/ connection = connectionFactory.createQueueConnection(); /**通过服务连接创建会话*/ session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//AUTO_ACKNOWLEDGE自动提示 Destination destination = null; try { /**通过序列名称创建会话目标对象*/ destination = new Queue(queueName); } catch (Throwable throwable) { return; } /**创建消息制造者:用消息制造者来发送消息*/ producer = session.createProducer(destination); /**开始连接消息服务器*/ connection.start(); // 字符串类型内容 if (content instanceof String) { /**如果消息传递内容为字符串则通过会话创建文本消息*/ message = session.createTextMessage(content.toString()); } // hash表型内容 else if (content instanceof HashMap) { /**如果消息传递内容为HashMap则通过会话创建Map消息*/ HashMap contentMap = (HashMap) content; MapMessage mm = session.createMapMessage(); /**把要发送的map内容转入会话创建的消息Map中*/ Iterator it = contentMap.keySet().iterator(); boolean isDifficulty = false;//判断是Map中值是否是对象:true为对象 while (it.hasNext()) { String name = it.next().toString(); Object o = contentMap.get(name); /**该会话创建的map消息中只存储值为String的字符串,如果是对象,则会创建值为object的对象消息*/ if (o != null && !(o instanceof String)) { isDifficulty = true; break; } if (o == null) { mm.setString(name, ""); } else { mm.setString(name, o.toString()); } } if (isDifficulty) { /**如果是Map中存的值为非字符串则创建Object消息*/ message = session.createObjectMessage((Serializable) content); } else { message = mm; } } // 其它类型内容 else { /**如果消息内容不是map也不是字符串则直接创建一个Object类型消息*/ message = session .createObjectMessage((Serializable) content); } /**消息制造者发送消息*/ producer.send(message); } // JMS链接例外,记录例外日志,并抛出错误信息 catch (JMSException ex) { ex.printStackTrace(); } finally { try { /**关闭连接*/ connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } catch (Throwable ex1) { ex1.printStackTrace(); } } }
2.JMS消息的接收
第一步:创建mq连接工厂的实现。用于设置工厂构建连接地址
package com.test.jms; import javax.jms.Connection; import javax.jms.JMSException; import com.sun.messaging.ConnectionConfiguration; import com.sun.messaging.ConnectionFactory; public class OpenMqConnectionFactory implements javax.jms.ConnectionFactory{ private ConnectionFactory connectionFactory; public OpenMqConnectionFactory(String brokerAddress) throws JMSException { connectionFactory = new ConnectionFactory(); connectionFactory.setProperty(ConnectionConfiguration.imqAddressList, brokerAddress); } public Connection createConnection() throws JMSException { return connectionFactory.createConnection(); } public Connection createConnection(String userName, String password) throws JMSException { return connectionFactory.createConnection(userName, password); } }
第二步:创建JMS接收类
package com.test.jms; import org.springframework.core.task.TaskExecutor; public class JMSReciever { private TaskExecutor taskExecutor; public JMSReciever(TaskExecutor taskExecutor){ this.taskExecutor = taskExecutor; } private class MessagePrinterTask implements Runnable{ private String message; public MessagePrinterTask(String message){ this.message = message; } public void run() { System.err.println(">>>>>>>>>>>>>>>>>>新线程中:"+message); } } /** * 接收消息 * @param msg:消息内容(spring监听到消息后执行此方法,并将消息内容付给msg参数中) */ public void receiveMSG(Object msg){ //这个方法可以写具体操作 System.err.println(">>>>>>>>>>>>>>>>>接收消息后:"+msg.toString()); taskExecutor.execute(new MessagePrinterTask("新线程参数")); //此对象为消息中包含对象。例如:msg是map。Map msp = (Map)msg; Object msgContent =msg; } }第三步:编写spring配置文件,定时获取消息
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd" default-lazy-init="true"> <jms:listener-container connection-factory="jmsFactory" task-executor="taskExecutor" concurrency="10" acknowledge="transacted"> <!--监听到序列号为MSG的消息时,通过会执行messageReceiver类中的receiveMSG方法 注意:MQtest与消息发送JMSUtil的参数queueName对应,即:接收指定队列中的消息。 --> <jms:listener destination="MQtest" ref="messageReceiver" method="receiveMSG" /> <!--method监听到消息后要调用的方法--> </jms:listener-container> <!--mq连接工厂的实现。用于设置工厂构建连接地址--> <bean id="jmsFactory" class="com.test.jms.OpenMqConnectionFactory"> <!--注入服务器地址--> <constructor-arg type="java.lang.String" value="localhost:8080" /> </bean> <!--监听到消息后要调用的类--> <bean id="messageReceiver" class="com.test.jms.JMSReciever"> <constructor-arg ref="taskExecutor" /> </bean> <!--spring的api,此类可以用execute(Runnable)来执行一个新线程。此类跟jms没关系只是自定义的类中注入一个”线程池任务执行者“,来执行其他自定义线程--> <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="5" /> <property name="maxPoolSize" value="10" /> <property name="queueCapacity" value="5" /> </bean> </beans>
依赖主要jar文件:
javaee.jar
imqjmsra.jar