github地址: https://github.com/lishanglei/thread-pool.git
设置为synchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue
它没有容量,每执行一个插入操作就会堵塞,需要再执行一个删除操作才会被唤醒
反之,每一个删除操作也要等待对应的插入操作
public class SynchronousQueue { private static ExecutorService pool; public static void main(String[] args) { /** * 当任务队列为synchronousQueue, * 创建的线程数大于maximumPoolSize时,直接执行了拒绝策列抛出异常 * 使用synchronousQueue队列,提交的任务不会被保存,总是会马上提交执行, * 如果用于执行任务的线程数少于maximumPoolSize时 * 则尝试创建新的线程,如果达到maximumPoolSize设置的最大值, * 则根据你设置的handler执行拒绝策略 * 因此这种方式提交的任务不会被缓存起来,而是会被马上执行,在这何种情况下, * 你需要对你程序的并发量有一个准确的评估 * 才能设置适合的maximumPoolSize数量 */ pool = new ThreadPoolExecutor( 1, //核心线程数 2, //最大线程数 1000, //线程存活时间 TimeUnit.MILLISECONDS, //线程存活时间单位 new java.util.concurrent.SynchronousQueue<>(), //直接提交队列 Executors.defaultThreadFactory(), //线程工厂 new ThreadPoolExecutor.AbortPolicy()); //拒绝策略:直接抛出异常 for (int i = 0; i < 3; i++) { pool.execute(new Thread(() -> { System.out.println(Thread.currentThread().getName()); })); } } } 线程池中的线程就是通过ThreadPoolExecutor中的ThreadFactory线程工厂创建的,
那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名,优先级等
public class ThreadFactoryDemo { private static ThreadPoolExecutor executor; public static void main(String[] args) { //自定义线程工厂 executor = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() { @Override public Thread newThread(Runnable r) { System.out.println("线程" + r.hashCode() + "创建"); Thread thread = new Thread(r, "threadPool" + r.hashCode()); return thread; } }); for (int i = 0; i < 10; i++) { executor.execute(new Thread(() -> { System.out.println("ThreadName:" + Thread.currentThread().getName()); })); } } } 通过这三个接口我们可以监控每个任务的开始和结束时间
beforeExecute:线程池中任务运行前执行afterExecute:线程池中任务运行完毕后执行terminated:线程池退出后执行 public class ThreadPoolExecutorExpand { private static ThreadPoolExecutor executor ; /** * Nthreads=CPU数量 * Ucpu=目标CPU的使用率,0<=Ucpu<=1 * W/C=任务等待时间与任务计算时间的比率 */ //private static int Nthreads = Ncpu*Ucpu*(1+W/C) public static void main(String[] args) { //实现自定义接口 executor=new ThreadPoolExecutor( 2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() { @Override public Thread newThread(Runnable r) { System.out.println("线程"+r.hashCode()+"创建"); Thread thread =new Thread(r,"threadPool"+r.hashCode()); return thread; } }, new ThreadPoolExecutor.CallerRunsPolicy() ){ protected void beforeExecute(Thread t,Runnable r){ System.out.println("准备执行: "+((ThreadTask)r).getTaskName()); } protected void afterExecute(Runnable r,Throwable t){ System.out.println("执行完毕: "+((ThreadTask)r).getTaskName()); } protected void terminated() { System.out.println("线程池退出"); } }; for(int i=0;i<10;i++){ executor.execute(new ThreadTask("Task" +i)); } /** * 可以看到通过对beforeExecute()、afterExecute()和terminated()的实现, * 我们对线程池中线程的运行状态进行了监控, * 在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池, * 当线程池调用该方法后,线程池中不再接受后续添加的任务。但是,此时线程池不会立刻退出, * 直到添加到线程池中的任务都已经处理完成,才会退出。 */ executor.shutdown(); } } @Data class ThreadTask implements Runnable{ private String taskName; public ThreadTask(String taskName) { this.taskName = taskName; } @Override public void run() { //输出执行线程的名称 System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName()); } }