主线程等待几个子线程执行完成方案
- - ITeye博客 有时,为了程序的性能,我们有必要对程序中的for循环(含有sql/rpc操作)进行并发处理,要求是并发处理完之后才能继续执行主线程. CountDownLatch内部是一个同步计数器(并不是用队列来存放任务),主要是一个构造器和两个方法(countDown()与await()),相关代码这里不予赘述. CountDownLatch很贴合我们的要求,但上面的示例为每个任务新建一个线程,没用到线程池,考虑到性能,我推荐下面的这种方案.
有时,为了程序的性能,我们有必要对程序中的for循环(含有sql/rpc操作)进行并发处理,要求是并发处理完之后才能继续执行主线程。现给出如下两种方案:
1. CountDownLatch
package com.itlong.whatsmars.base.sync;

import java.util.concurrent.CountDownLatch;

/**
 * Created by shenhongxi on 2016/8/12.
 *
 * <p>Shows how a main thread can block until several worker threads have
 * finished by sharing a {@link CountDownLatch}: every worker counts the latch
 * down once when it is done, and the main thread waits in {@code await()}
 * until the count reaches zero.
 */
public class CountDownLatchTest {

    /** Number of worker threads to start; also the initial latch count. */
    private static final int WORKER_COUNT = 3;

    /**
     * Starts the workers, waits for all of them, then prints the elapsed
     * time in milliseconds followed by a completion message.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(WORKER_COUNT);
        long start = System.currentTimeMillis();
        for (int i = 0; i < WORKER_COUNT; i++) {
            new Thread(new SubRunnable(i, latch)).start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status so code further up can still observe it.
            Thread.currentThread().interrupt();
        }
        System.out.println(System.currentTimeMillis() - start);
        System.out.println("Main finished");
    }

    /** Worker that sleeps for three seconds and then counts the latch down. */
    static class SubRunnable implements Runnable {
        private final int id;
        private final CountDownLatch latch;

        /**
         * @param id    identifier printed when the worker finishes
         * @param latch latch to count down once the work is done
         */
        SubRunnable(int id, CountDownLatch latch) {
            this.id = id;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(3000);
                System.out.println(String
                        .format("Sub Thread %d finished", id));
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Do not swallow the interrupt: re-assert it on this thread.
                Thread.currentThread().interrupt();
            } finally {
                // Always count down, otherwise the main thread would wait forever.
                latch.countDown();
            }
        }
    }
}
CountDownLatch内部是一个同步计数器(并不是用队列来存放任务),主要是一个构造器和两个方法(countDown()与await()),相关代码这里不予赘述。CountDownLatch很贴合我们的要求,但上面的示例为每个任务新建一个线程,没用到线程池,考虑到性能,我推荐下面的这种方案。
2. ExecutorService
package com.itlong.whatsmars.base.sync;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by shenhongxi on 2016/8/12.
 *
 * <p>Shows how a main thread can wait for a batch of tasks by submitting them
 * to a fixed-size thread pool with {@code ExecutorService.invokeAll}, which
 * returns only after every task has completed.
 */
public class CallableTest {

    /** Number of tasks submitted; also used as the pool size. */
    private static final int TASK_COUNT = 3;

    /**
     * Builds the tasks, runs them all on the pool, then prints the elapsed
     * time in milliseconds followed by a completion message.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(TASK_COUNT);
        List<Callable<Void>> subs = new ArrayList<Callable<Void>>();
        for (int i = 0; i < TASK_COUNT; i++) {
            subs.add(new SubCallable(i));
        }
        long start = System.currentTimeMillis();
        try {
            pool.invokeAll(subs);
        } finally {
            // Release the pool threads even if invokeAll was interrupted.
            pool.shutdown();
        }
        System.out.println(System.currentTimeMillis() - start);
        System.out.println("Main finished");
    }

    /** Task that sleeps for three seconds and then reports that it finished. */
    static class SubCallable implements Callable<Void> {
        private final int id;

        /**
         * @param id identifier printed when the task finishes
         */
        public SubCallable(int id) {
            this.id = id;
        }

        @Override
        public Void call() throws Exception {
            try {
                Thread.sleep(3000);
                System.out.println(String
                        .format("Child Thread %d finished", id));
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Do not swallow the interrupt: re-assert it on the worker thread.
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }
}
AbstractExecutorService
/**
 * Submits every task for execution and blocks until each resulting future
 * is done, then returns the futures in the iteration order of {@code tasks}.
 *
 * <p>Cancellation or failure of an individual task is deliberately ignored
 * here; callers inspect the returned futures to find out how each task ended.
 * If this method exits abnormally (for example because the waiting thread is
 * interrupted), every future created so far is cancelled.
 *
 * @param tasks the tasks to run; must not be {@code null}
 * @return one future per task, in the same order as {@code tasks}
 * @throws InterruptedException if interrupted while waiting for a task
 * @throws NullPointerException if {@code tasks} is {@code null}
 */
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    if (tasks == null) {
        throw new NullPointerException();
    }
    List<Future<T>> submitted = new ArrayList<Future<T>>(tasks.size());
    boolean finishedNormally = false;
    try {
        // Phase 1: wrap and hand every task to the executor.
        for (Callable<T> task : tasks) {
            RunnableFuture<T> pending = newTaskFor(task);
            submitted.add(pending);
            execute(pending);
        }
        // Phase 2: wait for each future that has not completed yet.
        for (Future<T> pending : submitted) {
            if (pending.isDone()) {
                continue;
            }
            try {
                pending.get();
            } catch (CancellationException ignore) {
                // The outcome is reported through the returned future.
            } catch (ExecutionException ignore) {
                // The outcome is reported through the returned future.
            }
        }
        finishedNormally = true;
        return submitted;
    } finally {
        // On abnormal exit, cancel everything that was submitted.
        if (!finishedNormally) {
            for (Future<T> pending : submitted) {
                pending.cancel(true);
            }
        }
    }
}
接下来我做了个join的试验,发现同样可以达到目的,但不推荐此法。
package com.itlong.whatsmars.base.sync;

/**
 * Created by shenhongxi on 2016/8/12.
 *
 * <p>The main thread joins the sub-threads one after another, so it only
 * continues once all of them have ended; the sub-threads themselves still
 * run concurrently with each other.
 */
public class JoinTest {

    /** Number of sub-threads to start and join. */
    private static final int WORKER_COUNT = 3;

    /**
     * Starts all sub-threads, joins each of them in start order, then prints
     * the elapsed time in milliseconds followed by a completion message.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws Exception {
        Thread[] workers = new Thread[WORKER_COUNT];
        for (int i = 0; i < WORKER_COUNT; i++) {
            workers[i] = new Thread(new SubRunnable(i));
        }
        long start = System.currentTimeMillis();
        // Start every thread before joining any, so they overlap in time.
        for (Thread worker : workers) {
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(System.currentTimeMillis() - start);
        System.out.println("Main finished");
    }

    /** Worker that announces itself, sleeps for nine seconds, then reports completion. */
    static class SubRunnable implements Runnable {
        private final int id;

        /**
         * @param id identifier printed in the worker's messages
         */
        SubRunnable(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            try {
                System.out.println("hi, I'm id-" + id);
                Thread.sleep(9000);
                System.out.println(String
                        .format("Sub Thread %d finished", id));
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Do not swallow the interrupt: re-assert it on this thread.
                Thread.currentThread().interrupt();
            }
        }
    }
}
最后,我们顺便提下org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
/*
 * Excerpt: only the fields and the core-pool-size accessors are shown here;
 * the remainder of the class (including its closing brace) is omitted.
 */
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor {

    // Lock object guarding reads and writes of the pool-size settings below.
    private final Object poolSizeMonitor = new Object();

    // Configured core pool size; defaults to 1.
    private int corePoolSize = 1;

    // Configured maximum pool size; defaults to Integer.MAX_VALUE.
    private int maxPoolSize = Integer.MAX_VALUE;

    // Configured keep-alive value; defaults to 60 (seconds, per the field name).
    private int keepAliveSeconds = 60;

    // Defaults to false. NOTE(review): presumably forwarded to the underlying
    // ThreadPoolExecutor; not visible in this excerpt.
    private boolean allowCoreThreadTimeOut = false;

    // Configured queue capacity; defaults to Integer.MAX_VALUE.
    private int queueCapacity = Integer.MAX_VALUE;

    // The wrapped java.util.concurrent executor; may still be null, which is
    // why setCorePoolSize checks it before delegating.
    private ThreadPoolExecutor threadPoolExecutor;

    /**
     * Set the ThreadPoolExecutor's core pool size.
     * Default is 1.
     * <p><b>This setting can be modified at runtime, for example through JMX.</b>
     * <p>The new value is stored in this object and, if the underlying
     * ThreadPoolExecutor already exists, applied to it as well; both steps
     * happen while holding the pool-size monitor.
     *
     * @param corePoolSize the new core pool size
     */
    public void setCorePoolSize(int corePoolSize) {
        synchronized (this.poolSizeMonitor) {
            this.corePoolSize = corePoolSize;
            // Only delegate when the underlying executor has been created.
            if (this.threadPoolExecutor != null) {
                this.threadPoolExecutor.setCorePoolSize(corePoolSize);
            }
        }
    }

    /**
     * Return the ThreadPoolExecutor's core pool size.
     * <p>Reads the value stored in this object while holding the pool-size
     * monitor.
     *
     * @return the currently configured core pool size
     */
    public int getCorePoolSize() {
        synchronized (this.poolSizeMonitor) {
            return this.corePoolSize;
        }
    }
看到我们熟悉的ThreadPoolExecutor之后,我们瞬间明白了一切。
另外我们脑补下几个接口/类的关系
public interface ExecutorService extends Executor { <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; } public interface Executor { void execute(Runnable command); } public abstract class AbstractExecutorService implements ExecutorService{ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { // ... } } public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } }