rabbitmq java client api详解

标签: rabbitmq java client | 发表时间:2014-08-27 07:33 | 作者:
出处:http://2014.54chen.com/

以下内容由 [五四陈科学院]提供

AMQP

  • AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。

基础概念快速入门

  • 每个rabbitmq-server叫做一个Broker,等着tcp连接进入。
  • 在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型。(一对多、直连、多对多等等)
  • Queue是进程内的逻辑队列,有多个,有名字。
  • Binding联系Exchane与Queue。
  • Routing key由生产者指定。Binding key由消费者指定。二者联合决定一条消息的来去。

java client api

连接

1
2
3
4
      ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
  • 以上是得到一个rabbitmq连接最最基础的代码,当然了,还可以设置一些诸如用户名密码的事情。
  • 最后这个channel就可以用来收和发消息了。

消息者线程池

1
2
      ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
  • 消费者时使用,上述自动开了一20个线程的池来搞。

地址数组

1
2
3
      Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
                                 , new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);
  • 上述代码如果连hostname1失败了就去hostname2。
  • factory.newConnection()会触发这个检测。

声明exchange与queue

1
2
3
      channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
  • channel.exchangeDeclare 参数有 交换机名字 类型 是否持久化 不使用时是否自动删除 是否是内部的(不能被客户端使用) 其他参数
  • channel.queueDeclare 参数有 queue名字 是否持久化 独占的queue(仅供此连接) 不使用时是否自动删除 其他参数
  • channel.queueBind 参数有 queue名字 交换机名字 此次绑定使用的路由关键字 其他参数

发出消息

1
2
      byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
  • channel.basicPublish 参数有 要发出的交换机名字 路由关键字 是否强制(设置为true时,找不到收的人时可以通过returnListener返回) 是否立即(其实rabbitmq不支持) 其他属性 消息主体

线程安全

  • Channel是线程好全的,但是最好是每个线程里用自己的Channel,因为在单个Channel里排队是有可能慢一些的。

最简单的办法消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
      boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });
  • 一个Channel一个Consumer。
  • channel.basicAck 回发ACK 参数 tag 是否多个。

零碎

  • channel.basicQos 指定服务质量设置 参数 最大的投送字节数 最大的投送消息数量 设置是否要应用到整个channel(而不是一个消费者)。
  • factory.setAutomaticRecoveryEnabled(true) 网络有问题时,好后可自动恢复设置。
  • cf.setRequestedHeartbeat(5) 设置心跳时间。
  • exchange type可用的值:direct topic headers fanout。
  • exchange的类型有一个default,basicPublish没有指定时使用,而且,如果routingKey在指定绑定的时候,会去到绑定的exchange。
  • channel.queueDeclare().getQueue() 得到的是一个随机queue,断开连接后即删除。
  • 当exchange为direct的时候routingKey与bindingKey必须完全一致才能消费消息。

想快点找到作者也可以到Twitter上留言: @54chen
或者你懒得带梯子上墙,请到新浪微博: @54chen

相关 [rabbitmq java client] 推荐:

rabbitmq java client api详解

- - 五四陈科学院
以下内容由 [五四陈科学院]提供. AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现. 每个rabbitmq-server叫做一个Broker,等着tcp连接进入. 在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型. Queue是进程内的逻辑队列,有多个,有名字.

构建实时Web的JAVA选择组合:socket.io client + socketio-netty server

- - BlogJava-首页技术区
     摘要: 很显然,实时Web,是一种技术趋势,将成为一种人们的默认技术选择,用以拉近和桌面应用的距离. socket.io是一种数据实时推送、事件驱动模型的框架,支持事件订阅,简单易用. 其价值目前看来,还未被完整的挖掘出来. socket.io即提供了node.js服务器端(地址)又提供了客户端(地址)的整体解决方案,而socketio-netty则是基于JAVA服务器端,支持最新socket.io client最新版规范.

【架构】关于RabbitMQ

- - 学习笔记
1      什么是RabbitMQ. RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗. 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:. 例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一个exchange即可,剩下的消息分发工作由RabbitMQ完成.

Zookeeper Client简介

- - zzm
直接使用zk的api实现业务功能比较繁琐. 因为要处理session loss,session expire等异常,在发生这些异常后进行重连. 又因为ZK的watcher是一次性的,如果要基于wather实现发布/订阅模式,还要自己包装一下,将一次性订阅包装成持久订阅. 另外如果要使用抽象级别更高的功能,比如分布式锁,leader选举等,还要自己额外做很多事情.

RabbitMQ (三) 发布/订阅

- - CSDN博客架构设计推荐文章
转发请标明出处: http://blog.csdn.net/lmj623565791/article/details/37657225. 本系列教程主要来自于官网入门教程的翻译,然后自己进行了部分的修改与实验,内容仅供参考. 上一篇博客中,我们实现了工作队列,并且我们的工作队列中的一个任务只会发给一个工作者,除非某个工作者未完成任务意外被杀死,会转发给另外的工作者,如果你还不了解: RabbitMQ (二)工作队列.

Chrome 14 beta启用Native Client

- tinda - Solidot
Google发布了Chrome 14 beta,默认启用Native Client(NaCl),它最早在上半年发布的Chrome 10 beta整合了NaCl,但并未激活. Google在2008年首次推出了试验项目NaCl,让开发者可以编译C/C++代码为不针对特定平台的二进制文件,在浏览器整合的运行时中执行,利用沙盒技术避开安全缺陷.

剑走偏锋的 Native Client

- - 谷奥——探寻谷歌的奥秘
感谢读者  liuyanghejerry 的投稿. 不知不觉,Google已经正式推出其Native Client (NaCl)过去约7个月之久. 而目前国内似乎还没有多少关于NaCl的资料,所以在这里面向Web开发者做一下简单的介绍,希望能够起到一个抛砖引玉的效果. 本文的所有代码均来自于 https://developers.google.com/native-client/devguide/tutorial,如果您对其中的任何技术细节存在疑问,请以原文为准.

Netty Client重连实现

- - 鸟窝
当我们用Netty实现一个TCP client时,我们当然希望当连接断掉的时候Netty能够自动重连. Netty Client有两种情况下需要重连:. Netty Client启动的时候需要重连. 在程序运行中连接断掉需要重连. 对于第一种情况,Netty的作者在stackoverflow上给出了 解决方案,.

抽取rabbitmq网络层做的echo server

- 2sin18 - codedump
传说rabbitmq网络层实现的优雅高效,于是我就尝试着将其中的网络层抽取出来,模拟着做了一个echo服务器,代码放在这里.. rabbitmq的做法是内置状态机,通过切换callback的形式处理不同的业务,这样只有一个子进程处理一个链接,性能提高不少.. 测试这个echo服务器的客户端我使用的是telnet,telnet输入的数据会自动在后面加上”\r\n”发送到对端,于是代码中以这个来判断是否接收了一条消息,抽取出来回复给对端..

RabbitMQ关键性问题调研

- - Java - 编程语言 - ITeye博客
摘要:本篇是本人对RabbitMQ使用的关键性问题进行的调研,如性能上限、数据存储、集群等,.             具体的 RabbitMQ概念、使用方法、SpringAMQP配置,假设读者已有了基础. 1.1  RabbitMQ数据速率问题. 在边读边写的情况下:速率只与网络带宽正相关,网络使用率最高能达到接近100%,并且数据使用率很高(90%以上).