AQS队列同步器

队列同步器(AbstractQueuedSynchronizer)

队列同步器,是一个用来构建锁或者其他同步组件的基础框架,像之前提到的重入锁,读写锁,都是使用这个框架搭建起来的。它使用一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

队列同步器接口的方法

1
protected boolean tryAcquire(int arg)

以独占的方式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS操作设置同步状态

1
protected boolean tryRelease(int arg)

独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态

1
protected int tryAcquireShared(int arg)

共享式获取同步状态,返回大于等于0的值,表示获取成功,反之失败

1
protected boolean tryReleaseShared(int arg)

共享释放同步状态

1
protected boolean isHeldExclusively()

当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占,返回的是一个布尔值

同步器的工作方式

先举一个例子,制作一个同步锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class test implements Lock {

//自定义同步器
private static class Sync extends AbstractQueuedSynchronizer{
protected Sync() {
super();
}

@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
//当状态为0的时候获得锁
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
if (getState()==0) {throw new IllegalMonitorStateException();}
setExclusiveOwnerThread(null);
setState(0);
return true;
//释放锁,将状态设置为0
}

@Override
protected boolean isHeldExclusively() {
return getState()==1;
//是否处于独占状态
}

//返回一个Condition,每个Condition都包含一个Condition队列
Condition newCondition(){return new ConditionObject();}
}

//将需要的操作代理到Sync上即可
private final Sync sync=new Sync();

@Override
public void lock() {
sync.acquire(1);
//表示获得了锁,即上锁的意思
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}

@Override
public void unlock() {
sync.release(1);
}

@Override
public Condition newCondition() {
return sync.newCondition();
}

public boolean isLocked(){
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads(){
return sync.hasQueuedThreads();
}

}

这个类在继承了Lock接口后,继承各种锁的操作方法,然后在类的内部有一个静态类,静态类继承了AbstractQueuedSynchronizer类,通过重写类的方法,去设置锁的类型。该Mutex类在状态为0的时候可以申请获得锁,之后若处于被锁住的状态,就不能够被其他对象所获得,只有等待该对象解锁后,才能够被其他对象获得。

值得注意的是,锁的设置一般不直接去设置内部的同步器,而是通过一个类去调用方法的方式去使用内部同步器的API来实现相关的功能。这就是一般同步器的工作方式。

队列同步器的实现原理

队列同步器是如何完成线程同步的呢?

同步队列

之前有提到,同步器的内部有同步队列FIFO,FIFO是一个双向队列,这个队列的数据模型和一般的双向队列相似。每当有一个线程尝试去进行同步,但同步失败的时候,该线程就会被阻塞,且该线程当时的状态和信息就会构造成一个节点,并将其加入同步队列,只有当同步状态释放的时候,才会把首节点中线程唤醒,再次去尝试获取同步状态。同步队列使得线程之间的数据同步变的更加的有序。队列节点元素有4种类型, 每种类型表示线程被阻塞的原因,这四种类型分别是:

  • CANCELLED : 表示该线程是因为超时或者中断原因而被放到队列中
  • CONDITION : 表示该线程是因为某个条件不满足而被放到队列中,需要等待一个条件,直到条件成立后才会出队
  • SIGNAL : 表示该线程需要被唤醒
  • PROPAGATE: 表示在共享模式下,当前节点执行释放release操作后,当前结点需要传播通知给后面所有节点

aqs2

独占式同步状态的获取与释放

在同步器内部,可以通过调用同步器的acquire方法获取同步状态。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

在同步器中,又会调用自定义的tryAcquire去判断当前是否获取了同步状态,若获取同步状态失败,则构造同步节点并通过addWaiter方法将该节点加入到同步队列尾部,在掉用acquireQueued使其死循环,不断地去尝试获得同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private Node addWaiter(Node mode) {
//把当前线程包装为node,设为独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果tail不为空,把node插入末尾
if (pred != null) {
node.prev = pred;
//此时可能有其他线程插入,所以重新判断tail
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//若队列为空或者cas设置失败后,调用enq自旋再次设置
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//此时可能有其他线程插入,所以重新判断tail是否为空
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

如果tail节点为空,执行enq(node);重新尝试,最终把node插入.在把node插入队列末尾后,它并不立即挂起该节点中线程,因为在插入它的过程中,前面的线程可能已经执行完成,所以它会先进行自旋操作acquireQueued(node, arg),尝试让该线程重新获取锁!当条件满足获取到了锁则可以从自旋过程中退出,否则继续。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果它的前继节点为头结点,尝试获取锁,获取成功则返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

AQS的模板方法acquire通过调用子类自定义实现的tryAcquire获取同步状态失败后->将线程构造成Node节点(addWaiter)->将Node节点添加到同步队列对尾(addWaiter)->节点以自旋的方法获取同步状态(acquirQueued)。在节点自旋获取同步状态时,只有其前驱节点是头节点的时候才会尝试获取同步状态,如果该节点的前驱不是头节点或者该节点的前驱节点是头节点但获取同步状态失败,则判断当前线程需要阻塞,如果需要阻塞则需要被唤醒过后才返回。

aqs

那么接下来看看看独占式同步器是怎么释放的吧,在执行完相应的逻辑后,就需要释放同步状态,这时候就需要调用release方法了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final boolean release(int arg) {
// 若释放同步状态成功
if (tryRelease(arg)) {
// 获取同步队列头节点
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒头节点的后继节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 获取当前节点等待状态
int ws = node.waitStatus;
// 若状态为SIGNAL、CONDITION或PROPAGATE,CAS将其状态置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取后继节点
Node s = node.next;
// 若后继节点为null或其状态为CANCELLED(等待超市或者被中断)
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾结点遍历
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒节点所关联的线程
LockSupport.unpark(s.thread);
}

这个方法在自定义方法准备好了之后,去把头结点向后移动一位,表示下一个节点也准备好了。从源码中可以发现唤醒的节点从尾遍历而不是从头遍历,原因是当前节点的后继可能为null、等待超时或被中断,所以从尾部向前进行遍。

共享式同步状态的获取与释放

前面在介绍锁的时候,除了重入锁这种独占锁之外,还有像读写锁一样的共享锁,同步器身为最高位,当然也有相应的共享式同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
// 将共享节点加入同步队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 中断标记
boolean interrupted = false;
// 死循环
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 若前驱节点为头节点
if (p == head) {
// 获取同步
int r = tryAcquireShared(arg);
// 若获取成功
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断线程是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

这个共享式的方法,在一开始调用了tryAcquireShared(arg)方法尝试去获取同步状态,这个方法自然也是自定义的方法,且这个方法的返回值如果是大于或等于0,自然就能够获取同步状态。如果并非如此,就进行doAcquireShared方法,它会不断地自旋去尝试获取同步状态。对应的获得了同步状态后,也有相应的共享释放方法释放状态。

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。

独占式超时获取同步状态

在JDK5之前,当一个线程获取不到锁而被阻塞在synchronized之外时,对该线程进行终端操作,此时线程的中断标志位会被修改,但线程依旧会阻塞在synchronized上,等待获取锁。而早JDK5之中,同步器提供了一个acquireInterruptibly方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
 public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

tryAcquireNanos()方法超时获取同步状态是响应中断获取同步状态的”增强版”,在doAcquireInterruptibly基础上增加了超时控制:主要是需要计算出睡眠的时间间隔nanosTimeout,在这个时间段内,如果获取到了同步状态则返回true,否则返回false,并进行异常处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
//
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 超时时间
final long deadline = System.nanoTime() + nanosTimeout;
// 将独占节点
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 死循环自旋
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 若前驱节点为头节点且获取同步状态成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 若获取失败,判断是否超时
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 判断线程是否中断
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

nanosTimeout的计算公式为:nanosTimeout-=now(当前唤醒时间)-lastTime(上次唤醒时间)。

aqs3