深入理解线程池
使用线程池的好处
- 降低资源消耗
可以重复利用已创建的线程降低线程创建和销毁造成的消耗。 - 提高响应速度
当任务到达时,任务可以不需要等到线程创建就能立即执行。 - 提高线程的可管理性
线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
不使用线程池的坏处
- 频繁的线程创建和销毁会占用更多的CPU和内存。
- 频繁的线程创建和销毁会对GC产生比较大的压力。
- 线程太多,线程切换带来的开销将不可忽视。
- 线程太少,多核CPU得不到充分利用,是一种浪费。
线程池的工作原理
当一个新的任务提交到线程池之后:
- 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
- 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步。
- 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
ThreadPoolExecutor的处理流程
Executors
Executors是一个线程池工厂,提供了很多的工厂方法,我们来看看它大概能创建哪些线程池。
// 创建单一线程的线程池
public static ExecutorService newSingleThreadExecutor();
// 创建固定数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads);
// 创建带缓存的线程池
public static ExecutorService newCachedThreadPool();
// 创建定时调度的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 创建流式(fork-join)线程池
public static ExecutorService newWorkStealingPool();
创建单一线程的线程池
故名思意,这个线程池只有一个线程。若多个任务被提交到此线程池,那么会被缓存到队列(队列长度为Integer.MAX_VALUE)。当线程空闲的时候,按照FIFO的方式进行处理。
创建固定数量的线程池
和创建单一线程的线程池类似,只是这儿可以并行处理任务的线程数更多一些罢了。若多个任务被提交到此线程池,会有下面的处理过程。
如果线程的数量未达到指定数量,则创建线程来执行任务
如果线程池的数量达到了指定数量,并且有线程是空闲的,则取出空闲线程执行任务
如果没有线程是空闲的,则将任务缓存到队列(队列长度为Integer.MAX_VALUE)。当线程空闲的时候,按照FIFO的方式进行处理.
创建带缓存的线程池
这种方式创建的线程池,核心线程池的长度为0,线程池最大长度为Integer.MAX_VALUE。由于本身使用SynchronousQueue作为等待队列的缘故,导致往队列里面每插入一个元素,必须等待另一个线程从这个队列删除一个元素。
创建定时调度的线程池
和上面3个工厂方法返回的线程池类型有所不同,它返回的是ScheduledThreadPoolExecutor类型的线程池。平时我们实现定时调度功能的时候,可能更多的是使用第三方类库,比如:quartz等。但是对于更底层的功能,我们仍然需要了解
手动创建线程池
ThreadPoolExecutor
源码:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor构造方法有7个参数
- corePoolSize:线程池中的核心线程数。
- maximumPoolSize:线程池中的最大线程数。
- keepAliveTime:空闲时间,当线程池数量超过核心线程数时,多余的空闲线程存活的时间,即:这些线程多久被销毁。
- unit:空闲时间的单位,可以是毫秒、秒、分钟、小时和天,等等。
- workQueue:等待队列,线程池中的线程数超过核心线程数时,任务将放在等待队列,它是一个BlockingQueue类型的对象。
- threadFactory:线程工厂,我们可以使用它来创建一个线程。
- handler:拒绝策略,当线程池和等待队列都满了之后,需要通过该对象的回调函数进行回调处理。
为什么阿里Java规约禁止使用Java内置Executors创建线程池?
阿里巴巴Java规约中让我们手动创建线程池效果更好哦!
其实可以从ThreadPoolExecutor构造方法的7个参数出发。
规约中的原话:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
等待队列-workQueue
等待队列是BlockingQueue类型的,理论上只要是它的子类,都可以用来作为等待队列。
JDK中自带的一些阻塞队列
- ArrayBlockingQueue:队列是有界的,基于数组实现的阻塞队列。
- LinkedBlockingQueue:队列可以有界,也可以无界。基于链表实现的阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。该队列也是Executors.newCachedThreadPool()的默认队列。
- PriorityBlockingQueue:带优先级的无界阻塞队列。
通常情况下,我们需要指定阻塞队列的上界(比如1024)。另外,如果执行的任务很多,我们可能需要将任务进行分类,然后将不同分类的任务放到不同的线程池中执行。
线程工厂-threadFactory
ThreadFactory接口
ThreadFactory是一个接口,只有一个方法。
Executors的实现使用了默认的线程工厂-DefaultThreadFactory。它的实现主要用于创建一个线程,线程的名字为pool-{poolNum}-thread-{threadNum}。
源代码:
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
自定义线程名称就是实现ThreadFactory
/**
* 带有名称的线程工厂
* <p>为什么需要定义线程的名称?
* 因为,如果在线程很多的时候,定义线程的名称有助于我们调试和定位问题。
*/
class MyThreadFactory implements ThreadFactory {
/**
* 线程名称
*/
private final String threadName;
/**
* 构造器:传入线程名称,设置线程名称
*/
MyThreadFactory(String threadName) {
this.threadName = threadName;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadName);
t.setDaemon(true);
return t;
}
}
拒绝策略/线程池饱和策略-handler
什么是拒绝策略?
就是当线程池满了、队列也满了的时候,我们对任务采取的措施。或者丢弃、或者执行、或者其他…
JDK有哪些拒绝策略?
JDK自带4种拒绝策略
JDK自带4种拒绝策略,分别是:
1.CallerRunsPolicy:在调用者线程执行。
自实现CallerRunsPolicy
类似:
/**
* 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
*/
class MyCallerRunsPolicy implements RejectedExecutionHandler {
public MyCallerRunsPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
r.run();
}
}
}
2.AbortPolicy:直接抛出RejectedExecutionException异常。
自实现AbortPolicy
类似:
/**
* 线程池拒绝策略:AbortPolicy => ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。
* <p>
* 很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。
*/
class MyAbortPolicy implements RejectedExecutionHandler {
public MyAbortPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
3.DiscardPolicy:任务直接丢弃,不做任何处理。
/**
* 线程池拒绝策略:DiscardPolicy => 啥都不干,对于线程池的任务不抛弃也不会执行。
*/
class MyDiscardPolicy implements RejectedExecutionHandler {
public MyDiscardPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
}
4.DiscardOldestPolicy:丢弃队列里最旧的那个任务,再尝试执行当前任务。
/**
* 线程池拒绝策略:DiscardOldestPolicy => 抛弃线程池中老的任务,再把新的任务加进去
*/
class MyDiscardOldestPolicy implements RejectedExecutionHandler {
public MyDiscardOldestPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
executor.getQueue().poll();
executor.execute(r);
}
}
}
如何使用?
// 线程池拒绝策略:DiscardPolicy => 直接丢弃
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 自实现DiscardPolicy
// executor.setRejectedExecutionHandler(new MyDiscardPolicy());
// 线程池拒绝策略:AbortPolicy => 直接抛异常
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 自实现MyAbortPolicy
// executor.setRejectedExecutionHandler(new MyAbortPolicy());
// 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 自实现MyCallerRunsPolicy
// executor.setRejectedExecutionHandler(new MyCallerRunsPolicy());
// 线程池拒绝策略:DiscardOldestPolicy => 对于线程池中的任务不抛弃也不拒绝,啥也不干
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// 自实现MyDiscardOldestPolicy
// executor.setRejectedExecutionHandler(new MyDiscardOldestPolicy());
// 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。
executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
提交任务的两种方式
提及任务的方式有两种,分别是:submit和execute
这两个方法的区别:
- submit:submit()用于提交一个需要返回果的任务。该方法返回一个Future对象,通过调用这个对象的get()方法,我们就能获得返回结果。get()方法会一直阻塞,直到返回结果返回。另外,我们也可以使用它的重载方法get(long timeout, TimeUnit unit),这个方法也会阻塞,但是在超时时间内仍然没有返回结果时,将抛出异常TimeoutException。
submit(Runnable task)源代码:public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
- execute:execute()用于提交不需要返回结果的任务。
execute源代码:public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
关闭线程池的两种方式
可以调用线程池对象的shutdown()和shutdownNow()方法来关闭线程池。
这两个方法的区别:
- shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
shutdown
源代码:public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 确保安全关闭 checkShutdownAccess(); // 将线程池状态置为SHUTDOWN advanceRunState(SHUTDOWN); // 不再接受新任务 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
- shutdownNow()会将线程池状态置为STOP,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。
shutdownNow
源代码:public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 确保安全关闭 checkShutdownAccess(); // 将线程池状态置为STOP advanceRunState(STOP); // 打断所有线程 interruptWorkers(); // 清空队列 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); // 并将队列中的任务返回回来 return tasks; }
另外,关闭线程池涉及到两个返回boolean的方法,isShutdown()和isTerminated,分别表示是否关闭和是否终止。
如何正确配置线程池的参数?
- 任务的性质:CPU密集型、IO密集型和混杂型。
- 任务的优先级:高中低。
- 任务执行的时间:长中短。
- 任务的依赖性:是否依赖数据库或者其他系统资源。
通常来说,如果任务属于CPU密集型,那么我们可以将线程池数量设置成CPU的个数,以减少线程切换带来的开销。如果任务属于IO密集型,我们可以将线程池数量设置得更多一些,比如CPU个数*2。
可以通过Runtime.getRuntime().availableProcessors()来获取CPU的个数。
线程池监控
如果系统中大量用到了线程池,那么我们是不是有必要对线程池进行监控。
这样子有助于我们定位出现的问题。
ThreadPoolExecutor自带了一些方法:
- long getTaskCount():获取已经执行或正在执行的任务数。
- long getCompletedTaskCount():获取已经执行的任务数。
- int getLargestPoolSize():获取线程池曾经创建过的最大线程数,根据这个参数,我们可以知道线程池是否满过。
- int getPoolSize():获取线程池线程数。
- int getActiveCount():获取活跃线程数(正在执行任务的线程数)。
其它:
- protected void beforeExecute(Thread t, Runnable r):任务执行之前调用。
- protected void afterExecute(Runnable r, Throwable t):任务执行之后调用。
- protected void terminated():线程池结束之后调用。
遇到的一个问题
package com.lzhpo.threadpool.demo3;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 遇到的一个问题
*
* @author lzhpo
*/
public class AProblem {
static class DivTask implements Runnable {
int a,b;
public DivTask(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public void run() {
double result = a / b;
System.out.println(result);
}
}
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executor.submit(new DivTask(100, i));
}
}
}
运行结果:
100.0
25.0
33.0
50.0
疑问:
- 我明明第一次的时候除数为0,为什么不报错?
- 按理论来说,应该是有5次输出的,为什么只有三次?
解决办法:
对submit的返回值进行处理。
因为submit是一个非阻塞的方法,就是不管你发生什么错误,我都会执行下去。
所以:
- 尽量使用手动的方式创建线程池,避免使用Executors工厂类。
- 根据场景,合理设置线程池的各个参数,包括线程池数量、队列、线程工厂和拒绝策略。
- 在调线程池submit()方法的时候,一定要尽量避免任务执行异常被吞掉的问题。
示例
HandCreateThreadPoolDemo1
:
package com.lzhpo.threadpool.demo3;
import java.util.Random;
import java.util.concurrent.*;
/**
* 手动创建线程池
*
* @author lzhpo
*/
public class HandCreateThreadPoolDemo1 {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1,
1,
1,
TimeUnit.SECONDS,
// 线程池缓冲队列
new LinkedBlockingDeque<>(10),
// 自定义ThreadFactory线程工厂
new MyThreadFactory("HandCreateThreadPoolDemo1")) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("I'm beforeExecute.");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("I'm afterExecute.");
}
@Override
protected void terminated() {
System.out.println("I'm terminated.");
}
};
/**
* 线程池拒绝策略
*/
// 线程池拒绝策略:DiscardPolicy => 直接丢弃
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 自实现DiscardPolicy
// executor.setRejectedExecutionHandler(new MyDiscardPolicy());
// 线程池拒绝策略:AbortPolicy => 直接抛异常
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 自实现MyAbortPolicy
// executor.setRejectedExecutionHandler(new MyAbortPolicy());
// 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 自实现MyCallerRunsPolicy
// executor.setRejectedExecutionHandler(new MyCallerRunsPolicy());
// 线程池拒绝策略:DiscardOldestPolicy => 对于线程池中的任务不抛弃也不拒绝,啥也不干
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// 自实现MyDiscardOldestPolicy
// executor.setRejectedExecutionHandler(new MyDiscardOldestPolicy());
// 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。
executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
/**
* 提交任务
*/
// 方法1:submit,非阻塞方法,有返回结果,也就是Future对象。
executor.submit(() -> {
System.out.println("This is a task.");
System.out.println(Thread.currentThread().getName());
;
});
// 方法2:execute。没有返回结果。
// executor.execute(() -> {
// System.out.println("This is a task.");
// });
/**
* 关闭线程池
*/
// 方法1:shutdown。shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
executor.shutdown();
// 方法2:立马结束,并且清空任务队列
// executor.shutdownNow();
}
}
//--------------------自定义线程名称------------------------
/**
* 带有名称的线程工厂
* <p>为什么需要定义线程的名称?
* 因为,如果在线程很多的时候,定义线程的名称有助于我们调试和定位问题。
*/
class MyThreadFactory implements ThreadFactory {
/**
* 线程名称
*/
private final String threadName;
/**
* 构造器:传入线程名称,设置线程名称
*/
MyThreadFactory(String threadName) {
this.threadName = threadName;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadName);
t.setDaemon(true);
return t;
}
}
//--------------------线程池拒绝策略------------------------
/**
* 线程池拒绝策略:AbortPolicy => ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。
* <p>
* 很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。
*/
class MyAbortPolicy implements RejectedExecutionHandler {
public MyAbortPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
*/
class MyCallerRunsPolicy implements RejectedExecutionHandler {
public MyCallerRunsPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
r.run();
}
}
}
/**
* 线程池拒绝策略:DiscardPolicy => 啥都不干,对于线程池的任务不抛弃也不会执行。
*/
class MyDiscardPolicy implements RejectedExecutionHandler {
public MyDiscardPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
}
/**
* 线程池拒绝策略:DiscardOldestPolicy => 抛弃线程池中老的任务,再把新的任务加进去
*/
class MyDiscardOldestPolicy implements RejectedExecutionHandler {
public MyDiscardOldestPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
executor.getQueue().poll();
executor.execute(r);
}
}
}
/**
* 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。
*/
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
new Thread(r, "新线程" + new Random().nextInt(10)).start();
}
}
- 本文标签: Java
- 本文链接: http://www.lzhpo.com/article/128
- 版权声明: 本文由lzhpo原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权