主线程等待子线程执行结束再执行—— CountDown

作者:杏彩彩票app下载

Java并发系列[3]----AbstractQueuedSynchronizer源码分析之共享模式,

通过上一篇的分析,我们知道了独占模式获取锁有三种方式,分别是不响应线程中断获取,响应线程中断获取,设置超时时间获取。在共享模式下获取锁的方式也是这三种,而且基本上都是大同小异,我们搞清楚了一种就能很快的理解其他的方式。虽然说AbstractQueuedSynchronizer源码有一千多行,但是重复的也比较多,所以读者不要刚开始的时候被吓到,只要耐着性子去看慢慢的自然能够渐渐领悟。就我个人经验来说,阅读AbstractQueuedSynchronizer源码有几个比较关键的地方需要弄明白,分别是独占模式和共享模式的区别,结点的等待状态,以及对条件队列的理解。理解了这些要点那么后续源码的阅读将会轻松很多。当然这些在我的《Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析》这篇文章里都有详细的介绍,读者可以先去查阅。本篇对于共享模式的分析也是分为三种获取锁的方式和一种释放锁的方式。

  1. 不响应线程中断的获取

    1 //以不可中断模式获取锁(共享模式) 2 public final void acquireShared(int arg) { 3 //1.尝试去获取锁 4 if (tryAcquireShared(arg) < 0) { 5 //2.如果获取失败就进入这个方法 6 doAcquireShared(arg); 7 } 8 } 9 10 //尝试去获取锁(共享模式) 11 //负数:表示获取失败 12 //零值:表示当前结点获取成功, 但是后继结点不能再获取了 13 //正数:表示当前结点获取成功, 并且后继结点同样可以获取成功 14 protected int tryAcquireShared(int arg) { 15 throw new UnsupportedOperationException(); 16 }

调用acquireShared方法是不响应线程中断获取锁的方式。在该方法中,首先调用tryAcquireShared去尝试获取锁,tryAcquireShared方法返回一个获取锁的状态,这里AQS规定了返回状态若是负数代表当前结点获取锁失败,若是0代表当前结点获取锁成功,但后继结点不能再获取了,若是正数则代表当前结点获取锁成功,并且这个锁后续结点也同样可以获取成功。子类在实现tryAcquireShared方法获取锁的逻辑时,返回值需要遵守这个约定。如果调用tryAcquireShared的返回值小于0,就代表这次尝试获取锁失败了,接下来就调用doAcquireShared方法将当前线程添加进同步队列。我们看到doAcquireShared方法。

 1 //在同步队列中获取(共享模式)
 2 private void doAcquireShared(int arg) {
 3     //添加到同步队列中
 4     final Node node = addWaiter(Node.SHARED);
 5     boolean failed = true;
 6     try {
 7         boolean interrupted = false;
 8         for (;;) {
 9             //获取当前结点的前继结点
10             final Node p = node.predecessor();
11             //如果前继结点为head结点就再次尝试去获取锁
12             if (p == head) {
13                 //再次尝试去获取锁并返回获取状态
14                 //r < 0, 表示获取失败
15                 //r = 0, 表示当前结点获取成功, 但是后继结点不能再获取了
16                 //r > 0, 表示当前结点获取成功, 并且后继结点同样可以获取成功
17                 int r = tryAcquireShared(arg);
18                 if (r >= 0) {
19                     //到这里说明当前结点已经获取锁成功了, 此时它会将锁的状态信息传播给后继结点
20                     setHeadAndPropagate(node, r);
21                     p.next = null;
22                     //如果在线程阻塞期间收到中断请求, 就在这一步响应该请求
23                     if (interrupted) {
24                         selfInterrupt();
25                     }
26                     failed = false;
27                     return;
28                 }
29             }
30             //每次获取锁失败后都会判断是否可以将线程挂起, 如果可以的话就会在parkAndCheckInterrupt方法里将线程挂起
31             if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
32                 interrupted = true;
33             }
34         }
35     } finally {
36         if (failed) {
37             cancelAcquire(node);
38         }
39     }
40 }

进入doAcquireShared方法首先是调用addWaiter方法将当前线程包装成结点放到同步队列尾部。这个添加结点的过程我们在讲独占模式时讲过,这里就不再讲了。结点进入同步队列后,如果它发现在它前面的结点就是head结点,因为head结点的线程已经获取锁进入房间里面了,那么下一个获取锁的结点就轮到自己了,所以当前结点先不会将自己挂起,而是再一次去尝试获取锁,如果前面那人刚好释放锁离开了,那么当前结点就能成功获得锁,如果前面那人还没有释放锁,那么就会调用shouldParkAfterFailedAcquire方法,在这个方法里面会将head结点的状态改为SIGNAL,只有保证前面结点的状态为SIGNAL,当前结点才能放心的将自己挂起,所有线程都会在parkAndCheckInterrupt方法里面被挂起。如果当前结点恰巧成功的获取了锁,那么接下来就会调用setHeadAndPropagate方法将自己设置为head结点,并且唤醒后面同样是共享模式的结点。下面我们看下setHeadAndPropagate方法具体的操作。

 1 //设置head结点并传播锁的状态(共享模式)
 2 private void setHeadAndPropagate(Node node, int propagate) {
 3     Node h = head;
 4     //将给定结点设置为head结点
 5     setHead(node);
 6     //如果propagate大于0表明锁可以获取了
 7     if (propagate > 0 || h == null || h.waitStatus < 0) {
 8         //获取给定结点的后继结点
 9         Node s = node.next;
10         //如果给定结点的后继结点为空, 或者它的状态是共享状态
11         if (s == null || s.isShared()) {
12             //唤醒后继结点
13             doReleaseShared();
14         }
15     }
16 }
17 
18 //释放锁的操作(共享模式)
19 private void doReleaseShared() {
20     for (;;) {
21         //获取同步队列的head结点
22         Node h = head;
23         if (h != null && h != tail) {
24             //获取head结点的等待状态
25             int ws = h.waitStatus;
26             //如果head结点的状态为SIGNAL, 表明后面有人在排队
27             if (ws == Node.SIGNAL) {
28                 //先把head结点的等待状态更新为0
29                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
30                     continue;
31                 }
32                 //再去唤醒后继结点
33                 unparkSuccessor(h);
34              //如果head结点的状态为0, 表明此时后面没人在排队, 就只是将head状态修改为PROPAGATE
35             }else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
36                 continue;
37             }
38         }
39         //只有保证期间head结点没被修改过才能跳出循环
40         if (h == head) {
41             break;
42         }
43     }
44 }

调用setHeadAndPropagate方法首先将自己设置成head结点,然后再根据传入的tryAcquireShared方法的返回值来决定是否要去唤醒后继结点。前面已经讲到当返回值大于0就表明当前结点成功获取了锁,并且后面的结点也可以成功获取锁。这时当前结点就需要去唤醒后面同样是共享模式的结点,注意,每次唤醒仅仅只是唤醒后一个结点,如果后一个结点不是共享模式的话,当前结点就直接进入房间而不会再去唤醒更后面的结点了。共享模式下唤醒后继结点的操作是在doReleaseShared方法进行的,共享模式和独占模式的唤醒操作基本也是相同的,都是去找到自己座位上的牌子(等待状态),如果牌子上为SIGNAL表明后面有人需要让它帮忙唤醒,如果牌子上为0则表明队列此时并没有人在排队。在独占模式下是如果发现没人在排队就直接离开队列了,而在共享模式下如果发现队列后面没人在排队,当前结点在离开前仍然会留个小纸条(将等待状态设置为PROPAGATE)告诉后来的人这个锁的可获取状态。那么后面来的人在尝试获取锁的时候可以根据这个状态来判断是否直接获取锁。

  1. 响应线程中断的获取

    1 //以可中断模式获取锁(共享模式) 2 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { 3 //首先判断线程是否中断, 如果是则抛出异常 4 if (Thread.interrupted()) { 5 throw new InterruptedException(); 6 } 7 //1.尝试去获取锁 8 if (tryAcquireShared(arg) < 0) { 9 //2. 如果获取失败则进人该方法 10 doAcquireSharedInterruptibly(arg); 11 } 12 } 13 14 //以可中断模式获取(共享模式) 15 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { 16 //将当前结点插入同步队列尾部 17 final Node node = addWaiter(Node.SHARED); 18 boolean failed = true; 19 try { 20 for (;;) { 21 //获取当前结点的前继结点 22 final Node p = node.predecessor(); 23 if (p == head) { 24 int r = tryAcquireShared(arg); 25 if (r >= 0) { 26 setHeadAndPropagate(node, r); 27 p.next = null; 28 failed = false; 29 return; 30 } 31 } 32 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { 33 //如果线程在阻塞过程中收到过中断请求, 那么就会立马在这里抛出异常 34 throw new InterruptedException(); 35 } 36 } 37 } finally { 38 if (failed) { 39 cancelAcquire(node); 40 } 41 } 42 }

响应线程中断获取锁的方式和不响应线程中断获取锁的方式在流程上基本是相同的,唯一的区别就是在哪里响应线程的中断请求。在不响应线程中断获取锁时,线程从parkAndCheckInterrupt方法中被唤醒,唤醒后就立马返回是否收到中断请求,即使是收到了中断请求也会继续自旋直到获取锁后才响应中断请求将自己给挂起。而响应线程中断获取锁会才线程被唤醒后立马响应中断请求,如果在阻塞过程中收到了线程中断就会立马抛出InterruptedException异常。

  1. 设置超时时间的获取

    1 //以限定超时时间获取锁(共享模式) 2 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { 3 if (Thread.interrupted()) { 4 throw new InterruptedException(); 5 } 6 //1.调用tryAcquireShared尝试去获取锁 7 //2.如果获取失败就调用doAcquireSharedNanos 8 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); 9 } 10 11 //以限定超时时间获取锁(共享模式) 12 private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { 13 long lastTime = System.nanoTime(); 14 final Node node = addWaiter(Node.SHARED); 15 boolean failed = true; 16 try { 17 for (;;) { 18 //获取当前结点的前继结点 19 final Node p = node.predecessor(); 20 if (p == head) { 21 int r = tryAcquireShared(arg); 22 if (r >= 0) { 23 setHeadAndPropagate(node, r); 24 p.next = null; 25 failed = false; 26 return true; 27 } 28 } 29 //如果超时时间用完了就结束获取, 并返回失败信息 30 if (nanosTimeout <= 0) { 31 return false; 32 } 33 //1.检查是否满足将线程挂起要求(保证前继结点状态为SIGNAL) 34 //2.检查超时时间是否大于自旋时间 35 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) { 36 //若满足上面两个条件就将当前线程挂起一段时间 37 LockSupport.parkNanos(this, nanosTimeout); 38 } 39 long now = System.nanoTime(); 40 //超时时间每次减去获取锁的时间 41 nanosTimeout -= now - lastTime; 42 lastTime = now; 43 //如果在阻塞时收到中断请求就立马抛出异常 44 if (Thread.interrupted()) { 45 throw new InterruptedException(); 46 } 47 } 48 } finally { 49 if (failed) { 50 cancelAcquire(node); 51 } 52 } 53 }

如果看懂了上面两种获取方式,再来看设置超时时间的获取方式就会很轻松,基本流程都是一样的,主要是理解超时的机制是怎样的。如果第一次获取锁失败会调用doAcquireSharedNanos方法并传入超时时间,进入方法后会根据情况再次去获取锁,如果再次获取失败就要考虑将线程挂起了。这时会判断超时时间是否大于自旋时间,如果是的话就会将线程挂起一段时间,否则就继续尝试获取,每次获取锁之后都会将超时时间减去获取锁的时间,一直这样循环直到超时时间用尽,如果还没有获取到锁的话就会结束获取并返回获取失败标识。在整个期间线程是响应线程中断的。

  1. 共享模式下结点的出队操作

    1 //释放锁的操作(共享模式) 2 public final boolean releaseShared(int arg) { 3 //1.尝试去释放锁 4 if (tryReleaseShared(arg)) { 5 //2.如果释放成功就唤醒其他线程 6 doReleaseShared(); 7 return true; 8 } 9 return false; 10 } 11 12 //尝试去释放锁(共享模式) 13 protected boolean tryReleaseShared(int arg) { 14 throw new UnsupportedOperationException(); 15 } 16 17 //释放锁的操作(共享模式) 18 private void doReleaseShared() { 19 for (;;) { 20 //获取同步队列的head结点 21 Node h = head; 22 if (h != null && h != tail) { 23 //获取head结点的等待状态 24 int ws = h.waitStatus; 25 //如果head结点的状态为SIGNAL, 表明后面有人在排队 26 if (ws == Node.SIGNAL) { 27 //先把head结点的等待状态更新为0 28 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) { 29 continue; 30 } 31 //再去唤醒后继结点 32 unparkSuccessor(h); 33 //如果head结点的状态为0, 表明此时后面没人在排队, 就只是将head状态修改为PROPAGATE 34 }else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) { 35 continue; 36 } 37 } 38 //只有保证期间head结点没被修改过才能跳出循环 39 if (h == head) { 40 break; 41 } 42 } 43 }

线程在房间办完事之后就会调用releaseShared方法释放锁,首先调用tryReleaseShared方法尝试释放锁,该方法的判断逻辑由子类实现。如果释放成功就调用doReleaseShared方法去唤醒后继结点。走出房间后它会找到原先的座位(head结点),看看座位上是否有人留了小纸条(状态为SIGNAL),如果有就去唤醒后继结点。如果没有(状态为0)就代表队列没人在排队,那么在离开之前它还要做最后一件事情,就是在自己座位上留下小纸条(状态设置为PROPAGATE),告诉后面的人锁的获取状态,整个释放锁的过程和独占模式唯一的区别就是在这最后一步操作。

注:以上全部分析基于JDK1.7,不同版本间会有差异,读者需要注意

通过上一篇的分析,我们知道了独占模式获取锁有三种方式,分别是不响应线...

Java并发系列[2]----AbstractQueuedSynchronizer源码分析之独占模式,

在上一篇《Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析》中我们介绍了AbstractQueuedSynchronizer基本的一些概念,主要讲了AQS的排队区是怎样实现的,什么是独占模式和共享模式以及如何理解结点的等待状态。理解并掌握这些内容是后续阅读AQS源码的关键,所以建议读者先看完我的上一篇文章再回过头来看这篇就比较容易理解。在本篇中会介绍在独占模式下结点是怎样进入同步队列排队的,以及离开同步队列之前会进行哪些操作。AQS为在独占模式和共享模式下获取锁分别提供三种获取方式:不响应线程中断获取,响应线程中断获取,设置超时时间获取。这三种方式整体步骤大致是相同的,只有少部分不同的地方,所以理解了一种方式再看其他方式的实现都是大同小异。在本篇中我会着重讲不响应线程中断的获取方式,其他两种方式也会顺带讲一下不一致的地方。

  1. 怎样以不响应线程中断获取锁?

    1 //不响应中断方式获取(独占模式) 2 public final void acquire(int arg) { 3 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { 4 selfInterrupt(); 5 } 6 }

上面代码中虽然看起来简单,但是它按照顺序执行了下图所示的4个步骤。下面我们会逐个步骤进行演示分析。

图片 1

第一步:!tryAcquire(arg)

1 //尝试去获取锁(独占模式)
2 protected boolean tryAcquire(int arg) {
3     throw new UnsupportedOperationException();
4 }

这时候来了一个人,他首先尝试着去敲了敲门,如果发现门没锁(tryAcquire(arg)=true),那就直接进去了。如果发现门锁了(tryAcquire(arg)=false),就执行下一步。这个tryAcquire方法决定了什么时候锁是开着的,什么时候锁是关闭的。这个方法必须要让子类去覆盖,重写里面的判断逻辑。

第二步:addWaiter(Node.EXCLUSIVE)

 1 //将当前线程包装成结点并添加到同步队列尾部
 2 private Node addWaiter(Node mode) {
 3     //指定持有锁的模式
 4     Node node = new Node(Thread.currentThread(), mode);
 5     //获取同步队列尾结点引用
 6     Node pred = tail;
 7     //如果尾结点不为空, 表明同步队列已存在结点
 8     if (pred != null) {
 9         //1.指向当前尾结点
10         node.prev = pred;
11         //2.设置当前结点为尾结点
12         if (compareAndSetTail(pred, node)) {
13             //3.将旧的尾结点的后继指向新的尾结点
14             pred.next = node;
15             return node;
16         }
17     }
18     //否则表明同步队列还没有进行初始化
19     enq(node);
20     return node;
21 }
22 
23 //结点入队操作
24 private Node enq(final Node node) {
25     for (;;) {
26         //获取同步队列尾结点引用
27         Node t = tail;
28         //如果尾结点为空说明同步队列还没有初始化
29         if (t == null) {
30             //初始化同步队列
31             if (compareAndSetHead(new Node())) {
32                 tail = head;
33             }
34         } else {
35             //1.指向当前尾结点
36             node.prev = t;
37             //2.设置当前结点为尾结点
38             if (compareAndSetTail(t, node)) {
39                 //3.将旧的尾结点的后继指向新的尾结点
40                 t.next = node;
41                 return t;
42             }
43         }
44     }
45 }

执行到这一步表明第一次获取锁失败,那么这个人就给自己领了块号码牌进入排队区去排队了,在领号码牌的时候会声明自己想要以什么样的方式来占用房间(独占模式or共享模式)。注意,这时候他并没有坐下来休息(将自己挂起)哦。

第三步:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

 1 //以不可中断方式获取锁(独占模式)
 2 final boolean acquireQueued(final Node node, int arg) {
 3     boolean failed = true;
 4     try {
 5         boolean interrupted = false;
 6         for (;;) {
 7             //获取给定结点的前继结点的引用
 8             final Node p = node.predecessor();
 9             //如果当前结点是同步队列的第一个结点, 就尝试去获取锁
10             if (p == head && tryAcquire(arg)) {
11                 //将给定结点设置为head结点
12                 setHead(node);
13                 //为了帮助垃圾收集, 将上一个head结点的后继清空
14                 p.next = null;
15                 //设置获取成功状态
16                 failed = false;
17                 //返回中断的状态, 整个循环执行到这里才是出口
18                 return interrupted;
19             }
20             //否则说明锁的状态还是不可获取, 这时判断是否可以挂起当前线程
21             //如果判断结果为真则挂起当前线程, 否则继续循环, 在这期间线程不响应中断
22             if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
23                 interrupted = true;
24             }
25         }
26     } finally {
27         //在最后确保如果获取失败就取消获取
28         if (failed) {
29             cancelAcquire(node);
30         }
31     }
32 }
33 
34 //判断是否可以将当前结点挂起
35 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
36     //获取前继结点的等待状态
37     int ws = pred.waitStatus;
38     //如果前继结点状态为SIGNAL, 表明前继结点会唤醒当前结点, 所以当前结点可以安心的挂起了
39     if (ws == Node.SIGNAL) {
40         return true;
41     }
42     
43     if (ws > 0) {
44         //下面的操作是清理同步队列中所有已取消的前继结点
45         do {
46             node.prev = pred = pred.prev;
47         } while (pred.waitStatus > 0);
48         pred.next = node;
49     } else {
50         //到这里表示前继结点状态不是SIGNAL, 很可能还是等于0, 这样的话前继结点就不会去唤醒当前结点了
51         //所以当前结点必须要确保前继结点的状态为SIGNAL才能安心的挂起自己
52         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
53     }
54     return false;
55 }
56 
57 //挂起当前线程
58 private final boolean parkAndCheckInterrupt() {
59     LockSupport.park(this);
60     return Thread.interrupted();
61 }

领完号码牌进入排队区后就会立马执行这个方法,当一个结点首次进入排队区后有两种情况,一种是发现他前面的那个人已经离开座位进入房间了,那他就不坐下来休息了,会再次去敲一敲门看看那小子有没有完事。如果里面的人刚好完事出来了,都不用他叫自己就直接冲进去了。否则,就要考虑坐下来休息一会儿了,但是他还是不放心,如果他坐下来睡着后没人提醒他怎么办?他就在前面那人的座位上留一个小纸条,好让从里面出来的人看到纸条后能够唤醒他。还有一种情况是,当他进入排队区后发现前面还有好几个人在座位上排队呢,那他就可以安心的坐下来咪一会儿了,但在此之前他还是会在前面那人(此时已经睡着了)的座位上留一个纸条,好让这个人在走之前能够去唤醒自己。当一切事情办妥了之后,他就安安心心的睡觉了,注意,我们看到整个for循环就只有一个出口,那就是等线程成功的获取到锁之后才能出去,在没有获取到锁之前就一直是挂在for循环的parkAndCheckInterrupt()方法里头。线程被唤醒后也是从这个地方继续执行for循环。

第四步:selfInterrupt()

1 //当前线程将自己中断
2 private static void selfInterrupt() {
3     Thread.currentThread().interrupt();
4 }

由于上面整个线程一直是挂在for循环的parkAndCheckInterrupt()方法里头,没有成功获取到锁之前不响应任何形式的线程中断,只有当线程成功获取到锁并从for循环出来后,他才会查看在这期间是否有人要求中断线程,如果是的话再去调用selfInterrupt()方法将自己挂起。

  1. 怎样以响应线程中断获取锁?

    1 //以可中断模式获取锁(独占模式) 2 private void doAcquireInterruptibly(int arg) throws InterruptedException { 3 //将当前线程包装成结点添加到同步队列中 4 final Node node = addWaiter(Node.EXCLUSIVE); 5 boolean failed = true; 6 try { 7 for (;;) { 8 //获取当前结点的前继结点 9 final Node p = node.predecessor(); 10 //如果p是head结点, 那么当前线程就再次尝试获取锁 11 if (p == head && tryAcquire(arg)) { 12 setHead(node); 13 p.next = null; // help GC 14 failed = false; 15 //获取锁成功后返回 16 return; 17 } 18 //如果满足条件就挂起当前线程, 此时响应中断并抛出异常 19 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { 20 //线程被唤醒后如果发现中断请求就抛出异常 21 throw new InterruptedException(); 22 } 23 } 24 } finally { 25 if (failed) { 26 cancelAcquire(node); 27 } 28 } 29 }

响应线程中断方式和不响应线程中断方式获取锁流程上大致上是相同的。唯一的一点区别就是线程从parkAndCheckInterrupt方法中醒来后会检查线程是否中断,如果是的话就抛出InterruptedException异常,而不响应线程中断获取锁是在收到中断请求后只是设置一下中断状态,并不会立马结束当前获取锁的方法,一直到结点成功获取到锁之后才会根据中断状态决定是否将自己挂起。

  1. 怎样设置超时时间获取锁?

    1 //以限定超时时间获取锁(独占模式) 2 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { 3 //获取系统当前时间 4 long lastTime = System.nanoTime(); 5 //将当前线程包装成结点添加到同步队列中 6 final Node node = addWaiter(Node.EXCLUSIVE); 7 boolean failed = true; 8 try { 9 for (;;) { 10 //获取当前结点的前继结点 11 final Node p = node.predecessor(); 12 //如果前继是head结点, 那么当前线程就再次尝试获取锁 13 if (p == head && tryAcquire(arg)) { 14 //更新head结点 15 setHead(node); 16 p.next = null; 17 failed = false; 18 return true; 19 } 20 //超时时间用完了就直接退出循环 21 if (nanosTimeout <= 0) { 22 return false; 23 } 24 //如果超时时间大于自旋时间, 那么等判断可以挂起线程之后就会将线程挂起一段时间 25 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) { 26 //将当前线程挂起一段时间, 之后再自己醒来 27 LockSupport.parkNanos(this, nanosTimeout); 28 } 29 //获取系统当前时间 30 long now = System.nanoTime(); 31 //超时时间每次都减去获取锁的时间间隔 32 nanosTimeout -= now - lastTime; 33 //再次更新lastTime 34 lastTime = now; 35 //在获取锁的期间收到中断请求就抛出异常 36 if (Thread.interrupted()) { 37 throw new InterruptedException(); 38 } 39 } 40 } finally { 41 if (failed) { 42 cancelAcquire(node); 43 } 44 } 45 }

设置超时时间获取首先会去获取一下锁,第一次获取锁失败后会根据情况,如果传入的超时时间大于自旋时间那么就会将线程挂起一段时间,否则的话就会进行自旋,每次获取锁之后都会将超时时间减去获取一次锁所用的时间。一直到超时时间小于0也就说明超时时间用完了,那么这时就会结束获取锁的操作然后返回获取失败标志。注意在以超时时间获取锁的过程中是可以响应线程中断请求的。

  1. 线程释放锁并离开同步队列是怎样进行的?

    1 //释放锁的操作(独占模式) 2 public final boolean release(int arg) { 3 //拨动密码锁, 看看是否能够开锁 4 if (tryRelease(arg)) { 5 //获取head结点 6 Node h = head; 7 //如果head结点不为空并且等待状态不等于0就去唤醒后继结点 8 if (h != null && h.waitStatus != 0) { 9 //唤醒后继结点 10 unparkSuccessor(h); 11 } 12 return true; 13 } 14 return false; 15 } 16 17 //唤醒后继结点 18 private void unparkSuccessor(Node node) { 19 //获取给定结点的等待状态 20 int ws = node.waitStatus; 21 //将等待状态更新为0 22 if (ws < 0) { 23 compareAndSetWaitStatus(node, ws, 0); 24 } 25 //获取给定结点的后继结点 26 Node s = node.next; 27 //后继结点为空或者等待状态为取消状态 28 if (s == null || s.waitStatus > 0) { 29 s = null; 30 //从后向前遍历队列找到第一个不是取消状态的结点 31 for (Node t = tail; t != null && t != node; t = t.prev) { 32 if (t.waitStatus <= 0) { 33 s = t; 34 } 35 } 36 } 37 //唤醒给定结点后面首个不是取消状态的结点 38 if (s != null) { 39 LockSupport.unpark(s.thread); 40 } 41 }

线程持有锁进入房间后就会去办自己的事情,等事情办完后它就会释放锁并离开房间。通过tryRelease方法可以拨动密码锁进行解锁,我们知道tryRelease方法是需要让子类去覆盖的,不同的子类实现的规则不一样,也就是说不同的子类设置的密码不一样。像在ReentrantLock当中,房间里面的人每调用tryRelease方法一次,state就减1,直到state减到0的时候密码锁就开了。大家想想这个过程像不像我们在不停的转动密码锁的转轮,而每次转动转轮数字只是减少1。CountDownLatch和这个也有点类似,只不过它不是一个人在转,而是多个人每人都去转一下,集中大家的力量把锁给开了。线程出了房间后它会找到自己原先的座位,也就是找到head结点。看看座位上有没有人给它留了小纸条,如果有的话它就知道有人睡着了需要让它帮忙唤醒,那么它就会去唤醒那个线程。如果没有的话就表明同步队列中暂时还没有人在等待,也没有人需要它唤醒,所以它就可以安心的离去了。以上过程就是在独占模式下释放锁的过程。

注:以上全部分析基于JDK1.7,不同版本间会有差异,读者需要注意

在上一篇《Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析》中我们...

  • Constructs a {@code CountDownLatch} initialized with the given count.
    • @param count the number of times {@link #countDown} must be invoked
  • before threads can pass through {@link #await}

  • @throws IllegalArgumentException if {@code count} is negative
    */
    public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
    }
    这里看清楚一点:构造器中要指定大于0的参数。

private void doReleaseShared() {
for (;;) {
Node h = head;//获得头结点
if (h != null && h != tail) {
int ws = h.waitStatus;//获取头结点的状态默认值为0
if (ws == Node.SIGNAL) {如果等于SIGNAL唤醒状态
//将头结点的状态置成0,并使用Node.SIGNAL(-1)与0比较,continue,h的状态设置为0,不会再进入if (ws == Node.SIGNAL)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}//判断ws是否为0,并且h的状态不等于0,这里是个坑啊,ws等于0,h就是0啊,所以if进不来的,并设置节点为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
如果头结点等于h,其实也没有一直loop,由于上面写的Node h = head,就算前面的条件都不满足,这里一定会break
if (h == head) // loop if head changed
break;
}
}
深入到这里,以超出我的想象力。

private static void countDownLatchTest(List<String> roles){
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i=0;i<10;i++){
CountRunnable countRunnable = new CountRunnable(i+"",i+"",roles,countDownLatch);
new Thread(countRunnable).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private String id;
private String name;
private List<String> roles;
private CountDownLatch countDownLatch;
private String password;

public CountRunnable(String id, String name, List<String> roles,
                     CountDownLatch countDownLatch){
    this.name = name;
    this.id = id;
    this.roles = roles;
    this.countDownLatch = countDownLatch;
}

@Override
public void run() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    roles.add(name);
    System.out.println("执行了-id:"+id+"name:"+name+"roles:"+roles.size());
    try {
        countDownLatch.countDown();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public String getId() {
    return id;
}

public void setId(String id) {
    this.id = id;
}

public String getName() {
    return name;
}

public void setName(String name) {
    this.name = name;
}

public List<String> getRoles() {
    return roles;
}

public void setRoles(List<String> roles) {
    this.roles = roles;
}

public String getPassword() {
    return password;
}

public void setPassword(String password) {
    this.password = password;
}

image.png

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//由于采用的公平锁,所以要将节点放到队列里
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {//本质是等待共享锁的释放
final Node p = node.predecessor();//获得节点的前继
if (p == head) { //如果前一个节点等于前继
int r = tryAcquireShared(arg);//就判断尝试获取锁
/*
这里要注意一下r的值就2种情况-1和1:
情况1.r为-1,latch没有调用countDown(),state是没有变化的导致state一直大于0或者调用了countDown(),但是state不等于0,直接在for循环中等待
情况2.r为1,证明countDown(),已经减到0,当前线程还在队列中,state已经等于0了.接下来就是唤醒队列中的节点
*/
if (r >= 0) {
setHeadAndPropagate(node, r);//将当前节点设置头结点。
p.next = null; // help GC 删除旧的头结点
failed = false;
return;
}
}
//当前节点不是头结点,当前线程一直等待,直到获取到共享锁。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate:

二、源码介绍

CountDownLatch是通过“共享锁”实现的。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是“锁计数器”的初始状态,表示该“共享锁”最多能被count个线程同时获取。当某线程调用该CountDownLatch对象的await()方法时,该线程会等待“共享锁”可用时,才能获取“共享锁”进而继续运行。而“共享锁”可用的条件,就是“锁计数器”的值为0!而“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1;通过这种方式,必须有count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待线程才能继续运行。

三、CountDownLatch总结

本文由杏彩发布,转载请注明来源

关键词: