0%

自定义线程池

最近看到牛客网一个帖子”面试官让我手撕一个线程池,二十分钟没手撕出来面试流程就终止了”。忽然意识到自己也手撕不出来线程池,遂加急补习了一下JUC和线程池相关的知识。下面做个全面的总结:

Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,ExecutorService,ThreadPoolExecutor这几个类。

Executor接口是顶层接口,只有一个execute方法,过于简单。通常不使用它,而是使用ExecutorService接口:

那么问题来了,怎么创建一个线程池对象呢?通常使用Executors工具类

面试题:execute和submit的区别

  1. execute是Executor接口的方法,而submit是ExecutorService的方法,并且ExecutorService接口继承了Executor接口。
  2. execute只接受Runnable参数,没有返回值;而submit可以接受Runnable参数和Callable参数,并且返回了Future对象,可以进行任务取消、获取任务结果、判断任务是否执行完毕/取消等操作。
  3. 通过execute方法提交的任务无法获取具体的异常信息;而submit方法可以通过Future对象获取异常信息。

Executors工具类

直接编码演示:每种线程池的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.atguigu.demojuc.chap08;

public class ThreadPoolDemo {

public static void main(String[] args) {


//1、基本使用
ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单一线程池
//ExecutorService threadPool = Executors.newFixedThreadPool(3); //固定大小线程池

//可以在线程中设置一下睡眠时间控制承受并发的能力(调整睡眠时间长短看效果)
//ExecutorService threadPool = Executors.newCachedThreadPool(); // 无限大小线程池(遇强则强)

try {
for (int i = 0; i < 1000; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + "开始执行业务逻辑");
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "结束");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}

定时任务:延迟执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.atguigu.jucdemo.chap08;

public class ScheduledThreadPoolDemo {

public static void main(String[] args) {

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
System.out.println(new Date());

try {

for (int i = 0; i < 10; i++) {
//延迟执行
scheduledThreadPool.schedule(()->{
System.out.println(Thread.currentThread().getName() + " 定时任务被执行" + new Date());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 5, TimeUnit.SECONDS);
}

} finally {
scheduledThreadPool.shutdown();
}
}
}

定时任务:延迟并间隔执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.atguigu.jucdemo.chap08;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledAtFixRateDemo {

public static void main(String[] args) {

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
System.out.println(new Date());

//try {

for (int i = 0; i < 10; i++) {
//延迟执行
scheduledThreadPool.scheduleAtFixedRate(()->{
System.out.println(Thread.currentThread().getName() + " 定时任务被执行" + new Date());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 5, 1, TimeUnit.SECONDS);
}

/*} finally {
scheduledThreadPool.shutdown(); //不要销毁这个对象,因为任务会一直执行
}*/
}
}

底层原理

线程池的7个重要参数

  1. corePoolSize:线程池中的常驻核心线程数
  2. maximumPoolSize:线程池中能够容纳同时 执行的最大线程数,此值必须大于等于1
  3. keepAliveTime:多余的空闲线程的存活时间 当前池中线程数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余线程会被销毁直到 只剩下corePoolSize个线程为止
  4. unit:keepAliveTime的单位
  5. workQueue:任务队列,被提交但尚未被执行的任务
  6. threadFactory:表示生成线程池中工作线程的线程工厂, 用于创建线程,一般默认的即可
  7. handler:拒绝策略,表示当队列满了,并且工作线程等于线程池的最大线程数(maximumPoolSize)时,如何来拒绝 请求执行的runnable的策略

线程池底层工作原理

具体流程:

  1. 在创建了线程池后,线程池中的线程数为零

  2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断:

    1. 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
    2. 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列
    3. 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
    4. 如果队列满了且正在运行的线程数量等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。

  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:

    • 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
    • 线程池的所有任务完成后,它最终会收缩到corePoolSize的大小

拒绝策略

一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但这种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况

ThreadPoolExecutor自带的拒绝策略如下:

  1. AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
  2. CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
  3. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中,尝试再次提交当前任务。
  4. DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。 如果允许任务丢失,这是最好的一种策略。

以上内置的策略均实现了RejectedExecutionHandler接口,也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略

自定义线程池

​ 在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

自定义线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.atguigu.demojuc.chap08;

class MyRunnable implements Runnable{

private int param;

public MyRunnable(int param) {
this.param = param;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Runnable......" + param);
}
}

public class CustomizeThreadPoolDemo {

public static void main(String[] args) {

// 自定义连接池
ExecutorService threadPool = new ThreadPoolExecutor(2, 5,
2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() //丢弃任务并抛出异常
//new ThreadPoolExecutor.CallerRunsPolicy() //由调用线程处理该任务,谁调用谁处理
//new ThreadPoolExecutor.DiscardOldestPolicy() //丢弃队列中等待最久的任务,添加新任务
//new ThreadPoolExecutor.DiscardPolicy() //也是丢弃任务,但是不抛出异常。
/*new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定义拒绝策略");
}
}*/
);

try {
for (int i = 0; i < 9; i++) {
threadPool.execute(() -> {
//threadPool.submit(() -> { //这里也可以使用submit
System.out.println(Thread.currentThread().getName() + "执行了业务逻辑");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}

}
}
-------------本文结束感谢您的阅读-------------

欢迎关注我的其它发布渠道