【线程池】

    技术2022-07-10  137

    目录

    创建线程的3种方式1. 继承Thread类2. 实现Runnable接口3. 线程池创建 使用线程池的意义线程池中的容器Executor 【执行器】ExecutorService 【执行器服务】自定义线程池的实现原理及流程Executors 【操作Executor的工具类】1. FixedThreadPool2. CacheThreadPool3. SingleThreadExecutor4. ScheduledThreadPool5. ForkJoinPool5. WorkStealingPool 自定义线程池大小配置计算方式

    创建线程的3种方式

    1. 继承Thread类

    class MyThread extends Thread { @Override public void run() { System.out.println("Hello MyThread!"); } } public static void main(String[] args) { new MyThread().start(); }

    2. 实现Runnable接口

    class MyRun implements Runnable { @Override public void run() { System.out.println("Hello MyRun!"); } } public static void main(String[] args) { new Thread(new MyRun()).start(); }

    3. 线程池创建

    public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(1); //lamda表达式创建线程丢到线程池中 service.execute(()->{ System.out.println("Hello MyThreadPool"); }); //等线程执行完关闭线程池 service.shutdown(); }

    使用线程池的意义

    线程是稀缺资源,它的创建和销毁是一个相对偏重且耗资源的操作,而Java线程依赖于内核线程,创建线程需要进行 操作系统状态切换 ,为避免资源过度消耗需要设法 重用线程 执行多个任务。线程池就是一个线程缓存,负责对线程进行统一分配、调优和监控。

    什么时候使用线程池?

    单个任务处理时间比较短需要处理的任务数量很大

    线程池优势

    重用存在的线程,减少线程创建,消亡的开销,提高性能提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。提高线程的可管理性,可统一分配,调优和监控

    线程池中的容器

    红色的是后边线程池常用的,先了解一下呦~

    ArrayBlockingQueue【有界阻塞队列】

    基于数组,满了就会阻塞。需要指定大小

    LinedBlockingQueue【无界阻塞队列】

    基于链表的无界队列,FIFO

    SynchronousQueue【无缓冲等待队列】

    容量为0,直接将任务交给消费者,必须等待消费后才能添加新元素

    DelayQueue【延迟队列】

    在指定时间才能获取队列元素,队列头元素是最接近过期的元素(与插入顺序无关)

    Executor 【执行器】

    void execute(Runnable command);

    该方法能执行一项任务

    ExecutorService 【执行器服务】

    向线程池提交任务有两种方法,分别是execute()和submit(),两者的区别主要是 execute()提交的是不需要有返回值的任务,而 submit提交的是需要有返回值的任务,并且submit()会返回一个Future对象 ,并且可以使用future.get()方法获取返回值,并且get方法会阻塞,直到有返回值。

    线程池的 关闭有shutdown()和shutdownNow 两个方法,他们的原理是遍历线程池中的工作线程,然后逐个调用interrupt方法来中断线程,所以无法中断的线程可能永远无法终止;但是二者也有区别,shutdownNow是将线程池的状态设置为STOP,然后尝试停止所有正在执行或者暂停的线程,并返回等待执行任务列表;而shutdown只是将线程池的状态设置成SHUTDOWN,然后中断所有没有正在执行的任务。当调用这两个方法中的任何一个后,isShutdown方法就会返回true,当所有任务都已经关闭后,调用isTerminaed方法会返回true。

    Runnable和Callable

    Runnable和Callable接口的实现类,都可以被ThreadPoolExecutor、ScheduledThreadPool、ForkJoinThread执行;

    Runnable接口中的run()方法是没有返回值的,它只是纯粹地去执行run()方法中的代码而已;

    Callable接口中的call()方法是有返回值的,是一个泛型,和Future、FutureTask配合可以用来获取异步执行的结果。

    Future

    Future接口和实现了该接口的FutureTask类来表示异步计算的结果

    如上图所示,主线程首先创建实现Runnable或Callable接口的任务对象,然后把任务对象提交给 ExecutorService 执行,

    如果使用的是submit提交,执行完毕后将返回一个实现Future接口的对象,最后,主线程可以执行 FutureTask.get() 方法来获取返回值;主线程也可以调用 FutureTask.cancel() 方法来取消此任务的执行。

    自定义线程池的实现原理及流程

    看下代码

    可以看下线程池ThreadPoolExecutor的全参构造函数源码: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 入参释义如下: 参数描述作用coolPoolSize线程核心线程数当一个任务提交到线程池时,线程池会创建一个线程来执行任务,即使其他的核心线程足够执行新任务,也会创建线程,直到需要执行的任务数大于核心线程数后才不再创建;如果线程池先调用了preStartAllCoreThread()方法,则会先启动所有核心线程。maximumPoolSize线程池最大线程数如果队列满了,并且已创建的线程数小于该值,则会创建新的线程执行任务。这里需要说明一点,如果使用的队列时无界队列,那么该值无用。keepAliveTime存活时间当线程池中线程超过超时时间没有新的任务进入,则停止该线程;只会停止多于核心线程数的那几个线程。unit线程存活的时间单位可以有天、小时、分钟、秒、毫秒、微妙、纳秒workQueue任务队列用于保存等待执行任务的阻塞队列。可以选择如下几个队列:数组结构的有界队列ArrayBlockingQueue、链表结果的有界队列LinkedBlockingQueue、不存储元素的阻塞队列SynchronousQueue、一个具有优先级的无界阻塞队列PriortyBlockingQueuethreadFactory创建线程的工厂可以通过工厂给每个线程创建更有意义的名字。使用Guava提供的ThreadFactoryBuilder可以快速的给线程池里的线程创建有意义的名字,代码如下new ThreadFactoryBuilder().setNameFormat(“aaaaaaaa”).build();handler拒绝策略当队列和线程都满了,说明线程池处于饱和状态,那么必须采取一种策略来处理新提交的任务。AbortPolicy(默认),表示无法处理新任务时抛出异常。CallerRunsPolicy:重试添加当前的任务,他会自动重复调用execute()方法。DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。DiscardPolicy:不处理,直接丢弃

    当一个线程提交到线程池时(execute() / submit()),先判断核心线程数(corePoolSize)是否已满,如果未满,则直接创建线程执行任务;如果已满,则判断队列(BlockingQueue)是否已满,如果未满,则将线程添加到队列中;如果已满,则判断线程池(maximumPoolSize)是否已满,如果未满,则创建线程池执行任务;如果线程池已满,则交给拒绝策略(RejectedExecutionHandler.rejectExcution())来处理。

    Executors 【操作Executor的工具类】

    JDK封装的线程池描述父类FixedThreadPool【固定线程数线程池】适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的应用。ThreadPoolExecutorCashedThreadPool【弹性线程池】这是一个无界的线程池,适用于执行很多短期异步任务的小程序,或者是负载比较轻的服务器。ThreadPoolExecutorSingleThreadPool【单一线程池】适用于需要保证顺序的执行各个任务,并且在任意时间点都不会有多个线程活动的场景。ThreadPoolExecutorScheduledThreadPool【定时任务线程池】适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景。ScheduledThreadPoolExecutorWorkStealingPool【工作窃取线程池】适合用在很耗时的操作,每个线程维护一个工作队列,其中一个干完就会去别的线程那儿偷活儿干,所以能合理使用CPUForkJoinsPoolForkJoinPool【分而治之的线程池】递归思想,适合用于大规模的数据计算(任务切分)AbstractExecutorService

    1. FixedThreadPool

    构造函数如下:

    public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

    构造函数中,核心线程数和最大线程数一致,keepAliveTime为0,队列使用的是无界阻塞队列LinkedBlockingQueue(最大值是Integer.MAX_VALUE);

    核心线程数和最大线程数保持一致,表明:如果队列满了之后,不会再创建新的线程;

    keepAliveTime为0,表明:如果运行线程数大于核心线程数时,如果线程执行完毕,空闲线程立刻被终止;

    使用无界阻塞队列,表明:当运行线程到达核心线程数时,不会再创建线程,只会将任务加入阻塞队列;因此最大线程数参数无效;因此keepAliveTime参数无效;且不会拒绝任务(既不会执行拒绝策略)

    2. CacheThreadPool

    构造函数如下:

    public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

    构造函数中,核心线程数为0,最大线程数为Integer.MAX_VALUE,意味着无界,keepAliveTime为60秒,阻塞队列使用没有存储空间的SynchronousQueue核心线程数为0,最大线程数为无界,表明:只要队列满了,就会创建新的线程放入线程池使用没有存储空间的SynchronousQueue。缺点:线程提交的速度高于线程被消费的速度,那么线程会被不断的创建,最终会因为线程创建过多而耗尽CPU和内存资源.

    3. SingleThreadExecutor

    构造函数如下:

    public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

    构造函数中,核心线程数和最大线程数均为1,keepAliveTime为0,队列使用的是无界阻塞队列LinkedBlockingQueue(最大值是Integer.MAX_VALUE)

    除了固定了核心线程数和最大线程数为1外,其余的参数均与FixedThreadPool一致,那么就是只有一个线程会反复循环从阻塞队列中获取任务执行

    4. ScheduledThreadPool

    构造函数如下si gai diu:

    public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }

    构造函数中,自己定义核心线程数,最大线程数为Integer.MAX_VALUE,keepAliveTime为0,队列使用的是DelayedWorkQueue,不是FIFO,队列头元素是最近过期的元素

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,//实现Runnable的任务 long initialDelay,//首个任务执行延迟时间 long period,//任务与任务的执行间隔时间 TimeUnit unit);//第3个参数的单位

    5. ForkJoinPool

    构造函数如下:

    public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),//min(0x7fff,CPU核数) defaultForkJoinWorkerThreadFactory,//默认线程工厂 null,//错误处理器 false//线程处理策略 true:FIFO | false:LIFO(默认) ); }

    5. WorkStealingPool

    构造函数如下:

    public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }

    构造函数中,和ForkJoinPool不同的是,线程处理策略是先进先出。

    自定义线程池大小配置计算方式

    先看下机器的CPU核数,然后在设定具体参数: CPU核数 = Runtime.getRuntime().availableProcessors() 分析下线程池处理的程序是CPU密集型,还是IO密集型 1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话,一般设置 5 或 8 2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:cpu核数 / (1-0.9),核数为4的话,一般设置 4
    Processed: 0.015, SQL: 9