ActiveMQ与Spring整合

标签: activemq spring | 发表时间:2011-12-30 09:44 | 作者:bincrack
出处:http://www.cnblogs.com/

ActiveMQ 是Apache出品, 是最流行​​和最强大的开源消息总线。 同时完全支持 JMS 1.1和J2EE 1.4规范。

ActiveMQ 特性

  1. 支持多种编程语言和协议编写客户端。
  2. 在JMS客户端和消息代理完全支持企业集成模式
  3. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)。
  4. 对Spring的支持, ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性。
  5. 测试通过常见的J2EE服务器。如:Geronimo, JBoss 4, GlassFish, WebLogic。
  6. 支持多种传送协议:in-VM, TCP, SSL, NIO, UDP, JGroups,JXTA。
  7. 支持通过JDBC和journal提供高速的消息持久化。
  8. 从设计上保证了高性能的集群,客户端-服务器,点对点。
  9. 支持Ajax
  10. 支持与Axis的整合
  11. 可以很容易得调用内嵌JMS provider,进行测试。

通过网络搜索及个人理解,整理出ActiveMQ与Spring整合的文章。

相关jar包

activemq-core-5.5.1.jar   
geronimo-j2ee-management_1.1_spec-1.0.1.jar
geronimo-jms_1.1_spec-1.1.1.jar
geronimo-jta_1.0.1B_spec-1.0.1.jar
kahadb-5.5.1.jar
xbean-spring-3.7.jar
commons-beanutils.jar
commons-codec.jar
commons-collections.jar
commons-fileupload.jar
commons-httpclient.jar
commons-io.jar
commons-lang.jar
commons-logging.jar
commons-validator.jar
dom4j-1.6.1.jar
javaee.jar
jsf-api.jar
jsf-impl.jar
jstl.jar
log4j-1.2.15.jar
slf4j-api-1.5.8.jar
slf4j-log4j12-1.5.8.jar
aspectjrt.jar
aspectjweaver.jar
spring.jar

spring.xml

<?xml version="1.0" encoding="UTF-8"?>   
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-2.5.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-2.5.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">

  <!-- Annotation Config -->
  <context:annotation-config/>

  <!-- Compoent Scan -->
  <context:component-scan base-package="com.uu.web.*"/>

  <!-- Property Placeholder -->
  <context:property-placeholder location="classpath:config.properties"/>

  <!-- Aop Config -->
  <aop:aspectj-autoproxy/>
  <!--
  <bean id="mysql-ds" destroy-method="close">
    <property name="driverClassName" value="${dirver}"/>
    <property name="url" value="${url}"/>
    <property name="username" value="${username}"/>
    <property name="password" value="${password}"/>
  </bean>
  -->
  <!--
    使用spring的listenerContainer 消息用持久化保存,服务器重启不会丢失
    也可以配置在${ACTIVEMQ_HOME}/conf/activemq.xml内
    消息的保存方式文件持久化和数据库持久化
    此配置是文件持久化
  -->
  <!-- Embedded ActiveMQ Broker -->
  <amq:broker useJmx="false" persistent="true">
    <amq:persistenceAdapter>
      <amq:amqPersistenceAdapter directory="G:/amq"/>
      <!-- 使用数据库持久化 -->
      <!--<amq:jdbcPersistenceAdapter dataSource="#mysql-ds" />-->
    </amq:persistenceAdapter>
    <amq:transportConnectors>
      <amq:transportConnector uri="tcp://localhost:61616" />
    </amq:transportConnectors>
  </amq:broker>

  <!--ActiveMQ connectionFactory -->
  <amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

  <!-- ActiveMQ destinations -->
  <!--
    TOPIC:发布订阅消息
        无状态,不保证每条消息被消费
        只有监听该TOPIC地址才能收到消息并消费,否则该消息将会丢失
        一对多的发布接受策略,可以同时消费多个消息
  -->
  <amq:topic name="TOPIC" physicalName="JMS-TOPIC" />

<!--
    QUEUE: 点对点
         消息数据被持久化,每条消息都能被消费
         没有监听QUEUE地址也能被消费,数据不会丢失
         一对一的发布接受策略,保证数据完整
-->
<amq:queue name="QUEUE" physicalName="JMS-QUEUE" />

  <!-- ConnectionFactory -->
  <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory" destroy-method="destroy">
    <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
  </bean>

  <!-- 添加事务 -->
  <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager" >
    <property name="connectionFactory" ref="singleConnectionFactory"/>
  </bean>

  <!-- Spring JmsTemplate config -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- lets wrap in a pool to avoid creating a connection per send -->
    <property name="connectionFactory" ref="singleConnectionFactory"/>
    <!-- custom MessageConverter -->
    <property name="messageConverter" ref="defaultMessageConverter" />
    <property name="sessionTransacted" value="true"/>
  </bean>

  <!-- converter -->
  <bean id="defaultMessageConverter" class="com.uu.activemq.DefaultMessageConverter" />

  <!-- 生产消息配置 -->
  <!-- POJO which send Message uses Spring JmsTemplate -->
  <bean id="topicMessageProducer" class="com.uu.activemq.TopicMessageProducer">
    <property name="template" ref="jmsTemplate" />
    <property name="destination" ref="TOPIC" />
  </bean>
  <bean id="queueMessageProducer" class="com.uu.activemq.QueueMessageProducer">
    <property name="template" ref="jmsTemplate" />
    <property name="destination" ref="QUEUE" />
  </bean>

  <!-- 消费消息 配置 -->
  <!-- Message Driven POJO (MDP) -->
  <!-- consumer1 for topic -->
  <bean id="topicConsumer" class="com.uu.activemq.TopicConsumer" />

  <!-- consumer for queue -->
  <bean id="queueConsumer" class="com.uu.activemq.QueueConsumer" />

  <!-- Message Listener for -->
  <bean id="topicListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg ref="topicConsumer" />
    <!-- 指定消费消息的方法 -->
    <property name="defaultListenerMethod" value="receive" />
    <!-- custom MessageConverter define -->
    <property name="messageConverter" ref="defaultMessageConverter" />
  </bean>
  <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg ref="queueConsumer" />
    <!-- 指定消费消息的方法 -->
    <property name="defaultListenerMethod" value="receive" />
    <!-- custom MessageConverter define -->
    <property name="messageConverter" ref="defaultMessageConverter" />
  </bean>

  <!-- listener container,MDP无需实现接口 -->
  <bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destination" ref="TOPIC" />
    <property name="messageListener" ref="topicListener" />
  </bean>
  <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destination" ref="QUEUE" />
    <property name="messageListener" ref="queueListener" />
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="sessionTransacted" value="true"/>
    <property name="concurrentConsumers" value="5"/>
  </bean>
</beans>

消息转换类

DefaultMessageConverter.java

 1 package com.uu.activemq;   
2
3 import java.io.ByteArrayInputStream;
4 import java.io.ByteArrayOutputStream;
5 import java.io.IOException;
6 import java.io.ObjectInputStream;
7 import java.io.ObjectOutputStream;
8 import java.util.HashMap;
9
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.ObjectMessage;
13 import javax.jms.Session;
14
15 import org.apache.activemq.command.ActiveMQObjectMessage;
16 import org.apache.commons.logging.Log;
17 import org.apache.commons.logging.LogFactory;
18 import org.springframework.jms.support.converter.MessageConverter;
19
20 /**
21 * 消息转换类
22 */
23 @SuppressWarnings("unchecked")
24 public class DefaultMessageConverter implements MessageConverter {
25
26 private static final Log log = LogFactory.getLog(DefaultMessageConverter.class);
27
28 public Message toMessage(Object obj, Session session) throws JMSException {
29 if (log.isDebugEnabled()) {
30 log.debug("toMessage(Object, Session) - start");
31 }
32
33 // check Type
34 ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
35 HashMap<String, byte[]> map = new HashMap<String, byte[]>();
36 try {
37 // POJO must implements Seralizable
38 ByteArrayOutputStream bos = new ByteArrayOutputStream();
39 ObjectOutputStream oos = new ObjectOutputStream(bos);
40 oos.writeObject(obj);
41 map.put("POJO", bos.toByteArray());
42 objMsg.setObjectProperty("Map", map);
43
44 } catch (IOException e) {
45 log.error("toMessage(Object, Session)", e);
46 }
47 return objMsg;
48 }
49
50
51 public Object fromMessage(Message msg) throws JMSException {
52 if (log.isDebugEnabled()) {
53 log.debug("fromMessage(Message) - start");
54 }
55
56 if (msg instanceof ObjectMessage) {
57 HashMap<String, byte[]> map = (HashMap<String, byte[]>) ((ObjectMessage) msg).getObjectProperty("Map");
58 try {
59 // POJO must implements Seralizable
60 ByteArrayInputStream bis = new ByteArrayInputStream(map.get("POJO"));
61 ObjectInputStream ois = new ObjectInputStream(bis);
62 Object returnObject = ois.readObject();
63 return returnObject;
64 } catch (IOException e) {
65 log.error("fromMessage(Message)", e);
66
67 } catch (ClassNotFoundException e) {
68 log.error("fromMessage(Message)", e);
69 }
70
71 return null;
72 } else {
73 throw new JMSException("Msg:[" + msg + "] is not Map");
74 }
75 }
76 }

接受Queue方式消费

QueueConsumer.java

 1 package com.uu.activemq;   
2
3 import java.util.Map;
4 import java.util.Set;
5
6 public class QueueConsumer
7 {
8 public void receive(Map<String, Object> message)
9 {
10 Set<String> set = message.keySet();
11 String str = "";
12 for(String key : set)
13 {
14 str += key + "_" + message.get(key) + ":Queue";
15 }
16 System.out.println(str);
17 }
18 }

接受Topic方式消费

TopicConsumer.java

 1 package com.uu.activemq;   
2
3 import java.util.Map;
4 import java.util.Set;
5 import java.util.concurrent.ExecutorService;
6 import java.util.concurrent.Executors;
7
8 /**
9 * 创建10个线程池
10 * 1.使用线程异步消息处理
11 * 2.不使用线程,那么消息等待上一个消息处理完成后才继续
12 * 注:如果担心据同步问题,那么使用第2种方法
13 */
14 public class TopicConsumer
15 {
16 protected static ExecutorService exec = Executors.newFixedThreadPool(10);
17
18 public void receive(Map<String, Object> message)
19 {
20 /*exec.submit(new Runnable(){
21
22 public void run()
23 {
24
25 }
26 });*/
27 Set<String> set = message.keySet();
28 String str = "";
29 for(String key : set)
30 {
31 str += key + "_" + message.get(key) + ":Topic";
32 }
33 System.out.println(str);
34 }
35 }

生产Queue方式的消息

QueueMessageProducer.java

 1 package com.uu.activemq;   
2
3 import java.util.Map;
4
5 import javax.jms.Queue;
6
7 import org.springframework.jms.core.JmsTemplate;
8
9 public class QueueMessageProducer {
10
11 private JmsTemplate template;
12
13 private Queue destination;
14
15 public void setTemplate(JmsTemplate template) {
16 this.template = template;
17 }
18
19 public void setDestination(Queue destination) {
20 this.destination = destination;
21 }
22
23 public void send(Map<String,Object> message) {
24 template.convertAndSend(this.destination, message);
25 }
26
27 }

生产Topic方式的消息

TopicMessageProducer.java

 1 package com.uu.activemq;   
2
3 import java.util.Map;
4
5 import javax.jms.Topic;
6
7 import org.springframework.jms.core.JmsTemplate;
8
9 public class TopicMessageProducer {
10
11 private JmsTemplate template;
12
13 private Topic destination;
14
15 public void setTemplate(JmsTemplate template) {
16 this.template = template;
17 }
18
19 public void setDestination(Topic destination) {
20 this.destination = destination;
21 }
22
23 public void send(Map<String,Object> message) {
24 template.convertAndSend(this.destination, message);
25 }
26 }

写一个测试类

 1 package com.uu.activemq;   
2
3 import java.util.LinkedHashMap;
4 import java.util.Map;
5
6 import org.springframework.context.ApplicationContext;
7 import org.springframework.context.support.FileSystemXmlApplicationContext;
8
9 /**
10 * 解决耗时的数据操作
11 * 发送消息不等待返回,继续执行
12 */
13 public class MainTest
14 {
15 public static void main(String[] args)
16 {
17 ApplicationContext wac = new FileSystemXmlApplicationContext("classpath:spring.xml");
18
19 TopicMessageProducer topicMessageProducer = (TopicMessageProducer) wac.getBean("topicMessageProducer");
20
21 QueueMessageProducer queueMessageProducer = (QueueMessageProducer) wac.getBean("queueMessageProducer");
22
23 Map<String, Object> message = new LinkedHashMap<String, Object>();
24 message.put("test", "ActiveMQ");
25
26 queueMessageProducer.send(message);
27 topicMessageProducer.send(message);
28 System.out.println("完成");
29 }
30 }

结果先输出完成然后输出传递的参数

完成   
test_ActiveMQ:Queue
test_ActiveMQ:Topic



本文链接

相关 [activemq spring] 推荐:

ActiveMQ与Spring整合

- - 博客园_首页
ActiveMQ 是Apache出品, 是最流行​​和最强大的开源消息总线. 同时完全支持 JMS 1.1和J2EE 1.4规范. 支持多种编程语言和协议编写客户端. 在JMS客户端和消息代理完全支持企业集成模式. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务). 对Spring的支持, ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性.

【ActiveMQ Tuning】Prefetch Limit

- - 博客园_首页
   摘要:ActiveMQ优化 客户端优化 预取限制. 原文: http://fusesource.com/docs/broker/5.4/tuning/GenTuning-Consumer-Prefetch.html. Overview:图列4.1阐明了Broker在等待之前发送给客户端消息的反馈的行为.

【ActiveMQ Tuning】Serializing to Disk

- - 博客园_首页
     翻译自: http://fusesource.com/docs/broker/5.4/tuning/PersTuning-SerialToDisk.html.      KahaDB message store:KahaDB 是ActiveMQ Broker 为了高性能而推荐使用的消息存储机制.

ActiveMQ 桥接

- - CSDN博客互联网推荐文章
使用目的:将本地产生的消息转发到远程,通过远程服务器来处理消息,处理完成后,再启动消费者处理本地服务器消息(验证消息是否被转走,本地无消息可处理为正常). 消息在下面的地址被消费,无需任何特别配置,采用默认的配置即可. 生产消息地址为localhost:7001,需要做如下配置. 注意: 表示只有这个队列的会进行桥接转发.

ActiveMQ学习小结

- - CSDN博客架构设计推荐文章
   Activemq是众多开源消息中间件的一种,支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,和J2EE1.4容器无缝结合. 它是apache基金会的一个项目,而且经过多年发展,有了很高的稳定性. 目前被很多知名项目使用,比如Apache serviceMix、FuseESB.  消息中间件一般被用在异步消息通信、整合多个系统的场景,比如你注册CSDN论坛,你填写完注册信息点提交时,它会发一份验证邮箱的验证邮件给到你,这封邮件就可以通过消息中间异步发送给你.

ActiveMQ高级特性

- - zzm
消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者,这个确认消息暗示生产者 broker 已经成功地将它发送的消息路由到目标目的并把消息保存到二级存储中. 但有一个例外,当发送方法在一个事物上下文中时,被阻塞的是commit 方法而不是 send 方法.

ActiveMQ持久化方式

- - CSDN博客架构设计推荐文章
消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送. 消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试.

[MQ]关于ActiveMQ的配置

- - 企业架构 - ITeye博客
  目前常用的消息队列组建无非就是MSMQ和ActiveMQ,至于他们的异同,这里不想做过多的比较. 简单来说,MSMQ内置于微软操作系统之中,在部署上包含一个隐性条件:Server需要是微软操作系统. (对于这点我并去调研过MSMQ是否可以部署在非微软系统,比如:Linux,只是拍脑袋想了想,感觉上是不可以).

优化ActiveMQ性能(zhuan)

- - zzm
1.  优化ActiveMQ性能. 1.PERSISTENT(持久性消息). 这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次. 对于这些消息,可靠性是优先考虑的因素. 可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息.

ActiveMQ性能调优2

- - zzm
转自 :  http://setting.iteye.com/blog/1484498. amq自己带了一个性能test: http://activemq.apache.org/activemq-performance-module-users-manual.html. 使用jmeter压测的介绍: http://activemq.apache.org/jmeter-performance-tests.html.