java 手编线程池_死磕 java线程系列之自己动手写一个线程池

news/2025/3/15 22:48:13/

欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

96755d2a278252bf49e48eaa283efe98.png

(手机横屏看源码更方便)

问题

(1)自己动手写一个线程池需要考虑哪些因素?

(2)自己动手写的线程池如何测试?

简介

线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文彤哥将手把手带你写一个可用的线程池。

属性分析

线程池,顾名思义它首先是一个“池”,这个池里面放的是线程,线程是用来执行任务的。

首先,线程池中的线程应该是有类别的,有的是核心线程,有的是非核心线程,所以我们需要两个变量标识核心线程数量coreSize和最大线程数量maxSize。

为什么要区分是否为核心线程呢?这是为了控制系统中线程的数量。

当线程池中线程数未达到核心线程数coreSize时,来一个任务加一个线程是可以的,也可以提高任务执行的效率。

当线程池中线程数达到核心线程数后,得控制一下线程的数量,来任务了先进队列,如果任务执行足够快,这些核心线程很快就能把队列中的任务执行完毕,完全没有新增线程的必要。

当队列中任务也满了,这时候光靠核心线程就无法及时处理任务了,所以这时候就需要增加新的线程了,但是线程也不能无限制地增加,所以需要控制其最大线程数量maxSize。

其次,我们需要一个任务队列来存放任务,这个队列必须是线程安全的,我们一般使用BlockingQueue阻塞队列来充当,当然使用ConcurrentLinkedQueue也是可以的(注意ConcurrentLinkedQueue不是阻塞队列,不能运用在jdk的线程池中)。

最后,当任务越来越多而线程处理却不及时,迟早会达到一种状态,队列满了,线程数也达到最大线程数了,这时候怎么办呢?这时候就需要走拒绝策略了,也就是这些无法及时处理的任务怎么办的一种策略,常用的策略有丢弃当前任务、丢弃最老的任务、调用者自己处理、抛出异常等。

根据上面的描述,我们定义一个线程池一共需要这么四个变量:核心线程数coreSize、最大线程数maxSize、阻塞队列BlockingQueue、拒绝策略RejectPolicy。

另外,为了便于给线程池一个名称,我们再加一个变量:线程池的名称name。

所以我们得出了线程池的属性及构造方法大概如下:

public class MyThreadPoolExecutor implements Executor {

/**

* 线程池的名称

*/

private String name;

/**

* 核心线程数

*/

private int coreSize;

/**

* 最大线程数

*/

private int maxSize;

/**

* 任务队列

*/

private BlockingQueue taskQueue;

/**

* 拒绝策略

*/

private RejectPolicy rejectPolicy;

public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue taskQueue, RejectPolicy rejectPolicy) {

this.name = name;

this.coreSize = coreSize;

this.maxSize = maxSize;

this.taskQueue = taskQueue;

this.rejectPolicy = rejectPolicy;

}

}

任务流向分析

根据上面的属性分析,基本上我们已经得到了任务流向的完整逻辑:

首先,如果运行的线程数小于核心线程数,直接创建一个新的核心线程来运行新的任务。

其次,如果运行的线程数达到了核心线程数,则把新任务入队列。

然后,如果队列也满了,则创建新的非核心线程来运行新的任务。

最后,如果非核心线程数也达到最大了,那就执行拒绝策略。

3d47d5645ca18240afb0fae73a3d5e61.png

代码逻辑大致如下:

@Override

public void execute(Runnable task) {

// 正在运行的线程数

int count = runningCount.get();

// 如果正在运行的线程数小于核心线程数,直接加一个线程

if (count < coreSize) {

// 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小

if (addWorker(task, true)) {

return;

}

// 如果添加核心线程失败,进入下面的逻辑

}

// 如果达到了核心线程数,先尝试让任务入队

// 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false

if (taskQueue.offer(task)) {

// do nothing,为了逻辑清晰这里留个空if

// 【本篇文章由公众号“彤哥读源码”原创】

} else {

// 如果入队失败,说明队列满了,那就添加一个非核心线程

if (!addWorker(task, false)) {

// 如果添加非核心线程失败了,那就执行拒绝策略

rejectPolicy.reject(task, this);

}

}

}

创建线程逻辑分析

首先,创建线程的依据是正在运行的线程数量有没有达到核心线程数或者最大线程数,所以我们还需要一个变量runningCount用来记录正在运行的线程数。

其次,这个变量runningCount需要在并发环境下加加减减,所以这里需要使用到Unsafe的CAS指令来控制其值的修改,用了CAS就要给这个变量加上volatile修饰,为了方便我们这里直接使用AtomicInteger来作为这个变量的类型。

然后,因为是并发环境中,所以需要判断runningCount < coreSize(或maxSize)(条件一)的同时修改runningCount CAS加一(条件二)成功了才表示可以增加一个线程,如果条件一失败则表示不能再增加线程了直接返回false,如果条件二失败则表示其它线程先修改了runningCount的值,则重试。

最后,创建一个线程并运行新任务,且不断从队列中拿任务来运行【本篇文章由公众号“彤哥读源码”原创】。

bd000a76097f58db6f3db888cadfe744.png

代码逻辑如下:

private boolean addWorker(Runnable newTask, boolean core) {

// 自旋判断是不是真的可以创建一个线程

for (; ; ) {

// 正在运行的线程数

int count = runningCount.get();

// 核心线程还是非核心线程

int max = core ? coreSize : maxSize;

// 不满足创建线程的条件,直接返回false

if (count >= max) {

return false;

}

// 修改runningCount成功,可以创建线程

if (runningCount.compareAndSet(count, count + 1)) {

// 线程的名字

String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();

// 创建线程并启动

new Thread(() -> {

System.out.println("thread name: " + Thread.currentThread().getName());

// 运行的任务

Runnable task = newTask;

// 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了

while (task != null || (task = getTask()) != null) {

try {

// 执行任务

task.run();

} finally {

// 任务执行完成,置为空

task = null;

}

}

}, threadName).start();

break;

}

}

return true;

}

取任务逻辑分析

从队列中取任务应该使用take()方法,这个方法会一直阻塞直至取到任务或者中断,如果中断了就返回null,这样当前线程也就可以安静地结束了,另外还要注意中断了记得把runningCount减一。

private Runnable getTask() {

try {

// take()方法会一直阻塞直到取到任务为止

return taskQueue.take();

} catch (InterruptedException e) {

// 线程中断了,返回null可以结束当前线程

// 当前线程都要结束了,理应要把runningCount的数量减一

runningCount.decrementAndGet();

return null;

}

}

好了,到这里我们自己的线程池就写完了,下面我们一起来想想怎么测试呢?

测试逻辑分析

我们再来回顾下自己的写的线程池的构造方法:

public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue taskQueue, RejectPolicy rejectPolicy) {

this.name = name;

this.coreSize = coreSize;

this.maxSize = maxSize;

this.taskQueue = taskQueue;

this.rejectPolicy = rejectPolicy;

}

name,这个随便传;

coreSize,我们假设为5;

maxSize,我们假设为10;

taskQueue,任务队列,既然我们设置的是有边界的,我们就用最简单的ArrayBlockingQueue好吧,容量设置为15,这样里面最多可以存储15条任务;

rejectPolicy,拒绝策略,我们假设使用丢弃当前任务的策略,OK,我们来实现一个。

/**

* 丢弃当前任务

*/

public class DiscardRejectPolicy implements RejectPolicy {

@Override

public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {

// do nothing

System.out.println("discard one task");

}

}

OK,这样一个线程池就创建完成了,下面就是执行任务了,我们假设通过for循环连续不断地添加100个任务好不好。

public class MyThreadPoolExecutorTest {

public static void main(String[] args) {

Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());

AtomicInteger num = new AtomicInteger(0);

for (int i = 0; i < 100; i++) {

threadPool.execute(()->{

try {

Thread.sleep(1000);

System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

}

}

我们分析下这段程序:

(1)先连续创建了5个核心线程,并执行了新任务;

(2)后面的15个任务进了队列;

(3)队列满了,又连续创建了5个线程,并执行了新任务;

(4)后面的任务就没得执行了,全部走了丢弃策略;

(5)所以真正执行成功的任务应该是 5 + 15 + 5 = 25 条任务;

运行之:

thread name: core_test2

thread name: core_test5

thread name: core_test3

thread name: core_test4

thread name: core_test1

thread name: test6

thread name: test7

thread name: test8

thread name: test9

discard one task

thread name: test10

discard one task

...省略被拒绝的任务

【本篇文章由公众号“彤哥读源码”原创】

discard one task

running: 1570546871851: 2

running: 1570546871851: 8

running: 1570546871851: 7

running: 1570546871851: 6

running: 1570546871851: 5

running: 1570546871851: 3

running: 1570546871851: 4

running: 1570546871851: 1

running: 1570546871851: 10

running: 1570546871851: 9

running: 1570546872852: 14

running: 1570546872852: 20

running: 1570546872852: 19

running: 1570546872852: 17

running: 1570546872852: 18

running: 1570546872852: 16

running: 1570546872852: 15

running: 1570546872852: 12

running: 1570546872852: 13

running: 1570546872852: 11

running: 1570546873852: 21

running: 1570546873852: 24

running: 1570546873852: 23

running: 1570546873852: 25

running: 1570546873852: 22

可以看到,创建了5个核心线程、5个非核心线程,成功执行了25条任务,完成没问题,完美^^。

总结

(1)自己动手写一个线程池需要考虑的因素主要有:核心线程数、最大线程数、任务队列、拒绝策略。

(2)创建线程的时候要时刻警惕并发的陷阱;

彩蛋

我们知道,jdk自带的线程池还有两个参数:keepAliveTime、unit,它们是干什么的呢?

答:它们是用来控制何时销毁非核心线程的,当然也可以销毁核心线程,具体的分析请期待下一章吧。

完整源码

Executor接口

public interface Executor {

void execute(Runnable command);

}

MyThreadPoolExecutor线程池实现类

/**

* 自动动手写一个线程池

*/

public class MyThreadPoolExecutor implements Executor {

/**

* 线程池的名称

*/

private String name;

/**

* 线程序列号

*/

private AtomicInteger sequence = new AtomicInteger(0);

/**

* 核心线程数

*/

private int coreSize;

/**

* 最大线程数

*/

private int maxSize;

/**

* 任务队列

*/

private BlockingQueue taskQueue;

/**

* 拒绝策略

*/

private RejectPolicy rejectPolicy;

/**

* 当前正在运行的线程数【本篇文章由公众号“彤哥读源码”原创】

* 需要修改时线程间立即感知,所以使用AtomicInteger

* 或者也可以使用volatile并结合Unsafe做CAS操作(参考Unsafe篇章讲解)

*/

private AtomicInteger runningCount = new AtomicInteger(0);

public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue taskQueue, RejectPolicy rejectPolicy) {

this.name = name;

this.coreSize = coreSize;

this.maxSize = maxSize;

this.taskQueue = taskQueue;

this.rejectPolicy = rejectPolicy;

}

@Override

public void execute(Runnable task) {

// 正在运行的线程数

int count = runningCount.get();

// 如果正在运行的线程数小于核心线程数,直接加一个线程

if (count < coreSize) {

// 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小

if (addWorker(task, true)) {

return;

}

// 如果添加核心线程失败,进入下面的逻辑

}

// 如果达到了核心线程数,先尝试让任务入队

// 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false

if (taskQueue.offer(task)) {

// do nothing,为了逻辑清晰这里留个空if

} else {

// 如果入队失败,说明队列满了,那就添加一个非核心线程

if (!addWorker(task, false)) {

// 如果添加非核心线程失败了,那就执行拒绝策略

rejectPolicy.reject(task, this);

}

}

}

private boolean addWorker(Runnable newTask, boolean core) {

// 自旋判断是不是真的可以创建一个线程

for (; ; ) {

// 正在运行的线程数

int count = runningCount.get();

// 核心线程还是非核心线程

int max = core ? coreSize : maxSize;

// 不满足创建线程的条件,直接返回false

if (count >= max) {

return false;

}

// 修改runningCount成功,可以创建线程

if (runningCount.compareAndSet(count, count + 1)) {

// 线程的名字

String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();

// 创建线程并启动

new Thread(() -> {

System.out.println("thread name: " + Thread.currentThread().getName());

// 运行的任务【本篇文章由公众号“彤哥读源码”原创】

Runnable task = newTask;

// 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了

while (task != null || (task = getTask()) != null) {

try {

// 执行任务

task.run();

} finally {

// 任务执行完成,置为空

task = null;

}

}

}, threadName).start();

break;

}

}

return true;

}

private Runnable getTask() {

try {

// take()方法会一直阻塞直到取到任务为止

return taskQueue.take();

} catch (InterruptedException e) {

// 线程中断了,返回null可以结束当前线程

// 当前线程都要结束了,理应要把runningCount的数量减一

runningCount.decrementAndGet();

return null;

}

}

}

RejectPolicy拒绝策略接口

public interface RejectPolicy {

void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor);

}

DiscardRejectPolicy丢弃策略实现类

/**

* 丢弃当前任务

*/

public class DiscardRejectPolicy implements RejectPolicy {

@Override

public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {

// do nothing

System.out.println("discard one task");

}

}

测试类

public class MyThreadPoolExecutorTest {

public static void main(String[] args) {

Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());

AtomicInteger num = new AtomicInteger(0);

for (int i = 0; i < 100; i++) {

threadPool.execute(()->{

try {

Thread.sleep(1000);

System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

}

}

欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。


http://www.ppmy.cn/news/602903.html

相关文章

java带参数的方法笔记_具有Java参数的方法的类声明

类声明可以包含在Java中具有参数的方法。演示此过程的程序如下&#xff1a;示例class Message {public void messagePrint(String msg) {System.out.println("The message is: " msg);}}public class Demo {public static void main(String args[]) {Message m new…

java 0-100整数多少个是7的倍数_输出1-100之间7的倍数的个数。

java小白&#xff0c;在学习java基础视频&#xff0c;看到毕老师给的一个练习。便自己编写了一下。在编写的过程中。输出个数是没有问题的。代码如下&#xff1a;//目的&#xff1a;计算1-100之间7的倍数的个数。/**要求&#xff1a;1&#xff0c;先求出1-100之间7的倍数。2&am…

java 原子量_Java原子量 - Rickxue的个人空间 - OSCHINA - 中文开源技术交流社区

所谓的原子量即操作变量的操作是“原子的”&#xff0c;该操作不可再分&#xff0c;因此是线程安全的。为何要使用原子变量呢&#xff0c;原因是多个线程对单个变量操作也会引起一些问题。在Java5之前&#xff0c;可以通过volatile、synchronized关键字来解决并发访问的安全问题…

java 数据结构 无向图_数据结构-无向图

1.图的定义图(Graph)是由顶点(vertex)的有穷非空集合和顶点之间边(edge)的集合组成&#xff0c;通常表示为&#xff1a;G(V,E)&#xff0c;其中&#xff0c;G表示一个图&#xff0c;V是图G中顶点的集合&#xff0c;E是图G中边的集合 a.若顶点之间 Vi 和 Vj 之间没有方向,则为无…

java命令行读入密码_java-在命令行上隐藏输入

是的&#xff0c;可以做。 这称为命令行输入屏蔽。 您可以轻松实现此目的。您可以使用单独的线程擦除输入的回显字符&#xff0c;并用星号替换。 使用下面显示的EraserThread类完成此操作import java.io.*;class EraserThread implements Runnable {private boolean stop;/***p…

mysql utf8mb4_general_ci_MySQL编码utf8与utf8mb4 utf8mb4_unicode_ci与utf8mb4_general_ci字符集小结...

本篇文章小编给大家分享一下MySQL编码utf8与utf8mb4 utf8mb4_unicode_ci与utf8mb4_general_ci字符集小结&#xff0c;小编觉得挺不错的&#xff0c;现在分享给大家供大家参考&#xff0c;有需要的小伙伴们可以来看看。utf8mb4 已成为 MySQL 8.0 的默认字符集&#xff0c;在MySQ…

Glassdoor美国公司员工及面试者评价数据

一、数据简介 除了股东、债权人、政府等外部利益相关者外&#xff0c;员工的利益更应该得到公司的恰当保护&#xff0c;因为员工才是公司创造价值的真正主体。提高企业在产品市场的竞争力&#xff0c;首先就是要提高员工对企业的满意度&#xff0c;只有员工的满意度更高、幸福感…

java action dao_java中Action层、Service层和Dao层的功能区分

一、Action/Service/DAO简介&#xff1a;Action是管理业务(Service)调度和管理跳转的。Service是管理具体的功能的。Action只负责管理&#xff0c;而Service负责实施。DAO只完成增删改查&#xff0c;虽然可以1-n&#xff0c;n-n&#xff0c;1-1关联&#xff0c;模糊、动态、子查…