13 KiB
目录
- 前言
- 线程
- 线程的创建
- 线程生命周期
- Callable 与 Future
- 线程池
- 度量性能
- 线程数量分配
- ThreadPoolExecutor
- 参考
前言
线程、线程池这一块,如果牵扯到并发安全,那估计就不是一篇文章能写完的了,所以本篇文章侧重复习一下基础知识,同时学习了一下 Callable 和 Future,这也是为了阅读 AsyncTask 源码打下基础。又正赶上极客时间开了《并发编程实战》一栏,补充了线程生命周期、多线程线程分配数量策略等相关知识。
线程
线程的创建
我们知道,线程的创建有两种方式:一种是继承 Thread 类,另一种是实现 Runnable 接口。但是这两种方式有一个共同的缺陷,那就是在执行任务后无法获取执行结果(两者 run 方法的返回类型都是 void )。于是在 JDK 5 就引入了 Callable 和 Future。
线程生命周期
Callable 与 Future
Callable
public interface Callable<V> {
V call() throws Exception;
}
Callable 是一个泛型接口,你可以完全把它理解为一个有返回类型并且可抛异常的 Runnable。
那么是如何获取 Callable 的返回结果呢?通常是通过 FutureTask 这个中间媒介来实现的。
FutureTask
FutureTask 实现了 Runnable 和 Future 接口,所以我们可以直接用 Thread 来执行它,也可以使用 ExecuteService 来执行。这里就以 Thread 包装来执行:
public class ThreadDemo {
public static void main(String[] args) throws Exception {
Callable<Integer> call = () -> {
System.out.println("线程执行...");
Thread.sleep(2000);
return 1;
};
FutureTask<Integer> task = new FutureTask<>(call);
new Thread(task).start();
System.out.println(task.get());
}
}
Future
Future 的核心思想是:一个方法,计算过程可能非常耗时,等待方法返回显然不可取。可以在执行方法的时候,立马就返回一个 Future 对象,通过这个 Future 对象去控制方法的计算过程。
public interface Future<V> {
/**
* 取消任务
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判断是否取消
*/
boolean isCancelled();
/**
* 判断是否已完成
*/
boolean isDone();
/**
* 获取任务结果,如果未完成,则等待
*/
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
线程池
首先,为什么要使用多线程呢?
你肯定会想到,线程池是为了线程的复用,避免重复创建和销毁线程所带来的性能损耗。这种说法并不严谨,在提升性能之前,首先要问题是:如何度量性能?
度量性能
度量性能的指标很多,但是有两个指标是最核心的,它们就是延迟和吞吐量。延迟指的是发出请求到收到响应这个过程的时间;延迟越短,意味着程序执行的越快,性能也就越好。吞吐量指的是在单位时间内能处理请求的数量;吞吐量越大,意味着程序能处理的请求越多,性能也就越好。
要想 "降低延迟,提供吞吐量",有两个方向,一个方向是优化算法,另一个方向是将硬件的性能发挥到极致。前者属于算法范畴,后者则是和并发息息相关了。在并发编程领域,提升性能本质上就是提升硬件的利用率,再具体点来说,就是提升 I/O 的利用率和 CPU 的利用率。
估计这个时候你会有疑问,操作系统不是已经解决了硬件的利用率问题了嘛?的确是这样,例如操作系统已经解决了磁盘和网卡的利用率问题,利用中断机制还能避免 CPU 轮询 I/O 状态,也提升了 CPU 的利用率。但是操作系统解决硬件利用率问题的对象往往是单一的硬件设备,而我们的并发程序,往往需要 CPU 和 I/O 设备相互配合工作。也就是说,我们需要解决 CPU 和 I/O 设备综合利用率的问题。关于这个综合利用率的问题,操作系统虽然没有办法完美解决,但是却给我们提供了方案,那就是:多线程。
线程数量分配
那么创建多少线程才合适呢?
如果 CPU 和 I/O 设备的利用率都很低,那么可以尝试通过增加线程来提高吞吐量。
在单核时代,多线程主要就是用来平衡 CPU 和 I/O 设备的。如果程序只有 CPU 计算,而没有 I/O 操作的话,多线程不但不会提升性能,还会使性能变得更差,原因是增加了线程切换的成本。但是在多核时代,纯计算型的程序也可以利用多线程来提升性能。这是为什么呢?因为利用多核可以降低响应时间。
比如要计算 1~100亿 的值,如果在四核的 CPU 上利用四个线程执行,线程 A 计算 [1,25亿),线程 B 计算 [25亿,50亿),线程 C 计算[50亿,75亿),线程 D 计算[75亿,100亿],之后在汇总,那么理论上应该比一个线程计算快四倍。一个线程,对于四核的 CPU,CPU 利用率只有 25%,而四个线程,则能够将 CPU 的利用率提高到 100%。
对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个四核的 CPU,每个核一个线程,理论上创建四个线程就可以了,再多创建线程只是会增加线程切换的成本。所以,**对于 CPU 密集型计算场景,理论上 "线程的数量 = CPU 核数" 就是最合适的。不过在工程上,线程的数量一般会设置为 " CPU 核数 +1 "。**这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
对于 I/O 密集型的计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,可以总结为:
线程数 = 1 + ( I/O 耗时 / CPU 耗时 )
不过上面这个公式只针对单核 CPU 的,至于多核 CPU,只需要等比扩大即可:
线程数 = CPU 核数 * [ 1 + ( I/O 耗时 / CPU 耗时 )]
ThreadPoolExecutor
当你看到这里,我还是推荐你看:Java并发编程:线程池的使用 这篇文章,这篇文章是我见过讲的最好的最仔细的!!!
ThreadPoolExecutor 类是线程池中最核心的一个类,它提供了四个构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue){};
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory){};
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler){};
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){};
前面三个构造方法都是通过调用第四个构造方法来进行初始化工作的,下面解释一下各个参数的含义:
-
corePoolSize
核心线程的大小。在创建线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法,从这两个方法名就可以看出,是预创建线程的意思。即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为零,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中。
-
maximumPoolSize
线程池中的最大线程数,当任务数量超过最大线程数时,其他任务可能就会被阻塞。最大线程数 = 核心线程 + 非核心线程。非核心线程只有当核心线程不够用且线程池有空余时才会被创建,执行完任务后非核心线程会被销毁。
-
keepAliveTime
非核心线程的超时时长,当执行时间超过这个时间时,非核心线程就会被回收。当 allowCoreThreadTimeOut 设置为 true 时,此属性也作用在核心线程上。
-
unit
参数 keepAliveTime 的时间单位,为 TimeUnit 类中的枚举值。
-
workQueue
一个阻塞队列,用来存储等待执行的任务。一般来说,阻塞队列使用 LinkedBlockingQueue 和 SynchronousQueue 较多。
-
threadFactory
线程工厂,主要用来创建线程。
-
handler
表示当拒绝处理任务时的策略,有以下四种取值:
-
ThreadPoolExecutor.AbortPolicy
丢弃任务并抛出 RejectedExecutionException 异常。
-
ThreadPoolExecutor.DiscardPolicy
丢弃任务,但不抛异常。
-
ThreadPoolExecutor.DiscardOldestPolicy
丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
-
ThreadPoolExecutor.CallerRunsPolicy
由调度线程处理该任务。
-
类继承关系如下:
这里我就拿参考文章中的示例跑了一遍:
public class ThreadDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,
2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
for (int i = 0; i < 15; i++) {
MyTask task = new MyTask(i);
executor.execute(task);
System.out.println("线程池中线程数目: " + executor.getPoolSize() + " 队列中等待执行的任务数: "
+ executor.getQueue().size() + " 已执行完的任务数: " + executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
public class MyTask implements Runnable {
private int taskName;
public MyTask(int taskName) {
this.taskName = taskName;
}
@Override
public void run() {
System.out.println("正在执行 Task: " + taskName);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task: " + taskName + " 执行完毕");
}
}
当线程池中的线程数大于五时,便会将后续任务添加到任务等待队列中,当任务等待队列也满了之后,便会创建新的线程。如果将上面 for 循环改为 20,则会抛 RejectedExecutionException 任务拒绝异常。
不过在 Java 中,并不会推荐直接使用 ThreadPoolExcutor,而是使用 Executors 提供的几种静态方法:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}