JMS&MQ系列之JMS的请求和回应 - geloin - 博客频道 - CSDN.NET
代理类:
- /**
- *
- * @author geloin
- * @date 2012-9-14 下午5:57:36
- */
- package com.geloin.activemq.test4;
- import org.apache.activemq.broker.BrokerService;
- /**
- *
- * @author geloin
- * @date 2012-9-14 下午5:57:36
- */
- public class Broker {
- /**
- * 创建并启动代理
- *
- * @author geloin
- * @date 2012-9-14 下午5:58:35
- * @throws Exception
- */
- private void createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.addConnector("tcp://localhost:61616");
- broker.start();
- System.out.println("\nPress any key to stop broker\n");
- System.in.read();
- broker.start();
- }
- /**
- *
- *
- * @author geloin
- * @date 2012-9-14 下午5:59:30
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- Broker broker = new Broker();
- broker.createBroker();
- }
- }
服务端:
- /**
- *
- * @author geloin
- * @date 2012-9-14 下午5:37:38
- */
- package com.geloin.activemq.test4;
- import javax.jms.Connection;
- import javax.jms.DeliveryMode;
- import javax.jms.Destination;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import org.apache.activemq.ActiveMQConnectionFactory;
- /**
- * 服务进程
- *
- * @author geloin
- * @date 2012-9-14 下午5:37:38
- */
- public class Server implements MessageListener {
- private String brokerURL = "tcp://localhost:61616";
- private Connection conn;
- private Session session;
- private String requestQueue = "TEST.QUEUE";
- private MessageProducer producer;
- private MessageConsumer consumer;
- /**
- * 消息处理
- *
- * @author geloin
- * @date 2012-9-14 下午5:53:46
- * @param messageText
- * @return
- */
- private String handleRequest(String messageText) {
- return "Response to '" + messageText + "'";
- }
- /*
- * (non-Javadoc)
- *
- * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
- */
- @Override
- public void onMessage(Message message) {
- // 监听到有消息传送至此时,执行onMessage
- try {
- // 若有消息传送到服务时,先创建一个文本消息
- TextMessage response = this.session.createTextMessage();
- // 若从客户端传送到服务端的消息为文本消息
- if (message instanceof TextMessage) {
- // 先将传送到服务端的消息转化为文本消息
- TextMessage txtMsg = (TextMessage) message;
- // 取得文本消息的内容
- String messageText = txtMsg.getText();
- // 将客户端传送过来的文本消息进行处理后,设置到回应消息里面
- response.setText(handleRequest(messageText));
- }
- // 设置回应消息的关联ID,关联ID来自于客户端传送过来的关联ID
- response.setJMSCorrelationID(message.getJMSCorrelationID());
- // 生产者发送回应消息,目的由客户端的JMSReplyTo定义,内容即刚刚定义的回应消息
- producer.send(message.getJMSReplyTo(), response);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- *
- * @author geloin
- * @date 2012-9-14 下午5:37:39
- * @param args
- */
- public static void main(String[] args) throws Exception {
- // 定义并启动服务
- Server server = new Server();
- server.start();
- // 监听控制器输入
- System.out.println("\nPress any key to stop the server\n");
- System.in.read();
- // 停止服务
- server.stop();
- }
- /**
- * 服务
- *
- * @author geloin
- * @date 2012-9-14 下午5:55:49
- * @throws Exception
- */
- public void start() throws Exception {
- // 使用tcp协议创建连接通道并启动
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
- brokerURL);
- conn = factory.createConnection();
- conn.start();
- // 创建session及消息的目的地,并设定交互时使用的存储方式,同时定义队列名称,客户端通过此名称连接
- session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dest = session.createQueue(requestQueue);
- // 创建生产者,并设定分配模式,生产者的目的地为null,因为它的目的地由JMSReplyTo定义
- producer = session.createProducer(null);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- // 消费者,及消费者的临听程序
- consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
- }
- /**
- * 回收资源
- *
- * @author geloin
- * @date 2012-9-13 上午9:42:06
- * @throws Exception
- */
- private void stop() {
- try {
- if (null != producer) {
- producer.close();
- }
- if (null != consumer) {
- consumer.close();
- }
- if (null != session) {
- session.close();
- }
- if (null != conn) {
- conn.close();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
客户端:
- /**
- *
- * @author geloin
- * @date 2012-9-14 下午6:00:15
- */
- package com.geloin.activemq.test4;
- import java.util.UUID;
- import javax.jms.Connection;
- import javax.jms.DeliveryMode;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import org.apache.activemq.ActiveMQConnectionFactory;
- /**
- *
- * @author geloin
- * @date 2012-9-14 下午6:00:15
- */
- public class Client implements MessageListener {
- private String brokerURL = "tcp://localhost:61616";
- private Connection conn;
- private Session session;
- private String requestQueue = "TEST.QUEUE";
- private MessageProducer producer;
- private MessageConsumer consumer;
- private Destination tempDest;
- /* (non-Javadoc)
- * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
- */
- @Override
- public void onMessage(Message message) {
- try {
- System.out.println("Receive response for: "
- + ((TextMessage) message).getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- /**
- * 启动
- *
- * @author geloin
- * @date 2012-9-13 下午7:49:00
- */
- private void start() {
- try {
- // 使用tcp协议创建连接通道并启动
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
- brokerURL);
- conn = factory.createConnection();
- conn.start();
- // 创建session及消息的目的地,并设定交互时使用的存储方式,同时定义队列名称,此名称与服务端相同
- session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dest = session.createQueue(requestQueue);
- // 创建生产者,并设定分配模式
- producer = session.createProducer(dest);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- // 使用临时目的地创建消费者,及消费者的临听程序
- tempDest = session.createTemporaryQueue();
- consumer = session.createConsumer(tempDest);
- consumer.setMessageListener(this);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * 停止
- *
- * @author geloin
- * @date 2012-9-13 下午7:49:06
- */
- public void stop() {
- try {
- if (null != producer) {
- producer.close();
- }
- if (null != consumer) {
- consumer.close();
- }
- if (null != session) {
- session.close();
- }
- if (null != conn) {
- conn.close();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * 请求
- *
- * @author geloin
- * @date 2012-9-13 下午7:52:54
- * @param request
- * @throws Exception
- */
- public void request(String request) throws Exception {
- System.out.println("Requesting:" + request);
- TextMessage txtMsg = session.createTextMessage();
- txtMsg.setText(request);
- txtMsg.setJMSReplyTo(tempDest);
- String correlationId = UUID.randomUUID().toString();
- txtMsg.setJMSCorrelationID(correlationId);
- producer.send(txtMsg);
- }
- /**
- *
- * @author geloin
- * @date 2012-9-14 下午6:00:15
- * @param args
- */
- public static void main(String[] args) throws Exception {
- Client client = new Client();
- client.start();
- int i = 0;
- while (i++ < 10) {
- client.request("REQUEST-" + i);
- }
- Thread.sleep(3000);
- client.stop();
- }
- }
代码比较简单,是实现JMS的基本过程,从以上代码中,我们可以得出以下结论:
1. 要实现交互,必须有代理,即Broker,该代理必须设定Connector,也即是我们所谓brokerURL。在服务端与客户端交互过程中,broker必须被启动;
2. 要向外发送消息,需要执行以下步骤:
(1) 通过第一步中的brokerURL创建ConnectionFactory;
(2) 通过ConnectionFactory创建Connection并启动;
(3) 通过Connection创建Session;
(4) 通过Session创建发送目标;
(5) 通过Session创建MessageProducer;
(6) 通过Session创建Message;
(7) 通过MessageProducer发送Message。
3. 要接收消息,需要执行以下步骤:
(1) 通过第一步中的brokerURL创建ConnectionFactory;
(2) 通过ConnectionFactory创建Connection并启动;
(3) 通过Connection创建Session;
(4) 通过Session创建发送目标;
(5) 通过Session创建MessageConsumer;
(6) 通过MessageConsumer取得Message并分析处理。
4. 若要实现交互,则注意以下几点:
(1) 服务端和客户端使用同一个brokerURL;
(2) 通常情况下,服务端和客户端各有一个MessageProducer和MessageConsumer;
(3) 为使服务端能够回应多个客户端,通常将其MessageProducer的Destination设置为null,即不设定,而由JMSReplyTo定义;
(4) 对应于服务端的MessageProducer的为null的Destination,若确定服务端与客户端能够交互,则在客户端可设置其MessageConsumer的Destination为临时Destination;
(5) 为使服务端能够正常回应客户端,客户端需设置消息的JMSReplyTo属性及JMSCorrelationID,服务端需设置消息的JMSCorrelationID为客户端设定的JMSCorrelationID,producer的send的Destination为客户端设定的JMSReplyTo。
5. 两个需要交互的MessageProducer和MessageConsumer之间除需要使用同一brokerURL外,还需要保障其Destination的对应,即保持在创建Destination时使用的queueName相同。