线程池 线程池的基本使用 创建一个线程池 线程的每一次创建,最后的结果免不了都是销毁,那么在大型的系统的,就有着不断地创建和销毁,线程的创建和销毁开销是很大的,甚至在一些小的任务当中,创建和销毁的开销甚至超过了任务本身,那么,有方法可以去减少这种开销吗?设计者当然也想到了,于是乎有了线程池这个事物,使用线程池去创建线程,线程执行完任务后再回到线程池,等待下次任务的执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class test implements Runnable { public static void main (String[] args) { test t1=new test(); ExecutorService es= Executors.newFixedThreadPool(5 ); for (int i = 0 ; i <10 ; i++) { es.submit(t1); } es.shutdown(); } @Override public void run () { System.out.println(System.currentTimeMillis()+"线程ID:" +Thread.currentThread().getId()); } } 1577176965545 线程ID:12 1577176965545 线程ID:15 1577176965545 线程ID:13 1577176965545 线程ID:12 1577176965545 线程ID:14 1577176965546 线程ID:15 1577176965546 线程ID:12 1577176965546 线程ID:13 1577176965545 线程ID:16 1577176965546 线程ID:14
可以看到,线程池只创建了5个线程,却要执行10个任务,使用线程池的方法可以使得线程被重复利用。从而减少线程的创建和销毁所占用的内存。
线程池的种类 线程池不仅仅只有一种创建方法,它还有其他的种类
newFixedThreadPool(int nThreads)创建一个拥有固定数量的线程池
newSingleThreadExecutor()创建一个只有一个线程的线程池
newCachedThreadPool()创建一个拥有自动改变大小的线程池
newSingleThreadScheduledExecutor()创建一个有固定时间执行的单个线程池
newScheduledThreadPool(int corePoolSize)创建一个有固定时间执行的且指定数量的线程池
固定时间执行任务 :newScheduledThreadPool(int corePoolSize) 这个函数可以指定固定时间完成线程的执行,看看它:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public <V> ScheduledFuture<V> schedule (Callable<V> callable, long delay, TimeUnit unit) ; public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
它有三个方法,最主要的两个是scheduleAtFixedRate和scheduleWithFixedDelay,它们的区别在于传入的是period还是delay,实际运用的时候,AtFixedRate是等待initial+n*period的时间允许,而WithFixedDelay是在线程执行完后等待delay时间运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class test { public static void main (String[] args) { test t=new test(); ScheduledExecutorService es = Executors.newScheduledThreadPool(10 ); es.scheduleAtFixedRate(new Runnable() { @Override public void run () { try { Thread.sleep(1000 ); System.out.println(System.currentTimeMillis()/1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } },0 ,2 , TimeUnit.SECONDS); } } 1577178569 1577178571 1577178573 1577178575 1577178577 ...... 1577178975 1577178978 1577178981 1577178984
但如果我们的周期时间要大于等待时间呢?比如说8秒,那我们的AtFixedRate就会执行完后继续执行而不是再继续等待,反之,WithFixedDelay是还要再等待2秒才会继续执行。
线程池的内部实现 ThreadPoolExecutor 倒回来去看看前三个线程池构造方法,可以点进去看他们的实现,发现他们的实现都是来源于一个类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
它们的不同也仅仅是传入的参数不同而已,并且都返回了一个叫做ThreadPoolExecutor的函数。
1 2 3 4 5 6 7 8 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
corePoolSize
maximumPoolSize
keepAliveTime超过指定线程数量的线程在被销毁前的存活时间
TimeUnit unit
BlockingQueue workQueue任务队列
Executors.defaultThreadFactory()
handler:拒绝策略
这里有几个比较不好理解的词语:BlockingQueue和handler
任务队列 BlockingQueue是一个任务队列,它可以指定任务队列的样式,通常有那么几种:
直接提交的队列:SynchronousQueue。它没有容量,它的每一个插入操作都要等待一个删除操作,这个队列锁提交的任务不会被真实的保存,而是将新任务交给线程执行,如果没有新的进程,便创建一个新的进程,它通常要设置很大的线程容量,否则很容易执行拒绝策略。
有界的任务队列:ArrayBlockingQueue。这个队列的特点就是,在等待队列已经满之时,又有新的任务加入,且指定线程数目还未超过最大线程数,就创建新的进程执行任务。
无界的任务队列:LinkedBlockQueue。这个队列在有新的任务加入队列时,但线程已经达到指定的容量,新的任务会进入队列一直等待,而不是创建新的进程。
优先任务队列:PriorityBlockQueue。顾名思义,带有优先级的队列,一般的队列都是依照着先进先出的规则执行,而这个队列的任务带有优先级标志,按照优先级去执行。
前面提到过newCacheThreadPool,这个函数的指定线程为0,最大线程数为无穷大。一般情况下,这个线程池没有线程,但是当有新的任务分配到的时候,会加入SynchronousQueue,提交任务。执行完成后,空闲线程若无其他任务,就会在60秒内被回收。
拒绝策略 jdk内置的拒绝策略
AbortPolicy策略:直接抛出异常
CallerRunsPolicy策略:直接在调用者线程中,运行被抛弃的任务
DiscardOledestPOlicy策略:丢弃最老的一个请求
DIscardPolicy策略:默默丢弃无法处理的任务
演示 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import java.util.concurrent.*;public class test implements Runnable { public static void main (String[] args) throws InterruptedException { test t=new test(); ExecutorService es= new ThreadPoolExecutor(2 ,5 ,0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10 ), Executors.defaultThreadFactory(), new RejectedExecutionHandler(){ @Override public void rejectedExecution (Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString()+"is discard" ); } }); for (int i = 0 ; i <Integer.MAX_VALUE ; i++) { es.submit(t); Thread.sleep(10 ); } } @Override public void run () { System.out.println(System.currentTimeMillis()+" :Theard ID:" +Thread.currentThread().getId()); try { Thread.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } } } 1577183355191 :Theard ID:12 1577183355201 :Theard ID:13 1577183355291 :Theard ID:12 1577183355301 :Theard ID:13 1577183355334 :Theard ID:14 1577183355344 :Theard ID:15 1577183355354 :Theard ID:16 java.util.concurrent.FutureTask@135f baa4is discard java.util.concurrent.FutureTask@45 ee12a7is discard java.util.concurrent.FutureTask@330 bedb4is discard 1577183355391 :Theard ID:12 1577183355401 :Theard ID:13 java.util.concurrent.FutureTask@2503 dbd3is discard java.util.concurrent.FutureTask@4 b67cf4dis discard 1577183355434 :Theard ID:14 1577183355444 :Theard ID:15 1577183355454 :Theard ID:16 java.util.concurrent.FutureTask@7 ea987acis discard java.util.concurrent.FutureTask@12 a3a380is discard java.util.concurrent.FutureTask@29453f 44is discard ......
如上所示,在创建了两个线程之后,发现所需线程不够,便达到了最大线程数,此刻在进行访问的时候,任务太多,处理不过来,便选择了抛弃任务,从而出现了拒绝策略,不去执行这个任务。
自定义线程创建 线程池的目的是实现线程复用,但线程池并没有使用new语句去创建多个线程,它仅仅是进行了submit而已,那么这些线程是怎么来的呢?它们来自于ThreadFactory。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import java.util.concurrent.*;public class test implements Runnable { private static int i=0 ; public static void main (String[] args) throws InterruptedException { test t=new test(); ExecutorService es=new ThreadPoolExecutor(5 , 5 , 0L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { @Override public Thread newThread (Runnable r) { i++; Thread t=new Thread(r,"" +i); Thread t=new Thread(r); t.setDaemon(true ); System.out.println("create" +t); return t; } }); for (int i = 0 ; i <5 ; i++) { es.submit(t); } Thread.sleep(2000 ); } @Override public void run () { } } createThread[Thread-0 ,5 ,main] createThread[Thread-1 ,5 ,main] createThread[Thread-2 ,5 ,main] createThread[Thread-3 ,5 ,main] createThread[Thread-4 ,5 ,main]
如上,在创建了线程池之后,重写线程池的ThreadFactory方法,修改了线程工厂的构造方法,这使得我们在submit这个t对象后,使用构造方法构造了许多线程,可以看到这个线程所输出的信息。由于将所有的线程都设置为了守护线程,这使得我们的主线程退出后,会自动销毁所有线程,而不需要再加一个es.shutdown();去手动的结束线程。
扩展线程池 我们在使用线程的时候,对于线程的创建和销毁状态是不可见的,只能在执行run方法的时候,才能够从中察觉,但能不能在创建或者销毁的时候去监控它呢?当然是可以的啦!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class test { public static class mytask implements Runnable { private String name; public mytask (String name) { this .name = name; } @Override public void run () { System.out.println("正在执行" +" 线程ID:" +Thread.currentThread().getId()+",task name=" +name); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main (String[] args) throws InterruptedException { ExecutorService es=new ThreadPoolExecutor(5 ,5 ,0L , TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()){ @Override protected void beforeExecute (Thread t, Runnable r) { System.out.println("准备执行" +((mytask) r).name); } @Override protected void afterExecute (Runnable r, Throwable t) { System.out.println("执行完成" +((mytask) r).name); } @Override protected void terminated () { System.out.println("线程池退出" ); } }; for (int i = 0 ; i < 5 ; i++) { mytask t=new mytask("task_geym_" +i); es.execute(t); Thread.sleep(10 ); } es.shutdown(); } } 准备执行task_geym_0 正在执行 线程ID:12 ,task name=task_geym_0 准备执行task_geym_1 正在执行 线程ID:13 ,task name=task_geym_1 准备执行task_geym_2 正在执行 线程ID:14 ,task name=task_geym_2 准备执行task_geym_3 正在执行 线程ID:15 ,task name=task_geym_3 准备执行task_geym_4 正在执行 线程ID:16 ,task name=task_geym_4 执行完成task_geym_0 执行完成task_geym_1 执行完成task_geym_2 执行完成task_geym_3 执行完成task_geym_4 线程池退出
只要选择自行重写beforeExecute和afterExecute方法便可以实现啦。
线程池中的堆栈 在上面的时候,我们发现线程池使用execute去执行程序而不是submit,这是有很多原因的,其中一点就是,execute可以打印出异常堆栈,这代表的,如果线程的创建出了一个无法被编译器识别的错误的,可以将其错误的地方打印出来,这使得我们的程序更为优秀。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import java.util.concurrent.SynchronousQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class test implements Runnable { int a,b; public test (int a, int b) { this .a = a; this .b = b; } public static void main (String[] args) { ThreadPoolExecutor es=new ThreadPoolExecutor(0 ,Integer.MAX_VALUE,0L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); for (int i = 0 ; i <5 ; i++) { es.execute(new test(100 ,i)); } } @Override public void run () { double re=a/b; System.out.println(re); } } Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero at test.run(test.java:24 ) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149 ) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624 ) at java.lang.Thread.run(Thread.java:748 ) 100.0 25.0 33.0 50.0
比如这个,发生了除数为0,而导致线程的出错,但是如果你的方法是submit,则打印不出来异常堆栈。顺着提示的错误点,点进去,你就可以快速找到是哪里错了,但是这样仍然不能够找到提交的地点,那应该怎么做呢?
首先写一个新的类并继承ThreadPoolExecutor,并重写提交和执行这两个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import java.util.concurrent.BlockingQueue;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class 打印出异常堆栈的类 extends ThreadPoolExecutor { public 打印出异常堆栈的类(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public void execute (Runnable command) { super .execute(wrap(command,clientTrace(),Thread.currentThread().getName())); } @Override public Future<?> submit(Runnable task) { return super .submit(wrap(task,clientTrace(),Thread.currentThread().getName())); } private Exception clientTrace () { return new Exception("客户端堆栈追踪" ); } private Runnable wrap (final Runnable task,final Exception clientStack,String clientThreadName) { return new Runnable() { @Override public void run () { try { task.run(); } catch (Exception e) { clientStack.printStackTrace(); throw e; } } }; } }
而submit函数返回的是一个Runnable型的类,那么我们便可以
1 2 3 4 5 6 public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; }
写一个Runnable型函数去返回它,调用的时候还是使用task.run();继续调用,但区别在于传入了一个叫做Exception型的参数:clientStack.printStackTrace();使其找到提交的地点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import java.util.concurrent.SynchronousQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class test implements Runnable { int a,b; public test (int a, int b) { this .a = a; this .b = b; } public static void main (String[] args) { ThreadPoolExecutor es=new 打印出异常堆栈的类(0 ,Integer.MAX_VALUE,0L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); for (int i = 0 ; i <5 ; i++) { es.submit(new test(100 ,i)); } } @Override public void run () { double re=a/b; System.out.println(re); } } java.lang.Exception: client stack trace at 打印出异常堆栈的类.clientTrace(打印出异常堆栈的类.java:21 ) at 打印出异常堆栈的类.submit(打印出异常堆栈的类.java:18 ) at test.main(test.java:17 ) 100.0 25.0 33.0 50.0
瞬间定位提交地点。
Fork/Join框架 一般来说,线程都有各自的任务,但各个任务的难度都不相同,有的繁琐而艰巨,有的只是喝茶聊天,所以设计者为了平衡这样的状况,设计出了一个叫做Fork/Join的框架,它能够实现线程之间的互助,比如说t1线程早就执行完了自己的任务,他就会去帮助t2线程执行任务。而很多线程在执行任务的时候,都是由队列的顶部去抓取数据,而在Fork/Join框架线程互助的时候,t1会从t2的底部去抓取数据,从而避免了数据的竞争。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 import java.util.ArrayList;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.concurrent.RecursiveTask;public class test extends RecursiveTask <Long > { private static final int Threshold =10000 ; private long start; private long end; public test (long start, long end) { this .start = start; this .end = end; } public static void main (String[] args) { ForkJoinPool fjp=new ForkJoinPool(); test t=new test(0 ,200000L ); ForkJoinTask<Long> result=fjp.submit(t); try { long res=result.get(); System.out.println("sum=" +res); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Override protected Long compute () { long sum=0 ; boolean canCompute=(end-start)<Threshold; if (canCompute){ for (long i = start; i <end ; i++) { sum+=i; } }else { long step=(start+end)/100 ; ArrayList<test> subtests=new ArrayList<test>(); long pos=start; for (int i = 0 ; i <100 ; i++) { long lastone=pos+step; if (lastone>end){ lastone=end; } test subtest=new test(pos,lastone); pos+=step+1 ; subtests.add(subtest); subtest.fork(); } for (test t: subtests) { sum+=t.join(); } } return sum; } } sum=19989995149
这是一道计算1到200000求和的计算题,利用了算法当中的分治法,但这个分治法是使用多线程是完成的。首先设置了阈值为10000,若大于这个阈值,则进行将其往下分解100个小任务,每个任务将每一段都构造新的test()进行加入队列,其次在使用fork方法执行每个test类的compute方法,分解的值都加入subtests队列。最后每个subtest使用join方法导出值,并且加入sum。