Contents

Java 多线程 - 自定义线程池

1. 为什么要用线程池?

池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

2. 自定义一个简单的线程池

一个线程池应该具备以下要素:

  • 任务队列:用于缓存提交的任务。
  • 任务线程管理功能:一个线程池必须能够很好地管理和控制线程数量,可通过如下三个参数来实现,比如创建线程池时初始的线程数量init;线程池自动扩充时最大的线程数量max;在线程池空闲时需要释放线程但是也要维护一定数量的活跃数量或者核心数量core。有了这三个参数,就能够很好地控制线程池中的线程数量,将其维护在一个合理的范围之内,三者之间的关系是init<=core<=max。
  • 任务拒绝策略:如果线程数量已达到上限且任务队列已满,则需要有相应的拒绝策略来通知任务提交者。
  • 线程工厂:主要用于个性化定制线程,比如将线程设置为守护线程以及设置线程名称等。
  • QueueSize:任务队列主要存放提交的Runnable,但是为了防止内存溢出,需要有limit数量对其进行控制。
  • Keepedalive时间:该时间主要决定线程各个重要参数自动维护的时间间隔。

2.1 线程池实现类图

https://chenxqblog-1258795182.cos.ap-guangzhou.myqcloud.com/threadpool.png

上图为线程池实现类图,下面看具体的代码。

2.2 ThreadPool

先定义一个线程池接口,定义常用的方法。

public interface ThreadPool {

	//提交任务到线程池
	void execute(Runnable runnable);

	// 关闭线程池
	void shutdown();

	// 获取线程池的初始化大小
	int getInitSize();

	// 获取线程池的最大线程数
	int getMaxSize();

	// 获取线程池的核心线程数
	int getCoreSize();

	// 获取线程池中用于缓存任务队列的大小
	int getQueueSize();

	// 获取线程池中活跃线程的数量
	int getActiveCount();

	// 查看线程池是否已经被shutdown
	boolean isShutdown();
}

2.3 RunnableQueue

我们需要一个任务队列,用来存放提交的任务,该队列是一个BlockedQueue,并且有limit的限制。

public interface RunnableQueue {

   // 当有新任务进来时首先会offer到队列中
   void offer(Runnable runnable);

   // 工作线程通过take方法获取Runnable
   Runnable take() throws InterruptedException;

   // 获取任务队列中任务的数量
   int size();
}

2.4 ThreadFactory

ThreadFactory提供了创建线程的接口,以便于个性化地定制Thread,比如Thread应该被加到哪个Group中,优先级、线程名字以及是否为守护线程等。

@FunctionalInterface
public interface ThreadFactory {

	Thread createThread(Runnable runnable);
}

2.5 拒绝策略(DenyPolicy)

DenyPolicy主要用于当Queue中的runnable达到了limit上限时,决定采用何种策略通知提交者。该接口中定义了三种默认的实现。

  1. DiscardDenyPolicy:直接将任务丢弃。
  2. AbortDenyPolicy:向任务提交者抛出异常。
  3. RunnerDenyPolicy:使用提交者所在的线程执行任务。
public interface DenyPolicy {

	void reject(Runnable runnable, ThreadPool threadPool);

	/**
	 * 该拒绝策略会直接将任务丢弃
	 */
	class DiscardDenyPolicy implements DenyPolicy {

		@Override
		public void reject(Runnable runnable, ThreadPool threadPool) {
			//do nothing
			System.out.println("task will be discard");
		}
	}

	/**
	 * 该拒绝策略会向任务提交者抛出异常
	 */
	class AbortDenyPolicy implements DenyPolicy {

		@Override
		public void reject(Runnable runnable, ThreadPool threadPool) {
			throw new RunnableDenyException("The Runnable " + runnable + " will be abort.");
		}
	}

	/**
	 * 该拒绝策略会使任务在提交者所在的线程中执行任务
	 */
	class RunnerDenyPolicy implements DenyPolicy {

		@Override
		public void reject(Runnable runnable, ThreadPool threadPool) {
			if (!threadPool.isShutdown()) {
				runnable.run();
			}
		}
	}
}

这里还定义了一个 RunnableDenyException ,主要用于通知任务提交者,任务队列已经无法再接收新的任务。

public class RunnableDenyException extends RuntimeException{

	public RunnableDenyException(String message) {
		super(message);
	}
}

2.6 InternalTask

InternalTask是Runnable的一个实现,是实际任务存储的数据结构。主要用于线程池内部,该类会使用到RunnableQueue,然后不断地从queue中取出某个runnable,并运行runnable的run方法。

public class InternalTask implements Runnable{

	private final RunnableQueue runnableQueue;

	private volatile boolean running = true;

	public InternalTask(RunnableQueue runnableQueue) {
		this.runnableQueue = runnableQueue;
	}

	@Override
	public void run() {
		// 如果当前任务为running并且没有被中断,则其将不断地从queue中获取runnable,然后执行run方法
        // 这是提交到线程池的任务最终运行的地方
		while (running && !Thread.currentThread().isInterrupted()) {
			try {
				Runnable task = runnableQueue.take();
				task.run();
			} catch (InterruptedException e) {
				running = false;
				break;
			}
		}
	}

	// 停止当前任务,主要会在线程池的shutdown方法中使用
	public void stop() {
		this.running = false;
	}

}

代码还对该类增加了一个开关方法stop,主要用于停止当前线程,一般在线程池销毁和线程数量维护的时候会使用到。

2.7 线程池详细实现

在LinkedRunnableQueue中有几个重要的属性,第一个是limit,也就是Runnable队列的上限;当提交的Runnable数量达到limit上限时,则会调用DenyPolicy的reject方法;runnableList是一个双向循环列表,用于存放Runnable任务

public class LinkedRunnableQueue implements RunnableQueue{

	// 任务队列的最大容量,在构造时传入
	private final int limit;

	// 若任务队列已满,则执行拒绝策略
	private final DenyPolicy denyPolicy;

	// 存放任务的队列
	private final LinkedList<Runnable> runnableList = new LinkedList<>();

	private final ThreadPool threadPool;

	public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
		this.limit = limit;
		this.denyPolicy  = denyPolicy;
		this.threadPool = threadPool;
	}

	@Override
	public void offer(Runnable runnable) {
		synchronized (runnableList) {
			if (runnableList.size() >= limit) {
				// 无法容纳新的任务,执行拒绝策略
				denyPolicy.reject(runnable, threadPool);
			} else {
				// 将任务加入队尾,并且唤醒阻塞中的线程
				runnableList.addLast(runnable);
				runnableList.notifyAll();
			}
		}
	}

	/**
	 * take方法也是同步方法,线程不断从队列中获取Runnable任务,当队列为空的时候工作线程会陷入阻塞,
	 * 有可能在阻塞的过程中被中断,为了传递中断信号需要在catch语句块中将异常抛出以通知上游(InternalTask)
	 * @return 任务
	 * @throws InterruptedException 中断异常,通知上游(InternalTask)
	 */
	@Override
	public Runnable take() throws InterruptedException {
		synchronized (runnableList) {
			while (runnableList.isEmpty()) {
				try {
					// 如果任务队列中没有可执行任务,则当前线程挂起,进入runnableList关联的monitor waitset中等待唤醒
					runnableList.wait();
				} catch (InterruptedException e) {
					// 被中断时需要将异常抛出
					throw e;
				}
			}
			// 从任务队列头排除一个任务
			return runnableList.removeFirst();
		}
	}

	@Override
	public int size() {
		return runnableList.size();
	}
}

根据前面的讲解,线程池需要有数量控制属性、创建线程工厂、任务队列策略等功能,线程池初始化代码如下:

public class BasicThreadPool extends Thread implements ThreadPool{

	// 初始化线程数量
	private final int initSize;

	// 线程池最大数量
	private final int maxSize;

	// 线程池核心线程数量
	private final int coreSize;

	// 当前活跃的线程数量
	private int activeCount;

	// 创建线程所需的工厂
	private final ThreadFactory threadFactory;

	// 任务队列
	private final RunnableQueue runnableQueue;

	// 线程池是否已经被shutdown
	private volatile boolean isShutdown = false;

	// 工作线程队列
	private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();

	private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();

	private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();

	private final long keepAliveTime;

	private final TimeUnit timeUnit;

	// 构造时需要传递的参数:初始的线程数量,最大的线程数量,核心线程数量,任务队列的最大数量
	public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {
		this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY, queueSize, DEFAULT_DENY_POLICY, 10 ,TimeUnit.SECONDS);
	}

	// 构造线程池时需要传入的参数,该构造函数需要的参数比较多
	public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {
		this.initSize = initSize;
		this.maxSize = maxSize;
		this.coreSize = coreSize;
		this.threadFactory = threadFactory;
		this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
		this.keepAliveTime = keepAliveTime;
		this.timeUnit = timeUnit;
		this.init();
	}

	// 初始化时,先创建 initSize 个线程
	private void init() {
		start();
		for (int i = 0; i < initSize; i++) {
			newThread();
		}
	}

	private void newThread() {
		//创建任务线程,并且启动
		InternalTask internalTask = new InternalTask(runnableQueue);
		Thread thread = this.threadFactory.createThread(internalTask);
		ThreadTask threadTask = new ThreadTask(thread, internalTask);
		threadQueue.offer(threadTask);
		this.activeCount++;
		thread.start();
	}

	private void removeThread() {
		// 从线程池中移除某个线程
		ThreadTask threadTask = threadQueue.remove();
		threadTask.internalTask.stop();
		this.activeCount--;
	}

	@Override
	public void execute(Runnable runnable) {
		if (this.isShutdown) {
			throw new IllegalStateException("The thread pool is destroy");
		}
		// 提交任务只是简单地往任务队列中插入Runnable
		this.runnableQueue.offer(runnable);
	}

	@Override
	public void run() {
		// run 方法继承自Thread,主要用于维护线程数量,比如扩容、回收工作
		while (!isShutdown && !isInterrupted()) {
			try {
				timeUnit.sleep(keepAliveTime);
			} catch (InterruptedException e) {
				isShutdown = true;
				break;
			}

			synchronized (this) {
				if (isShutdown) {
					break;
				}
				//当前队列中有尚未处理,并且activeCount<coreSize则继续扩容
				if (runnableQueue.size() > 0 && activeCount < coreSize) {
					for (int i = initSize; i < coreSize; i++) {
						newThread();
					}
					// continue 的目的在于不想让线程的扩容直接达到maxsize
					continue;
				}
				// 当前队列中有任务尚未处理,并且activeCount<maxSize则继续扩容
				if (runnableQueue.size() > 0 && activeCount < maxSize) {
					for (int i = coreSize; i < maxSize; ++i) {
						newThread();
					}
				}

				// 如果任务队列中没有任务,则需要回收,回收至coreSize即可
				if (runnableQueue.size() == 0 && activeCount > coreSize) {
					for (int i = coreSize; i < activeCount; i++) {
						removeThread();
					}
				}
			}
		}
	}

	//ThreadTask 只是InternalTask和Thread的一个组合
	private static class ThreadTask {
		Thread thread;
		InternalTask internalTask;

		public ThreadTask(Thread thread, InternalTask internalTask) {
			this.thread = thread;
			this.internalTask = internalTask;
		}
	}

	/**
	 * 销毁线程池主要为了是停止BasicThreadPool线程,停止线程池中的活动线程并且将isShutdown开关变量更改为true。
	 */
	@Override
	public void shutdown() {
		synchronized (this) {
			if (isShutdown) {
				return;
			}

			isShutdown = true;
			threadQueue.forEach(threadTask -> {
				threadTask.internalTask.stop();
				threadTask.thread.interrupt();
			});

			this.interrupt();
		}
	}

	@Override
	public int getInitSize() {
		if (isShutdown) {
			throw new IllegalStateException("The thread pool is destroy");
		}
		return this.initSize;
	}

	@Override
	public int getMaxSize() {
		if (isShutdown) {
			throw new IllegalStateException("The thread pool is destroy");
		}
		return this.maxSize;
	}

	@Override
	public int getCoreSize() {
		if (isShutdown) {
			throw new IllegalStateException("The thread pool is destroy");
		}
		return this.coreSize;
	}

	@Override
	public int getQueueSize() {
		if (isShutdown) {
			throw new IllegalStateException("The thread pool is destroy");
		}
		return runnableQueue.size();
	}

	@Override
	public int getActiveCount() {
		synchronized (this) {
			return this.activeCount;
		}
	}

	@Override
	public boolean isShutdown() {
		return this.isShutdown;
	}

	private static class DefaultThreadFactory implements ThreadFactory {

		private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);

		private static final ThreadGroup group = new ThreadGroup("MyThreadPool-" + GROUP_COUNTER.getAndIncrement());

		private static final AtomicInteger COUNTER = new AtomicInteger(0);

		@Override
		public Thread createThread(Runnable runnable) {
			return new Thread(group, runnable, "thread-pool-" + COUNTER.getAndIncrement());
		}
	}
}

自动维护线程的代码块是同步代码块,主要是为了阻止在线程维护过程中线程池销毁引起的数据不一致问题。

任务队列中若存在积压任务,并且当前活动线程少于核心线程数,则新建 coreSize-initSize数量的线程,并且将其加入到活动线程队列中,为了防止马上进行maxSize-coreSize数量的扩充,建议使用continue终止本次循环。

任务队列中有积压任务,并且当前活动线程少于最大线程数,则新建maxSize-coreSize数量的线程,并且将其加入到活动队列中。

当前线程池不够繁忙时,则需要回收部分线程,回收到coreSize数量即可,回收时调用removeThread()方法,在该方法中需要考虑的一点是,如果被回收的线程恰巧从Runnable任务取出了某个任务,则会继续保持该线程的运行,直到完成了任务的运行为止,详见InternalTask的run方法。

3. 线程池的应用

写一个简单的程序分别测试线程池的任务提交、线程池线程数量的动态扩展,以及线程池的销毁功能。

public class ThreadPoolTest {

	public static void main(String[] args) throws InterruptedException {
		//定义线程池,初始化线程数为2,核心线程数为4,最大线程数位6,任务队列最多允许1000个任务
		final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000);
		// 定义20个任务并且提交给线程池
		for (int i = 0; i < 20; i++) {
			threadPool.execute(()-> {
				try {
					TimeUnit.SECONDS.sleep(10);
					System.out.println(Thread.currentThread().getName() + " is running and done.");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			});
		}

		for (; ;) {
			//不断输出线程池的信息
			System.out.println("getActiveCount: " + threadPool.getActiveCount());
			System.out.println("getQueueSize: " + threadPool.getQueueSize());
			System.out.println("getCoreSize: " + threadPool.getCoreSize());
			System.out.println("getMaxSize: " + threadPool.getMaxSize());
			System.out.println("================================================");
			TimeUnit.SECONDS.sleep(5);
		}
	}
}

上述测试代码中,定义了一个Basic线程池,其中初始化线程数量为2,核心线程数量为4,最大线程数量为6,最大任务队列数量为1000,同时提交了20个任务到线程池中,然后在main线程中不断地输出线程池中的线程数量信息监控变化,运行上述代码,截取的部分输出信息如下:

getActiveCount: 2
getQueueSize: 18
getCoreSize: 4
getMaxSize: 6
================================================
getActiveCount: 2
getQueueSize: 18
getCoreSize: 4
getMaxSize: 6
================================================
thread-pool-1 is running and done.
thread-pool-0 is running and done.
getActiveCount: 4
getQueueSize: 14
getCoreSize: 4
getMaxSize: 6
================================================
getActiveCount: 4
getQueueSize: 14
getCoreSize: 4
getMaxSize: 6
================================================
thread-pool-2 is running and done.
thread-pool-3 is running and done.
thread-pool-0 is running and done.
thread-pool-1 is running and done.
getActiveCount: 6
getQueueSize: 8
getCoreSize: 4
getMaxSize: 6
================================================
getActiveCount: 6
getQueueSize: 8
getCoreSize: 4
getMaxSize: 6
================================================
thread-pool-4 is running and done.
thread-pool-5 is running and done.
thread-pool-3 is running and done.
thread-pool-2 is running and done.
thread-pool-0 is running and done.
thread-pool-1 is running and done.
getActiveCount: 6
getQueueSize: 2
getCoreSize: 4
getMaxSize: 6
================================================
getActiveCount: 6
getQueueSize: 2
getCoreSize: 4
getMaxSize: 6
================================================
thread-pool-3 is running and done.
thread-pool-2 is running and done.
thread-pool-5 is running and done.
thread-pool-4 is running and done.
thread-pool-1 is running and done.
thread-pool-0 is running and done.
getActiveCount: 6
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
================================================
getActiveCount: 6
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
================================================
thread-pool-3 is running and done.
thread-pool-2 is running and done.
getActiveCount: 5
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
================================================
getActiveCount: 5
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
================================================
getActiveCount: 4
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
================================================

通过上述输出信息可以看出,线程池中线程的动态扩展状况以及任务执行情况,在输出的最后会发现active count停留在了core size的位置,这也符合我们的设计,最后为了确定线程池中的活跃线程数量

================================================
getActiveCount: 4
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
================================================
getActiveCount: 4
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6

4. 参考

【1】《Java 高并发编程详解》-汪文君