ActiveMQ系统之——消息持久化到MySQL数据库中(二)

标签: activemq 系统 消息 | 发表时间:2014-02-10 10:28 | 作者:wangdongsong1229
出处:http://blog.csdn.net

写在前面话

关于这一节的程序,需要使用到《 ActiveMQ系列之——安装、运行及事例代码(一)》中的示例代码。本文主要介绍关于消息持久化的配置。

ActiveMQ默认情况下是基于文件的存储,使用的是kahaDB,当然还有其它的持久化方式,例如LevelDB,这个是在5.8的版本中引入的,本文主要介绍数据库持久化,使用的数据库是MySQL,其它数据库类似。

介绍  

ActiveMQ数据库持久化有两种类型,一是直接的JDBC操作,二是journal的jdbc操作,第二是优化过的,先快速的保存到文件中,然后在某个检查点与数据对比,之后再写数据库,也可以称之为批量操作数据库。持久化操作的主要配置在activemq.xml中,在此基础上,我做了其它两点改动,在conf目录下增加一个jdbc.properties的文件,将数据库有关配置信息保存在此文件中,activemq.xml通过加载外部属性文件的方式引入该文件。

配置

journal

<!--
	在5.9.0里的conf目录下,修改activemq.xml而成。
	1、增加了一加载jdbc.properties的属性文件
	2、在broker节点中增加了persistenceFactory的配置
	3、增加了mySql的bean定义
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
			<list>
				<value>file:${activemq.conf}/credentials.properties</value>
				<!-- 
					1、增加了加载数据库的属性文件,目录为activemq目录下的conf文件中,
					此文件定义了有关数据连接的信息,conf目录下无jdbc.properties文件,需要手动创建,
					也可以不使用此属性文件,将数据库信息直接写到activemq.xml中
				 -->
				<value>file:${activemq.conf}/jdbc.properties</value>
			</list>
        </property>
    </bean>

    <!-- Allows log searching in hawtio console -->
    <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>
		
		<!-- 
			2、增加此节点的配置,此种方式直接操作数据库
			也可以通过persistenceAdapter这个节点的journalPersistenceAdapter的配置完成
		 -->
       	<persistenceFactory>
       		<!-- 指定数据文件的路径及数据源 -->
			<journalPersistenceAdapterFactory dataDirectory="${activemq.data}/wdsdb" dataSource="#mySql"/>
		</persistenceFactory>
        
        <systemUsage>
          <systemUsage>
              <memoryUsage>
                  <memoryUsage percentOfJvmHeap="70" />
              </memoryUsage>
              <storeUsage>
                  <storeUsage limit="100 gb"/>
              </storeUsage>
              <tempUsage>
                  <tempUsage limit="50 gb"/>
              </tempUsage>
          </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>
	
	<!-- 
		3、增加关于数据库连接的配置,在增加此配置时,还需要将数据库驱动的jar包放到activemq安装目录下的lib目录中
	-->
	<bean id="mySql" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
			<property name="driverClassName" value="${jdbc.driverClassName}" />
			<property name="url" value="${jdbc.url}" />
			<property name="username" value="${jdbc.username}" />
			<property name="password" value="${jdbc.password}" />
	</bean>

    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

非journal

<!--
	在5.9.0里的conf目录下,修改activemq.xml而成。
	1、增加了一加载jdbc.properties的属性文件
	2、在broker节点中增加了persistenceAdapter的配置
	3、增加了mySql的bean定义
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
			<list>
				<value>file:${activemq.conf}/credentials.properties</value>
				<!-- 
					1、增加了加载数据库的属性文件,目录为activemq目录下的conf文件中,
					此文件定义了有关数据连接的信息,conf目录下无jdbc.properties文件,需要手动创建,
					也可以不使用此属性文件,将数据库信息直接写到activemq.xml中
				 -->
				<value>file:${activemq.conf}/jdbc.properties</value>
			</list>
        </property>
    </bean>

    <!-- Allows log searching in hawtio console -->
    <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>
		
		<!-- 
			2、增加此节点的配置,此种方式直接操作数据库
		 -->
        <persistenceAdapter>
			<jdbcPersistenceAdapter dataSource="#mySql"/>
        </persistenceAdapter>
        
        <systemUsage>
          <systemUsage>
              <memoryUsage>
                  <memoryUsage percentOfJvmHeap="70" />
              </memoryUsage>
              <storeUsage>
                  <storeUsage limit="100 gb"/>
              </storeUsage>
              <tempUsage>
                  <tempUsage limit="50 gb"/>
              </tempUsage>
          </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>
	
	<!-- 
		3、增加关于数据库连接的配置,在增加此配置时,还需要将数据库驱动的jar包放到activemq安装目录下的lib目录中
	-->
	<bean id="mySql" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
			<property name="driverClassName" value="${jdbc.driverClassName}" />
			<property name="url" value="${jdbc.url}" />
			<property name="username" value="${jdbc.username}" />
			<property name="password" value="${jdbc.password}" />
	</bean>

    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

上述两种方式共用的jdbc.properties
##jdbc.driver
jdbc.driverClassName=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://127.0.0.1:3306/activemq?autoReconnect=true&useUnicode=true&characterEncoding=utf8
jdbc.username=root
jdbc.password=root

这两种方式,我都做了尝试,在发送Queue,无消息者情况下,非journal的形式是直接操作数据,单条记录写数据库,而journal的方式则是批量操作数据库的。


作者:wangdongsong1229 发表于2014-2-10 10:28:48 原文链接
阅读:0 评论:0 查看评论

相关 [activemq 系统 消息] 推荐:

ActiveMQ系统之——消息持久化到MySQL数据库中(二)

- - CSDN博客推荐文章
关于这一节的程序,需要使用到《 ActiveMQ系列之——安装、运行及事例代码(一)》中的示例代码. 本文主要介绍关于消息持久化的配置. ActiveMQ默认情况下是基于文件的存储,使用的是kahaDB,当然还有其它的持久化方式,例如LevelDB,这个是在5.8的版本中引入的,本文主要介绍数据库持久化,使用的数据库是MySQL,其它数据库类似.

深入浅出 消息队列 ActiveMQ

- - 编程语言 - ITeye博客
ActiveMQ 是Apache出品,最流行的、功能强大的. 即时通讯和集成模式的开源服务器. ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现. 提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能.

消息中间件的技术选型心得-RabbitMQ、ActiveMQ和ZeroMQ

- - haohtml's blog
RabbitMQ、ActiveMQ和ZeroMQ都是极好的消息中间件,但是我们在项目中该选择哪个更适合呢. 下面我会对这三个消息中间件做一个比较,看了后你们就心中有数了. RabbitMQ是AMQP协议领先的一个实现,它实现了代理(Broker)架构,意味着消息在发送到客户端之前可以在中央节点上排队.

Corba项目中使用的ActiveMQ消息组件介绍

- - CSDN博客推荐文章
下载ActiveMQ http://activemq.apache.org/download.html. 启动mq:/bin/activemq.bat. 管理界面: http://localhost:8161/admin,默认不用验证. // 延时500毫秒之后发送消息. // 延时500毫秒之后停止接受消息.

ActiveMQ的queue以及topic两种消息处理机制分析

- - Java - 编程语言 - ITeye博客
        上一期介绍了我们项目要用到activeMQ来作为jms总线,并且给大家介绍了activeMQ的集群和高可用部署方案,本期给大家再介绍下,如何根据自己的项目需求,更好地使用activeMQ的两种消息处理模式. 1    queue与topic的技术特点对比. Publish Subscribe messaging 发布订阅消息.

使用ActiveMQ+MQTT实现Android点对点消息通知-转载

- - 开源软件 - ITeye博客
ActiveMQ使用MQTT协议,加上android上的paho包,即可简单实现消息通知功能,但是mqtt协议只支持topic,而且不能用selector,使得点对点的消息投递变成问题. 1、每个clientId,建一个topic...这个办法对解决消息点对点投递非常有效,但是有两个大问题:. 随着用户数增多,topic数量增多,对管理性要求增大,对内存的管理也有问题.

【ActiveMQ Tuning】Prefetch Limit

- - 博客园_首页
   摘要:ActiveMQ优化 客户端优化 预取限制. 原文: http://fusesource.com/docs/broker/5.4/tuning/GenTuning-Consumer-Prefetch.html. Overview:图列4.1阐明了Broker在等待之前发送给客户端消息的反馈的行为.

【ActiveMQ Tuning】Serializing to Disk

- - 博客园_首页
     翻译自: http://fusesource.com/docs/broker/5.4/tuning/PersTuning-SerialToDisk.html.      KahaDB message store:KahaDB 是ActiveMQ Broker 为了高性能而推荐使用的消息存储机制.

ActiveMQ 桥接

- - CSDN博客互联网推荐文章
使用目的:将本地产生的消息转发到远程,通过远程服务器来处理消息,处理完成后,再启动消费者处理本地服务器消息(验证消息是否被转走,本地无消息可处理为正常). 消息在下面的地址被消费,无需任何特别配置,采用默认的配置即可. 生产消息地址为localhost:7001,需要做如下配置. 注意: 表示只有这个队列的会进行桥接转发.

ActiveMQ学习小结

- - CSDN博客架构设计推荐文章
   Activemq是众多开源消息中间件的一种,支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,和J2EE1.4容器无缝结合. 它是apache基金会的一个项目,而且经过多年发展,有了很高的稳定性. 目前被很多知名项目使用,比如Apache serviceMix、FuseESB.  消息中间件一般被用在异步消息通信、整合多个系统的场景,比如你注册CSDN论坛,你填写完注册信息点提交时,它会发一份验证邮箱的验证邮件给到你,这封邮件就可以通过消息中间异步发送给你.