线程池的暂停和恢复
自定义一个线程池, 使得这个线程池能够暂停和恢复
我们创建一个类, 继承 ThreadPoolExecutor. 类的内部用一个 bool 变量 isPaused 来标记线程池 是否处于停止状态. 为了使多线程并发地访问和修改 isPaused. 我们需要用一把 ReentrantLock 锁. 我们需要进行线程间通信, 用到了 Condition. Condition 是一个接口, 主要依赖 两个方法 await() 和 signalAll . 这和 Object 中的 wait() 和 notifyAll() 方法类似.
我们需要用两个方法来设置线程池的暂停和恢复 pause() 和 resume(). 其本质都是修改 isPaused 变量, 特别的是每一次修改都需要上锁, 修改完都需要释放锁. 在resume()方法内, 无论是否成功修改 isPaused, 都需要使用 signalAll() 方法 将其他线程唤醒.
我们还需要重写 beforeExecute, 在每一个线程执行任务时, 都需要访问一下 isPaused() 方法, 用来判断线程池是否处于 暂停状态. 若处于暂停状态, 则将当前线程休眠.
public class PauseableThreadPool extends ThreadPoolExecutor {
private boolean isPaused;//线程池是否暂停标记位
//为了并发地访问和修改isPaused, 用一把锁
private final ReentrantLock lock = new ReentrantLock();
//Condition 主要有两个方法 await() 和 signalAll()
//Conditon中的 await() 对应 Object 的 wait();
//Condition中的 signalAll() 对应 Object的notifyAll()
private Condition unpaused= lock.newCondition();
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
//每次执行之前, 都调用这个方法, 用来查看线程池是否已经暂停
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
//先拿到一把锁
lock.lock();
try {
while(isPaused){ //如果当前线程池已经暂停了, 则将当前线程休眠
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();//最后释放锁
}
}
//让线程池处于暂停状态
public void pause(){
lock.lock();
try{
isPaused=true;
} finally {
lock.unlock();
}
}
//重新启用线程池
public void resume() {
lock.lock();//上锁
try{
isPaused=false;//设置标记位
unpaused.signalAll();//唤醒休眠的线程
}finally {
lock.unlock();
}
}
}
写一个 main 方法来测试一下
public static void main(String[] args) {
PauseableThreadPool threadPool =new PauseableThreadPool(1, 1,0,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 10; i++) {
threadPool.execute(new SimpleTask5());
}
try {
Thread.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPool.pause();
System.out.println("线程池被暂停");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPool.resume();
System.out.println("线程池被恢复");
//关闭线程池的
threadPool.shutdown();
}
执行的任务非常简单, 直接输出一句话
public class SimpleTask5 implements Runnable{
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
输出结果:
我被执行
我被执行
我被执行
我被执行
线程池被暂停
线程池被恢复
我被执行
我被执行
我被执行
我被执行
我被执行
我被执行
线程池先执行几个任务, 随后线程池被暂停了, 控制台没有输出. 接下来线程池恢复, 继续执行任务.
Last updated