Java并发编程(九)同步控制

多线程的团队协作

重入锁(ReentrantLock)

什么是重入锁

重入锁和synchronized非常的相似,我们知道synchronized的锁区域是用大括号所包围起来的,而ReentrantLock则是可以自己决定加锁的位置和解锁的位置的

1
2
L.lock();
L.unloock();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.locks.ReentrantLock;

public class test implements Runnable {
public static ReentrantLock L=new ReentrantLock();
public static int i;
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(new test());
Thread t2=new Thread(new test());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
@Override
public void run() {
for (int j = 0; j <10000 ; j++) {
L.lock();
L.lock();
try {
i++;
} finally {
L.unlock();
L.unlock();
}
}

}
}


20000

与synchronized比较大的区别还在于,ReentrantLock是可以重入的,如上述代码所示,使用了两个L.lock(),但是仍然能够运行,这表示着这是同一个线程在获得锁,每次获得锁就会在计数器上+1,在解锁的时候在计数器上-1,这样最后的计数器为零的时候完全释放锁。这种方法可以看做为,只要是同一个线程获取的锁,就可以被一直重入。

而lock()的实现为

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

这是一个叫做CAS操作,意思为比较和交换,这种操作方式能够获得原子性,以致于起到synchronized的作用。

ReentrantLock的中断

重入锁还有另一个非常人性化的一点,那就是会自然的中断,比如在遇到产生的死锁问题的时候ReentrantLock就会自然中断,防止死锁的发生,这是synchronized所不具有的。

还记得之前讲过的interrupt吗,它仅仅是起到一个标记作用而已,为什么不用flag去代替它呢?效果不是一样吗?其实,interrupt标记还能够作用于其他的类用于线程的控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import java.util.concurrent.locks.ReentrantLock;

public class test implements Runnable {
public static ReentrantLock lock1=new ReentrantLock();
public static ReentrantLock lock2=new ReentrantLock();
int lock;

public test(int lock) {
this.lock = lock;
}

public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(new test(1));
Thread t2=new Thread(new test(2));
t1.start();
t2.start();
Thread.sleep(2000);
t2.interrupt();
}
@Override
public void run() {
try {
if(lock==1){
lock1.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock2.lockInterruptibly();
}else {
lock2.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock1.lockInterruptibly();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (lock1.isHeldByCurrentThread())
{lock1.unlock();}
if (lock2.isHeldByCurrentThread())
{lock2.unlock();}
System.out.println(Thread.currentThread().getId()+"退出");
}

}
}



java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at test.run(test.java:38)
at java.lang.Thread.run(Thread.java:748)
12 退出
13 退出

一开始,设置了两个线程,线程t1先获得锁,t2阻塞,当在等待两秒后,设置了可以中断的标记,于是t2便不再尝试去获得锁,直接断开,从而开始输出错误信息。

限时等待锁

使用中断标记去打断锁的话,虽然可以解决死锁,但也可能会出现数据不一致的问题,于是乎可以使用等待时间这一方式去等待锁,如果时间到了还没有获得锁,便直接放弃它。使用的api叫做trylock()

1
2
3
4
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

它有的构造方法有两个参数,一个是数值,一个是计时单位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class test implements Runnable{
public static ReentrantLock lock=new ReentrantLock();

public static void main(String[] args) {
Thread t1=new Thread(new test());
Thread t2=new Thread(new test());
t1.start();
t2.start();
}
@Override
public void run() {
try {
if (lock.tryLock(5, TimeUnit.SECONDS)){
Thread.sleep(6000);
}else {
System.out.println("get lock faild");
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}

}
}

get lock faild

一开始t1便运行run方法获得了锁,并等待6秒,然后t2进入了if判断语句中,开始等待5秒,等待时间过后,仍然没有获得锁,便返回一个false值,打印出get lock faild,最后lock再解锁。当然,你也会想,万一传入的参数是0怎么办呢,其实,你传入的参数是0的话,就和没有传参数的构造函数一样的,可以猜想的到,没有参数,那就不会等待,一旦感觉到阻塞,就立即退出。

公平锁和非公平锁

线程在相互竞争资源的顺序可以被自由决定吗?默认是不能的,其实设计者也能想的到,要由自己去控制资源的顺序,已达到安全的、稳定的目的,但是这样做会使的线程运行变得更慢,因为要给快速运行的线程加一个队列的话,开销是很大的,所以大多数的时候,是随意的、非公平的,但这并不代表不可以由自己自由的去决定。

我们先看一个ReentrantLock的构造函数

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

这个单构造函数传入的是布尔值,这个布尔值是选择这个重入锁是公平的还是非公平的,默认是非公平的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.locks.ReentrantLock;

public class test implements Runnable {
public static ReentrantLock lock=new ReentrantLock(true);

public static void main(String[] args) {
Thread t1=new Thread(new test(),"joker");
Thread t2=new Thread(new test(),"alex");
t1.start();
t2.start();
}
@Override
public void run() {
while (true){
try {
lock.lock();
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 获得锁");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

}
}


joker 获得锁
alex 获得锁
joker 获得锁
alex 获得锁
joker 获得锁
alex 获得锁

这就是重入锁设置公平和非公平的办法啦,看一来是不是有点眼熟呢,没错,我们之前的双线程累加到20000的例子,也可以用这个来实现。看看底层的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

这里在判断条件中,公平锁多了一个hasQueuedPredecessors()方法,既加入了同步队列中当前节点是否有前驱节点的判断,如果返回true,则表明有线程比当前线程更早的请求锁,因此需要等待前驱线程获取并释放锁之后才能继续获得锁。

总结:ReentrantLock的几个重要方法:

  • lock();
  • lockinterruptibly();
  • trylock();
  • unlock();
  • new ReentrantLock(true);
底层实现

这里稍微讲一下底层的实现,我们可以从重入锁的类接口看到它是接入了一个Lock的接口,并重写它的方法,但值得注意的是,它们实际上使用的,是来自一个叫做AQS的同步器(AbstractQueuedSynchronizer)

1
abstract static class Sync extends AbstractQueuedSynchronizer

获得锁的调用的是tryAcquire方法,而这个方法来自于AQS队列同步器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
        protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

可以看到这里使用getState();去判断获取当前状态,如果为0,说明没有线程获得了该锁,代表可以进行CAS操作去获取锁

补充

总结了几点Synchronized和ReentrantLock的区别:

  1. Synchronized是JVM层次的锁实现,ReentrantLock是JDK层次的锁实现;

  2. Synchronized的锁状态是无法在代码中直接判断的,但是ReentrantLock可以通过ReentrantLock#isLocked判断;

  3. Synchronized是非公平锁,ReentrantLock是可以是公平也可以是非公平的;

  4. Synchronized是不可以被中断的,而ReentrantLock#lockInterruptibly方法是可以被中断的;

  5. 在发生异常时Synchronized会自动释放锁(由javac编译时自动实现),而ReentrantLock需要开发者在finally块中显示释放锁;

  6. ReentrantLock获取锁的形式有多种:如立即返回是否成功的tryLock(),以及等待指定时长的获取,更加灵活;

  7. Synchronized在特定的情况下对于已经在等待的线程是后来的线程先获得锁,而ReentrantLock对于已经在等待的线程一定是先来的线程先获得锁;

Condition条件

与synchronized对应的是lock和unlock,那么与Tread类的wait和notify方法,ReentrantLock也有相应的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class test implements Runnable {
public static ReentrantLock lock=new ReentrantLock();
public static Condition condition=lock.newCondition();//设置condition

public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(new test());
t1.start();
Thread.sleep(2000);
//继续加锁
lock.lock();
condition.signal();
lock.unlock();
}
@Override
public void run() {
try{
lock.lock();
System.out.println("开始等待");
condition.await();
System.out.println("继续运行");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}


开始等待
继续运行

使用condition.await()和condition.signal()方法,分别让线程阻塞和唤醒。

线程的交通信号灯:Semaphore

线程的公平锁和非公平锁是线程之间排队运行用的,但除此之外,我们还有别的控制线程的方法,就比如说,某个函数只是偶尔被调用,但需要调用的时候,却会变得非常频繁,这时候我们就要控制好线程的队列了,不能一口气涌入成千上万条线程,最终导致负载过大,停止运行。

于是便需要一个信号灯,也叫作信号量 :Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class test implements Runnable{
final Semaphore semaphore=new Semaphore(5);
public static void main(String[] args) {
ExecutorService exec= Executors.newFixedThreadPool(20);//线程池,后面会提
final test demo=new test();
for (int i = 0; i <20 ; i++) {
exec.submit(demo);//运行run方法
}
}
@Override
public void run() {
try{
semaphore.acquire();//此处进入线程为5的时候,阻塞后面的线程,直到使用release为止
//模拟耗时
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId()+"to doing");
semaphore.release();//通知acquire继续放行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

主要使用acquire和release方法,控制着线程的运行数量。

  • semaphore.acquire();//此处进入线程为5的时候,阻塞后面的线程,直到使用release为止
  • semaphore.release();;//通知acquire继续放行

倒计时器:CountDownLatch

如果说Semaphore如同交通信号灯一般,获取一定数量,每执行一次release释放一个线程,再允许进入一个线程,这样的流水线,那么CountDownLatch就是每次等到达一定数量,一口气放行。

CountDownLatch就像一个火箭的发射台,要等到全部准备就绪的时候,才能够发射,缺一个都不行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class test implements Runnable{
static final CountDownLatch end=new CountDownLatch(10);
static final test demo=new test();

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService= Executors.newFixedThreadPool(10);
for (int i = 0; i <10 ; i++) {
executorService.submit(demo);
}
//等待所有就绪
end.await();//主线程等待end的count值达到0之后继续执行下一步
//开始
System.out.println("开始");
executorService.shutdown();//执行结束
}
@Override
public void run() {
try{
Thread.sleep(new Random().nextInt(10)*100);
System.out.println("检查完成");
end.countDown();//执行这个方法表示线程已经准备就绪了,count计数器-1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


检查完成
检查完成
检查完成
检查完成
检查完成
检查完成
检查完成
检查完成
检查完成
检查完成
开始

里面涉及了几个方法,

  • end.countDown();计数器
  • end.await();等待完成计数量
  • new CountDownLatch(10);传入一个参数表示数值

循环栅栏:CyclicBarrier

CyclicBarrier和CountDownLatch非常相似,最大的区别就是CyclicBarrier可以重复被利用,而CountDownLatch不行。

CyclicBarrier正如它的名字一样,循环栅栏,栅栏是一个阻挡别人进入的障碍物,CyclicBarrier和CountDownLatch一样有一个计数器,不过CountDownLatch的计数器被定义了之后就只能被一直减少,最后减少到0时,完全结束,而CyclicBarrier的计数器则是从0开始增加,直到指定数值时开始放行,然后计数器归零,并等待下一波线程的访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.Random;
import java.util.concurrent.*;

public class test implements Runnable{
static final CyclicBarrier end=new CyclicBarrier(5);
static final test demo=new test();

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService= Executors.newFixedThreadPool(20);
for (int i = 0; i <20 ; i++) {
executorService.submit(demo);
}
}
@Override
public void run() {
try{

Thread.sleep(new Random().nextInt(10)*100);
end.await();
System.out.println(Thread.currentThread().getId()+"集合完成");
end.await();
System.out.println(Thread.currentThread().getId()+"执行完成");

} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}

使用方法更为简单,且可以被复用,运行之后可以看到一开始是集结了5个线程之后,才会继续运行下去,这表示了CyclicBarrier的阻塞作用它后面的线程,CyclicBarrier主要的方法就是await();方法,值得注意的是,CyclicBarrier有了更多的异常处理:BrokenBarrierException e 这个异常处理表示如果发生了意外使得线程破损,无法继续运行,那就让那个线程中断,防止阻塞后面的线程,而之所以CountDownLatch不这样做,是因为,它的特性就是必须全部都集合了之后才能运行,所以没有这个异常处理。

线程的阻塞工具:LockSupport

之前说过线程的挂起(suspend)和继续执行(resume),这个在之后有了一个比较灵活的类去代替,它就是LockSupport

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.concurrent.locks.LockSupport;

public class test implements Runnable {
public static void main(String[] args) throws InterruptedException {
Thread t=new Thread(new test());
t.start();
Thread.sleep(3000);
LockSupport.unpark(t);
}
@Override
public void run() {
System.out.println(Thread.currentThread().getId()+"执行run方法中,已经被阻塞");
LockSupport.park();

}
}

unpark传入参数表示要解锁的线程

1
2
3
4
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}

unpark方法也会和resume方法一样先于park(suspend)方法执行,但是与其不同的是,unpark就算先执行了,也不会产生死循环,因为unpark会产生一个特别的许可,这个许可,不可叠加且只有一个,只会被park所获取,所以就算unpark先于park发生,只要产生了这个许可,park就能够获取这个许可并且停止阻塞继续运行

ReadWriteLock读写锁

我们在对一个对象进行操作的时候,无论是读取还是写入,为了保持原子性,都用以synchronized包围,但是,在很多实际应用中,读取操作都是要远大于写入操作的,但是却全都用synchronized去上锁,这无疑降低了某些方面的效率,于是乎,设计者根据这种情况,设计了一个叫做读写锁的类,将读取操作与写入操作分离,使两个线程在读取同一个对象的时候,不阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class test{
private static Lock lock=new ReentrantLock();
private static ReentrantReadWriteLock reentrantReadWriteLock=new ReentrantReadWriteLock();
private static Lock read=reentrantReadWriteLock.readLock();
private int value=0;

public static void main(String[] args) {
final test demo=new test();
Runnable readrunnable=new Runnable() {
@Override
public void run() {
try {
demo.handleRead(read);
//demo.handleRead(lock);
} catch (Exception e) {
e.printStackTrace();
}
}
};
for (int i = 0; i <18 ; i++) {
new Thread(readrunnable).start();//开启线程
}
}

public Object handleRead(Lock lock){
try {
lock.lock();
Thread.sleep(1000);//线程休眠
return value;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
return value;
}
}
}

当这个方法开启的时候,在休眠一秒后加了锁,按道理来说,18个线程至少要等18秒才能运行完成,但是对于锁类型是读写锁的线程而言,可以被多个线程重复进入,重入锁和读写锁的区别在于,重入锁是对于本线程的行为可以被重复进入而不加锁,读写锁则是对于多个不进行修改操作的线程,都可以进入而不加锁。

方法名 功能
int getReadLockCount() 获取读锁的数量,此时读锁的数量不一定等于获取锁的数量,因为锁可以重入,可能有线程重入了读锁
int getReadHoldCount() 获取当前线程重入读锁的次数
int getWriteHoldCount() 获取当前线程重入写锁的次数
int isWriteLocked() 判断锁的状态是否是写锁,返回true,表示锁的状态是写锁
读写状态的设计

读写锁和重入锁在设计上都继承了AQS队列同步器,相关描述来自掘金作者天堂同志。

1
abstract static class Sync extends AbstractQueuedSynchronizer
  • 在AQS中,通过int类型的全局变量state来表示同步状态,即用state来表示锁。ReentrantReadWriteLock也是通过AQS来实现锁的,但是ReentrantReadWriteLock有两把锁:读锁和写锁,它们保护的都是同一个资源,那么如何用一个共享变量来区分锁是写锁还是读锁呢?答案就是按位拆分。
  • 由于state是int类型的变量,在内存中占用4个字节,也就是32位。将其拆分为两部分:高16位和低16位,其中高16位用来表示读锁状态,低16位用来表示写锁状态。当设置读锁成功时,就将高16位加1,释放读锁时,将高16位减1;当设置写锁成功时,就将低16位加1,释放写锁时,将第16位减1。如下图所示:

aqs4

  • 那么如何根据state的值来判断当前锁的状态时写锁还是读锁呢?
  • 假设锁当前的状态值为S,将S和16进制数0x0000FFFF进行与运算,即S&0x0000FFFF,运算时会将高16位全置为0,将运算结果记为c,那么c表示的就是写锁的数量。如果c等于0就表示还没有线程获取锁;如果c不等于0,就表示有线程获取到了锁,c等于几就代表写锁重入了几次。
  • 将S无符号右移16位(S>>>16),得到的结果就是读锁的数量。当S>>>16得到的结果不等于0,且c也不等于0时,就表示当前线程既持有了写锁,也持有了读锁。
  • 当成功获取到读锁时,如何对读锁进行加1呢?S +(1<<16)得到的结果,就是将对锁加1。释放读锁是,就进行S - (1<<16)运算。
  • 当成功获取到写锁时,令S+1即表示写锁状态+1;释放写锁时,就进行S-1运算。
  • 由于读锁和写锁的状态值都只占用16位,所以读锁的最大数量为 2^{16})-1,写锁可被重入的最大次数为2^{16}-1。
写锁的获取与释放

写锁是一个排它锁,只能被一个线程所获取,这个锁的主要方法,也来自于AQS同步器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected final boolean tryAcquire(int acquires) {

Thread current = Thread.currentThread();
int c = getState();
// exclusiveCount()方法的作用是将同步变量与0xFFFF做&运算,计算结果就是写锁的数量。
// 因此w的值的含义就是写锁的数量
int w = exclusiveCount(c);
// 如果c不为0就表示锁被占用了,但是占用的是写锁还是读书呢?这个时候就需要根据w的值来判断了。
// 如果c等于0就表示此时锁还没有被任何线程占用,那就让线程直接去尝试获取锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//
/**
* 1. 如果w为0,说明写锁的数量为0,而此时又因为c不等于0,说明锁被占用,但是不是写锁,那么此时锁的状态一定是读锁,
* 既然是读锁状态,那么写锁此时来获取锁时,就肯定失败,因此当w等于0时,tryAcquire()方法返回false。
* 2. 如果w不为0,说明此时锁的状态时写锁,接着进行current != getExclusiveOwnerThread()判断,判断持有锁的线程是否是当前线程
* 如果不是当前线程,那么tryAcquire()返回false;如果是当前线程,那么就进行后面的逻辑。为什么是当前线程持有锁,就还能执行后面的逻辑呢?
* 因为读写锁是支持重入的。
*/
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 下面一行代码是判断,写锁的重入次数或不会超过最大限制,这个最大限制是:2的16次方减1
// 为什么是2的16次方减1呢?因为state的低16位存放的是写锁,因此写锁数量的最大值是2的16次方减1
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
/**
* 1. writerShouldBlock()方法的作用是判断当前线程是否应该阻塞,对于公平的写锁和非公平写锁的具体实现不一样。
* 对于非公平写锁而言,直接返回false,因为非公平锁获取锁之前不需要去判断是否排队
* 对于公平锁写锁而言,它会判断同步队列中是否有人在排队,有人排队,就返回true,表示当前线程需要阻塞。无人排队就返回false。
*
* 2. 当writerShouldBlock()返回true时,表示当前线程还不能直接获取锁,因此tryAcquire()方法直接返回false。
* 当writerShouldBlock()返回false时,表示当前线程可以尝试去获取锁,因此会执行if判断中后面的逻辑,即通过CAS方法尝试去修改同步变量的值,
* 如果修改同步变量成功,则表示当前线程获取到了锁,最终tryAcquire()方法会返回true。如果修改失败,那么tryAcquire()会返回false,表示获取锁失败。
*
*/
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

写锁的释放与排他锁的释放逻辑也几乎一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
// 判断是否是当前线程持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 将state的值减去releases
int nextc = getState() - releases;
// 调用exclusiveCount()方法,计算写锁的数量。如果写锁的数量为0,表示写锁被完全释放,此时将AQS的exclusiveOwnerThread属性置为null
// 并返回free标识,表示写锁是否被完全释放
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
读锁的获取与释放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
// for死循环,直到满足相应的条件才会return退出,否则一直循环
for (;;) {
int c = getState();
// 锁的状态为写锁时,持有锁的线程不等于当期那线程,就说明当前线程获取锁失败,返回-1
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 尝试设置同步变量的值,只要设置成功了,就表示当前线程获取到了锁,然后就设置锁的获取次数等相关信息
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

//
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
// 将修改同步变量的值(读锁状态减去1<<16)
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

//
锁降级

读写锁会发生锁降级的事件,这里的锁降级指的是线程获取到了写锁,在没有释放写锁的情况下,又获取读锁。为什么不支持锁升级呢?举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void test(){
ReadWriteLock lock = new ReentrantReadWriteLock();
// 创建读锁
Lock readLock = lock.readLock();
// 创建写锁
Lock writeLock = lock.writeLock();
readLock.lock();
try{
// ...处理业务逻辑
writeLock.lock(); // 代码①
}finally {
readLock.unlock();
}
}

在上面的示例代码中,假如T1线程先获取到了读锁,然后执行后面的代码,在执行到代码①的上一行时,T2线程也去获取读锁,由于读锁是共享锁,且此时写锁还没有被获取,所以此时T2线程可以获取到读锁,当T1执行到代码①时,尝试去获取写锁,由于有T2线程占用了读锁,所以T1线程是无法获取到写锁的,只能等待,当T2也执行到代码①时,由于T1占有了读锁,导致T2无法获取到写锁,这样两个线程就一直等待,即获取不到写锁,也释放不掉读锁。因此锁是不支持锁升级的。

读写锁支持锁的降级,锁的降级是为了保证可见性。让T1线程对数据的修改对其他线程可见。读锁不支持条件等待队列。当调用ReadLock类的newCondition()方法时,会直接抛出异常。

1
2
3
public Condition newCondition() {
throw new UnsupportedOperationException();
}

因为读锁是共享锁,最大获取次数为2^{16}-1,同一时刻可以被多个线程持有,对于读锁而言,其他线程没有必要等待获取读锁,Condition的等待唤醒毫无意义。

那么锁降级中,先获取到读锁有没有必要呢?答案是肯定的。如果当前线程不获取读锁而是直接释放写锁,假设此刻存在另一个线程获取了写锁并修改了数据,那么当前线程就无法感知到另一个线程的更新。如果当前线程获取到了读锁,遵循锁降级的步骤,则另一个线程就会被阻塞,直到当前的线程使用数据并释放读锁之后,另一个线程才能获取写锁进行数据更新。