Phaser功能简述

标签: phaser 功能 | 发表时间:2016-06-05 14:58 | 作者:
出处:http://shift-alt-ctrl.iteye.com

    在JAVA 1.7引入了一个新的并发API:Phaser,一个可重用的同步barrier。在此前,JAVA已经有CyclicBarrier、CountDownLatch这两种同步barrier,但是Phaser更加灵活,而且侧重于“重用”。

一、简述

    1、注册机制:与其他barrier不同的是,Phaser中的“注册的同步者(parties)”会随时间而变化,Phaser可以通过构造器初始化parties个数,也可以在Phaser运行期间随时加入(register)新的parties,以及在运行期间注销(deregister)parties。运行时可以随时加入、注销parties,只会影响Phaser内部的计数器,它建立任何内部的bookkeeping(账本),因此task不能查询自己是否已经注册了,当然你可以通过实现子类来达成这一设计要求。

//伪代码
Phaser phaser = new Phaser();
phaser.register();//parties count: 1
....
phaser.arriveAndDeregister()://count : 0;
....

 

    此外,CyclicBarrier、CountDownLatch需要在初始化的构造函数中指定同步者的个数,且运行时无法再次调整。

CountDownLatch countDownLatch = new CountDownLatch(12);
//count deregister parties after all
//parties count is 12 all the times
//if you want change the number of parties, you should create a new instance.
CyclicBarrier cyclicBarrier = new CyclicBarrier(12);

 

    2、同步机制:类似于CyclicBarrier,Phaser也可以awaited多次,它的arrivedAndAwaitAdvance()方法的效果类似于CyclicBarrier的await()。Phaser的每个周期(generation)都有一个phase数字,phase 从0开始,当所有的已注册的parties都到达后(arrive)将会导致此phase数字自增(advance),当达到Integer.MAX_VALUE后继续从0开始。这个phase数字用于表示当前parties所处于的“阶段周期”,它既可以标记和控制parties的wait行为、唤醒等待的时机。

    1)Arrival:Phaser中的arrive()、arriveAndDeregister()方法,这两个方法不会阻塞(block),但是会返回相应的phase数字,当此phase中最后一个party也arrive以后,phase数字将会增加,即phase进入下一个周期,同时触发(onAdvance)那些阻塞在上一phase的线程。这一点类似于CyclicBarrier的barrier到达机制;更灵活的是,我们可以通过重写onAdvance方法来实现更多的触发行为。

 

    2)Waiting:Phaser中的awaitAdvance()方法,需要指定一个phase数字,表示此Thread阻塞直到phase推进到此周期,arriveAndAwaitAdvance()方法阻塞到下一周期开始(或者当前phase结束)。不像CyclicBarrier,即使等待Thread已经interrupted,awaitAdvance方法会继续等待。Phaser提供了Interruptible和Timout的阻塞机制,不过当线程Interrupted或者timout之后将会抛出异常,而不会修改Phaser的内部状态。如果必要的话,你可以在遇到此类异常时,进行相应的恢复操作,通常是在调用forceTermination()方法之后。

    Phaser通常在ForJoinPool中执行tasks,它可以在有task阻塞等待advance时,确保其他tasks的充分并行能力。

 

    3、中断(终止):Phaser可以进入Termination状态,可以通过isTermination()方法判断;当Phaser被终止后,所有的同步方法将会立即返回(解除阻塞),不需要等到advance(即advance也会解除阻塞),且这些阻塞方法将会返回一个负值的phase值(awaitAdvance方法、arriveAndAwaitAdvance方法)。当然,向一个termination状态的Phaser注册party将不会有效;此时onAdvance()方法也将会返回true(默认实现),即所有的parties都会被deregister,即register个数为0。

 

    4、Tiering(分层):Phaser可以“分层”,以tree的方式构建Phaser来降低“竞争”。如果一个Phaser中有大量parties,这会导致严重的同步竞争,所以我们可以将它们分组并共享一个parent Phaser,这样可以提高吞吐能力;Phaser中注册和注销parties都会有Child 和parent Phaser自动管理。当Child Phaser中中注册的parties变为非0时(在构造函数Phaser(Phaser parent,int parties),或者register()方法),Child Phaser将会注册到其Parent上;当Child Phaser中的parties变为0时(比如由arrivedAndDegister()方法),那么此时Child Phaser也将从其parent中注销出去。

 

    5、监控:同步的方法只会被register操作调用,对于当前state的监控方法可以在任何时候调用,比如getRegisteredParties()获取已经注册的parties个数,getPhase()获取当前phase周期数等;因为这些方法并非同步,所以只能反映当时的瞬间状态。

 

二、常用的Barrier比较

    1、CountDownLatch

//创建时,就需要指定参与的parties个数
int parties = 12;
CountDownLatch latch = new CountDownLatch(parties);
//线程池中同步task
ExecutorService executor = Executors.newFixedThreadPool(parties);
for(int i = 0; i < parties; i++) {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                //可以在任务执行开始时执行,表示所有的任务都启动后,主线程的await即可解除
                //latch.countDown();
                //run
                //..
                Thread.sleep(3000);

            } catch (Exception e) {

            }
            finally {
                //任务执行完毕后:到达
                //表示所有的任务都结束,主线程才能继续
                latch.countDown();
            }
        }
    });
}
latch.await();//主线程阻塞,直到所有的parties到达
//latch上所有的parties都达到后,再次执行await将不会有效,
//即barrier是不可重用的
executor.shutdown();

 

    2、CyclicBarrier

//创建时,就需要指定参与的parties个数
int parties = 12;
CyclicBarrier barrier = new CyclicBarrier(parties);
//线程池中同步task
ExecutorService executor = Executors.newFixedThreadPool(parties);
for(int i = 0; i < parties; i++) {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                int i = 0;
                while (i < 3 && !barrier.isBroken()) {
                    System.out.println("generation begin:" + i + ",tid:" + Thread.currentThread().getId());
                    Thread.sleep(3000);
                    //如果所有的parties都到达,则开启新的一次周期(generation)
                    //barrier可以被重用
                    barrier.await();
                    i++;
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
            finally {

            }
        }
    });
}
Thread.sleep(100000);

 

    3、Phaser

//创建时,就需要指定参与的parties个数
int parties = 12;
//可以在创建时不指定parties
// 而是在运行时,随时注册和注销新的parties
Phaser phaser = new Phaser();
//主线程先注册一个
//对应下文中,主线程可以等待所有的parties到达后再解除阻塞(类似与CountDownLatch)
phaser.register();
ExecutorService executor = Executors.newFixedThreadPool(parties);
for(int i = 0; i < parties; i++) {
    phaser.register();//每创建一个task,我们就注册一个party
    executor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                int i = 0;
                while (i < 3 && !phaser.isTerminated()) {
                    System.out.println("Generation:" + phaser.getPhase());
                    Thread.sleep(3000);
                    //等待同一周期内,其他Task到达
                    //然后进入新的周期,并继续同步进行
                    phaser.arriveAndAwaitAdvance();
                    i++;//我们假定,运行三个周期即可
                }
            } catch (Exception e) {

            }
            finally {
                phaser.arriveAndDeregister();
            }
        }
    });
}
//主线程到达,且注销自己
//此后线程池中的线程即可开始按照周期,同步执行。
phaser.arriveAndDeregister();

 

三、API简述

     1、Phaser():构造函数,创建一个Phaser;默认parties个数为0。此后我们可以通过register()、bulkRegister()方法来注册新的parties。每个Phaser实例内部,都持有几个状态数据:termination状态、已经注册的parties个数(registeredParties)、当前phase下已到达的parties个数(arrivedParties)、当前phase周期数,还有2个同步阻塞队列Queue。Queue中保存了所有的waiter,即因为advance而等待的线程信息;这两个Queue分别为evenQ和oddQ,这两个Queue在实现上没有任何区别,Queue的元素为QNode,每个QNode保存一个waiter的信息,比如Thread引用、阻塞的phase、超时的deadline、是否支持interrupted响应等。两个Queue,其中一个保存当前phase中正在使用的waiter,另一个备用,当phase为奇数时使用evenQ、oddQ备用,偶数时相反,即两个Queue轮换使用。当advance事件触发期间,新register的parties将会被放在备用的Queue中,advance只需要响应另一个Queue中的waiters即可,避免出现混乱。

 

    2、Phaser(int parties):构造函数,初始一定数量的parties;相当于直接regsiter此数量的parties。

    3、arrive():到达,阻塞,等到当前phase下其他parties到达。如果没有register(即已register数量为0),调用此方法将会抛出异常,此方法返回当前phase周期数,如果Phaser已经终止,则返回负数。

    4、arriveAndDeregister():到达,并注销一个parties数量,非阻塞方法。注销,将会导致Phaser内部的parties个数减一(只影响当前phase),即下一个phase需要等待arrive的parties数量将减一。异常机制和返回值,与arrive方法一致。

    5、arriveAndAwaitAdvance():到达,且阻塞直到其他parties都到达,且advance。此方法等同于awaitAdvance(arrive())。如果你希望阻塞机制支持timeout、interrupted响应,可以使用类似的其他方法(参见下文)。如果你希望到达后且注销,而且阻塞等到当前phase下其他的parties到达,可以使用awaitAdvance(arriveAndDeregister())方法组合。此方法的异常机制和返回值同arrive()。

    6、awaitAdvance(int phase):阻塞方法,等待phase周期数下其他所有的parties都到达。如果指定的phase与Phaser当前的phase不一致,则立即返回。

    7、awaitAdvanceInterruptibly(int phase):阻塞方法,同awaitAdvance,只是支持interrupted响应,即waiter线程如果被外部中断,则此方法立即返回,并抛出InterrutedException。

    8、awaitAdvanceInterruptibly(int phase,long timeout,TimeUnit unit):阻塞方法,同awaitAdvance,支持timeout类型的interrupted响应,即当前线程阻塞等待约定的时长,超时后以TimeoutException异常方式返回。

    9、forceTermination():强制终止,此后Phaser对象将不可用,即register等将不再有效。此方法将会导致Queue中所有的waiter线程被唤醒。

    10、register():新注册一个party,导致Phaser内部registerPaties数量加1;如果此时onAdvance方法正在执行,此方法将会等待它执行完毕后才会返回。此方法返回当前的phase周期数,如果Phaser已经中断,将会返回负数。

    11、bulkRegister(int parties):批量注册多个parties数组,规则同10、。

    12、getArrivedParties():获取已经到达的parties个数。

    13、getPhase():获取当前phase周期数。如果Phaser已经中断,则返回负值。

    14、getRegisteredParties():获取已经注册的parties个数。

    15、getUnarrivedParties():获取尚未到达的parties个数。

    16、onAdvance(int phase,int registeredParties):这个方法比较特殊,表示当进入下一个phase时可以进行的事件处理,如果返回true表示此Phaser应该终止(此后将会把Phaser的状态为termination,即isTermination()将返回true。),否则可以继续进行。phase参数表示当前周期数,registeredParties表示当前已经注册的parties个数。

    默认实现为:return registeredParties == 0;在很多情况下,开发者可以通过重写此方法,来实现自定义的advance时间处理机制。



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [phaser 功能] 推荐:

Phaser功能简述

- - 深入一点,你会更加快乐
    在JAVA 1.7引入了一个新的并发API:Phaser,一个可重用的同步barrier. 在此前,JAVA已经有CyclicBarrier、CountDownLatch这两种同步barrier,但是Phaser更加灵活,而且侧重于“重用”.     1、注册机制:与其他barrier不同的是,Phaser中的“注册的同步者(parties)”会随时间而变化,Phaser可以通过构造器初始化parties个数,也可以在Phaser运行期间随时加入(register)新的parties,以及在运行期间注销(deregister)parties.

BTrace功能

- - zzm
       在生产环境中可能经常遇到各种问题,定位问题需要获取程序运行时的数据信息,如方法参数、返回值、全局变量、堆栈信息等. 为了获取这些数据信息,我们可以 通过改写代码,增加日志信息的打印,再发布到生产环境. 通过这种方式,一方面将增大定位问题的成本和周期,对于紧急问题无法做到及时响应;另一方面重新部 署后环境可能已被破坏,很难重新问题的场景.

DTU 功能 - wilcolin

- - 博客园_首页
      DTU (Data Transfer unit)全称数据传输单元,是专门用于将串口数据转换为IP数据或将IP数据转换为串口数据通过无线通信网络进行传送的无线终端设备.       Winer WCTU 3121主要是运用于物联网通信行业的一种无线数据传输终端,是厦门为那通信科技有限公司自主开发的DTU系列产品之一,WCTU 3121是一款2G GPRS DTU产品,通过利用中国移动、中国联通的GPRS 2G网络为用户提供无线长距离的数据传输功能.

Android核心功能

- - 技术改变世界 创新驱动中国 - 《程序员》官网
Android功能模块的概况,就像看Android的“个人简历”一样,帮助我们对它的能力有整体上的认识,进而在应用开发之前可以更好地评估技术上的可能性和风险性. 每个Android开发者都会关心Android到底能够打造怎样的用户界面(User Interface,UI). Android界面框架中最有特色的部分是资源(Resource)和布局(Layout)体系,通过完善的控件库和简明的接口设计,开发者可以尽快搭建自己需要的界面.

Redis的AOF功能

- - CSDN博客数据库推荐文章
引言:  Redis是基于内存的数据库,同时也提供了若干持久化的方案,允许用户把内存中的数据,写入本地文件系统,以备下次重启或者当机之后继续使用. 本文将描述如何基于Redis来设置AOF功能. AOF是AppendOnly File的缩写,是Redis系统提供了一种记录Redis操作的持久化方案,在AOF生成的文件中,将忠实记录发生在Redis的操作,从而达到在Redis服务器重启或者当机之后,继续恢复之前数据状态的机制.

专注核心功能

- 小宇 - 互联网的那点事...
当我还小的时候,出了什么毛病都爱用风油精. 无论是虫叮蚊咬,晕车晕船还是感冒发烧,风油精都能派上用场. 因此当我颇为自豪的向我的小伙伴炫耀道“风油精什么都能治”的时候,他的一句“风油精什么都能治,什么都治不好”着实给我泼了一头冷水. 随着我逐渐长大,我遇到了更多“万能”的产品:能刮胡子能双卡双待能遥控电视的手机、能祛痘美白淡斑保湿去黑头的面膜、能交友能婚恋能看视频能做3D特效的网站等等.

Facebook推出怀旧功能

- Enlizh - 36氪
Facebook最近悄悄推出了怀旧功能,在页面右侧广告栏的上方会出现标题为“2010年的这一天”或“2009年的这一天”的一栏. 该栏中将会展示你在一两年的这天的状态更新,多出现在浏览相册的页面中. 点击“Show More” 可以查看更多之前的状态,就目前来看似乎最早只能回溯到2009年. 这也比较符合实际,毕竟在2007年至2008年左右Facebook还远没有现在这样火爆.

多功能贴心浴缸

- Quantum - 设计|生活|发现新鲜
这浴缸其实没什么特别,唯一的亮点就在这隔板上. 别小瞧这隔板,有了它,什么沐浴的瓶瓶罐罐,毛巾,衣物通通可以收纳. 甚至还可以放置咖啡浓茶来享受泡澡时间. 更特别的是,组装后还可以当换衣凳,防滑踏板等. 「设计,生活,发现新鲜」在新浪微博,更即时地获读更新,更直接地交流沟通. © 设计|生活|发现新鲜 | 原文链接 | 投稿 ! | 新浪微博 | 逛逛我们的在线商店.

专注核心功能

- 大宝 - 所有文章 - UCD大社区
当我还小的时候,出了什么毛病都爱用风油精. 无论是虫叮蚊咬,晕车晕船还是感冒发烧,风油精都能派上用场. 因此当我颇为自豪的向我的小伙伴炫耀道“风油精什么都能治”的时候,他的一句“风油精什么都能治,什么都治不好”着实给我泼了一头冷水. 随着我逐渐长大,我遇到了更多“万能”的产品:能刮胡子能双卡双待能遥控电视的手机、能祛痘美白淡斑保湿去黑头的面膜、能交友能婚恋能看视频能做3D特效的网站等等.

离线功能回归Gmail

- Myheimu - 驱动之家新闻_最新新闻
千呼万唤始出来──用这句话形容离线版Gmail、Google Docs 和 Google Calendar一点不为过,尤其是在Google一刀切取消了以前用于离线的Google Gears之后,大家就一直在等待一个离线版的应用解决方案,在Chromebook发布之后其意义更加重大. 今天Google终于宣布了可让Chrome实现离线的网页应用Gmail Offline ,该应用完全基于HTML5打造,界面则基于Gmail的平板界面(这是最让人不满的),在你没有网络连接的情况下,通过Chrome打开这个离线网页应用,即可管理自己的Gmail邮件了.