BlogBlog
首页
  • Vue
  • TypeScript
  • React
  • Angular
  • Node.js
  • 小程序
  • Flutter
  • 数据产品
  • 大数据

    • Hadoop
    • Hive
    • Spark
  • MySQL
  • Redis
  • Java
  • Python
  • Golang
GitHub
首页
  • Vue
  • TypeScript
  • React
  • Angular
  • Node.js
  • 小程序
  • Flutter
  • 数据产品
  • 大数据

    • Hadoop
    • Hive
    • Spark
  • MySQL
  • Redis
  • Java
  • Python
  • Golang
GitHub

手写线程池

线程池的实现原理:

线程池是一个典型的生产者-消费者模型

  • 生产者:线程池创建线程,并将任务放入任务队列中。
  • 消费者:线程池从任务队列中取出任务,并执行任务。
  • 线程池:线程池中维护着一组线程,用来执行任务。

考虑一些问题

  • 任务提交: 能够接收并处理用户提交的任务,任务通常是实现了 Runnable 接口或 Callable 接口的对象。
  • 线程管理: 控制线程的创建、复用和销毁,包括核心线程数、最大线程数等参数的设置。
  • 任务队列管理: 当线程都处于忙碌状态时,能将新提交的任务存储在任务队列中,等待线程空闲时执行
  • 拒绝策略: 当任务队列已满且线程数达到最大时,决定如何处理新提交的任务,如抛出异常、丢弃任务等。
  • 生命周期管理:可以启动、关闭线程池,关闭时能正确处理已提交但未执行的任务。
1. 线程池的大小:线程池的大小决定了线程池能同时执行的线程数量,它可以有效控制系统的资源开销,防止过多的线程因资源竞争而导致的性能下降。
2. 任务队列的大小:任务队列的大小决定了等待执行的任务数量,它可以防止任务积压,当线程池中的线程都在执行任务时,新任务将进入任务队列,等待线程池中的线程空闲时再执行。
3. 线程池的拒绝策略:当线程池的任务队列已满,且线程池中的线程都在执行任务时,如果再向队列中放入任务,线程池将采取拒绝策略处理该任务。
4. 线程池的生命周期:线程池的生命周期可以分为三种:
   - 单例线程池:线程池在应用的整个生命周期中都存在,在没有任务需要执行时,线程池处于空闲状态。
   - 固定线程池:线程池在创建后,一直存在,直到应用结束。
   - 动态线程池:线程池的大小可以根据应用的运行情况动态调整,比如当有任务需要执行时,线程池的线程数量可以增加,当任务执行完成后,线程池的线程数量可以减少。

设计一些组件

  • 线程池管理器:管理线程池的创建、销毁、线程的分配、任务的执行等。
  • 工作线程: 继承自 Thread 类,用于执行具体的任务。每个工作线程会不断从任务队列中获取任务并执行。
  • 任务队列(BlockingQueue):用于存储等待执行的任务,常见的有 ArrayBlockingQueue、LinkedBlockingQueue 等,根据需求选择有界队列或无界队列。
  • 拒绝策略(RejectedExecutionHandler):定义当任务无法提交到线程池时的处理方式,如 AbortPolicy(抛出异常)、DiscardPolicy(丢弃任务)等。

下面是线程池的简单实现:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + " is running");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}
  1. 创建线程池:ExecutorService executor = Executors.newFixedThreadPool(3); 创建一个固定大小的线程池,线程池的大小为3。
  2. 执行任务:executor.execute(() -> { try { TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName() + " is running"); } catch (InterruptedException e) { e.printStackTrace(); } }); 执行一个任务,任务是一个 Runnable 接口,在 Runnable 中执行需要执行的逻辑。
  3. 关闭线程池:executor.shutdown(); 关闭线程池,使其不再接收新的任务。

注意:

  1. 线程池的大小:线程池的大小应该根据应用的需要进行合理配置,过多的线程数可能会导致资源消耗过多,导致系统性能下降。
  2. 任务队列的大小:任务队列的大小也应该根据应用的需要进行合理配置,过大可能会导致内存溢出,过小可能会导致任务积压,影响系统的性能。
  3. 线程池的拒绝策略:线程池的拒绝策略决定了当任务队列已满,且线程池中的线程都在执行任务时,如果再向队列中放入任务,线程池将采取什么策略处理该任务。默认的策略是 AbortPolicy,即抛出异常。
  4. 线程池的生命周期:线程池的生命周期应该根据应用的需要进行合理配置,单例线程池在应用的整个生命周期中都存在,固定线程池在创建后,一直存在,直到应用结束,动态线程池的大小可以根据应用的运行情况动态调整。

另一种具体的实现

定义线程池类:


MyThreadPoolExecutor

- execute()
    - addWorker()
- shutdown()
- shutdownNow()
- isActive()


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class MyThreadPoolExecutor {
    // 核心线程数
    private final int corePoolSize;
    // 最大线程数
    private final int maximumPoolSize;
    // 任务队列
    private final BlockingQueue<Runnable> workQueue;
    // 线程工厂
    private final ThreadFactory threadFactory;
    // 拒绝策略
    private final RejectedExecutionHandler handler;

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}

实现任务提交方法:

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException();
    }
    // 当前线程数小于核心线程数,创建新线程执行任务
    if (workerCountOf(ctl.get()) < corePoolSize) {
        if (addWorker(task, true)) {
            return;
        }
    }
    // 线程数大于等于核心线程数,将任务放入任务队列
    if (workQueue.offer(task)) {
        // 再次检查线程是否已终止,若已终止则移除任务并执行拒绝策略
        if (isShutdown() && remove(task)) {
            handler.rejectedExecution(task, this);
        }
        // 若线程数为 0,则创建一个新线程
        else if (workerCountOf(ctl.get()) == 0) {
            addWorker(null, false);
        }
    }
    // 任务队列已满,尝试创建新线程执行任务
    else if (!addWorker(task, false)) {
        handler.rejectedExecution(task, this);
    }
}

实现线程创建和管理方法(addWorker 方法):

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 检查线程池状态是否合法
        if (rs >= SHUTDOWN &&
               !(rs == SHUTDOWN &&
                        firstTask == null &&
                       !workQueue.isEmpty())) {
            return false;
        }

        for (; ; ) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                    wc >= (core? corePoolSize : maximumPoolSize)) {
                return false;
            }
            if (compareAndIncrementWorkerCount(c)) {
                break retry;
            }
            c = ctl.get();
            if (runStateOf(c)!= rs) {
                continue retry;
            }
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t!= null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) {
                        throw new IllegalThreadStateException();
                    }
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize) {
                        largestPoolSize = s;
                    }
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted) {
            addWorkerFailed(w);
        }
    }
    return workerStarted;
}

实现工作线程类(Worker 类):

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    protected boolean isHeldExclusively() {
        return getState()!= 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock() {
        acquire(1);
    }

    boolean tryLock() {
        return compareAndSetState(0, 1);
    }

    void unlock() {
        release(1);
    }

    boolean isLocked() {
        return isHeldExclusively();
    }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread)!= null &&!t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

实现任务执行方法(runWorker 方法):

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task!= null || (task = getTask())!= null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                   !wt.isInterrupted()) {
                wt.interrupt();
            }
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

测试和优化

  • 功能测试:编写测试代码,验证线程池是否能正确处理任务提交、线程管理、拒绝策略等功能。

  • 性能测试:使用性能测试工具(如 JMH)对线程池进行性能测试,根据测试结果调整线程池的参数(如核心线程数、最大线程数、任务队列容量等),以达到最优性能。

  • 稳定性测试:长时间运行线程池,观察是否存在内存泄漏、线程死锁等问题,并进行相应的优化和修复。

最近更新:: 2025/5/5 12:43
Contributors: alice