<< kettle中通过 时间戳(timestamp)方式 来实现数据库的增量同步操作(一) - Armin - 博客园 | 首页 | 架构高性能海量图片服务器的技术要素 - 北游运维 - 开源中国社区 >>

大数据处理系列之(一)Java线程池使用 - cstar(小乐) - 博客园

ThreadPoolExecutor有界队列使用

public class ThreadPool {

         private final static String poolName = "mypool";

         static private ThreadPool threadFixedPool = null;

         public ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

         private ExecutorService executor;

 

         static public ThreadPool getFixedInstance() {

                   return threadFixedPool;

         }

         private ThreadPool(int num) {

                   executor = new ThreadPoolExecutor(2, 4,60,TimeUnit.SECONDS, queue,new DaemonThreadFactory

(poolName), new ThreadPoolExecutor.AbortPolicy());

         }

         public void execute(Runnable r) {

                   executor.execute(r);

         }

        

         public static void main(String[] params) {

                   class MyRunnable implements Runnable {

                            public void run() {

                                     System.out.println("OK!");

                                     try {

                                               Thread.sleep(10);

                                     } catch (InterruptedException e) {

                                               e.printStackTrace();

                                     }

                            }

                   }

                   int count = 0;

                   for (int i = 0; i < 10; i++) {

                            try {

                                     ThreadPool.getFixedInstance().execute(new MyRunnable());

                            } catch (RejectedExecutionException e) {

                                     e.printStackTrace();

                                     count++;

                            }

                   }

                   try {

                            log.info("queue size:" + ThreadPool.getFixedInstance().queue.size());

                            Thread.sleep(2000);

                   } catch (InterruptedException e) {

                            e.printStackTrace();

                   }

                   System.out.println("Reject task: " + count);

         }

}

       首先我们来看下这段代码几个重要的参数,corePoolSize 为2,maximumPoolSize为4,任务队列大小为2,每个任务平

均处理时间为10ms,一共有10个并发任务。

      执行这段代码,我们会发现,有4个任务失败了。这里就验证了我们在上面提到有界队列时候线程池的执行顺序。当新任务在

方法 execute(Runnable) 中提交时, 如果运行的线程少于 corePoolSize,则创建新线程来处理请求。 如果运行的线程多于

corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程,如果此时线程数量达到maximumPoolSize,并且队

列已经满,就会拒绝继续进来的请求。

    现在我们调整一下代码中的几个参数,将并发任务数改为200,执行结果Reject task: 182,说明有18个任务成功了,线程处理

完一个请求后会接着去处理下一个过来的请求。在真实的线上环境中,会源源不断的有新的请求过来,当前的被拒绝了,但只要线

程池线程把当下的任务处理完之后还是可以处理下一个发送过来的请求。

     通过有界队列可以实现系统的过载保护,在高压的情况下,我们的系统处理能力不会变为0,还能正常对外进行服务,虽然有些服

务可能会被拒绝,至于如何减少被拒绝的数量以及对拒绝的请求采取何种处理策略我将会在下一篇文章《系统的过载保护》中继续

阐述。

 

参考文献:

  1. ThreadPoolExecutor使用与思考(上)-线程池大小设置与BlockedQueue的三种实现区别 http://dongxuan.iteye.com/blog/901689
  2. ThreadPoolExecutor使用与思考(中)-keepAliveTime及拒绝策略http://dongxuan.iteye.com/blog/902571
  3. ThreadPoolExecutor源代码
  4. Java线程池介绍以及简单实例 http://wenku.baidu.com/view/e4543a7a5acfa1c7aa00cc25.html

阅读全文……

标签 : ,



发表评论 发送引用通报