分布式开放消息系统(RocketMQ)的原理与实践

标签: 分布 开放 消息 | 发表时间:2016-05-02 09:29 | 作者:m635674608
出处:http://www.iteye.com

备注:1.如果您此前未接触过RocketMQ,请先阅读附录部分,以便了解RocketMQ的整体架构和相关术语2.文中的MQServer与Broker表示同一概念

  分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:  

  1消息的顺序问题

  2消息的重复问题

  RocketMQ作为阿里开源的一款高性能、高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的?

   关键特性以及其实现原理

   一、顺序消息

  消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是可以并行消费的。

  假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:

  

175724-303b6e1322576021.png



  M1发送到S1后,M2发送到S2,如果要保证M1先于M2被消费,那么需要M1到达消费端后,通知S2,然后S2再将M2发送到消费端。

  这个模型存在的问题是,如果M1和M2分别发送到两台Server上,就不能保证M1先达到,也就不能保证M1被先消费,那么就需要在MQ Server集群维护消息的顺序。那么如何解决?一种简单的方式就是将M1、M2发送到同一个Server上:

  

175724-886b25d2ced8e641.png



  这样可以保证M1先于M2到达MQServer(客户端等待M1成功后再发送M2),根据先达到先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。

  这个模型,理论上可以保证消息的顺序,但在实际运用中你应该会遇到下面的问题:

  

175724-34c5c00c2490136b.png



  只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题。如上图所示,如果发送M1耗时大于发送M2的耗时,那么M2就先被消费,仍然不能保 证消息的顺序。即使M1和M2同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然有可能出现M2先于M1被消费。如何解决这个问题?将M1和 M2发往同一个消费者即可,且发送M1后,需要消费端响应成功后才能发送M2。

  但又会引入另外一个问题,如果发送M1后,消费端1没有响应,那是继续发送M2呢,还是重新发送M1?一般为了保证消息一定被消费,肯定会选择重发M1到另外一个消费端2,就如下图所示。

  

175724-78a8706b4614440e.png



  这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端1没有响应Server时有两种情况,一种是M1确实没有到达,另外一种情况是消费 端1已经响应,但是Server端没有收到。如果是第二种情况,重发M1,就会造成M1被重复消费。也就是我们后面要说的第二个问题,消息重复问题。

  回过头来看消息顺序问题,严格的顺序消息非常容易理解,而且处理问题也比较容易,要实现严格的顺序消息,简单且可行的办法就是:

  保证生产者 - MQServer - 消费者是一对一对一的关系

  但是这样设计,并行度就成为了消息系统的瓶颈(吞吐量不够),也会导致更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

  但我们的最终目标是要集群的高容错性和高吞吐量。这似乎是一对不可调和的矛盾,那么阿里是如何解决的?

  世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!—— 沈询

  有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决它们身上,实际上是浪费的,效率低下的。从这个角度来看消息的顺序问题,我们可以得出两个结论:

  1、不关注乱序的应用实际大量存在2、队列无序并不意味着消息无序

  最后我们从源码角度分析RocketMQ怎么实现发送顺序消息。

  一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。下面的示例中,OrderId相同的消息,会发送到同一个队列:

  1. // RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
  2. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  3.     @Override
  4.     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  5.         Integer id = (Integer) arg;
  6.         int index = id % mqs.size();
  7.         return mqs.get(index);
  8.     }
  9. }, orderId);
复制代码


  在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的队列是同一个队列。

  1. private SendResult send()  {
  2.     // 获取topic路由信息
  3.     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  4.     if (topicPublishInfo != null && topicPublishInfo.ok()) {
  5.         MessageQueue mq = null;
  6.         // 根据我们的算法,选择一个发送队列
  7.         // 这里的arg = orderId
  8.         mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
  9.         if (mq != null) {
  10.             return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
  11.         }
  12.     }
  13. }
复制代码


   二、消息重复

  上面在解决消息顺序问题时,引入了一个新的问题,就是消息重复。那么RocketMQ是怎样解决消息重复的问题呢?还是“恰好”不解决。

  造成消息的重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是不解决,转而绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

  1、消费端处理消息的业务逻辑保持幂等性2、保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

  第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

  我们可以看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重 复消息的概率不一定大,且由消息系统实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是 RocketMQ不解决消息重复的问题的原因。

  RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

   三、事务消息

  RocketMQ除了支持普通消息,顺序消息,另外还支持事务消息。首先讨论一下什么是事务消息以及支持事务消息的必要性。我们以一个转帐的场景为例来说明这个问题:Bob向Smith转账100块。

  在单机环境下,执行事务的情况,大概是下面这个样子:

  

175724-13a6d80b21345f45.png



  当用户增长到一定程度,Bob和Smith的账户及余额信息已经不在同一台服务器上了,那么上面的流程就变成了这样:

  

175724-69101aad0122572b.png



  这时候你会发现,同样是一个转账的业务,在集群环境下,耗时居然成倍的增长,这显然是不能够接受的。那我们如何来规避这个问题?

  大事务 = 小事务 + 异步

  将大事务拆分成多个小事务异步执行。这样基本上能够将跨机事务的执行效率优化到与单机一致。转账的事务就可以分解成如下两个小事务:

  

175724-92abb226f288ff9c.png



  图中执行本地事务(Bob账户扣款)和发送异步消息应该保持同时成功或者失败中,也就是扣款成功了,发送消息一定要成功,如果扣款失败了,就不能再发送消息。那问题是:我们是先扣款还是先发送消息呢?

  首先我们看下,先发送消息,大致的示意图如下:

  

175724-1927b8f3d14ef823.png



  存在的问题是:如果消息发送成功,但是扣款失败,消费端就会消费此消息,进而向Smith账户加钱。

  先发消息不行,那我们就先扣款呗,大致的示意图如下:

  

175724-367b5cf60cbdfa16.png



  存在的问题跟上面类似:如果扣款成功,发送消息失败,就会出现Bob扣钱了,但是Smith账户未加钱。

  可能大家会有很多的方法来解决这个问题,比如:直接将发消息放到Bob扣款的事务中去,如果发送失败,抛出异常,事务回滚。这样的处理方式也符合“恰好”不需要解决的原则。RocketMQ支持事务消息,下面我们来看看RocketMQ是怎样来实现的。

  

175724-ab0085543c6d02d6.png



  RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改 状态。细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,这时候发现了Prepared消息, 它会向消息发送者确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还 是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

  那我们来看下RocketMQ源码,是不是这样来处理事务消息的。客户端发送事务消息的部分(完整代码请查看:rocketmq-example工程 下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

  1. // 未决事务,MQ服务器回查客户端
  2. // 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
  3. TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
  4. // 构造事务消息的生产者
  5. TransactionMQProducer producer = new TransactionMQProducer("groupName");
  6. // 设置事务决断处理类
  7. producer.setTransactionCheckListener(transactionCheckListener);
  8. // 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
  9. TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
  10. producer.start()
  11. // 构造MSG,省略构造参数
  12. Message msg = new Message(......);
  13. // 发送消息
  14. SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
  15. producer.shutdown();
复制代码


  接着查看sendMessageInTransaction方法的源码,总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。

  1. public TransactionSendResult sendMessageInTransaction(.....)  {
  2.     // 逻辑代码,非实际代码
  3.     // 1.发送消息
  4.     sendResult = this.send(msg);
  5.     // sendResult.getSendStatus() == SEND_OK
  6.     // 2.如果消息发送成功,处理与消息关联的本地事务单元
  7.     LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
  8.     // 3.结束事务
  9.     this.endTransaction(sendResult, localTransactionState, localException);
  10. }
复制代码


  endTransaction方法会将请求发往broker(mq server)去更新事物消息的最终状态:  

  根据sendResult找到Prepared消息  

  根据localTransaction更新消息的最终状态  

  如果endTransaction方法执行失败,导致数据没有发送到broker,broker会有回查线程定时(默认1分钟)扫描每个存储事务状态 的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是prepared状态则会向Producer发起CheckTransaction请 求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时 回调请求,而checkTransactionState会调用我们的事务设置的决断方法,最后调用endTransactionOneway让 broker来更新消息的最终状态。

  再回到转账的例子,如果Bob的账户的余额已经减少,且消息已经发送成功,Smith端开始消费这条消息,这个时候就会出现消费失败和消费超时两个问 题?解决超时问题的思路就是一直重试,直到消费端消费消息成功,整个过程中有可能会出现消息重复的问题,按照前面的思路解决即可。

  

175724-1d9ba7bcd230e0dc.png



  这样基本上可以解决超时问题,但是如果消费失败怎么办?阿里提供给我们的解决方法是:人工解决。大家可以考虑一下,按照事务的流程,因为某种原因 Smith加款失败,需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升,且很容易出现Bug,估计出现Bug的概率会比消费 失败的概率大很多。我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,这也是大家在解决疑难问题时需要多多思考的地方。

   四、Producer如何发送消息

  Producer轮询某topic下的所有队列的方式来实现发送方的负载均衡,如下图所示:

  

175724-9eac93e29d0e06ef.png



  首先分析一下RocketMQ的客户端发送消息的源码:

  1. // 构造Producer
  2. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
  3. // 初始化Producer,整个应用生命周期内,只需要初始化1次
  4. producer.start();
  5. // 构造Message
  6. Message msg = new Message("TopicTest1",// topic
  7.                         "TagA",// tag:给消息打标签,用于区分一类消息,可为null
  8.                         "OrderID188",// key:自定义Key,可以用于去重,可为null
  9.                         ("Hello MetaQ").getBytes());// body:消息内容
  10. // 发送消息并返回结果
  11. SendResult sendResult = producer.send(msg);
  12. // 清理资源,关闭网络连接,注销自己
  13. producer.shutdown();
复制代码


  在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:

  如果没有指定namesrv地址,将会自动寻址  

  启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳...  

  启动负载均衡的服务  

  初始化完成后,开始发送消息,发送消息的主要代码如下:

  1. private SendResult sendDefaultImpl(Message msg,......) {
  2.     // 检查Producer的状态是否是RUNNING
  3.     this.makeSureStateOK();
  4.     // 检查msg是否合法:是否为null、topic,body是否为空、body是否超长
  5.     Validators.checkMessage(msg, this.defaultMQProducer);
  6.     // 获取topic路由信息
  7.     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  8.     // 从路由信息中选择一个消息队列
  9.     MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
  10.     // 将消息发送到该队列上去
  11.     sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
  12. }
复制代码


  代码中需要关注的两个方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面说过在 producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo会首先从缓存中获取 topic路由信息,如果没有获取到,则会自己去namesrv获取路由信息。selectOneMessageQueue方法通过轮询的方式,返回一个 队列,以达到负载均衡的目的。

  如果Producer发送消息失败,会自动重试,重试的策略:

  重试次数 < retryTimesWhenSendFailed(可配置)   

  总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)  

  同时满足上面两个条件后,Producer会选择另外一个队列发送消息  

   五、消息存储

  RocketMQ的消息存储是由consume queue和commit log配合完成的。

  1、Consume Queue

  consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。

  我们可以在配置中指定consumequeue与commitlog存储的目录每个topic下的每个queue都有一个对应的consumequeue文件,比如:

  1. ${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
复制代码


  Consume Queue文件组织,如图所示:

  

175724-d1b4318d77a96950.png



  根据topic和queueId来组织文件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。  

  按照消费端的GroupName来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,比如图中的%RETRY%ConsumerGroupA。  

  按照消费端的GroupName来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,比如图中的%DLQ%ConsumerGroupA。  

  死信队列(Dead Letter Queue)一般用于存放由于某种原因无法传递的消息,比如处理失败或者已经过期的消息。  

  Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,如下图所示:

  

175724-7212acc81b91c086.png



  CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量  

  Size存储中消息的大小  

  Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)  

  2、Commit Log

  CommitLog:消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。文件的默认位置如下,仍然可通过配置文件修改:

  ${user.home} \store\${commitlog}\${fileName}

  CommitLog的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构如下表所示,按照编号顺序以及编号对应的内容依次存储。

  

175724-96ed677eb504abfe.png



  3、消息存储实现

  1. // Set the storage time
  2. msg.setStoreTimestamp(System.currentTimeMillis());
  3. // Set the message body BODY CRC (consider the most appropriate setting
  4. msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
  5. StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
  6. synchronized (this) {
  7.     long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
  8.     // Here settings are stored timestamp, in order to ensure an orderly global
  9.     msg.setStoreTimestamp(beginLockTimestamp);
  10.     // MapedFile:操作物理文件在内存中的映射以及将内存数据持久化到物理文件中
  11.     MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
  12.     // 将Message追加到文件commitlog
  13.     result = mapedFile.appendMessage(msg, this.appendMessageCallback);
  14.     switch (result.getStatus()) {
  15.     case PUT_OK:break;
  16.     case END_OF_FILE:
  17.          // Create a new file, re-write the message
  18.          mapedFile = this.mapedFileQueue.getLastMapedFile();
  19.          result = mapedFile.appendMessage(msg, this.appendMessageCallback);
  20.      break;
  21.      DispatchRequest dispatchRequest = new DispatchRequest(
  22.                 topic,// 1
  23.                 queueId,// 2
  24.                 result.getWroteOffset(),// 3
  25.                 result.getWroteBytes(),// 4
  26.                 tagsCode,// 5
  27.                 msg.getStoreTimestamp(),// 6
  28.                 result.getLogicsOffset(),// 7
  29.                 msg.getKeys(),// 8
  30.                 /**
  31.                  * Transaction
  32.                  */
  33.                 msg.getSysFlag(),// 9
  34.                 msg.getPreparedTransactionOffset());// 10
  35.     // 1.分发消息位置到ConsumeQueue
  36.     // 2.分发到IndexService建立索引
  37.     this.defaultMessageStore.putDispatchRequest(dispatchRequest);
  38. }
复制代码


  4、消息的索引文件

  如果一个消息包含key值的话,会使用IndexFile存储消息索引,文件的内容结构如图:

  

175724-4deee0fb9d08e02d.png



  索引文件主要用于根据key来查询消息的,流程主要是:  

  根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)  

  根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)  

  遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)  

   六、消息订阅

  RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

  首先看下消费端的负载均衡:

  

175724-fdbb184a5d9bd022.png



  消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载:  

  遍历Consumer下的所有topic,然后根据topic订阅所有的消息  

  获取同一topic和Consumer Group下的所有Consumer  

  然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等  

  如同上图所示:如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列。这里采用的就是平均分配策略,它类似于我们的分页,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页 有多少条记录,就类似于某个Consumer会消费哪些队列。

  通过这样的策略来达到大体上的平均消费,这样的设计也可以很方面的水平扩展Consumer来提高消费能力。

  消费端的Push模式是通过长轮询的模式来实现的,就如同下图:

  

175724-f2e2ee205d49a05f.png



  Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer 端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直 到有数据传递或超时才返回。

  当然,Consumer端是通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的 PullRequest发送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的 PullRequest时,如果发现没有消息,就会把PullRequest扔到ConcurrentHashMap中缓存起来。broker在启动时, 会启动一个线程不停的从ConcurrentHashMap取出PullRequest检查,直到有数据返回。

  七、RocketMQ的其他特性

  前面的6个特性都是基本上都是点到为止,想要深入了解,还需要大家多多查看源码,多多在实际中运用。当然除了已经提到的特性外,RocketMQ还支持:

  定时消息  

  消息的刷盘策略  

  主动同步策略:同步双写、异步复制  

  海量消息堆积能力  

  高效通信  

  .......  

  其中涉及到的很多设计思路和解决方法都值得我们深入研究:  

  消息的存储设计:既要满足海量消息的堆积能力,又要满足极快的查询效率,还要保证写入的效率。  

  高效的通信组件设计:高吞吐量,毫秒级的消息投递能力都离不开高效的通信。  

  .......  

   RocketMQ最佳实践

  一、Producer最佳实践


  1、一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。2、每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。3、消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。4、对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。5、某些应用如果不 关注消息是否发送成功,请直接使用sendOneWay方法发送消息。  

   二、Consumer最佳实践

  1、消费过程要做到幂等(即消费端去重)2、尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。3、优化每条消息消费过程  

   三、其他配置

  线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false。

  RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配 置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选 择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。后果就是:以后所有该 TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

  所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。

  RocketMQ设计相关

  RocketMQ的设计假定:

  每台PC机器都可能宕机不可服务任意集群都有可能处理能力不足最坏的情况一定会发生内网环境需要低延迟来提供最佳用户体验  

  RocketMQ的关键设计:

  分布式集群化强数据安全海量数据堆积毫秒级投递延迟(推拉模式)  

  这是RocketMQ在设计时的假定前提以及需要到达的效果。我想这些假定适用于所有的系统设计。随着我们系统的服务的增多,每位开发者都要注意自己 的程序是否存在单点故障,如果挂了应该怎么恢复、能不能很好的水平扩展、对外的接口是否足够高效、自己管理的数据是否足够安全...... 多多规范自己的设计,才能开发出高效健壮的程序。

   附录:RocketMQ涉及到的几个专业术语和整体架构介绍

  一、RocketMQ中的专业术语


  Topictopic表示消息的第一级类型,比如一个电商系统的消息可以分为:交易消息、物流消息...... 一条消息必须有一个Topic。

  TagTag表示消息的第二级类型,比如交易消息又可以分为:交易创建消息,交易完成消息..... 一条消息可以没有Tag。RocketMQ提供2级消息分类,方便大家灵活控制。

  Queue一个topic下,我们可以设置多个queue(消息队列)。当我们发送消息时,需要要指定该消息的topic。RocketMQ会轮询该topic下的所有队列,将消息发送出去。

  Producer 与 Producer GroupProducer表示消息队列的生产者。消息队列的本质就是实现了publish-subscribe模式,生产者生产消息,消费者消费消息。所以这里的Producer就是用来生产和发送消息的,一般指业务系统。

  Producer Group是一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致。

  Consumer 与 Consumer Group消息消费者,一般由后台系统异步消费消息。

  Push ConsumerConsumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。Pull ConsumerConsumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。

  Consumer Group是一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。

  Broker消息的中转者,负责存储和转发消息。可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker是RocketMQ的核心,它不不能挂的,所以需要保证broker的高可用。

  广播消费一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。在广播消费中的Consumer Group概念可以认为在消息划分方面无意义。

  集群消费一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有 9 条消息,其中一个Consumer Group有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。

  NameServerNameServer即名称服务,两个功能:

  接收broker的请求,注册broker的路由信息

  接口client的请求,根据某个topic获取其到broker的路由信息NameServer没有状态,可以横向扩展。每个broker在启动的 时候会到NameServer注册;Producer在发送消息前会根据topic到NameServer获取路由(到broker)信 息;Consumer也会定时获取topic路由信息。

   二、RocketMQ Overview

  

175724-697bf92a2cd4cd1d.png



  Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列;如果做集群消费,则多个Consumer实例平均消费这个Topic对应的队列集合。

  再看下RocketMQ物理部署结构图:

  

175724-9875fde6f39e788b.png



  RocketMQ网络部署特点:

  Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个 Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId=0表示 Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。

  Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer 完全无状态,可集群部署。

  Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic 路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从 Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

   三、其他参考资料

  RocketMQ用户指南  

  RocketMQ原理简介  

  RocketMQ最佳实践  

  阿里分布式开放消息服务(ONS)原理与实践2

  阿里分布式开放消息服务(ONS)原理与实践3

  RocketMQ原理解析

 

http://udn.yyuap.com/article-7981.html



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [分布 开放 消息] 推荐:

分布式开放消息系统(RocketMQ)的原理与实践

- - 编程语言 - ITeye博客
备注:1.如果您此前未接触过RocketMQ,请先阅读附录部分,以便了解RocketMQ的整体架构和相关术语2.文中的MQServer与Broker表示同一概念.   分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点. 而谈到消息系统的设计,就回避不了两个问题:  .

分布式消息系统:Kafka

- - 标点符
Kafka是分布式发布-订阅消息系统. 它最初由LinkedIn公司开发,之后成为Apache项目的一部分. Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务. 在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转. 传统的企业消息系统并不是非常适合大规模的数据处理.

kafka分布式消息系统

- - CSDN博客云计算推荐文章
Kafka[1]是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态). 当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线).

淘宝开源分布式消息中间件Metamorphosis

- - InfoQ cn
最近,淘宝开源了分布式消息中间件 Memorphosis项目,它是Linkedin开源MQ——Kafka的Java版本,针对淘宝内部应用做了定制和优化. 据了解,Metamorphosis(以下简称Meta)的设计原则包括:. 分布式,生产者、服务器和消费者都可分布. Metamorphosis的总体 架构图如下:.

案例分析:基于消息的分布式架构

- - 简单文本
美国计算机科学家,LaTex的作者Leslie Lamport说:“分布式系统就是这样一个系统,系统中一个你甚至都不知道的计算机出了故障,却可能导致你自己的计算机不可用. ”一语道破了开发分布式系统的玄机,那就是它的复杂与不可控. 所以Martin Fowler强调:分布式调用的第一原则就是不要分布式.

kafka:一个分布式消息系统 - 十九画生

- - 博客园_首页
最近因为工作需要,调研了追求高吞吐的轻量级消息系统Kafka,打算替换掉线上运行的ActiveMQ,主要是因为明年的预算日流量有十亿,而ActiveMQ的分布式实现的很奇怪,所以希望找一个适合分布式的消息系统. 以下是内容是调研过程中总结的一些知识和经验,欢迎拍砖. 首先,我们来看看什么是消息队列,维基百科里的解释翻译过来如下:.

Apache Kafka:下一代分布式消息系统

- - zzm
Apache Kafka是分布式发布-订阅消息系统. 它最初由LinkedIn公司开发,之后成为Apache项目的一部分. Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同:. 它被设计为一个分布式系统,易于向外扩展;.

基于消息的分布式架构设计-转

- - 学习笔记
随着社会的发展,经济的飞跃,传统的单系统模式(webApp+DB)已经很难满足业务场景的需要. 企业系统开始不断演化成多个子系统并存协作的局面. 大大降低了系统间的耦合性,更重要的便于子系统的扩展、升级、维护等. 谈到系统间的协作,目前常用两种方式:. 通过客户端发起的get、post请求,服务端接收request请求,处理请求,得到响应内容,通过网络传送到客户端,由浏览器解析出一个可视化的页面.

如何用消息系统避免分布式事务?

- - CSDN博客推荐文章
  前阵子从支付宝转账1万块钱到余额宝,这是日常生活的一件普通小事,但作为互联网研发人员的职业病,我就思考支付宝扣除1万之后,如果系统挂掉怎么办,这时余额宝账户并没有增加1万,数据就会出现不一致状况了.   上述场景在各个类型的系统中都能找到相似影子,比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品数量必须减1吧,怎么保证.

万亿级数据洪峰下的分布式消息引擎

- - zzm
通过简单回顾阿里中间件(Aliware)消息引擎的发展史,本文开篇于双11消息引擎面临的低延迟挑战,通过经典的应用场景阐述可能会面临的问题 - 响应慢,雪崩,用户体验差,继而交易下跌. 为了应对这些不可控的洪峰数据,中间件团队通过大量研究和实践,推出了低延迟高可用解决方案,在分布式存储领域具有一定的普适性.