多线程编程六-线程池的使用

    技术2022-07-11  69

    目录

     

    1 JDK自带的线程池

    2 七大参数简介:

    3 线程池工作流程

    4 自定义拒绝策略

    5 和spring整合

    6 合理配置线程数


    1 JDK自带的线程池

    我们知道JDK可以通过Executors类来创建线程池,但是这些线程池都有缺点,所以在生产环境中我们要自定义线程池来使用 Executors.newFixedThreadPool(),缺点:可能创建大量任务,导致oom Executors.newCachedThreadPool(),缺点,可能创建大量线程,导致oom Executors.newSingleThreadExecutor(),缺点:可能创建大量任务,导致oom

    2 七大参数简介:

    corePoolSize:线程池的大小。线程池创建之后不会立即去创建线程,而是等待线程的到来。当当前执行的线程数大于该值是,线程会加入到缓冲队列; maximumPoolSize:线程池中创建的最大线程数; keepAliveTime:空闲的线程多久时间后被销毁。默认情况下,该值在线程数大于corePoolSize时,对超出corePoolSize值得这些线程起作用。默认60 unit:TimeUnit枚举类型的值,代表keepAliveTime时间单位,可以取下列值: TimeUnit.DAYS; //天   TimeUnit.HOURS; //小时   TimeUnit.MINUTES; //分钟   TimeUnit.SECONDS; //秒(默认)   TimeUnit.MILLISECONDS; //毫秒   TimeUnit.MICROSECONDS; //微妙   TimeUnit.NANOSECONDS; //纳秒 workQueue:阻塞队列,用来存储等待执行的任务,决定了线程池的排队策略,有以下取值:   ArrayBlockingQueue;   LinkedBlockingQueue;   SynchronousQueue; threadFactory:线程工厂,是用来创建线程的。默认new Executors.DefaultThreadFactory(); handler:线程拒绝策略。当创建的线程超出maximumPoolSize,且缓冲队列已满时,新任务会拒绝,有以下取值:   ThreadPoolExecutor.AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常   ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。   ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)   ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    3 线程池工作流程

    线程池执行任务的时候如果核心线程数未被占用则创建新的线程来执行任务如果当前任务量大于核心线程数则开始把任务添加到工作队列如果工作队列也满了则在不超过最大线程数的情况下创建线程执行任务如果最大线程数也满了则启用拒绝策略

    代码实验

    @Test public void testThreadPool() throws InterruptedException {     ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();     // 核心线程数为5     int corePoolSize = 5;     executor.setCorePoolSize(corePoolSize);     // 最大线程数为10     executor.setMaxPoolSize(corePoolSize * 2);     // 等待队列容量为10     executor.setQueueCapacity(10);     executor.setThreadNamePrefix("my-pool");     // 优雅停止     executor.setWaitForTasksToCompleteOnShutdown(true);     executor.initialize();     for (int i = 0; i < 25; i ++) {         // 睡10ms,确保先加的任务先执行         Thread.sleep(10);         int finalI = i;         try {             executor.execute(() -> {                 System.out.println(DateUtil.datetimeToString(new Date()) + ":" + Thread.currentThread() + "->" + finalI);                 try {                     Thread.sleep(1000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             });         }catch (Exception e) {             System.out.println("task " + i + " error");             continue;         }     }     Thread.sleep(1000000); }

    输出 2020-07-01 22:30:36:Thread[my-pool1,5,main]->0 2020-07-01 22:30:36:Thread[my-pool2,5,main]->1 2020-07-01 22:30:36:Thread[my-pool3,5,main]->2 2020-07-01 22:30:36:Thread[my-pool4,5,main]->3 2020-07-01 22:30:36:Thread[my-pool5,5,main]->4 2020-07-01 22:30:36:Thread[my-pool6,5,main]->15 2020-07-01 22:30:36:Thread[my-pool7,5,main]->16 2020-07-01 22:30:36:Thread[my-pool8,5,main]->17 2020-07-01 22:30:36:Thread[my-pool9,5,main]->18 2020-07-01 22:30:36:Thread[my-pool10,5,main]->19 task 20 error task 21 error task 22 error task 23 error task 24 error 2020-07-01 22:30:37:Thread[my-pool1,5,main]->5 2020-07-01 22:30:37:Thread[my-pool2,5,main]->6 2020-07-01 22:30:37:Thread[my-pool3,5,main]->7 2020-07-01 22:30:37:Thread[my-pool4,5,main]->8 2020-07-01 22:30:37:Thread[my-pool5,5,main]->9 2020-07-01 22:30:37:Thread[my-pool6,5,main]->10 2020-07-01 22:30:37:Thread[my-pool7,5,main]->11 2020-07-01 22:30:37:Thread[my-pool8,5,main]->12 2020-07-01 22:30:37:Thread[my-pool9,5,main]->13 2020-07-01 22:30:37:Thread[my-pool10,5,main]->14

    4 自定义拒绝策略

    手动设置拒绝策略 executor.setRejectedExecutionHandler((r, executor1) -> System.out.println("丢弃任务:" + r)); 测试 2020-07-01 22:37:18:Thread[my-pool1,5,main]->0 2020-07-01 22:37:18:Thread[my-pool2,5,main]->1 2020-07-01 22:37:18:Thread[my-pool3,5,main]->2 2020-07-01 22:37:18:Thread[my-pool4,5,main]->3 2020-07-01 22:37:18:Thread[my-pool5,5,main]->4 2020-07-01 22:37:18:Thread[my-pool6,5,main]->15 2020-07-01 22:37:18:Thread[my-pool7,5,main]->16 2020-07-01 22:37:18:Thread[my-pool8,5,main]->17 2020-07-01 22:37:18:Thread[my-pool9,5,main]->18 2020-07-01 22:37:18:Thread[my-pool10,5,main]->19 丢弃任务:com.demo.util.FTPUtilTest$$Lambda$3/334203599@396e2f39 丢弃任务:com.demo.util.FTPUtilTest$$Lambda$3/334203599@a74868d 丢弃任务:com.demo.util.FTPUtilTest$$Lambda$3/334203599@12c8a2c0 丢弃任务:com.demo.util.FTPUtilTest$$Lambda$3/334203599@7e0e6aa2 丢弃任务:com.demo.util.FTPUtilTest$$Lambda$3/334203599@365185bd 2020-07-01 22:37:19:Thread[my-pool1,5,main]->5 2020-07-01 22:37:19:Thread[my-pool2,5,main]->6 2020-07-01 22:37:19:Thread[my-pool3,5,main]->7 2020-07-01 22:37:19:Thread[my-pool4,5,main]->8 2020-07-01 22:37:19:Thread[my-pool5,5,main]->9 2020-07-01 22:37:19:Thread[my-pool6,5,main]->10 2020-07-01 22:37:19:Thread[my-pool7,5,main]->11 2020-07-01 22:37:19:Thread[my-pool8,5,main]->12 2020-07-01 22:37:19:Thread[my-pool9,5,main]->13 2020-07-01 22:37:20:Thread[my-pool10,5,main]->14

    5 和spring整合

    spring中可以使用@Async来轻松实现异步调用使用的时候务必配置value属性,确保用的是自定义的线程池 // 创建自定义线程池

    @Bean("threadPool") public Executor taskExecutor() {     ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();     int corePoolSize = Runtime.getRuntime().availableProcessors();     executor.setCorePoolSize(corePoolSize);     executor.setMaxPoolSize(corePoolSize * 2);     executor.setQueueCapacity(20);     executor.setThreadNamePrefix("thread-poll-");     executor.setWaitForTasksToCompleteOnShutdown(true);     executor.setRejectedExecutionHandler((r, executor1) -> System.out.println("拒绝……"));     executor.initialize();     return executor; }

    // 异步执行类

    @Slf4j @Component public class AsyncClass{     @Async("threadPool")     public void test1() throws InterruptedException {         Thread.sleep(500L);         log.info("test1...");     }     @Async("threadPool")     public void test2() throws InterruptedException {         Thread.sleep(200L);         log.info("test2...");     }     @Async("threadPool")     public void test3() throws InterruptedException {         Thread.sleep(300L);         log.info("test3...");     } }

    // 测试代码

    @Autowired private AsyncClass asyncClass; @Test public void test() throws InterruptedException {     asyncClass.test1();     asyncClass.test2();     asyncClass.test3();     log.info("main...");     Thread.sleep(5000L); }

    输出 2020-07-01 22:46:30.272 - [main] INFO  com.demo.thread.ThreadPoolTest : 30 - main... 2020-07-01 22:46:30.475 - [thread-poll-2] INFO  com.demo.threadpool.AsyncClass : 23 - test2... 2020-07-01 22:46:30.584 - [thread-poll-3] INFO  com.demo.threadpool.AsyncClass : 29 - test3... 2020-07-01 22:46:30.787 - [thread-poll-1] INFO  com.demo.threadpool.AsyncClass : 17 - test1... 由此可见我们的方法都是异步执行的,根据线程的名称可知使用的是我们自定义的线程池

    6 合理配置线程数

    CPU核数:Runtime.getRuntime().availableProcessors(); 如果是cpu密集型业务,则尽量少配置线程数,减少线程切换。一般公式:cpu核数+1 如果是io密集型,任务线程并不是一直在执行任务,大量io导致大量阻塞,这种加速主要是利用了浪费掉的阻塞时间。 尽可能多配置线程数,一般公式:cpu核数 / (1-阻塞系数),阻塞系数一般是0.8-0.9

    Processed: 0.012, SQL: 9