redis作为消息队列的使用

标签: redis 作为 消息队列 | 发表时间:2015-09-26 00:40 | 作者:
出处:http://www.iteye.com

在redis支持的数据结构中,有一个是集合list. 对List的操作常见的有lpush  lrange等。在这种常见的操作时,我们是把集合当做典型意义上的‘集合’来使用的。往往容易被忽视的是List作为“队列”的使用情况。

反编译redis的jar包,会发现:

 public String rpop(String key)
  {
    checkIsInMulti();
    this.client.rpop(key);
    return this.client.getBulkReply();
  }

public String rpoplpush(String srckey, String dstkey)
  {
    checkIsInMulti();
    this.client.rpoplpush(srckey, dstkey);
    return this.client.getBulkReply();
  }

public String lpop(String key)
  {
    checkIsInMulti();
    this.client.lpop(key);
    return this.client.getBulkReply();
  }

 pop意为“弹”,是队列里的取出元素。rpop意为"right pop"意思就是从队列的右边取元素,lpop就是"left pop"当然就是从队列的左边取元素了。对应取元素,我们往队列里面Push元素同样有2个方向:

public Long lpush(String key, String[] strings)
  {
    checkIsInMulti();
    this.client.lpush(key, strings);
    return this.client.getIntegerReply();
  }

public Long rpush(String key, String[] strings)
  {
    checkIsInMulti();
    this.client.rpush(key, strings);
    return this.client.getIntegerReply();
  }

 好了,下面正式开始测试下redis中List数据结构作为队列的使用方法:

假设我需要往队列里面Push map,该怎么做呢? push  String值的话比价简单,这里不再研究了……

 

往队列里push map的话,有以下2种方式:

第一种:

往队列里push一个map:

class PushThread implements Runnable {
	
	@Override
	public void run() {
		Jedis jedis = null;
		try{
			jedis = JedisPoolUtils.getJedis();
			
			for(int i=0; i<10; i++) {
				Map map = new HashMap();
				map.put("test", "test" + i);
				map.put("ok", "ok" + i);
				jedis.lpush("test".getBytes(), ObjectUtils.objectToBytes(map));//Object对象转换成byte数组的方法,这里省略
			}	
			
			
		} catch(Exception e) {
			e.printStackTrace();
		} finally{
			JedisPoolUtils.returnRes(jedis);
		}
	}
	
}

 从队列里取出map:

class PopThread implements Runnable {

	@Override
	public void run() {
		Jedis jedis = null;
		try{
			jedis = JedisPoolUtils.getJedis();
			
			for(int i=0; i<10; i++) {
				byte[] key = jedis.rpop("test".getBytes());
				System.out.println("弹出:" + ObjectUtils.bytesToObject(key));//byte数组转换为Object对象的方法,这里省略

			}	
			
			
		} catch(Exception e) {
			e.printStackTrace();
		} finally{
			JedisPoolUtils.returnRes(jedis);
		}
	}

 依次启动Push线程和pop线程,可在pop线程端看到打印结果:

弹出:{ok=ok0, test=test0}

弹出:{ok=ok1, test=test1}

弹出:{ok=ok2, test=test2}

弹出:{ok=ok3, test=test3}

弹出:{ok=ok4, test=test4}

弹出:{ok=ok5, test=test5}

弹出:{ok=ok6, test=test6}

弹出:{ok=ok7, test=test7}

弹出:{ok=ok8, test=test8}

弹出:{ok=ok9, test=test9}

 

第二种方式:

因为第一种方式毕竟需要在byte数组和Object对象之间来回转换,效率上可能存在一定影响。所以我们加以改进。不妨只把map的key放在一个队列中,取队列元素时,只取key,然后根据key取map即可。

push:

class PushThread implements Runnable {
	
	@Override
	public void run() {
		Jedis jedis = null;
		try{
			jedis = JedisPoolUtils.getJedis();
			
			for(int i=0; i<10; i++) {
				Map map = new HashMap();
				map.put("test", "test" + i);
				map.put("ok", "ok" + i);
				String key = "map" + i;
				jedis.lpush("test", key);//把所有map的key放入一个队列test中
				jedis.hmset(key, map);
			}	
			
			
		} catch(Exception e) {
			e.printStackTrace();
		} finally{
			JedisPoolUtils.returnRes(jedis);
		}
}

 pop元素:

class PopThread implements Runnable {

	@Override
	public void run() {
		Jedis jedis = null;
		try{
			jedis = JedisPoolUtils.getJedis();
			
			for(int i=0; i<10; i++) {
				String key = jedis.rpop("test");
				System.out.println("弹出:" + jedis.hgetAll(key));
			}	
			
			
		} catch(Exception e) {
			e.printStackTrace();
		} finally{
			JedisPoolUtils.returnRes(jedis);
		}
	}
	
}

  依次启动Push线程和pop线程,可在pop线程端看到打印结果:

弹出:{test=test0, ok=ok0}

弹出:{test=test1, ok=ok1}

弹出:{test=test2, ok=ok2}

弹出:{test=test3, ok=ok3}

弹出:{test=test4, ok=ok4}

弹出:{test=test5, ok=ok5}

弹出:{test=test6, ok=ok6}

弹出:{test=test7, ok=ok7}

弹出:{test=test8, ok=ok8}

弹出:{test=test9, ok=ok9}

 

两种方式效果一样,只不过第二种效率上更好一些。

注:当你依次从队列里取出元素后,队列在redis中就不存在了,所以当你pop完元素,再尝试pop的话,会报异常:

redis.clients.jedis.exceptions.JedisDataException: value sent to redis cannot be null

同时,redis中这个队列也不复存在了!

 

利用redis作为队列的性质我们可以用它来达到类似ZMQ的作用。。



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [redis 作为 消息队列] 推荐:

redis作为消息队列的使用

- - ITeye博客
在redis支持的数据结构中,有一个是集合list. 对List的操作常见的有lpush  lrange等. 在这种常见的操作时,我们是把集合当做典型意义上的‘集合’来使用的. 往往容易被忽视的是List作为“队列”的使用情况. 反编译redis的jar包,会发现:.  pop意为“弹”,是队列里的取出元素.

用redis实现支持优先级的消息队列

- - 企业架构 - ITeye博客
用redis实现支持优先级的消息队列. 系统中引入消息队列机制是对系统一个非常大的改善. 例如一个web系统中,用户做了某项操作后需要发送邮件通知到用户邮箱中. 你可以使用同步方式让用户等待邮件发送完成后反馈给用户,但是这样可能会因为网络的不确定性造成用户长时间的等待从而影响用户体验. 有些场景下是不可能使用同步方式等待完成的,那些需要后台花费大量时间的操作.

使用Redis作为一个LRU缓存

- - 并发编程网 - ifeve.com
原文链接  译者:flychao88. 当用Redis作为一个LRU存储时,有些时候是比较方便的,在你增添新的数据时会自动驱逐旧的数据. 这种行为在开发者论坛是非常有名的,因为这是流行的memcached系统的默认行为. LRU实际上只是支持驱逐的方式之一. 这页包含更多一般的Redis maxmemory指令的话题用于限制内存使用到一个定额,同时它也深入的涵盖了Redis所使用的LRU算法,实际上是精确LRU的近似值.

快速的消息队列 SquirrelMQ

- Le - 开源中国社区最新软件
SquirrelMQ是一个快速的消息队列.   SquirrelMQ VS Redis 入队列: SquirrelMQ:100万条数据,105S,内存使用84MB. Redis:100万条数据,156S,内存使用127MB.   出队列:   SquirrelMQ:100万条数据,230S. Redis:100万条数据,163S.

Feed消息队列架构分析

- - Tim[后端技术]
最近一两年,大部分系统的数据流由基于日志的离线处理方式转变成实时的流式处理方式,并逐渐形成几种通用的使用方式,以下介绍微博的消息队列体系. 当前的主要消息队列分成如图3部分. 1、feed信息流主流程处理,图中中间的流程,通过相关MQ worker将数据写入cache、Redis及MySQL,以便用户浏览信息流.

高可用消息队列框架ZBUS

- - 企业架构 - ITeye博客
我们在日常开发中可以需要用到消息队列 当然我们完全可以自己写一个生产者-消费者框架 但是高可用性、实时性已经大量数据堆积时候就显得问题捉襟见肘了下面推荐的框架在我时间项目中和测试中都是非常不错那么他是什么框架呢.    zbus git地址. http://git.oschina.net/rushmore/zbus ZBUS=MQ+RPC 服务总线 1)支持消息队列, 发布订阅, RPC, 交易系统队列适配 2)亿级消息堆积能力、支持HA高可用 3)无依赖单个Jar包 ~300K 4)丰富的API--JAVA/C/C++/C#/Python/Node.JS多语言接入,支持HTTP等协议长连接入.

[转]消息队列的两种模式

- -
Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. 点对点与发布订阅最初是由JMS定义的. 这两种模式主要区别或解决的问题就是发送到队列的消息能否重复消费(多订阅).

消息队列设计精要

- - 美团点评技术团队
消息队列已经逐渐成为企业IT系统内部通信的核心手段. 它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一. 当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify、MetaQ、RocketMQ等.

深入浅出 消息队列 ActiveMQ

- - 编程语言 - ITeye博客
ActiveMQ 是Apache出品,最流行的、功能强大的. 即时通讯和集成模式的开源服务器. ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现. 提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能.

延迟消息队列设计

- -
由于Kafka不支持延迟消息,而目前公司技术栈中消息中间件使用的是Kafka,业务方希望使用RocketMQ满足延迟消息场景,但如果仅仅只是需要延迟消息功能而引入多一套消息中间件,这会增加运维与维护成本. 在此背景下,我们希望通过扩展Kafka客户端提供延迟消息的支持. 本篇将介绍四种延迟消息实现方案的原理,以及分析其优缺点.