自定义异步线程池,实现异步请求以及监控当前线程运行情况

标签: 定义 异步 线程池 | 发表时间:2014-04-14 14:02 | 作者:gp32965465
出处:http://www.iteye.com

由于工作的需要,写了一份异步调用的小框架,分享出来。。。

 

启动类:

 

/**
 * 线程启动
 * @author yfguopeng
 */
public class ThreadExecutorListener  implements ServletContextListener{
	private final static Log log = LogFactory.getLog(ThreadExecutorListener.class);

	@SuppressWarnings("unchecked")
	public void contextInitialized(ServletContextEvent sce) {
		ServletContext servletContext = sce.getServletContext();
		WebApplicationContext wac = WebApplicationContextUtils.getRequiredWebApplicationContext(servletContext);
		List<ThreadConfigBean> worders = (List<ThreadConfigBean>) wac.getBean("workers"); 
		log.info("=====================初始化线程池========================");
		//创建线程组
		SecurityManager s = System.getSecurityManager();
		ThreadGroup  father = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
		ThreadGroup group = new ThreadGroup(father, "root-threadgroup");
		
		for (ThreadConfigBean configBean : worders) {
			//设置排队队列大小
			ArrayBlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(configBean.getQueueCapacity());
			//设置线程工厂
			ThreadFactory threadFactory = new DecorateThreadFactory(new ThreadGroup(group,configBean.getBusinessId()),configBean.getBusinessId());
			
			ThreadPoolExecutor worker = new ThreadPoolExecutor(configBean.getBusinessId(),configBean.getMin(), configBean.getMax(), configBean.getKeepAliveTime(),
                    TimeUnit.SECONDS, taskQueue, threadFactory, configBean.getRejectHandler());
			
			ThreadGroupUtil.addThreadWorker(configBean.getBusinessId(), worker);
		}
		
		log.info("=====================线程池初始化完毕========================");
		log.info("=====================初始化监控线程========================");
		ThreadGroupUtil.monitorThreadStart(group,2000l);
		log.info("=====================监控线程初始化完毕========================");
	}

	public void contextDestroyed(ServletContextEvent sce) {
	}

	
}

 

 

业务监控配置:

 

<bean id="xxxIndex" class="xxx.xxx.xxx.xxx.web.thread.ThreadConfigBean">
    	<property name="businessId" value="xxxIndex"></property><!-- 业务ID,唯一 -->
    	<property name="max" value="40"></property><!-- 最好为请求线程的倍数 -->
    	<property name="min" value="10"></property><!-- 最好为请求线程的倍数 -->
    	<property name="queueCapacity" value="80"></property><!-- 最好为请求线程的倍数 -->
    	<property name="keepAliveTime" value="600"></property><!-- 线程空闲保存时间 -->
    	<property name="rejectHandler" ><!--任务拒绝处理策略 -->
	    	<bean class="com.jd.m.pay.web.thread.RejectedPolicyHandler" >
	    		<property name="bizName" value="pay-index"></property><!-- 业务ID,唯一 -->
	    	</bean>
    	</property>
    </bean>

	<bean id="workers" class="java.util.ArrayList">
		<constructor-arg >
			<list> 
				<ref bean="xxxIndex"/>
			</list>
		</constructor-arg>
	</bean>

 线程工厂:

 

 

/**
 *  线程工厂, 加入了线程名的业务描述
 * 
 * @User: guopeng
 * @Date: 2013-02-28
 */
public class DecorateThreadFactory implements ThreadFactory {
    static final AtomicInteger poolNumber = new AtomicInteger(1);
    final ThreadGroup group;
    final AtomicInteger threadNumber = new AtomicInteger(1);
    final String namePrefix;

    public DecorateThreadFactory(final ThreadGroup group,final String bizName) {
        this.group = group;
        namePrefix = bizName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

 监控线程:

/**
 * 监控业务线程池运行情况
 * @author yfguopeng
 * @Date 2013-02-28
 */
public class MonitorThread implements Runnable {
	private final static Log log = LogFactory.getLog(MonitorThread.class);
	private final ThreadGroup group;
	
	public MonitorThread(ThreadGroup group) {
		this.group = group;
	}
	
	public void run() {
		Map<String, ThreadPoolExecutor>workers =  ThreadGroupUtil.getThreadWorkers();
		Iterator<String> iterator = workers.keySet().iterator();
		
		log.info("total threadpools:[ "+workers.size()+" ],total threads:[ "+group.activeCount()+" ]");
		while(iterator.hasNext()) {
			ThreadPoolExecutor worker = ThreadGroupUtil.getThreadWorker(iterator.next());
			
			RejectedExecutionHandler handler = worker.getRejectedExecutionHandler();
			String rejectedSize = "";
			if (RejectedPolicyHandlerInteface.class.isAssignableFrom(handler.getClass())) {
				rejectedSize = " ],rejected threads:[ "+((RejectedPolicyHandlerInteface) handler).getRejectedSize();
			}
			
			log.info("business name:[ "+worker.getBizName()+" ]" +
					", core threads:[ "+worker.getCorePoolSize()+" ], max threads:[ "+worker.getMaximumPoolSize()+" ]" +
							", queue capacitys:[ "+worker.getQueue().size()+" ], running threads:[ "+worker.getActiveCount()+"] " +
									", reject threads:[ "+rejectedSize+" ],  largest threads:[ "+worker.getLargestPoolSize()+" ], complete threads:[ "+worker.getCompletedTaskCount()+" ]");
		}
	}

}

 线程拒绝处理器:

/**
 * 线程拒绝执行控制球
 * @author yfguopeng
 * @Date 2013-02-28
 */
public class RejectedPolicyHandler extends ThreadPoolExecutor.AbortPolicy implements RejectedPolicyHandlerInteface{
	private final static Log log = LogFactory.getLog(RejectedPolicyHandler.class);
	private static AtomicLong totals = new AtomicLong(0l);
	
	private String bizName;
	
	public RejectedPolicyHandler(){}
	
	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		String tip = "["+bizName+"] 线程忙,请求被拒绝.max: "+executor.getMaximumPoolSize()+", queue: "+executor.getQueue().size();
		log.info(tip);
		//业务报警 TODO
		totals.addAndGet(1);
		super.rejectedExecution(r, executor);
	}

	public String getBizName() {
		return bizName;
	}

	public void setBizName(String bizName) {
		this.bizName = bizName;
	}
	
	public long getRejectedSize() {
		return totals.get();
	}
}

 

import java.util.concurrent.RejectedExecutionHandler;

public interface RejectedPolicyHandlerInteface extends RejectedExecutionHandler{
	public long getRejectedSize() ;
}

 线程配置bean:

@SuppressWarnings("serial")
public class ThreadConfigBean implements Serializable{
	/**
	 * 业务ID
	 */
	private String businessId;
	/**
	 * 任务队列最大线程数
	 * 默认:80
	 */
	private Integer max = 160;
	/**
	 * 任务队列最小线程数
	 * 默认:40
	 */
	private Integer min = 80;
	/**
	 * 等待队列请求数
	 * 默认:300
	 */
	private Integer queueCapacity = 300;
	/**
	 * 空闲线程存活时间
	 * 默认:3分钟
	 */
	private Long keepAliveTime = 3 * 60l;
	/**
	 * 线程拒绝策略
	 */
	private RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
	
	public ThreadConfigBean() {
		super();
	}
	
	public Integer getMax() {
		return max;
	}
	public void setMax(Integer max) {
		this.max = max;
	}
	public Integer getMin() {
		return min;
	}
	public void setMin(Integer min) {
		this.min = min;
	}
	public Integer getQueueCapacity() {
		return queueCapacity;
	}
	public void setQueueCapacity(Integer queueCapacity) {
		this.queueCapacity = queueCapacity;
	}
	public Long getKeepAliveTime() {
		return keepAliveTime;
	}
	public void setKeepAliveTime(Long keepAliveTime) {
		this.keepAliveTime = keepAliveTime;
	}
	public RejectedExecutionHandler getRejectHandler() {
		return rejectHandler;
	}
	public void setRejectHandler(RejectedExecutionHandler rejectHandler) {
		this.rejectHandler = rejectHandler;
	}
	public String getBusinessId() {
		return businessId;
	}
	public void setBusinessId(String businessId) {
		this.businessId = businessId;
	}
}

 线程组:

/**
 * 各个业务获取响应线程池
 * @author yfguopeng
 */
public class ThreadGroupUtil {
	private static Map<String, ThreadPoolExecutor> threadworkers;
	private static ScheduledExecutorService monitorThread;//监视线程
	private static final long delay = 200l;
	private static long cycle_default = 5000l;
	
	static {
		threadworkers = new ConcurrentHashMap<String, ThreadPoolExecutor>();
		monitorThread = Executors.newScheduledThreadPool(1);
	}
	
	public static void addThreadWorker(String bizName,ThreadPoolExecutor executor){
		threadworkers.put(bizName, executor);
	}
	
	public static ThreadPoolExecutor getThreadWorker(String bizName) {
		return threadworkers.get(bizName);
	}
	
	public static Map<String, ThreadPoolExecutor> getThreadWorkers(){
		return threadworkers;
	}

	public static ScheduledExecutorService getMonitorThread() {
		return monitorThread;
	}

	public static void setMonitorThread(ScheduledExecutorService monitorThread) {
		ThreadGroupUtil.monitorThread = monitorThread;
	}
	
	public static void monitorThreadClosed(){
		if (monitorThread != null)
			if (!monitorThread.isTerminated()) 
				monitorThread.shutdown();
	}
	
	public static void monitorThreadStart(ThreadGroup group,Long cycle){
		MonitorThread monitor = new MonitorThread(group);
		if (cycle > 0l) {
			try {
				cycle_default = cycle;
			} catch (Exception e) {
			}
		}
		monitorThread.scheduleWithFixedDelay(monitor, delay, cycle_default, TimeUnit.MILLISECONDS);
	}
}

 线程池实现类:

/**
 * 线程池
 * @author yfguopeng
 *
 */
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

	private String bizName;
	
	public ThreadPoolExecutor(String bizName,int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
			RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
				threadFactory, handler);
		this.bizName = bizName;
	}

	public String getBizName() {
		return bizName;
	}

	public void setBizName(String bizName) {
		this.bizName = bizName;
	}
	
}

 

web.xml配置:

  <listener>
		<listener-class>xxx.xx.xx.xxx.web.thread.ThreadExecutorListener</listener-class>
	</listener> 

调用:

		ThreadPoolExecutor exc = ThreadGroupUtil.getThreadWorker("xxxIndex");
		
		String payOrgInfo = null;
		String cards = null;
		Future<String> xxxFuture = null;
		Future<String> yyyFuture = null;
		long start = System.currentTimeMillis();
		xxxTask xxxTask = new xxxTask(//参数);
		yyyTask yyyTask = new yyyTask(//参数);
		System.out.println("开始......");
		xxxFuture = exc.submit(xxxTask );
		yyyFuture = exc.submit(yyyTask );
		
		try {
			xxx= xxxFuture .get();
			yyy= yyyFuture .get();
			System.out.println(xxx);
			System.out.println(yyy);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		long end = System.currentTimeMillis();
		System.out.println("结束......   "+(end-start));
		
		return "";
	

 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [定义 异步 线程池] 推荐:

自定义异步线程池,实现异步请求以及监控当前线程运行情况

- - 企业架构 - ITeye博客
由于工作的需要,写了一份异步调用的小框架,分享出来. /** * 线程启动 * @author yfguopeng */ public class ThreadExecutorListener implements ServletContextListener{. log.info("=====================初始化线程池========================");.

Java线程池

- - 企业架构 - ITeye博客
线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的. 在jdk1.5之后这一情况有了很大的改观. Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用. 为我们在开发中处理线程的问题提供了非常大的帮助.

Java 线程池

- - 编程语言 - ITeye博客
在项目中,系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互. 在这种情形下,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存周期很短的线程时,更应该考虑使用线程池. 使用线程池可以有效地控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过此数.

java线程池分析

- - BlogJava-首页技术区
    在Java 5.0之前启动一个任务是通过调用Thread类的start()方法来实现的,任务的提于交和执行是同时进行的,如果你想对任务的执行进行调度或是控制 同时执行的线程数量就需要额外编写代码来完成. 5.0里提供了一个新的任务执行架构使你可以轻松地调度和控制任务的执行,并且可以建立一个类似数据库连接 池的线程池来执行任务.

Java线程池应用

- - CSDN博客架构设计推荐文章
1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务. 2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机). Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具.

Java线程池总结

- - Java - 编程语言 - ITeye博客
  假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间. 当T1 + T3 远大于 T2时,采用多线程技术可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力.     线程池就是一个线程的容器,每次只执行额定数量的线程, 线程池作用就是限制系统中执行线程的数量.

java 线程池原理及几种线程池详解

- - CSDN博客综合推荐文章
服务器经常出现处理大量单个任务处理的时间很短而请求的数目却是巨大的请求. 构建服务器应用程序的一个过于简单的模型应该是:每当一个请求到达就创建一个新线程,然后在新线程中为请求服务. 实际上,对于原型开发这种方法工作得很好,但如果试图部署以这种方式运行的服务器应用程序,那么这种方法的严重不足就很明显.

Spring提供的线程池支持

- - 博客园_首页
核心提示:一旦企业应用越来越复杂时(比如,基于流程服务器的EIS),它们对相关技术也提出了更高的要求. 在使用 EJB 3.0组件技术开发企业应用过程中,它们能够享受到EJB容器提供的线程池、任务调度(@Timeout)服务. 现如今,运行于Web容器的Web应用、单独的桌面应用. 一旦企业应用越来越复杂时(比如,基于流程服务器的EIS),它们对相关技术也提出了更高的要求.

用线程池启动定时器

- - BlogJava-首页技术区
(1)调用ScheduledExecutorService的schedule方法,返回的ScheduleFuture对象可以取消任务. (2)支持间隔重复任务的定时方式,不直接支持绝对定时方式,需要转换成相对时间方式.             System.out.println("响");           .

Web容器线程池机制小议

- - ITeye博客
从刚开始学习java,我们就被告知Java是一种支持多线程的语言,每条程序指令都会在一个线程中执行,而启动主线程的入口,是可执行类中的main方法. 我们可以在main方法或其调用的方法中创建新的线程以实现多线程、并发处理的效果. Java入门资料上介绍线程时往往会说明一点,创建线程不是免费的,是有成本的--对内存的消耗、对CPU切换调度的消耗都是成本,所以像数据库连接池这类“创建昂贵型”资源一样,创建好的线程优先被复用而不是每次都创建新的,这就是线程池出现的原因.