Java并发编程(十一)并发容器

并发容器

一些常用的工具

  • ConcurrentHashMap:线程安全的hashmap。
  • CopyOnWriteArrayList:线程安全的ArrayList,比较适合读多写少的场合,完胜vector。
  • ConcurrentLinkedQueue:线程安全的队列,它们两之间的差别在于数据结构不一样,一个是类似于数组,更适合直接查找,一个则是像链表一样,更适合添加和修改。
  • BlockingQueue:一个阻塞队列,也是一个接口,通常用于数据共享的通道。
  • ConcurrentSkipListMap:线程安全的跳表。
ADD()操作 DELETE()操作 INSERT操作 INDEX取值操作 ITERATOR取值操作
ArrayList/Vector/Stack 极优 极优
LinkedList 极优

初步实现线程安全

想要保持HashMap的线程安全的话,出了手动使用synchronized包围之外,还能够调用一个类去实现它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Map map= Collections.synchronizedMap(new HashMap<>());
//将HashMap对象传入synchronized的map,正如字面意思所示
//看看里面的实现
SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this;
}
//list同理
List list=Collections.synchronizedList(new LinkedList<>());
//
SynchronizedList(List<E> list) {
super(list);
this.list = list;
}

Objects.requireNonNull是判断有没有传入参数,mutex表示的是一个锁,表明要拥有这个锁的时候才能对map进行读取或写入,这样的话效率很低。

一个更好用的办法,就是用ConcurrentHashMap。

ConcurrentLinkedQueue

ConcurrentLinkedQueue是在高并发环境下最好的队列了,队列经常被用于实际生活,比如消息队列,用餐的推送顺序,购票网站等,之前讲过的Java并发编程(六)的ArrayList线程不安全,我们用线程安全的代替试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.concurrent.ConcurrentLinkedQueue;
public class test implements Runnable{
static ConcurrentLinkedQueue list=new ConcurrentLinkedQueue();
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(new test());
Thread t2=new Thread(new test());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(list.size());
}

@Override
public void run() {
for (int i = 0; i < 100000; i++)
{
list.offer(i);
//add也行,一样的
// public boolean add(E e) {
// return offer(e);
// }
}

}
}

200000

我们看看ConcurrentLinkedQueue是如何实现的吧!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}//无参数的构造方法,将类的头结点和子节点都设置为新的节点
//Node 是一个类,里面有两个参数
private static class Node<E> {
volatile E item;
volatile Node<E> next;
......
}
//E表示是泛型的容器,用传入的类型定义目标值,和用一个新的Node去定义next值,类似于链表一般,一个接着一个。
//而Node又有几个方法
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}

boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
//casItem的意思是设置当前的值,使用cas操作(compareAndSwapObject)去保证其原子性

队列里有head和tail两个值,分别代表的头结点和尾节点,头结点值永远不会为null,而我们正是使用头结点去遍历这个队列,每当有一个新的值入队的时候,都会从尾部入队,直接进入尾节点,从而只需移动尾节点便可以入队,极大的方便与插入和删除操作。而tail的更新情况并不一定是随时更新,而是可能会产生延迟,比如要进入了两个数值,尾节点才会移动到末尾。而我们入队的方法有两个,一个是add,一个是offer,但是实际上使用的都是offer。

1
2
3
4
//add也行,一样的
// public boolean add(E e) {
// return offer(e);
// }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean offer(E e) {
checkNotNull(e);
//检查是否为空
final Node<E> newNode = new Node<E>(e);
//然后便建立一个新的节点
for (Node<E> t = tail, p = t;;) {//p为头结点
Node<E> q = p.next;
if (q == null) {//等于空表示是最后的节点
//第一次加入元素时,p.next为空,于是便使用一个新的节点
if (p.casNext(null, newNode)) {
//新节点与p的next交换(p为头节点)
if (p != t) //头结点不等于尾节点
casTail(t, newNode);//允许失败
return true;//返回true
}
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
//
else
p = (p != t && t != (t = tail)) ? t : q;
//取最后一个节点
}
}

在if (q == null)这个判断中,如何q为空,可能队列为空,也可能加入新的节点,正常执行增加节点操作。但在队列为空时,p==t(即head=tail),因此不会执行castail()操作。

但在执行了第一个元素入队后,再执行第二个元素入队时,p的next不为空,于是就开始查找最后一个节点,执行 p = (p != t && t != (t = tail)) ? t : q; 使得p取下一个节点或最后的节点,于是p就成了链表的第一个元素。这个时候再进行循环,p的null(即第一个元素的next)为空,而此时的p不等于尾节点,于是变更新尾节点,使得尾节点移动至链表最后。

而出现特殊情况 else if (p == q) 出现的时候,便使用新的tail作为链表的末尾。

特殊情况是怎么诞生的呢?比如两个线程去读取队列的一个值,一个读取完了之后,另一个执行了修改,那么第一个线程再次去读取的时候,就会发生两次数据不一致的问题。就会出现一个(t != (t = tail))的情况,这时候,如果尾节点没有被修改,则返回head,重新查找尾节点。

看看出队的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;

if (item != null && p.casItem(item, null)) {
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

但在执行了第一个元素入队后,再执行第一个元素出队时,此刻的tail并没有更新,所以会直接执行最后一个操作,使得p=q;就是说把头结点的值变为第一个节点的值,然后再循环,执行到 item != null && p.casItem(item, null) 此时的item元素不会为空,因为它现在是第一个节点,然后再将其cas交换,交换后,item值已经为空了,于是就不会等于h,然后执行 updateHead(h, ((q = p.next) != null) ? q : p);

1
2
3
4
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}

再把head和tail变为同一个元素。

CopyOnWriteArrayList

CopyOnWriteArrayList常被用作于商品类目的展示,黑名单等各种读多写少的场景。但是它只能保证数据的最终一致性,不能保证数据的实时一致性,也就是说,被修改的值不一定能够被马上读到。我们来看看它的源码把。

1
2
3
4
5
6
7
8
9
10
11
12
//读取的时候,返回一个get(getArray(), index)
public E get(int index) {
return get(getArray(), index);
}
//
final Object[] getArray() {
return array;
}
//
private E get(Object[] a, int index) {
return (E) a[index];
}

这相当于每次都返回一个array[index]的内部数组,因为这个数组不会被修改,只能够被另一个array数组给替换,从而保证了原子性,所以就没有类似于synchronized和lock的加锁操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//add方法
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
//set
final void setArray(Object[] a) {
array = a;
}

一开始便设置了一个重入锁,进行了加锁操作,然后没增加一个新元素,都会创建一个新的数组,然后使用setArray去改变这个数组的值,由于整体都加了锁,所以不会影响到原子性,其次array变量是一个volatile类型。

1
private transient volatile Object[] array;

所以被修改后,其他线程会被察觉到并将其修改,保证可见性和有序性。

BlockingQueue

之前在写线程池的时候,介绍过四个任务队列,它们都有一个共同的特点就是都具有阻塞功能,因为它们都来自于一个接口BlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface BlockingQueue<E> extends Queue<E> {

boolean add(E e);

boolean offer(E e);

void put(E e) throws InterruptedException;

boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

int remainingCapacity();

E take() throws InterruptedException;

E poll(long timeout, TimeUnit unit)
throws InterruptedException;

boolean remove(Object o);

public boolean contains(Object o);

int drainTo(Collection<? super E> c);

int drainTo(Collection<? super E> c, int maxElements);
}

而我们就以ArrayBlockingQueue为例。虽然说ArrayBlockingQueue和concurrentlinkedqueue也有offer方法和poll方法,但是ArrayBlockingQueue由于继承的是BlockQueue,所以还有两个独有的方法,那就是put和take。

这有什么区别呢?可以猜的到这两个方法具有阻塞功能,就以offer方法和put方法来对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
    public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;//直接返回
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
//
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();//阻塞
enqueue(e);
} finally {
lock.unlock();
}
}
//
private E dequeue() {

final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//唤醒notfull
return x;
}
//
private final Condition notEmpty;

private final Condition notFull;

它们之间最为明显的差别就是,offer在队列已经满的时候,直接返回一个false,而put方法则是使用了condition.await去阻塞这个线程继续运行,等待另一个dequeue方法,移除一个元素出队后,再去唤醒put方法,使其继续将未能加入线程的元素继续加入进去。

同理,take方法也是如此

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();//直接返回
} finally {
lock.unlock();
}
}
//
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();//阻塞
return dequeue();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {

final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}

使用take想要获得队列首部信息的时候,如果没有值,便发生阻塞,直到有元素入队,使用notEmpty方法唤醒为止。

ConcurrentSkipListMap

跳表(SkipList),是除了哈希表(HashTable)以外,一个非常特别的表。哈希图是用哈希表去实现的,那么跳表也有对应的map结构,那就是跳图(ConcurrentSkipListMap),这个ConcurrentSkipListMap非常的神奇,这是一个用空间去换时间的算法,它的数据结构如下:图片资源来自于:http://www.liuhaihua.cn/archives/40657.html

skipmap1

顶层有着很少的数据和很大的区间,然后往下走,数据越密,区间越小。每个节点都有key值和value值,它的运行机制如下:当你要寻找一个key值的时候,从顶层开始找,如果需要找的key值小于当前节点,便继续前进一个节点,如果需要找的key值大于当前节点,便直接进入当前节点的下一层继续寻找,如果需要找的key值等于当前节点,便返回当前节点的value值。

skipmap2

如图所示,如果你要找的值为70,从顶层开始找,大于20往下,大于40再往下,小于40则往前,最后找到了目标值70。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class test {
public static void main(String[] args) {
Map<Integer,String> s=new ConcurrentSkipListMap<Integer,String>();
for (int i = 0; i <30 ; i++) {
s.put(i,"i am "+i);
}
for (Map.Entry<Integer,String> e:s.entrySet()
) {
System.out.println(e.getKey());
}
}
}
//可以发现它的输出是有序的,而HashMap的输出是无序的。

我们看看它的源码构成:

1
2
3
4
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;

每一个节点除了自己的key和value,还有下一个节点的next,类似于链表

1
2
3
4
5
6
7
boolean casValue(Object cmp, Object val) {
return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
}

boolean casNext(Node<K,V> cmp, Node<K,V> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

都使用CAS操作去保证原子性

1
2
3
4
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;

特别是这个index,定义了本身节点,右节点和下节点,这三个节点,这将其数据结构的实现联系到了一起,看看构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
    public ConcurrentSkipListMap() {
this.comparator = null;
initialize();
}
//
private void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
null, null, 1);
}
//
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}

这个HeadIndex的三个参数分别是需要寻找的目标节点的移动,和当前节点的本身的值和右值和下值,最后一个参数表示了层数。这个构造函数是整个SkipMap的核心所在。