[MQ]关于ActiveMQ的配置

标签: mq activemq | 发表时间:2014-11-02 15:45 | 作者:liuguofeng
出处:http://www.iteye.com

  目前常用的消息队列组建无非就是MSMQ和ActiveMQ,至于他们的异同,这里不想做过多的比较。简单来说,MSMQ内置于微软操作系统之中,在部署上包含一个隐性条件:Server需要是微软操作系统。(对于这点我并去调研过MSMQ是否可以部署在非微软系统,比如:Linux,只是拍脑袋想了想,感觉上是不可以)。对于ActiveMQ,微软系统和Linux都是可以部署的。从功能方面来说,一般最常用的就是:消息的收/发,感觉差异不大。从性能上来说,一般的说法是ActiveMQ略高。在稳定性上,个人感觉MSMQ更好。如果这两种常用队列都用过的同学,应该来说最大的差异在于:MSMQ如果要访问远程队列(比如机器A上的程序访问机器B上的队列),会比较恶心。在数据量比较大的情况之下,一般来说队列服务器会专门的一台或者多台(多台的话,用程序去做热备+负载比较方便,也不需要额外的硬件成本。简单来说做法可以这样:消息发送的时候随机向着多台队列服务器上发送消息;接受的时候开多个线程去分别监听;热备方面,可以维护一个带状态的队列连接池,如果消息收发失败那么将状态置为不可用,然后起一个线程去定时监测坏的连接是否可用,这个过程一般情况下可以不用加锁,为什么,大家根据各自需要去取舍吧)。最近搞完了短彩信的网关连接服务,这两种队列我均使用了。大致的过程是这样的:上层应用如果要发端彩信,那么将消息直接发送至ActiveMQ(目前用的就是上面说的多台热备+负载,因为实际中下行量非常大5千万条/天以上),然后端彩信网关连接服务部署多套,每套均依赖本机的MSMQ。为什么呢?用ActiveMQ的原因是:上层应用程序和网关连接服务彼此独立,消息需要跨机访问。用MSMQ的原因是:ActiveMQ中的数据是一条不分省的大队列,网关连接服务需要按省流控,所以端彩信网关连接服务:首先把消息从ActiveMQ取出来,然后存至本机上的分省MSMQ,这样做另外的一个好处就是:ActiveMQ不至于过多挤压,他的数据会分摊到N台短彩信网关连接服务所部署的机器上的MSMQ之中,也就说MSMQ可以起到分摊数据和缓冲的作用。

 

     在之前的随笔中,已经介绍过MSMQ,现在先介绍一下ActiveMQ一些配置,目前好像ActiveMQ配置上的介绍还比较少。以下是自己总结一些相关资料,贴出来给大家共享

 

一)问题分析和解决

1)KahaDb和AMQ Message Store两种持久方式如何选择?

官方:

From 5.3 onwards - we recommend you use KahaDB - which offers improved scalability and recoverability over the AMQ Message Store.

The AMQ Message Store which although faster than KahaDB - does not scales as well as KahaDB and recovery times take longer.

 

非官方:

kaha文件系统实际上上是一个文件索引系统,有两部分组成,一个是数据文件系统,由一个个独立的文件组成,缺省文件大小是32M大(可配置),另外一个是索引文件系统,记录消息在数据文件中的位置信息以及数据文件中的空闲块信息。数据文件是存储到硬盘上的,索引文件是缓存在内存中的。所以这个存储系统对大消息存储有利,象我们的memberId之类的文本消息,实际上是浪费,索引比消息还大,哈。

 

我方分析:

推荐: Amq持久方式

理由:虽然官方推荐使用KahaDB持久方式,但其提到的优势:可伸缩性和恢复性较好,对于我们实际的应用意义不大。从我们自己的使用经验来看,KahaDB持久方式,Data文件是一个大文件(感觉文件过大后,造成队列服务瘫死的可能性会增大),从官网的相关配置(附录1)也找不到哪里可以设置数据的文件的最大Size。)而Amq持久方式可以设置Data文件最大Size,这样可以保证即时消息积压很多,Data文件也不至于过大。

 

 

2)错误:Channel was inactive for too long

解决方法:

在建立连接的Uri中加入: wireFormat.maxInactivityDuration=0

 

参考资源:

http://jinguo.iteye.com/blog/243153

 

You can do the following to fix the issues:

1) Append max inactivity duration to your Uri in the format below: wireFormat.maxInactivityDuration=0

2) Use the same Uri at the client side as well as at the server side

Regards,

 

如果不这样设置,对应的错误会出现:

2008-05-07 09:22:56,343 [org.apache.activemq.ActiveMQConnection]-[WARN] Async exception with no exception listener: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616

org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616

ActiveMQ的tcp url:tcp://localhost:61616后面要加入?wireFormat.maxInactivityDuration=0 这样的参数,否则当一段时间没有消息发送时会抛出 "Channel was inactive for too long"异常

 

3)错误:Wire format negotiation timeout: peer did not send his wire format.

解决方法:

1)关闭ActiveMqLog4j

打开:conf/log4j.properties

将:log4j.rootLogger=INFO, console, logfile

修改为:log4j.rootLogger=OFF

 

2)在建立连接的Uri中加入: maxInactivityDurationInitalDelay=30000

 

例如北京的测试环境连接Uri:

tcp://192.168.199.80:61616?wireFormat.maxInactivityDuration=0&maxInactivityDurationInitalDelay=30000&connection.AsyncSend=true

 

参考资源:

http://activemq.apache.org/javaxjmsjmsexception-wire-format-negociation-timeout-peer-did-not-send-his-wire-format.html

 

If you get exception like this,it can mean one of three things:

1. You're connecting to the port not used by ActiveMQ TCP transport

Make sure to check that you're connecting to the appropriate host:port

 

2. You're using log4j JMS appender and doesn't filter out ActiveMQ log messages

Be sure to read How do I use log4j JMS appender with ActiveMQ and more importantly to never send ActiveMQ log messages to JMS appender

 

3. Your broker is probably under heavy load (or network connection is unreliable), so connection setup cannot be completed in a reasonable time

If you experience sporadic exceptions like this, the best solution is to use failover transport, so that your clients can try connecting again if the first attempt fails. If you're getting these kind of exceptions more frequently you can also try extending wire format negotiation period (default 10 sec). You can do that by using wireFormat.maxInactivityDurationInitalDelay property on the connection URL in your client.

For example

tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=30000

will use 30 sec timeout.(貌似有问题!!!)

 

4)错误:Out of memory

解决方法:

1)  设置Java最大内存限制为合适大小:

Bin/activemq.bat 中ACTIVEMQ_OPTS=-Xmx512M(默认是512)

2)Activemq.xml配置节:systemUsage/ systemUsage配置大小合适,并且特别注意:大于所有durable desitination设置的memoryUsage之和。

 

备注:

1)尖括号:“>”代表通配符

2)ACTIVEMQ_OPTS的配置〉=memoryUsage中配置〉=所有durable desitination设置之和

 

 

 

3)SystemUsage配置设置了一些系统内存和硬盘容量,当系统消耗超过这些容量设置时,amq会“slow down producer”,还是很重要的。

 

 

参考资料:

http://m.oschina.net/blog/26216

参考-- http://activemq.apache.org/javalangoutofmemory.html

       对于MQ的内容实用是可管理和可配置的。首先需要判断的是MQ的哪部分系统因内存不足而导致泄漏,是JVM,broker还是消费者、生产者?

       一、内存管理

       JVM内存管理:

       1. 用bin/activemq命令在独立JVM中运行broker。用-Xmx和-Xss命令即可(activemq.bat文件中修改ACTIVEMQ_OPTS选项参数即可);

       2. 默认情况下,MQ用512M的JVM;

       broker内存管理:

       1. broker使用的内存并不是由JVM的内存决定的。虽然受到JVM的限制,但broker确实独立管理器内存;

       2. systemUsage和destination的内存限制与broker内存息息相关;

       3. MQ中内存的关系是:JVM->Broker->broker features;

       4. 所有destination的内存总量不能超过broker的总内存;

       消费者:

       1. 由于消息大小可以配置,prefetch limit往往是导致内存溢出的主要原因;

       2. 减少prefetch limit的大小,会减少消费者内存中存储的消息数量;

       生产者:

       1. 除非消息数量超过了broker资源的限制,否则生产者不会导致内存溢出;

       2. 当内存溢出后,生产者会收到broker的阻塞信息提示;

       二、其他

       将消息缓冲之硬盘:

       1. 只有当消息在内存中存储时,才允许消息的快速匹配与分发,而当消费者很慢或者离开时,内存可能会耗尽;

       2. 当destination到达它的内存临界值时,broker会用消息游标来缓存非持久化的消息到硬盘。

       3. 临界值在broker中通过memoryUsage和systemUsage两个属性配置,请参考activemq.xml;

       4. 对于缓慢的消费者,当尚未耗尽内存或者转变为生产者并发控制模式前,这个特性允许生产者继续发送消息到broker;

       5. 当有多个destination的时候,默认的内存临界值可能被打破,而这种情况将消息缓存到硬盘就显得很有意义;

       6. precentUsage配置:使用百分比来控制内存使用情况;

       多个线程:

       1. 默认情况下,MQ每个destination都对应唯一的线程;

       2. -Dorg.apache.activema.UseDedicatedTaskRunner=false(activemq.bat文件中修改ACTIVEMQ_OPTS选项参数即可),用线程池来限制线程的数量,从而减少内存消耗;

       大数据传输:

       1. destination policies--maxPageSize:控制进入内存中的消息数量;lazyDispatch:增加控制使用当前消费者列表的预取值;

       2. 使用blogMessage或者streamsMessage类型来进行大量文件的传输;

       泄漏JMS资源:

       1. 当session或者producer或者consumer大量存在而没有关闭的时候;

       2. 使用PooledConnectionFactory;

5)采用failover方式连接导致卡死

解决方法

不采用failover连接

 

分析

采用failover方式连接,如果所要连接的服务器或者Activemq服务宕了,那么程序会一直处于等待状态,不超时,不报错。

二)附录

1)KahaDB持久配置

property name

default value

Comments

directory

activemq-data

the path to the directory to use to store the message store data and log files

indexWriteBatchSize

1000

number of indexes written in a batch

indexCacheSize

10000

number of index pages cached in memory

enableIndexWriteAsync

false

if set, will asynchronously write indexes

journalMaxFileLength

32mb

a hint to set the maximum size of the message data logs

enableJournalDiskSyncs

true

ensure every non transactional journal write is followed by a disk sync (JMS durability requirement)

cleanupInterval

30000

time (ms) before checking for a discarding/moving message data logs that are no longer used

checkpointInterval

5000

time (ms) before checkpointing the journal

ignoreMissingJournalfiles

false

If enabled, will ignore a missing message log file

checkForCorruptJournalFiles

false

If enabled, will check for corrupted Journal files on startup and try and recover them

checksumJournalFiles

false

create a checksum for a journal file - to enable checking for corrupted journals

Available since version 5.4:

   

archiveDataLogs

false

If enabled, will move a message data log to the archive directory instead of deleting it.

directoryArchive

null

Define the directory to move data logs to when they all the messages they contain have been consumed.

databaseLockedWaitDelay

10000

time (ms) before trying to get acquire a the database lock (used by shared master/slave)

maxAsyncJobs

10000

the maximum number of asynchronous messages that will be queued awaiting storage (should be the same as the number of concurrent MessageProducers)

concurrentStoreAndDispatchTopics

false

enable the dispatching of Topic messages to interested clients to happen concurrently with message storage

concurrentStoreAndDispatchQueues

true

enable the dispatching of Queue messages to interested clients to happen concurrently with message storage

 

2)AMQ持久配置

property name

default value

Comments

directory

activemq-data

the path to the directory to use to store the message store data and log files

useNIO

true

use NIO to write messages to the data logs

syncOnWrite

false

sync every write to disk

maxFileLength

32mb

a hint to set the maximum size of the message data logs

persistentIndex

true

use a persistent index for the message logs. If this is false, an in-memory structure is maintained

maxCheckpointMessageAddSize

4kb

the maximum number of messages to keep in a transaction before automatically committing

cleanupInterval

30000

time (ms) before checking for a discarding/moving message data logs that are no longer used

indexBinSize

1024

default number of bins used by the index. The bigger the bin size - the better the relative performance of the index

indexKeySize

96

the size of the index key - the key is the message id

indexPageSize

16kb

the size of the index page - the bigger the page - the better the write performance of the index

directoryArchive

archive

the path to the directory to use to store discarded data logs

archiveDataLogs

false

if true data logs are moved to the archive directory instead of being deleted

 

3)systemUsage配置

property name

default value

Comments

memoryUsage

20M

amq使用内存大小,照amq论坛上说,这个值应该大于所有durable desitination设置的

memoryUsage之和,否则会导致硬盘swap,影响性能。

storeUsage

1G

kaha数据存储大小,如果设置不足,性能会下降到1个1个发

tempUsage

100M

非persistent的消息存储在temp区域

4)其他配置

http://activemq.apache.org/nms/configuring.html

 

4.1)Failover Transport Options

Option Name

Default

Description

transport.timeout

-1

Time that a send operation blocks before failing.

transport.initialReconnectDelay

10

Time in Milliseconds that the transport waits before attempting to reconnect the first time.

transport.maxReconnectDelay

30000

The max time in Milliseconds that the transport will wait before attempting to reconnect.

transport.backOffMultiplier

2

The amount by which the reconnect delay will be multiplied by if useExponentialBackOff is enabled.

transport.useExponentialBackOff

true

Should the delay between connection attempt grow on each try up to the max reconnect delay.

transport.randomize

true

Should the Uri to connect to be chosen at random from the list of available Uris.

transport.maxReconnectAttempts

0

Maximum number of time the transport will attempt to reconnect before failing (0 means infinite retries)

transport.startupMaxReconnectAttempts

0

Maximum number of time the transport will attempt to reconnect before failing when there has never been a connection made. (0 means infinite retries)  (included in NMS.ActiveMQ v1.5.0+)

transport.reconnectDelay

10

The delay in milliseconds that the transport waits before attempting a reconnection.

transport.backup

false

Should the Failover transport maintain hot backups.

transport.backupPoolSize

1

If enabled, how many hot backup connections are made.

transport.trackMessages

false

keep a cache of in-flight messages that will flushed to a broker on reconnect

transport.maxCacheSize

256

Number of messages that are cached if trackMessages is enabled.

transport.updateURIsSupported

true

Update the list of known brokers based on BrokerInfo messages sent to the client.

 

4.2)Connection Options

Option Name

Default

Description

connection.AsyncSend

false

Are message sent Asynchronously.

connection.AsyncClose

true

Should the close command be sent Asynchronously

connection.AlwaysSyncSend

false

Causes all messages a Producer sends to be sent Asynchronously.

connection.CopyMessageOnSend

true

Copies the Message objects a Producer sends so that the client can reuse Message objects without affecting an in-flight message.

connection.ProducerWindowSize

0

The ProducerWindowSize is the maximum number of bytes in memory that a producer will transmit to a broker before waiting for acknowledgement messages from the broker that it has accepted the previously sent messages. In other words, this how you configure the producer flow control window that is used for async sends where the client is responsible for managing memory usage. The default value of 0 means no flow control at the client. See also  Producer Flow Control

connection.useCompression

false

Should message bodies be compressed before being sent.

connection.sendAcksAsync

false

Should message acks be sent asynchronously

connection.messagePrioritySupported

true

Should messages be delivered to the client based on the value of the Message Priority header.

connection.dispatchAsync

false

Should the broker  dispatch messages asynchronously to the connection's consumers.

 

4.3)OpenWire Options

Option Name

Default

Description

wireFormat.stackTraceEnabled

false

Should the stack trace of exception that occur on the broker be sent to the client? Only used by openwire protocol.

wireFormat.cacheEnabled

false

Should commonly repeated values be cached so that less marshalling occurs? Only used by openwire protocol.

wireFormat.tcpNoDelayEnabled

false

Does not affect the wire format, but provides a hint to the peer that TCP nodelay should be enabled on the communications Socket. Only used by openwire protocol.

wireFormat.sizePrefixDisabled

false

Should serialized messages include a payload length prefix? Only used by openwire protocol.

wireFormat.tightEncodingEnabled

false

Should wire size be optimized over CPU usage? Only used by the openwire protocol.

wireFormat.maxInactivityDuration

30000

The maximum inactivity duration (before which the socket is considered dead) in milliseconds. On some platforms it can take a long time for a socket to appear to die, so we allow the broker to kill connections if they are inactive for a period of time. Use by some transports to enable a keep alive heart beat feature. Set to a value <= 0 to disable inactivity monitoring.

maxInactivityDurationInitalDelay

10000

The initial delay in starting the maximum inactivity checks (and, yes, the word 'Inital' is supposed to be misspelled like that)

 

5)安全配置

1、控制台安全配置,打开conf/jetty.xml文件,找到

    <bean id="securityConstraint" class="org.eclipse.jetty.http.security.Constraint">

        <property name="name" value="BASIC" />

        <property name="roles" value="admin" />

        <property name="authenticate" value="false" />

    </bean>

   将“false”改为“true”即可。用户名和密码存放在conf/jetty-realm.properties文件中。

2、生产者和消费者连接MQ需要密码

   打开conf/activemq.xml文件,在<broker>标签里的<systemUsage>标签前加入:

   <plugins>  

<simpleAuthenticationPlugin>  

<users>  

<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>  

</users>  

</simpleAuthenticationPlugin>  

   </plugins>

 注意必须在<systemUsage>标签前,否则启动ActiveMQ会报错。

 

用户名和密码存放在 conf/credentials.properties 文件中



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [mq activemq] 推荐:

[MQ]关于ActiveMQ的配置

- - 企业架构 - ITeye博客
  目前常用的消息队列组建无非就是MSMQ和ActiveMQ,至于他们的异同,这里不想做过多的比较. 简单来说,MSMQ内置于微软操作系统之中,在部署上包含一个隐性条件:Server需要是微软操作系统. (对于这点我并去调研过MSMQ是否可以部署在非微软系统,比如:Linux,只是拍脑袋想了想,感觉上是不可以).

闲扯kafka mq

- - 开源软件 - ITeye博客
本文主要讲解关于kafka mq的设计思想及个人理解. 关于kafka的详细信息,大家可以参考官网的文献 http://kafka.apache.org/documentation.html这是一篇相当不错的文章,值得仔细研读. 第一个问题:消息队列(Message Queue)是干嘛用的. 首先,要对消息队列有一个基本的理解.

【ActiveMQ Tuning】Prefetch Limit

- - 博客园_首页
   摘要:ActiveMQ优化 客户端优化 预取限制. 原文: http://fusesource.com/docs/broker/5.4/tuning/GenTuning-Consumer-Prefetch.html. Overview:图列4.1阐明了Broker在等待之前发送给客户端消息的反馈的行为.

【ActiveMQ Tuning】Serializing to Disk

- - 博客园_首页
     翻译自: http://fusesource.com/docs/broker/5.4/tuning/PersTuning-SerialToDisk.html.      KahaDB message store:KahaDB 是ActiveMQ Broker 为了高性能而推荐使用的消息存储机制.

ActiveMQ 桥接

- - CSDN博客互联网推荐文章
使用目的:将本地产生的消息转发到远程,通过远程服务器来处理消息,处理完成后,再启动消费者处理本地服务器消息(验证消息是否被转走,本地无消息可处理为正常). 消息在下面的地址被消费,无需任何特别配置,采用默认的配置即可. 生产消息地址为localhost:7001,需要做如下配置. 注意: 表示只有这个队列的会进行桥接转发.

ActiveMQ学习小结

- - CSDN博客架构设计推荐文章
   Activemq是众多开源消息中间件的一种,支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,和J2EE1.4容器无缝结合. 它是apache基金会的一个项目,而且经过多年发展,有了很高的稳定性. 目前被很多知名项目使用,比如Apache serviceMix、FuseESB.  消息中间件一般被用在异步消息通信、整合多个系统的场景,比如你注册CSDN论坛,你填写完注册信息点提交时,它会发一份验证邮箱的验证邮件给到你,这封邮件就可以通过消息中间异步发送给你.

ActiveMQ与Spring整合

- - 博客园_首页
ActiveMQ 是Apache出品, 是最流行​​和最强大的开源消息总线. 同时完全支持 JMS 1.1和J2EE 1.4规范. 支持多种编程语言和协议编写客户端. 在JMS客户端和消息代理完全支持企业集成模式. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务). 对Spring的支持, ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性.

ActiveMQ高级特性

- - zzm
消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者,这个确认消息暗示生产者 broker 已经成功地将它发送的消息路由到目标目的并把消息保存到二级存储中. 但有一个例外,当发送方法在一个事物上下文中时,被阻塞的是commit 方法而不是 send 方法.

与MQ通讯的完整JAVA程序

- - 编程语言 - ITeye博客
        本文实例是基于 WebSphere MQ中将消息发送至远程队列的配置的基础上的,且如果要能正常运行并发送、接收消息,还需要在两个队列管理器(QM_ORANGE和QM_APPLE)上做如下配置或修改.         1.创建名称为DC.SVRCONN的服务器连接通道.         2.将队列管理器的通道认证记录设置为“已禁用”.

ActiveMQ持久化方式

- - CSDN博客架构设计推荐文章
消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送. 消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试.