Java并发编程(十)线程池

线程池

线程池的基本使用

创建一个线程池

线程的每一次创建,最后的结果免不了都是销毁,那么在大型的系统的,就有着不断地创建和销毁,线程的创建和销毁开销是很大的,甚至在一些小的任务当中,创建和销毁的开销甚至超过了任务本身,那么,有方法可以去减少这种开销吗?设计者当然也想到了,于是乎有了线程池这个事物,使用线程池去创建线程,线程执行完任务后再回到线程池,等待下次任务的执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class test implements Runnable {
public static void main(String[] args) {
test t1=new test();
ExecutorService es= Executors.newFixedThreadPool(5);
for (int i = 0; i <10 ; i++) {
es.submit(t1);
}
es.shutdown();
}
@Override
public void run() {
System.out.println(System.currentTimeMillis()+"线程ID:"+Thread.currentThread().getId());

}
}


1577176965545线程ID:12
1577176965545线程ID:15
1577176965545线程ID:13
1577176965545线程ID:12
1577176965545线程ID:14
1577176965546线程ID:15
1577176965546线程ID:12
1577176965546线程ID:13
1577176965545线程ID:16
1577176965546线程ID:14

可以看到,线程池只创建了5个线程,却要执行10个任务,使用线程池的方法可以使得线程被重复利用。从而减少线程的创建和销毁所占用的内存。

线程池的种类

线程池不仅仅只有一种创建方法,它还有其他的种类

  • newFixedThreadPool(int nThreads)创建一个拥有固定数量的线程池
  • newSingleThreadExecutor()创建一个只有一个线程的线程池
  • newCachedThreadPool()创建一个拥有自动改变大小的线程池
  • newSingleThreadScheduledExecutor()创建一个有固定时间执行的单个线程池
  • newScheduledThreadPool(int corePoolSize)创建一个有固定时间执行的且指定数量的线程池
固定时间执行任务 :newScheduledThreadPool(int corePoolSize)

这个函数可以指定固定时间完成线程的执行,看看它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//返回的是一个ScheduledExecutorService 对象
//ScheduledExecutorService :
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

它有三个方法,最主要的两个是scheduleAtFixedRate和scheduleWithFixedDelay,它们的区别在于传入的是period还是delay,实际运用的时候,AtFixedRate是等待initial+n*period的时间允许,而WithFixedDelay是在线程执行完后等待delay时间运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class test {
public static void main(String[] args) {
test t=new test();
ScheduledExecutorService es = Executors.newScheduledThreadPool(10);
es.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis()/1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},0,2, TimeUnit.SECONDS);
}
}


1577178569
1577178571
1577178573
1577178575
1577178577
......
//每次丢失间隔两秒执行一次
//而我们把AtFixedRate换为WithFixedDelay呢?
1577178975
1577178978
1577178981
1577178984
//执行时间间隔变成了3秒

但如果我们的周期时间要大于等待时间呢?比如说8秒,那我们的AtFixedRate就会执行完后继续执行而不是再继续等待,反之,WithFixedDelay是还要再等待2秒才会继续执行。

线程池的内部实现

ThreadPoolExecutor

倒回来去看看前三个线程池构造方法,可以点进去看他们的实现,发现他们的实现都是来源于一个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

它们的不同也仅仅是传入的参数不同而已,并且都返回了一个叫做ThreadPoolExecutor的函数。

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
  • corePoolSize
  • maximumPoolSize
  • keepAliveTime超过指定线程数量的线程在被销毁前的存活时间
  • TimeUnit unit
  • BlockingQueue workQueue任务队列
  • Executors.defaultThreadFactory()
  • handler:拒绝策略

这里有几个比较不好理解的词语:BlockingQueue和handler

任务队列

BlockingQueue是一个任务队列,它可以指定任务队列的样式,通常有那么几种:

  • 直接提交的队列:SynchronousQueue。它没有容量,它的每一个插入操作都要等待一个删除操作,这个队列锁提交的任务不会被真实的保存,而是将新任务交给线程执行,如果没有新的进程,便创建一个新的进程,它通常要设置很大的线程容量,否则很容易执行拒绝策略。
  • 有界的任务队列:ArrayBlockingQueue。这个队列的特点就是,在等待队列已经满之时,又有新的任务加入,且指定线程数目还未超过最大线程数,就创建新的进程执行任务。
  • 无界的任务队列:LinkedBlockQueue。这个队列在有新的任务加入队列时,但线程已经达到指定的容量,新的任务会进入队列一直等待,而不是创建新的进程。
  • 优先任务队列:PriorityBlockQueue。顾名思义,带有优先级的队列,一般的队列都是依照着先进先出的规则执行,而这个队列的任务带有优先级标志,按照优先级去执行。

前面提到过newCacheThreadPool,这个函数的指定线程为0,最大线程数为无穷大。一般情况下,这个线程池没有线程,但是当有新的任务分配到的时候,会加入SynchronousQueue,提交任务。执行完成后,空闲线程若无其他任务,就会在60秒内被回收。

拒绝策略

jdk内置的拒绝策略
  • AbortPolicy策略:直接抛出异常
  • CallerRunsPolicy策略:直接在调用者线程中,运行被抛弃的任务
  • DiscardOledestPOlicy策略:丢弃最老的一个请求
  • DIscardPolicy策略:默默丢弃无法处理的任务
演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.util.concurrent.*;

public class test implements Runnable {
public static void main(String[] args) throws InterruptedException {
test t=new test();
ExecutorService es= new ThreadPoolExecutor(2,5,0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString()+"is discard");
}
});
for (int i = 0; i <Integer.MAX_VALUE ; i++) {
es.submit(t);
Thread.sleep(10);
}
}
@Override
public void run() {
System.out.println(System.currentTimeMillis()+" :Theard ID:"+Thread.currentThread().getId());
try{
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


1577183355191 :Theard ID:12
1577183355201 :Theard ID:13
1577183355291 :Theard ID:12
1577183355301 :Theard ID:13
1577183355334 :Theard ID:14
1577183355344 :Theard ID:15
1577183355354 :Theard ID:16
java.util.concurrent.FutureTask@135fbaa4is discard
java.util.concurrent.FutureTask@45ee12a7is discard
java.util.concurrent.FutureTask@330bedb4is discard
1577183355391 :Theard ID:12
1577183355401 :Theard ID:13
java.util.concurrent.FutureTask@2503dbd3is discard
java.util.concurrent.FutureTask@4b67cf4dis discard
1577183355434 :Theard ID:14
1577183355444 :Theard ID:15
1577183355454 :Theard ID:16
java.util.concurrent.FutureTask@7ea987acis discard
java.util.concurrent.FutureTask@12a3a380is discard
java.util.concurrent.FutureTask@29453f44is discard
......

如上所示,在创建了两个线程之后,发现所需线程不够,便达到了最大线程数,此刻在进行访问的时候,任务太多,处理不过来,便选择了抛弃任务,从而出现了拒绝策略,不去执行这个任务。

自定义线程创建

线程池的目的是实现线程复用,但线程池并没有使用new语句去创建多个线程,它仅仅是进行了submit而已,那么这些线程是怎么来的呢?它们来自于ThreadFactory。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.concurrent.*;

public class test implements Runnable{
private static int i=0;//定制名字
public static void main(String[] args) throws InterruptedException {
test t=new test();
ExecutorService es=new ThreadPoolExecutor(5,
5,
0L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
i++;//
Thread t=new Thread(r,""+i);//定制名字
Thread t=new Thread(r);
t.setDaemon(true);//设置为守护线程
System.out.println("create"+t);
return t;
}
});
for (int i = 0; i <5 ; i++) {
es.submit(t);
}
Thread.sleep(2000);
//es.shutdown();

}

@Override
public void run() {

}
}

createThread[Thread-0,5,main]
createThread[Thread-1,5,main]
createThread[Thread-2,5,main]
createThread[Thread-3,5,main]
createThread[Thread-4,5,main]

如上,在创建了线程池之后,重写线程池的ThreadFactory方法,修改了线程工厂的构造方法,这使得我们在submit这个t对象后,使用构造方法构造了许多线程,可以看到这个线程所输出的信息。由于将所有的线程都设置为了守护线程,这使得我们的主线程退出后,会自动销毁所有线程,而不需要再加一个es.shutdown();去手动的结束线程。

扩展线程池

我们在使用线程的时候,对于线程的创建和销毁状态是不可见的,只能在执行run方法的时候,才能够从中察觉,但能不能在创建或者销毁的时候去监控它呢?当然是可以的啦!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class test {
public static class mytask implements Runnable{
private String name;

public mytask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("正在执行"+" 线程ID:"+Thread.currentThread().getId()+",task name="+name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService es=new ThreadPoolExecutor(5,5,0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行"+((mytask) r).name);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成"+((mytask) r).name);
}

@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
for (int i = 0; i < 5; i++) {
mytask t=new mytask("task_geym_"+i);
es.execute(t);
Thread.sleep(10);
}
es.shutdown();
}
}
准备执行task_geym_0
正在执行 线程ID:12,task name=task_geym_0
准备执行task_geym_1
正在执行 线程ID:13,task name=task_geym_1
准备执行task_geym_2
正在执行 线程ID:14,task name=task_geym_2
准备执行task_geym_3
正在执行 线程ID:15,task name=task_geym_3
准备执行task_geym_4
正在执行 线程ID:16,task name=task_geym_4
执行完成task_geym_0
执行完成task_geym_1
执行完成task_geym_2
执行完成task_geym_3
执行完成task_geym_4
线程池退出

只要选择自行重写beforeExecute和afterExecute方法便可以实现啦。

线程池中的堆栈

在上面的时候,我们发现线程池使用execute去执行程序而不是submit,这是有很多原因的,其中一点就是,execute可以打印出异常堆栈,这代表的,如果线程的创建出了一个无法被编译器识别的错误的,可以将其错误的地方打印出来,这使得我们的程序更为优秀。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class test implements Runnable{
int a,b;

public test(int a, int b) {
this.a = a;
this.b = b;
}

public static void main(String[] args) {
ThreadPoolExecutor es=new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
for (int i = 0; i <5 ; i++) {
es.execute(new test(100,i));
}

}

@Override
public void run() {
double re=a/b;
System.out.println(re);
}
}

Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at test.run(test.java:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
100.0
25.0
33.0
50.0

比如这个,发生了除数为0,而导致线程的出错,但是如果你的方法是submit,则打印不出来异常堆栈。顺着提示的错误点,点进去,你就可以快速找到是哪里错了,但是这样仍然不能够找到提交的地点,那应该怎么做呢?

首先写一个新的类并继承ThreadPoolExecutor,并重写提交和执行这两个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class 打印出异常堆栈的类 extends ThreadPoolExecutor {
public 打印出异常堆栈的类(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable command) {
super.execute(wrap(command,clientTrace(),Thread.currentThread().getName()));
}

@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName()));
}
private Exception clientTrace(){
return new Exception("客户端堆栈追踪");
}
private Runnable wrap(final Runnable task,final Exception clientStack,String clientThreadName){
return new Runnable() {
@Override
public void run() {
try {
task.run();
} catch (Exception e) {
clientStack.printStackTrace();
throw e;
}
}
};
}
}

而submit函数返回的是一个Runnable型的类,那么我们便可以

1
2
3
4
5
6
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

写一个Runnable型函数去返回它,调用的时候还是使用task.run();继续调用,但区别在于传入了一个叫做Exception型的参数:clientStack.printStackTrace();使其找到提交的地点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class test implements Runnable{
int a,b;

public test(int a, int b) {
this.a = a;
this.b = b;
}

public static void main(String[] args) {
ThreadPoolExecutor es=new 打印出异常堆栈的类(0,Integer.MAX_VALUE,0L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
for (int i = 0; i <5 ; i++) {
es.submit(new test(100,i));
}

}

@Override
public void run() {
double re=a/b;
System.out.println(re);
}
}

java.lang.Exception: client stack trace
at 打印出异常堆栈的类.clientTrace(打印出异常堆栈的类.java:21)
at 打印出异常堆栈的类.submit(打印出异常堆栈的类.java:18)
at test.main(test.java:17)
100.0
25.0
33.0
50.0

瞬间定位提交地点。

Fork/Join框架

一般来说,线程都有各自的任务,但各个任务的难度都不相同,有的繁琐而艰巨,有的只是喝茶聊天,所以设计者为了平衡这样的状况,设计出了一个叫做Fork/Join的框架,它能够实现线程之间的互助,比如说t1线程早就执行完了自己的任务,他就会去帮助t2线程执行任务。而很多线程在执行任务的时候,都是由队列的顶部去抓取数据,而在Fork/Join框架线程互助的时候,t1会从t2的底部去抓取数据,从而避免了数据的竞争。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class test extends RecursiveTask<Long> {
private static final int Threshold =10000;
private long start;
private long end;

public test(long start, long end) {
this.start = start;
this.end = end;
}

public static void main(String[] args) {
ForkJoinPool fjp=new ForkJoinPool();
test t=new test(0,200000L);
ForkJoinTask<Long> result=fjp.submit(t);
try {
long res=result.get();
System.out.println("sum="+res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
protected Long compute() {
long sum=0;
boolean canCompute=(end-start)<Threshold;
if (canCompute){
for (long i = start; i <end ; i++) {
sum+=i;
}
}else {
//分成100个小任务
long step=(start+end)/100;
ArrayList<test> subtests=new ArrayList<test>();
long pos=start;
for (int i = 0; i <100 ; i++) {
long lastone=pos+step;
if(lastone>end){
lastone=end;
}
test subtest=new test(pos,lastone);
pos+=step+1;
subtests.add(subtest);
subtest.fork();
}//分而治之
for (test t:
subtests) {
sum+=t.join();
}//最后累加
}
return sum;
}
}
sum=19989995149

这是一道计算1到200000求和的计算题,利用了算法当中的分治法,但这个分治法是使用多线程是完成的。首先设置了阈值为10000,若大于这个阈值,则进行将其往下分解100个小任务,每个任务将每一段都构造新的test()进行加入队列,其次在使用fork方法执行每个test类的compute方法,分解的值都加入subtests队列。最后每个subtest使用join方法导出值,并且加入sum。