Java并发编程(十四)无锁状态

无锁

加锁是保证线程安全的常用手法,但是被加锁之后,往往会产生一些效率的问题,那不使用锁,也能保证线程安全吗?当然可以,并且,使用无锁的方法去实现的线程安全,不仅仅在效率上占优,而且,还不会产生死锁的问题哦。

无锁的策略:CAS(Compare And Swap)

经常被使用的无锁方法,叫做CAS,就是比较和交换。它不是一个固定的类或者方法,而是一个常用的策略,是一个算法。CAS通常包括着三个参数(V,E,N)V代表着即将要被更新的变量,E代表着预期的值,N代表着新的值,只有当且仅当V=E的时候,才会把V更新为N。这表示着如果有多个线程企图使用CAS操作去更新一个值的时候,只有一个会更新成功,并且把更新成功后其他线程的E变换为更新过后的值,导致了其他线程的V!=E 从而使得其他线程的更新操作失败,不过,这并不会直接导致他们取消这次操作,而是让他们再次开始自旋去尝试进行CAS操作。

AtomicInteger:无锁的线程安全整数

AtomicInteger是一个使用CAS操作去实现的类,它是一个无锁的、线程安全的整体,它的任何类似于i++之类的操作,即使没有被加上锁,依然是线程安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class test implements Runnable{
static AtomicInteger a=new AtomicInteger();
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(new test());
Thread t2=new Thread(new test());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("a="+a);
ExecutorService es= Executors.newFixedThreadPool(10);
for (int i = 0; i <10 ; i++) {
es.submit(new test());
}
Thread.sleep(1000);
es.shutdown();
System.out.println("a="+a);
}
@Override
public void run() {
for (int i = 0; i <100000; i++) {
a.incrementAndGet();//a++操作
}
}
}

a=200000
a=1200000

可以看到,无论是单独的使用线程,还是使用线程池,它们对于a的++操作,都是线程安全的。而AtomicInteger常用的方法有:

1
2
3
4
5
6
7
public final int getAndIncrement()//i++
public final int getAndDecrement()//i--
public final int getAndAdd(int delta)//i+delta
public final int incrementAndGet()//++i
public final int decrementAndGet()//--i
public final int addAndGet(int delta)//delta+i
//注意顺序

我们看看incrementAndGet()的源码吧!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//jdk8,经过了层层封装,就以JDK7为例
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
//
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

//jdk7
public final int incrementAndGet() {
for (;;) {
int current = get();//获取值
int next = current + 1;//创建新的对象使其+1,这种本身不变,通过创建新值的方式,来使自己增加,更具有安全性
if (compareAndSet(current, next))//CAS操作
return next;
}
}
//
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

一开始看到这个for (;;) 它是一个无限的循环,正如开头所说的那样,就算失败也不会返回false,而是进行不断地尝试,直到正确为止。第二,这个CAS操作会在自己的线程中创建一个新的值,这样的线程局部变量仅仅对自身有效,而在执行CAS操作的时候,如果在这个时候,目标的值被改变,就不会返回,而是自旋,直到成功为止。

Unsafe:Java中的指针

看到无论是JDK7还是JDK8,都使用了一个叫做Unfase的类去调用CAS操作,在compareAndSet中,有着四个参数,this代表的是自身这个对象,valueOffset代表着目标的偏移量,expect是旧值,update是新值。而Java中其实并没有指针,所以他是不安全的,是Unsafe的,而这个valueOffset就是一个字段到目标头部的偏移量,通过这个偏移量快速定位字段,看不懂也没关系,这涉及到底层实现。

在AtomicInteger中,Unsafe有这样的定义:

1
private static final Unsafe unsafe = Unsafe.getUnsafe();

它实际获取的是Unsafe这个类中的getUnsafe方法

1
2
3
4
5
6
7
8
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (var0.getClassLoader() != null) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}

它有一个特别的地方,在那个if的判断语句当中,有着var0.getClassLoader() != null ,这个判断,它表示着如果这个类的类加载器不为空,则直接抛出异常,并且停止工作,这使我们无法直接去使用这个类,说明它是一个JDK内部的专属类,不可以被外部对象所使用。也就是说,我们不需要或不能去直接使用Unsafe,而是应该通过CAS去操纵它。

AtomReference:无锁的对象引用

除了AtomicInteger这样对整数进行无锁,却又能保证线程安全的类之外,设计者当然也考虑到了对象的线程安全使用问题。它的使用和AtomicInteger非常类似,在此不多赘述,只展示区别:

1
2
3
4
5
6
7
8
9
static AtomicReference<String> a=new AtomicReference<String>();//被定义
//内部实现
public final V getAndSet(V newValue) {
while (true) {
V x = get();
if (compareAndSet(x, newValue))
return x;
}
}

但是需要补充的是,它们都涉及到了一个问题:对象的值被修改回原值,到底算不算修改。比如有三个线程ABC,A把i=1修改成i=2,B把i=2,再修改成i=1,那么对于C线程来说,i在进行CAS比较的时候,到底是被修改了还是没有被修改呢?但从代码上而言,对于C线程来说,i没有被修改,应该继续去执行CAS操作,而不是看做为被修改,从而再次循环。这或许在思维和逻辑上表示正确,但是现实生活中,却不应该是这样子,比如家里人给你1000块钱生活费,你一到账就用3秒时间全部还了花呗,你不能说在1分钟后,家里人问你到账没有,你说没有。实际上你确确实实花了那一部分钱,但这个在CAS操作中并没有被记录而已。接下来我们使用代码去还原当时的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import java.util.concurrent.atomic.AtomicReference;

public class test
{
static AtomicReference<Integer> money=new AtomicReference<Integer>();

public static void main(String[] args) {
money.set(900);
//你有900块
for (int i = 0; i <3 ; i++) {
new Thread(){
@Override
public void run(){
while (true){
Integer m=money.get();
if(m<1000){//生活费不足
if(money.compareAndSet(m,m+1000)){
System.out.println("不足1000元,已打钱,余额:"+money.get());
break;
}

}else {
//System.out.println("还不需要");
break;
}
}
}
}.start();
}

new Thread(){
@Override
public void run(){
for (int i = 0; i <100 ; i++) {
while (true){
Integer m=money.get();
if(m>500){//还花呗
System.out.println("大于500块");
if (money.compareAndSet(m,m-500)){
System.out.println("成功还款500,余额:"+money.get());
break;
}
}else {
System.out.println("没有足够金额");
break;
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();

}

}


不足1000元,已打钱,余额:1900
大于500
成功还款500,余额:1400
大于500
成功还款500,余额:900
不足1000元,已打钱,余额:1900
大于500
成功还款500,余额:1400
大于500
成功还款500,余额:900
不足1000元,已打钱,余额:1900

这种几率非常非常的小,但是还是有可能发生的,为了解决这样的问题,就诞生了AtomicStampedReference。当然,也可以使用带版本号的方式去辨别。

AtomicStampedReference:带有时间戳的对象引用

它会产生一个时间戳单位,并且在进行CAS的时候,会多传入两个参数,使其同时进行CAS操作,让它的时间戳+1,这样,修改了时间戳之后,代表了执行成功,就不会再次执行这样的操作了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
 //源码  
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
//修改后

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

public class test
{
static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(900,0);

public static void main(String[] args) {
for (int i = 0; i <3 ; i++) {
final int time=money.getStamp();//获取了时间戳
new Thread(){
@Override
public void run(){
while (true){
Integer m=money.getReference();
if(m<1000){//生活费不足
if(money.compareAndSet(m,m+1000,time,time+1)){//多传入了两个参数
System.out.println("不足1000元,已打钱,余额:"+money.getReference());
break;
}

}else {
//System.out.println("还不需要");
break;
}
}
}
}.start();
}

new Thread(){
@Override
public void run(){
for (int i = 0; i <100 ; i++) {
while (true){
int time=money.getStamp();
Integer m=money.getReference();
if(m>500){//还花呗
System.out.println("大于500块");
if (money.compareAndSet(m,m-500,time,time+1)){
System.out.println("成功还款500,余额:"+money.getReference());
break;
}
}else {
System.out.println("没有足够金额");
break;
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();

}

}

不足1000元,已打钱,余额:1900
大于500
成功还款500,余额:1400
大于500
成功还款500,余额:900
大于500
成功还款500,余额:400
没有足够金额
没有足够金额
没有足够金额

AtomicIntegerArray:无锁的数组

同理,这里仅仅是简单展示一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.atomic.AtomicIntegerArray;

public class test implements Runnable{
static AtomicIntegerArray a=new AtomicIntegerArray(10);
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(new test());
Thread t2=new Thread(new test());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("a="+a);
}
@Override
public void run() {
for (int i = 0; i <100000; i++) {
a.incrementAndGet(i%a.length());//a++操作
}
}
}


a=[20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000]

AtomicIntegerFieldUpdater:使int型也拥有原子性

在实际开发过程中,由于初期考虑不周,没有对某些类设计为原子性,可能会在以后所应用的某些场景,出现线程安全的问题。就比如一些餐馆的菜单类在ID字段上设计为int型,很多时候我们并不需要去频繁的改动它,或者说,这菜品的ID也只是一个只读类型。但是在之后的过程中,这家餐馆被收购了,需要非常频繁的去改动菜品的ID,这就会涉及到数据不一致的问题,但是当初被设计的时候没有被考虑到,如果我们需要更改的话,可能会非常麻烦或者出现其他的错误,所以与其修改,不如去增加,这样更安全。所以我们使用AtomicIntegerFieldUpdater,去实现它。

举一个场景,某一地要进行一个选举,现在开始模拟投票场景,如果选民投了候选人一票,就记为1,否则为0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class test {
public static class candidate{
volatile int score;
}
public final static AtomicIntegerFieldUpdater<candidate> scoreUpdater
=AtomicIntegerFieldUpdater.newUpdater(candidate.class,"score");
//传入一个泛型,在newUpdater中传入要绑定的类和类属性
public static AtomicInteger all=new AtomicInteger(0);
//总票数,用于检查updater工作是否正确
public static void main(String[] args) throws InterruptedException {
final candidate stu=new candidate();//一个类的实例
Thread[] t=new Thread[10000];
for (int i = 0; i <10000 ; i++) {
t[i]=new Thread(){
@Override
public void run() {
if (Math.random()>0.4){//假设有60%的人投了票
scoreUpdater.incrementAndGet(stu);//安全的增加这个类的属性的值,即使它不是AtomicInteger型
all.incrementAndGet();//起到验证作用
}
}
};
t[i].start();
}
for (int i = 0; i <10000 ; i++) {
t[i].join();
}
System.out.println("score="+stu.score);
System.out.println("all="+all);
}
}

//score=6013
//all=6013

模拟这个选举,有60%的人投了这个候选人,那么使用这个候选人的属性,使得它绑定到scoreUpdater,使其成为原子的。因此做了个试验去验证。我们再来看看它是怎么实现的吧:

1
2
3
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass, String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>(tclass, fieldName, Reflection.getCallerClass());
}

它传入的参数,一个是calss的值,一个是calss的属性,然后定位到这个类的属性,将其绑定成AtomicInteger,并且返回。最后使用AtomicIntegerFieldUpdater要注意几点:

  1. Updater只能修改可见变量,因为在源码中,使用了反射,所以score不能设置为private。
  2. 必须声明为volatile型
  3. 它的AtomicIntegerFieldUpdaterImpl中,也运用到了Unsafe,所以不支持静态变量,不能设置为static