CountDownLatch是多线程中一个比较重要的概念,它可以使得一个或多个线程等待其他线程执行完毕之后再执行。它内部有一个计数器和一个阻塞队列,每当一个线程调用countDown()方法后,计数器的值减少1。当计数器的值不为0时,调用await()方法的线程将会被加入到阻塞队列,一直阻塞到计数器的值为0。

目前创新互联已为近1000家的企业提供了网站建设、域名、网站空间、成都网站托管、企业网站设计、南山网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
- public class CountDownLatch {
 - //构造一个值为count的计数器
 - public CountDownLatch(int count);
 - //阻塞当前线程直到计数器为0
 - public void await() throws InterruptedException;
 - //在单位为unit的timeout时间之内阻塞当前线程
 - public boolean await(long timeout, TimeUnit unit);
 - //将计数器的值减1,当计数器的值为0时,阻塞队列内的线程才可以运行
 - public void countDown();
 - }
 
下面给一个简单的示例:
- package com.yang.testCountDownLatch;
 - import java.util.concurrent.CountDownLatch;
 - public class Main {
 - private static final int NUM = 3;
 - public static void main(String[] args) throws InterruptedException {
 - CountDownLatch latch = new CountDownLatch(NUM);
 - for (int i = 0; i < NUM; i++) {
 - new Thread(() -> {
 - try {
 - Thread.sleep(2000);
 - System.out.println(Thread.currentThread().getName() + "运行完毕");
 - } catch (InterruptedException e) {
 - e.printStackTrace();
 - } finally {
 - latch.countDown();
 - }
 - }).start();
 - }
 - latch.await();
 - System.out.println("主线程运行完毕");
 - }
 - }
 
输出如下:
看得出来,主线程会等到3个子线程执行完毕才会执行。
可以看得出来,CountDownLatch里面有一个继承AQS的内部类Sync,其实是AQS来支持CountDownLatch的各项操作的。
new CountDownLatch(int count)用来创建一个AQS同步队列,并将计数器的值赋给了AQS的state。
- public CountDownLatch(int count) {
 - if (count < 0) throw new IllegalArgumentException("count < 0");
 - this.sync = new Sync(count);
 - }
 - private static final class Sync extends AbstractQueuedSynchronizer {
 - Sync(int count) {
 - setState(count);
 - }
 - }
 
countDown()方法会对计数器进行减1的操作,当计数器值为0时,将会唤醒在阻塞队列中等待的所有线程。其内部调用了Sync的releaseShared(1)方法
- public void countDown() {
 - sync.releaseShared(1);
 - }
 - public final boolean releaseShared(int arg) {
 - if (tryReleaseShared(arg)) {
 - //此时计数器的值为0,唤醒所有被阻塞的线程
 - doReleaseShared();
 - return true;
 - }
 - return false;
 - }
 
tryReleaseShared(arg)内部使用了自旋+CAS操将计数器的值减1,当减为0时,方法返回true,将会调用doReleaseShared()方法。对CAS机制不了解的同学,可以先参考我的另外一篇文章浅探CAS实现原理
- protected boolean tryReleaseShared(int releases) {
 - //自旋
 - for (;;) {
 - int c = getState();
 - if (c == 0)
 - //此时计数器的值已经为0了,其他线程早就执行完毕了,当前线程也已经再执行了,不需要再次唤醒了
 - return false;
 - int nextc = c-1;
 - //使用CAS机制,将state的值变为state-1
 - if (compareAndSetState(c, nextc))
 - return nextc == 0;
 - }
 - }
 
doReleaseShared()是AQS中的方法,该方法会唤醒队列中所有被阻塞的线程。
- private void doReleaseShared() {
 - for (;;) {
 - Node h = head;
 - if (h != null && h != tail) {
 - int ws = h.waitStatus;
 - if (ws == Node.SIGNAL) {
 - if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 - continue; // loop to recheck cases
 - unparkSuccessor(h);
 - }
 - else if (ws == 0 &&
 - !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
 - continue; // loop on failed CAS
 - }
 - if (h == head) // loop if head changed
 - break;
 - }
 - }
 
这段方法比较难理解,会另外篇幅介绍。这里只要认为该段方法会唤醒所有因调用await()方法而阻塞的线程。
当计数器的值不为0时,该方法会将当前线程加入到阻塞队列中,并把当前线程挂起。
- public void await() throws InterruptedException {
 - sync.acquireSharedInterruptibly(1);
 - }
 
同样是委托内部类Sync,调用其
acquireSharedInterruptibly()方法
- public final void acquireSharedInterruptibly(int arg)
 - throws InterruptedException {
 - if (Thread.interrupted())
 - throw new InterruptedException();
 - if (tryAcquireShared(arg) < 0)
 - doAcquireSharedInterruptibly(arg);
 - }
 
接着看Sync内的tryAcquireShared()方法,如果当前计数器的值为0,则返回1,最终将导致await()不会将线程阻塞。如果当前计数器的值不为0,则返回-1。
- protected int tryAcquireShared(int acquires) {
 - return (getState() == 0) ? 1 : -1;
 - }
 
tryAcquireShared方法返回一个负值时,将会调用AQS中的
doAcquireSharedInterruptibly()方法,将调用await()方法的线程加入到阻塞队列中,并将此线程挂起。
- private void doAcquireSharedInterruptibly(int arg)
 - throws InterruptedException {
 - //将当前线程构造成一个共享模式的节点,并加入到阻塞队列中
 - final Node node = addWaiter(Node.SHARED);
 - boolean failed = true;
 - try {
 - for (;;) {
 - final Node p = node.predecessor();
 - if (p == head) {
 - int r = tryAcquireShared(arg);
 - if (r >= 0) {
 - setHeadAndPropagate(node, r);
 - p.next = null; // help GC
 - failed = false;
 - return;
 - }
 - }
 - if (shouldParkAfterFailedAcquire(p, node) &&
 - parkAndCheckInterrupt())
 - throw new InterruptedException();
 - }
 - } finally {
 - if (failed)
 - cancelAcquire(node);
 - }
 - }
 
同样,以上的代码位于AQS中,在没有了解AQS结构的情况下去理解上述代码,有些困难,关于AQS源码,会另开篇幅介绍。
CountDownLatch的使用场景很广泛,一般用于分头做某些事,再汇总的情景。例如:
数据报表:当前的微服务架构十分流行,大多数项目都会被拆成若干的子服务,那么报表服务在进行统计时,需要向各个服务抽取数据。此时可以创建与服务数相同的线程数,交由线程池处理,每个线程去对应服务中抽取数据,注意需要在finally语句块中进行countDown()操作。主线程调用await()阻塞,直到所有数据抽取成功,最后主线程再进行对数据的过滤组装等,形成直观的报表。
风险评估:客户端的一个同步请求查询用户的风险等级,服务端收到请求后会请求多个子系统获取数据,然后使用风险评估规则模型进行风险评估。如果使用单线程去完成这些操作,这个同步请求超时的可能性会很大,因为服务端请求多个子系统是依次排队的,请求子系统获取数据的时间是线性累加的。此时可以使用CountDownLatch,让多个线程并发请求多个子系统,当获取到多个子系统数据之后,再进行风险评估,这样请求子系统获取数据的时间就等于最耗时的那个请求的时间,可以大大减少处理时间。
                标题名称:一篇带给你CountDownLatch实现原理
                
                网页路径:http://www.csdahua.cn/qtweb/news17/65317.html
            
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网