【使用Redis构建任务队列】 | bboyjing's blog

标签: | 发表时间:2018-12-19 17:04 | 作者:
出处:http://bboyjing.github.io

本章节将学习如何使用Redis来构建任务队列,实现类似RabbitMQ之类的消息队列的功能。

先进先出队列

依然使用之前虚构的例子,现在来实现通过电子邮件来订阅商品交易市场中已售出的商品的相关信息。因为对外发电子邮件可能会有非常高的延迟,甚至可能会出现发送失败的情况,所以将使用任务队列来记录邮件的收信人以及发送邮件的原因,并构建一个可以在邮件发送服务器运行变得缓慢的时候,以并行方式一次发送多封邮件的工作进程。我们要编写的队列将以“先到先服务”的方式发送邮件,并且无论发送是否成功,程序都会把结果记录到日志中。
邮件队列采用List构成,存储JSON格式的字符串,数据结构如下:

  • key
    • queue:email
  • list value
    • “{“seller_id”:1,”item_id”:”Item_M”,”price”:97,”buyer_id”:27,”time”:1482399576744}”
    • ××××××
            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
            
//将信息推入邮件列表
publicvoidsendSoldEmailViaQueue(intseller, String item,intprice,intbuyer){
JSONObject jsonObject =newJSONObject();
jsonObject.put("seller_id", seller);
jsonObject.put("item_id", item);
jsonObject.put("price", price);
jsonObject.put("buyer_id", buyer);
jsonObject.put("time", System.currentTimeMillis());
stringRedisTemplate.opsForList().rightPush("queue:email", jsonObject.toJSONString());
}
//处理待发邮件
publicvoidprocessEmailQueue(){
while(true) {
/**
* 给了超时参数的leftPop方法调用了bLPop命令
* 当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。
*/
String packed = stringRedisTemplate.opsForList().leftPop("queue:email",30l, TimeUnit.SECONDS);
JSONObject jsonObject = JSONObject.parseObject(packed);
System.out.println(jsonObject.getString("item_id"));
}
}

上面已经实现了简单的队列,下面再看下如果要执行的任务不止一种,该怎么办?

多个可执行任务

将采用注册回调函数的方式来实现执行指定的任务,队列中存储的格式为[FUNCTION_NAME,[ARG1,ARG2,ARG3…]],书上的例子是python写的,动态语言实现这样的功能非常方便,但是用Java来写的话,就有点恶心了,还是简单来实现下吧。数据格式如下:

  • key
    • queue:task
  • list value
    • “[SendEmailTask,[1,Item_M,97,99]]”
    • ××××××
            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
            
//将任务推入队列
publicvoidsendTaskViaQueue(String queueName, String functionName, String ... args){
StringBuilder task =newStringBuilder();
task.append("[").append(functionName);
if(args.length >0){
task.append(",[");
for(String arg : args) {
task.append(arg).append(",");
}
task.deleteCharAt(task.length() -1);
task.append("]");
}
task.append("]");
stringRedisTemplate.opsForList().rightPush(queueName, task.toString());
}
//处理队列中的任务
publicvoidprocessTaskQueue(String queueName)throwsException{
while(true) {
String packed = stringRedisTemplate.opsForList().leftPop(queueName,30l, TimeUnit.SECONDS);
String functionName;
String args;
intindex = packed.indexOf(",");
if(index >0){
functionName = packed.substring(1, index);
args = packed.substring(index +2, packed.length() -2);
}else{
functionName = packed.substring(1, packed.length() -1);
args ="";
}
//此处可以用Spring管理Bean
Class taskClass = Class.forName("cn.didadu.queue."+ functionName);
ITask task = (ITask) taskClass.newInstance();
task.execute(args.split(","));
}
}
//处理任务接口
publicinterfaceITask{
voidexecute(String ... args);
}
//处理任务实现
publicclassSendEmailTaskimplementsITask{
@Override
publicvoidexecute(String... args){
for(String arg: args) {
System.out.println("send email process running: "+ arg);
}
}
}

任务优先级

假如多个任务之间存在优先级,上面一个例子实现了发送邮件任务,现在有另外任务需要发送消息,发送邮件的优先级比发送消息高。这个问题也很容易解决。BLPOP天然支持,因为BLPOP可以接收多个队列,当给定多个key参数时,按参数key的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。只需要稍微修改下上述处理任务的方法即可:

             
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
             
publicvoidprocessTaskQueue(String[] queueNames)throwsException{
while(true) {
//leftPop参数不支持多个key,需要自己实现下
byte[][] keys =newbyte[queueNames.length][];
inti =0;
for(String queueName : queueNames){
keys[i++] = queueName.getBytes();
}
RedisCallback<List<byte[]>> redisCallback = connection -> connection.bLPop(30, keys);
String packed =newString(stringRedisTemplate.execute(redisCallback).get(1));
String functionName;
String args;
intindex = packed.indexOf(",");
if(index >0){
functionName = packed.substring(1, index);
args = packed.substring(index +2, packed.length() -2);
}else{
functionName = packed.substring(1, packed.length() -1);
args ="";
}
//此处可以用Spring管理Bean
Class taskClass = Class.forName("cn.didadu.queue."+ functionName);
ITask task = (ITask) taskClass.newInstance();
task.execute(args.split(","));
}
}

延迟任务

本节尝试构造一个具有延迟执行任务的队列,将所有需要在未来执行的任务都添加到有序集合中,并将任务的执行时间设置分值,另外再使用一个进程来查找有序集合中是否存在可以立即被执行的任务,如果有的话,就从有序集合中移除那个任务,并将任务重新添加到适当的任务队列中,来实现延迟特性。
有序集合队列存储的每个被延迟执行的任务是一个包含4个值的JSON串,分别为:唯一标识符、处理任务的队列的名字、处理任务的回调函数的名字、传给回调函数的参数。格式如下:

  • key
    • delayed:
  • list value
    • “{“task_id”:aa-bb-cc,”queue_name”:”queue:email”,”function_name”:”SendEmailTask”,”args”:[“1”, “Item_M”, “97”, “99”]}”
    • ××××××
            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
            
//添加到延迟任务列表
publicStringsendTaskViaQueue(String queueName, String functionName,longdelay,String ... args){
String task_id = UUID.randomUUID().toString();
JSONObject jsonObject =newJSONObject();
jsonObject.put("task_id", task_id);
jsonObject.put("queue_name", queueName);
jsonObject.put("function_name", functionName);
jsonObject.put("args", args);
stringRedisTemplate.opsForZSet().add(
"delayed:",
jsonObject.toJSONString(),
System.currentTimeMillis() + delay);
returntask_id;
}
//处理延迟任务队列
publicvoidprocessTaskQueue()throwsException{
while(true) {
//获取延迟任务列表中的第一个任务
Set<ZSetOperations.TypedTuple<String>> delayTaskSet =
stringRedisTemplate.opsForZSet().rangeWithScores("delayed:",0,0);
//Redis没有直接提供阻塞有序集合的方法,需要自己检测
if(delayTaskSet.size() ==0){
Thread.sleep(1000);
continue;
}
//获取任务信息
String delayTaskStr ="";
longdelay =0;
for(ZSetOperations.TypedTuple typedTuple : delayTaskSet){
delayTaskStr = (String) typedTuple.getValue();
delay = typedTuple.getScore().longValue();
break;
}
//若还未到执行时间,等待一会儿继续loop
if(delay > System.currentTimeMillis()){
Thread.sleep(1000);
continue;
}
//将到执行时间的任务推入适当的任务队列中,并删除记录
JSONObject jsonObject = JSONObject.parseObject(delayTaskStr);
JSONArray jsonArray = jsonObject.getJSONArray("args");
intargsSize = jsonArray ==null?0: jsonArray.size();
String[] args =newString[argsSize];
for(inti =0; i <jsonArray.size(); i++){
args[i] = jsonArray.getString(i);
}
multiTaskQueueService.sendTaskViaQueue(
jsonObject.getString("queue_name"),
jsonObject.getString("function_name"),
args);
stringRedisTemplate.opsForZSet().remove("delayed:", delayTaskStr);
}
}

Redis构建队列的学习就到此结束,示例代码可能无法直接使用到实际项目中,但是其实现思路以及对Redis数据结构的使用还是值得学习的。

相关 [redis 任务 队列] 推荐:

【使用Redis构建任务队列】 | bboyjing's blog

- -
本章节将学习如何使用Redis来构建任务队列,实现类似RabbitMQ之类的消息队列的功能. 依然使用之前虚构的例子,现在来实现通过电子邮件来订阅商品交易市场中已售出的商品的相关信息. 因为对外发电子邮件可能会有非常高的延迟,甚至可能会出现发送失败的情况,所以将使用任务队列来记录邮件的收信人以及发送邮件的原因,并构建一个可以在邮件发送服务器运行变得缓慢的时候,以并行方式一次发送多封邮件的工作进程.

redis作为消息队列的使用

- - ITeye博客
在redis支持的数据结构中,有一个是集合list. 对List的操作常见的有lpush  lrange等. 在这种常见的操作时,我们是把集合当做典型意义上的‘集合’来使用的. 往往容易被忽视的是List作为“队列”的使用情况. 反编译redis的jar包,会发现:.  pop意为“弹”,是队列里的取出元素.

使用redis把队列的异步返回改成同步 - 队列使用

- - ITeye博客
web编程开发中,会遇到资源争用的情况. 有多个商品,商品抢单,每个商品都有数量限制. 但凡遇到此类问题,自古以来,就有两种解决方式:1、使用锁,2、使用队列. 使用队列的方式最为简单,不考虑加锁. 把所有的请求都放入队列,然后把队列处理的结果返回给客户端. 如果商品太多,可以按商品大类分成多个队列.

用redis实现支持优先级的消息队列

- - 企业架构 - ITeye博客
用redis实现支持优先级的消息队列. 系统中引入消息队列机制是对系统一个非常大的改善. 例如一个web系统中,用户做了某项操作后需要发送邮件通知到用户邮箱中. 你可以使用同步方式让用户等待邮件发送完成后反馈给用户,但是这样可能会因为网络的不确定性造成用户长时间的等待从而影响用户体验. 有些场景下是不可能使用同步方式等待完成的,那些需要后台花费大量时间的操作.

Redis 负载监控——redis-monitor

- - ITeye资讯频道
redis-monitor是一个Web可视化的 redis 监控程序. 使用 Flask 来开发的,代码结构非常简单,适合移植到公司内网使用. redis 服务器信息,包括 redis 版本、上线时间、 os 系统信息等等. 实时的消息处理信息,例如处理 command 数量、连接总数量等. 内存占用、 cpu 消耗实时动态图表.

Redis 起步

- - 博客园_首页
Rdis和JQuery一样是纯粹为应用而产生的,这里记录的是在CentOS 5.7上学习入门文章:. Redis是一个key-value存储系统. 和Memcached类似,但是解决了断电后数据完全丢失的情况,而且她支持更多无化的value类型,除了和string外,还支持lists(链表)、sets(集合)和zsets(有序集合)几种数据类型.

redis 配置

- - 谁主沉浮
# 当配置中需要配置内存大小时,可以使用 1k, 5GB, 4M 等类似的格式,其转换方式如下(不区分大小写). # 内存配置大小写是一样的.比如 1gb 1Gb 1GB 1gB. # daemonize no 默认情况下,redis不是在后台运行的,如果需要在后台运行,把该项的值更改为yes. # 当redis在后台运行的时候,Redis默认会把pid文件放在/var/run/redis.pid,你可以配置到其他地址.

Cassandra代替Redis?

- - Tim[后端技术]
最近用Cassandra的又逐渐多了,除了之前的360案例,在月初的QCon Shanghai 2013 篱笆网也介绍了其使用案例. 而这篇 百万用户时尚分享网站feed系统扩展实践文章则提到了Fashiolista和Instagram从Redis迁移到Cassandra的案例. 考虑到到目前仍然有不少网友在讨论Redis的用法问题,Redis是一个数据库、内存、还是Key value store?以及Redis和memcache在实际场景的抉择问题,因此简单谈下相关区别.

redis 部署

- - CSDN博客云计算推荐文章
一、单机部署 tar xvf redis-2.6.16.tar.gz cd redis-2.6.16 make make PREFIX=/usr/local/redis install  #指定安装目录为/usr/local/redis,默认安装安装到/usr/local/bin. # chkconfig: 2345 80 10       #添加redhat系列操作系统平台,开机启动需求项(运行级别,开机时服务启动顺序、关机时服务关闭顺序) # description:  Starts, stops redis server.

nagios 监控redis

- - C1G军火库
下载check_redis.pl. OK: REDIS 2.6.12 on 192.168.0.130:6379 has 1 databases (db0) with 49801 keys, up 3 days 14 hours - connected_clients is 1, blocked_clients is 0 | connected_clients=1 blocked_clients=0.