RabbitMQ:镜像队列Mirrored queue

标签: rabbitmq 镜像 队列 | 发表时间:2013-07-23 18:11 | 作者:
出处:https://www.iteye.com/blog/user/flyingdutchman
        在上一节 《RabbitMQ集群类型一:在单节点上构建built-in内置集群》中我们已经学习过:在集群环境中,队列只有元数据会在集群的所有节点同步,但队列中的数据只会存在于一个节点,数据没有冗余且容易丢,甚至在durable的情况下,如果所在的服务器节点宕机,就要等待节点恢复才能继续提供消息服务。
        那么是不是有消息冗余的解决方案呢?这就要提到从RabbitMQ 2.6.0版本开始提供支持 镜像队列(Mirrored Queue),消息会在rabbitmq节点之间的被创建为“镜像队列”的队列之间复制。和其它的主从设计一样,镜像队列也有master和slave的概念,一旦某个节点当掉,会在其余的节点中选举一个slave作为master。

        在镜像队列(Mirrored Queue)中,只有master的copy对外提供服务,而其他slave copy只提供备份服务,在master copy所在节点不可用时,选出一个slave copy作为新的master继续对外提供服务。

        之前我们也将过,rabbitmq会同过一种“选举”机制在余下的所有的salve中选出一个

        理想总是简单,真实却总是复杂。如果我们要在运行时添加一个新的节点到集群中,消息复制会怎么处理?如果有新节点加入,RabbitMQ不会同步之前的历史数据,新结点只会复制该结点加入到集群之后新增消息.这里的假设是随着消息的被consumer取走,最终所有的节点的数据都会对齐一致。
        接下来,一个自然的追问就“诞生”了:既然master节点退出集群会选一个slave作为master,那么如果不幸选中了一个刚刚加入集群的节点怎么办?那消息不就丢了吗!?这里您可以把心放到您的肚子里,RabbitMQ集群内部会维护节点的状态是否已经同步,使用rabbitmqctl的synchronised_slave_pids参数,就可以查看状态.如果slave_pids和synchronised_slave_pids里面的节点是一致的,那说明全都同步了.如果不一致很容易比较出来哪些还没有同步, 集群只会在“最老”的slave节点之间选一个出来作为新的master节点。

        镜像队列分为两种:集群内全节点复制的镜像队列和集群内局部节点复制的镜像队列。先面我们就看一下如何创建镜像队列Mirrored Queue:其实很简单,只要在创建消息队列时,添加一个叫“x-ha-policy”的key/value对就可以了:
引用

        //创建集群内全节点复制的镜像队列
        ...
        queue_args = {'x-ha-policy' : 'all'}
        channel.queue_declare(queue = 'hello-queue',arguments = queue_args)
        ...

        //创建集群内局部节点复制的镜像队列
        ...
        queue_args = {'x-ha-policy' : 'nodes',
                      'x-ha-policy-params' : [rabbit@JackyChen,rabbit3@JackyChen]}
        channel.queue_declare(queue = 'hello-queue',arguments = queue_args)
        ...
       


        下面我们通过实际的python代码并执行相关的操作来验证一下镜像队列:
引用

        # mkdir -p /data/rabbitmq-pika/c5
        # cd /data/rabbitmq-pika/c5
        # touch hello_world_mirrored_producer.py
        # chmod +x hello_world_mirrored_producer.py
        # touch hello_world_mirrored_consumer.py
        # chmod +x hello_world_mirrored_consumer.py
       

        其中hello_world_mirrored_producer.py代码如下:
引用

#!/usr/bin/env python
#coding=utf-8

import pika,sys
from pika import spec

#在"/"虚拟主机vhost上通过用户guest建立channel通道
user_name = 'guest'
user_passwd = 'guest'
target_host = 'JackyChen'
vhost = '/'
cred = pika.PlainCredentials(user_name,user_passwd)
conn_params = pika.ConnectionParameters(target_host,
                                        virtual_host = vhost,
                                        credentials = cred)
conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel()

#创建一个direct类型的、持久化的、没有consumer时队列是否自动删除的exchage交换机
channel.exchange_declare(exchange = 'hello-exch',
                         type = 'direct',    
                         passive = False,    
                         durable = True,    
                         auto_delete = False)    
                        
#使用接收到的信息创建消息
msg = sys.argv[1]
msg_props = pika.BasicProperties()
msg_props.content_type = 'text/plain'
#持久化消息
msg_props.delivery_mode = 2
msg_ids = []

print 'ready to publish...'
#发布消息
channel.basic_publish(body = msg,
                      exchange = 'hello-exch',
                      properties = msg_props,
                      routing_key = 'hala')
print 'published!'
msg_ids.append(len(msg_ids) + 1)
print len(msg_ids)
channel.close()
conn_broker.close()
       

        hello_world_mirrored_consumer.py代码如下:
引用

#!/usr/bin/env python
#coding=utf-8

import pika

#在"/"虚拟主机vhost上通过用户guest建立channel通道
user_name = 'guest'
user_passwd = 'guest'
target_host = 'JackyChen'
vhost = '/'
cred = pika.PlainCredentials(user_name,user_passwd)
conn_params = pika.ConnectionParameters(target_host,
                                        virtual_host = vhost,
                                        credentials = cred)
conn_broker = pika.BlockingConnection(conn_params)
conn_channel = conn_broker.channel()

#创建一个direct类型的、持久化的、没有consumer时,队列是否自动删除exchage交换机
conn_channel.exchange_declare(exchange = 'hello-exch',
                              type = 'direct',
                              passive = False, 
                              durable = True, 
                              auto_delete = False) 
 
#创建一个持久化的、没有consumer时队列是否自动删除的名为“hello-queue”
#创建集群内全节点复制的镜像队列
queue_args = {'x-ha-policy' : 'all'}
#创建集群内局部节点复制的镜像队列
#queue_args = {'x-ha-policy' : 'nodes','x-ha-policy-params' : ['rabbit@JackyChen','rabbit3@JackyChen']}
conn_channel.queue_declare(queue = 'hello-queue', 
                           durable = True, 
                           auto_delete = False,
                           arguments = queue_args)
 
#将“hello-queue”队列通过routing_key绑定到“hello-exch”交换机 
conn_channel.queue_bind(queue = 'hello-queue',
                        exchange = 'hello-exch',
                   routing_key = 'hala')
 
#定义一个消息确认函数,消费者成功处理完消息后会给队列发送一个确认信息,然后该消息会被删除
def ack_info_handler(channel,method,header,body):
    """ack_info_handler """
    print 'ack_info_handler() called!' 
    if body == 'quit':
        channel.basic_cancel(consumer_tag = 'hello-hala')
        channel.stop_sonsuming()
    else: 
        print body
        channel.basic_ack(delivery_tag = method.delivery_tag)

conn_channel.basic_consume(ack_info_handler, 
                           queue = 'hello-queue',
                           no_ack = False,
                           consumer_tag = 'hello-hala')
 
print 'ready to consume msg...'
conn_channel.start_consuming()
       


        打开rabbitmq集群中所有节点:
引用

        # /opt/mq/rabbitmq/sbin/rabbitmqctl start_app
        # /opt/mq/rabbitmq2/sbin/rabbitmqctl start_app
        # /opt/mq/rabbitmq3/sbin/rabbitmqctl start_app
       

        然后执行:
引用

        # ./hello_world_mirrored_consumer.py
        # ./hello_world_mirrored_producer.py
       



        注意:上面是rabbitmq 3.0之前的创建镜像队列的方法,3.0之后改为通过
引用

        //给所有以“hello”开头为名创建的消息队列设置为集群内全节点复制的镜像队列
        # ./rabbitmqctl set_policy ha-all2 "^hello.*" '{"ha-mode":"all"}'
       


        创建集群内局部节点复制的镜像队列:
引用

       
        # ./rabbitmqctl set_policy ha-all "^halo.*" '{"ha-mode":"nodes","ha-params":["rabbit@JackyChen","rabbit3@JackyChen"]}'
       


        另外还有一种的镜像队列:
引用

        //只指定在整个集群节点中只包含count = n 个镜像的镜像列表
        # ./rabbitmqctl set_policy ha-all3 "^alert.*" '{"ha-mode":"exactly","count":2}'
       


       

       

已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [rabbitmq 镜像 队列] 推荐:

RabbitMQ:镜像队列Mirrored queue

- - 飞翔的荷兰人
        在上一节 《RabbitMQ集群类型一:在单节点上构建built-in内置集群》中我们已经学习过:在集群环境中,队列只有元数据会在集群的所有节点同步,但队列中的数据只会存在于一个节点,数据没有冗余且容易丢,甚至在durable的情况下,如果所在的服务器节点宕机,就要等待节点恢复才能继续提供消息服务.

【架构】关于RabbitMQ

- - 学习笔记
1      什么是RabbitMQ. RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗. 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:. 例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一个exchange即可,剩下的消息分发工作由RabbitMQ完成.

RabbitMQ (三) 发布/订阅

- - CSDN博客架构设计推荐文章
转发请标明出处: http://blog.csdn.net/lmj623565791/article/details/37657225. 本系列教程主要来自于官网入门教程的翻译,然后自己进行了部分的修改与实验,内容仅供参考. 上一篇博客中,我们实现了工作队列,并且我们的工作队列中的一个任务只会发给一个工作者,除非某个工作者未完成任务意外被杀死,会转发给另外的工作者,如果你还不了解: RabbitMQ (二)工作队列.

rabbitmq java client api详解

- - 五四陈科学院
以下内容由 [五四陈科学院]提供. AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现. 每个rabbitmq-server叫做一个Broker,等着tcp连接进入. 在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型. Queue是进程内的逻辑队列,有多个,有名字.

抽取rabbitmq网络层做的echo server

- 2sin18 - codedump
传说rabbitmq网络层实现的优雅高效,于是我就尝试着将其中的网络层抽取出来,模拟着做了一个echo服务器,代码放在这里.. rabbitmq的做法是内置状态机,通过切换callback的形式处理不同的业务,这样只有一个子进程处理一个链接,性能提高不少.. 测试这个echo服务器的客户端我使用的是telnet,telnet输入的数据会自动在后面加上”\r\n”发送到对端,于是代码中以这个来判断是否接收了一条消息,抽取出来回复给对端..

RabbitMQ关键性问题调研

- - Java - 编程语言 - ITeye博客
摘要:本篇是本人对RabbitMQ使用的关键性问题进行的调研,如性能上限、数据存储、集群等,.             具体的 RabbitMQ概念、使用方法、SpringAMQP配置,假设读者已有了基础. 1.1  RabbitMQ数据速率问题. 在边读边写的情况下:速率只与网络带宽正相关,网络使用率最高能达到接近100%,并且数据使用率很高(90%以上).

利用RabbitMQ实现分布式事务

- -
  实现要点:1、构建本地消息表及定时任务,确保消息可靠发送;2、RabbitMQ可靠消费;3、redis保证幂等.   两个服务:订单服务和消息服务.   使用springboot构建项目,相关代码如下. //设置消息发送确认回调,发送成功后更新消息表状态. //定时扫描记录表,将发送状态为0的消息再次发送,甚至可以记录重发次数,必要时人工干预,生产环境中需要单独部署定时任务.

[转][RabbitMQ+Python入门经典] 兔子和兔子窝

- lostsnow - heiyeluren的blog(黑夜路人的开源世界)
高级消息队列协议(AMQP1)是一个异步消息传递所使用的应用层协议规范. AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具. RabbitMQ作为一个工业级的消息队列中间件,基于AMQP协议的实现,由erlang语言编写. 本文讲解 RabbitMQ+Python 的使用.

使用django+celery+RabbitMQ实现异步执行

- MetUrMaker - idv2
RabbitMQ大家应该不陌生,著名的消息队列嘛. 可惜我最近才听说它的大名,了解之后不禁惊呼,世界上居然还有这种东西. 立刻觉得手里有了锤子,就看什么都是钉子了,主网站不愿意干的操作统统扔给RabbitMQ去做吧. 言归正传,先介绍一下这篇文章的应用场景吧. 我们知道大型网站的性能非常重要,然而有时不得不做一些相当耗时的操作.

Python 的服务器推送解决方案:Orbited + RabbitMQ

- 非狐外传 - python.cn(jobs, news)
最近公司要用到服务器推送技术,google了一下,nodejs固然好,但是公司的东西都是python搞的,. 所以选择了python的 Orbited +  RabbitMQ,无奈Orbited文档极其缺乏,所以要做下笔记. 以下都是在windows平台上搞的测试. RabbitMQ:先要装好Erlang,然后下RabbitMQ的win版exe文件安装,由于要用到stomp协议,.