Java并发编程(十五)线程互助的SynchronousQueue

让线程之间互相帮助:SynchronousQueue

在线程池的介绍当中,提到一个非常特殊的队列,叫做SynchronousQueue。它的容量为0,对任何一个写操作,都要等待一个读操作,可以把SynchronousQueue看做为一个数据的交换通道。讲到这样一个需要等待操作的方法,不得不提到之前的LinkedBLockQueue,它的put方法和take方法,正是需要等待队列的来唤醒,而我们这个SynchronousQueue,也正有着异曲同工之秒,它的put方法和take方法都涉及到了一个类:

1
2
transferer.transfer(o, false, 0) == null)//put
transferer.transfer(null, false, 0)//take

而这个方法的参数有哪些呢?

1
abstract Object transfer(Object e, boolean timed, long nanos);

可以看到,这个是一个抽象方法,等待SynchronousQueue去实现,而第一个值Object表示是否传入参数,第二个值表示是否传入存在时间,第三个值表示时长。为什么要这么做呢?因为如果传入的值一直都没有相应的take操作将其取出,就会导致堵塞,这样会使得SynchronousQueue不能正常工作,所以需要设定自我销毁时间。

transfer主要分三个部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (mode == REQUEST) ? m.item : s.item;
}
}

根据SynchronousQueue的特性,对任何一个写操作,都要等待一个读操作。一开始的SNode表示等待队列的节点,之后的if所判断的是如果队列为空,则直接返回空,或者队列中元素的模式和本次操作相同。比如都是读操作,则需要等待入队,从elseif开始,表示入队成功,之后节点就会处于自旋、等待的状态,直到有一个相应的线程使其唤醒。之后的操作则是帮助两个头部节点完成出队的操作,当然,这帮助不一定会有效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (mode == REQUEST) ? m.item : s.item;
} else // lost match
s.casNext(m, mn); // help unlink
}
}
}

如果等待队列和本次操作,是互补的,那么就插入一个完成状态的节点,让他匹配到一个等待节点上,之后弹出这两个节点,并且使得对应的两个线程继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
else {                            // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}

如果线程发现等待队列的节点就是完成节点,那么帮助这个节点完成任务,其流程和步骤2是一样的。

下面用一个实际例子去展示:例子来自于:https://blog.csdn.net/yanyan19880509/article/details/52562039

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.SynchronousQueue;

public class 关于SynchronousQueueTest {
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();

Thread putThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("put 线程开始");
try {
queue.put(1);
} catch (InterruptedException e) {
}
System.out.println("put 线程结束");
}
});

Thread takeThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("take 线程开始");
try {
System.out.println("take 来自于 put线程: " + queue.take());
} catch (InterruptedException e) {
}
System.out.println("take 线程结束");
}
});

putThread.start();
Thread.sleep(1000);
takeThread.start();
}
}

可以看到,必须等待其他的线程使用take操作的时候,才能够让put的线程继续运行下去。这正是SynchronousQueue的特点,它不能够容乃任何事物,只能是一个任意门,一个通道,它是不能存放任何实体的。(注:SynchronousQueue内部没有容器指的是没有像数组那样的内存空间存多个元素,但是是有单地址内存空间,用于交换数据)