手写线程池
线程池的实现原理:
线程池是一个典型的生产者-消费者模型
- 生产者:线程池创建线程,并将任务放入任务队列中。
- 消费者:线程池从任务队列中取出任务,并执行任务。
- 线程池:线程池中维护着一组线程,用来执行任务。
考虑一些问题
- 任务提交: 能够接收并处理用户提交的任务,任务通常是实现了 Runnable 接口或 Callable 接口的对象。
- 线程管理: 控制线程的创建、复用和销毁,包括核心线程数、最大线程数等参数的设置。
- 任务队列管理: 当线程都处于忙碌状态时,能将新提交的任务存储在任务队列中,等待线程空闲时执行
- 拒绝策略: 当任务队列已满且线程数达到最大时,决定如何处理新提交的任务,如抛出异常、丢弃任务等。
- 生命周期管理:可以启动、关闭线程池,关闭时能正确处理已提交但未执行的任务。
1. 线程池的大小:线程池的大小决定了线程池能同时执行的线程数量,它可以有效控制系统的资源开销,防止过多的线程因资源竞争而导致的性能下降。
2. 任务队列的大小:任务队列的大小决定了等待执行的任务数量,它可以防止任务积压,当线程池中的线程都在执行任务时,新任务将进入任务队列,等待线程池中的线程空闲时再执行。
3. 线程池的拒绝策略:当线程池的任务队列已满,且线程池中的线程都在执行任务时,如果再向队列中放入任务,线程池将采取拒绝策略处理该任务。
4. 线程池的生命周期:线程池的生命周期可以分为三种:
- 单例线程池:线程池在应用的整个生命周期中都存在,在没有任务需要执行时,线程池处于空闲状态。
- 固定线程池:线程池在创建后,一直存在,直到应用结束。
- 动态线程池:线程池的大小可以根据应用的运行情况动态调整,比如当有任务需要执行时,线程池的线程数量可以增加,当任务执行完成后,线程池的线程数量可以减少。
设计一些组件
- 线程池管理器:管理线程池的创建、销毁、线程的分配、任务的执行等。
- 工作线程: 继承自 Thread 类,用于执行具体的任务。每个工作线程会不断从任务队列中获取任务并执行。
- 任务队列(BlockingQueue):用于存储等待执行的任务,常见的有 ArrayBlockingQueue、LinkedBlockingQueue 等,根据需求选择有界队列或无界队列。
- 拒绝策略(RejectedExecutionHandler):定义当任务无法提交到线程池时的处理方式,如 AbortPolicy(抛出异常)、DiscardPolicy(丢弃任务)等。
下面是线程池的简单实现:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " is running");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
- 创建线程池:ExecutorService executor = Executors.newFixedThreadPool(3); 创建一个固定大小的线程池,线程池的大小为3。
- 执行任务:executor.execute(() -> { try { TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName() + " is running"); } catch (InterruptedException e) { e.printStackTrace(); } }); 执行一个任务,任务是一个 Runnable 接口,在 Runnable 中执行需要执行的逻辑。
- 关闭线程池:executor.shutdown(); 关闭线程池,使其不再接收新的任务。
注意:
- 线程池的大小:线程池的大小应该根据应用的需要进行合理配置,过多的线程数可能会导致资源消耗过多,导致系统性能下降。
- 任务队列的大小:任务队列的大小也应该根据应用的需要进行合理配置,过大可能会导致内存溢出,过小可能会导致任务积压,影响系统的性能。
- 线程池的拒绝策略:线程池的拒绝策略决定了当任务队列已满,且线程池中的线程都在执行任务时,如果再向队列中放入任务,线程池将采取什么策略处理该任务。默认的策略是 AbortPolicy,即抛出异常。
- 线程池的生命周期:线程池的生命周期应该根据应用的需要进行合理配置,单例线程池在应用的整个生命周期中都存在,固定线程池在创建后,一直存在,直到应用结束,动态线程池的大小可以根据应用的运行情况动态调整。
另一种具体的实现
定义线程池类:
MyThreadPoolExecutor
- execute()
- addWorker()
- shutdown()
- shutdownNow()
- isActive()
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class MyThreadPoolExecutor {
// 核心线程数
private final int corePoolSize;
// 最大线程数
private final int maximumPoolSize;
// 任务队列
private final BlockingQueue<Runnable> workQueue;
// 线程工厂
private final ThreadFactory threadFactory;
// 拒绝策略
private final RejectedExecutionHandler handler;
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.threadFactory = threadFactory;
this.handler = handler;
}
}
实现任务提交方法:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException();
}
// 当前线程数小于核心线程数,创建新线程执行任务
if (workerCountOf(ctl.get()) < corePoolSize) {
if (addWorker(task, true)) {
return;
}
}
// 线程数大于等于核心线程数,将任务放入任务队列
if (workQueue.offer(task)) {
// 再次检查线程是否已终止,若已终止则移除任务并执行拒绝策略
if (isShutdown() && remove(task)) {
handler.rejectedExecution(task, this);
}
// 若线程数为 0,则创建一个新线程
else if (workerCountOf(ctl.get()) == 0) {
addWorker(null, false);
}
}
// 任务队列已满,尝试创建新线程执行任务
else if (!addWorker(task, false)) {
handler.rejectedExecution(task, this);
}
}
实现线程创建和管理方法(addWorker 方法):
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查线程池状态是否合法
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty())) {
return false;
}
for (; ; ) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core? corePoolSize : maximumPoolSize)) {
return false;
}
if (compareAndIncrementWorkerCount(c)) {
break retry;
}
c = ctl.get();
if (runStateOf(c)!= rs) {
continue retry;
}
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t!= null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) {
throw new IllegalThreadStateException();
}
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s;
}
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted) {
addWorkerFailed(w);
}
}
return workerStarted;
}
实现工作线程类(Worker 类):
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState()!= 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
void lock() {
acquire(1);
}
boolean tryLock() {
return compareAndSetState(0, 1);
}
void unlock() {
release(1);
}
boolean isLocked() {
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread)!= null &&!t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
实现任务执行方法(runWorker 方法):
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task!= null || (task = getTask())!= null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted()) {
wt.interrupt();
}
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
测试和优化
功能测试:编写测试代码,验证线程池是否能正确处理任务提交、线程管理、拒绝策略等功能。
性能测试:使用性能测试工具(如 JMH)对线程池进行性能测试,根据测试结果调整线程池的参数(如核心线程数、最大线程数、任务队列容量等),以达到最优性能。
稳定性测试:长时间运行线程池,观察是否存在内存泄漏、线程死锁等问题,并进行相应的优化和修复。