Executor 框架在JDK5之后引进,其不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor
框架让并发编程变得更加简单。
执行任务需要实现Runnable
接口或Callable
接口。这两个接口的实现类可以被Executor
接口的实现类ThreadPoolExecutor
或ScheduledThreadPoolExecutor
执行。
public class ThreadPoolExecutor extends AbstractExecutorService
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
ThreadPoolExecutor
和 ScheduledThreadPoolExecutor
这两个关键类实现了 ExecutorService 接口。其中ThreadPoolExecutor
类是最常使用的线程池。
Future
接口以及 Future
接口的实现类 FutureTask
类都可以代表异步计算的结果。
Runnable
接口 不会返回结果或抛出检查异常,但是 Callable
接口 可以。因此在需要异步计算的场景需要Callable
对象,因此如果是Runnable对象需要先使用工具类Executors
可以实现将 Runnable
对象转换成 Callable
对象。
当我们把 Runnable
接口 或 Callable
接口 的实现类提交给 ThreadPoolExecutor
或 ScheduledThreadPoolExecutor
执行。(调用 submit()
方法时会返回一个 FutureTask
对象)
1、主线程首先要创建实现 Runnable
或者 Callable
接口的任务对象。
2、把创建完成的实现 Runnable
/Callable
接口的 对象直接交给 ExecutorService
执行: ExecutorService.execute(Runnable command)
)或者也可以把 Runnable
对象或Callable
对象提交给 ExecutorService
执行(ExecutorService.submit(Runnable task)
或 ExecutorService.submit(Callable <T> task)
)。
execute()方法和 submit()方法的区别:
execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
submit()
方法用于提交需要返回值的任务。线程池会返回一个 Future
类型的对象,通过这个 Future
对象可以判断任务是否执行成功,并且可以通过 Future
的 get()
方法来获取返回值,get()
方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
3、如果执行 ExecutorService.submit(…)
,ExecutorService
将返回一个实现Future
接口的对象(我们刚刚也提到过了执行 execute()
方法和 submit()
方法的区别,submit()
会返回一个 FutureTask 对象)。由于 FutureTask
实现了 Runnable
,我们也可以创建 FutureTask
,然后直接交给 ExecutorService
执行。
4、最后,主线程可以执行 FutureTask.get()
方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。
线程池构建有两种方式:通过线程池类ThreadPoolExecutor 的构造方法;通过工具类Executors来创建三种类型的 ThreadPoolExecutor:FixedThread
,SingleThreadExecutor
,CacheThreadExecutor
,这些线程池也是通过ThreadPoolExecutor构造方法构造出来的。
《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors 返回线程池对象的弊端如下:
ThreadPoolExecutor
类中提供的四个构造方法。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
其中前三种方法都是在第四种构造方法基础上产生的。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor
3 个最重要的参数:
corePoolSize
: 核心线程数线程数定义了最小可以同时运行的线程数量。maximumPoolSize
: 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。workQueue
: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。ArrayBlockingQueue:数组实现,创建时需要指定容量,可以视为有界阻塞队列,按FIFO排序量。
LinkedBlockingQueue:按FIFO排序任务,容量可以设置,不设置的话容量默认是 Integer.MAX_VALUE,可以认为是无界队列。
DelayQueue:根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序,无界阻塞队列
ThreadPoolExecutor
其他常见参数 :
keepAliveTime
:当线程池中的线程数量大于 corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime
才会被回收销毁;unit
: keepAliveTime
参数的时间单位。threadFactory
:executor 创建新线程的时候会用到。handler
:饱和策略。ThreadPoolExecutor
饱和策略定义:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor
定义一些策略:
ThreadPoolExecutor.AbortPolicy
:抛出 RejectedExecutionException
来拒绝新任务的处理。这也是默认的饱和策略ThreadPoolExecutor.CallerRunsPolicy
:调用执行自己的线程运行任务,也就是直接在调用execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果应用程序可以承受此延迟并且要求任何一个任务请求都要被执行的话,可以选择这个策略。ThreadPoolExecutor.DiscardPolicy
:不处理新任务,直接丢弃掉。ThreadPoolExecutor.DiscardOldestPolicy
: 此策略将丢弃最早的未处理的任务请求。executor()
方法源码
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int workerCountOf(int c) {
return c & CAPACITY;
}
//任务队列
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
//如果任务为空,抛出异常
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();
//如果当前线程池中的运行的任务数量小于corePoolSize,通过addWorker新建一个线程,
//并将任务添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2、如果当前执行的任务数量大于等于corePollSize,判断当前线程池是否还在运行,如果处于运行状态,
//并且队列还没满时,将任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 由于在上述任务加入队列的过程中,线程池可能会变成shutdown状态,再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除刚刚添加的任务。执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3、插入队列失败,通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}