为什么批量请求要尽可能的合并操作

标签: 合并 | 发表时间:2014-11-09 22:08 | 作者:nieyong
出处:http://www.blogjava.net/

前言

线上情况:

  1. 线上Redis集群,多个Twemproxy代理(nutcracker),LVS DR路由均衡调度
  2. 客户端使用Jedis操作Redis集群,一个程序进程实例使用原先1024个工作线程处理请求,若干个进程实例
  3. 一天超过22亿次请求,网络一般情况下,一天超过上万个连接失败异常
  4. 运维同学告知,LVS压力较大

改进工作:

  1. 工作线程由原先1024改用16个
  2. 每个线程每次最多操作1000个Redis命令批量提交

实际效果:

  1. 一天不到一亿次的请求量
  2. LVS压力大减
  3. CPU压力降低到原先1/3以下
  4. 单个请求抽样调研平均减少1-90毫秒时间(尤其是跨机房处理)

Redis支持批量提交

原生支持批量操作方式

一般命令前缀若添加上m字符串,表示支持多个、批量命令提交了。

显式的...

  MSET key value [key value ...]
MSETNX key value [key value ...]

HMGET key field [field ...]
HMSET key field value [field value ...]

一般方式的...

  HDEL key field [field ...]
SREM key member [member ...]
RPUSH key value [value ...]
......

更多,请参考: http://redis.cn/commands.html

pipeline管道方式

官方文档: http://redis.io/topics/pipelining

  1. Redis Client把所有命令一起打包发送到Redis Server,然后阻塞等待处理结果
  2. Redis Server必须在处理完所有命令前先缓存起所有命令的处理结果
  3. 打包的命令越多,缓存消耗内存也越多
  4. 不是打包的命令越多越好
  5. 实际环境需要根据命令执行时间等各种因素选择合并命令的个数,以及测试效果等

Java队列支持

一般业务、接入前端请求量过大,生产者速度过快,这时候使用队列暂时缓存会比较好一些,消费者直接直接从队列获取任务,通过队列让生产者和消费者进行分离这也是业界普通采用的方式。

监控队列

有的时候,若可以监控一下队列消费情况,可以监控一下,就很直观。同事为队列添加了一个监控线程,清晰明了了解队列消费情况。

示范

示范使用了Redis Pipeline,线程池,准备数据,生产者-消费者队列,队列监控等,消费完毕,程序关闭。

  /**
 * 以下测试在Jedis 2.6下测试通过
 * 
 * @author nieyong
 * 
 */
public class TestJedisPipeline {
    private static final int NUM = 512;
    private static final int MAX = 1000000; // 100W

    private static JedisPool redisPool;
    private static final ExecutorService pool = Executors.newCachedThreadPool();
    protected static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
            MAX); // 100W
    private static boolean finished = false;

    static {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxActive(64);
        config.setMaxIdle(64);

        try {
            redisPool = new JedisPool(config, "192.168.192.8", 6379, 10000,
                    null, 0);
        } catch (Exception e) {
            System.err.println("Init msg redis factory error! " + e.toString());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("prepare test data 100W");
        prepareTestData();
        System.out.println("prepare test data done!");

        // 生产者,模拟请求100W次
        pool.execute(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < MAX; i++) {
                    if (i % 3 == 0) {
                        queue.offer("del_key key_" + i);
                    } else {
                        queue.offer("get_key key_" + i);
                    }
                }
            }
        });

        // CPU核数*2 个工作者线程
        int threadNum = 2 * Runtime.getRuntime().availableProcessors();

        for (int i = 0; i < threadNum; i++)
            pool.execute(new ConsumerTask());

        pool.execute(new MonitorTask());

        Thread.sleep(10 * 1000);// 10sec
        System.out.println("going to shutdown server ...");
        setFinished(true);
        pool.shutdown();

        pool.awaitTermination(1, TimeUnit.MILLISECONDS);

        System.out.println("colse!");
    }

    private static void prepareTestData() {
        Jedis redis = redisPool.getResource();
        Pipeline pipeline = redis.pipelined();

        for (int i = 0; i < MAX; i++) {
            pipeline.set("key_" + i, (i * 2 + 1) + "");

            if (i % (NUM * 2) == 0) {
                pipeline.sync();
            }
        }
        pipeline.sync();
        redisPool.returnResource(redis);
    }

    // queue monitor,生产者-消费队列监控
    private static class MonitorTask implements Runnable {

        @Override
        public void run() {
            while (!Thread.interrupted() && !isFinished()) {
                System.out.println("queue.size = " + queue.size());
                try {
                    Thread.sleep(500); // 0.5 second
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }

    // consumer,消费者
    private static class ConsumerTask implements Runnable {
        @Override
        public void run() {
            while (!Thread.interrupted() && !isFinished()) {
                if (queue.isEmpty()) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                    }

                    continue;
                }

                List<String> tasks = new ArrayList<String>(NUM);
                queue.drainTo(tasks, NUM);
                if (tasks.isEmpty()) {
                    continue;
                }

                Jedis jedis = redisPool.getResource();
                Pipeline pipeline = jedis.pipelined();

                try {
                    List<Response<String>> resultList = new ArrayList<Response<String>>(
                            tasks.size());

                    List<String> waitDeleteList = new ArrayList<String>(
                            tasks.size());

                    for (String task : tasks) {
                        String key = task.split(" ")[1];
                        if (task.startsWith("get_key")) {
                            resultList.add(pipeline.get(key));
                            waitDeleteList.add(key);
                        } else if (task.startsWith("del_key")) {
                            pipeline.del(key);
                        }
                    }

                    pipeline.sync();

                    // 处理返回列表
                    for (int i = 0; i < resultList.size(); i++) {
                        resultList.get(i).get();
                        // handle value here ...
                        // System.out.println("get value " + value);
                    }

                    // 读取完毕,直接删除之
                    for (String key : waitDeleteList) {
                        pipeline.del(key);
                    }

                    pipeline.sync();
                } catch (Exception e) {
                    redisPool.returnBrokenResource(jedis);
                } finally {
                    redisPool.returnResource(jedis);
                }
            }
        }
    }

    private static boolean isFinished(){
        return finished;
    }

    private static void setFinished(boolean bool){
        finished = bool;
    }
}

代码作为示范。若线上则需要处理一些异常等。

小结

若能够批量请求进行合并操作,自然可以节省很多的网络带宽、CPU等资源。有类似问题的同学,不妨考虑一下。



nieyong 2014-11-09 22:08 发表评论

相关 [合并] 推荐:

java合并PDF

- - Java - 编程语言 - ITeye博客
15.         * * 合並pdf文件 * * @param files 要合並文件數組(絕對路徑如{ "e:\\1.pdf", "e:\\2.pdf" ,. 17.         * 合並後新產生的文件絕對路徑如e:\\temp.pdf,請自己刪除用過後不再用的文件請 * @return boolean.

mysql-merge合并表

- - CSDN博客编程语言推荐文章
注意: 1 每个子表的结构必须一致,主表和子表的结构需要一致, 2 每个子表的索引在merge表中都会存在,所以在merge表中不能根据该索引进行唯一性检索. 3 子表需要是MyISAM引擎 4 AUTO_INCREMENT 不会按照你所期望的方式工作. 建表语句 create table tablename(正常的字段)engine=merge insert_method=last insert_method: 有两个值如下: LAST 如果你执行insert 指令来操作merge表时,插入操作会把数据添加到最后一个子表中.

合并和比较度量

- liang - SEM WATCH
往往我们在做分析的时候需要结合各类基本的指标进行二次计算合并得到一个可以用于进行综合评价或比较的度量,这个过程中就需要涉及到一些指标的合并技巧,和比较基准的设定. 其实之前“数据上下文”的系列文章中也一再强调了我们需要为指标设定合理的参考系来评价指标的趋势或表现的好坏,之前提供了一系列的方法,但这篇文章里面要介绍的方法应该是最简单方便的,同时不失实用性,得益于《用户体验度量》这本书中的介绍,所以这篇文章更像是一篇读书笔记,内容基本整理总结自《用户体验度量》第8章——合并和比较度量,当然不再局限于用户体验层面,结合了网站分析层面的思考.

ffmpeg裁剪合并视频

- - inJava
这里裁剪是指时间轴裁剪,不是空间裁剪. 比如说,你想把视频的从一分20秒开始,30秒的视频裁剪出来,保存成一个视频. ffmpeg提供简单的命令参数:. -ss 开始时间,如: 00:00:20,表示从20秒开始;. -t 时长,如: 00:00:10,表示截取10秒长的视频;. -i 输入,后面是空格,紧跟着就是输入视频文件;.

hive小文件合并

- - 互联网 - ITeye博客
    hive仓库表数据最终是存储在HDFS上,由于Hadoop的特性,对大文件的处理非常高效. 而且大文件可以减少文件元数据信息,减轻NameNode的存储压力. 但是在数据仓库中,越是上层的表汇总程度就越高,数据量也就越小,而且这些表通常会有日期分区,随着时间的推移,HDFS的文件数目就会逐步增加.

SVN:合并一个分支到主干

- - P.Linux Laboratory
本文内容遵从 CC版权协议, 可以随意转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明网址: http://www.penglixun.com/tech/program/svn_merge_branch_trunk.html. 原文在此,我只是翻译: http://www.sepcot.com/blog/2007/04/SVN-Merge-Branch-Trunk.

优化 RequireJS 项目(合并与压缩)

- - 博客 - 伯乐在线
英文原文: Optimize (Concatenate and Minify) RequireJS Projects,编译: oschina. 本文将演示如何合并与压缩一个基于RequireJS的项目. 本文中将用到苦干个工具,这其中就包括 Node.js. 因此,如果你手头上还没有Node.js可以 点击此处下载一个.

hbase权威指南: store file合并(compaction)

- - CSDN博客推荐文章
          hbase为了防止小文件(被刷到磁盘的menstore)过多,以保证保证查询效率,hbase需要在必要的时候将这些小的store file合并成相对较大的store file,这个过程就称之为compaction. 在hbase中,主要存在两种类型的compaction:minor  compaction和major compaction.

Hadoop Namenode HA 合并到主干

- - NoSQLFan
Hadoop 的 Namenode 单点问题一直广受诟病,而这个问题最近将会得到解决,对Namenode 的HA方案已经完成实施并合并到主干,经过严格的测试后将会在后续版本中发布. HA方案中,主要进行了如下的一些工作:. 其主要原理是将NameNode分为两种角色,Active和Standby,Active就是正在进行服务的NameNode,而Standby又分三种情况.

SVN:分支合并到主干

- - CSDN博客系统运维推荐文章
1.先把主干代码下载到本地. 3.svn merge 分支目录    . 4.遇到冲突, 请见合并日志,选择"p",记下出冲突的文件,人工编辑.  4.1 比如Index.action出冲突了,vi Index.action.  4.2 vi完成以后,删除冲突的文件 rm -f Index.action.*.