主线程等待几个子线程执行完成方案

标签: 线程 等待 线程 | 发表时间:2016-08-17 12:23 | 作者:
出处:http://www.iteye.com

 有时,为了程序的性能,我们有必要对程序中的for循环(含有sql/rpc操作)进行并发处理,要求是并发处理完之后才能继续执行主线程。现给出如下两种方案:

 

1. CountDownLatch

package com.itlong.whatsmars.base.sync;

import java.util.concurrent.CountDownLatch;

/**
 * Created by shenhongxi on 2016/8/12.
 */
public class CountDownLatchTest {

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(3);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 3; i++) {
            new Thread(new SubRunnable(i, latch)).start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - start);

        System.out.println("Main finished");
    }

    static class SubRunnable implements Runnable {
        private int id = -1;
        private CountDownLatch latch;

        SubRunnable(int id, CountDownLatch latch) {
            this.id = id;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(3000);
                System.out.println(String
                        .format("Sub Thread %d finished", id));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                latch.countDown();
            }
        }
    }
}

 CountDownLatch用队列来存放任务,主要是一个构造器和两个方法,相关代码这里不予赘述。CountDownLatch很贴合我们的要求,但没用到线程池,考虑到性能,我推荐下面的这种方案。

 

2. ExecutorService

package com.itlong.whatsmars.base.sync;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by shenhongxi on 2016/8/12.
 */
public class CallableTest {

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        List<Callable<Void>> subs = new ArrayList<Callable<Void>>();
        for (int i = 0; i < 3; i++) {
            subs.add(new SubCallable(i));
        }

        long start = System.currentTimeMillis();
        try {
            pool.invokeAll(subs);
        } finally {
            pool.shutdown();
        }
        System.out.println(System.currentTimeMillis() - start);

        System.out.println("Main finished");
    }

    static class SubCallable implements Callable<Void> {
        private int id = -1;

        public SubCallable(int id) {
            this.id = id;
        }

        @Override
        public Void call() throws Exception {
            try {
                Thread.sleep(3000);
                System.out.println(String
                        .format("Child Thread %d finished", id));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

}

 AbstractExecutorService

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

 

接下来我做了个join的试验,发现同样可以达到目的,但不推荐此法。

package com.itlong.whatsmars.base.sync;

/**
 * Created by shenhongxi on 2016/8/12.
 * 子线程与主线程是顺序执行的,各子线程之间还是异步的
 */
public class JoinTest {

    public static void main(String[] args) throws Exception {
        Thread t1 = new Thread(new SubRunnable(0));
        Thread t2 = new Thread(new SubRunnable(1));
        Thread t3 = new Thread(new SubRunnable(2));

        long start = System.currentTimeMillis();
        t1.start();
        t2.start();
        t3.start();

        t1.join();
        t2.join();
        t3.join();
        System.out.println(System.currentTimeMillis() - start);

        System.out.println("Main finished");
    }

    static class SubRunnable implements Runnable {
        private int id = -1;

        SubRunnable(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            try {
                System.out.println("hi, I'm id-" + id);
                Thread.sleep(9000);
                System.out.println(String
                        .format("Sub Thread %d finished", id));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

 

最后,我们顺便提下org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor {

	private final Object poolSizeMonitor = new Object();

	private int corePoolSize = 1;

	private int maxPoolSize = Integer.MAX_VALUE;

	private int keepAliveSeconds = 60;

	private boolean allowCoreThreadTimeOut = false;

	private int queueCapacity = Integer.MAX_VALUE;

	private ThreadPoolExecutor threadPoolExecutor;


	/**
	 * Set the ThreadPoolExecutor's core pool size.
	 * Default is 1.
	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
	 */
	public void setCorePoolSize(int corePoolSize) {
		synchronized (this.poolSizeMonitor) {
			this.corePoolSize = corePoolSize;
			if (this.threadPoolExecutor != null) {
				this.threadPoolExecutor.setCorePoolSize(corePoolSize);
			}
		}
	}

	/**
	 * Return the ThreadPoolExecutor's core pool size.
	 */
	public int getCorePoolSize() {
		synchronized (this.poolSizeMonitor) {
			return this.corePoolSize;
		}
	}

看到我们熟悉的ThreadPoolExecutor之后,我们瞬间明白了一切。

另外我们脑补下几个接口/类的关系

public interface ExecutorService extends Executor {
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
}

public interface Executor {
    void execute(Runnable command);
}

public abstract class AbstractExecutorService implements ExecutorService{
  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
    // ...
  }
}

public class ThreadPoolExecutor extends AbstractExecutorService {
  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
}

 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [线程 等待 线程] 推荐:

主线程等待几个子线程执行完成方案

- - ITeye博客
 有时,为了程序的性能,我们有必要对程序中的for循环(含有sql/rpc操作)进行并发处理,要求是并发处理完之后才能继续执行主线程.  CountDownLatch用队列来存放任务,主要是一个构造器和两个方法,相关代码这里不予赘述. CountDownLatch很贴合我们的要求,但没用到线程池,考虑到性能,我推荐下面的这种方案.

MySQL Replication 线程

- - CSDN博客推荐文章
Replication 线程. Mysql 的Replication 是一个异步的复制过程,从一个Mysql instace(我们称之为Master)复制到另一个Mysql instance(我们称之Slave). 在Master 与Slave 之间的实现整个复制过程主. 要由三个线程来完成,其中两个线程(Sql 线程和IO 线程)在Slave 端,另外一个线程(IO 线程)在Master 端.

Java线程池

- - 企业架构 - ITeye博客
线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的. 在jdk1.5之后这一情况有了很大的改观. Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用. 为我们在开发中处理线程的问题提供了非常大的帮助.

Java 线程池

- - 编程语言 - ITeye博客
在项目中,系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互. 在这种情形下,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存周期很短的线程时,更应该考虑使用线程池. 使用线程池可以有效地控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过此数.

Java线程(四):线程中断、线程让步、线程睡眠、线程合并

- - 编程语言 - ITeye博客
 最近在Review线程专栏,修改了诸多之前描述不够严谨的地方,凡是带有Review标记的文章都是修改过了. 本篇文章是插进来的,因为原来没有写,现在来看传统线程描述的不太完整,所以就补上了. 理解了线程同步和线程通信之后,再来看本文的知识点就会简单的多了,本文是做为传统线程知识点的一个补充. 有人会问:JDK5之后有了更完善的处理多线程问题的类(并发包),我们还需要去了解传统线程吗.

Java Thread多线程

- - CSDN博客推荐文章
Java Thread多线程. Java 多线程例子1 小例子. super("zhuyong");//设置线程的名字,默认为“TestThread”. Java 多线程例子2 前台线程(用户线程) 后台线程(守护线程 ). 1,setDaemon(true)后就是后台线程(守护线程 ),反之就是前台线程(用户线程).

Android线程大坑

- - 移动开发 - ITeye博客
     android界面的更新实在主线程进行的,通常把主线程也叫UI线程,UI线程里进行事件的分发和交互. 在UI线程中进行耗时操作,比如网络请求,IO操作等会阻塞UI线程,界面会卡住,并且超过大概5秒钟程序会ANR(Application Not Responding),也就是死掉. 其实这种GUI单线程的思想在我上一篇博客(http://zyqwst.iteye.com/blog/2262011)都有阐述,道理一模一样,只是android实现的方式上略有不同,所以我建议把上一篇Swing线程的博客能够阅读一遍,Android线程的问题豁然开朗,始终晋级GUI开发的原则:在UI线程中进行界面的更新操作,在单独线程中进行耗时操作.

浅谈 iOS 线程

- - SegmentFault 最新的文章
通常主线程和其他线程的使用场景. Tips: 解压、打开 Zip 包,读写较大文件的操作也不宜放在主线程里. 一般异步网络请求中会有一个 completionBlock ,这个 completionBlock 是在主线程中被调用的. 所以,可能消耗大量时间的代码(例如上面提到的处理 Zip 包的方法)也不宜放在这些 block 中.

Java线程之FutureTask

- - zzm
FutureTask是Future和Callable的结合体. 然后通过Future来取得计算结果. 但是,若开启了多个任务,我们无从知晓哪个任务最先结束,因此,若要实现“当某任务结束时,立刻做一些事情,例如记录日志”这一功能,就需要写一些额外的代码. FutureTask正是为此而存在,他有一个回调函数protected void done(),当任务结束时,该回调函数会被触发.

Java多线程之synchronized

- - CSDN博客推荐文章
这里通过三个测试类阐述了synchronized应用的不同场景. 首先是最基本的synchronized Method的使用.  * @see 概述:Java中的每个对象都有一个锁(lock)或者叫做监视器(monitor) .  * @see 说明:当synchronized关键字修饰一个方法时,则该方法为同步方法 .