并发编程专题六-线程池的使用与分析

Java基础

浏览数:13

2020-5-28

AD:资源代下载服务

五一要结束了,是时候开始新的一波学习了~

一、什么是线程池?为什么要用线程池?

线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。 

优势:

  1. 降低资源的消耗。降低线程创建和销毁的资源消耗;
  2. 提高响应速度。例如:线程的创建时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间
  3. 提高线程的可管理性。

二、如何实现一个线程池

根据线程池的概念,如果要自己创建线程池,应该满足一下条件。

  1. 保存线程的容器。因为线程必须在池子已经创建好了,并且可以保持住,因此,需要一个容器去保存我们的线程。
  2. 可以接受外部任务。线程还要能够接受外部的任务,冰并运行这个任务。
  3. 保存任务的容器,有些任务可能来不及执行,因此需要将来不及执行的任务通过容器保存起来。

根据以上的条件以及之前我们学的并发编程知识,我们先手动自己尝试写一个线程池

Code:


import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * @Auther: DarkKing
 * @Date: 2019/5/4 12:09
 * @Description:
 */
public class MyThreadPool {
    // 线程池中默认线程的个数为5
    private static int WORK_NUM = 5;
    // 队列默认任务个数为100
    private static int TASK_COUNT = 100;  
    
    // 工作线程组
    private WorkThread[] workThreads;

    // 任务队列,作为一个缓冲
    private final BlockingQueue<Runnable> taskQueue;
    private final int worker_num;//用户在构造这个池,希望的启动的线程数

    // 创建具有默认线程个数的线程池
    public MyThreadPool() {
        this(WORK_NUM,TASK_COUNT);
    }

    // 创建线程池,worker_num为线程池中工作线程的个数
    public MyThreadPool(int worker_num,int taskCount) {
    	if (worker_num<=0) worker_num = WORK_NUM;
    	if(taskCount<=0) taskCount = TASK_COUNT;
        this.worker_num = worker_num;
        taskQueue = new ArrayBlockingQueue<>(taskCount);
        workThreads = new WorkThread[worker_num];
        for(int i=0;i<worker_num;i++) {
        	workThreads[i] = new WorkThread();
        	workThreads[i].start();
        }
       
    }
    
    // 执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器决定
    public void execute(Runnable task) {
    	try {
			taskQueue.put(task);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

    }


    // 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
    public void destroy() {
        // 工作线程停止工作,且置为null
        System.out.println("ready close pool.....");
        for(int i=0;i<worker_num;i++) {
        	workThreads[i].stopWorker();
        	workThreads[i] = null;//help gc
        }
        taskQueue.clear();// 清空任务队列
    }

    // 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数
    @Override
    public String toString() {
        return "WorkThread number:" + worker_num
                + "  wait task number:" + taskQueue.size();
    }

    /**
     * 内部类,工作线程
     */
    private class WorkThread extends Thread{
    	
    	@Override
    	public void run(){
    		Runnable r = null;
    		try {
				while (!isInterrupted()) {
				    //监听阻塞队列,如果有任务,则执行相应的任务
					r = taskQueue.take();
					if(r!=null) {
						System.out.println(getId()+" ready exec :"+r);
						r.run();
					}
					r = null;//help gc;
				} 
			} catch (Exception e) {
				// TODO: handle exception
			}
    	}
    	
    	//停止线程
    	public void stopWorker() {
    		interrupt();
    	}
    	
    }
}

1、定义WorkThread类,用来表示执行的线程,用于监听阻塞队列任务。

2、创建构建函数,我们将线程池进行初始化,并启动所有的工作线程。workThreads用来保存运行的线程,使用BlockingQueue<Runnable> taskQueue用来保存我们的任务队列

3、创建提交任务方法execute,用于提交我们的任务。

4、创建销毁线程池的方法destroy,用于销毁线程池。

之后我们编写测试类


import java.util.Random;

/**
 * @Auther: DarkKing
 * @Date: 2019/5/4 12:09
 * @Description:
 */
public class TestMyThreadPool {
    public static void main(String[] args) throws InterruptedException {
        // 创建3个线程的线程池
        MyThreadPool t = new MyThreadPool(3,0);
        t.execute(new MyTask("testA"));
        t.execute(new MyTask("testB"));
        t.execute(new MyTask("testC"));
        t.execute(new MyTask("testD"));
        t.execute(new MyTask("testE"));
        System.out.println(t);
        Thread.sleep(10000);
        t.destroy();// 所有线程都执行完成才destory
        System.out.println(t);
    }

    // 任务类
    static class MyTask implements Runnable {

        private String name;
        private Random r = new Random();

        public MyTask(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {// 执行任务
            try {
                Thread.sleep(r.nextInt(1000)+2000);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
                        +Thread.currentThread().isInterrupted());
            }
            System.out.println("任务 " + name + " 完成");
        }
    }
}

能按照线程池的方式进行执行。

我们简单地手动写了一个线程池,但以上线程池有哪些问题呢?

1、启动的时候就将所有线程启动了,如果长时间不用比较消耗资源。

2、没有任务饱和的一个错略。

3、没有任务超时机制等等。

三、JDK中的线程池和工作机制

我们大致了解了线程池的一个机制,那我们看下JDK中,是如何实现线程池的吧。

1、线程池的创建

JAVA中,ThreadPoolExecutor,是所有线程池实现的父类,类结构图如下。

 

它的构造函数含有以下参数

参数 含义
int corePoolSize  线程池中核心线程数,< corePoolSize  ,就会创建新线程,= corePoolSize  ,这个任务就会保存到BlockingQueue,如果调用prestartAllCoreThreads()方法就会一次性的启动corePoolSize  个数的线程。
int maximumPoolSize 允许的最大线程数,如果BlockingQueue也满了,并且线程数< maximumPoolSize时候就会再次创建新的线程
long keepAliveTime 线程空闲下来后,存活的时间,这个参数只在线程数> corePoolSize才有用
TimeUnit unit 存活时间的单位值
BlockingQueue<Runnable> workQueue 保存任务的阻塞队列
ThreadFactory threadFactory 创建线程的工厂,给新建的线程赋予名字
RejectedExecutionHandler handler

饱和策略

AbortPolicy :直接抛出异常,默认;

CallerRunsPolicy:用调用者所在的线程来执行任务

DiscardOldestPolicy:丢弃阻塞队列里最老的任务,队列里最靠前的任务

DiscardPolicy :当前任务直接丢弃

实现自己的饱和策略只要实现RejectedExecutionHandler接口即可

提交任务

execute(Runnable command)  不需要返回

Future<T> submit(Callable<T> task) 需要返回值

关闭线程池

shutdown(),shutdownNow();

shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程

shutdown()设置线程池的状态,只会中断所有没有执行任务的线程

2、线程池的工作机制

 

 

1、如果工作线程数小于核心线程数,则创建工作线程

2、如果工作线程数等于或者大于核心线程数,则将任务提交到阻塞队列中

3、如果阻塞队列也满了,但线程数小于最大线程数,则创建新的线程

4、如果创建新的线程也满了,则执行任务饱和策略。

源码如下

3、如何合理配置线程池

根据任务的性质来:计算密集型(CPU),IO密集型,混合型

计算密集型:例如加密,大数分解,正则……等

推荐:机器的Cpu核心数+1,为什么+1,防止页缺失,(机器的Cpu核心=Runtime.getRuntime().availableProcessors();)

IO密集型:读取文件,数据库连接,网络通讯,

推荐:线程数适当大一点,机器的Cpu核心数*2,

混合型:尽量拆分,如果IO密集型远远计算密集型,拆分意义不大。

在阻塞队列的选择上,应该使用有界,无界队列可能会导致内存溢出

4、预定义的线程池

Java中,帮我们预定了5种线程池

1、FixedThreadPool

创建固定线程数量的线程池,适用于负载较重的服务器,使用了无界队列

2、SingleThreadExecutor

创建单个线程的线程池,适用于需要顺序保证执行任务,不会有多个线程业务,使用了无界队列

3、CachedThreadPool

会根据需要来创建新线程的,适用于执行很多短期异步任务的程序,使用了SynchronousQueue

4、WorkStealingPool(JDK7以后)

工作密取线程池,基于ForkJoinPool实现。适用于大任务分解的线程池。

5、ScheduledThreadPoolExecutor

需要定期执行周期任务的线程池。有两种实现

newSingleThreadScheduledExecutor:只包含一个线程,只需要单个线程执行周期任务,保证顺序的执行各个任务

newScheduledThreadPool 可以包含多个线程的,线程执行周期任务,适度控制后台线程数量的时候

方法说明:

schedule:只执行一次,任务还可以延时执行

scheduleAtFixedRate:提交固定时间间隔的任务

scheduleWithFixedDelay:提交固定延时间隔执行的任务

具体使用,大家可以自行百度,都比较多,一般如果并发比较高的业务中,以上线程池都不建议使用,因为他们采用的都是无界队列,任务量比较大的时候有可能导致内存溢出。一般正确用法是直接使用ThreadPoolExecutor进行创建,或根据自身需求进行自定义。

 

本章重点:线程池的创建,使用,以及运行原理。

其他阅读   并发编程专题

作者:DrakKing