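How Java thread pools work: how threads are decoupled from tasks, and how threads are kept alive for a period of time instead of being destroyed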

Tech, 2022-07-14

First, the conclusions.

How are threads separated from tasks?

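Both the "thread" side and the "task" side are classes that implement Runnable. The difference is in how their run() methods get invoked.

The object acting as the thread is wrapped in a Thread. In ThreadPoolExecutor this object is the Worker. Calling start() on that Thread creates a new thread, and the new thread executes the worker's run().

The object acting as the task is never started at all. The worker's run() holds a reference to the task and calls task.run() directly, as an ordinary method call. This works the same whether the call sits in run() itself or in some other method that run() calls. Either way, the task's code executes on the worker's thread.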
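
A minimal sketch of this idea, using a plain Thread rather than ThreadPoolExecutor. The class and method names `TaskOnWorkerThread` and `runViaWorker` are hypothetical. The "worker" Runnable is started on its own Thread. The task Runnable is only ever called through `task.run()`, so it reports the worker thread's name.

```java
import java.util.concurrent.atomic.AtomicReference;

public class TaskOnWorkerThread {
    // Starts a "worker" Runnable on a new Thread; the worker calls task.run()
    // as an ordinary method call. Returns the name of the thread the task ran on.
    public static String runViaWorker(String threadName) {
        AtomicReference<String> ranOn = new AtomicReference<>();
        // The task: a plain Runnable that is never started on a thread of its own
        Runnable task = () -> ranOn.set(Thread.currentThread().getName());
        // The worker: holds a reference to the task and invokes its run() directly
        Runnable worker = () -> task.run();
        Thread t = new Thread(worker, threadName);
        t.start(); // start() creates the new thread, which then executes worker.run()
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(runViaWorker("worker-1")); // prints worker-1
    }
}
```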

How are threads kept from being destroyed?

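First, a correction. A common description says that using a thread pool means taking an idle thread out of the pool, using it, and putting it back when done. The picture is vivid, but it is not accurate.

In reality we do not need to know which threads are idle. We do not take threads out, and we do not return them afterwards. We simply keep submitting the tasks we want done. The pool creates threads on its own (under certain conditions) to run them. Threads that already exist and are idle pick up new tasks by themselves and run them.

A thread is not destroyed because, at any moment, it is either running a task or waiting for one. A thread in a Java thread pool is usually given a task when it is created. Once that task finishes, the thread fetches the next task from the pool's BlockingQueue:

- if no task is available, it blocks there;
- when it gets one, it runs it and then goes back for more, over and over.

If a keep-alive time applies to the thread, it waits only that long. If it still gets no task, it exits.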
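
To illustrate the keep-alive mechanism, here is a toy pool sketch. `ToyPool` and its methods are hypothetical names. This is a deliberately simplified model, not ThreadPoolExecutor's implementation: it has no keep-alive timeout, no rejection and no core/maximum distinction. Each worker thread loops on `BlockingQueue.take()`, so it survives between tasks simply by blocking on the queue.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// A simplified model of a pool (not ThreadPoolExecutor itself)
public class ToyPool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread[] workers;

    public ToyPool(int size) {
        workers = new Thread[size];
        for (int i = 0; i < size; i++) {
            workers[i] = new Thread(this::workLoop, "toy-worker-" + i);
            workers[i].start();
        }
    }

    // Every worker thread runs this loop: it is never handed back to a pool,
    // it just keeps taking tasks, and take() blocks while the queue is empty.
    private void workLoop() {
        try {
            while (true) {
                Runnable task = queue.take(); // wait for the next task
                task.run();                   // ordinary call, runs on this worker thread
            }
        } catch (InterruptedException e) {
            // shutdown() interrupted this thread: leave the loop so the thread can end
        }
    }

    public void submit(Runnable task) {
        queue.add(task);
    }

    public void shutdown() {
        for (Thread w : workers) {
            w.interrupt();
        }
    }

    // Runs n tasks on a pool of `size` workers; returns the names of the threads used
    public static Set<String> threadsUsed(int size, int n) {
        ToyPool pool = new ToyPool(size);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            pool.submit(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return names;
    }

    public static void main(String[] args) {
        // never more than 2 distinct names, however many tasks are submitted
        System.out.println(threadsUsed(2, 20));
    }
}
```

Only `size` threads are ever created, however many tasks are submitted. So the set returned by `threadsUsed` never holds more than `size` names.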

How is a thread pool used?

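    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ThreadPoolDemo {
        public static void testThreadPoolExecutor() {
            // 1. Create the pool: 5 core threads, at most 10 threads,
            //    200 ms keep-alive for idle extra threads, queue capacity 5
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(5));
            for (int i = 0; i < 15; i++) {
                MyTask myTask = new MyTask(i);
                // 2. Submit the task to the pool
                executor.execute(myTask);
                System.out.println("Threads in pool: " + executor.getPoolSize()
                        + ", tasks waiting in queue: " + executor.getQueue().size()
                        + ", tasks completed: " + executor.getCompletedTaskCount());
            }
            // 3. Shut the pool down (tasks already submitted still run to completion)
            executor.shutdown();
        }

        public static void main(String[] args) {
            testThreadPoolExecutor();
        }
    }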

The custom task it uses (it can sit in the same file as a package-private class):

    // 1. Implement Runnable (extending Thread would also work)
    class MyTask implements Runnable {
        private final int taskNum;

        public MyTask(int num) {
            this.taskNum = num;
        }

        // 2. Override run()
        @Override
        public void run() {
            System.out.println("Running task " + taskNum + ", current thread id: " + Thread.currentThread().getId());
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                // restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            }
            System.out.println("task " + taskNum + " finished");
        }
    }

What happens when the code submits a task to the pool? Let's look at the source:

    public void execute(Runnable command) {
        if (command == null)
            // 1. Throw an exception
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // 2. Call addWorker()
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 3. Add the task to workQueue
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                // 4. Reject the task
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 5. Call addWorker()
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            // 6. addWorker() failed, so reject the task
            reject(command);
    }
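
Both `execute()` and `addWorker()` read a single `ctl` field. In the source quoted here (the version that uses `COUNT_MASK`), `ctl` packs two things into one int: the run state in the high 3 bits and the worker count in the low 29 bits.

The sketch below mirrors those constants and helpers in a standalone class to show how `workerCountOf()` and `runStateAtLeast()` decode the value. `CtlPacking` is a hypothetical name. `TIDYING`/`TERMINATED` are omitted, and so is the CAS the real class uses to update `ctl` atomically.

```java
// Standalone mirror of ThreadPoolExecutor's ctl encoding (illustrative only; the real
// class keeps ctl in an AtomicInteger and changes it with compareAndSet)
public class CtlPacking {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // low 29 bits set

    // Run states live in the high 3 bits, ordered RUNNING < SHUTDOWN < STOP
    static final int RUNNING  = -1 << COUNT_BITS;
    static final int SHUTDOWN =  0 << COUNT_BITS;
    static final int STOP     =  1 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    static int workerCountOf(int c)  { return c & COUNT_MASK; }
    static int ctlOf(int rs, int wc) { return rs | wc; }
    static boolean runStateAtLeast(int c, int s) { return c >= s; }
    static boolean isRunning(int c)  { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 0); // a fresh pool: RUNNING, 0 workers
        c = c + 1;                 // what a successful compareAndIncrementWorkerCount(c) stores
        System.out.println(workerCountOf(c));         // 1
        System.out.println(runStateOf(c) == RUNNING); // true
        System.out.println(isRunning(c));             // true
        System.out.println(runStateAtLeast(ctlOf(SHUTDOWN, 1), SHUTDOWN)); // true
    }
}
```

Because run states are ordered numerically, comparisons like `runStateAtLeast(c, SHUTDOWN)` are a single integer comparison, whatever the worker count is.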

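Submitting a task to the pool can lead to one of the following:

1. An exception is thrown (NullPointerException, if the task is null).

2. addWorker() is called.

3. The task is added to the workQueue.

4. The task is rejected.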
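
The branches in `execute()` imply a fixed order:

1. start core threads;
2. then queue the task;
3. then start extra threads up to `maximumPoolSize`;
4. then reject.

A small experiment makes this visible. `ExecuteOrderDemo` is a hypothetical class, and the pool parameters are chosen only for illustration. It submits four tasks, all blocking on a latch, to a pool with 1 core thread, at most 2 threads, and a queue of capacity 1. After each call it records `getPoolSize()/getQueue().size()`, or `rejected` when the default `AbortPolicy` throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteOrderDemo {
    // Submits 4 tasks that block until released to a pool with core=1, max=2 and a
    // queue of capacity 1; after each execute() records "poolSize/queueSize" or "rejected".
    public static List<String> trace() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> steps = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            try {
                executor.execute(blocker);
                steps.add(executor.getPoolSize() + "/" + executor.getQueue().size());
            } catch (RejectedExecutionException e) {
                steps.add("rejected"); // default AbortPolicy
            }
        }
        release.countDown(); // let the blocked tasks finish
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // [1/0, 1/1, 2/1, rejected]
    }
}
```

Note that the second task waits in the queue even though an extra thread could have been created. Extra threads are only added once the queue is full.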

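Throwing an exception and rejecting a task need no further explanation. Adding a task to the queue simply puts it in the queue. What deserves a closer look is addWorker(): what does it do?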

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // The code above mainly checks whether another worker may be added;
        // the interesting part follows

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 1. Worker, task, thread: how are they related?
            //    The worker is created from the task, and thread t comes from the worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        // 2. The new worker is added to the pool's workers set here
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 3. A thread starts running here: the thread of the newly created worker
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

At comment 3 in the source above, a thread starts running. Which run() method does it actually execute?

First, look at the source of ThreadPoolExecutor.Worker:

    // 1. Note that Worker implements Runnable
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        // ... (omitted)

        // 2. The three fields that matter here
        /** Thread this worker is running in.  Null if factory fails. */
        @SuppressWarnings("serial") // Unlikely to be serializable
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        @SuppressWarnings("serial") // Not statically typed as Serializable
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        // 3. And these two methods. In the constructor the worker is tangled up with
        //    its own fields: its thread is created by passing the worker itself (this)
        //    to the ThreadFactory
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        // ... (omitted)
    }

As mentioned above, a thread is started near the end of addWorker(). But which run() method is actually executed? Is it Worker's run()?

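Setting a breakpoint in a debugger confirms it: the run() that actually executes is Worker.run(). This matches the constructor, which created the thread by passing the worker itself (this) to the ThreadFactory. Starting that thread therefore runs the worker's run().

It also shows that the thread started here is the new worker's own thread. Worker.run() calls the pool's runWorker() method, but that method executes on the worker's thread.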
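
The debugger observation can also be checked in code, as a sketch. Pass a custom `ThreadFactory` that records the `Runnable` it receives; the class name `WorkerProbe` and the thread name `probe-thread` are hypothetical. Following the `Worker` constructor above, the recorded object should be the pool's private `Worker`, not the submitted task, and the task should run on the thread the factory created.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class WorkerProbe {
    // Returns {class of the Runnable passed to newThread(),
    //          whether that Runnable is the submitted task,
    //          name of the thread the task actually ran on}
    public static String[] probe() {
        AtomicReference<Runnable> handedToFactory = new AtomicReference<>();
        ThreadFactory factory = r -> {
            handedToFactory.set(r); // record what the Worker constructor passes in
            return new Thread(r, "probe-thread");
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Runnable task = () -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        };
        executor.execute(task); // addWorker() -> new Worker(task) -> factory.newThread(worker)
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        Runnable r = handedToFactory.get();
        return new String[] { r.getClass().getName(), String.valueOf(r == task), ranOn.get() };
    }

    public static void main(String[] args) {
        // prints java.util.concurrent.ThreadPoolExecutor$Worker, false, probe-thread
        System.out.println(String.join(", ", probe()));
    }
}
```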

Next, the source of the pool's runWorker() method:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 1. Fetch tasks in a loop: first the worker's firstTask,
            //    then whatever getTask() returns
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        // 2. runWorker() itself runs on this worker's thread, so task.run(),
                        //    although called as an ordinary method, also runs on the worker's thread
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

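Each worker corresponds to one thread in the pool. In the source above, the worker thread fetches tasks in a loop: first the task originally handed to the worker (firstTask), then tasks obtained from the pool's getTask() method.

This is also where threads and tasks are decoupled. Both are Runnable implementations, but only the worker is run by a Thread; tasks are merely called.

Briefly: after the worker's first task finishes, the worker calls getTask(). If that returns a task, the worker keeps working. What if it returns none? Why is the thread not destroyed? To answer that, look at the source of getTask():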
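
`runWorker()` calls `beforeExecute(Thread t, Runnable r)` just before `task.run()`. That hook is a protected method of ThreadPoolExecutor, so overriding it is one way to watch the loop from outside. In this sketch (hypothetical class `OneWorkerManyTasks`), a single-thread pool runs several tasks, and the hook records which thread each one runs on. All of them should report the same worker thread: one worker keeps looping on `getTask()` instead of a new thread being created per task.

```java
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OneWorkerManyTasks {
    // Runs n empty tasks on a pool with exactly one thread and returns, per task,
    // the name of the thread that beforeExecute() reported.
    public static List<String> run(int n) {
        List<String> seen = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                // runWorker() calls this on the worker thread just before task.run()
                seen.add(t.getName());
            }
        };
        for (int i = 0; i < n; i++) {
            executor.execute(() -> { });
        }
        executor.shutdown(); // queued tasks still run; afterwards getTask() returns null
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> names = run(5);
        // prints 5 tasks, 1 distinct thread(s)
        System.out.println(names.size() + " tasks, " + new HashSet<>(names).size() + " distinct thread(s)");
    }
}
```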

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        // 1. An infinite loop
        for (;;) {
            int c = ctl.get();

            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            // 2. The code above handles special cases; here the task is taken from workQueue
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

The workQueue above is a field of the pool:

    private final BlockingQueue<Runnable> workQueue;

It is a blocking queue. What is special about a blocking queue, or more precisely, about its take() and timed poll() methods?

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    E take() throws InterruptedException;

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    E poll(long timeout, TimeUnit unit) throws InterruptedException;

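What do these comments say? A thread that calls either method blocks while no element is available. The difference is the time limit. poll(timeout, unit) waits at most the given time and returns null if nothing arrives. take() waits with no time limit.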
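
A short sketch of the difference. The class name `TakeVsPoll` and the 200/300 ms delays are arbitrary illustrative choices. `poll(timeout, unit)` on an empty queue gives up and returns `null`. `take()` keeps waiting until another thread puts an element.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TakeVsPoll {
    // Returns {result of a timed poll on an empty queue, result of take()}
    public static String[] demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            // Timed poll: waits up to 200 ms for an element, then gives up with null
            String polled = queue.poll(200, TimeUnit.MILLISECONDS);

            // take(): no time limit, blocks until another thread puts an element
            Thread producer = new Thread(() -> {
                try {
                    Thread.sleep(300);
                    queue.put("job");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            String taken = queue.take(); // returns once the producer has put "job"
            producer.join();
            return new String[] { String.valueOf(polled), taken };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String[] r = demo();
        System.out.println("poll: " + r[0] + ", take: " + r[1]); // poll: null, take: job
    }
}
```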

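Suppose a worker finds no task. If the pool currently has no more than corePoolSize threads and allowCoreThreadTimeOut is false (the default), the worker blocks in take() indefinitely.

Otherwise (the pool has more than corePoolSize threads, or allowCoreThreadTimeOut(true) was set), the worker blocks in poll(keepAliveTime, ...) until the timeout expires. getTask() then returns null and the worker exits. This is how core threads are kept from being destroyed.

Note that the pool does not label individual threads as "core" or "non-core". Any idle thread that finds the pool above corePoolSize may be the one that times out.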
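
The effect of `keepAliveTime` and `allowCoreThreadTimeOut` can be observed from outside the pool. This sketch uses a hypothetical class `KeepAliveDemo`; the 100 ms keep-alive and 1 s sleeps are illustrative. The result depends on timing, so a heavily loaded machine could in principle show different numbers.

The sketch proceeds in three steps:

1. Grow the pool to 2 threads by filling its queue.
2. Let it idle: the thread above corePoolSize times out in `poll()`, and the last one falls back to `take()`.
3. Enable `allowCoreThreadTimeOut(true)`, after which the last thread also times out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {
    // Returns the pool size: after a burst, after idling past keepAliveTime,
    // and after additionally enabling allowCoreThreadTimeOut(true).
    public static List<Integer> sizes() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 2, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<Integer> sizes = new ArrayList<>();
        try {
            // core thread, then a queued task, then one extra thread (queue is full)
            for (int i = 0; i < 3; i++) {
                executor.execute(blocker);
            }
            sizes.add(executor.getPoolSize()); // 2
            release.countDown();
            Thread.sleep(1000); // well past keepAliveTime: one thread's poll() times out
            sizes.add(executor.getPoolSize()); // 1: the remaining thread now waits in take()
            executor.allowCoreThreadTimeOut(true);
            Thread.sleep(1000); // the last thread now uses poll(keepAliveTime) and times out
            sizes.add(executor.getPoolSize()); // 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(sizes()); // [2, 1, 0]
    }
}
```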

The call sequence is:

    ThreadPoolExecutor.execute() ----> ThreadPoolExecutor.addWorker() ----> Thread.start() ----> Worker.run() (on the new thread) ----> ThreadPoolExecutor.runWorker() ----> ThreadPoolExecutor.getTask() ----> BlockingQueue.take() or BlockingQueue.poll(keepAliveTime, unit)

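That covers the basic principle of the thread pool and the basic way it keeps threads from being destroyed. With this foundation, other questions about thread pools can be analyzed the same way.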