JUC CyclicBarrier 可重用屏障

标签: 并发 CyclicBarrier juc | 发表时间:2014-04-03 22:56 | 作者:coderbee
出处:http://coderbee.net

介绍

CyclicBarrier 是一个线程同步工具,用于一组互相等待线程之间的协调,在到达某个临界点之前,这些线程必须互相等待,在通过临界点之后,这些线程是独立的;CountDownLatch 是让一个线程等待其他线程完成某些任务,其他线程之间一直是独立的。

CyclicBarrier 还允许指定一个任务,在所有线程到达临界点时执行,由最后到达的线程执行此任务。

在释放所有线程后,CyclicBarrier 可以通过重置状态来重用,这也是 Cyclic 的来源。

使用示例

  public class TestCyclicBarrier {
    private static final int THREAD_NUM = 5;

    public static class WorkerThread implements Runnable {
     CyclicBarrier barrier;

     public WorkerThread(CyclicBarrier b) {
         this. barrier = b;
     }

     @Override
     public void run() {
         try {
          String id = "ID:" + Thread.currentThread().getId();

          System. out.println(id + " before barrier");

           barrier.await(); // 线程在这里等待,直到所有线程都到达barrier。

          System. out.println(id + " after barrier");
         } catch (Exception e) {
          e. printStackTrace();
         }
     }
    }

    public static void main(String[] args) {
     CyclicBarrier cb = new CyclicBarrier( THREAD_NUM, new Runnable() {
         // 当所有线程到达barrier时执行
         @Override
          public void run() {
          System. out.println( "\nID: " + Thread.currentThread().getId()
              + " doing barrier work .\n");
         }
     });

     for (int i = 0; i < THREAD_NUM; i++) {
         new Thread( new WorkerThread(cb)).start();
     }
    }
}

输出结果如下:

ID:13 before barrier
ID:14 before barrier
ID:10 before barrier
ID:12 before barrier
ID:11 before barrier

ID: 14 doing barrier work .

ID:14 after barrier
ID:13 after barrier
ID:10 after barrier
ID:11 after barrier
ID:12 after barrier

实现

CyclicBarrier 是可重用的,那么就需要一定的结构来维持每次重用的信息,其定义了一个内部类 Generation来提供这个功能。

  private static class Generation {
    boolean broken = false;
}

内部属性

  // 由于涉及多个线程之间的同步,对共享状态访问的协调应该用锁来简化
private final ReentrantLock lock = new ReentrantLock();

// 在所有线程都到达之前,用于使线程等待的条件对象。
// 最后一个线程到达后,唤醒所有此对象上等待的线程。
private final Condition trip = lock.newCondition();

// 参与协同同步的线程数
private final int parties;

// 所有线程到达后、释放之前执行的命令。
private final Runnable barrierCommand;

// 当前的代
private Generation generation = new Generation();

// 等待到达的线程数
private int count;

核心的 wait 逻辑

CyclicBarrier 的 await 方法都会调用 dowait 方法。

  // 核心代码,覆盖了不同的策略
private int dowait(boolean timed, long nanos) throws InterruptedException,
          BrokenBarrierException, TimeoutException {
     // 先加锁,确保线程安全性。
     final ReentrantLock lock = this.lock;
      lock.lock();
     try {
          final Generation g = generation;

          if (g.broken)
              throw new BrokenBarrierException();

          if (Thread.interrupted()) {
              breakBarrier();
              throw new InterruptedException();
          }

           int index = --count; // 等待线程数减 1。

           if (index == 0) { // 当前线程是最后到达线程,释放所有线程。
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();

                   ranAction = true;
                   nextGeneration();
                   return 0;
              } finally {
                   if (!ranAction)
                        breakBarrier();
              }
          }

           // 非第一个到达的线程要进入等待,放在循环里是防止虚假唤醒。
           // 直到  tripped, broken, interrupted, or timed out
           for (;;) {
              try {
                   if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
              } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                   } else {
                         // We're about to finish waiting even if we had not
                         // been interrupted, so this interrupt is deemed to
                         // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                   }
              }

              if (g.broken)
                   throw new BrokenBarrierException();

              if (g != generation)
                   return index;

              if (timed && nanos <= 0L) {
                   breakBarrier();
                   throw new TimeoutException();
              }
          }
     } finally {
          lock.unlock();
     }
}

注意,上面的代码多处调用了 breakBarrier 方法,它的最主要作用就是唤醒所有等待线程。

reset 方法

  public void reset() {
      final ReentrantLock lock = this.lock;
     lock.lock();
      try {
          breakBarrier(); // 打破当前的代,以释放当前还在等待线程。
          nextGeneration(); // 开启新的代。
     } finally {
          lock.unlock();
     }
}

// 把当前的代标为broken,唤醒所有的等待线程。持有锁时调用。
private void breakBarrier() {
     generation.broken = true;
     count = parties;
     trip.signalAll();
}

相关 [juc cyclicbarrier] 推荐:

JUC CyclicBarrier 可重用屏障

- - 码蜂笔记
CyclicBarrier 是一个线程同步工具,用于一组互相等待线程之间的协调,在到达某个临界点之前,这些线程必须互相等待,在通过临界点之后,这些线程是独立的;CountDownLatch 是让一个线程等待其他线程完成某些任务,其他线程之间一直是独立的. CyclicBarrier 还允许指定一个任务,在所有线程到达临界点时执行,由最后到达的线程执行此任务.

JUC 原子类

- - ITeye博客
volatile变量具有可见性,也就是说线程能够自动发现volatile 变量的最新值;对volatile变量进行操作不会造成阻塞. 适用于:多个变量之间或者某个变量的当前值与修改后值之间没有约束. 正确使用volatile变量的条件:. 对变量的写操作不依赖于当前值. 该变量没有包含在具有其他变量的不变式中.

CyclicBarrier的用法

- - 编程语言 - ITeye博客
本例介绍第三个同步装置:CyclicBarrier,它维护一个计数器,与CountDownLatch不同的是,等待这个CyclicBarrier的线程必须等到计数器到达某个值时,才可以继续. CyclicBarrier就像它名字的意思一样,可看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍. 本例实现一个数组相邻元素的加法,一个线程给数组的第一个元素赋值,然后等待其他线程给数组第二个元素赋值,然后将第一个元素和第二个元素相加.

Java并发工具类CyclicBarrier

- - CSDN博客编程语言推荐文章
CyclicBarrier同步屏障. java并发工具类中有一个叫做CyclicBarrier的类,与CountDownLatch类似,都可以实现线程间的同步,但是差别是CyclicBarrier是可重置的同步屏障. 想象一个场景,有N个人不同时间走到一扇门,因为门需要N个人合力才能推开,所以人不足N个时,只能阻塞在此,等到N个人都到了之后,可以推开门,继续进行之前的工作.