<< 关于 Jms Topic 持久订阅 - Bory.Chan | 首页 | 使用jmeter对activemq进行压力测试的方法 - 123864643的日志 - 网易博客 >>

ActiveMq性能优化 - 王 庆 - 博客园

对性能影响很大的因素:

  • 消息是否持久,非持久消息快10倍
  • 消息是否异步,异步消息快10倍,点对点消息默认同步,采用异步需要设置brokerURL为:

 

tcp://localhost:61616?jms.useAsyncSend=true

 

ActiveMq运行是比较稳定的,数据的吞吐速度也很高,如果出现入队列或者出队列慢的问题,先检查一下自己的代码,是不是本身取到数据后处理过慢。

本文的关于性能优化,其实是列举出一些需要注意的点,请确保你的项目没有一下问题:

1. 使用spring的JmsTemplate

 JmsTemplate的send和convertAndSend会使用持久化mode,即使你设置了NON_PERSISTENT。这会导致入队列速度变得非常慢。

 解决办法,使用下面的MyJmsTemplate代替JmsTemplate。

复制代码
public class MyJmsTemplate extends JmsTemplate {     private Session session;      public MyJmsTemplate() {         super();     }      public MyJmsTemplate(ConnectionFactory connectionFactory) {         super(connectionFactory);     }      public void doSend(MessageProducer producer, Message message) throws JMSException {         if (isExplicitQosEnabled()) {             producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());         } else {             producer.send(message);         }     }      public Session getSession() {         return session;     }      public void setSession(Session session) {         this.session = session;     } }
复制代码

 

2. DeliveryMode的选择,如果你入队列的数据,不考虑MQ挂掉的情况(这概率很小),使用NON_PERSISTENT会显著提高数据写入速度。

3. 生产者使用事物会提高入队列性能,但是消费者如果启动了事物则会显著影响数据的消费速度。相关代码如下:

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

代码中的false代表不启动事物。

4. 消费者的消息处理即onMessage方法优化,举例如下:

复制代码
public class SmsMoPool implements MessageListener {     private final static Logger logger = LoggerFactory.getLogger(SmsMoPool.class);     private DefaultEventPubliser moEventPublisher;     private final EventFactory eventFactory = new DefaultEventFactory();     private DefaultDataGather dataGather;     private ExecutorService pool = Executors.newFixedThreadPool(5);      @Override     public void onMessage(final Message message) {         pool.execute(new Runnable() {             @Override             public void run() {                 final ObjectMessage msg = (ObjectMessage) message;                 Serializable obj = null;                 try {                     obj = msg.getObject();                 } catch (JMSException e) {                     logger.error("从消息队列获得上行信息异常{}", e);                 }                 if (obj != null) {                     dataGather.incrementDateCount(MasEntityConstants.TRAFFIC_SMS_MO_IN);                     AgentToServerReq req = (AgentToServerReq) obj;                     if (logger.isInfoEnabled()) {                         logger.info("驱动-->调度:{}", req.toXmlStr());                     }                     Event event = eventFactory.createMoEvent(req);                     moEventPublisher.publishEvent(event);                 }             }         });     } }
复制代码

这段代码使用了线程池,另一点要注意的是msg.getObject();这个方法是一个比较耗时的方法,你的代码中不应该出现多次getObject()。

5. 消费者使用预取,如何使用预取,下面以spring版本为例

  <bean class="org.apache.activemq.command.ActiveMQQueue">
                <constructor-arg value="data.mo?consumer.prefetchSize=100"/>
            </bean>

预取数量根据具体入队列数据而定,以上设置100,是针对2000/sec入队列速度设定的。
另外如果是慢消费者,这里可设置为1。

6. 检查你的MQ数据吞吐速度,保持生产和消费的平衡,不会出现大量积压。

7. ActiveMQ使用TCP协议时 tcpNoDelay=默认是false ,设置为true可以提高性能。

还是spring版本的:

 <bean id="mqPoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:useAsyncSend="true"                   p:brokerURL="failover://(tcp://127.0.0.1:61616?tcpNoDelay=true)"/>
        </property>
    </bean>

阅读全文……

标签 : ,



发表评论 发送引用通报