『并发包入坑指北』之向大佬汇报任务

标签: 并发 concurrent CountDownLatch | 发表时间:2019-04-28 08:20 | 作者:
出处:http://crossoverjie.top/

前言

在面试过程中聊到并发相关的内容时,不少面试官都喜欢问这类问题:

当 N 个线程同时完成某项任务时,如何知道他们都已经执行完毕了。

这也是本次讨论的话题之一,所以本篇为『并发包入坑指北』的第二篇;来聊聊常见的并发工具。

自己实现

其实这类问题的核心论点都是:如何在一个线程中得知其他线程是否执行完毕。

假设现在有 3 个线程在运行,需要在主线程中得知他们的运行结果;可以分为以下几步:

  • 定义一个计数器为 3。
  • 每个线程完成任务后计数减一。
  • 一旦计数器减为 0 则通知等待的线程。

所以也很容易想到可以利用等待通知机制来实现,和上文的 『并发包入坑指北』之阻塞队列的类似。

按照这个思路自定义了一个 MultipleThreadCountDownKit 工具,构造函数如下:

考虑到并发的前提,这个计数器自然需要保证线程安全,所以采用了 AtomicInteger

所以在初始化时需要根据线程数量来构建对象。

计数器减一

当其中一个业务线程完成后需要将这个计数器减一,直到减为0为止。

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
     
/**
* 线程完成后计数 -1
*/
public void countDown(){
if (counter.get() <= 0){
return;
}
int count = this.counter.decrementAndGet();
if (count < 0){
throw new RuntimeException("concurrent error") ;
}
if (count == 0){
synchronized (notify){
notify.notify();
}
}
}

利用 counter.decrementAndGet() 来保证多线程的原子性,当减为 0 时则利用等待通知机制来 notify 其他线程。

等待所有线程完成

而需要知道业务线程执行完毕的其他线程则需要在未完成之前一直处于等待状态,直到上文提到的在计数器变为 0 时得到通知。

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
     
/**
* 等待所有的线程完成
* @throws InterruptedException
*/
public void await() throws InterruptedException {
synchronized (notify){
while (counter.get() > 0){
notify.wait();
}
if (notifyListen != null){
notifyListen.notifyListen();
}
}
}

原理也很简单,一旦计数器还存在时则会利用 notify 对象进行等待,直到被业务线程唤醒。

同时这里新增了一个通知接口可以自定义实现唤醒后的一些业务逻辑,后文会做演示。

并发测试

主要就是这两个函数,下面来做一个演示。

  • 初始化了三个计数器的并发工具 MultipleThreadCountDownKit
  • 创建了三个线程分别执行业务逻辑,完毕后执行 countDown()
  • 线程 3 休眠了 2s 用于模拟业务耗时。
  • 主线程执行 await() 等待他们三个线程执行完毕。

通过执行结果可以看出主线程会等待最后一个线程完成后才会退出;从而达到了主线程等待其余线程的效果。

     
1
2
     
MultipleThreadCountDownKit multipleThreadKit = new MultipleThreadCountDownKit(3);
multipleThreadKit.setNotify(() -> LOGGER.info("三个线程完成了任务"));

也可以在初始化的时候指定一个回调接口,用于接收业务线程执行完毕后的通知。

当然和在主线程中执行这段逻辑效果是一样的(和执行 await() 方法处于同一个线程)。

CountDownLatch

当然我们自己实现的代码没有经过大量生产环境的验证,所以主要的目的还是尝试窥探官方的实现原理。

所以我们现在来看看 juc 下的 CountDownLatch 是如何实现的。

通过构造函数会发现有一个 内部类 Sync,他是继承于 AbstractQueuedSynchronizer ;这是 Java 并发包中的基础框架,都可以单独拿来讲了,所以这次重点不是它,今后我们再着重介绍。

这里就可以把他简单理解为提供了和上文类似的一个计数器及线程通知工具就行了。

countDown

其实他的核心逻辑和我们自己实现的区别不大。

     
1
2
3
4
5
6
7
8
9
10
11
     
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

利用这个内部类的 releaseShared 方法,我们可以理解为他想要将计数器减一。

看到这里有没有似曾相识的感觉。

没错,在 JDK1.7 中的 AtomicInteger 自减就是这样实现的(利用 CAS 保证了线程安全)。

只是一旦计数器减为 0 时则会执行 doReleaseShared 唤醒其他的线程。


这里我们只需要关心红框部分(其他的暂时不用关心,这里涉及到了 AQS 中的队列相关),最终会调用 LockSupport.unpark 来唤醒线程;就相当于上文调用 object.notify()

所以其实本质上还是相同的。

await

其中的 await() 也是借用 Sync 对象的方法实现的。

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
     
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//判断计数器是否还未完成
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

一旦还存在未完成的线程时,则会调用 doAcquireSharedInterruptibly 进入阻塞状态。

     
1
2
3
4
     
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

同样的由于这也是 AQS 中的方法,我们只需要关心红框部分;其实最终就是调用了 LockSupport.park 方法,也就相当于执行了 object.wait()

  • 所有的业务线程执行完毕后会在计数器减为 0 时调用 LockSupport.unpark 来唤醒线程。
  • 等待线程一旦计数器 > 0 时则会利用 LockSupport.park 来等待唤醒。

这样整个流程也就串起来了,它的使用方法也和上文的类似。

就不做过多介绍了。

实际案例

同样的来看一个实际案例。

在上一篇 《一次分表踩坑实践的探讨》提到了对于全表扫描的情况下,需要利用多线程来提高查询效率。

比如我们这里分为了 64 张表,计划利用 8 个线程来分别处理这些表的数据,伪代码如下:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
     
CountDownLatch count = new CountDownLatch(64);
ConcurrentHashMap total = new ConcurrentHashMap();
for(Integer i=0;i<=63;i++){
executor.execute(new Runnable(){
@Override
public void run(){
List value = queryTable(i);
total.put(value,NULL);
count.countDown();
}
}) ;
}
count.await();
System.out.println("查询完毕");

这样就可以实现所有数据都查询完毕后再做统一汇总;代码挺简单,也好理解(当然也可以使用线程池的 API)。

总结

CountDownLatch 算是 juc 中一个高频使用的工具,学会和理解他的使用会帮助我们更容易编写并发应用。

文中涉及到的源码:

https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/communication/MultipleThreadCountDownKit.java

你的点赞与分享是对我最大的支持

相关 [并发 大佬 汇报] 推荐:

『并发包入坑指北』之向大佬汇报任务

- - crossoverJie's Blog
在面试过程中聊到并发相关的内容时,不少面试官都喜欢问这类问题:. 当 N 个线程同时完成某项任务时,如何知道他们都已经执行完毕了. 这也是本次讨论的话题之一,所以本篇为『并发包入坑指北』的第二篇;来聊聊常见的并发工具. 其实这类问题的核心论点都是:如何在一个线程中得知其他线程是否执行完毕. 假设现在有 3 个线程在运行,需要在主线程中得知他们的运行结果;可以分为以下几步:.

汇报工作的四个层级

- - IT技术博客大学习
   在 不汇报是职场发展的绊脚石中,提到过汇报的重要意义和不汇报的心理因素,实际上汇报只需要掌握一些技巧和方法,就很容易取得理想的结果. 当然,不同情况下汇报的目的和方式是有差异的,具体可以根据PDCA循环的四个阶段进行:.    计划是工作开展的第一步,任何工作在进行前,都需要有一个明确的计划.

[原]如何汇报工作与计划

- - KimmKing的技术博客
现在要做个PPT给领导讲, 我们现在做的事情和规划. 我:给什么级别的领导、汇报周期多长. F同学:CTO,就将1次,宣讲类型的. 这种东东 大概是什么思路?. 1、现状:目前的情况,系统情况,业务数据情况,. 2、问题:存在的,业务反馈的,领导指示要做的,. 3、解决办法:预研的情况如何,SWOT之类的,准备朝什么方向努力,.

高并发

- - 开源软件 - ITeye博客
垂直扩展是一种用于增加单个ActiveMQ代理连接数(因而也增加了负载能力)的技术.默认情况下,. ActiveMQ的被设计成尽可高效的传输消息以确保低延迟和良好的性能. 默认情况下,ActiveMQ使用阻塞IO来处理传输连接,这种方式为每一个连接分配一个线程. 你可以为ActiveMQ代理使用非阻塞IO(同时客户端可以使用默认的传输)以减少线程的使用.

并发导论

- - 并发编程网 - ifeve.com
由于之前工作中的疏忽,在使用Java多线程并发的时候出了问题,遂决心全面学习并发相关知识. 写作本文的意图只是希望在写作过程中把想不清楚或是一时无法掌握的地方反复揣摩记录下来. 写作本文参考的各种资料较多,抱歉的是文末的参考文献中对一些叫不上名字或没有出处的资料文献并未列举出来. 由于本人是初入职场的菜鸟,更是并发的门外汉,文中关于并发以及其他软硬件、程序设计语言的论据也许不够客观甚至不够正确.

我食言了——“出尔反尔”的大佬们

- L - FeedzShare
来自: www.bianews.com - FeedzShare  . 发布时间:2011年10月10日,  已有 4 人推荐.   BiaNews  10月10日消息(文/侯鹏飞)  最近360推出口信网页版、凡客诚品被曝即将提交上市申请,让人联想到之前周鸿祎“360绝不做IM”和陈年“凡客最早2012年下半年上市”的言之凿凿.

DDB Worldwide大佬为你指点互联网营销的本质

- - MADBRIEF | 疯狂简报
《互联网营销的本质:点亮社群》的确是本难得的好书. 书中介绍了个体的力量和社群的力量、剖析数字社群、告别盲人,迎接自主信息时代、为什么速度会成为新宠、吸引社群的招数、首席社群官“品牌的新代言人、消费者驱动型社会的蓝图等内容. 下面是书中一些精彩的观点,为了读者阅读的舒适,所有“社群”都已经被替换成“社区”.

硅谷大佬创业全过程经验

- - 互联网分析
一位美国 创业者关于 创业过程中团队建设、融资、产品、营销等全方位的经验总结. 其中不乏一些让创始人们时刻检查企业状态的观点. 比如,如果你的2到4人团队无法在6个月到1年内实现盈利,那么肯定出问题了;融资过程中,如果必须放弃15%的控制权,那么也必然是有了问题等. 总部位于旧金山的云计算数据库服务公司RethinkDB创始人斯拉瓦·阿克麦切特(Slava Akhmechet)日前发表文章,总结了57条创业经验,其中涉及团队、融资、市场、产品、营销、销售、产品开发、公司管理、个人状态等9个方面.

互联网大佬的“名言警句” :现场语录汇总

- - 极客公园-GeekPark
[核心提示]上个周末,极客公园创新大会很热闹. 沃兹、李彦宏、李书福、雷军等重量级嘉宾的到来不仅引爆了现场的气氛,更奉献了精彩的演讲. 百度CEO李彦宏与极客公园创始人张鹏对话. 1、互联网公司在做事时没有包袱,互联网金融就是互联网对金融的冲击,金融行业的人现在也同意了. 还有更多行业没有意识到互联网将会对它们产生冲击,因为才刚刚开始.