ActiveMq生产者流量控制(Producer Flow Control)_驯咆贸祷_新浪博客
在ActiveMQ5.0版本中,我们可以分别对一个共享连接上的各个生产者进行流量控制,而不需要挂起整个连接。“流量控制”意味着当代理(broker)检测到目标(destination)的内存,或temp-或file-store超过了限制,消息的流量可以被减慢。生产者将会被阻塞直至资源可用,或者收到一个JMSException异常:这种行为是可配置的,下面的<systemUsage>章节会描述到。
It's worth noting that the default
值得注意的是,当内存限制或<systemUsage>限制达到的时候,<systemUsage>默认的设置会引起生产值阻塞:这种阻塞行为有时会被误解为“挂起的生产者”,而事实是生产者只是勤奋地等着空间可用。
- Messages that are sent synchronously will automatically use per producer flow control; this applies generally to persistent messages which are sent synchronously
unless you enable the useAsyncSend flag. - 同步发送的消息将会自动对每一个生产者使用流量控制;除非你使能了useAsyncSend标志,否则这将对同步发送的持久化消息都适用。
- Producers that use
Async Sends - generally speaking, producers of non-persistent messages - don't bother waiting for any acknowledgement from the broker; so, if a memory limit has been exceeded, you will not get notfied. If you do want to be aware of broker limits being exceeded, you will need to configure the ProducerWindowSize connection option so that even async messages are flow controlled per producer. - 使用异步发送的生产者 ——
一般来说,就是发送非持久化消息的生产者 —— 不需要等候来自代理的任何确认消息;所以,如果内存限制被超过了,你不会被通知。如果你真的想什么时候代理的限制被超过了,你需要配置ProducerWindowSize这一连接选项,这样就算是异步消息也会对每一个生产者进行流量控制。
ActiveMQConnectionFactory connctionFactory = ...connctionFactory.setProducerWindowSize(1024000);
The ProducerWindowSize is the maximum number of bytes of data that a producer will transmit to a broker before waiting for acknowledgment messages from the broker that it has accepted the previously sent messages.
ProducerWindowSize是一个生产者在等到确认消息之前,可以发送给代理的数据的最大byte数,这个确认消息用来告诉生产者,代理已经收到先前发送的消息了。
Alternatively, if you're sending non-persisted messages (which are by default sent async), and want to be informed if the queue or topic's memory limit has been breached, then you can simply configure the connection factory to 'alwaysSyncSend'. While this is going to be slower, it will ensure that your message producer is informed immediately of memory issues.
或者,如果你要发送非持久化的消息(该消息默认是异步发送的),并且想要得到队列或者主题的内存限制是否达到,你只需将连接工厂配置为“alwaysSyncSend”。虽然这样会变得稍微慢一点,但是这将保证当出现内存问题时,你的消息生产者能够及时得到通知。
If you like, you can disable flow control for specific JMS queues and topics on the broker by setting the
如果你喜欢,你可以通过在代理的配置中,将适当的目的地(destination)的策略(policy)中的producerFlowControl标志设置为false,使代理上特定的JMS队列和主题无效,例如:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>" producerFlowControl="false"/></policyEntries> </policyMap></destinationPolicy>
see
详情请看
Note that, since the introduction of the new file cursor in ActiveMQ 5.x, non-persisted messages are shunted into the temporary file store to reduce the amount of memory used for non-persistent messaging. As a result, you may find that a queue's memoryLimit is never reached, as the cursor doesn't use very much memory. If you really do want to keep all your non-persistent messages in memory, and stop producers when the limit is reached, you should configure the
注意,自从ActiveMQ 5.x中引入新的文件游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持久化消息传送使用的内存总量。结果就是,你可能会发现一个队列的内存限制永远达不到,因为游标不需要使用太多的内存。如果你真的想把所有的非持久化消息存放在内存中,并在达到内存限制的时候停掉生产者,你需要配置<vmQueueCursor>。
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> <pendingQueuePolicy> <vmQueueCursor/></pendingQueuePolicy></policyEntry>
The fragment above will ensure that all non-persistent queue messages are kept in memory, with each queue having a limit of 1Mb.
上面的片段可以保证所有的非持久化队列消息都保存在内存中,每一个队列的内存限制为1Mb。
How Producer Flow Control works 生产者流量控制是如何工作的
If you are sending a persistent message (so that a response of the
如果你发送一条持久化消息(这样就会有一个对
Advantage 优势
So a nice producer might wait for a producer ack before sending more data, to avoid flooding the broker (and forcing the broker to block the entire connection if a slow consumer occurs). To see how this works in source code, check out the
所以,一个友好的生产者可以再发送更多的数据之前,等待生产者应答,以此来避免对代理的冲击(并且如果出现了一个比较慢的消费者,强制代理阻塞整个连接)。如果你想知道这部分的源代码是怎么实现的,可以看一下
Though a client can ignore the producer ACKs altogether and the broker should just stall thetransport if it has to for slow consumer handling; though this does mean it'll stall the entire connection.
然而一个客户端可以完全忽略生产者的应答消息,并且处理慢消费者的时候,代理可以在需要的时候拖延传送;虽然这意味着它将拖延整个连接。
Configure Client-Side Exceptions 配置客户端的异常
An alternative to the indefinite blocking of the
应对代理空间不足,而导致不确定的阻塞 send()操作的一种替代方案,就是将其配置成客户端抛出的一个异常。通过将sendFailIfNoSpace属性设置为true,代理将会引起send()方法失败,并抛出javax.jms.ResourceAllocationExcept
<systemUsage> <systemUsage sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage></systemUsage>
The advantage of this property is that the client can catch the
这个属性的好处是,客户端可以捕获javax.jms.ResourceAllocationExcept
Starting in version 5.3.1 the
<systemUsage> <systemUsage sendFailIfNoSpaceAfterTimeout="3000"> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage></systemUsage>
The timeout is defined in milliseconds so the example above waits for three seconds before failing the
定义超时的单位是毫秒,所以上面的例子将会在使send()方法失败并对客户端抛出异常之前,等待三秒。这个属性的优点是,它仅仅阻塞配置指定的时间,而不是立即另发送失败,或者无限期阻塞。这个属性不仅在代理端提供了一个改进,还对客户端提供了一个改进,使得客户端能捕获异常,等待一下并重试send()操作。
Disabling Flow Control 使流量控制无效
A common requirement is to disable flow control so that message dispatching continues until all available disk is used up by pending messages (whether persistent or non persistent messaging is configured). To do this enable
一个普遍的需求是使流量控制无效,使得消息分发能够持续,直到所有可用的磁盘被挂起(pending)的消息耗尽(无论是持久化的还是配置了非持久化的)。要这样做,你可以使用消息游标(Message Cursors)。
System usage 系统占用
You can also slow down producers via some attributes on the
你还可以通过<systemUsage>元素的一些属性来减慢生产者。来看一眼下面的例子:
<systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="64 mb" /> </memoryUsage> <storeUsage> <storeUsage limit="100 gb" /> </storeUsage> <tempUsage> <tempUsage limit="10 gb" /> </tempUsage> </systemUsage></systemUsage>
You can set limits of memory for
你可以为非持久化的消息(NON_PERSISTENT
使用jmeter对activemq进行压力测试的方法 - 123864643的日志 - 网易博客
使用JMETER对activemq机器进行压力测试的方法
1. 复制ActiveMQ包和其依赖包到Jmeter的lib目录下
./activemq-all-5.7.0.jar
./lib/optional/log4j-1.2.17.jar
./lib/slf4j-api-1.6.6.jar
./lib/optional/slf4j-log4j12-1.6.6.jar
JMeter 在测试时使用了 JNDI,为了提供 JNDI 提供者的信息,需要提供 jndi.properties。同时需要将 jndi.properties 放到 JMeter 的classpath 中,建议将它与 bin下的ApacheJMeter.jar 打包在一起。对于 ActiveMQ,jndi.properties 的示例内容如下:
2 在jmeter的bin目录下创建jndi.properties
vim jndi.properties
java.naming.factory.initial =org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url= tcp://172.24.144.99:61616
connectionFactoryNames= connectionFactory
queue.MyQueue =example.MyQueue
topic.MyTopic = example.MyTopic
######
#注册queue,格式:
#queue.[jndiName] = [physicalName]
#使用时:(Queue)context.lookup("jndiName"),此处是MyQueue
queue.MyQueue = example.MyQueue
#注册topic,格式:
# topic.[jndiName] = [physicalName]
#使用时:(Topic)context.lookup("jndiName"),此处是MyTopic
topic.MyTopic = example.MyTopic
######
3将jndi.properties添加到ApacheJMeter.jar中
Jar uf ApacheJMeter.jarjndi.properties
4 jmeter配置
=====topic
①创建线程组
②创建测试线程JMS Publisher,JMS Subscriber 具体配置如图
采用的是jndi.properties里的。
Connection Factory填入
connectionFactory
Destination填入MyTopic
注意在下方的文本框内填入测试消息
在线程组中添加线程数持续时间或循环次数
如果要测试认证消息还需要添加认证消息
====Queue
①创建线程组
②创建测试线程 JMS Point-to-Point
运行启动172.24.144.99:8161管理控制台上的Queue和Topic数量的变化。
ActiveMq性能优化 - 王 庆 - 博客园
对性能影响很大的因素:
- 消息是否持久,非持久消息快10倍
- 消息是否异步,异步消息快10倍,点对点消息默认同步,采用异步需要设置brokerURL为:
tcp: //localhost:61616?jms.useAsyncSend=true |
ActiveMq运行是比较稳定的,数据的吞吐速度也很高,如果出现入队列或者出队列慢的问题,先检查一下自己的代码,是不是本身取到数据后处理过慢。
本文的关于性能优化,其实是列举出一些需要注意的点,请确保你的项目没有一下问题:
1. 使用spring的JmsTemplate
JmsTemplate的send和convertAndSend会使用持久化mode,即使你设置了NON_PERSISTENT。这会导致入队列速度变得非常慢。
解决办法,使用下面的MyJmsTemplate代替JmsTemplate。
public class MyJmsTemplate extends JmsTemplate { private Session session; public MyJmsTemplate() { super(); } public MyJmsTemplate(ConnectionFactory connectionFactory) { super(connectionFactory); } public void doSend(MessageProducer producer, Message message) throws JMSException { if (isExplicitQosEnabled()) { producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive()); } else { producer.send(message); } } public Session getSession() { return session; } public void setSession(Session session) { this.session = session; } }
2. DeliveryMode的选择,如果你入队列的数据,不考虑MQ挂掉的情况(这概率很小),使用NON_PERSISTENT会显著提高数据写入速度。
3. 生产者使用事物会提高入队列性能,但是消费者如果启动了事物则会显著影响数据的消费速度。相关代码如下:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
代码中的false代表不启动事物。
4. 消费者的消息处理即onMessage方法优化,举例如下:
public class SmsMoPool implements MessageListener { private final static Logger logger = LoggerFactory.getLogger(SmsMoPool.class); private DefaultEventPubliser moEventPublisher; private final EventFactory eventFactory = new DefaultEventFactory(); private DefaultDataGather dataGather; private ExecutorService pool = Executors.newFixedThreadPool(5); @Override public void onMessage(final Message message) { pool.execute(new Runnable() { @Override public void run() { final ObjectMessage msg = (ObjectMessage) message; Serializable obj = null; try { obj = msg.getObject(); } catch (JMSException e) { logger.error("从消息队列获得上行信息异常{}", e); } if (obj != null) { dataGather.incrementDateCount(MasEntityConstants.TRAFFIC_SMS_MO_IN); AgentToServerReq req = (AgentToServerReq) obj; if (logger.isInfoEnabled()) { logger.info("驱动-->调度:{}", req.toXmlStr()); } Event event = eventFactory.createMoEvent(req); moEventPublisher.publishEvent(event); } } }); } }
这段代码使用了线程池,另一点要注意的是msg.getObject();这个方法是一个比较耗时的方法,你的代码中不应该出现多次getObject()。
5. 消费者使用预取,如何使用预取,下面以spring版本为例
<bean class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="data.mo?consumer.prefetchSize=100"/> </bean>
预取数量根据具体入队列数据而定,以上设置100,是针对2000/sec入队列速度设定的。
另外如果是慢消费者,这里可设置为1。
6. 检查你的MQ数据吞吐速度,保持生产和消费的平衡,不会出现大量积压。
7. ActiveMQ使用TCP协议时 tcpNoDelay=默认是false ,设置为true可以提高性能。
还是spring版本的:
<bean id="mqPoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:useAsyncSend="true" p:brokerURL="failover://(tcp://127.0.0.1:61616?tcpNoDelay=true)"/> </property> </bean>