前言

成都创新互联公司专注于企业营销型网站建设、网站重做改版、久治网站定制设计、自适应品牌网站建设、H5页面制作、商城网站开发、集团公司官网建设、外贸营销网站建设、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为久治等各大城市提供网站开发制作服务。
JDK5中引入了CyclicBarrier和CountDownLatch这两个并发控制类,而JDK7中引入的Phaser按照官方的说法是提供了一个功能类似但是更加灵活的实现。接下来我们带着几个问题来研究一下Phaser与(CountDownLath、CyclicBarrier)到底有哪些类似,同时带来了哪些灵活性?
CyclicBarrier和CountDownLatch
CyclicBarrier介绍
在使用CyclicBarrier时,需要创建一个CyclicBarrier对象,构造函数需要一个整数作为参数,这个参数是一个“目标”,在CyclicBarrier对象创建后,内部会有一个计数器,初始值为0,CyclicBarrier对象的await方法每被调用一次,这个计数器就会加1,一旦这个计数器的值达到设定的“目标”,所有被CyclicBarrier.await阻塞住的线程都会继续执行。这个目标是固定的,一旦设定便不能修改。
举一个例子,假设有5个人爬香山,他们要爬到山顶,等到5个人到齐了再同时出发下山,那么我们要在山顶设定一个“目标”,同时还有一个计数器,这个目标就是5,每一个人到山顶后,这个人就要等待,同时计数器加1,等到5个人到齐了,也就是计数器达到了这个“目标”,所有等待的人就开始下山了。 更多内容请阅读《并发编程之CyclicBarrier原理与使用》
CountDownLathch介绍
使用CountDownLatch时,需要创建一个CountDownLatch对象,构造函数也需要一个整数作为参数,可以把这个参数想象成一个倒计时器,CountDownLatch对象本身是一个发令枪,所有调用CountDownLatch.await方法的线程都会等待发令枪的指令,一旦倒计时器为0,这些线程同时开始执行,而CountDownLatch.countDown方法就是为倒计时器减1。
更多内容请阅读《并发编程之CountDownLatch原理与使用》
对比分析
CyclicBarrier和CountDownLatch的共同点都是有一个目标和一个计数器,等到计数器达到目标后,所有阻塞的线程都将继续执行。它们的不同点是CyclicBarrier.await在等待的同时还修改计数器,而CountDownLatch.await只负责等待,CountDownLatch.countDown才修改计数器。
CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
CyclicBarrier的计数器更像一个阀门,需要所有线程都到达,然后继续执行,计数器递减,提供reset功能,可以多次使用。如下图:
Phaser是什么?
Phaser,翻译为移相器(阶段),它适用于这样一种场景,一个大任务可以分为多个阶段完成,且每个阶段的任务可以多个线程并发执行,但是必须上一个阶段的任务都完成了才可以执行下一个阶段的任务。
这种场景虽然使用CyclicBarrier 或者 CountDownLatch 也可以实现,但是要复杂的多,首先,具体需要多少个阶段是可能变的,其次,每个阶段的任务数也可能会变的。相比于CyclicBarrier 和 CountDownLath ,Phaser更加灵活更加方便。
Phaser使用方法
Phaser同时包含CyclicBarrier和CountDownLatch两个类的功能。
除了包含以上两个类的功能外,Phaser还提供了更大的灵活性。CyclicBarrier和CountdownLatch在构造函数指定目标后就无法修改,而Phaser提供了register和deregister方法可以对目标进行动态修改。
下面看一个最简单的使用案例:
- package com.niuh.tools;
 - import java.util.concurrent.Phaser;
 - /**
 - *
 - * Phaser示例
 - *
 - */
 - public class PhaserRunner {
 - // 定义每个阶段需要执行3个小任务
 - public static final int PARTIES = 3;
 - // 定义需要4个阶段完成的大任务
 - public static final int PHASES = 4;
 - public static void main(String[] args) {
 - Phaser phaser = new Phaser(PARTIES) {
 - @Override
 - protected boolean onAdvance(int phase, int registeredParties) {
 - System.out.println("==phase: " + phase + " finished==");
 - return super.onAdvance(phase, registeredParties);
 - }
 - };
 - for (int i = 0; i < PARTIES; i++) {
 - new Thread(() -> {
 - for (int j = 0; j < PHASES; j++) {
 - System.out.println(String.format("%s: phase: %d", Thread.currentThread().getName(), j));
 - phaser.arriveAndAwaitAdvance();
 - }
 - }, "Thread " + i).start();
 - }
 - }
 - }
 
这里我们定义个需要4个阶段完成的大任务,每个阶段需要3个小任务,针对这些小任务,我们分别起3个线程来执行这些小任务,查看输出结果为:
可以看到,每个阶段都是三个线程都完成来才进入下一个阶段。这是怎么实现的呢?
Phaser原理猜测
结合AQS的原理,大概猜测一下Phaser的实现原理:
结合上面的案例带入:初始时当前阶段为0,参与者为3个,未完成参与者数为3;
基于这样的一个思路,整体能说的通,至于是不是这样?让我们一起来看源码吧。
Phaser源码分析
主要API
内部类QNode
QNode用来跟踪当前线程的信息的。QNode被组织成单向链表的形式。用来管理是否阻塞或者被中断。
QNode继承自ForkJoinPool.ManagedBlocker。ForkJoinPool来管理是否阻塞和中断状态。这里只需要重写isReleasable和block。
- static final class QNode implements ForkJoinPool.ManagedBlocker {
 - final Phaser phaser;
 - final int phase;
 - final boolean interruptible;
 - final boolean timed;
 - boolean wasInterrupted;
 - long nanos;
 - final long deadline;
 - volatile Thread thread; // nulled to cancel wait
 - QNode next;
 - QNode(Phaser phaser, int phase, boolean interruptible,
 - boolean timed, long nanos) {
 - this.phaser = phaser;
 - this.phase = phase;
 - this.interruptible = interruptible;
 - this.nanos = nanos;
 - this.timed = timed;
 - this.deadline = timed ? System.nanoTime() + nanos : 0L;
 - thread = Thread.currentThread();
 - }
 - public boolean isReleasable() {
 - if (thread == null)
 - return true;
 - if (phaser.getPhase() != phase) {
 - thread = null;
 - return true;
 - }
 - if (Thread.interrupted())
 - wasInterrupted = true;
 - if (wasInterrupted && interruptible) {
 - thread = null;
 - return true;
 - }
 - if (timed) {
 - if (nanos > 0L) {
 - nanos = deadline - System.nanoTime();
 - }
 - if (nanos <= 0L) {
 - thread = null;
 - return true;
 - }
 - }
 - return false;
 - }
 - public boolean block() {
 - if (isReleasable())
 - return true;
 - else if (!timed)
 - LockSupport.park(this);
 - else if (nanos > 0L)
 - LockSupport.parkNanos(this, nanos);
 - return isReleasable();
 - }
 - }
 
整体代码比较简单。要注意的是在isReleasable中使用了thread=null来使得避免解锁任务。使用方法类似于internalAwaitAdvance中的用法。先完成的参与者放入队列中的节点,这里我们只需要关注 thread 和 next两个属性即可,很明显这是一个单链表,存储这入队的线程。
主要属性
- /*
 - * unarrived -- 还没有抵达屏障的参与者的个数 (bits 0-15)
 - * parties -- 需要等待的参与者的个数 (bits 16-31)
 - * phase -- 屏障所处的阶段 (bits 32-62)
 - * terminated -- 屏障的结束标记 (bit 63 / sign)
 - */
 - // 状态变量,用于存储当前阶段phase、参与者数parties、未完成的参与者数unarrived_count
 - private volatile long state;
 - // 最多可以有多少个参与者,即每个阶段最多有多少个任务
 - private static final int MAX_PARTIES = 0xffff;
 - // 最多可以有多少阶段
 - private static final int MAX_PHASE = Integer.MAX_VALUE;
 - // 参与者数量的偏移量
 - private static final int PARTIES_SHIFT = 16;
 - // 当前阶段的偏移量
 - private static final int PHASE_SHIFT = 32;
 - // 未完成的参与者数的掩码,低16位
 - private static final int UNARRIVED_MASK = 0xffff; // to mask ints
 - // 参与者数,中间16位
 - private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
 - // counts的掩码,counts等于参与者数和未完成的参与者数的 '|' 操作
 - private static final long COUNTS_MASK = 0xffffffffL;
 - private static final long TERMINATION_BIT = 1L << 63;
 - // 一次一个参与者完成
 - private static final int ONE_ARRIVAL = 1;
 - // 增加减少参与者时使用
 - private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
 - // 减少参与者时使用
 - private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
 - // 没有参与者使用
 - private static final int EMPTY = 1;
 - // 用于求未完成参与者数量
 - private static int unarrivedOf(long s) {
 - int counts = (int)s;
 - return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
 - }
 - // 用于求参与者数量(中间16位),注意int的为止
 - private static int partiesOf(long s) {
 - return (int)s >>> PARTIES_SHIFT;
 - }
 - // 用于求阶段数(高32位),注意int的位置
 - private static int phaseOf(long s) {
 - return (int)(s >>> PHASE_SHIFT);
 - }
 - // 已完成参与者数量
 - private static int arrivedOf(long s) {
 - int counts = (int)s;
 - return (counts == EMPTY) ? 0 :
 - (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
 - }
 - /**
 - * The parent of this phaser, or null if none
 - */
 - private final Phaser parent;
 - /**
 - * The root of phaser tree. Equals this if not in a tree.
 - */
 - private final Phaser root;
 - // 用于存储已经=完成参与者所在的线程,根据当前阶段的奇偶性选择不同的队列
 - private final AtomicReference
 evenQ; - private final AtomicReference
 oddQ; 
主要属性位 state 和 evenQ 及 oddQ
如果是空状态,也就是没有子阶段注册的初始阶段。这里用一个EMPTY状态表示,也就是0个子阶段和一个未到达阶段。
所有的状态变化都是通过CAS操作执行的,唯一例外是注册一个子相移器(sub-Phaser),用于构成树的,也就是Phaser的父Phaser非空。这个子相移器的分阶段是通过一个内置锁来设置。
构造方法
- public Phaser() {
 - this(null, 0);
 - }
 - public Phaser(int parties) {
 - this(null, parties);
 - }
 - public Phaser(Phaser parent) {
 - this(parent, 0);
 - }
 - public Phaser(Phaser parent, int parties) {
 - if (parties >>> PARTIES_SHIFT != 0)
 - throw new IllegalArgumentException("Illegal number of parties");
 - int phase = 0;
 - this.parent = parent;
 - if (parent != null) { // 父phaser不为空
 - final Phaser root = parent.root;
 - this.root = root; // 指向root phaser
 - this.evenQ = root.evenQ; // 两个栈,整个phaser链只有一份
 - this.oddQ = root.oddQ;
 - if (parties != 0)
 - phase = parent.doRegister(1); // 向父phaser注册当前线程
 - }
 - else {
 - this.root = this; // 否则,自己是root phaser
 - this.evenQ = new AtomicReference
 (); // 负责创建两个栈(QNode链) - this.oddQ = new AtomicReference
 (); - }
 - // 状态变量state的存储分为三段
 - this.state = (parties == 0) ? (long)EMPTY :
 - ((long)phase << PHASE_SHIFT) |
 - ((long)parties << PARTIES_SHIFT) |
 - ((long)parties);
 - }
 
构造函数中还有一个parent和root,这是用来构造多层级阶段的,用于构成树的。
重点还是还是看state的赋值方式,高32位存储当前阶段phase,中间16位存储参与者的数量,低16位存储未完成参与者的数量。
主要方法
下面我们一起来看看几个主要方法的源码,重点是三个private的核心方法:doArrive、doRegister、reconcileState
register方法
增加一个参与者,需要同时增加parties和unarrived两个数值,也就是state中的16位和低16位(中间16位存储参与者的数量,低16位存储未完成参与者的数量)
- public int register() {
 - return doRegister(1);
 - }
 
这里主要调用的是doRegister方法,我们往下看。
doRegister方法
- private int doRegister(int registrations) {
 - // adjustment to state
 - // state应该加的值,注意这里是相当于同时增加parties和unarrived
 - long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; //计算出需要调整的量
 - final Phaser parent = this.parent; //查看可能存在的相移器
 - int phase;
 - for (;;) {
 - // state的值
 - long s = (parent == null) ? state : reconcileState(); // reconcileState()方法是调整当前phaser的状态与root的一致
 - // state的低32未,也就是parties和unarrived的值
 - int counts = (int)s;
 - // parties的值
 - int parties = counts >>> PARTIES_SHIFT;
 - // unarrived的值
 - int unarrived = counts & UNARRIVED_MASK;
 - // 检查是否溢出
 - if (registrations > MAX_PARTIES - parties) //如果需要注册的数量超过运行注册的最大值,则抛出异常状态异常
 - throw new IllegalStateException(badRegister(s));
 - // 当前阶段phase
 - phase = (int)(s >>> PHASE_SHIFT);
 - if (phase < 0) //如果当前状态为终止状态则跳出循环直接退出
 - break;
 - // 不是第一个参与者
 - if (counts != EMPTY) { // not 1st registration //如果当前状态不是第一次注册线程
 - if (parent == null || reconcileState() == s) { //如果当相移器的父相移器为空,则直接信息CAS,如果当前相移器部位空则调用reconcileState处理,这个稍后再看。reconcileState这里主要为了防止出现同步性错误。
 - // unarrived等于0说明当前阶段正在执行onAdvance()方法,等待其执行完毕
 - if (unarrived == 0) // wait out advance
 - root.internalAwaitAdvance(phase, null);
 - // 否则就修改state的值,增加adjust,如果成功就跳出循环
 - else if (UNSAFE.compareAndSwapLong(this, stateOffset,
 - s, s + adjust))
 - break;
 - }
 - }
 - // 是第一个参与者,当前状态是第一次注册。如果如果当前相移器没有父相移器。则直接进行CAS
 - else if (parent == null) { // 1st root registration
 - // 计算state的值
 - long next = ((long)phase << PHASE_SHIFT) | adjust;
 - // 修改state的值,如果成功就跳出循环
 - if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
 - break;
 - }
 - else { // 如果当前是第一次设置,并且该相移器被组织在一个树中则需要考虑一下,则需要使用内置锁来进如
 - // 多层级阶段的处理方式
 - synchronized (this) { // 1st sub registration
 - if (state == s) { // recheck under lock 这里有可能发生竞争。所以这里还需要检查一下,如果失败则需退出同步区重新尝试进入。
 - phase = parent.doRegister(1); // 调用其父相移器的注册方法
 - if (phase < 0)
 - break;
 - // finish registration whenever parent registration
 - // succeeded, even when racing with termination,
 - // since these are part of the same "transaction".
 - while (!UNSAFE.compareAndSwapLong
 - (this, stateOffset, s,
 - ((long)phase << PHASE_SHIFT) | adjust)) {
 - s = state;
 - phase = (int)(root.state >>> PHASE_SHIFT);
 - // assert (int)s == EMPTY;
 - }
 - break;
 - }
 - }
 - }
 - }
 - return phase;
 - }
 
增加一个参与者的总体的逻辑为:
arriveAndAwaitAdvance()方法
当前线程当前阶段执行完毕,等待其他线程完成当前阶段。 如果当前线程是该阶段最后一个到达的,则当前线程会执行onAdvance()方法,并唤醒其它线程进入下一个阶段。
- public int arriveAndAwaitAdvance() {
 - // Specialization of doArrive+awaitAdvance eliminating some reads/paths
 - final Phaser root = this.root;
 - for (;;) {
 - // state的值
 - long s = (root == this) ? state : reconcileState();
 - // 当前阶段
 - int phase = (int)(s >>> PHASE_SHIFT);
 - if (phase < 0)
 - return phase;
 - // parties 和 unarrived的值
 - int counts = (int)s;
 - // unarrived的值(state的低16位)
 - int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
 - if (unarrived <= 0)
 - throw new IllegalStateException(badArrive(s));
 - // 修改state的值
 - if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
 - s -= ONE_ARRIVAL)) {
 - // 如果不是最后一个到达的,则调用internalAwaitAdvance()方法自旋或进入队列等待
 - if (unarrived > 1)
 - // 这里是直接返回了,internalAwaitAdvance()方法的源码见register()方法解析
 - return root.internalAwaitAdvance(phase, null);
 - // 到这里说明是最后一个到达的参与者
 - if (root != this)
 - return parent.arriveAndAwaitAdvance();
 - // n 只保留了state中parties的部分,也就是中16位
 - long n = s & PARTIES_MASK; // base of next state
 - // parties的值,即下一次需要到达的参与者数量
 - int nextUnarrived = (int)n >>> PARTIES_SHIFT;
 - // 执行onAdvance()方法,返回true表示下一阶段参与者数量为0了,也就是结束了
 - if (onAdvance(phase, nextUnarrived))
 - n |= TERMINATION_BIT;
 - else if (nextUnarrived == 0)
 - n |= EMPTY;
 - else
 - n |= nextUnarrived; // n加上unarrived的值
 - // 下阶段等待当前阶段加1
 - int nextPhase = (phase + 1) & MAX_PHASE;
 - // n 加上下一个阶段的值
 - n |= (long)nextPhase << PHASE_SHIFT;
 - // 修改state的值为n
 - if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
 - return (int)(state >>> PHASE_SHIFT); // terminated
 - // 唤醒其它参与者并进入下一个阶段
 - releaseWaiters(phase);
 - // 返回下一阶段的值
 - return nextPhase;
 - }
 - }
 - }
 
arriveAndAwaitAdvance的大致逻辑为:
internalAwaitAdvance方法
internalAwaitAdvance方法。实际上Phaser中阻塞都是通过这个语句实现的。这个语句必须通过根相移器调用。换句话说所有的阻塞都是在根相移器阻塞的。
输入参数中phase是需要阻塞的阶段。node是用来跟踪可能中断的阻塞节点。
- // 等待onAdvance()方法执行完毕
 - // 原理是先自旋一定次数,如果进入下一个阶段,这个方法直接返回了,
 - // 如果自旋一定次数还没有进入下一个阶段,则当前线程入队列,等待onAdvance()执行完成唤醒
 - private int internalAwaitAdvance(int phase, QNode node) {
 - // assert root == this;
 - // 保证队列为空
 - releaseWaiters(phase-1); // ensure old queue clean
 - boolean queued = false; // true when node is enqueued
 - int lastUnarrived = 0; // to increase spins upon change
 - // 自旋的次数
 - int spins = SPINS_PER_ARRIVAL;
 - long s;
 - int p;
 - // 检查当前阶段是否变化,如果变化了说明进入下一个阶段了,这时候就没有必要自旋了
 - while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
 - // 如果node为空,注册的时候传入的为空
 - if (node == null) { // spinning in noninterruptible mode
 - // 未完成的参与者数量
 - int unarrived = (int)s & UNARRIVED_MASK;
 - // unarrived 有变化,增加自旋次数
 - if (unarrived != lastUnarrived &&
 - (lastUnarrived = unarrived) < NCPU)
 - spins += SPINS_PER_ARRIVAL;
 - boolean interrupted = Thread.interrupted();
 - // 自旋次数万了,则新建一个节点
 - if (interrupted || --spins < 0) { // need node to record intr
 - node = new QNode(this, phase, false, false, 0L);
 - node.wasInterrupted = interrupted;
 当前题目:并发编程之Phaser原理与应用
转载来于:http://www.csdahua.cn/qtweb/news1/273201.html网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网