动态线程池管理器

2019-1-5

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.log4j.Log4j2;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 动态线程池管理器
 * - 根据名称创建线程池并缓存
 * - 线程池为缓存线程池,每个线程存活一定时间
 * - 线程池达到最大线程数,改为阻塞式,由调用线程执行
 * - 定时输出所有缓存线程池的状态
 * @author ZhangShuzheng
 * @date 2018/8/23
 */
@Log4j2
public class ThreadPoolManager {

    /**
     * 线程池map
     */
    private static ConcurrentHashMap<String, ThreadPoolExecutor> CACHE = new ConcurrentHashMap<>();

    /**
     * 并发操作锁
     */
    private static ReentrantLock reentrantLock = new ReentrantLock();

    /**
     * 核心线程数大小
     */
    private static int corePoolSize = 0;

    /**
     * 最大线程数大小
     */
    private static int maximumPoolSize = 50;

    /**
     * 回收等待时间
     */
    private static long keepAliveTime = 60L;

    /**
     * 缓存队列大小
     */
    private static int queueSize = 1000;

    /**
     * 是否开启监控线程
     */
    private static ThreadPoolMonitorThread threadPoolMonitorThread = null;

    /**
     * 监控线程打印日志间隔时间
     */
    private static long monitorIntervalTime = 60 * 1000L;

    /**
     * 根据名称获取线程池
     * @param poolName
     * @return
     */
    public static ThreadPoolExecutor get(String poolName) {
        reentrantLock.lock();
        ThreadPoolExecutor threadPoolExecutor = null;
        try {
            // 根据名称获取缓存线程池,没有则新建并缓存
            threadPoolExecutor = CACHE.get(poolName);
            if (null == threadPoolExecutor) {
                ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(poolName + "-%d").build();
                threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(queueSize), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
                CACHE.put(poolName, threadPoolExecutor);
            }
            // 开启监控线程
            if (null == threadPoolMonitorThread) {
                threadPoolMonitorThread = new ThreadPoolMonitorThread(CACHE, monitorIntervalTime);
                threadPoolMonitorThread.start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
        return threadPoolExecutor;
    }

}

/**
 * Background thread that periodically logs the state of every thread pool in a shared cache.
 * <p>
 * Each cycle sleeps for the configured interval and then writes one log line per cached pool.
 * The thread stops when it is interrupted.
 */
@Log4j2
class ThreadPoolMonitorThread extends Thread {

    /** Pools to report on, keyed by pool name; the map is shared with the creator, not copied. */
    private final ConcurrentHashMap<String, ThreadPoolExecutor> cache;

    /** Pause between two reports, in milliseconds (passed to {@link Thread#sleep(long)}). */
    private final long monitorIntervalTime;

    /**
     * @param cache               map of pool name to pool whose entries are reported
     * @param monitorIntervalTime pause between two reports, in milliseconds
     */
    public ThreadPoolMonitorThread(ConcurrentHashMap<String, ThreadPoolExecutor> cache, long monitorIntervalTime) {
        this.cache = cache;
        this.monitorIntervalTime = monitorIntervalTime;
    }

    /**
     * Sleeps for the interval, then logs the statistics of all cached pools, until interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(monitorIntervalTime);
            } catch (InterruptedException e) {
                // Interruption is the stop signal: restore the flag for any outer code and exit
                // instead of swallowing it and looping forever.
                Thread.currentThread().interrupt();
                log.info("[ThreadPoolMonitorThread] interrupted, stopping monitor");
                return;
            }
            for (Map.Entry<String, ThreadPoolExecutor> entry : cache.entrySet()) {
                logPoolState(entry.getKey(), entry.getValue());
            }
        }
    }

    /** Writes one log line with the current statistics of a single pool. */
    private void logPoolState(String poolName, ThreadPoolExecutor pool) {
        log.info("[ThreadPoolMonitorThread][{}]: " +
                "poolSize={}, " +
                "corePoolSize={}, " +
                "maximumPoolSize={}, " +
                "largestPoolSize={}, " +
                "activeCount={}, " +
                "completedTaskCount={}, " +
                "taskCount={}",
                poolName, pool.getPoolSize(), pool.getCorePoolSize(), pool.getMaximumPoolSize(),
                pool.getLargestPoolSize(), pool.getActiveCount(), pool.getCompletedTaskCount(),
                pool.getTaskCount());
    }

}

下载地址

百度网盘
密码: