几种提高锁性能的方法 减小锁持有的时间 synchronized虽然可以保持原子性,但是大量的使用synchronized会使系统的整体性能下降,于是,我们可以只在需要上锁的时候上锁,对某些不改变系统数据的方法不上锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public synchronized void syncMethod () throws InterruptedException { code1(); for (int i = 0 ; i < 100000 ; i++) { count++; } code2(); } public void code1 () throws InterruptedException { Thread.sleep(1000 ); } public void code2 () throws InterruptedException { Thread.sleep(1000 ); } public void syncMethod () throws InterruptedException { code1(); synchronized (this ){ for (int i = 0 ; i < 100000 ; i++) { count++; }} code2(); } public void code1 () throws InterruptedException { Thread.sleep(1000 ); } public void code2 () throws InterruptedException { Thread.sleep(1000 ); }
比如code1和code2是不影响原子性的普通操作,而它们所占用的时间又很多,于是便可以将synchronized从一个整体函数拆分到一个个体中,更甚至可以使用lock和unlock继续细分,使得仅仅需要同步的方法上锁。
减小锁的粒度:锁细化 除了可以通过减小锁持有的时间去提高性能外,还可以通过减小锁的粒度去提高性能,就比如HashMap,在使用put()的时候,它是线程不安全的,如果直接使用synchronized去上锁,那么可能会对整体性能造成很大的影响,因为HashMap经常需要被使用。所以,便可以通过减小锁的粒度的形式去改变它,就比如ConcurrentHashMap,它本身有16个空的容器可以容纳元素,它在使用put方法的时候,进行计算,如果各个线程所得出来的key值不相同,那就不进行加锁,也就是说,最多可以共同容纳16个线程不加锁去put元素,极大的提高了效率。以下提供ConcurrentHashMap源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 public V put (K key, V value) { Segment<K,V> s; if (value == null ) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null ) s = ensureSegment(j); return s.put(key, hash, value, false ); } final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
注意,并不是所有的锁细化都可以提高性能,因为一旦进行锁细化操作后,如果一个全局的变量需要获得锁,那就必须等待所有被细化的锁释放之后才能进行。比如ConcurrentHashMap的size();就必须获得整体的锁,所以在执行类似操作的时候,要注意当时的场景是否需要经常使用全局变量,比如一个需要实时统计数据的股票交易所,就不要使用锁细化。
增大锁的粒度:锁粗化 有的时候为了提高性能甚至还可以反其道而行之,提高锁的粒度。因为存在一个特殊的场景,需要经常获得锁,但是不需要获得锁的部分所执行的时间又极短,所以与其不停的去申请锁并释放锁,不如直接申请一个大锁,锁住全部内容,以为每次加锁和解锁都是需要时间的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 synchronized (this ){ for (int i = 0 ; i < 100000 ; i++) { count++; }} Thread.sleep(2 ); synchronized (this ){ for (int i = 0 ; i < 100000 ; i++) { count++; }} synchronized (this ){ for (int i = 0 ; i < 100000 ; i++) { count++; } Thread.sleep(2 ); for (int i = 0 ; i < 100000 ; i++) { count++; } }
用读写锁去代替独占锁 在生活中很多应用场合中,读取数据的次数总是要大于写入次数的,就比如购物平台,所以很多时候都可以用读写锁ReadWriteLock去代替独占锁,或者用重入锁去代替独占锁,很多特殊的锁在特定环境下总能够获得更大的性能,要看场合使用。
锁分离 从读写锁的角度出发,我们可以从中获得启发,那就是将锁分离,读写锁可以拆分为读锁和写锁,用作于很多不同的场景,而我们也可以试着将锁分离,或者增加更多的锁,从而用更多的方法去得到灵活性。就比如之前的LinkedBlockingQueue。
1 2 3 4 5 6 7 8 9 10 private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();
使用了两个锁,,分别去控制put方法和take方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException(); int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public E take () throws InterruptedException { E x; int c = -1 ; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }