自定义异步线程池,实现异步请求以及监控当前线程运行情况
- - 企业架构 - ITeye博客由于工作的需要,写了一份异步调用的小框架,分享出来. /**
* 线程启动
* @author yfguopeng
*/
public class ThreadExecutorListener implements ServletContextListener{. log.info("=====================初始化线程池========================");.
由于工作的需要,写了一份异步调用的小框架,分享出来。
启动类:
/** * 线程启动 * @author yfguopeng */ public class ThreadExecutorListener implements ServletContextListener{ private final static Log log = LogFactory.getLog(ThreadExecutorListener.class); @SuppressWarnings("unchecked") public void contextInitialized(ServletContextEvent sce) { ServletContext servletContext = sce.getServletContext(); WebApplicationContext wac = WebApplicationContextUtils.getRequiredWebApplicationContext(servletContext); List<ThreadConfigBean> worders = (List<ThreadConfigBean>) wac.getBean("workers"); log.info("=====================初始化线程池========================"); //创建线程组 SecurityManager s = System.getSecurityManager(); ThreadGroup father = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); ThreadGroup group = new ThreadGroup(father, "root-threadgroup"); for (ThreadConfigBean configBean : worders) { //设置排队队列大小 ArrayBlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(configBean.getQueueCapacity()); //设置线程工厂 ThreadFactory threadFactory = new DecorateThreadFactory(new ThreadGroup(group,configBean.getBusinessId()),configBean.getBusinessId()); ThreadPoolExecutor worker = new ThreadPoolExecutor(configBean.getBusinessId(),configBean.getMin(), configBean.getMax(), configBean.getKeepAliveTime(), TimeUnit.SECONDS, taskQueue, threadFactory, configBean.getRejectHandler()); ThreadGroupUtil.addThreadWorker(configBean.getBusinessId(), worker); } log.info("=====================线程池初始化完毕========================"); log.info("=====================初始化监控线程========================"); ThreadGroupUtil.monitorThreadStart(group,2000l); log.info("=====================监控线程初始化完毕========================"); } public void contextDestroyed(ServletContextEvent sce) { } }
业务监控配置:
<bean id="xxxIndex" class="xxx.xxx.xxx.xxx.web.thread.ThreadConfigBean"> <property name="businessId" value="xxxIndex"></property><!-- Business ID; must be unique --> <property name="max" value="40"></property><!-- Maximum pool size; preferably a multiple of the number of request threads --> <property name="min" value="10"></property><!-- Core pool size; preferably a multiple of the number of request threads --> <property name="queueCapacity" value="80"></property><!-- Waiting-queue capacity; preferably a multiple of the number of request threads --> <property name="keepAliveTime" value="600"></property><!-- Keep-alive time of idle threads --> <property name="rejectHandler" ><!-- Policy applied to rejected tasks --> <bean class="com.jd.m.pay.web.thread.RejectedPolicyHandler" > <property name="bizName" value="pay-index"></property><!-- Business name used by the reject handler; must be unique --> </bean> </property> </bean> <bean id="workers" class="java.util.ArrayList"> <constructor-arg > <list> <ref bean="xxxIndex"/> </list> </constructor-arg> </bean>
线程工厂:
/**
 * Thread factory that puts a business description into every thread name.
 *
 * <p>Threads are named {@code <bizName>-pool-<poolNo>-thread-<threadNo>}, where
 * {@code poolNo} is shared by all factories and {@code threadNo} is counted per factory.
 * Every thread is created in the supplied group, is non-daemon and has normal priority.
 *
 * @User: guopeng
 * @Date: 2013-02-28
 */
public class DecorateThreadFactory implements ThreadFactory {

    /** Sequence used to number the factories (pools) created in this JVM. */
    static final AtomicInteger poolNumber = new AtomicInteger(1);

    /** Group every thread of this factory belongs to. */
    final ThreadGroup group;

    /** Sequence used to number the threads created by this factory. */
    final AtomicInteger threadNumber = new AtomicInteger(1);

    /** Common leading part of the names of all threads of this factory. */
    final String namePrefix;

    /**
     * @param group   thread group for the created threads
     * @param bizName business description placed at the start of each thread name
     */
    public DecorateThreadFactory(final ThreadGroup group, final String bizName) {
        final int poolIndex = poolNumber.getAndIncrement();
        this.group = group;
        this.namePrefix = bizName + "-pool-" + poolIndex + "-thread-";
    }

    /**
     * Creates a new, not yet started thread for the given task.
     *
     * @param r the task the thread will run
     * @return a non-daemon thread with normal priority
     */
    @Override
    public Thread newThread(Runnable r) {
        final String threadName = namePrefix + threadNumber.getAndIncrement();
        final Thread created = new Thread(group, r, threadName, 0);
        // A new thread inherits daemon status and priority from its creator; reset both.
        if (created.isDaemon()) {
            created.setDaemon(false);
        }
        if (created.getPriority() != Thread.NORM_PRIORITY) {
            created.setPriority(Thread.NORM_PRIORITY);
        }
        return created;
    }
}
监控线程:
/**
 * Periodic task that logs the state of every business thread pool registered in
 * {@link ThreadGroupUtil}.
 *
 * @author yfguopeng
 * @Date 2013-02-28
 */
public class MonitorThread implements Runnable {

    private final static Log log = LogFactory.getLog(MonitorThread.class);

    /** Thread group whose active thread count is reported as the overall total. */
    private final ThreadGroup group;

    /**
     * @param group thread group whose active thread count is logged on every run
     */
    public MonitorThread(ThreadGroup group) {
        this.group = group;
    }

    /**
     * Logs one summary line followed by one line per registered pool.
     *
     * <p>Runtime exceptions are caught and logged: a scheduled executor suppresses all
     * later executions of a periodic task once one execution throws, which would
     * silently stop the monitoring.
     */
    public void run() {
        try {
            Map<String, ThreadPoolExecutor> workers = ThreadGroupUtil.getThreadWorkers();
            log.info("total threadpools:[ " + workers.size() + " ],total threads:[ " + group.activeCount() + " ]");
            for (ThreadPoolExecutor worker : workers.values()) {
                log.info(describe(worker));
            }
        } catch (RuntimeException e) {
            log.error("thread pool monitoring failed", e);
        }
    }

    /**
     * Builds the log line for a single pool.
     *
     * @param worker the pool to describe
     * @return the formatted statistics line
     */
    private static String describe(ThreadPoolExecutor worker) {
        RejectedExecutionHandler handler = worker.getRejectedExecutionHandler();
        // Only handlers that count their rejections can report a number; others leave the field blank.
        String rejectedSize = "";
        if (handler instanceof RejectedPolicyHandlerInteface) {
            rejectedSize = String.valueOf(((RejectedPolicyHandlerInteface) handler).getRejectedSize());
        }
        // Note: the "queue capacitys" field shows the number of tasks currently queued.
        return "business name:[ " + worker.getBizName() + " ]"
                + ", core threads:[ " + worker.getCorePoolSize() + " ], max threads:[ " + worker.getMaximumPoolSize() + " ]"
                + ", queue capacitys:[ " + worker.getQueue().size() + " ], running threads:[ " + worker.getActiveCount() + "] "
                + ", reject threads:[ " + rejectedSize + " ], largest threads:[ " + worker.getLargestPoolSize()
                + " ], complete threads:[ " + worker.getCompletedTaskCount() + " ]";
    }
}
线程拒绝处理器:
/** * 线程拒绝执行控制球 * @author yfguopeng * @Date 2013-02-28 */ public class RejectedPolicyHandler extends ThreadPoolExecutor.AbortPolicy implements RejectedPolicyHandlerInteface{ private final static Log log = LogFactory.getLog(RejectedPolicyHandler.class); private static AtomicLong totals = new AtomicLong(0l); private String bizName; public RejectedPolicyHandler(){} @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { String tip = "["+bizName+"] 线程忙,请求被拒绝.max: "+executor.getMaximumPoolSize()+", queue: "+executor.getQueue().size(); log.info(tip); //业务报警 TODO totals.addAndGet(1); super.rejectedExecution(r, executor); } public String getBizName() { return bizName; } public void setBizName(String bizName) { this.bizName = bizName; } public long getRejectedSize() { return totals.get(); } }
import java.util.concurrent.RejectedExecutionHandler; public interface RejectedPolicyHandlerInteface extends RejectedExecutionHandler{ public long getRejectedSize() ; }
线程配置bean:
/**
 * Configuration of one business thread pool: its identifier, pool sizes, queue
 * capacity, idle keep-alive time and rejection policy.
 */
@SuppressWarnings("serial")
public class ThreadConfigBean implements Serializable {

    /** Business ID. */
    private String businessId;

    /** Maximum number of threads in the pool. Default: 160. */
    private Integer max = 160;

    /** Minimum (core) number of threads in the pool. Default: 80. */
    private Integer min = 80;

    /** Number of requests that may wait in the queue. Default: 300. */
    private Integer queueCapacity = 300;

    /** Keep-alive time of idle threads. Default: 180 (3 * 60). */
    private Long keepAliveTime = 180L;

    /** Policy applied to rejected tasks. Default: abort. */
    private RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();

    /** Creates a bean holding the default values. */
    public ThreadConfigBean() {
    }

    public String getBusinessId() {
        return this.businessId;
    }

    public void setBusinessId(String businessId) {
        this.businessId = businessId;
    }

    public Integer getMax() {
        return this.max;
    }

    public void setMax(Integer max) {
        this.max = max;
    }

    public Integer getMin() {
        return this.min;
    }

    public void setMin(Integer min) {
        this.min = min;
    }

    public Integer getQueueCapacity() {
        return this.queueCapacity;
    }

    public void setQueueCapacity(Integer queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    public Long getKeepAliveTime() {
        return this.keepAliveTime;
    }

    public void setKeepAliveTime(Long keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
    }

    public RejectedExecutionHandler getRejectHandler() {
        return this.rejectHandler;
    }

    public void setRejectHandler(RejectedExecutionHandler rejectHandler) {
        this.rejectHandler = rejectHandler;
    }
}
线程组:
/** * 各个业务获取响应线程池 * @author yfguopeng */ public class ThreadGroupUtil { private static Map<String, ThreadPoolExecutor> threadworkers; private static ScheduledExecutorService monitorThread;//监视线程 private static final long delay = 200l; private static long cycle_default = 5000l; static { threadworkers = new ConcurrentHashMap<String, ThreadPoolExecutor>(); monitorThread = Executors.newScheduledThreadPool(1); } public static void addThreadWorker(String bizName,ThreadPoolExecutor executor){ threadworkers.put(bizName, executor); } public static ThreadPoolExecutor getThreadWorker(String bizName) { return threadworkers.get(bizName); } public static Map<String, ThreadPoolExecutor> getThreadWorkers(){ return threadworkers; } public static ScheduledExecutorService getMonitorThread() { return monitorThread; } public static void setMonitorThread(ScheduledExecutorService monitorThread) { ThreadGroupUtil.monitorThread = monitorThread; } public static void monitorThreadClosed(){ if (monitorThread != null) if (!monitorThread.isTerminated()) monitorThread.shutdown(); } public static void monitorThreadStart(ThreadGroup group,Long cycle){ MonitorThread monitor = new MonitorThread(group); if (cycle > 0l) { try { cycle_default = cycle; } catch (Exception e) { } } monitorThread.scheduleWithFixedDelay(monitor, delay, cycle_default, TimeUnit.MILLISECONDS); } }
线程池实现类:
/**
 * A {@link java.util.concurrent.ThreadPoolExecutor} that additionally carries the name
 * of the business it serves.
 *
 * @author yfguopeng
 */
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

    /** Name of the business this pool belongs to. */
    private String bizName;

    /**
     * Creates a pool; every argument except {@code bizName} is passed unchanged to the
     * JDK executor's constructor.
     *
     * @param bizName         name of the business this pool belongs to
     * @param corePoolSize    core pool size
     * @param maximumPoolSize maximum pool size
     * @param keepAliveTime   keep-alive time, expressed in {@code unit}
     * @param unit            time unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before they are executed
     * @param threadFactory   factory used to create new threads
     * @param handler         handler invoked when a task is rejected
     */
    public ThreadPoolExecutor(
            String bizName,
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.bizName = bizName;
    }

    /**
     * @return the business name of this pool
     */
    public String getBizName() {
        return this.bizName;
    }

    /**
     * @param bizName the new business name of this pool
     */
    public void setBizName(String bizName) {
        this.bizName = bizName;
    }
}
web.xml配置:
<!-- Registers ThreadExecutorListener so the servlet container invokes it when the web application context is initialized and destroyed. -->
<listener> <listener-class>xxx.xx.xx.xxx.web.thread.ThreadExecutorListener</listener-class> </listener>
调用:
ThreadPoolExecutor exc = ThreadGroupUtil.getThreadWorker("xxxIndex"); String payOrgInfo = null; String cards = null; Future<String> xxxFuture = null; Future<String> yyyFuture = null; long start = System.currentTimeMillis(); xxxTask xxxTask = new xxxTask(//参数); yyyTask yyyTask = new yyyTask(//参数); System.out.println("开始......"); xxxFuture = exc.submit(xxxTask ); yyyFuture = exc.submit(yyyTask ); try { xxx= xxxFuture .get(); yyy= yyyFuture .get(); System.out.println(xxx); System.out.println(yyy); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("结束...... "+(end-start)); return "";