Java并发-AQS源码分析之条件队列

概述

通过之前的分析,深入了解了AbstractQueuedSynchronizer的内部结构和一些设计理念,知道了AbstractQueuedSynchronizer内部维护了一个同步状态和两个排队区,这两个排队区分别是同步队列和条件队列。
拿ATM机取款举例,ATM机如下图所示:
img

同步队列是主要的排队区,如果ATM机没开放,所有想要进入人都得在这里排队。而条件队列主要是为条件等待设置的,想象一下如果一个人通过排队终于成功获取锁进入了ATM机,但在取款之前发现自己没带银行卡,碰到这种情况虽然很无奈,但是它也必须接受这个事实,这时它只好乖乖的出去先准备好银行卡(进入条件队列等待),当然在出去之前还得把锁给释放了好让其他人能够进来,在准备好了银行卡(条件满足)之后它又得重新回到同步队列中去排队。当然进入房间的人并不都是因为没带银行卡,可能还有其他一些原因必须中断操作先去条件队列中去排队,所以条件队列可以有多个,依不同的等待条件而设置不同的条件队列。
同步队列和条件队列的区别:

  1. 同步队列的头结点为head,而条件队列的头结点为firstWaiter;
  2. 同步队列的尾结点为tail,而条件队列的尾结点为lastWaiter;
  3. 同步队列的头结点没有和任何线程绑定,而条件队列的firstWaiter绑定了线程。
  4. 同步队列是一条双向链表,而条件队列是一条单向链表。

Condition接口定义了条件队列中的所有操作,AbstractQueuedSynchronizer内部的ConditionObject类实现了Condition接口,下面看看Condition接口都定义了哪些操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public interface Condition {

//响应线程中断的条件等待
void await() throws InterruptedException;

//不响应线程中断的条件等待
void awaitUninterruptibly();

//设置相对时间的条件等待(不进行自旋)
long awaitNanos(long nanosTimeout) throws InterruptedException;

//设置相对时间的条件等待(进行自旋)
boolean await(long time, TimeUnit unit) throws InterruptedException;

//设置绝对时间的条件等待
boolean awaitUntil(Date deadline) throws InterruptedException;

//唤醒条件队列中的头结点
void signal();

//唤醒条件队列的所有结点
void signalAll();

}

Condition接口虽然定义了这么多方法,但总共就分为两类,以await开头的是线程进入条件队列等待的方法,以signal开头的是将条件队列中的线程“唤醒”的方法。
需要注意的是,调用signal方法可能唤醒线程也可能不会唤醒线程,什么时候会唤醒线程这得看情况,但是调用signal方法一定会将线程从条件队列中移到同步队列尾部。
await方法分为5种,分别是响应线程中断等待,不响应线程中断等待,设置相对时间不自旋等待,设置相对时间自旋等待,设置绝对时间等待;
signal方法只有2种,分别是只唤醒条件队列头结点和唤醒条件队列所有结点的操作。同一类的方法基本上是相通的。

响应线程中断的条件等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//响应线程中断的条件等待
public final void await() throws InterruptedException {
//如果线程被中断则抛出异常
if (Thread.interrupted()) {
throw new InterruptedException();
}
//将当前线程包装秤结点添加到条件队列尾部
Node node = addConditionWaiter();
//在进入条件等待之前先完全释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//线程一直在while循环里进行条件等待,直到线程被唤醒或者线程被中断并且进入同步队列中
while (!isOnSyncQueue(node)) {
//进行条件等待的线程都在这里被挂起, 线程被唤醒的情况有以下几种:
//1.同步队列的前驱结点已取消
//2.设置同步队列的前驱结点的状态为SIGNAL失败
//3.前驱结点释放锁后唤醒当前结点
LockSupport.park(this);
//当前线程醒来后立马检查是否被中断, 如果是则代表结点取消条件等待, 此时需要将结点移出条件队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
//线程醒来后就会以独占模式获取锁,如果获取时发生了中断,如果在调用await()方法时被中断,则依然是THROW_IE
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
//这步操作主要为防止线程在signal之前中断而导致没与条件队列断绝联系
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
//根据中断模式进行响应的中断处理
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
}

上述代码整个流程总结如下:

第一步:首先判断当前线程是否中断,如果被中断,则抛出异常,如果没有被中断,则继续下面的流程。
第二步:通过调用addConditionWaiter()将当前线程封装成Node节点存放到Condition队列的尾部。
第三步:因为当前线程已经获取了锁,所以调用await需要释放资源,所以通过调用fullyRelease()释放资源,也就是释放锁,因为这个锁是独占锁并且可以重入,所以要全部把资源释放,从fully字面上也可以理解。
第四步:通过while循环判断当前线程是否在同步队列上,如果没有在同步队列上,则需要阻塞当前线程,然后调用checkInterruptWaiting()方法判断是否被中断过,如果被中断过,则跳出while循环。
第五步:通过调用acquireQueued()方法获取资源,如果在调用这个方法时被中断,则中断类型变成REINTERRUPT(稍后处理中断),这个方法返回值只是记录是否被中断过,并不会响应中断。
第六步:如果是因为中断,此时waitStatus=0,但是此时它仍在条件队列中,所以需要从条件队列中清除。
第七步:如果被中断,则调用reportInterruptAfterWait()方法处理不同的中断类型。

第一步:将线程添加到条件队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node addConditionWaiter() {
//获取条件队列尾结点
Node t = lastWaiter;
// 如果尾结点被取消,则从条件队列中清除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前线程包装成Node结点,并且waitStatus状态是CONDITION。
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

将当前线程封装成Node节点,然后加入到Condition的尾部,在加入之前需要检查以下尾部节点t是否还在等待Condition条件,如果被signal或者被中断,则调用清除方法将尾节点从Condition队列中清除掉。

第二步:完全将锁释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//完全释放锁
final int fullyRelease(Node node) {
//判断资源是否释放成功
boolean failed = true;
try {
//获取当前的同步状态
int savedState = getState();
//使用当前的同步状态去释放锁
if (release(savedState)) {
failed = false;
//如果释放锁成功就返回当前同步状态
return savedState;
} else {
//如果释放锁失败就抛出运行时异常
throw new IllegalMonitorStateException();
}
} finally {
//保证没有成功释放锁就将该结点设置为取消状态
if (failed) {
node.waitStatus = Node.CANCELLED;
}
}
}

将当前线程包装成结点添加到条件队列尾部后,紧接着就调用fullyRelease方法释放锁。注意,方法名为fullyRelease也就这步操作会完全的释放锁,因为锁是可重入的,所以在进行条件等待前需要将锁全部释放了,不然的话别人就获取不了锁了。如果释放锁失败的话就会抛出一个运行时异常,如果成功释放了锁的话就返回之前的同步状态。

第三步:进行条件等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//线程一直在while循环里进行条件等待
while (!isOnSyncQueue(node)) {
//进行条件等待的线程都在这里被挂起, 线程被唤醒的情况有以下几种:
//1.同步队列的前驱结点已取消
//2.设置同步队列的前驱结点的状态为SIGNAL失败
//3.前驱结点释放锁后唤醒当前结点
LockSupport.park(this);
//当前线程醒来后立马检查是否被中断, 如果是则代表结点取消条件等待, 此时需要将结点移出条件队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}

//检查条件等待时的线程中断情况
private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;
private int checkInterruptWhileWaiting(Node node) {
//中断请求在signal操作之前:THROW_IE
//中断请求在signal操作之后:REINTERRUPT
//期间没有收到任何中断请求:0
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

//将取消条件等待的结点从条件队列转移到同步队列中
final boolean transferAfterCancelledWait(Node node) {
//如果这步CAS操作成功的话就表明中断发生在signal方法之前
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//状态修改成功后就将该结点放入同步队列尾部
enq(node);
return true;
}
//到这里表明CAS操作失败, 说明中断发生在signal方法之后
while (!isOnSyncQueue(node)) {
//如果sinal方法还没有将结点转移到同步队列, 就通过自旋等待一下
Thread.yield();
}
return false;
}

在以上两个操作完成了之后就会进入while循环,可以看到while循环里面首先调用LockSupport.park(this)将线程挂起了,所以线程就会一直在这里阻塞。在调用signal方法后仅仅只是将结点从条件队列转移到同步队列中去,至于会不会唤醒线程需要看情况。如果转移结点时发现同步队列中的前驱结点已取消,或者是更新前驱结点的状态为SIGNAL失败,这两种情况都会立即唤醒线程,否则的话在signal方法结束时就不会去唤醒已在同步队列中的线程,而是等到它的前驱结点来唤醒。当然,线程阻塞在这里除了可以调用signal方法唤醒之外,线程还可以响应中断,如果线程在这里收到中断请求就会继续往下执行。可以看到线程醒来后会马上检查是否是由于中断唤醒的还是通过signal方法唤醒的,如果是因为中断唤醒的同样会将这个结点转移到同步队列中去,只不过是通过调用transferAfterCancelledWait方法来实现的。最后执行完这一步之后就会返回中断情况并跳出while循环。

第四步:结点移出条件队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//线程醒来后就会以独占模式获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
//这步操作主要为防止线程在signal之前中断而导致没与条件队列断绝联系
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
//根据中断模式进行响应的中断处理
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}

//结束条件等待后根据中断情况做出相应处理
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
//如果中断模式是THROW_IE就抛出异常
if (interruptMode == THROW_IE) {
throw new InterruptedException();
//如果中断模式是REINTERRUPT就自己挂起
} else if (interruptMode == REINTERRUPT) {
selfInterrupt();
}
}

当线程终止了while循环也就是条件等待后,就会回到同步队列中。不管是因为调用signal方法回去的还是因为线程中断导致的,结点最终都会在同步队列中。这时就会调用acquireQueued方法执行在同步队列中获取锁的操作。也就是说,结点从条件队列出来后又是乖乖的走独占模式下获取锁的那一套,等这个结点再次获得锁之后,就会调用reportInterruptAfterWait方法来根据这期间的中断情况做出相应的响应。如果中断发生在signal方法之前,interruptMode就为THROW_IE,再次获得锁后就抛出异常;如果中断发生在signal方法之后,interruptMode就为REINTERRUPT,再次获得锁后就重新中断。

不响应线程中断的条件等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//不响应线程中断的条件等待
public final void awaitUninterruptibly() {
//将当前线程添加到条件队列尾部
Node node = addConditionWaiter();
//完全释放锁并返回当前同步状态
int savedState = fullyRelease(node);
boolean interrupted = false;
//结点一直在while循环里进行条件等待
while (!isOnSyncQueue(node)) {
//条件队列中所有的线程都在这里被挂起
LockSupport.park(this);
//线程醒来发现中断并不会马上去响应
if (Thread.interrupted()) {
interrupted = true;
}
}
if (acquireQueued(node, savedState) || interrupted) {
//在这里响应所有中断请求, 满足以下两个条件之一就会将自己挂起
//1.线程在条件等待时收到中断请求
//2.线程在acquireQueued方法里收到中断请求
selfInterrupt();
}
}

设置相对时间的条件等待(不进行自旋)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//设置定时条件等待(相对时间), 不进行自旋等待
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
//如果线程被中断则抛出异常
if (Thread.interrupted()) {
throw new InterruptedException();
}
//将当前线程添加到条件队列尾部
Node node = addConditionWaiter();
//在进入条件等待之前先完全释放锁
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//判断超时时间是否用完了
if (nanosTimeout <= 0L) {
//如果已超时就需要执行取消条件等待操作
transferAfterCancelledWait(node);
break;
}
//将当前线程挂起一段时间, 线程在这期间可能被唤醒, 也可能自己醒来
LockSupport.parkNanos(this, nanosTimeout);
//线程醒来后先检查中断信息
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
long now = System.nanoTime();
//超时时间每次减去条件等待的时间
nanosTimeout -= now - lastTime;
lastTime = now;
}
//线程醒来后就会以独占模式获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
//由于transferAfterCancelledWait方法没有把nextWaiter置空, 所有这里要再清理一遍
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
//根据中断模式进行响应的中断处理
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
//返回剩余时间
return nanosTimeout - (System.nanoTime() - lastTime);
}

设置相对时间的条件等待(进行自旋)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//设置定时条件等待(相对时间), 进行自旋等待
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
if (unit == null) { throw new NullPointerException(); }
//获取超时时间的毫秒数
long nanosTimeout = unit.toNanos(time);
//如果线程被中断则抛出异常
if (Thread.interrupted()) { throw new InterruptedException(); }
//将当前线程添加条件队列尾部
Node node = addConditionWaiter();
//在进入条件等待之前先完全释放锁
int savedState = fullyRelease(node);
//获取当前时间的毫秒数
long lastTime = System.nanoTime();
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//如果超时就需要执行取消条件等待操作
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
//如果超时时间大于自旋时间, 就将线程挂起一段时间
if (nanosTimeout >= spinForTimeoutThreshold) {
LockSupport.parkNanos(this, nanosTimeout);
}
//线程醒来后先检查中断信息
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
long now = System.nanoTime();
//超时时间每次减去条件等待的时间
nanosTimeout -= now - lastTime;
lastTime = now;
}
//线程醒来后就会以独占模式获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
//由于transferAfterCancelledWait方法没有把nextWaiter置空, 所有这里要再清理一遍
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
//根据中断模式进行响应的中断处理
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
//返回是否超时标志
return !timedout;
}

设置绝对时间的条件等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//设置定时条件等待(绝对时间)
public final boolean awaitUntil(Date deadline) throws InterruptedException {
if (deadline == null) { throw new NullPointerException(); }
//获取绝对时间的毫秒数
long abstime = deadline.getTime();
//如果线程被中断则抛出异常
if (Thread.interrupted()) { throw new InterruptedException(); }
//将当前线程添加到条件队列尾部
Node node = addConditionWaiter();
//在进入条件等待之前先完全释放锁
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//如果超时就需要执行取消条件等待操作
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
//将线程挂起一段时间, 期间线程可能被唤醒, 也可能到了点自己醒来
LockSupport.parkUntil(this, abstime);
//线程醒来后先检查中断信息
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
//线程醒来后就会以独占模式获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
//由于transferAfterCancelledWait方法没有把nextWaiter置空, 所有这里要再清理一遍
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
//根据中断模式进行响应的中断处理
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
//返回是否超时标志
return !timedout;
}

唤醒条件队列中的头结点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//唤醒条件队列中的下一个结点
public final void signal() {
//判断当前线程是否独占模式持有锁,如果不是则抛出异常
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
//获取条件队列中的第一个结点
Node first = firstWaiter;
//如果条件队列中有排队者
if (first != null) {
//唤醒条件队列中的头结点
doSignal(first);
}
}

//唤醒条件队列中的头结点
private void doSignal(Node first) {
do {
//1.将firstWaiter引用向后移动一位
if ( (firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
//2.将头结点的后继结点引用置空
first.nextWaiter = null;
//3.将头结点转移到同步队列, 转移完成后有可能唤醒线程
//4.如果transferForSignal操作失败就去唤醒下一个结点
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}

//将指定结点从条件队列转移到同步队列中
final boolean transferForSignal(Node node) {
//将等待状态从CONDITION设置为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//如果更新状态的操作失败就直接返回false
//可能是transferAfterCancelledWait方法先将状态改变了, 导致这步CAS操作失败
return false;
}
//将该结点添加到同步队列尾部,返回前驱结点
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
//出现以下情况就会唤醒当前线程
//1.前驱结点是取消状态
//2.更新前驱结点的状态为SIGNAL操作失败
LockSupport.unpark(node.thread);
}
return true;
}

可以看到signal方法最终的核心就是去调用transferForSignal方法,在transferForSignal方法中首先会用CAS操作将结点的状态从CONDITION设置为0,然后再调用enq方法将该结点添加到同步队列尾部。我们再看到接下来的if判断语句,这个判断语句主要是用来判断什么时候会去唤醒线程,出现这两种情况就会立即唤醒线程,一种是当发现前驱结点的状态是取消状态时,还有一种是更新前驱结点的状态失败时。这两种情况都会马上去唤醒线程,否则的话就仅仅只是将结点从条件队列中转移到同步队列中就完了,而不会立马去唤醒结点中的线程。signalAll方法也大致类似,只不过它是去循环遍历条件队列中的所有结点,并将它们转移到同步队列,转移结点的方法也还是调用transferForSignal方法。

唤醒条件队列的所有结点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//唤醒条件队列后面的全部结点
public final void signalAll() {
//判断当前线程是否持有锁
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
//获取条件队列头结点
Node first = firstWaiter;
if (first != null) {
//唤醒条件队列的所有结点
doSignalAll(first);
}
}

//唤醒条件队列的所有结点
private void doSignalAll(Node first) {
//先把头结点和尾结点的引用置空
lastWaiter = firstWaiter = null;
do {
//先获取后继结点的引用
Node next = first.nextWaiter;
//把即将转移的结点的后继引用置空
first.nextWaiter = null;
//将结点从条件队列转移到同步队列
transferForSignal(first);
//将引用指向下一个结点
first = next;
} while (first != null);
}

本文标题:Java并发-AQS源码分析之条件队列

文章作者:王洪博

发布时间:2019年02月05日 - 09:02

最后更新:2019年09月12日 - 10:09

原始链接:http://whb1990.github.io/posts/68b8bf54.html

▄︻┻═┳一如果你喜欢这篇文章,请点击下方"打赏"按钮请我喝杯 ☕
0%