Java并发编程(十二)如何提高锁的性能

几种提高锁性能的方法

减小锁持有的时间

synchronized虽然可以保持原子性,但是大量的使用synchronized会使系统的整体性能下降,于是,我们可以只在需要上锁的时候上锁,对某些不改变系统数据的方法不上锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

public synchronized void syncMethod() throws InterruptedException {
code1();

for (int i = 0; i < 100000; i++) {
count++;
}
code2();
}
public void code1() throws InterruptedException {
Thread.sleep(1000);
}
public void code2() throws InterruptedException {
Thread.sleep(1000);
}

//可以改为
public void syncMethod() throws InterruptedException {
code1();
synchronized (this){
for (int i = 0; i < 100000; i++) {
count++;
}}
code2();
}
public void code1() throws InterruptedException {
Thread.sleep(1000);
}
public void code2() throws InterruptedException {
Thread.sleep(1000);
}
//其实这个例子不能很好的说明,因为jvm对锁有优化,使得不需要上锁的区域会自动发生逃逸现象。

比如code1和code2是不影响原子性的普通操作,而它们所占用的时间又很多,于是便可以将synchronized从一个整体函数拆分到一个个体中,更甚至可以使用lock和unlock继续细分,使得仅仅需要同步的方法上锁。

减小锁的粒度:锁细化

除了可以通过减小锁持有的时间去提高性能外,还可以通过减小锁的粒度去提高性能,就比如HashMap,在使用put()的时候,它是线程不安全的,如果直接使用synchronized去上锁,那么可能会对整体性能造成很大的影响,因为HashMap经常需要被使用。所以,便可以通过减小锁的粒度的形式去改变它,就比如ConcurrentHashMap,它本身有16个空的容器可以容纳元素,它在使用put方法的时候,进行计算,如果各个线程所得出来的key值不相同,那就不进行加锁,也就是说,最多可以共同容纳16个线程不加锁去put元素,极大的提高了效率。以下提供ConcurrentHashMap源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
//JDK7
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

//JDK8
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

注意,并不是所有的锁细化都可以提高性能,因为一旦进行锁细化操作后,如果一个全局的变量需要获得锁,那就必须等待所有被细化的锁释放之后才能进行。比如ConcurrentHashMap的size();就必须获得整体的锁,所以在执行类似操作的时候,要注意当时的场景是否需要经常使用全局变量,比如一个需要实时统计数据的股票交易所,就不要使用锁细化。

增大锁的粒度:锁粗化

有的时候为了提高性能甚至还可以反其道而行之,提高锁的粒度。因为存在一个特殊的场景,需要经常获得锁,但是不需要获得锁的部分所执行的时间又极短,所以与其不停的去申请锁并释放锁,不如直接申请一个大锁,锁住全部内容,以为每次加锁和解锁都是需要时间的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
synchronized (this){
for (int i = 0; i < 100000; i++) {
count++;
}}

Thread.sleep(2);//中间所执行时间极短

synchronized (this){
for (int i = 0; i < 100000; i++) {
count++;
}}

//不如就
synchronized (this){
for (int i = 0; i < 100000; i++) {
count++;
}

Thread.sleep(2);


for (int i = 0; i < 100000; i++) {
count++;
}
}
//减少锁的申请,减小开销

用读写锁去代替独占锁

在生活中很多应用场合中,读取数据的次数总是要大于写入次数的,就比如购物平台,所以很多时候都可以用读写锁ReadWriteLock去代替独占锁,或者用重入锁去代替独占锁,很多特殊的锁在特定环境下总能够获得更大的性能,要看场合使用。

锁分离

从读写锁的角度出发,我们可以从中获得启发,那就是将锁分离,读写锁可以拆分为读锁和写锁,用作于很多不同的场景,而我们也可以试着将锁分离,或者增加更多的锁,从而用更多的方法去得到灵活性。就比如之前的LinkedBlockingQueue。

1
2
3
4
5
6
7
8
9
10
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

使用了两个锁,,分别去控制put方法和take方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();//不能有两个线程同时进行put
try {
while (count.get() == capacity) {//如果达到了上限,便等待
notFull.await();//等待
}
enqueue(node);//跳出了等待,便入队
c = count.getAndIncrement();//入队成功,总数加一
if (c + 1 < capacity)//如果仍未满,就通知其他线程的put继续入队
notFull.signal();
} finally {
putLock.unlock();//解锁
}
if (c == 0)
signalNotEmpty();//插入成功,通知take操作
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//以此类推......